irpc/lib.rs
1//! # A minimal RPC library for use with [iroh](https://docs.rs/iroh/latest/iroh/index.html).
2//!
3//! ## Goals
4//!
5//! The main goal of this library is to provide an rpc framework that is so
6//! lightweight that it can be also used for async boundaries within a single
7//! process without any overhead, instead of the usual practice of a mpsc channel
8//! with a giant message enum where each enum case contains mpsc or oneshot
9//! backchannels.
10//!
11//! The second goal is to lightly abstract over remote and local communication,
12//! so that a system can be interacted with cross process or even across networks.
13//!
14//! ## Non-goals
15//!
16//! - Cross language interop. This is for talking from rust to rust
17//! - Any kind of versioning. You have to do this yourself
18//! - Making remote message passing look like local async function calls
19//! - Being runtime agnostic. This is for tokio
20//!
21//! ## Interaction patterns
22//!
23//! For each request, there can be a response and update channel. Each channel
24//! can be either oneshot, carry multiple messages, or be disabled. This enables
25//! the typical interaction patterns known from libraries like grpc:
26//!
27//! - rpc: 1 request, 1 response
28//! - server streaming: 1 request, multiple responses
29//! - client streaming: multiple requests, 1 response
30//! - bidi streaming: multiple requests, multiple responses
31//!
32//! as well as more complex patterns. It is however not possible to have multiple
33//! differently typed tx channels for a single message type.
34//!
35//! ## Transports
36//!
37//! We don't abstract over the send and receive stream. These must always be
38//! noq streams, specifically streams from the [noq].
39//!
40//! This restricts the possible rpc transports to noq (QUIC with dial by
41//! socket address) and iroh (QUIC with dial by endpoint id).
42//!
43//! An upside of this is that the noq streams can be tuned for each rpc
44//! request, e.g. by setting the stream priority or by directly using more
45//! advanced part of the noq SendStream and RecvStream APIs such as out of
46//! order receiving.
47//!
48//! ## Serialization
49//!
50//! Serialization is currently done using [postcard]. Messages are always
51//! length prefixed with postcard varints, even in the case of oneshot
52//! channels.
53//!
54//! Serialization only happens for cross process rpc communication.
55//!
56//! However, the requirement for message enums to be serializable is present even
57//! when disabling the `rpc` feature. Due to the fact that the channels live
58//! outside the message, this is not a big restriction.
59//!
60//! ## Features
61//!
62//! - `derive`: Enable the [`rpc_requests`] macro.
63//! - `rpc`: Enable the rpc features. Enabled by default.
64//! By disabling this feature, all rpc related dependencies are removed.
65//! The remaining dependencies are just serde, tokio and tokio-util.
66//! - `spans`: Enable tracing spans for messages. Enabled by default.
67//! This is useful even without rpc, to not lose tracing context when message
68//! passing. This is frequently done manually. This obviously requires
69//! a dependency on tracing.
70//! - `noq_endpoint_setup`: Easy way to create noq endpoints. This is useful
71//! both for testing and for rpc on localhost. Enabled by default.
72//!
73//! # Example
74//!
75//! ```
76//! use irpc::{
77//! Client, WithChannels,
78//! channel::{mpsc, oneshot},
79//! rpc_requests,
80//! };
81//! use serde::{Deserialize, Serialize};
82//!
83//! #[tokio::main]
84//! async fn main() -> n0_error::Result<()> {
85//! let client = spawn_server();
86//! let res = client.rpc(Multiply(3, 7)).await?;
87//! assert_eq!(res, 21);
88//!
89//! let (tx, mut rx) = client.bidi_streaming(Sum, 4, 4).await?;
90//! tx.send(4).await?;
91//! assert_eq!(rx.recv().await?, Some(4));
92//! tx.send(6).await?;
93//! assert_eq!(rx.recv().await?, Some(10));
94//! tx.send(11).await?;
95//! assert_eq!(rx.recv().await?, Some(21));
96//! Ok(())
97//! }
98//!
99//! /// We define a simple protocol using the derive macro.
100//! #[rpc_requests(message = ComputeMessage)]
101//! #[derive(Debug, Serialize, Deserialize)]
102//! enum ComputeProtocol {
103//! /// Multiply two numbers, return the result over a oneshot channel.
104//! #[rpc(tx=oneshot::Sender<i64>)]
105//! #[wrap(Multiply)]
106//! Multiply(i64, i64),
107//! /// Sum all numbers received via the `rx` stream,
108//! /// reply with the updating sum over the `tx` stream.
109//! #[rpc(tx=mpsc::Sender<i64>, rx=mpsc::Receiver<i64>)]
110//! #[wrap(Sum)]
111//! Sum,
112//! }
113//!
114//! fn spawn_server() -> Client<ComputeProtocol> {
115//! let (tx, rx) = tokio::sync::mpsc::channel(16);
116//! // Spawn an actor task to handle incoming requests.
117//! tokio::task::spawn(server_actor(rx));
118//! // Return a local client to talk to our actor.
119//! irpc::Client::local(tx)
120//! }
121//!
122//! async fn server_actor(mut rx: tokio::sync::mpsc::Receiver<ComputeMessage>) {
123//! while let Some(msg) = rx.recv().await {
124//! match msg {
125//! ComputeMessage::Multiply(msg) => {
126//! let WithChannels { inner, tx, .. } = msg;
127//! let Multiply(a, b) = inner;
128//! tx.send(a * b).await.ok();
129//! }
130//! ComputeMessage::Sum(msg) => {
131//! let WithChannels { tx, mut rx, .. } = msg;
132//! // Spawn a separate task for this potentially long-running request.
133//! tokio::task::spawn(async move {
134//! let mut sum = 0;
135//! while let Ok(Some(number)) = rx.recv().await {
136//! sum += number;
137//! if tx.send(sum).await.is_err() {
138//! break;
139//! }
140//! }
141//! });
142//! }
143//! }
144//! }
145//! }
146//! ```
147//!
148//! # History
149//!
150//! This crate evolved out of the [quic-rpc](https://docs.rs/quic-rpc/latest/quic-rpc/index.html) crate, which is a generic RPC
151//! framework for any transport with cheap streams such as QUIC. Compared to
152//! quic-rpc, this crate does not abstract over the stream type and is focused
153//! on [iroh](https://docs.rs/iroh/latest/iroh/index.html) and our [noq](https://docs.rs/noq/latest/noq/index.html).
154#![cfg_attr(quicrpc_docsrs, feature(doc_cfg))]
155use std::{fmt::Debug, future::Future, io, marker::PhantomData, ops::Deref};
156
157/// Processes an RPC request enum and generates trait implementations for use with `irpc`.
158///
159/// This attribute macro may be applied to an enum where each variant represents
160/// a different RPC request type. Each variant of the enum must contain a single unnamed field
161/// of a distinct type (unless the `wrap` attribute is used on a variant, see below).
162///
163/// Basic usage example:
164/// ```
165/// use irpc::{
166/// channel::{mpsc, oneshot},
167/// rpc_requests,
168/// };
169/// use serde::{Deserialize, Serialize};
170///
171/// #[rpc_requests(message = ComputeMessage)]
172/// #[derive(Debug, Serialize, Deserialize)]
173/// enum ComputeProtocol {
174/// /// Multiply two numbers, return the result over a oneshot channel.
175/// #[rpc(tx=oneshot::Sender<i64>)]
176/// Multiply(Multiply),
177/// /// Sum all numbers received via the `rx` stream,
178/// /// reply with the updating sum over the `tx` stream.
179/// #[rpc(tx=mpsc::Sender<i64>, rx=mpsc::Receiver<i64>)]
180/// Sum(Sum),
181/// }
182///
183/// #[derive(Debug, Serialize, Deserialize)]
184/// struct Multiply(i64, i64);
185///
186/// #[derive(Debug, Serialize, Deserialize)]
187/// struct Sum;
188/// ```
189///
190/// ## Generated code
191///
192/// If no further arguments are set, the macro generates:
193///
194/// * A [`Channels<S>`] implementation for each request type (i.e. the type of the variant's
195/// single unnamed field).
196/// The `Tx` and `Rx` types are set to the types provided via the variant's `rpc` attribute.
197/// * A `From` implementation to convert from each request type to the protocol enum.
198///
199/// When the `message` argument is set, the macro will also create a message enum and implement the
200/// [`Service`] and [`RemoteService`] traits for the protocol enum. This is recommended for the
201/// typical use of the macro.
202///
203/// ## Macro arguments
204///
205/// * `message = <name>` *(optional but recommended)*:
206/// * Generates an extended enum wrapping each type in [`WithChannels<T, Service>`].
207/// The attribute value is the name of the message enum type.
208/// * Generates a [`Service`] implementation for the protocol enum, with the `Message`
209/// type set to the message enum.
210/// * Generates a [`rpc::RemoteService`] implementation for the protocol enum.
211/// * `alias = "<suffix>"` *(optional)*: Generate type aliases with the given suffix for each `WithChannels<T, Service>`.
212/// * `rpc_feature = "<feature>"` *(optional)*: If set, the `RemoteService` implementation will be feature-flagged
213/// with this feature. Set this if your crate only optionally enables the `rpc` feature
214/// of `irpc`.
215/// * `no_rpc` *(optional, no value)*: If set, no implementation of `RemoteService` will be generated and the generated
216/// code works without the `rpc` feature of `irpc`.
217/// * `no_spans` *(optional, no value)*: If set, the generated code works without the `spans` feature of `irpc`.
218/// * `span_propagation` *(optional, no value)*: If set, enables OpenTelemetry span context propagation
219/// across remote connections. When enabled, span context is included in the wire format as
220/// `(Option<SpanContextCarrier>, Message)`, and the generated `RemoteService` implementation
221/// will set the parent span from the propagated remote context. Requires the `tracing-opentelemetry`
222/// feature to be enabled for actual OpenTelemetry integration; without it, the context is
223/// still serialized but has no effect.
224///
225/// ## Variant attributes
226///
227/// #### `#[rpc]` attribute
228///
229/// Individual enum variants are annotated with the `#[rpc(...)]` attribute to specify channel types.
230/// The `rpc` attribute contains two optional arguments:
231///
232/// * `tx = SomeType`: Set the kind of channel for sending responses from the server to the client.
233/// Must be a `Sender` type from the [`channel`] module.
234/// If `tx` is not set, it defaults to [`channel::none::NoSender`].
235/// * `rx = OtherType`: Set the kind of channel for receiving updates from the client at the server.
236/// Must be a `Receiver` type from the [`channel`] module.
237/// If `rx` is not set, it defaults to [`channel::none::NoReceiver`].
238///
239/// #### `#[wrap]` attribute
240///
241/// The attribute has the syntax `#[wrap(TypeName, derive(Foo, Bar))]`
242///
243/// If set, a struct `TypeName` will be generated from the variant's fields, and the variant
244/// will be changed to have a single, unnamed field of `TypeName`.
245///
246/// * `TypeName` is the name of the generated type.
247/// By default it will inherit the visibility of the protocol enum. You can set a different
248/// visibility by prefixing it with the visibility (e.g. `pub(crate) TypeName`).
249/// * `derive(Foo, Bar)` is optional and allows to set additional derives for the generated struct.
250/// By default, the struct will get `Serialize`, `Deserialize`, and `Debug` derives.
251///
252/// ## Examples
253///
254/// With `wrap`:
255/// ```
256/// use irpc::{
257/// Client,
258/// channel::{mpsc, oneshot},
259/// rpc_requests,
260/// };
261/// use serde::{Deserialize, Serialize};
262///
263/// #[rpc_requests(message = StoreMessage)]
264/// #[derive(Debug, Serialize, Deserialize)]
265/// enum StoreProtocol {
266/// /// Doc comment for `GetRequest`.
267/// #[rpc(tx=oneshot::Sender<String>)]
268/// #[wrap(GetRequest, derive(Clone))]
269/// Get(String),
270///
271/// /// Doc comment for `SetRequest`.
272/// #[rpc(tx=oneshot::Sender<()>)]
273/// #[wrap(SetRequest)]
274/// Set { key: String, value: String },
275/// }
276///
277/// async fn client_usage(client: Client<StoreProtocol>) -> n0_error::Result<()> {
278/// client
279/// .rpc(SetRequest {
280/// key: "foo".to_string(),
281/// value: "bar".to_string(),
282/// })
283/// .await?;
284/// let value = client.rpc(GetRequest("foo".to_string())).await?;
285/// Ok(())
286/// }
287/// ```
288///
289/// With type aliases:
290/// ```no_compile
291/// #[rpc_requests(message = ComputeMessage, alias = "Msg")]
292/// enum ComputeProtocol {
293/// #[rpc(tx=oneshot::Sender<u128>)]
294/// Sqr(Sqr), // Generates type SqrMsg = WithChannels<Sqr, ComputeProtocol>
295/// #[rpc(tx=mpsc::Sender<i64>)]
296/// Sum(Sum), // Generates type SumMsg = WithChannels<Sum, ComputeProtocol>
297/// }
298/// ```
299///
300/// [`RemoteService`]: rpc::RemoteService
301/// [`WithChannels<T, Service>`]: WithChannels
302/// [`Channels<S>`]: Channels
303#[cfg(feature = "derive")]
304#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "derive")))]
305pub use irpc_derive::rpc_requests;
306#[cfg(feature = "rpc")]
307use n0_error::AnyError;
308use n0_error::stack_error;
309use serde::{Serialize, de::DeserializeOwned};
310
311use self::{
312 channel::{
313 mpsc,
314 none::{NoReceiver, NoSender},
315 oneshot,
316 },
317 sealed::Sealed,
318};
319use crate::channel::SendError;
320
321#[cfg(test)]
322mod tests;
323pub mod util;
324
325mod sealed {
326 pub trait Sealed {}
327}
328
329/// Span context propagation for remote RPC calls
330///
331/// This module provides the `SpanContextCarrier` type for propagating trace context
332/// across remote boundaries. The type is always available when `rpc` feature is enabled,
333/// but actual OpenTelemetry integration requires the `tracing-opentelemetry` feature.
334///
335/// The propagated context is scoped to a single request handler via a tokio task-local,
336/// installed by the dispatch loop in `handle_connection`. This isolates concurrent
337/// requests from each other and is robust to thread migration across `.await` points.
338#[cfg(feature = "rpc")]
339#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
340pub mod span_propagation {
341 use std::{collections::HashMap, future::Future};
342
343 use serde::{Deserialize, Serialize};
344
345 #[cfg(feature = "tracing-opentelemetry")]
346 tokio::task_local! {
347 static SPAN_CONTEXT: opentelemetry::Context;
348 }
349
350 /// Carrier for propagating span context across RPC boundaries using W3C Trace Context format.
351 ///
352 /// This type is always available for serialization purposes. When the
353 /// `tracing-opentelemetry` feature is enabled, it can extract/inject actual
354 /// OpenTelemetry trace context. Without that feature, it simply serializes as an
355 /// empty map.
356 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
357 pub struct SpanContextCarrier {
358 headers: HashMap<String, String>,
359 }
360
361 #[cfg(feature = "tracing-opentelemetry")]
362 impl opentelemetry::propagation::Injector for SpanContextCarrier {
363 fn set(&mut self, key: &str, value: String) {
364 self.headers.insert(key.to_string(), value);
365 }
366 }
367
368 #[cfg(feature = "tracing-opentelemetry")]
369 impl opentelemetry::propagation::Extractor for SpanContextCarrier {
370 fn get(&self, key: &str) -> Option<&str> {
371 self.headers.get(key).map(|v| v.as_str())
372 }
373
374 fn keys(&self) -> Vec<&str> {
375 self.headers.keys().map(|k| k.as_str()).collect()
376 }
377 }
378
379 impl SpanContextCarrier {
380 /// Create a carrier from the current OpenTelemetry context.
381 ///
382 /// When `tracing-opentelemetry` feature is enabled, this extracts the current
383 /// trace context. Without the feature, this returns an empty carrier.
384 #[cfg(feature = "tracing-opentelemetry")]
385 pub fn from_current() -> Self {
386 use opentelemetry::global;
387 use tracing_opentelemetry::OpenTelemetrySpanExt;
388 let mut carrier = Self::default();
389 // Get the OTel context from the current tracing span, not from
390 // opentelemetry::Context::current(). The tracing-opentelemetry layer
391 // stores OTel spans inside tracing spans, so the thread-local OTel
392 // context won't have the right span.
393 let ctx = tracing::Span::current().context();
394 global::get_text_map_propagator(|prop| {
395 prop.inject_context(&ctx, &mut carrier);
396 });
397 carrier
398 }
399
400 #[cfg(not(feature = "tracing-opentelemetry"))]
401 pub fn from_current() -> Self {
402 Self::default()
403 }
404
405 /// Extract an OpenTelemetry context from this carrier.
406 #[cfg(feature = "tracing-opentelemetry")]
407 pub fn to_context(&self) -> opentelemetry::Context {
408 use opentelemetry::global;
409 global::get_text_map_propagator(|prop| {
410 prop.extract_with_context(&opentelemetry::Context::current(), self)
411 })
412 }
413 }
414
415 /// Run `fut` with `carrier`'s context installed as the per-task scope read by
416 /// [`set_span_parent_from_remote`].
417 ///
418 /// Used by transport implementations (`irpc::rpc`, `irpc-iroh`) to wrap a single
419 /// request handler. Most users will not call this directly.
420 pub async fn scope_remote<F: Future>(carrier: Option<SpanContextCarrier>, fut: F) -> F::Output {
421 #[cfg(feature = "tracing-opentelemetry")]
422 if let Some(carrier) = carrier {
423 return SPAN_CONTEXT.scope(carrier.to_context(), fut).await;
424 }
425 let _ = carrier;
426 fut.await
427 }
428
429 /// Set the parent of a span from the propagated remote context, if one is in scope.
430 ///
431 /// Called by the code generated by `rpc_requests(span_propagation)`. Looks up the
432 /// task-local installed by the dispatch loop; no-op outside that scope.
433 pub fn set_span_parent_from_remote(span: &tracing::Span) {
434 #[cfg(feature = "tracing-opentelemetry")]
435 {
436 let _ = SPAN_CONTEXT.try_with(|ctx| {
437 use tracing_opentelemetry::OpenTelemetrySpanExt;
438 let _ = span.set_parent(ctx.clone());
439 });
440 }
441 let _ = span;
442 }
443}
444
445/// Requirements for a RPC message
446///
447/// Even when just using the mem transport, we require messages to be Serializable and Deserializable.
448/// Likewise, even when using the noq transport, we require messages to be Send.
449///
450/// This does not seem like a big restriction. If you want a pure memory channel without the possibility
451/// to also use the noq transport, you might want to use a mpsc channel directly.
452pub trait RpcMessage: Debug + Serialize + DeserializeOwned + Send + Sync + Unpin + 'static {}
453
454impl<T> RpcMessage for T where
455 T: Debug + Serialize + DeserializeOwned + Send + Sync + Unpin + 'static
456{
457}
458
459/// Trait for a service
460///
461/// This is implemented on the protocol enum.
462/// It is usually auto-implemented via the [`rpc_requests] macro.
463///
464/// A service acts as a scope for defining the tx and rx channels for each
465/// message type, and provides some type safety when sending messages.
466pub trait Service: Serialize + DeserializeOwned + Send + Sync + Debug + 'static {
467 /// Message enum for this protocol.
468 ///
469 /// This is expected to be an enum with identical variant names than the
470 /// protocol enum, but its single unit field is the [`WithChannels`] struct
471 /// that contains the inner request plus the `tx` and `rx` channels.
472 type Message: Send + Unpin + 'static;
473
474 /// Whether this protocol includes span context in the wire format.
475 ///
476 /// When `true`, messages are serialized as `(Option<SpanContextCarrier>, Message)`.
477 /// When `false` (default), messages are serialized directly without span context wrapper.
478 ///
479 /// This is controlled by the `span_propagation` attribute on the `rpc_requests` macro.
480 const SPAN_PROPAGATION: bool = false;
481}
482
483/// Sealed marker trait for a sender
484pub trait Sender: Debug + Sealed {}
485
486/// Sealed marker trait for a receiver
487pub trait Receiver: Debug + Sealed {}
488
489/// Trait to specify channels for a message and service
490pub trait Channels<S: Service>: Send + 'static {
491 /// The sender type, can be either mpsc, oneshot or none
492 type Tx: Sender;
493 /// The receiver type, can be either mpsc, oneshot or none
494 ///
495 /// For many services, the receiver is not needed, so it can be set to [`NoReceiver`].
496 type Rx: Receiver;
497}
498
499/// Channels that abstract over local or remote sending
500pub mod channel {
501 use std::io;
502
503 use n0_error::stack_error;
504
505 /// Oneshot channel, similar to tokio's oneshot channel
506 pub mod oneshot {
507 use std::{fmt::Debug, future::Future, io, pin::Pin, task};
508
509 use n0_error::{e, stack_error};
510 use n0_future::future::Boxed as BoxFuture;
511
512 use super::SendError;
513 use crate::util::FusedOneshotReceiver;
514
515 /// Error when receiving a oneshot or mpsc message. For local communication,
516 /// the only thing that can go wrong is that the sender has been closed.
517 ///
518 /// For rpc communication, there can be any number of errors, so this is a
519 /// generic io error.
520 #[stack_error(derive, add_meta, from_sources)]
521 pub enum RecvError {
522 /// The sender has been closed. This is the only error that can occur
523 /// for local communication.
524 #[error("Sender closed")]
525 SenderClosed,
526 /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]).
527 ///
528 /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE
529 #[error("Maximum message size exceeded")]
530 MaxMessageSizeExceeded,
531 /// An io error occurred. This can occur for remote communication,
532 /// due to a network error or deserialization error.
533 #[error("Io error")]
534 Io {
535 #[error(std_err)]
536 source: io::Error,
537 },
538 }
539
540 impl From<RecvError> for io::Error {
541 fn from(e: RecvError) -> Self {
542 match e {
543 RecvError::Io { source, .. } => source,
544 RecvError::SenderClosed { .. } => io::Error::new(io::ErrorKind::BrokenPipe, e),
545 RecvError::MaxMessageSizeExceeded { .. } => {
546 io::Error::new(io::ErrorKind::InvalidData, e)
547 }
548 }
549 }
550 }
551
552 /// Create a local oneshot sender and receiver pair.
553 ///
554 /// This is currently using a tokio channel pair internally.
555 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
556 let (tx, rx) = tokio::sync::oneshot::channel();
557 (tx.into(), rx.into())
558 }
559
560 /// A generic boxed sender.
561 ///
562 /// Remote senders are always boxed, since for remote communication the boxing
563 /// overhead is negligible. However, boxing can also be used for local communication,
564 /// e.g. when applying a transform or filter to the message before sending it.
565 pub type BoxedSender<T> =
566 Box<dyn FnOnce(T) -> BoxFuture<Result<(), SendError>> + Send + Sync + 'static>;
567
568 /// A sender that can be wrapped in a `Box<dyn DynSender<T>>`.
569 ///
570 /// In addition to implementing `Future`, this provides a fn to check if the sender is
571 /// an rpc sender.
572 ///
573 /// Remote receivers are always boxed, since for remote communication the boxing
574 /// overhead is negligible. However, boxing can also be used for local communication,
575 /// e.g. when applying a transform or filter to the message before receiving it.
576 pub trait DynSender<T>:
577 Future<Output = Result<(), SendError>> + Send + Sync + 'static
578 {
579 fn is_rpc(&self) -> bool;
580 }
581
582 /// A generic boxed receiver
583 ///
584 /// Remote receivers are always boxed, since for remote communication the boxing
585 /// overhead is negligible. However, boxing can also be used for local communication,
586 /// e.g. when applying a transform or filter to the message before receiving it.
587 pub type BoxedReceiver<T> = BoxFuture<Result<T, RecvError>>;
588
589 /// A oneshot sender.
590 ///
591 /// Compared to a local onehsot sender, sending a message is async since in the case
592 /// of remote communication, sending over the wire is async. Other than that it
593 /// behaves like a local oneshot sender and has no overhead in the local case.
594 pub enum Sender<T> {
595 Tokio(tokio::sync::oneshot::Sender<T>),
596 /// we can't yet distinguish between local and remote boxed oneshot senders.
597 /// If we ever want to have local boxed oneshot senders, we need to add a
598 /// third variant here.
599 Boxed(BoxedSender<T>),
600 }
601
602 impl<T> Debug for Sender<T> {
603 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
604 match self {
605 Self::Tokio(_) => f.debug_tuple("Tokio").finish(),
606 Self::Boxed(_) => f.debug_tuple("Boxed").finish(),
607 }
608 }
609 }
610
611 impl<T> From<tokio::sync::oneshot::Sender<T>> for Sender<T> {
612 fn from(tx: tokio::sync::oneshot::Sender<T>) -> Self {
613 Self::Tokio(tx)
614 }
615 }
616
617 impl<T> TryFrom<Sender<T>> for tokio::sync::oneshot::Sender<T> {
618 type Error = Sender<T>;
619
620 fn try_from(value: Sender<T>) -> Result<Self, Self::Error> {
621 match value {
622 Sender::Tokio(tx) => Ok(tx),
623 Sender::Boxed(_) => Err(value),
624 }
625 }
626 }
627
628 impl<T> Sender<T> {
629 /// Send a message
630 ///
631 /// If this is a boxed sender that represents a remote connection, sending may yield or fail with an io error.
632 /// Local senders will never yield, but can fail if the receiver has been closed.
633 pub async fn send(self, value: T) -> Result<(), SendError> {
634 match self {
635 Sender::Tokio(tx) => tx.send(value).map_err(|_| e!(SendError::ReceiverClosed)),
636 Sender::Boxed(f) => f(value).await,
637 }
638 }
639
640 /// Check if this is a remote sender
641 pub fn is_rpc(&self) -> bool
642 where
643 T: 'static,
644 {
645 match self {
646 Sender::Tokio(_) => false,
647 Sender::Boxed(_) => true,
648 }
649 }
650 }
651
652 impl<T: Send + Sync + 'static> Sender<T> {
653 /// Applies a filter before sending.
654 ///
655 /// Messages that don't pass the filter are dropped.
656 pub fn with_filter(self, f: impl Fn(&T) -> bool + Send + Sync + 'static) -> Sender<T> {
657 self.with_filter_map(move |u| if f(&u) { Some(u) } else { None })
658 }
659
660 /// Applies a transform before sending.
661 pub fn with_map<U, F>(self, f: F) -> Sender<U>
662 where
663 F: Fn(U) -> T + Send + Sync + 'static,
664 U: Send + Sync + 'static,
665 {
666 self.with_filter_map(move |u| Some(f(u)))
667 }
668
669 /// Applies a filter and transform before sending.
670 ///
671 /// Messages that don't pass the filter are dropped.
672 pub fn with_filter_map<U, F>(self, f: F) -> Sender<U>
673 where
674 F: Fn(U) -> Option<T> + Send + Sync + 'static,
675 U: Send + Sync + 'static,
676 {
677 let inner: BoxedSender<U> = Box::new(move |value| {
678 let opt = f(value);
679 Box::pin(async move {
680 if let Some(v) = opt {
681 self.send(v).await
682 } else {
683 Ok(())
684 }
685 })
686 });
687 Sender::Boxed(inner)
688 }
689 }
690
691 impl<T> crate::sealed::Sealed for Sender<T> {}
692 impl<T> crate::Sender for Sender<T> {}
693
694 /// A oneshot receiver.
695 ///
696 /// Compared to a local oneshot receiver, receiving a message can fail not just
697 /// when the sender has been closed, but also when the remote connection fails.
698 pub enum Receiver<T> {
699 Tokio(FusedOneshotReceiver<T>),
700 Boxed(BoxedReceiver<T>),
701 }
702
703 impl<T> Future for Receiver<T> {
704 type Output = Result<T, RecvError>;
705
706 fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
707 match self.get_mut() {
708 Self::Tokio(rx) => Pin::new(rx)
709 .poll(cx)
710 .map_err(|_| e!(RecvError::SenderClosed)),
711 Self::Boxed(rx) => Pin::new(rx).poll(cx),
712 }
713 }
714 }
715
716 /// Convert a tokio oneshot receiver to a receiver for this crate
717 impl<T> From<tokio::sync::oneshot::Receiver<T>> for Receiver<T> {
718 fn from(rx: tokio::sync::oneshot::Receiver<T>) -> Self {
719 Self::Tokio(FusedOneshotReceiver(rx))
720 }
721 }
722
723 impl<T> TryFrom<Receiver<T>> for tokio::sync::oneshot::Receiver<T> {
724 type Error = Receiver<T>;
725
726 fn try_from(value: Receiver<T>) -> Result<Self, Self::Error> {
727 match value {
728 Receiver::Tokio(tx) => Ok(tx.0),
729 Receiver::Boxed(_) => Err(value),
730 }
731 }
732 }
733
734 /// Convert a function that produces a future to a receiver for this crate
735 impl<T, F, Fut> From<F> for Receiver<T>
736 where
737 F: FnOnce() -> Fut,
738 Fut: Future<Output = Result<T, RecvError>> + Send + 'static,
739 {
740 fn from(f: F) -> Self {
741 Self::Boxed(Box::pin(f()))
742 }
743 }
744
745 impl<T> Debug for Receiver<T> {
746 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
747 match self {
748 Self::Tokio(_) => f.debug_tuple("Tokio").finish(),
749 Self::Boxed(_) => f.debug_tuple("Boxed").finish(),
750 }
751 }
752 }
753
754 impl<T> crate::sealed::Sealed for Receiver<T> {}
755 impl<T> crate::Receiver for Receiver<T> {}
756 }
757
758 /// SPSC channel, similar to tokio's mpsc channel
759 ///
760 /// For the rpc case, the send side can not be cloned, hence mpsc instead of mpsc.
761 pub mod mpsc {
762 use std::{fmt::Debug, future::Future, io, marker::PhantomData, pin::Pin, sync::Arc};
763
764 use n0_error::{e, stack_error};
765
766 use super::SendError;
767
768 /// Error when receiving a oneshot or mpsc message. For local communication,
769 /// the only thing that can go wrong is that the sender has been closed.
770 ///
771 /// For rpc communication, there can be any number of errors, so this is a
772 /// generic io error.
773 #[stack_error(derive, add_meta, from_sources)]
774 pub enum RecvError {
775 /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]).
776 ///
777 /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE
778 #[error("Maximum message size exceeded")]
779 MaxMessageSizeExceeded,
780 /// An io error occurred. This can occur for remote communication,
781 /// due to a network error or deserialization error.
782 #[error("Io error")]
783 Io {
784 #[error(std_err)]
785 source: io::Error,
786 },
787 }
788
789 impl From<RecvError> for io::Error {
790 fn from(e: RecvError) -> Self {
791 match e {
792 RecvError::Io { source, .. } => source,
793 RecvError::MaxMessageSizeExceeded { .. } => {
794 io::Error::new(io::ErrorKind::InvalidData, e)
795 }
796 }
797 }
798 }
799
800 /// Create a local mpsc sender and receiver pair, with the given buffer size.
801 ///
802 /// This is currently using a tokio channel pair internally.
803 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
804 let (tx, rx) = tokio::sync::mpsc::channel(buffer);
805 (tx.into(), rx.into())
806 }
807
808 /// Single producer, single consumer sender.
809 ///
810 /// For the local case, this wraps a tokio::sync::mpsc::Sender.
811 pub enum Sender<T> {
812 Tokio(tokio::sync::mpsc::Sender<T>),
813 Boxed(Arc<dyn DynSender<T>>),
814 }
815
816 impl<T> Clone for Sender<T> {
817 fn clone(&self) -> Self {
818 match self {
819 Self::Tokio(tx) => Self::Tokio(tx.clone()),
820 Self::Boxed(inner) => Self::Boxed(inner.clone()),
821 }
822 }
823 }
824
825 impl<T> Sender<T> {
826 pub fn is_rpc(&self) -> bool
827 where
828 T: 'static,
829 {
830 match self {
831 Sender::Tokio(_) => false,
832 Sender::Boxed(x) => x.is_rpc(),
833 }
834 }
835
836 #[cfg(feature = "stream")]
837 pub fn into_sink(self) -> impl n0_future::Sink<T, Error = SendError> + Send + 'static
838 where
839 T: Send + Sync + 'static,
840 {
841 futures_util::sink::unfold(self, |sink, value| async move {
842 sink.send(value).await?;
843 Ok(sink)
844 })
845 }
846 }
847
848 impl<T: Send + Sync + 'static> Sender<T> {
849 /// Applies a filter before sending.
850 ///
851 /// Messages that don't pass the filter are dropped.
852 ///
853 /// If you want to combine multiple filters and maps with minimal
854 /// overhead, use `with_filter_map` directly.
855 pub fn with_filter<F>(self, f: F) -> Sender<T>
856 where
857 F: Fn(&T) -> bool + Send + Sync + 'static,
858 {
859 self.with_filter_map(move |u| if f(&u) { Some(u) } else { None })
860 }
861
862 /// Applies a transform before sending.
863 ///
864 /// If you want to combine multiple filters and maps with minimal
865 /// overhead, use `with_filter_map` directly.
866 pub fn with_map<U, F>(self, f: F) -> Sender<U>
867 where
868 F: Fn(U) -> T + Send + Sync + 'static,
869 U: Send + Sync + 'static,
870 {
871 self.with_filter_map(move |u| Some(f(u)))
872 }
873
874 /// Applies a filter and transform before sending.
875 ///
876 /// Any combination of filters and maps can be expressed using
877 /// a single filter_map.
878 pub fn with_filter_map<U, F>(self, f: F) -> Sender<U>
879 where
880 F: Fn(U) -> Option<T> + Send + Sync + 'static,
881 U: Send + Sync + 'static,
882 {
883 let inner: Arc<dyn DynSender<U>> = Arc::new(FilterMapSender {
884 f,
885 sender: self,
886 _p: PhantomData,
887 });
888 Sender::Boxed(inner)
889 }
890
891 /// Future that resolves when the sender is closed
892 pub async fn closed(&self) {
893 match self {
894 Sender::Tokio(tx) => tx.closed().await,
895 Sender::Boxed(sink) => sink.closed().await,
896 }
897 }
898 }
899
900 impl<T> From<tokio::sync::mpsc::Sender<T>> for Sender<T> {
901 fn from(tx: tokio::sync::mpsc::Sender<T>) -> Self {
902 Self::Tokio(tx)
903 }
904 }
905
906 impl<T> TryFrom<Sender<T>> for tokio::sync::mpsc::Sender<T> {
907 type Error = Sender<T>;
908
909 fn try_from(value: Sender<T>) -> Result<Self, Self::Error> {
910 match value {
911 Sender::Tokio(tx) => Ok(tx),
912 Sender::Boxed(_) => Err(value),
913 }
914 }
915 }
916
917 /// A sender that can be wrapped in a `Arc<dyn DynSender<T>>`.
918 pub trait DynSender<T>: Debug + Send + Sync + 'static {
919 /// Send a message.
920 ///
921 /// For the remote case, if the message can not be completely sent,
922 /// this must return an error and disable the channel.
923 fn send(
924 &self,
925 value: T,
926 ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>>;
927
928 /// Try to send a message, returning as fast as possible if sending
929 /// is not currently possible.
930 ///
931 /// For the remote case, it must be guaranteed that the message is
932 /// either completely sent or not at all.
933 fn try_send(
934 &self,
935 value: T,
936 ) -> Pin<Box<dyn Future<Output = Result<bool, SendError>> + Send + '_>>;
937
938 /// Await the sender close
939 fn closed(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + '_>>;
940
941 /// True if this is a remote sender
942 fn is_rpc(&self) -> bool;
943 }
944
945 /// A receiver that can be wrapped in a `Box<dyn DynReceiver<T>>`.
946 pub trait DynReceiver<T>: Debug + Send + Sync + 'static {
947 fn recv(
948 &mut self,
949 ) -> Pin<Box<dyn Future<Output = Result<Option<T>, RecvError>> + Send + Sync + '_>>;
950 }
951
952 impl<T> Debug for Sender<T> {
953 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
954 match self {
955 Self::Tokio(x) => f
956 .debug_struct("Tokio")
957 .field("avail", &x.capacity())
958 .field("cap", &x.max_capacity())
959 .finish(),
960 Self::Boxed(inner) => f.debug_tuple("Boxed").field(&inner).finish(),
961 }
962 }
963 }
964
965 impl<T: Send + 'static> Sender<T> {
966 /// Send a message and yield until either it is sent or an error occurs.
967 ///
968 /// ## Cancellation safety
969 ///
970 /// If the future is dropped before completion, and if this is a remote sender,
971 /// then the sender will be closed and further sends will return an [`SendError::Io`]
972 /// with [`std::io::ErrorKind::BrokenPipe`]. Therefore, make sure to always poll the
973 /// future until completion if you want to reuse the sender or any clone afterwards.
974 pub async fn send(&self, value: T) -> Result<(), SendError> {
975 match self {
976 Sender::Tokio(tx) => tx
977 .send(value)
978 .await
979 .map_err(|_| e!(SendError::ReceiverClosed)),
980 Sender::Boxed(sink) => sink.send(value).await,
981 }
982 }
983
984 /// Try to send a message, returning as fast as possible if sending
985 /// is not currently possible. This can be used to send ephemeral
986 /// messages.
987 ///
988 /// For the local case, this will immediately return false if the
989 /// channel is full.
990 ///
991 /// For the remote case, it will attempt to send the message and
992 /// return false if sending the first byte fails, otherwise yield
993 /// until the message is completely sent or an error occurs. This
994 /// guarantees that the message is sent either completely or not at
995 /// all.
996 ///
997 /// Returns true if the message was sent.
998 ///
999 /// ## Cancellation safety
1000 ///
1001 /// If the future is dropped before completion, and if this is a remote sender,
1002 /// then the sender will be closed and further sends will return an [`SendError::Io`]
1003 /// with [`std::io::ErrorKind::BrokenPipe`]. Therefore, make sure to always poll the
1004 /// future until completion if you want to reuse the sender or any clone afterwards.
1005 pub async fn try_send(&self, value: T) -> Result<bool, SendError> {
1006 match self {
1007 Sender::Tokio(tx) => match tx.try_send(value) {
1008 Ok(()) => Ok(true),
1009 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
1010 Err(e!(SendError::ReceiverClosed))
1011 }
1012 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => Ok(false),
1013 },
1014 Sender::Boxed(sink) => sink.try_send(value).await,
1015 }
1016 }
1017 }
1018
1019 impl<T> crate::sealed::Sealed for Sender<T> {}
1020 impl<T> crate::Sender for Sender<T> {}
1021
1022 pub enum Receiver<T> {
1023 Tokio(tokio::sync::mpsc::Receiver<T>),
1024 Boxed(Box<dyn DynReceiver<T>>),
1025 }
1026
1027 impl<T: Send + Sync + 'static> Receiver<T> {
1028 /// Receive a message
1029 ///
1030 /// Returns Ok(None) if the sender has been dropped or the remote end has
1031 /// cleanly closed the connection.
1032 ///
1033 /// Returns an an io error if there was an error receiving the message.
1034 pub async fn recv(&mut self) -> Result<Option<T>, RecvError> {
1035 match self {
1036 Self::Tokio(rx) => Ok(rx.recv().await),
1037 Self::Boxed(rx) => Ok(rx.recv().await?),
1038 }
1039 }
1040
1041 /// Map messages, transforming them from type T to type U.
1042 pub fn map<U, F>(self, f: F) -> Receiver<U>
1043 where
1044 F: Fn(T) -> U + Send + Sync + 'static,
1045 U: Send + Sync + 'static,
1046 {
1047 self.filter_map(move |u| Some(f(u)))
1048 }
1049
1050 /// Filter messages, only passing through those for which the predicate returns true.
1051 ///
1052 /// Messages that don't pass the filter are dropped.
1053 pub fn filter<F>(self, f: F) -> Receiver<T>
1054 where
1055 F: Fn(&T) -> bool + Send + Sync + 'static,
1056 {
1057 self.filter_map(move |u| if f(&u) { Some(u) } else { None })
1058 }
1059
1060 /// Filter and map messages, only passing through those for which the function returns Some.
1061 ///
1062 /// Messages that don't pass the filter are dropped.
1063 pub fn filter_map<F, U>(self, f: F) -> Receiver<U>
1064 where
1065 U: Send + Sync + 'static,
1066 F: Fn(T) -> Option<U> + Send + Sync + 'static,
1067 {
1068 let inner: Box<dyn DynReceiver<U>> = Box::new(FilterMapReceiver {
1069 f,
1070 receiver: self,
1071 _p: PhantomData,
1072 });
1073 Receiver::Boxed(inner)
1074 }
1075
1076 #[cfg(feature = "stream")]
1077 pub fn into_stream(
1078 self,
1079 ) -> impl n0_future::Stream<Item = Result<T, RecvError>> + Send + Sync + 'static
1080 {
1081 n0_future::stream::unfold(self, |mut recv| async move {
1082 recv.recv().await.transpose().map(|msg| (msg, recv))
1083 })
1084 }
1085 }
1086
1087 impl<T> From<tokio::sync::mpsc::Receiver<T>> for Receiver<T> {
1088 fn from(rx: tokio::sync::mpsc::Receiver<T>) -> Self {
1089 Self::Tokio(rx)
1090 }
1091 }
1092
1093 impl<T> TryFrom<Receiver<T>> for tokio::sync::mpsc::Receiver<T> {
1094 type Error = Receiver<T>;
1095
1096 fn try_from(value: Receiver<T>) -> Result<Self, Self::Error> {
1097 match value {
1098 Receiver::Tokio(tx) => Ok(tx),
1099 Receiver::Boxed(_) => Err(value),
1100 }
1101 }
1102 }
1103
1104 impl<T> Debug for Receiver<T> {
1105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1106 match self {
1107 Self::Tokio(inner) => f
1108 .debug_struct("Tokio")
1109 .field("avail", &inner.capacity())
1110 .field("cap", &inner.max_capacity())
1111 .finish(),
1112 Self::Boxed(inner) => f.debug_tuple("Boxed").field(&inner).finish(),
1113 }
1114 }
1115 }
1116
1117 struct FilterMapSender<F, T, U> {
1118 f: F,
1119 sender: Sender<T>,
1120 _p: PhantomData<U>,
1121 }
1122
1123 impl<F, T, U> Debug for FilterMapSender<F, T, U> {
1124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1125 f.debug_struct("FilterMapSender").finish_non_exhaustive()
1126 }
1127 }
1128
1129 impl<F, T, U> DynSender<U> for FilterMapSender<F, T, U>
1130 where
1131 F: Fn(U) -> Option<T> + Send + Sync + 'static,
1132 T: Send + Sync + 'static,
1133 U: Send + Sync + 'static,
1134 {
1135 fn send(
1136 &self,
1137 value: U,
1138 ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
1139 Box::pin(async move {
1140 match (self.f)(value) {
1141 Some(v) => self.sender.send(v).await,
1142 _ => Ok(()),
1143 }
1144 })
1145 }
1146
1147 fn try_send(
1148 &self,
1149 value: U,
1150 ) -> Pin<Box<dyn Future<Output = Result<bool, SendError>> + Send + '_>> {
1151 Box::pin(async move {
1152 match (self.f)(value) {
1153 Some(v) => self.sender.try_send(v).await,
1154 _ => Ok(true),
1155 }
1156 })
1157 }
1158
1159 fn is_rpc(&self) -> bool {
1160 self.sender.is_rpc()
1161 }
1162
1163 fn closed(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + '_>> {
1164 match self {
1165 FilterMapSender {
1166 sender: Sender::Tokio(tx),
1167 ..
1168 } => Box::pin(tx.closed()),
1169 FilterMapSender {
1170 sender: Sender::Boxed(sink),
1171 ..
1172 } => sink.closed(),
1173 }
1174 }
1175 }
1176
1177 struct FilterMapReceiver<F, T, U> {
1178 f: F,
1179 receiver: Receiver<T>,
1180 _p: PhantomData<U>,
1181 }
1182
1183 impl<F, T, U> Debug for FilterMapReceiver<F, T, U> {
1184 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1185 f.debug_struct("FilterMapReceiver").finish_non_exhaustive()
1186 }
1187 }
1188
1189 impl<F, T, U> DynReceiver<U> for FilterMapReceiver<F, T, U>
1190 where
1191 F: Fn(T) -> Option<U> + Send + Sync + 'static,
1192 T: Send + Sync + 'static,
1193 U: Send + Sync + 'static,
1194 {
1195 fn recv(
1196 &mut self,
1197 ) -> Pin<Box<dyn Future<Output = Result<Option<U>, RecvError>> + Send + Sync + '_>>
1198 {
1199 Box::pin(async move {
1200 while let Some(msg) = self.receiver.recv().await? {
1201 if let Some(v) = (self.f)(msg) {
1202 return Ok(Some(v));
1203 }
1204 }
1205 Ok(None)
1206 })
1207 }
1208 }
1209
1210 impl<T> crate::sealed::Sealed for Receiver<T> {}
1211 impl<T> crate::Receiver for Receiver<T> {}
1212 }
1213
1214 /// No channels, used when no communication is needed
1215 pub mod none {
1216 use crate::sealed::Sealed;
1217
1218 /// A sender that does nothing. This is used when no communication is needed.
1219 #[derive(Debug)]
1220 pub struct NoSender;
1221 impl Sealed for NoSender {}
1222 impl crate::Sender for NoSender {}
1223
1224 /// A receiver that does nothing. This is used when no communication is needed.
1225 #[derive(Debug)]
1226 pub struct NoReceiver;
1227
1228 impl Sealed for NoReceiver {}
1229 impl crate::Receiver for NoReceiver {}
1230 }
1231
1232 /// Error when sending a oneshot or mpsc message. For local communication,
1233 /// the only thing that can go wrong is that the receiver has been dropped.
1234 ///
1235 /// For rpc communication, there can be any number of errors, so this is a
1236 /// generic io error.
1237 #[stack_error(derive, add_meta, from_sources)]
1238 pub enum SendError {
1239 /// The receiver has been closed. This is the only error that can occur
1240 /// for local communication.
1241 #[error("Receiver closed")]
1242 ReceiverClosed,
1243 /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]).
1244 ///
1245 /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE
1246 #[error("Maximum message size exceeded")]
1247 MaxMessageSizeExceeded,
1248 /// The underlying io error. This can occur for remote communication,
1249 /// due to a network error or serialization error.
1250 #[error("Io error")]
1251 Io {
1252 #[error(std_err)]
1253 source: io::Error,
1254 },
1255 }
1256
1257 impl From<SendError> for io::Error {
1258 fn from(e: SendError) -> Self {
1259 match e {
1260 SendError::ReceiverClosed { .. } => io::Error::new(io::ErrorKind::BrokenPipe, e),
1261 SendError::MaxMessageSizeExceeded { .. } => {
1262 io::Error::new(io::ErrorKind::InvalidData, e)
1263 }
1264 SendError::Io { source, .. } => source,
1265 }
1266 }
1267 }
1268}
1269
1270/// A wrapper for a message with channels to send and receive it.
1271/// This expands the protocol message to a full message that includes the
1272/// active and unserializable channels.
1273///
1274/// The channel kind for rx and tx is defined by implementing the `Channels`
1275/// trait, either manually or using a macro.
1276///
1277/// When the `spans` feature is enabled, this also includes a tracing
1278/// span to carry the tracing context during message passing.
1279pub struct WithChannels<I: Channels<S>, S: Service> {
1280 /// The inner message.
1281 pub inner: I,
1282 /// The return channel to send the response to. Can be set to [`crate::channel::none::NoSender`] if not needed.
1283 pub tx: <I as Channels<S>>::Tx,
1284 /// The request channel to receive the request from. Can be set to [`NoReceiver`] if not needed.
1285 pub rx: <I as Channels<S>>::Rx,
1286 /// The current span where the full message was created.
1287 #[cfg(feature = "spans")]
1288 #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "spans")))]
1289 pub span: tracing::Span,
1290}
1291
1292impl<I: Channels<S> + Debug, S: Service> Debug for WithChannels<I, S> {
1293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1294 f.debug_tuple("")
1295 .field(&self.inner)
1296 .field(&self.tx)
1297 .field(&self.rx)
1298 .finish()
1299 }
1300}
1301
1302impl<I: Channels<S>, S: Service> WithChannels<I, S> {
1303 /// Get the parent span
1304 #[cfg(feature = "spans")]
1305 pub fn parent_span_opt(&self) -> Option<&tracing::Span> {
1306 Some(&self.span)
1307 }
1308}
1309
1310/// Tuple conversion from inner message and tx/rx channels to a WithChannels struct
1311///
1312/// For the case where you want both tx and rx channels.
1313impl<I: Channels<S>, S: Service, Tx, Rx> From<(I, Tx, Rx)> for WithChannels<I, S>
1314where
1315 I: Channels<S>,
1316 <I as Channels<S>>::Tx: From<Tx>,
1317 <I as Channels<S>>::Rx: From<Rx>,
1318{
1319 fn from(inner: (I, Tx, Rx)) -> Self {
1320 let (inner, tx, rx) = inner;
1321 Self {
1322 inner,
1323 tx: tx.into(),
1324 rx: rx.into(),
1325 #[cfg(feature = "spans")]
1326 #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "spans")))]
1327 span: tracing::Span::current(),
1328 }
1329 }
1330}
1331
1332/// Tuple conversion from inner message and tx channel to a WithChannels struct
1333///
1334/// For the very common case where you just need a tx channel to send the response to.
1335impl<I, S, Tx> From<(I, Tx)> for WithChannels<I, S>
1336where
1337 I: Channels<S, Rx = NoReceiver>,
1338 S: Service,
1339 <I as Channels<S>>::Tx: From<Tx>,
1340{
1341 fn from(inner: (I, Tx)) -> Self {
1342 let (inner, tx) = inner;
1343 Self {
1344 inner,
1345 tx: tx.into(),
1346 rx: NoReceiver,
1347 #[cfg(feature = "spans")]
1348 #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "spans")))]
1349 span: tracing::Span::current(),
1350 }
1351 }
1352}
1353
1354/// Tuple conversion from inner message to a WithChannels struct without channels
1355impl<I, S> From<(I,)> for WithChannels<I, S>
1356where
1357 I: Channels<S, Rx = NoReceiver, Tx = NoSender>,
1358 S: Service,
1359{
1360 fn from(inner: (I,)) -> Self {
1361 let (inner,) = inner;
1362 Self {
1363 inner,
1364 tx: NoSender,
1365 rx: NoReceiver,
1366 #[cfg(feature = "spans")]
1367 #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "spans")))]
1368 span: tracing::Span::current(),
1369 }
1370 }
1371}
1372
1373/// Deref so you can access the inner fields directly.
1374///
1375/// If the inner message has fields named `tx`, `rx` or `span`, you need to use the
1376/// `inner` field to access them.
1377impl<I: Channels<S>, S: Service> Deref for WithChannels<I, S> {
1378 type Target = I;
1379
1380 fn deref(&self) -> &Self::Target {
1381 &self.inner
1382 }
1383}
1384
1385/// A client to the service `S` using the local message type `M` and the remote
1386/// message type `R`.
1387///
1388/// `R` is typically a serializable enum with a case for each possible message
1389/// type. It can be thought of as the definition of the protocol.
1390///
1391/// `M` is typically an enum with a case for each possible message type, where
1392/// each case is a `WithChannels` struct that extends the inner protocol message
1393/// with a local tx and rx channel as well as a tracing span to allow for
1394/// keeping tracing context across async boundaries.
1395///
1396/// In some cases, `M` and `R` can be enums for a subset of the protocol. E.g.
1397/// if you have a subsystem that only handles a part of the messages.
1398///
1399/// The service type `S` provides a scope for the protocol messages. It exists
1400/// so you can use the same message with multiple services.
1401#[derive(Debug)]
1402pub struct Client<S: Service>(ClientInner<S::Message>, PhantomData<S>);
1403
1404impl<S: Service> Clone for Client<S> {
1405 fn clone(&self) -> Self {
1406 Self(self.0.clone(), PhantomData)
1407 }
1408}
1409
1410impl<S: Service> From<LocalSender<S>> for Client<S> {
1411 fn from(tx: LocalSender<S>) -> Self {
1412 Self(ClientInner::Local(tx.0), PhantomData)
1413 }
1414}
1415
1416impl<S: Service> From<tokio::sync::mpsc::Sender<S::Message>> for Client<S> {
1417 fn from(tx: tokio::sync::mpsc::Sender<S::Message>) -> Self {
1418 LocalSender::from(tx).into()
1419 }
1420}
1421
1422impl<S: Service> Client<S> {
1423 /// Create a new client to a remote service using the given noq `endpoint`
1424 /// and a socket `addr` of the remote service.
1425 #[cfg(feature = "rpc")]
1426 pub fn noq(endpoint: noq::Endpoint, addr: std::net::SocketAddr) -> Self {
1427 Self::boxed(rpc::NoqLazyRemoteConnection::new(endpoint, addr))
1428 }
1429
1430 /// Create a new client from a `rpc::RemoteConnection` trait object.
1431 /// This is used from crates that want to provide other transports than noq,
1432 /// such as the iroh transport.
1433 #[cfg(feature = "rpc")]
1434 pub fn boxed(remote: impl rpc::RemoteConnection) -> Self {
1435 Self(ClientInner::Remote(Box::new(remote)), PhantomData)
1436 }
1437
1438 /// Creates a new client from a `tokio::sync::mpsc::Sender`.
1439 pub fn local(tx: impl Into<crate::channel::mpsc::Sender<S::Message>>) -> Self {
1440 let tx: crate::channel::mpsc::Sender<S::Message> = tx.into();
1441 Self(ClientInner::Local(tx), PhantomData)
1442 }
1443
1444 /// Get the local sender. This is useful if you don't care about remote
1445 /// requests.
1446 pub fn as_local(&self) -> Option<LocalSender<S>> {
1447 match &self.0 {
1448 ClientInner::Local(tx) => Some(tx.clone().into()),
1449 ClientInner::Remote(..) => None,
1450 }
1451 }
1452
1453 /// Start a request by creating a sender that can be used to send the initial
1454 /// message to the local or remote service.
1455 ///
1456 /// In the local case, this is just a clone which has almost zero overhead.
1457 /// Creating a local sender can not fail.
1458 ///
1459 /// In the remote case, this involves lazily creating a connection to the
1460 /// remote side and then creating a new stream on the underlying
1461 /// [`noq`] or iroh connection.
1462 ///
1463 /// In both cases, the returned sender is fully self contained.
1464 #[allow(clippy::type_complexity)]
1465 pub fn request(
1466 &self,
1467 ) -> impl Future<Output = Result<Request<LocalSender<S>, rpc::RemoteSender<S>>, RequestError>> + use<S>
1468 {
1469 #[cfg(feature = "rpc")]
1470 {
1471 let cloned = match &self.0 {
1472 ClientInner::Local(tx) => Request::Local(tx.clone()),
1473 ClientInner::Remote(connection) => Request::Remote(connection.clone_boxed()),
1474 };
1475 async move {
1476 match cloned {
1477 Request::Local(tx) => Ok(Request::Local(tx.into())),
1478 Request::Remote(conn) => {
1479 let (send, recv) = conn.open_bi().await?;
1480 Ok(Request::Remote(rpc::RemoteSender::new(send, recv)))
1481 }
1482 }
1483 }
1484 }
1485 #[cfg(not(feature = "rpc"))]
1486 {
1487 let ClientInner::Local(tx) = &self.0 else {
1488 unreachable!()
1489 };
1490 let tx = tx.clone().into();
1491 async move { Ok(Request::Local(tx)) }
1492 }
1493 }
1494
1495 /// Performs a request for which the client can send updates.
1496 pub fn client_streaming<Req, Update, Res>(
1497 &self,
1498 msg: Req,
1499 local_update_cap: usize,
1500 ) -> impl Future<Output = Result<(mpsc::Sender<Update>, oneshot::Receiver<Res>)>>
1501 + use<Req, Update, Res, S>
1502 where
1503 S: From<Req>,
1504 S::Message: From<WithChannels<Req, S>>,
1505 Req: Channels<S, Tx = oneshot::Sender<Res>, Rx = mpsc::Receiver<Update>>,
1506 Update: RpcMessage,
1507 Res: RpcMessage,
1508 {
1509 let request = self.request();
1510 async move {
1511 let (update_tx, res_rx): (mpsc::Sender<Update>, oneshot::Receiver<Res>) =
1512 match request.await? {
1513 Request::Local(request) => {
1514 let (req_tx, req_rx) = mpsc::channel(local_update_cap);
1515 let (res_tx, res_rx) = oneshot::channel();
1516 request.send((msg, res_tx, req_rx)).await?;
1517 (req_tx, res_rx)
1518 }
1519 #[cfg(not(feature = "rpc"))]
1520 Request::Remote(_request) => unreachable!(),
1521 #[cfg(feature = "rpc")]
1522 Request::Remote(request) => {
1523 let (tx, rx) = request.write(msg).await?;
1524 (tx.into(), rx.into())
1525 }
1526 };
1527 Ok((update_tx, res_rx))
1528 }
1529 }
1530
1531 /// Performs a request for which the client can send updates, and the server returns a mpsc receiver.
1532 pub fn bidi_streaming<Req, Update, Res>(
1533 &self,
1534 msg: Req,
1535 local_update_cap: usize,
1536 local_response_cap: usize,
1537 ) -> impl Future<Output = Result<(mpsc::Sender<Update>, mpsc::Receiver<Res>)>>
1538 + Send
1539 + 'static
1540 + use<Req, Update, Res, S>
1541 where
1542 S: From<Req>,
1543 S::Message: From<WithChannels<Req, S>>,
1544 Req: Channels<S, Tx = mpsc::Sender<Res>, Rx = mpsc::Receiver<Update>>,
1545 Update: RpcMessage,
1546 Res: RpcMessage,
1547 {
1548 let request = self.request();
1549 async move {
1550 let (update_tx, res_rx): (mpsc::Sender<Update>, mpsc::Receiver<Res>) =
1551 match request.await? {
1552 Request::Local(request) => {
1553 let (update_tx, update_rx) = mpsc::channel(local_update_cap);
1554 let (res_tx, res_rx) = mpsc::channel(local_response_cap);
1555 request.send((msg, res_tx, update_rx)).await?;
1556 (update_tx, res_rx)
1557 }
1558 #[cfg(not(feature = "rpc"))]
1559 Request::Remote(_request) => unreachable!(),
1560 #[cfg(feature = "rpc")]
1561 Request::Remote(request) => {
1562 let (tx, rx) = request.write(msg).await?;
1563 (tx.into(), rx.into())
1564 }
1565 };
1566 Ok((update_tx, res_rx))
1567 }
1568 }
1569
1570 /// Performs a request for which the server returns nothing.
1571 ///
1572 /// The purpose of notify is to send messages to the remote without waiting
1573 /// for the remote to respond.
1574 ///
1575 /// The returned future completes once the message is written *locally*.
1576 /// Therefore we have no guarantee that the remote has received the message.
1577 ///
1578 /// If we close the connection immediately after the future returns, the
1579 /// connection might be closed *before* the message is on the wire, so the
1580 /// remote might never receive it.
1581 ///
1582 /// If you need to send a message with unit result but want to wait until the
1583 /// remote has received it, consider using [`rpc`] with a unit `()` return
1584 /// type instead.
1585 ///
1586 /// This method is safe to use with both regular and 0-RTT connections.
1587 /// If 0-RTT data is rejected, the message will be automatically re-sent.
1588 pub fn notify<Req>(&self, msg: Req) -> impl Future<Output = Result<()>> + Send + 'static
1589 where
1590 S: From<Req>,
1591 S::Message: From<WithChannels<Req, S>>,
1592 Req: Channels<S, Tx = NoSender, Rx = NoReceiver>,
1593 {
1594 let this = self.clone();
1595 async move {
1596 match this.request().await? {
1597 Request::Local(request) => {
1598 request.send((msg,)).await?;
1599 }
1600 #[cfg(not(feature = "rpc"))]
1601 Request::Remote(_request) => unreachable!(),
1602 #[cfg(feature = "rpc")]
1603 Request::Remote(request) => {
1604 // see https://www.iroh.computer/blog/0rtt-api#connect-side
1605 let buf = rpc::prepare_write::<S>(msg)?;
1606 let (_tx, _rx) = request.write_raw(&buf).await?;
1607 if this.0.zero_rtt_rejected().await {
1608 // 0rtt was not accepted, the data is lost, send it again!
1609 let Request::Remote(request) = this.request().await? else {
1610 unreachable!()
1611 };
1612 let (_tx, _rx) = request.write_raw(&buf).await?;
1613 }
1614 }
1615 };
1616 Ok(())
1617 }
1618 }
1619
1620 /// Performs a request for which the server returns a oneshot receiver.
1621 ///
1622 /// This method is safe to use with both regular and 0-RTT connections.
1623 /// If 0-RTT data is rejected, the message will be automatically re-sent.
1624 pub fn rpc<Req, Res>(&self, msg: Req) -> impl Future<Output = Result<Res>> + Send + 'static
1625 where
1626 S: From<Req>,
1627 S::Message: From<WithChannels<Req, S>>,
1628 Req: Channels<S, Tx = oneshot::Sender<Res>, Rx = NoReceiver>,
1629 Res: RpcMessage,
1630 {
1631 let this = self.clone();
1632 async move {
1633 let recv: oneshot::Receiver<Res> = match this.request().await? {
1634 Request::Local(request) => {
1635 let (tx, rx) = oneshot::channel();
1636 request.send((msg, tx)).await?;
1637 rx
1638 }
1639 #[cfg(not(feature = "rpc"))]
1640 Request::Remote(_request) => unreachable!(),
1641 #[cfg(feature = "rpc")]
1642 Request::Remote(request) => {
1643 // see https://www.iroh.computer/blog/0rtt-api#connect-side
1644 let buf = rpc::prepare_write::<S>(msg)?;
1645 let (_tx, rx) = request.write_raw(&buf).await?;
1646 if this.0.zero_rtt_rejected().await {
1647 // 0rtt was not accepted, the data is lost, send it again!
1648 let Request::Remote(request) = this.request().await? else {
1649 unreachable!()
1650 };
1651 let (_tx, rx) = request.write_raw(&buf).await?;
1652 rx
1653 } else {
1654 rx
1655 }
1656 .into()
1657 }
1658 };
1659 let res = recv.await?;
1660 Ok(res)
1661 }
1662 }
1663
1664 /// Performs a request for which the server returns a mpsc receiver.
1665 ///
1666 /// This method is safe to use with both regular and 0-RTT connections.
1667 /// If 0-RTT data is rejected, the message will be automatically re-sent.
1668 pub fn server_streaming<Req, Res>(
1669 &self,
1670 msg: Req,
1671 local_response_cap: usize,
1672 ) -> impl Future<Output = Result<mpsc::Receiver<Res>>> + Send + 'static + use<Req, Res, S>
1673 where
1674 S: From<Req>,
1675 S::Message: From<WithChannels<Req, S>>,
1676 Req: Channels<S, Tx = mpsc::Sender<Res>, Rx = NoReceiver>,
1677 Res: RpcMessage,
1678 {
1679 let this = self.clone();
1680 async move {
1681 let recv: mpsc::Receiver<Res> = match this.request().await? {
1682 Request::Local(request) => {
1683 let (tx, rx) = mpsc::channel(local_response_cap);
1684 request.send((msg, tx)).await?;
1685 rx
1686 }
1687 #[cfg(not(feature = "rpc"))]
1688 Request::Remote(_request) => unreachable!(),
1689 #[cfg(feature = "rpc")]
1690 Request::Remote(request) => {
1691 // see https://www.iroh.computer/blog/0rtt-api#connect-side
1692 let buf = rpc::prepare_write::<S>(msg)?;
1693 let (_tx, rx) = request.write_raw(&buf).await?;
1694 if this.0.zero_rtt_rejected().await {
1695 // 0rtt was not accepted, the data is lost, send it again!
1696 let Request::Remote(request) = this.request().await? else {
1697 unreachable!()
1698 };
1699 let (_tx, rx) = request.write_raw(&buf).await?;
1700 rx
1701 } else {
1702 rx
1703 }
1704 .into()
1705 }
1706 };
1707 Ok(recv)
1708 }
1709 }
1710
1711 /// Deprecated: use [`Self::notify`] instead, it handles 0rtt automatically.
1712 #[deprecated(note = "use `notify` instead, it handles 0rtt automatically")]
1713 pub fn notify_0rtt<Req>(&self, msg: Req) -> impl Future<Output = Result<()>> + Send + 'static
1714 where
1715 S: From<Req>,
1716 S::Message: From<WithChannels<Req, S>>,
1717 Req: Channels<S, Tx = NoSender, Rx = NoReceiver>,
1718 {
1719 self.notify(msg)
1720 }
1721
1722 /// Deprecated: use [`Self::rpc`] instead, it handles 0rtt automatically.
1723 #[deprecated(note = "use `rpc` instead, it handles 0rtt automatically")]
1724 pub fn rpc_0rtt<Req, Res>(&self, msg: Req) -> impl Future<Output = Result<Res>> + Send + 'static
1725 where
1726 S: From<Req>,
1727 S::Message: From<WithChannels<Req, S>>,
1728 Req: Channels<S, Tx = oneshot::Sender<Res>, Rx = NoReceiver>,
1729 Res: RpcMessage,
1730 {
1731 self.rpc(msg)
1732 }
1733
1734 /// Deprecated: use [`Self::server_streaming`] instead, it handles 0rtt automatically.
1735 #[deprecated(note = "use `server_streaming` instead, it handles 0rtt automatically")]
1736 pub fn server_streaming_0rtt<Req, Res>(
1737 &self,
1738 msg: Req,
1739 local_response_cap: usize,
1740 ) -> impl Future<Output = Result<mpsc::Receiver<Res>>> + Send + 'static
1741 where
1742 S: From<Req>,
1743 S::Message: From<WithChannels<Req, S>>,
1744 Req: Channels<S, Tx = mpsc::Sender<Res>, Rx = NoReceiver>,
1745 Res: RpcMessage,
1746 {
1747 self.server_streaming(msg, local_response_cap)
1748 }
1749}
1750
1751#[derive(Debug)]
1752pub(crate) enum ClientInner<M> {
1753 Local(crate::channel::mpsc::Sender<M>),
1754 #[cfg(feature = "rpc")]
1755 #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
1756 Remote(Box<dyn rpc::RemoteConnection>),
1757 #[cfg(not(feature = "rpc"))]
1758 #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
1759 #[allow(dead_code)]
1760 Remote(PhantomData<M>),
1761}
1762
1763impl<M> Clone for ClientInner<M> {
1764 fn clone(&self) -> Self {
1765 match self {
1766 Self::Local(tx) => Self::Local(tx.clone()),
1767 #[cfg(feature = "rpc")]
1768 Self::Remote(conn) => Self::Remote(conn.clone_boxed()),
1769 #[cfg(not(feature = "rpc"))]
1770 Self::Remote(_) => unreachable!(),
1771 }
1772 }
1773}
1774
1775impl<M> ClientInner<M> {
1776 #[allow(dead_code)]
1777 async fn zero_rtt_rejected(&self) -> bool {
1778 match self {
1779 ClientInner::Local(_sender) => false,
1780 #[cfg(feature = "rpc")]
1781 ClientInner::Remote(remote_connection) => remote_connection.zero_rtt_rejected().await,
1782 #[cfg(not(feature = "rpc"))]
1783 Self::Remote(_) => unreachable!(),
1784 }
1785 }
1786}
1787
1788/// Error when opening a request. When cross-process rpc is disabled, this is
1789/// an empty enum since local requests can not fail.
1790#[stack_error(derive, add_meta, from_sources)]
1791pub enum RequestError {
1792 /// Error in noq during connect
1793 #[cfg(feature = "rpc")]
1794 #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
1795 #[error("Error establishing connection")]
1796 Connect {
1797 #[error(std_err)]
1798 source: noq::ConnectError,
1799 },
1800 /// Error in noq when the connection already exists, when opening a stream pair
1801 #[cfg(feature = "rpc")]
1802 #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
1803 #[error("Error opening stream")]
1804 Connection {
1805 #[error(std_err)]
1806 source: noq::ConnectionError,
1807 },
1808 /// Generic error for non-noq transports
1809 #[cfg(feature = "rpc")]
1810 #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
1811 #[error("Error opening stream")]
1812 Other { source: AnyError },
1813
1814 #[cfg(not(feature = "rpc"))]
1815 #[error("(Without the rpc feature, requests cannot fail")]
1816 Unreachable,
1817}
1818
1819/// Error type that subsumes all possible errors in this crate, for convenience.
1820#[stack_error(derive, add_meta, from_sources)]
1821pub enum Error {
1822 #[error("Request error")]
1823 Request { source: RequestError },
1824 #[error("Send error")]
1825 Send { source: channel::SendError },
1826 #[error("Mpsc recv error")]
1827 MpscRecv { source: channel::mpsc::RecvError },
1828 #[error("Oneshot recv error")]
1829 OneshotRecv { source: channel::oneshot::RecvError },
1830 #[cfg(feature = "rpc")]
1831 #[error("Recv error")]
1832 Write { source: rpc::WriteError },
1833}
1834
1835/// Type alias for a result with an irpc error type.
1836pub type Result<T, E = Error> = std::result::Result<T, E>;
1837
1838impl From<Error> for io::Error {
1839 fn from(e: Error) -> Self {
1840 match e {
1841 Error::Request { source, .. } => source.into(),
1842 Error::Send { source, .. } => source.into(),
1843 Error::MpscRecv { source, .. } => source.into(),
1844 Error::OneshotRecv { source, .. } => source.into(),
1845 #[cfg(feature = "rpc")]
1846 Error::Write { source, .. } => source.into(),
1847 }
1848 }
1849}
1850
1851impl From<RequestError> for io::Error {
1852 fn from(e: RequestError) -> Self {
1853 match e {
1854 #[cfg(feature = "rpc")]
1855 RequestError::Connect { source, .. } => io::Error::other(source),
1856 #[cfg(feature = "rpc")]
1857 RequestError::Connection { source, .. } => source.into(),
1858 #[cfg(feature = "rpc")]
1859 RequestError::Other { source, .. } => io::Error::other(source),
1860 #[cfg(not(feature = "rpc"))]
1861 RequestError::Unreachable { .. } => unreachable!(),
1862 }
1863 }
1864}
1865
1866/// A local sender for the service `S` using the message type `M`.
1867///
1868/// This is a wrapper around an in-memory channel (currently [`tokio::sync::mpsc::Sender`]),
1869/// that adds nice syntax for sending messages that can be converted into
1870/// [`WithChannels`].
1871#[derive(Debug)]
1872#[repr(transparent)]
1873pub struct LocalSender<S: Service>(crate::channel::mpsc::Sender<S::Message>);
1874
1875impl<S: Service> Clone for LocalSender<S> {
1876 fn clone(&self) -> Self {
1877 Self(self.0.clone())
1878 }
1879}
1880
1881impl<S: Service> From<tokio::sync::mpsc::Sender<S::Message>> for LocalSender<S> {
1882 fn from(tx: tokio::sync::mpsc::Sender<S::Message>) -> Self {
1883 Self(tx.into())
1884 }
1885}
1886
1887impl<S: Service> From<crate::channel::mpsc::Sender<S::Message>> for LocalSender<S> {
1888 fn from(tx: crate::channel::mpsc::Sender<S::Message>) -> Self {
1889 Self(tx)
1890 }
1891}
1892
1893#[cfg(not(feature = "rpc"))]
1894pub mod rpc {
1895 pub struct RemoteSender<S>(std::marker::PhantomData<S>);
1896}
1897
1898#[cfg(feature = "rpc")]
1899#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
1900pub mod rpc {
1901 //! Module for cross-process RPC using [`noq`].
1902 use std::{
1903 fmt::Debug, future::Future, io, marker::PhantomData, ops::DerefMut, pin::Pin, sync::Arc,
1904 };
1905
1906 use n0_error::{e, stack_error};
1907 use n0_future::{future::Boxed as BoxFuture, task::JoinSet};
1908 /// This is used by irpc-derive to refer to noq types (SendStream and RecvStream)
1909 /// to make generated code work for users without having to depend on noq directly
1910 /// (i.e. when using iroh).
1911 #[doc(hidden)]
1912 pub use noq;
1913 use noq::{ConnectionError, PathId};
1914 use serde::de::DeserializeOwned;
1915 use smallvec::SmallVec;
1916 use tracing::{Instrument, debug, error_span, trace, warn};
1917
1918 use crate::{
1919 LocalSender, RequestError, RpcMessage, Service,
1920 channel::{
1921 SendError,
1922 mpsc::{self, DynReceiver, DynSender},
1923 none::NoSender,
1924 oneshot,
1925 },
1926 util::{AsyncReadVarintExt, WriteVarintExt, now_or_never},
1927 };
1928
1929 /// Default max message size (16 MiB).
1930 pub const MAX_MESSAGE_SIZE: u64 = 1024 * 1024 * 16;
1931
1932 /// Error code on streams if the max message size was exceeded.
1933 pub const ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED: u32 = 1;
1934
1935 /// Error code on streams if the sender tried to send an message that could not be postcard serialized.
1936 pub const ERROR_CODE_INVALID_POSTCARD: u32 = 2;
1937
1938 /// Error that can occur when writing the initial message when doing a
1939 /// cross-process RPC.
1940 #[stack_error(derive, add_meta, from_sources)]
1941 pub enum WriteError {
1942 /// Error writing to the stream with noq
1943 #[error("Error writing to stream")]
1944 Noq {
1945 #[error(std_err)]
1946 source: noq::WriteError,
1947 },
1948 /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]).
1949 #[error("Maximum message size exceeded")]
1950 MaxMessageSizeExceeded,
1951 /// Generic IO error, e.g. when serializing the message or when using
1952 /// other transports.
1953 #[error("Error serializing")]
1954 Io {
1955 #[error(std_err)]
1956 source: io::Error,
1957 },
1958 }
1959
1960 impl From<postcard::Error> for WriteError {
1961 fn from(value: postcard::Error) -> Self {
1962 e!(Self::Io, io::Error::new(io::ErrorKind::InvalidData, value))
1963 }
1964 }
1965
1966 impl From<postcard::Error> for SendError {
1967 fn from(value: postcard::Error) -> Self {
1968 e!(Self::Io, io::Error::new(io::ErrorKind::InvalidData, value))
1969 }
1970 }
1971
1972 impl From<WriteError> for io::Error {
1973 fn from(e: WriteError) -> Self {
1974 match e {
1975 WriteError::Io { source, .. } => source,
1976 WriteError::MaxMessageSizeExceeded { .. } => {
1977 io::Error::new(io::ErrorKind::InvalidData, e)
1978 }
1979 WriteError::Noq { source, .. } => source.into(),
1980 }
1981 }
1982 }
1983
1984 impl From<noq::WriteError> for SendError {
1985 fn from(err: noq::WriteError) -> Self {
1986 match err {
1987 noq::WriteError::Stopped(code)
1988 if code == ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into() =>
1989 {
1990 e!(SendError::MaxMessageSizeExceeded)
1991 }
1992 _ => e!(SendError::Io, io::Error::from(err)),
1993 }
1994 }
1995 }
1996
1997 /// Trait to abstract over a client connection to a remote service.
1998 ///
1999 /// This isn't really that much abstracted, since the result of open_bi must
2000 /// still be a noq::SendStream and noq::RecvStream. This is just so we
2001 /// can have different connection implementations for normal noq connections,
2002 /// iroh connections, and possibly noq connections with disabled encryption
2003 /// for performance.
2004 ///
2005 /// This is done as a trait instead of an enum, so we don't need an iroh
2006 /// dependency in the main crate.
2007 pub trait RemoteConnection: Send + Sync + Debug + 'static {
2008 /// Boxed clone so the trait is dynable.
2009 fn clone_boxed(&self) -> Box<dyn RemoteConnection>;
2010
2011 /// Open a bidirectional stream to the remote service.
2012 fn open_bi(
2013 &self,
2014 ) -> BoxFuture<std::result::Result<(noq::SendStream, noq::RecvStream), RequestError>>;
2015
2016 /// Returns whether 0-RTT data was rejected by the server.
2017 ///
2018 /// For connections that were fully authenticated before allowing to send any data, this should return `false`.
2019 fn zero_rtt_rejected(&self) -> BoxFuture<bool>;
2020 }
2021
2022 /// A connection to a remote service.
2023 ///
2024 /// Initially this does just have the endpoint and the address. Once a
2025 /// connection is established, it will be stored.
2026 #[derive(Debug, Clone)]
2027 pub(crate) struct NoqLazyRemoteConnection(Arc<NoqLazyRemoteConnectionInner>);
2028
2029 #[derive(Debug)]
2030 struct NoqLazyRemoteConnectionInner {
2031 pub endpoint: noq::Endpoint,
2032 pub addr: std::net::SocketAddr,
2033 pub connection: tokio::sync::Mutex<Option<noq::Connection>>,
2034 }
2035
2036 impl RemoteConnection for noq::Connection {
2037 fn clone_boxed(&self) -> Box<dyn RemoteConnection> {
2038 Box::new(self.clone())
2039 }
2040
2041 fn open_bi(
2042 &self,
2043 ) -> BoxFuture<std::result::Result<(noq::SendStream, noq::RecvStream), RequestError>>
2044 {
2045 let conn = self.clone();
2046 Box::pin(async move {
2047 let pair = conn.open_bi().await?;
2048 Ok(pair)
2049 })
2050 }
2051
2052 fn zero_rtt_rejected(&self) -> BoxFuture<bool> {
2053 Box::pin(async { false })
2054 }
2055 }
2056
2057 impl NoqLazyRemoteConnection {
2058 pub fn new(endpoint: noq::Endpoint, addr: std::net::SocketAddr) -> Self {
2059 Self(Arc::new(NoqLazyRemoteConnectionInner {
2060 endpoint,
2061 addr,
2062 connection: Default::default(),
2063 }))
2064 }
2065 }
2066
2067 impl RemoteConnection for NoqLazyRemoteConnection {
2068 fn clone_boxed(&self) -> Box<dyn RemoteConnection> {
2069 Box::new(self.clone())
2070 }
2071
2072 fn open_bi(
2073 &self,
2074 ) -> BoxFuture<std::result::Result<(noq::SendStream, noq::RecvStream), RequestError>>
2075 {
2076 let this = self.0.clone();
2077 Box::pin(async move {
2078 let mut guard = this.connection.lock().await;
2079 let pair = match guard.as_mut() {
2080 Some(conn) => {
2081 // try to reuse the connection
2082 match conn.open_bi().await {
2083 Ok(pair) => pair,
2084 Err(_) => {
2085 // try with a new connection, just once
2086 *guard = None;
2087 connect_and_open_bi(&this.endpoint, &this.addr, guard).await?
2088 }
2089 }
2090 }
2091 None => connect_and_open_bi(&this.endpoint, &this.addr, guard).await?,
2092 };
2093 Ok(pair)
2094 })
2095 }
2096
2097 fn zero_rtt_rejected(&self) -> BoxFuture<bool> {
2098 Box::pin(async { false })
2099 }
2100 }
2101
2102 async fn connect_and_open_bi(
2103 endpoint: &noq::Endpoint,
2104 addr: &std::net::SocketAddr,
2105 mut guard: tokio::sync::MutexGuard<'_, Option<noq::Connection>>,
2106 ) -> Result<(noq::SendStream, noq::RecvStream), RequestError> {
2107 let conn = endpoint.connect(*addr, "localhost")?.await?;
2108 let (send, recv) = conn.open_bi().await?;
2109 *guard = Some(conn);
2110 Ok((send, recv))
2111 }
2112
2113 /// A connection to a remote service that can be used to send the initial message.
2114 #[derive(Debug)]
2115 pub struct RemoteSender<S>(
2116 noq::SendStream,
2117 noq::RecvStream,
2118 std::marker::PhantomData<S>,
2119 );
2120
2121 /// Serialize a message for sending over the wire.
2122 ///
2123 /// When `S::SPAN_PROPAGATION` is true, the message is wrapped in a tuple with
2124 /// span context: `(Option<SpanContextCarrier>, msg)`.
2125 /// When false, the message is serialized directly.
2126 pub(crate) fn prepare_write<S: Service>(
2127 msg: impl Into<S>,
2128 ) -> Result<SmallVec<[u8; 128]>, WriteError> {
2129 let msg = msg.into();
2130 let mut buf = SmallVec::<[u8; 128]>::new();
2131
2132 if S::SPAN_PROPAGATION {
2133 // Include span context in wire format
2134 let span_ctx = Some(crate::span_propagation::SpanContextCarrier::from_current());
2135 let payload = (span_ctx, msg);
2136 if postcard::experimental::serialized_size(&payload)? as u64 > MAX_MESSAGE_SIZE {
2137 return Err(e!(WriteError::MaxMessageSizeExceeded));
2138 }
2139 buf.write_length_prefixed(&payload)?;
2140 } else {
2141 // Original wire format without span context
2142 if postcard::experimental::serialized_size(&msg)? as u64 > MAX_MESSAGE_SIZE {
2143 return Err(e!(WriteError::MaxMessageSizeExceeded));
2144 }
2145 buf.write_length_prefixed(&msg)?;
2146 }
2147
2148 Ok(buf)
2149 }
2150
2151 impl<S: Service> RemoteSender<S> {
2152 pub fn new(send: noq::SendStream, recv: noq::RecvStream) -> Self {
2153 Self(send, recv, PhantomData)
2154 }
2155
2156 pub async fn write(
2157 self,
2158 msg: impl Into<S>,
2159 ) -> std::result::Result<(noq::SendStream, noq::RecvStream), WriteError> {
2160 let buf = prepare_write(msg)?;
2161 self.write_raw(&buf).await
2162 }
2163
2164 pub(crate) async fn write_raw(
2165 self,
2166 buf: &[u8],
2167 ) -> std::result::Result<(noq::SendStream, noq::RecvStream), WriteError> {
2168 let RemoteSender(mut send, recv, _) = self;
2169 send.write_all(buf).await?;
2170 Ok((send, recv))
2171 }
2172 }
2173
2174 impl<T: DeserializeOwned> From<noq::RecvStream> for oneshot::Receiver<T> {
2175 fn from(mut read: noq::RecvStream) -> Self {
2176 let fut = async move {
2177 let size = read.read_varint_u64().await?.ok_or(io::Error::new(
2178 io::ErrorKind::UnexpectedEof,
2179 "failed to read size",
2180 ))?;
2181 if size > MAX_MESSAGE_SIZE {
2182 read.stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()).ok();
2183 return Err(e!(oneshot::RecvError::MaxMessageSizeExceeded));
2184 }
2185 let rest = read
2186 .read_to_end(size as usize)
2187 .await
2188 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
2189 let msg: T = postcard::from_bytes(&rest)
2190 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
2191 Ok(msg)
2192 };
2193 oneshot::Receiver::from(|| fut)
2194 }
2195 }
2196
2197 impl From<noq::RecvStream> for crate::channel::none::NoReceiver {
2198 fn from(read: noq::RecvStream) -> Self {
2199 drop(read);
2200 Self
2201 }
2202 }
2203
2204 impl<T: RpcMessage> From<noq::RecvStream> for mpsc::Receiver<T> {
2205 fn from(read: noq::RecvStream) -> Self {
2206 mpsc::Receiver::Boxed(Box::new(NoqReceiver {
2207 recv: read,
2208 _marker: PhantomData,
2209 }))
2210 }
2211 }
2212
2213 impl From<noq::SendStream> for NoSender {
2214 fn from(write: noq::SendStream) -> Self {
2215 let _ = write;
2216 NoSender
2217 }
2218 }
2219
2220 impl<T: RpcMessage> From<noq::SendStream> for oneshot::Sender<T> {
2221 fn from(mut writer: noq::SendStream) -> Self {
2222 oneshot::Sender::Boxed(Box::new(move |value| {
2223 Box::pin(async move {
2224 let size = match postcard::experimental::serialized_size(&value) {
2225 Ok(size) => size,
2226 Err(e) => {
2227 writer.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok();
2228 return Err(e!(
2229 SendError::Io,
2230 io::Error::new(io::ErrorKind::InvalidData, e,)
2231 ));
2232 }
2233 };
2234 if size as u64 > MAX_MESSAGE_SIZE {
2235 writer
2236 .reset(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into())
2237 .ok();
2238 return Err(e!(SendError::MaxMessageSizeExceeded));
2239 }
2240 // write via a small buffer to avoid allocation for small values
2241 let mut buf = SmallVec::<[u8; 128]>::new();
2242 if let Err(e) = buf.write_length_prefixed(value) {
2243 writer.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok();
2244 return Err(e.into());
2245 }
2246 writer.write_all(&buf).await?;
2247 Ok(())
2248 })
2249 }))
2250 }
2251 }
2252
2253 impl<T: RpcMessage> From<noq::SendStream> for mpsc::Sender<T> {
2254 fn from(write: noq::SendStream) -> Self {
2255 mpsc::Sender::Boxed(Arc::new(NoqSender(tokio::sync::Mutex::new(
2256 NoqSenderState::Open(NoqSenderInner {
2257 send: write,
2258 buffer: SmallVec::new(),
2259 _marker: PhantomData,
2260 }),
2261 ))))
2262 }
2263 }
2264
2265 struct NoqReceiver<T> {
2266 recv: noq::RecvStream,
2267 _marker: std::marker::PhantomData<T>,
2268 }
2269
2270 impl<T> Debug for NoqReceiver<T> {
2271 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2272 f.debug_struct("NoqReceiver").finish()
2273 }
2274 }
2275
2276 impl<T: RpcMessage> DynReceiver<T> for NoqReceiver<T> {
2277 fn recv(
2278 &mut self,
2279 ) -> Pin<Box<dyn Future<Output = Result<Option<T>, mpsc::RecvError>> + Send + Sync + '_>>
2280 {
2281 Box::pin(async {
2282 let read = &mut self.recv;
2283 let Some(size) = read.read_varint_u64().await? else {
2284 return Ok(None);
2285 };
2286 if size > MAX_MESSAGE_SIZE {
2287 self.recv
2288 .stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into())
2289 .ok();
2290 return Err(e!(mpsc::RecvError::MaxMessageSizeExceeded));
2291 }
2292 let mut buf = vec![0; size as usize];
2293 read.read_exact(&mut buf)
2294 .await
2295 .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?;
2296 let msg: T = postcard::from_bytes(&buf)
2297 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
2298 Ok(Some(msg))
2299 })
2300 }
2301 }
2302
2303 impl<T> Drop for NoqReceiver<T> {
2304 fn drop(&mut self) {}
2305 }
2306
2307 struct NoqSenderInner<T> {
2308 send: noq::SendStream,
2309 buffer: SmallVec<[u8; 128]>,
2310 _marker: std::marker::PhantomData<T>,
2311 }
2312
2313 impl<T: RpcMessage> NoqSenderInner<T> {
2314 fn send(
2315 &mut self,
2316 value: T,
2317 ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + Sync + '_>> {
2318 Box::pin(async {
2319 let size = match postcard::experimental::serialized_size(&value) {
2320 Ok(size) => size,
2321 Err(e) => {
2322 self.send.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok();
2323 return Err(e!(
2324 SendError::Io,
2325 io::Error::new(io::ErrorKind::InvalidData, e)
2326 ));
2327 }
2328 };
2329 if size as u64 > MAX_MESSAGE_SIZE {
2330 self.send
2331 .reset(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into())
2332 .ok();
2333 return Err(e!(SendError::MaxMessageSizeExceeded));
2334 }
2335 let value = value;
2336 self.buffer.clear();
2337 if let Err(e) = self.buffer.write_length_prefixed(value) {
2338 self.send.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok();
2339 return Err(e.into());
2340 }
2341 self.send.write_all(&self.buffer).await?;
2342 self.buffer.clear();
2343 Ok(())
2344 })
2345 }
2346
2347 fn try_send(
2348 &mut self,
2349 value: T,
2350 ) -> Pin<Box<dyn Future<Output = Result<bool, SendError>> + Send + Sync + '_>> {
2351 Box::pin(async {
2352 if postcard::experimental::serialized_size(&value)? as u64 > MAX_MESSAGE_SIZE {
2353 return Err(e!(SendError::MaxMessageSizeExceeded));
2354 }
2355 // todo: move the non-async part out of the box. Will require a new return type.
2356 let value = value;
2357 self.buffer.clear();
2358 self.buffer.write_length_prefixed(value)?;
2359 let Some(n) = now_or_never(self.send.write(&self.buffer)) else {
2360 return Ok(false);
2361 };
2362 let n = n?;
2363 self.send.write_all(&self.buffer[n..]).await?;
2364 self.buffer.clear();
2365 Ok(true)
2366 })
2367 }
2368
2369 fn closed(&mut self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + '_>> {
2370 Box::pin(async move {
2371 self.send.stopped().await.ok();
2372 })
2373 }
2374 }
2375
2376 #[derive(Default)]
2377 enum NoqSenderState<T> {
2378 Open(NoqSenderInner<T>),
2379 #[default]
2380 Closed,
2381 }
2382
2383 struct NoqSender<T>(tokio::sync::Mutex<NoqSenderState<T>>);
2384
2385 impl<T> Debug for NoqSender<T> {
2386 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2387 f.debug_struct("NoqSender").finish()
2388 }
2389 }
2390
2391 impl<T: RpcMessage> DynSender<T> for NoqSender<T> {
2392 fn send(
2393 &self,
2394 value: T,
2395 ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
2396 Box::pin(async {
2397 let mut guard = self.0.lock().await;
2398 let sender = std::mem::take(guard.deref_mut());
2399 match sender {
2400 NoqSenderState::Open(mut sender) => {
2401 let res = sender.send(value).await;
2402 if res.is_ok() {
2403 *guard = NoqSenderState::Open(sender);
2404 }
2405 res
2406 }
2407 NoqSenderState::Closed => {
2408 Err(io::Error::from(io::ErrorKind::BrokenPipe).into())
2409 }
2410 }
2411 })
2412 }
2413
2414 fn try_send(
2415 &self,
2416 value: T,
2417 ) -> Pin<Box<dyn Future<Output = Result<bool, SendError>> + Send + '_>> {
2418 Box::pin(async {
2419 let mut guard = self.0.lock().await;
2420 let sender = std::mem::take(guard.deref_mut());
2421 match sender {
2422 NoqSenderState::Open(mut sender) => {
2423 let res = sender.try_send(value).await;
2424 if res.is_ok() {
2425 *guard = NoqSenderState::Open(sender);
2426 }
2427 res
2428 }
2429 NoqSenderState::Closed => {
2430 Err(io::Error::from(io::ErrorKind::BrokenPipe).into())
2431 }
2432 }
2433 })
2434 }
2435
2436 fn closed(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + '_>> {
2437 Box::pin(async {
2438 let mut guard = self.0.lock().await;
2439 match guard.deref_mut() {
2440 NoqSenderState::Open(sender) => sender.closed().await,
2441 NoqSenderState::Closed => {}
2442 }
2443 })
2444 }
2445
2446 fn is_rpc(&self) -> bool {
2447 true
2448 }
2449 }
2450
2451 /// Type alias for a handler fn for remote requests
2452 pub type Handler<R> = Arc<
2453 dyn Fn(R, noq::RecvStream, noq::SendStream) -> BoxFuture<std::result::Result<(), SendError>>
2454 + Send
2455 + Sync
2456 + 'static,
2457 >;
2458
2459 /// Extension trait to [`Service`] to create a [`Service::Message`] from a [`Service`]
2460 /// and a pair of QUIC streams.
2461 ///
2462 /// This trait is auto-implemented when using the [`crate::rpc_requests`] macro.
2463 pub trait RemoteService: Service + Sized {
2464 /// Returns the message enum for this request by combining `self` (the protocol enum)
2465 /// with a pair of QUIC streams for `tx` and `rx` channels.
2466 fn with_remote_channels(self, rx: noq::RecvStream, tx: noq::SendStream) -> Self::Message;
2467
2468 /// Creates a [`Handler`] that forwards all messages to a [`LocalSender`].
2469 fn remote_handler(local_sender: LocalSender<Self>) -> Handler<Self> {
2470 Arc::new(move |msg, rx, tx| {
2471 // `with_remote_channels` reads the task-local span context installed by
2472 // the dispatch loop, so it must run inside the future (which is polled
2473 // within that scope) rather than eagerly here.
2474 let local_sender = local_sender.clone();
2475 Box::pin(async move {
2476 let msg = Self::with_remote_channels(msg, rx, tx);
2477 local_sender.send_raw(msg).await
2478 })
2479 })
2480 }
2481 }
2482
2483 /// Utility function to listen for incoming connections and handle them with the provided handler.
2484 ///
2485 /// The wire format used depends on `S::SPAN_PROPAGATION` - if true, span context is expected.
2486 pub async fn listen<S: Service>(endpoint: noq::Endpoint, handler: Handler<S>) {
2487 let mut request_id = 0u64;
2488 let mut tasks = JoinSet::new();
2489 loop {
2490 let incoming = tokio::select! {
2491 Some(res) = tasks.join_next(), if !tasks.is_empty() => {
2492 res.expect("irpc connection task panicked");
2493 continue;
2494 }
2495 incoming = endpoint.accept() => {
2496 match incoming {
2497 None => break,
2498 Some(incoming) => incoming
2499 }
2500 }
2501 };
2502 let handler = handler.clone();
2503 let fut = async move {
2504 match incoming.await {
2505 Ok(connection) => match handle_connection(connection, handler).await {
2506 Err(err) => warn!("connection closed with error: {err:?}"),
2507 Ok(()) => debug!("connection closed"),
2508 },
2509 Err(cause) => {
2510 warn!("failed to accept connection: {cause:?}");
2511 }
2512 };
2513 };
2514 let span = error_span!("rpc", id = request_id, remote = tracing::field::Empty);
2515 tasks.spawn(fut.instrument(span));
2516 request_id += 1;
2517 }
2518 }
2519
2520 /// Handles a quic connection with the provided `handler`.
2521 ///
2522 /// This function handles requests for a service `S`. The wire format used depends on
2523 /// `S::SPAN_PROPAGATION` - if true, span context is expected in the wire format.
2524 pub async fn handle_connection<S: Service>(
2525 connection: noq::Connection,
2526 handler: Handler<S>,
2527 ) -> io::Result<()> {
2528 let remote = connection
2529 .path(PathId::ZERO)
2530 .and_then(|p| p.remote_address().ok());
2531 if let Some(remote) = remote {
2532 tracing::Span::current().record("remote", tracing::field::display(remote));
2533 }
2534 debug!("connection accepted");
2535 loop {
2536 let Some((msg, carrier, rx, tx)) = read_request_inner::<S>(&connection).await? else {
2537 return Ok(());
2538 };
2539 crate::span_propagation::scope_remote(carrier, handler(msg, rx, tx)).await?;
2540 }
2541 }
2542
2543 /// Reads a request from a connection and converts it to a message enum.
2544 ///
2545 /// This combines `read_request_raw` with `RemoteService::with_remote_channels`.
2546 pub async fn read_request<S: RemoteService>(
2547 connection: &noq::Connection,
2548 ) -> std::io::Result<Option<S::Message>> {
2549 let Some((msg, carrier, rx, tx)) = read_request_inner::<S>(connection).await? else {
2550 return Ok(None);
2551 };
2552 Ok(Some(
2553 crate::span_propagation::scope_remote(carrier, async move {
2554 S::with_remote_channels(msg, rx, tx)
2555 })
2556 .await,
2557 ))
2558 }
2559
2560 /// Reads a single request from the connection.
2561 ///
2562 /// This accepts a bi-directional stream from the connection and reads and parses the request.
2563 ///
2564 /// When `S::SPAN_PROPAGATION` is true, any propagated span context on the wire is
2565 /// silently dropped. Use [`handle_connection`] (or [`read_request`]) if you need
2566 /// the propagated context to reach the generated handler spans.
2567 ///
2568 /// Returns the parsed request and the stream pair if reading and parsing the request succeeded.
2569 /// Returns None if the remote closed the connection with error code `0`.
2570 /// Returns an error for all other failure cases.
2571 pub async fn read_request_raw<S: Service>(
2572 connection: &noq::Connection,
2573 ) -> std::io::Result<Option<(S, noq::RecvStream, noq::SendStream)>> {
2574 Ok(read_request_inner::<S>(connection)
2575 .await?
2576 .map(|(msg, _carrier, rx, tx)| (msg, rx, tx)))
2577 }
2578
2579 /// Internal: read a request and also return the propagated span context carrier.
2580 ///
2581 /// The carrier is `Some` iff `S::SPAN_PROPAGATION` is true and the remote sent one.
2582 async fn read_request_inner<S: Service>(
2583 connection: &noq::Connection,
2584 ) -> std::io::Result<
2585 Option<(
2586 S,
2587 Option<crate::span_propagation::SpanContextCarrier>,
2588 noq::RecvStream,
2589 noq::SendStream,
2590 )>,
2591 > {
2592 let (send, mut recv) = match connection.accept_bi().await {
2593 Ok((s, r)) => (s, r),
2594 Err(ConnectionError::ApplicationClosed(cause))
2595 if cause.error_code.into_inner() == 0 =>
2596 {
2597 trace!("remote side closed connection {cause:?}");
2598 return Ok(None);
2599 }
2600 Err(cause) => {
2601 warn!("failed to accept bi stream {cause:?}");
2602 return Err(cause.into());
2603 }
2604 };
2605 let size = recv
2606 .read_varint_u64()
2607 .await?
2608 .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "failed to read size"))?;
2609 if size > MAX_MESSAGE_SIZE {
2610 connection.close(
2611 ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into(),
2612 b"request exceeded max message size",
2613 );
2614 return Err(e!(mpsc::RecvError::MaxMessageSizeExceeded).into());
2615 }
2616 let mut buf = vec![0; size as usize];
2617 recv.read_exact(&mut buf)
2618 .await
2619 .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?;
2620
2621 let (carrier, msg): (Option<crate::span_propagation::SpanContextCarrier>, S) =
2622 if S::SPAN_PROPAGATION {
2623 postcard::from_bytes(&buf)
2624 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
2625 } else {
2626 let msg = postcard::from_bytes(&buf)
2627 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
2628 (None, msg)
2629 };
2630
2631 Ok(Some((msg, carrier, recv, send)))
2632 }
2633}
2634
2635/// A request to a service. This can be either local or remote.
2636#[derive(Debug)]
2637pub enum Request<L, R> {
2638 /// Local in memory request
2639 Local(L),
2640 /// Remote cross process request
2641 Remote(R),
2642}
2643
2644impl<S: Service> LocalSender<S> {
2645 /// Send a message to the service
2646 pub fn send<T>(
2647 &self,
2648 value: impl Into<WithChannels<T, S>>,
2649 ) -> impl Future<Output = Result<(), SendError>> + Send + 'static
2650 where
2651 T: Channels<S>,
2652 S::Message: From<WithChannels<T, S>>,
2653 {
2654 let value: S::Message = value.into().into();
2655 self.send_raw(value)
2656 }
2657
2658 /// Send a message to the service without the type conversion magic
2659 pub fn send_raw(
2660 &self,
2661 value: S::Message,
2662 ) -> impl Future<Output = Result<(), SendError>> + Send + 'static + use<S> {
2663 let x = self.0.clone();
2664 async move { x.send(value).await }
2665 }
2666}