Skip to main content

irpc/
lib.rs

1//! # A minimal RPC library for use with [iroh](https://docs.rs/iroh/latest/iroh/index.html).
2//!
3//! ## Goals
4//!
5//! The main goal of this library is to provide an rpc framework that is so
6//! lightweight that it can be also used for async boundaries within a single
7//! process without any overhead, instead of the usual practice of a mpsc channel
8//! with a giant message enum where each enum case contains mpsc or oneshot
9//! backchannels.
10//!
11//! The second goal is to lightly abstract over remote and local communication,
12//! so that a system can be interacted with cross process or even across networks.
13//!
14//! ## Non-goals
15//!
16//! - Cross language interop. This is for talking from rust to rust
17//! - Any kind of versioning. You have to do this yourself
18//! - Making remote message passing look like local async function calls
19//! - Being runtime agnostic. This is for tokio
20//!
21//! ## Interaction patterns
22//!
23//! For each request, there can be a response and update channel. Each channel
24//! can be either oneshot, carry multiple messages, or be disabled. This enables
25//! the typical interaction patterns known from libraries like grpc:
26//!
27//! - rpc: 1 request, 1 response
28//! - server streaming: 1 request, multiple responses
29//! - client streaming: multiple requests, 1 response
30//! - bidi streaming: multiple requests, multiple responses
31//!
32//! as well as more complex patterns. It is however not possible to have multiple
33//! differently typed tx channels for a single message type.
34//!
35//! ## Transports
36//!
37//! We don't abstract over the send and receive stream. These must always be
38//! noq streams, specifically streams from the [noq].
39//!
40//! This restricts the possible rpc transports to noq (QUIC with dial by
41//! socket address) and iroh (QUIC with dial by endpoint id).
42//!
43//! An upside of this is that the noq streams can be tuned for each rpc
44//! request, e.g. by setting the stream priority or by directly using more
45//! advanced part of the noq SendStream and RecvStream APIs such as out of
46//! order receiving.
47//!
48//! ## Serialization
49//!
50//! Serialization is currently done using [postcard]. Messages are always
51//! length prefixed with postcard varints, even in the case of oneshot
52//! channels.
53//!
54//! Serialization only happens for cross process rpc communication.
55//!
56//! However, the requirement for message enums to be serializable is present even
57//! when disabling the `rpc` feature. Due to the fact that the channels live
58//! outside the message, this is not a big restriction.
59//!
60//! ## Features
61//!
62//! - `derive`: Enable the [`rpc_requests`] macro.
63//! - `rpc`: Enable the rpc features. Enabled by default.
64//!   By disabling this feature, all rpc related dependencies are removed.
65//!   The remaining dependencies are just serde, tokio and tokio-util.
66//! - `spans`: Enable tracing spans for messages. Enabled by default.
67//!   This is useful even without rpc, to not lose tracing context when message
68//!   passing. This is frequently done manually. This obviously requires
69//!   a dependency on tracing.
70//! - `noq_endpoint_setup`: Easy way to create noq endpoints. This is useful
71//!   both for testing and for rpc on localhost. Enabled by default.
72//!
73//! # Example
74//!
75//! ```
76//! use irpc::{
77//!     Client, WithChannels,
78//!     channel::{mpsc, oneshot},
79//!     rpc_requests,
80//! };
81//! use serde::{Deserialize, Serialize};
82//!
83//! #[tokio::main]
84//! async fn main() -> n0_error::Result<()> {
85//!     let client = spawn_server();
86//!     let res = client.rpc(Multiply(3, 7)).await?;
87//!     assert_eq!(res, 21);
88//!
89//!     let (tx, mut rx) = client.bidi_streaming(Sum, 4, 4).await?;
90//!     tx.send(4).await?;
91//!     assert_eq!(rx.recv().await?, Some(4));
92//!     tx.send(6).await?;
93//!     assert_eq!(rx.recv().await?, Some(10));
94//!     tx.send(11).await?;
95//!     assert_eq!(rx.recv().await?, Some(21));
96//!     Ok(())
97//! }
98//!
99//! /// We define a simple protocol using the derive macro.
100//! #[rpc_requests(message = ComputeMessage)]
101//! #[derive(Debug, Serialize, Deserialize)]
102//! enum ComputeProtocol {
103//!     /// Multiply two numbers, return the result over a oneshot channel.
104//!     #[rpc(tx=oneshot::Sender<i64>)]
105//!     #[wrap(Multiply)]
106//!     Multiply(i64, i64),
107//!     /// Sum all numbers received via the `rx` stream,
108//!     /// reply with the updating sum over the `tx` stream.
109//!     #[rpc(tx=mpsc::Sender<i64>, rx=mpsc::Receiver<i64>)]
110//!     #[wrap(Sum)]
111//!     Sum,
112//! }
113//!
114//! fn spawn_server() -> Client<ComputeProtocol> {
115//!     let (tx, rx) = tokio::sync::mpsc::channel(16);
116//!     // Spawn an actor task to handle incoming requests.
117//!     tokio::task::spawn(server_actor(rx));
118//!     // Return a local client to talk to our actor.
119//!     irpc::Client::local(tx)
120//! }
121//!
122//! async fn server_actor(mut rx: tokio::sync::mpsc::Receiver<ComputeMessage>) {
123//!     while let Some(msg) = rx.recv().await {
124//!         match msg {
125//!             ComputeMessage::Multiply(msg) => {
126//!                 let WithChannels { inner, tx, .. } = msg;
127//!                 let Multiply(a, b) = inner;
128//!                 tx.send(a * b).await.ok();
129//!             }
130//!             ComputeMessage::Sum(msg) => {
131//!                 let WithChannels { tx, mut rx, .. } = msg;
132//!                 // Spawn a separate task for this potentially long-running request.
133//!                 tokio::task::spawn(async move {
134//!                     let mut sum = 0;
135//!                     while let Ok(Some(number)) = rx.recv().await {
136//!                         sum += number;
137//!                         if tx.send(sum).await.is_err() {
138//!                             break;
139//!                         }
140//!                     }
141//!                 });
142//!             }
143//!         }
144//!     }
145//! }
146//! ```
147//!
148//! # History
149//!
150//! This crate evolved out of the [quic-rpc](https://docs.rs/quic-rpc/latest/quic-rpc/index.html) crate, which is a generic RPC
151//! framework for any transport with cheap streams such as QUIC. Compared to
152//! quic-rpc, this crate does not abstract over the stream type and is focused
153//! on [iroh](https://docs.rs/iroh/latest/iroh/index.html) and our [noq](https://docs.rs/noq/latest/noq/index.html).
154#![cfg_attr(quicrpc_docsrs, feature(doc_cfg))]
155use std::{fmt::Debug, future::Future, io, marker::PhantomData, ops::Deref};
156
157/// Processes an RPC request enum and generates trait implementations for use with `irpc`.
158///
159/// This attribute macro may be applied to an enum where each variant represents
160/// a different RPC request type. Each variant of the enum must contain a single unnamed field
161/// of a distinct type (unless the `wrap` attribute is used on a variant, see below).
162///
163/// Basic usage example:
164/// ```
165/// use irpc::{
166///     channel::{mpsc, oneshot},
167///     rpc_requests,
168/// };
169/// use serde::{Deserialize, Serialize};
170///
171/// #[rpc_requests(message = ComputeMessage)]
172/// #[derive(Debug, Serialize, Deserialize)]
173/// enum ComputeProtocol {
174///     /// Multiply two numbers, return the result over a oneshot channel.
175///     #[rpc(tx=oneshot::Sender<i64>)]
176///     Multiply(Multiply),
177///     /// Sum all numbers received via the `rx` stream,
178///     /// reply with the updating sum over the `tx` stream.
179///     #[rpc(tx=mpsc::Sender<i64>, rx=mpsc::Receiver<i64>)]
180///     Sum(Sum),
181/// }
182///
183/// #[derive(Debug, Serialize, Deserialize)]
184/// struct Multiply(i64, i64);
185///
186/// #[derive(Debug, Serialize, Deserialize)]
187/// struct Sum;
188/// ```
189///
190/// ## Generated code
191///
192/// If no further arguments are set, the macro generates:
193///
194/// * A [`Channels<S>`] implementation for each request type (i.e. the type of the variant's
195///   single unnamed field).
196///   The `Tx` and `Rx` types are set to the types provided via the variant's `rpc` attribute.
197/// * A `From` implementation to convert from each request type to the protocol enum.
198///
199/// When the `message` argument is set, the macro will also create a message enum and implement the
200/// [`Service`] and [`RemoteService`] traits for the protocol enum. This is recommended for the
201/// typical use of the macro.
202///
203/// ## Macro arguments
204///
205/// * `message = <name>` *(optional but recommended)*:
206///     * Generates an extended enum wrapping each type in [`WithChannels<T, Service>`].
207///       The attribute value is the name of the message enum type.
208///     * Generates a [`Service`] implementation for the protocol enum, with the `Message`
209///       type set to the message enum.
210///     * Generates a [`rpc::RemoteService`] implementation for the protocol enum.
211/// * `alias = "<suffix>"` *(optional)*: Generate type aliases with the given suffix for each `WithChannels<T, Service>`.
212/// * `rpc_feature = "<feature>"` *(optional)*: If set, the `RemoteService` implementation will be feature-flagged
213///   with this feature. Set this if your crate only optionally enables the `rpc` feature
214///   of `irpc`.
215/// * `no_rpc` *(optional, no value)*: If set, no implementation of `RemoteService` will be generated and the generated
216///   code works without the `rpc` feature of `irpc`.
217/// * `no_spans` *(optional, no value)*: If set, the generated code works without the `spans` feature of `irpc`.
218/// * `span_propagation` *(optional, no value)*: If set, enables OpenTelemetry span context propagation
219///   across remote connections. When enabled, span context is included in the wire format as
220///   `(Option<SpanContextCarrier>, Message)`, and the generated `RemoteService` implementation
221///   will set the parent span from the propagated remote context. Requires the `tracing-opentelemetry`
222///   feature to be enabled for actual OpenTelemetry integration; without it, the context is
223///   still serialized but has no effect.
224///
225/// ## Variant attributes
226///
227/// #### `#[rpc]` attribute
228///
229/// Individual enum variants are annotated with the `#[rpc(...)]` attribute to specify channel types.
230/// The `rpc` attribute contains two optional arguments:
231///
232/// * `tx = SomeType`: Set the kind of channel for sending responses from the server to the client.
233///   Must be a `Sender` type from the [`channel`] module.
234///   If `tx` is not set, it defaults to [`channel::none::NoSender`].
235/// * `rx = OtherType`: Set the kind of channel for receiving updates from the client at the server.
236///   Must be a `Receiver` type from the [`channel`] module.
237///   If `rx` is not set, it defaults to [`channel::none::NoReceiver`].
238///
239/// #### `#[wrap]` attribute
240///
241/// The attribute has the syntax `#[wrap(TypeName, derive(Foo, Bar))]`
242///
243/// If set, a struct `TypeName` will be generated from the variant's fields, and the variant
244/// will be changed to have a single, unnamed field of `TypeName`.
245///
246/// * `TypeName` is the name of the generated type.
247///   By default it will inherit the visibility of the protocol enum. You can set a different
248///   visibility by prefixing it with the visibility (e.g. `pub(crate) TypeName`).
249/// * `derive(Foo, Bar)` is optional and allows to set additional derives for the generated struct.
250///   By default, the struct will get `Serialize`, `Deserialize`, and `Debug` derives.
251///
252/// ## Examples
253///
254/// With `wrap`:
255/// ```
256/// use irpc::{
257///     Client,
258///     channel::{mpsc, oneshot},
259///     rpc_requests,
260/// };
261/// use serde::{Deserialize, Serialize};
262///
263/// #[rpc_requests(message = StoreMessage)]
264/// #[derive(Debug, Serialize, Deserialize)]
265/// enum StoreProtocol {
266///     /// Doc comment for `GetRequest`.
267///     #[rpc(tx=oneshot::Sender<String>)]
268///     #[wrap(GetRequest, derive(Clone))]
269///     Get(String),
270///
271///     /// Doc comment for `SetRequest`.
272///     #[rpc(tx=oneshot::Sender<()>)]
273///     #[wrap(SetRequest)]
274///     Set { key: String, value: String },
275/// }
276///
277/// async fn client_usage(client: Client<StoreProtocol>) -> n0_error::Result<()> {
278///     client
279///         .rpc(SetRequest {
280///             key: "foo".to_string(),
281///             value: "bar".to_string(),
282///         })
283///         .await?;
284///     let value = client.rpc(GetRequest("foo".to_string())).await?;
285///     Ok(())
286/// }
287/// ```
288///
289/// With type aliases:
290/// ```no_compile
291/// #[rpc_requests(message = ComputeMessage, alias = "Msg")]
292/// enum ComputeProtocol {
293///     #[rpc(tx=oneshot::Sender<u128>)]
294///     Sqr(Sqr), // Generates type SqrMsg = WithChannels<Sqr, ComputeProtocol>
295///     #[rpc(tx=mpsc::Sender<i64>)]
296///     Sum(Sum), // Generates type SumMsg = WithChannels<Sum, ComputeProtocol>
297/// }
298/// ```
299///
300/// [`RemoteService`]: rpc::RemoteService
301/// [`WithChannels<T, Service>`]: WithChannels
302/// [`Channels<S>`]: Channels
303#[cfg(feature = "derive")]
304#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "derive")))]
305pub use irpc_derive::rpc_requests;
306#[cfg(feature = "rpc")]
307use n0_error::AnyError;
308use n0_error::stack_error;
309use serde::{Serialize, de::DeserializeOwned};
310
311use self::{
312    channel::{
313        mpsc,
314        none::{NoReceiver, NoSender},
315        oneshot,
316    },
317    sealed::Sealed,
318};
319use crate::channel::SendError;
320
321#[cfg(test)]
322mod tests;
323pub mod util;
324
325mod sealed {
326    pub trait Sealed {}
327}
328
329/// Span context propagation for remote RPC calls
330///
331/// This module provides the `SpanContextCarrier` type for propagating trace context
332/// across remote boundaries. The type is always available when `rpc` feature is enabled,
333/// but actual OpenTelemetry integration requires the `tracing-opentelemetry` feature.
334///
335/// The propagated context is scoped to a single request handler via a tokio task-local,
336/// installed by the dispatch loop in `handle_connection`. This isolates concurrent
337/// requests from each other and is robust to thread migration across `.await` points.
338#[cfg(feature = "rpc")]
339#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
340pub mod span_propagation {
341    use std::{collections::HashMap, future::Future};
342
343    use serde::{Deserialize, Serialize};
344
345    #[cfg(feature = "tracing-opentelemetry")]
346    tokio::task_local! {
347        static SPAN_CONTEXT: opentelemetry::Context;
348    }
349
350    /// Carrier for propagating span context across RPC boundaries using W3C Trace Context format.
351    ///
352    /// This type is always available for serialization purposes. When the
353    /// `tracing-opentelemetry` feature is enabled, it can extract/inject actual
354    /// OpenTelemetry trace context. Without that feature, it simply serializes as an
355    /// empty map.
356    #[derive(Debug, Clone, Serialize, Deserialize, Default)]
357    pub struct SpanContextCarrier {
358        headers: HashMap<String, String>,
359    }
360
361    #[cfg(feature = "tracing-opentelemetry")]
362    impl opentelemetry::propagation::Injector for SpanContextCarrier {
363        fn set(&mut self, key: &str, value: String) {
364            self.headers.insert(key.to_string(), value);
365        }
366    }
367
368    #[cfg(feature = "tracing-opentelemetry")]
369    impl opentelemetry::propagation::Extractor for SpanContextCarrier {
370        fn get(&self, key: &str) -> Option<&str> {
371            self.headers.get(key).map(|v| v.as_str())
372        }
373
374        fn keys(&self) -> Vec<&str> {
375            self.headers.keys().map(|k| k.as_str()).collect()
376        }
377    }
378
379    impl SpanContextCarrier {
380        /// Create a carrier from the current OpenTelemetry context.
381        ///
382        /// When `tracing-opentelemetry` feature is enabled, this extracts the current
383        /// trace context. Without the feature, this returns an empty carrier.
384        #[cfg(feature = "tracing-opentelemetry")]
385        pub fn from_current() -> Self {
386            use opentelemetry::global;
387            use tracing_opentelemetry::OpenTelemetrySpanExt;
388            let mut carrier = Self::default();
389            // Get the OTel context from the current tracing span, not from
390            // opentelemetry::Context::current(). The tracing-opentelemetry layer
391            // stores OTel spans inside tracing spans, so the thread-local OTel
392            // context won't have the right span.
393            let ctx = tracing::Span::current().context();
394            global::get_text_map_propagator(|prop| {
395                prop.inject_context(&ctx, &mut carrier);
396            });
397            carrier
398        }
399
400        #[cfg(not(feature = "tracing-opentelemetry"))]
401        pub fn from_current() -> Self {
402            Self::default()
403        }
404
405        /// Extract an OpenTelemetry context from this carrier.
406        #[cfg(feature = "tracing-opentelemetry")]
407        pub fn to_context(&self) -> opentelemetry::Context {
408            use opentelemetry::global;
409            global::get_text_map_propagator(|prop| {
410                prop.extract_with_context(&opentelemetry::Context::current(), self)
411            })
412        }
413    }
414
415    /// Run `fut` with `carrier`'s context installed as the per-task scope read by
416    /// [`set_span_parent_from_remote`].
417    ///
418    /// Used by transport implementations (`irpc::rpc`, `irpc-iroh`) to wrap a single
419    /// request handler. Most users will not call this directly.
420    pub async fn scope_remote<F: Future>(carrier: Option<SpanContextCarrier>, fut: F) -> F::Output {
421        #[cfg(feature = "tracing-opentelemetry")]
422        if let Some(carrier) = carrier {
423            return SPAN_CONTEXT.scope(carrier.to_context(), fut).await;
424        }
425        let _ = carrier;
426        fut.await
427    }
428
429    /// Set the parent of a span from the propagated remote context, if one is in scope.
430    ///
431    /// Called by the code generated by `rpc_requests(span_propagation)`. Looks up the
432    /// task-local installed by the dispatch loop; no-op outside that scope.
433    pub fn set_span_parent_from_remote(span: &tracing::Span) {
434        #[cfg(feature = "tracing-opentelemetry")]
435        {
436            let _ = SPAN_CONTEXT.try_with(|ctx| {
437                use tracing_opentelemetry::OpenTelemetrySpanExt;
438                let _ = span.set_parent(ctx.clone());
439            });
440        }
441        let _ = span;
442    }
443}
444
445/// Requirements for a RPC message
446///
447/// Even when just using the mem transport, we require messages to be Serializable and Deserializable.
448/// Likewise, even when using the noq transport, we require messages to be Send.
449///
450/// This does not seem like a big restriction. If you want a pure memory channel without the possibility
451/// to also use the noq transport, you might want to use a mpsc channel directly.
452pub trait RpcMessage: Debug + Serialize + DeserializeOwned + Send + Sync + Unpin + 'static {}
453
454impl<T> RpcMessage for T where
455    T: Debug + Serialize + DeserializeOwned + Send + Sync + Unpin + 'static
456{
457}
458
459/// Trait for a service
460///
461/// This is implemented on the protocol enum.
462/// It is usually auto-implemented via the [`rpc_requests] macro.
463///
464/// A service acts as a scope for defining the tx and rx channels for each
465/// message type, and provides some type safety when sending messages.
466pub trait Service: Serialize + DeserializeOwned + Send + Sync + Debug + 'static {
467    /// Message enum for this protocol.
468    ///
469    /// This is expected to be an enum with identical variant names than the
470    /// protocol enum, but its single unit field is the [`WithChannels`] struct
471    /// that contains the inner request plus the `tx` and `rx` channels.
472    type Message: Send + Unpin + 'static;
473
474    /// Whether this protocol includes span context in the wire format.
475    ///
476    /// When `true`, messages are serialized as `(Option<SpanContextCarrier>, Message)`.
477    /// When `false` (default), messages are serialized directly without span context wrapper.
478    ///
479    /// This is controlled by the `span_propagation` attribute on the `rpc_requests` macro.
480    const SPAN_PROPAGATION: bool = false;
481}
482
483/// Sealed marker trait for a sender
484pub trait Sender: Debug + Sealed {}
485
486/// Sealed marker trait for a receiver
487pub trait Receiver: Debug + Sealed {}
488
489/// Trait to specify channels for a message and service
490pub trait Channels<S: Service>: Send + 'static {
491    /// The sender type, can be either mpsc, oneshot or none
492    type Tx: Sender;
493    /// The receiver type, can be either mpsc, oneshot or none
494    ///
495    /// For many services, the receiver is not needed, so it can be set to [`NoReceiver`].
496    type Rx: Receiver;
497}
498
499/// Channels that abstract over local or remote sending
500pub mod channel {
501    use std::io;
502
503    use n0_error::stack_error;
504
505    /// Oneshot channel, similar to tokio's oneshot channel
506    pub mod oneshot {
507        use std::{fmt::Debug, future::Future, io, pin::Pin, task};
508
509        use n0_error::{e, stack_error};
510        use n0_future::future::Boxed as BoxFuture;
511
512        use super::SendError;
513        use crate::util::FusedOneshotReceiver;
514
515        /// Error when receiving a oneshot or mpsc message. For local communication,
516        /// the only thing that can go wrong is that the sender has been closed.
517        ///
518        /// For rpc communication, there can be any number of errors, so this is a
519        /// generic io error.
520        #[stack_error(derive, add_meta, from_sources)]
521        pub enum RecvError {
522            /// The sender has been closed. This is the only error that can occur
523            /// for local communication.
524            #[error("Sender closed")]
525            SenderClosed,
526            /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]).
527            ///
528            /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE
529            #[error("Maximum message size exceeded")]
530            MaxMessageSizeExceeded,
531            /// An io error occurred. This can occur for remote communication,
532            /// due to a network error or deserialization error.
533            #[error("Io error")]
534            Io {
535                #[error(std_err)]
536                source: io::Error,
537            },
538        }
539
540        impl From<RecvError> for io::Error {
541            fn from(e: RecvError) -> Self {
542                match e {
543                    RecvError::Io { source, .. } => source,
544                    RecvError::SenderClosed { .. } => io::Error::new(io::ErrorKind::BrokenPipe, e),
545                    RecvError::MaxMessageSizeExceeded { .. } => {
546                        io::Error::new(io::ErrorKind::InvalidData, e)
547                    }
548                }
549            }
550        }
551
552        /// Create a local oneshot sender and receiver pair.
553        ///
554        /// This is currently using a tokio channel pair internally.
555        pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
556            let (tx, rx) = tokio::sync::oneshot::channel();
557            (tx.into(), rx.into())
558        }
559
560        /// A generic boxed sender.
561        ///
562        /// Remote senders are always boxed, since for remote communication the boxing
563        /// overhead is negligible. However, boxing can also be used for local communication,
564        /// e.g. when applying a transform or filter to the message before sending it.
565        pub type BoxedSender<T> =
566            Box<dyn FnOnce(T) -> BoxFuture<Result<(), SendError>> + Send + Sync + 'static>;
567
568        /// A sender that can be wrapped in a `Box<dyn DynSender<T>>`.
569        ///
570        /// In addition to implementing `Future`, this provides a fn to check if the sender is
571        /// an rpc sender.
572        ///
573        /// Remote receivers are always boxed, since for remote communication the boxing
574        /// overhead is negligible. However, boxing can also be used for local communication,
575        /// e.g. when applying a transform or filter to the message before receiving it.
576        pub trait DynSender<T>:
577            Future<Output = Result<(), SendError>> + Send + Sync + 'static
578        {
579            fn is_rpc(&self) -> bool;
580        }
581
582        /// A generic boxed receiver
583        ///
584        /// Remote receivers are always boxed, since for remote communication the boxing
585        /// overhead is negligible. However, boxing can also be used for local communication,
586        /// e.g. when applying a transform or filter to the message before receiving it.
587        pub type BoxedReceiver<T> = BoxFuture<Result<T, RecvError>>;
588
589        /// A oneshot sender.
590        ///
591        /// Compared to a local onehsot sender, sending a message is async since in the case
592        /// of remote communication, sending over the wire is async. Other than that it
593        /// behaves like a local oneshot sender and has no overhead in the local case.
594        pub enum Sender<T> {
595            Tokio(tokio::sync::oneshot::Sender<T>),
596            /// we can't yet distinguish between local and remote boxed oneshot senders.
597            /// If we ever want to have local boxed oneshot senders, we need to add a
598            /// third variant here.
599            Boxed(BoxedSender<T>),
600        }
601
602        impl<T> Debug for Sender<T> {
603            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
604                match self {
605                    Self::Tokio(_) => f.debug_tuple("Tokio").finish(),
606                    Self::Boxed(_) => f.debug_tuple("Boxed").finish(),
607                }
608            }
609        }
610
611        impl<T> From<tokio::sync::oneshot::Sender<T>> for Sender<T> {
612            fn from(tx: tokio::sync::oneshot::Sender<T>) -> Self {
613                Self::Tokio(tx)
614            }
615        }
616
617        impl<T> TryFrom<Sender<T>> for tokio::sync::oneshot::Sender<T> {
618            type Error = Sender<T>;
619
620            fn try_from(value: Sender<T>) -> Result<Self, Self::Error> {
621                match value {
622                    Sender::Tokio(tx) => Ok(tx),
623                    Sender::Boxed(_) => Err(value),
624                }
625            }
626        }
627
628        impl<T> Sender<T> {
629            /// Send a message
630            ///
631            /// If this is a boxed sender that represents a remote connection, sending may yield or fail with an io error.
632            /// Local senders will never yield, but can fail if the receiver has been closed.
633            pub async fn send(self, value: T) -> Result<(), SendError> {
634                match self {
635                    Sender::Tokio(tx) => tx.send(value).map_err(|_| e!(SendError::ReceiverClosed)),
636                    Sender::Boxed(f) => f(value).await,
637                }
638            }
639
640            /// Check if this is a remote sender
641            pub fn is_rpc(&self) -> bool
642            where
643                T: 'static,
644            {
645                match self {
646                    Sender::Tokio(_) => false,
647                    Sender::Boxed(_) => true,
648                }
649            }
650        }
651
652        impl<T: Send + Sync + 'static> Sender<T> {
653            /// Applies a filter before sending.
654            ///
655            /// Messages that don't pass the filter are dropped.
656            pub fn with_filter(self, f: impl Fn(&T) -> bool + Send + Sync + 'static) -> Sender<T> {
657                self.with_filter_map(move |u| if f(&u) { Some(u) } else { None })
658            }
659
660            /// Applies a transform before sending.
661            pub fn with_map<U, F>(self, f: F) -> Sender<U>
662            where
663                F: Fn(U) -> T + Send + Sync + 'static,
664                U: Send + Sync + 'static,
665            {
666                self.with_filter_map(move |u| Some(f(u)))
667            }
668
669            /// Applies a filter and transform before sending.
670            ///
671            /// Messages that don't pass the filter are dropped.
672            pub fn with_filter_map<U, F>(self, f: F) -> Sender<U>
673            where
674                F: Fn(U) -> Option<T> + Send + Sync + 'static,
675                U: Send + Sync + 'static,
676            {
677                let inner: BoxedSender<U> = Box::new(move |value| {
678                    let opt = f(value);
679                    Box::pin(async move {
680                        if let Some(v) = opt {
681                            self.send(v).await
682                        } else {
683                            Ok(())
684                        }
685                    })
686                });
687                Sender::Boxed(inner)
688            }
689        }
690
691        impl<T> crate::sealed::Sealed for Sender<T> {}
692        impl<T> crate::Sender for Sender<T> {}
693
694        /// A oneshot receiver.
695        ///
696        /// Compared to a local oneshot receiver, receiving a message can fail not just
697        /// when the sender has been closed, but also when the remote connection fails.
698        pub enum Receiver<T> {
699            Tokio(FusedOneshotReceiver<T>),
700            Boxed(BoxedReceiver<T>),
701        }
702
703        impl<T> Future for Receiver<T> {
704            type Output = Result<T, RecvError>;
705
706            fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
707                match self.get_mut() {
708                    Self::Tokio(rx) => Pin::new(rx)
709                        .poll(cx)
710                        .map_err(|_| e!(RecvError::SenderClosed)),
711                    Self::Boxed(rx) => Pin::new(rx).poll(cx),
712                }
713            }
714        }
715
716        /// Convert a tokio oneshot receiver to a receiver for this crate
717        impl<T> From<tokio::sync::oneshot::Receiver<T>> for Receiver<T> {
718            fn from(rx: tokio::sync::oneshot::Receiver<T>) -> Self {
719                Self::Tokio(FusedOneshotReceiver(rx))
720            }
721        }
722
723        impl<T> TryFrom<Receiver<T>> for tokio::sync::oneshot::Receiver<T> {
724            type Error = Receiver<T>;
725
726            fn try_from(value: Receiver<T>) -> Result<Self, Self::Error> {
727                match value {
728                    Receiver::Tokio(tx) => Ok(tx.0),
729                    Receiver::Boxed(_) => Err(value),
730                }
731            }
732        }
733
734        /// Convert a function that produces a future to a receiver for this crate
735        impl<T, F, Fut> From<F> for Receiver<T>
736        where
737            F: FnOnce() -> Fut,
738            Fut: Future<Output = Result<T, RecvError>> + Send + 'static,
739        {
740            fn from(f: F) -> Self {
741                Self::Boxed(Box::pin(f()))
742            }
743        }
744
745        impl<T> Debug for Receiver<T> {
746            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
747                match self {
748                    Self::Tokio(_) => f.debug_tuple("Tokio").finish(),
749                    Self::Boxed(_) => f.debug_tuple("Boxed").finish(),
750                }
751            }
752        }
753
754        impl<T> crate::sealed::Sealed for Receiver<T> {}
755        impl<T> crate::Receiver for Receiver<T> {}
756    }
757
758    /// SPSC channel, similar to tokio's mpsc channel
759    ///
760    /// For the rpc case, the send side can not be cloned, hence mpsc instead of mpsc.
761    pub mod mpsc {
762        use std::{fmt::Debug, future::Future, io, marker::PhantomData, pin::Pin, sync::Arc};
763
764        use n0_error::{e, stack_error};
765
766        use super::SendError;
767
768        /// Error when receiving a oneshot or mpsc message. For local communication,
769        /// the only thing that can go wrong is that the sender has been closed.
770        ///
771        /// For rpc communication, there can be any number of errors, so this is a
772        /// generic io error.
773        #[stack_error(derive, add_meta, from_sources)]
774        pub enum RecvError {
775            /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]).
776            ///
777            /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE
778            #[error("Maximum message size exceeded")]
779            MaxMessageSizeExceeded,
780            /// An io error occurred. This can occur for remote communication,
781            /// due to a network error or deserialization error.
782            #[error("Io error")]
783            Io {
784                #[error(std_err)]
785                source: io::Error,
786            },
787        }
788
789        impl From<RecvError> for io::Error {
790            fn from(e: RecvError) -> Self {
791                match e {
792                    RecvError::Io { source, .. } => source,
793                    RecvError::MaxMessageSizeExceeded { .. } => {
794                        io::Error::new(io::ErrorKind::InvalidData, e)
795                    }
796                }
797            }
798        }
799
800        /// Create a local mpsc sender and receiver pair, with the given buffer size.
801        ///
802        /// This is currently using a tokio channel pair internally.
803        pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
804            let (tx, rx) = tokio::sync::mpsc::channel(buffer);
805            (tx.into(), rx.into())
806        }
807
808        /// Single producer, single consumer sender.
809        ///
810        /// For the local case, this wraps a tokio::sync::mpsc::Sender.
811        pub enum Sender<T> {
812            Tokio(tokio::sync::mpsc::Sender<T>),
813            Boxed(Arc<dyn DynSender<T>>),
814        }
815
816        impl<T> Clone for Sender<T> {
817            fn clone(&self) -> Self {
818                match self {
819                    Self::Tokio(tx) => Self::Tokio(tx.clone()),
820                    Self::Boxed(inner) => Self::Boxed(inner.clone()),
821                }
822            }
823        }
824
825        impl<T> Sender<T> {
826            pub fn is_rpc(&self) -> bool
827            where
828                T: 'static,
829            {
830                match self {
831                    Sender::Tokio(_) => false,
832                    Sender::Boxed(x) => x.is_rpc(),
833                }
834            }
835
836            #[cfg(feature = "stream")]
837            pub fn into_sink(self) -> impl n0_future::Sink<T, Error = SendError> + Send + 'static
838            where
839                T: Send + Sync + 'static,
840            {
841                futures_util::sink::unfold(self, |sink, value| async move {
842                    sink.send(value).await?;
843                    Ok(sink)
844                })
845            }
846        }
847
848        impl<T: Send + Sync + 'static> Sender<T> {
849            /// Applies a filter before sending.
850            ///
851            /// Messages that don't pass the filter are dropped.
852            ///
853            /// If you want to combine multiple filters and maps with minimal
854            /// overhead, use `with_filter_map` directly.
855            pub fn with_filter<F>(self, f: F) -> Sender<T>
856            where
857                F: Fn(&T) -> bool + Send + Sync + 'static,
858            {
859                self.with_filter_map(move |u| if f(&u) { Some(u) } else { None })
860            }
861
862            /// Applies a transform before sending.
863            ///
864            /// If you want to combine multiple filters and maps with minimal
865            /// overhead, use `with_filter_map` directly.
866            pub fn with_map<U, F>(self, f: F) -> Sender<U>
867            where
868                F: Fn(U) -> T + Send + Sync + 'static,
869                U: Send + Sync + 'static,
870            {
871                self.with_filter_map(move |u| Some(f(u)))
872            }
873
874            /// Applies a filter and transform before sending.
875            ///
876            /// Any combination of filters and maps can be expressed using
877            /// a single filter_map.
878            pub fn with_filter_map<U, F>(self, f: F) -> Sender<U>
879            where
880                F: Fn(U) -> Option<T> + Send + Sync + 'static,
881                U: Send + Sync + 'static,
882            {
883                let inner: Arc<dyn DynSender<U>> = Arc::new(FilterMapSender {
884                    f,
885                    sender: self,
886                    _p: PhantomData,
887                });
888                Sender::Boxed(inner)
889            }
890
891            /// Future that resolves when the sender is closed
892            pub async fn closed(&self) {
893                match self {
894                    Sender::Tokio(tx) => tx.closed().await,
895                    Sender::Boxed(sink) => sink.closed().await,
896                }
897            }
898        }
899
900        impl<T> From<tokio::sync::mpsc::Sender<T>> for Sender<T> {
901            fn from(tx: tokio::sync::mpsc::Sender<T>) -> Self {
902                Self::Tokio(tx)
903            }
904        }
905
906        impl<T> TryFrom<Sender<T>> for tokio::sync::mpsc::Sender<T> {
907            type Error = Sender<T>;
908
909            fn try_from(value: Sender<T>) -> Result<Self, Self::Error> {
910                match value {
911                    Sender::Tokio(tx) => Ok(tx),
912                    Sender::Boxed(_) => Err(value),
913                }
914            }
915        }
916
917        /// A sender that can be wrapped in a `Arc<dyn DynSender<T>>`.
918        pub trait DynSender<T>: Debug + Send + Sync + 'static {
919            /// Send a message.
920            ///
921            /// For the remote case, if the message can not be completely sent,
922            /// this must return an error and disable the channel.
923            fn send(
924                &self,
925                value: T,
926            ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>>;
927
928            /// Try to send a message, returning as fast as possible if sending
929            /// is not currently possible.
930            ///
931            /// For the remote case, it must be guaranteed that the message is
932            /// either completely sent or not at all.
933            fn try_send(
934                &self,
935                value: T,
936            ) -> Pin<Box<dyn Future<Output = Result<bool, SendError>> + Send + '_>>;
937
938            /// Await the sender close
939            fn closed(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + '_>>;
940
941            /// True if this is a remote sender
942            fn is_rpc(&self) -> bool;
943        }
944
945        /// A receiver that can be wrapped in a `Box<dyn DynReceiver<T>>`.
946        pub trait DynReceiver<T>: Debug + Send + Sync + 'static {
947            fn recv(
948                &mut self,
949            ) -> Pin<Box<dyn Future<Output = Result<Option<T>, RecvError>> + Send + Sync + '_>>;
950        }
951
952        impl<T> Debug for Sender<T> {
953            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
954                match self {
955                    Self::Tokio(x) => f
956                        .debug_struct("Tokio")
957                        .field("avail", &x.capacity())
958                        .field("cap", &x.max_capacity())
959                        .finish(),
960                    Self::Boxed(inner) => f.debug_tuple("Boxed").field(&inner).finish(),
961                }
962            }
963        }
964
965        impl<T: Send + 'static> Sender<T> {
966            /// Send a message and yield until either it is sent or an error occurs.
967            ///
968            /// ## Cancellation safety
969            ///
970            /// If the future is dropped before completion, and if this is a remote sender,
971            /// then the sender will be closed and further sends will return an [`SendError::Io`]
972            /// with [`std::io::ErrorKind::BrokenPipe`]. Therefore, make sure to always poll the
973            /// future until completion if you want to reuse the sender or any clone afterwards.
974            pub async fn send(&self, value: T) -> Result<(), SendError> {
975                match self {
976                    Sender::Tokio(tx) => tx
977                        .send(value)
978                        .await
979                        .map_err(|_| e!(SendError::ReceiverClosed)),
980                    Sender::Boxed(sink) => sink.send(value).await,
981                }
982            }
983
984            /// Try to send a message, returning as fast as possible if sending
985            /// is not currently possible. This can be used to send ephemeral
986            /// messages.
987            ///
988            /// For the local case, this will immediately return false if the
989            /// channel is full.
990            ///
991            /// For the remote case, it will attempt to send the message and
992            /// return false if sending the first byte fails, otherwise yield
993            /// until the message is completely sent or an error occurs. This
994            /// guarantees that the message is sent either completely or not at
995            /// all.
996            ///
997            /// Returns true if the message was sent.
998            ///
999            /// ## Cancellation safety
1000            ///
1001            /// If the future is dropped before completion, and if this is a remote sender,
1002            /// then the sender will be closed and further sends will return an [`SendError::Io`]
1003            /// with [`std::io::ErrorKind::BrokenPipe`]. Therefore, make sure to always poll the
1004            /// future until completion if you want to reuse the sender or any clone afterwards.
1005            pub async fn try_send(&self, value: T) -> Result<bool, SendError> {
1006                match self {
1007                    Sender::Tokio(tx) => match tx.try_send(value) {
1008                        Ok(()) => Ok(true),
1009                        Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
1010                            Err(e!(SendError::ReceiverClosed))
1011                        }
1012                        Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => Ok(false),
1013                    },
1014                    Sender::Boxed(sink) => sink.try_send(value).await,
1015                }
1016            }
1017        }
1018
1019        impl<T> crate::sealed::Sealed for Sender<T> {}
1020        impl<T> crate::Sender for Sender<T> {}
1021
1022        pub enum Receiver<T> {
1023            Tokio(tokio::sync::mpsc::Receiver<T>),
1024            Boxed(Box<dyn DynReceiver<T>>),
1025        }
1026
1027        impl<T: Send + Sync + 'static> Receiver<T> {
1028            /// Receive a message
1029            ///
1030            /// Returns Ok(None) if the sender has been dropped or the remote end has
1031            /// cleanly closed the connection.
1032            ///
1033            /// Returns an an io error if there was an error receiving the message.
1034            pub async fn recv(&mut self) -> Result<Option<T>, RecvError> {
1035                match self {
1036                    Self::Tokio(rx) => Ok(rx.recv().await),
1037                    Self::Boxed(rx) => Ok(rx.recv().await?),
1038                }
1039            }
1040
1041            /// Map messages, transforming them from type T to type U.
1042            pub fn map<U, F>(self, f: F) -> Receiver<U>
1043            where
1044                F: Fn(T) -> U + Send + Sync + 'static,
1045                U: Send + Sync + 'static,
1046            {
1047                self.filter_map(move |u| Some(f(u)))
1048            }
1049
1050            /// Filter messages, only passing through those for which the predicate returns true.
1051            ///
1052            /// Messages that don't pass the filter are dropped.
1053            pub fn filter<F>(self, f: F) -> Receiver<T>
1054            where
1055                F: Fn(&T) -> bool + Send + Sync + 'static,
1056            {
1057                self.filter_map(move |u| if f(&u) { Some(u) } else { None })
1058            }
1059
1060            /// Filter and map messages, only passing through those for which the function returns Some.
1061            ///
1062            /// Messages that don't pass the filter are dropped.
1063            pub fn filter_map<F, U>(self, f: F) -> Receiver<U>
1064            where
1065                U: Send + Sync + 'static,
1066                F: Fn(T) -> Option<U> + Send + Sync + 'static,
1067            {
1068                let inner: Box<dyn DynReceiver<U>> = Box::new(FilterMapReceiver {
1069                    f,
1070                    receiver: self,
1071                    _p: PhantomData,
1072                });
1073                Receiver::Boxed(inner)
1074            }
1075
1076            #[cfg(feature = "stream")]
1077            pub fn into_stream(
1078                self,
1079            ) -> impl n0_future::Stream<Item = Result<T, RecvError>> + Send + Sync + 'static
1080            {
1081                n0_future::stream::unfold(self, |mut recv| async move {
1082                    recv.recv().await.transpose().map(|msg| (msg, recv))
1083                })
1084            }
1085        }
1086
1087        impl<T> From<tokio::sync::mpsc::Receiver<T>> for Receiver<T> {
1088            fn from(rx: tokio::sync::mpsc::Receiver<T>) -> Self {
1089                Self::Tokio(rx)
1090            }
1091        }
1092
1093        impl<T> TryFrom<Receiver<T>> for tokio::sync::mpsc::Receiver<T> {
1094            type Error = Receiver<T>;
1095
1096            fn try_from(value: Receiver<T>) -> Result<Self, Self::Error> {
1097                match value {
1098                    Receiver::Tokio(tx) => Ok(tx),
1099                    Receiver::Boxed(_) => Err(value),
1100                }
1101            }
1102        }
1103
1104        impl<T> Debug for Receiver<T> {
1105            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1106                match self {
1107                    Self::Tokio(inner) => f
1108                        .debug_struct("Tokio")
1109                        .field("avail", &inner.capacity())
1110                        .field("cap", &inner.max_capacity())
1111                        .finish(),
1112                    Self::Boxed(inner) => f.debug_tuple("Boxed").field(&inner).finish(),
1113                }
1114            }
1115        }
1116
1117        struct FilterMapSender<F, T, U> {
1118            f: F,
1119            sender: Sender<T>,
1120            _p: PhantomData<U>,
1121        }
1122
1123        impl<F, T, U> Debug for FilterMapSender<F, T, U> {
1124            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1125                f.debug_struct("FilterMapSender").finish_non_exhaustive()
1126            }
1127        }
1128
1129        impl<F, T, U> DynSender<U> for FilterMapSender<F, T, U>
1130        where
1131            F: Fn(U) -> Option<T> + Send + Sync + 'static,
1132            T: Send + Sync + 'static,
1133            U: Send + Sync + 'static,
1134        {
1135            fn send(
1136                &self,
1137                value: U,
1138            ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
1139                Box::pin(async move {
1140                    match (self.f)(value) {
1141                        Some(v) => self.sender.send(v).await,
1142                        _ => Ok(()),
1143                    }
1144                })
1145            }
1146
1147            fn try_send(
1148                &self,
1149                value: U,
1150            ) -> Pin<Box<dyn Future<Output = Result<bool, SendError>> + Send + '_>> {
1151                Box::pin(async move {
1152                    match (self.f)(value) {
1153                        Some(v) => self.sender.try_send(v).await,
1154                        _ => Ok(true),
1155                    }
1156                })
1157            }
1158
1159            fn is_rpc(&self) -> bool {
1160                self.sender.is_rpc()
1161            }
1162
1163            fn closed(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + '_>> {
1164                match self {
1165                    FilterMapSender {
1166                        sender: Sender::Tokio(tx),
1167                        ..
1168                    } => Box::pin(tx.closed()),
1169                    FilterMapSender {
1170                        sender: Sender::Boxed(sink),
1171                        ..
1172                    } => sink.closed(),
1173                }
1174            }
1175        }
1176
1177        struct FilterMapReceiver<F, T, U> {
1178            f: F,
1179            receiver: Receiver<T>,
1180            _p: PhantomData<U>,
1181        }
1182
1183        impl<F, T, U> Debug for FilterMapReceiver<F, T, U> {
1184            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1185                f.debug_struct("FilterMapReceiver").finish_non_exhaustive()
1186            }
1187        }
1188
1189        impl<F, T, U> DynReceiver<U> for FilterMapReceiver<F, T, U>
1190        where
1191            F: Fn(T) -> Option<U> + Send + Sync + 'static,
1192            T: Send + Sync + 'static,
1193            U: Send + Sync + 'static,
1194        {
1195            fn recv(
1196                &mut self,
1197            ) -> Pin<Box<dyn Future<Output = Result<Option<U>, RecvError>> + Send + Sync + '_>>
1198            {
1199                Box::pin(async move {
1200                    while let Some(msg) = self.receiver.recv().await? {
1201                        if let Some(v) = (self.f)(msg) {
1202                            return Ok(Some(v));
1203                        }
1204                    }
1205                    Ok(None)
1206                })
1207            }
1208        }
1209
1210        impl<T> crate::sealed::Sealed for Receiver<T> {}
1211        impl<T> crate::Receiver for Receiver<T> {}
1212    }
1213
1214    /// No channels, used when no communication is needed
1215    pub mod none {
1216        use crate::sealed::Sealed;
1217
1218        /// A sender that does nothing. This is used when no communication is needed.
1219        #[derive(Debug)]
1220        pub struct NoSender;
1221        impl Sealed for NoSender {}
1222        impl crate::Sender for NoSender {}
1223
1224        /// A receiver that does nothing. This is used when no communication is needed.
1225        #[derive(Debug)]
1226        pub struct NoReceiver;
1227
1228        impl Sealed for NoReceiver {}
1229        impl crate::Receiver for NoReceiver {}
1230    }
1231
1232    /// Error when sending a oneshot or mpsc message. For local communication,
1233    /// the only thing that can go wrong is that the receiver has been dropped.
1234    ///
1235    /// For rpc communication, there can be any number of errors, so this is a
1236    /// generic io error.
1237    #[stack_error(derive, add_meta, from_sources)]
1238    pub enum SendError {
1239        /// The receiver has been closed. This is the only error that can occur
1240        /// for local communication.
1241        #[error("Receiver closed")]
1242        ReceiverClosed,
1243        /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]).
1244        ///
1245        /// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE
1246        #[error("Maximum message size exceeded")]
1247        MaxMessageSizeExceeded,
1248        /// The underlying io error. This can occur for remote communication,
1249        /// due to a network error or serialization error.
1250        #[error("Io error")]
1251        Io {
1252            #[error(std_err)]
1253            source: io::Error,
1254        },
1255    }
1256
1257    impl From<SendError> for io::Error {
1258        fn from(e: SendError) -> Self {
1259            match e {
1260                SendError::ReceiverClosed { .. } => io::Error::new(io::ErrorKind::BrokenPipe, e),
1261                SendError::MaxMessageSizeExceeded { .. } => {
1262                    io::Error::new(io::ErrorKind::InvalidData, e)
1263                }
1264                SendError::Io { source, .. } => source,
1265            }
1266        }
1267    }
1268}
1269
1270/// A wrapper for a message with channels to send and receive it.
1271/// This expands the protocol message to a full message that includes the
1272/// active and unserializable channels.
1273///
1274/// The channel kind for rx and tx is defined by implementing the `Channels`
1275/// trait, either manually or using a macro.
1276///
1277/// When the `spans` feature is enabled, this also includes a tracing
1278/// span to carry the tracing context during message passing.
1279pub struct WithChannels<I: Channels<S>, S: Service> {
1280    /// The inner message.
1281    pub inner: I,
1282    /// The return channel to send the response to. Can be set to [`crate::channel::none::NoSender`] if not needed.
1283    pub tx: <I as Channels<S>>::Tx,
1284    /// The request channel to receive the request from. Can be set to [`NoReceiver`] if not needed.
1285    pub rx: <I as Channels<S>>::Rx,
1286    /// The current span where the full message was created.
1287    #[cfg(feature = "spans")]
1288    #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "spans")))]
1289    pub span: tracing::Span,
1290}
1291
1292impl<I: Channels<S> + Debug, S: Service> Debug for WithChannels<I, S> {
1293    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1294        f.debug_tuple("")
1295            .field(&self.inner)
1296            .field(&self.tx)
1297            .field(&self.rx)
1298            .finish()
1299    }
1300}
1301
1302impl<I: Channels<S>, S: Service> WithChannels<I, S> {
1303    /// Get the parent span
1304    #[cfg(feature = "spans")]
1305    pub fn parent_span_opt(&self) -> Option<&tracing::Span> {
1306        Some(&self.span)
1307    }
1308}
1309
1310/// Tuple conversion from inner message and tx/rx channels to a WithChannels struct
1311///
1312/// For the case where you want both tx and rx channels.
1313impl<I: Channels<S>, S: Service, Tx, Rx> From<(I, Tx, Rx)> for WithChannels<I, S>
1314where
1315    I: Channels<S>,
1316    <I as Channels<S>>::Tx: From<Tx>,
1317    <I as Channels<S>>::Rx: From<Rx>,
1318{
1319    fn from(inner: (I, Tx, Rx)) -> Self {
1320        let (inner, tx, rx) = inner;
1321        Self {
1322            inner,
1323            tx: tx.into(),
1324            rx: rx.into(),
1325            #[cfg(feature = "spans")]
1326            #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "spans")))]
1327            span: tracing::Span::current(),
1328        }
1329    }
1330}
1331
1332/// Tuple conversion from inner message and tx channel to a WithChannels struct
1333///
1334/// For the very common case where you just need a tx channel to send the response to.
1335impl<I, S, Tx> From<(I, Tx)> for WithChannels<I, S>
1336where
1337    I: Channels<S, Rx = NoReceiver>,
1338    S: Service,
1339    <I as Channels<S>>::Tx: From<Tx>,
1340{
1341    fn from(inner: (I, Tx)) -> Self {
1342        let (inner, tx) = inner;
1343        Self {
1344            inner,
1345            tx: tx.into(),
1346            rx: NoReceiver,
1347            #[cfg(feature = "spans")]
1348            #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "spans")))]
1349            span: tracing::Span::current(),
1350        }
1351    }
1352}
1353
1354/// Tuple conversion from inner message to a WithChannels struct without channels
1355impl<I, S> From<(I,)> for WithChannels<I, S>
1356where
1357    I: Channels<S, Rx = NoReceiver, Tx = NoSender>,
1358    S: Service,
1359{
1360    fn from(inner: (I,)) -> Self {
1361        let (inner,) = inner;
1362        Self {
1363            inner,
1364            tx: NoSender,
1365            rx: NoReceiver,
1366            #[cfg(feature = "spans")]
1367            #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "spans")))]
1368            span: tracing::Span::current(),
1369        }
1370    }
1371}
1372
1373/// Deref so you can access the inner fields directly.
1374///
1375/// If the inner message has fields named `tx`, `rx` or `span`, you need to use the
1376/// `inner` field to access them.
1377impl<I: Channels<S>, S: Service> Deref for WithChannels<I, S> {
1378    type Target = I;
1379
1380    fn deref(&self) -> &Self::Target {
1381        &self.inner
1382    }
1383}
1384
1385/// A client to the service `S` using the local message type `M` and the remote
1386/// message type `R`.
1387///
1388/// `R` is typically a serializable enum with a case for each possible message
1389/// type. It can be thought of as the definition of the protocol.
1390///
1391/// `M` is typically an enum with a case for each possible message type, where
1392/// each case is a `WithChannels` struct that extends the inner protocol message
1393/// with a local tx and rx channel as well as a tracing span to allow for
1394/// keeping tracing context across async boundaries.
1395///
1396/// In some cases, `M` and `R` can be enums for a subset of the protocol. E.g.
1397/// if you have a subsystem that only handles a part of the messages.
1398///
1399/// The service type `S` provides a scope for the protocol messages. It exists
1400/// so you can use the same message with multiple services.
1401#[derive(Debug)]
1402pub struct Client<S: Service>(ClientInner<S::Message>, PhantomData<S>);
1403
1404impl<S: Service> Clone for Client<S> {
1405    fn clone(&self) -> Self {
1406        Self(self.0.clone(), PhantomData)
1407    }
1408}
1409
1410impl<S: Service> From<LocalSender<S>> for Client<S> {
1411    fn from(tx: LocalSender<S>) -> Self {
1412        Self(ClientInner::Local(tx.0), PhantomData)
1413    }
1414}
1415
1416impl<S: Service> From<tokio::sync::mpsc::Sender<S::Message>> for Client<S> {
1417    fn from(tx: tokio::sync::mpsc::Sender<S::Message>) -> Self {
1418        LocalSender::from(tx).into()
1419    }
1420}
1421
1422impl<S: Service> Client<S> {
1423    /// Create a new client to a remote service using the given noq `endpoint`
1424    /// and a socket `addr` of the remote service.
1425    #[cfg(feature = "rpc")]
1426    pub fn noq(endpoint: noq::Endpoint, addr: std::net::SocketAddr) -> Self {
1427        Self::boxed(rpc::NoqLazyRemoteConnection::new(endpoint, addr))
1428    }
1429
1430    /// Create a new client from a `rpc::RemoteConnection` trait object.
1431    /// This is used from crates that want to provide other transports than noq,
1432    /// such as the iroh transport.
1433    #[cfg(feature = "rpc")]
1434    pub fn boxed(remote: impl rpc::RemoteConnection) -> Self {
1435        Self(ClientInner::Remote(Box::new(remote)), PhantomData)
1436    }
1437
1438    /// Creates a new client from a `tokio::sync::mpsc::Sender`.
1439    pub fn local(tx: impl Into<crate::channel::mpsc::Sender<S::Message>>) -> Self {
1440        let tx: crate::channel::mpsc::Sender<S::Message> = tx.into();
1441        Self(ClientInner::Local(tx), PhantomData)
1442    }
1443
1444    /// Get the local sender. This is useful if you don't care about remote
1445    /// requests.
1446    pub fn as_local(&self) -> Option<LocalSender<S>> {
1447        match &self.0 {
1448            ClientInner::Local(tx) => Some(tx.clone().into()),
1449            ClientInner::Remote(..) => None,
1450        }
1451    }
1452
1453    /// Start a request by creating a sender that can be used to send the initial
1454    /// message to the local or remote service.
1455    ///
1456    /// In the local case, this is just a clone which has almost zero overhead.
1457    /// Creating a local sender can not fail.
1458    ///
1459    /// In the remote case, this involves lazily creating a connection to the
1460    /// remote side and then creating a new stream on the underlying
1461    /// [`noq`] or iroh connection.
1462    ///
1463    /// In both cases, the returned sender is fully self contained.
1464    #[allow(clippy::type_complexity)]
1465    pub fn request(
1466        &self,
1467    ) -> impl Future<Output = Result<Request<LocalSender<S>, rpc::RemoteSender<S>>, RequestError>> + use<S>
1468    {
1469        #[cfg(feature = "rpc")]
1470        {
1471            let cloned = match &self.0 {
1472                ClientInner::Local(tx) => Request::Local(tx.clone()),
1473                ClientInner::Remote(connection) => Request::Remote(connection.clone_boxed()),
1474            };
1475            async move {
1476                match cloned {
1477                    Request::Local(tx) => Ok(Request::Local(tx.into())),
1478                    Request::Remote(conn) => {
1479                        let (send, recv) = conn.open_bi().await?;
1480                        Ok(Request::Remote(rpc::RemoteSender::new(send, recv)))
1481                    }
1482                }
1483            }
1484        }
1485        #[cfg(not(feature = "rpc"))]
1486        {
1487            let ClientInner::Local(tx) = &self.0 else {
1488                unreachable!()
1489            };
1490            let tx = tx.clone().into();
1491            async move { Ok(Request::Local(tx)) }
1492        }
1493    }
1494
1495    /// Performs a request for which the client can send updates.
1496    pub fn client_streaming<Req, Update, Res>(
1497        &self,
1498        msg: Req,
1499        local_update_cap: usize,
1500    ) -> impl Future<Output = Result<(mpsc::Sender<Update>, oneshot::Receiver<Res>)>>
1501    + use<Req, Update, Res, S>
1502    where
1503        S: From<Req>,
1504        S::Message: From<WithChannels<Req, S>>,
1505        Req: Channels<S, Tx = oneshot::Sender<Res>, Rx = mpsc::Receiver<Update>>,
1506        Update: RpcMessage,
1507        Res: RpcMessage,
1508    {
1509        let request = self.request();
1510        async move {
1511            let (update_tx, res_rx): (mpsc::Sender<Update>, oneshot::Receiver<Res>) =
1512                match request.await? {
1513                    Request::Local(request) => {
1514                        let (req_tx, req_rx) = mpsc::channel(local_update_cap);
1515                        let (res_tx, res_rx) = oneshot::channel();
1516                        request.send((msg, res_tx, req_rx)).await?;
1517                        (req_tx, res_rx)
1518                    }
1519                    #[cfg(not(feature = "rpc"))]
1520                    Request::Remote(_request) => unreachable!(),
1521                    #[cfg(feature = "rpc")]
1522                    Request::Remote(request) => {
1523                        let (tx, rx) = request.write(msg).await?;
1524                        (tx.into(), rx.into())
1525                    }
1526                };
1527            Ok((update_tx, res_rx))
1528        }
1529    }
1530
1531    /// Performs a request for which the client can send updates, and the server returns a mpsc receiver.
1532    pub fn bidi_streaming<Req, Update, Res>(
1533        &self,
1534        msg: Req,
1535        local_update_cap: usize,
1536        local_response_cap: usize,
1537    ) -> impl Future<Output = Result<(mpsc::Sender<Update>, mpsc::Receiver<Res>)>>
1538    + Send
1539    + 'static
1540    + use<Req, Update, Res, S>
1541    where
1542        S: From<Req>,
1543        S::Message: From<WithChannels<Req, S>>,
1544        Req: Channels<S, Tx = mpsc::Sender<Res>, Rx = mpsc::Receiver<Update>>,
1545        Update: RpcMessage,
1546        Res: RpcMessage,
1547    {
1548        let request = self.request();
1549        async move {
1550            let (update_tx, res_rx): (mpsc::Sender<Update>, mpsc::Receiver<Res>) =
1551                match request.await? {
1552                    Request::Local(request) => {
1553                        let (update_tx, update_rx) = mpsc::channel(local_update_cap);
1554                        let (res_tx, res_rx) = mpsc::channel(local_response_cap);
1555                        request.send((msg, res_tx, update_rx)).await?;
1556                        (update_tx, res_rx)
1557                    }
1558                    #[cfg(not(feature = "rpc"))]
1559                    Request::Remote(_request) => unreachable!(),
1560                    #[cfg(feature = "rpc")]
1561                    Request::Remote(request) => {
1562                        let (tx, rx) = request.write(msg).await?;
1563                        (tx.into(), rx.into())
1564                    }
1565                };
1566            Ok((update_tx, res_rx))
1567        }
1568    }
1569
1570    /// Performs a request for which the server returns nothing.
1571    ///
1572    /// The purpose of notify is to send messages to the remote without waiting
1573    /// for the remote to respond.
1574    ///
1575    /// The returned future completes once the message is written *locally*.
1576    /// Therefore we have no guarantee that the remote has received the message.
1577    ///
1578    /// If we close the connection immediately after the future returns, the
1579    /// connection might be closed *before* the message is on the wire, so the
1580    /// remote might never receive it.
1581    ///
1582    /// If you need to send a message with unit result but want to wait until the
1583    /// remote has received it, consider using [`rpc`] with a unit `()` return
1584    /// type instead.
1585    ///
1586    /// This method is safe to use with both regular and 0-RTT connections.
1587    /// If 0-RTT data is rejected, the message will be automatically re-sent.
1588    pub fn notify<Req>(&self, msg: Req) -> impl Future<Output = Result<()>> + Send + 'static
1589    where
1590        S: From<Req>,
1591        S::Message: From<WithChannels<Req, S>>,
1592        Req: Channels<S, Tx = NoSender, Rx = NoReceiver>,
1593    {
1594        let this = self.clone();
1595        async move {
1596            match this.request().await? {
1597                Request::Local(request) => {
1598                    request.send((msg,)).await?;
1599                }
1600                #[cfg(not(feature = "rpc"))]
1601                Request::Remote(_request) => unreachable!(),
1602                #[cfg(feature = "rpc")]
1603                Request::Remote(request) => {
1604                    // see https://www.iroh.computer/blog/0rtt-api#connect-side
1605                    let buf = rpc::prepare_write::<S>(msg)?;
1606                    let (_tx, _rx) = request.write_raw(&buf).await?;
1607                    if this.0.zero_rtt_rejected().await {
1608                        // 0rtt was not accepted, the data is lost, send it again!
1609                        let Request::Remote(request) = this.request().await? else {
1610                            unreachable!()
1611                        };
1612                        let (_tx, _rx) = request.write_raw(&buf).await?;
1613                    }
1614                }
1615            };
1616            Ok(())
1617        }
1618    }
1619
1620    /// Performs a request for which the server returns a oneshot receiver.
1621    ///
1622    /// This method is safe to use with both regular and 0-RTT connections.
1623    /// If 0-RTT data is rejected, the message will be automatically re-sent.
1624    pub fn rpc<Req, Res>(&self, msg: Req) -> impl Future<Output = Result<Res>> + Send + 'static
1625    where
1626        S: From<Req>,
1627        S::Message: From<WithChannels<Req, S>>,
1628        Req: Channels<S, Tx = oneshot::Sender<Res>, Rx = NoReceiver>,
1629        Res: RpcMessage,
1630    {
1631        let this = self.clone();
1632        async move {
1633            let recv: oneshot::Receiver<Res> = match this.request().await? {
1634                Request::Local(request) => {
1635                    let (tx, rx) = oneshot::channel();
1636                    request.send((msg, tx)).await?;
1637                    rx
1638                }
1639                #[cfg(not(feature = "rpc"))]
1640                Request::Remote(_request) => unreachable!(),
1641                #[cfg(feature = "rpc")]
1642                Request::Remote(request) => {
1643                    // see https://www.iroh.computer/blog/0rtt-api#connect-side
1644                    let buf = rpc::prepare_write::<S>(msg)?;
1645                    let (_tx, rx) = request.write_raw(&buf).await?;
1646                    if this.0.zero_rtt_rejected().await {
1647                        // 0rtt was not accepted, the data is lost, send it again!
1648                        let Request::Remote(request) = this.request().await? else {
1649                            unreachable!()
1650                        };
1651                        let (_tx, rx) = request.write_raw(&buf).await?;
1652                        rx
1653                    } else {
1654                        rx
1655                    }
1656                    .into()
1657                }
1658            };
1659            let res = recv.await?;
1660            Ok(res)
1661        }
1662    }
1663
1664    /// Performs a request for which the server returns a mpsc receiver.
1665    ///
1666    /// This method is safe to use with both regular and 0-RTT connections.
1667    /// If 0-RTT data is rejected, the message will be automatically re-sent.
1668    pub fn server_streaming<Req, Res>(
1669        &self,
1670        msg: Req,
1671        local_response_cap: usize,
1672    ) -> impl Future<Output = Result<mpsc::Receiver<Res>>> + Send + 'static + use<Req, Res, S>
1673    where
1674        S: From<Req>,
1675        S::Message: From<WithChannels<Req, S>>,
1676        Req: Channels<S, Tx = mpsc::Sender<Res>, Rx = NoReceiver>,
1677        Res: RpcMessage,
1678    {
1679        let this = self.clone();
1680        async move {
1681            let recv: mpsc::Receiver<Res> = match this.request().await? {
1682                Request::Local(request) => {
1683                    let (tx, rx) = mpsc::channel(local_response_cap);
1684                    request.send((msg, tx)).await?;
1685                    rx
1686                }
1687                #[cfg(not(feature = "rpc"))]
1688                Request::Remote(_request) => unreachable!(),
1689                #[cfg(feature = "rpc")]
1690                Request::Remote(request) => {
1691                    // see https://www.iroh.computer/blog/0rtt-api#connect-side
1692                    let buf = rpc::prepare_write::<S>(msg)?;
1693                    let (_tx, rx) = request.write_raw(&buf).await?;
1694                    if this.0.zero_rtt_rejected().await {
1695                        // 0rtt was not accepted, the data is lost, send it again!
1696                        let Request::Remote(request) = this.request().await? else {
1697                            unreachable!()
1698                        };
1699                        let (_tx, rx) = request.write_raw(&buf).await?;
1700                        rx
1701                    } else {
1702                        rx
1703                    }
1704                    .into()
1705                }
1706            };
1707            Ok(recv)
1708        }
1709    }
1710
1711    /// Deprecated: use [`Self::notify`] instead, it handles 0rtt automatically.
1712    #[deprecated(note = "use `notify` instead, it handles 0rtt automatically")]
1713    pub fn notify_0rtt<Req>(&self, msg: Req) -> impl Future<Output = Result<()>> + Send + 'static
1714    where
1715        S: From<Req>,
1716        S::Message: From<WithChannels<Req, S>>,
1717        Req: Channels<S, Tx = NoSender, Rx = NoReceiver>,
1718    {
1719        self.notify(msg)
1720    }
1721
1722    /// Deprecated: use [`Self::rpc`] instead, it handles 0rtt automatically.
1723    #[deprecated(note = "use `rpc` instead, it handles 0rtt automatically")]
1724    pub fn rpc_0rtt<Req, Res>(&self, msg: Req) -> impl Future<Output = Result<Res>> + Send + 'static
1725    where
1726        S: From<Req>,
1727        S::Message: From<WithChannels<Req, S>>,
1728        Req: Channels<S, Tx = oneshot::Sender<Res>, Rx = NoReceiver>,
1729        Res: RpcMessage,
1730    {
1731        self.rpc(msg)
1732    }
1733
1734    /// Deprecated: use [`Self::server_streaming`] instead, it handles 0rtt automatically.
1735    #[deprecated(note = "use `server_streaming` instead, it handles 0rtt automatically")]
1736    pub fn server_streaming_0rtt<Req, Res>(
1737        &self,
1738        msg: Req,
1739        local_response_cap: usize,
1740    ) -> impl Future<Output = Result<mpsc::Receiver<Res>>> + Send + 'static
1741    where
1742        S: From<Req>,
1743        S::Message: From<WithChannels<Req, S>>,
1744        Req: Channels<S, Tx = mpsc::Sender<Res>, Rx = NoReceiver>,
1745        Res: RpcMessage,
1746    {
1747        self.server_streaming(msg, local_response_cap)
1748    }
1749}
1750
1751#[derive(Debug)]
1752pub(crate) enum ClientInner<M> {
1753    Local(crate::channel::mpsc::Sender<M>),
1754    #[cfg(feature = "rpc")]
1755    #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
1756    Remote(Box<dyn rpc::RemoteConnection>),
1757    #[cfg(not(feature = "rpc"))]
1758    #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
1759    #[allow(dead_code)]
1760    Remote(PhantomData<M>),
1761}
1762
1763impl<M> Clone for ClientInner<M> {
1764    fn clone(&self) -> Self {
1765        match self {
1766            Self::Local(tx) => Self::Local(tx.clone()),
1767            #[cfg(feature = "rpc")]
1768            Self::Remote(conn) => Self::Remote(conn.clone_boxed()),
1769            #[cfg(not(feature = "rpc"))]
1770            Self::Remote(_) => unreachable!(),
1771        }
1772    }
1773}
1774
1775impl<M> ClientInner<M> {
1776    #[allow(dead_code)]
1777    async fn zero_rtt_rejected(&self) -> bool {
1778        match self {
1779            ClientInner::Local(_sender) => false,
1780            #[cfg(feature = "rpc")]
1781            ClientInner::Remote(remote_connection) => remote_connection.zero_rtt_rejected().await,
1782            #[cfg(not(feature = "rpc"))]
1783            Self::Remote(_) => unreachable!(),
1784        }
1785    }
1786}
1787
1788/// Error when opening a request. When cross-process rpc is disabled, this is
1789/// an empty enum since local requests can not fail.
1790#[stack_error(derive, add_meta, from_sources)]
1791pub enum RequestError {
1792    /// Error in noq during connect
1793    #[cfg(feature = "rpc")]
1794    #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
1795    #[error("Error establishing connection")]
1796    Connect {
1797        #[error(std_err)]
1798        source: noq::ConnectError,
1799    },
1800    /// Error in noq when the connection already exists, when opening a stream pair
1801    #[cfg(feature = "rpc")]
1802    #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
1803    #[error("Error opening stream")]
1804    Connection {
1805        #[error(std_err)]
1806        source: noq::ConnectionError,
1807    },
1808    /// Generic error for non-noq transports
1809    #[cfg(feature = "rpc")]
1810    #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
1811    #[error("Error opening stream")]
1812    Other { source: AnyError },
1813
1814    #[cfg(not(feature = "rpc"))]
1815    #[error("(Without the rpc feature, requests cannot fail")]
1816    Unreachable,
1817}
1818
1819/// Error type that subsumes all possible errors in this crate, for convenience.
1820#[stack_error(derive, add_meta, from_sources)]
1821pub enum Error {
1822    #[error("Request error")]
1823    Request { source: RequestError },
1824    #[error("Send error")]
1825    Send { source: channel::SendError },
1826    #[error("Mpsc recv error")]
1827    MpscRecv { source: channel::mpsc::RecvError },
1828    #[error("Oneshot recv error")]
1829    OneshotRecv { source: channel::oneshot::RecvError },
1830    #[cfg(feature = "rpc")]
1831    #[error("Recv error")]
1832    Write { source: rpc::WriteError },
1833}
1834
1835/// Type alias for a result with an irpc error type.
1836pub type Result<T, E = Error> = std::result::Result<T, E>;
1837
1838impl From<Error> for io::Error {
1839    fn from(e: Error) -> Self {
1840        match e {
1841            Error::Request { source, .. } => source.into(),
1842            Error::Send { source, .. } => source.into(),
1843            Error::MpscRecv { source, .. } => source.into(),
1844            Error::OneshotRecv { source, .. } => source.into(),
1845            #[cfg(feature = "rpc")]
1846            Error::Write { source, .. } => source.into(),
1847        }
1848    }
1849}
1850
1851impl From<RequestError> for io::Error {
1852    fn from(e: RequestError) -> Self {
1853        match e {
1854            #[cfg(feature = "rpc")]
1855            RequestError::Connect { source, .. } => io::Error::other(source),
1856            #[cfg(feature = "rpc")]
1857            RequestError::Connection { source, .. } => source.into(),
1858            #[cfg(feature = "rpc")]
1859            RequestError::Other { source, .. } => io::Error::other(source),
1860            #[cfg(not(feature = "rpc"))]
1861            RequestError::Unreachable { .. } => unreachable!(),
1862        }
1863    }
1864}
1865
1866/// A local sender for the service `S` using the message type `M`.
1867///
1868/// This is a wrapper around an in-memory channel (currently [`tokio::sync::mpsc::Sender`]),
1869/// that adds nice syntax for sending messages that can be converted into
1870/// [`WithChannels`].
1871#[derive(Debug)]
1872#[repr(transparent)]
1873pub struct LocalSender<S: Service>(crate::channel::mpsc::Sender<S::Message>);
1874
1875impl<S: Service> Clone for LocalSender<S> {
1876    fn clone(&self) -> Self {
1877        Self(self.0.clone())
1878    }
1879}
1880
1881impl<S: Service> From<tokio::sync::mpsc::Sender<S::Message>> for LocalSender<S> {
1882    fn from(tx: tokio::sync::mpsc::Sender<S::Message>) -> Self {
1883        Self(tx.into())
1884    }
1885}
1886
1887impl<S: Service> From<crate::channel::mpsc::Sender<S::Message>> for LocalSender<S> {
1888    fn from(tx: crate::channel::mpsc::Sender<S::Message>) -> Self {
1889        Self(tx)
1890    }
1891}
1892
1893#[cfg(not(feature = "rpc"))]
1894pub mod rpc {
1895    pub struct RemoteSender<S>(std::marker::PhantomData<S>);
1896}
1897
1898#[cfg(feature = "rpc")]
1899#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "rpc")))]
1900pub mod rpc {
1901    //! Module for cross-process RPC using [`noq`].
1902    use std::{
1903        fmt::Debug, future::Future, io, marker::PhantomData, ops::DerefMut, pin::Pin, sync::Arc,
1904    };
1905
1906    use n0_error::{e, stack_error};
1907    use n0_future::{future::Boxed as BoxFuture, task::JoinSet};
1908    /// This is used by irpc-derive to refer to noq types (SendStream and RecvStream)
1909    /// to make generated code work for users without having to depend on noq directly
1910    /// (i.e. when using iroh).
1911    #[doc(hidden)]
1912    pub use noq;
1913    use noq::{ConnectionError, PathId};
1914    use serde::de::DeserializeOwned;
1915    use smallvec::SmallVec;
1916    use tracing::{Instrument, debug, error_span, trace, warn};
1917
1918    use crate::{
1919        LocalSender, RequestError, RpcMessage, Service,
1920        channel::{
1921            SendError,
1922            mpsc::{self, DynReceiver, DynSender},
1923            none::NoSender,
1924            oneshot,
1925        },
1926        util::{AsyncReadVarintExt, WriteVarintExt, now_or_never},
1927    };
1928
1929    /// Default max message size (16 MiB).
1930    pub const MAX_MESSAGE_SIZE: u64 = 1024 * 1024 * 16;
1931
1932    /// Error code on streams if the max message size was exceeded.
1933    pub const ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED: u32 = 1;
1934
1935    /// Error code on streams if the sender tried to send an message that could not be postcard serialized.
1936    pub const ERROR_CODE_INVALID_POSTCARD: u32 = 2;
1937
1938    /// Error that can occur when writing the initial message when doing a
1939    /// cross-process RPC.
1940    #[stack_error(derive, add_meta, from_sources)]
1941    pub enum WriteError {
1942        /// Error writing to the stream with noq
1943        #[error("Error writing to stream")]
1944        Noq {
1945            #[error(std_err)]
1946            source: noq::WriteError,
1947        },
1948        /// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]).
1949        #[error("Maximum message size exceeded")]
1950        MaxMessageSizeExceeded,
1951        /// Generic IO error, e.g. when serializing the message or when using
1952        /// other transports.
1953        #[error("Error serializing")]
1954        Io {
1955            #[error(std_err)]
1956            source: io::Error,
1957        },
1958    }
1959
1960    impl From<postcard::Error> for WriteError {
1961        fn from(value: postcard::Error) -> Self {
1962            e!(Self::Io, io::Error::new(io::ErrorKind::InvalidData, value))
1963        }
1964    }
1965
1966    impl From<postcard::Error> for SendError {
1967        fn from(value: postcard::Error) -> Self {
1968            e!(Self::Io, io::Error::new(io::ErrorKind::InvalidData, value))
1969        }
1970    }
1971
1972    impl From<WriteError> for io::Error {
1973        fn from(e: WriteError) -> Self {
1974            match e {
1975                WriteError::Io { source, .. } => source,
1976                WriteError::MaxMessageSizeExceeded { .. } => {
1977                    io::Error::new(io::ErrorKind::InvalidData, e)
1978                }
1979                WriteError::Noq { source, .. } => source.into(),
1980            }
1981        }
1982    }
1983
1984    impl From<noq::WriteError> for SendError {
1985        fn from(err: noq::WriteError) -> Self {
1986            match err {
1987                noq::WriteError::Stopped(code)
1988                    if code == ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into() =>
1989                {
1990                    e!(SendError::MaxMessageSizeExceeded)
1991                }
1992                _ => e!(SendError::Io, io::Error::from(err)),
1993            }
1994        }
1995    }
1996
1997    /// Trait to abstract over a client connection to a remote service.
1998    ///
1999    /// This isn't really that much abstracted, since the result of open_bi must
2000    /// still be a noq::SendStream and noq::RecvStream. This is just so we
2001    /// can have different connection implementations for normal noq connections,
2002    /// iroh connections, and possibly noq connections with disabled encryption
2003    /// for performance.
2004    ///
2005    /// This is done as a trait instead of an enum, so we don't need an iroh
2006    /// dependency in the main crate.
2007    pub trait RemoteConnection: Send + Sync + Debug + 'static {
2008        /// Boxed clone so the trait is dynable.
2009        fn clone_boxed(&self) -> Box<dyn RemoteConnection>;
2010
2011        /// Open a bidirectional stream to the remote service.
2012        fn open_bi(
2013            &self,
2014        ) -> BoxFuture<std::result::Result<(noq::SendStream, noq::RecvStream), RequestError>>;
2015
2016        /// Returns whether 0-RTT data was rejected by the server.
2017        ///
2018        /// For connections that were fully authenticated before allowing to send any data, this should return `false`.
2019        fn zero_rtt_rejected(&self) -> BoxFuture<bool>;
2020    }
2021
2022    /// A connection to a remote service.
2023    ///
2024    /// Initially this does just have the endpoint and the address. Once a
2025    /// connection is established, it will be stored.
2026    #[derive(Debug, Clone)]
2027    pub(crate) struct NoqLazyRemoteConnection(Arc<NoqLazyRemoteConnectionInner>);
2028
2029    #[derive(Debug)]
2030    struct NoqLazyRemoteConnectionInner {
2031        pub endpoint: noq::Endpoint,
2032        pub addr: std::net::SocketAddr,
2033        pub connection: tokio::sync::Mutex<Option<noq::Connection>>,
2034    }
2035
2036    impl RemoteConnection for noq::Connection {
2037        fn clone_boxed(&self) -> Box<dyn RemoteConnection> {
2038            Box::new(self.clone())
2039        }
2040
2041        fn open_bi(
2042            &self,
2043        ) -> BoxFuture<std::result::Result<(noq::SendStream, noq::RecvStream), RequestError>>
2044        {
2045            let conn = self.clone();
2046            Box::pin(async move {
2047                let pair = conn.open_bi().await?;
2048                Ok(pair)
2049            })
2050        }
2051
2052        fn zero_rtt_rejected(&self) -> BoxFuture<bool> {
2053            Box::pin(async { false })
2054        }
2055    }
2056
2057    impl NoqLazyRemoteConnection {
2058        pub fn new(endpoint: noq::Endpoint, addr: std::net::SocketAddr) -> Self {
2059            Self(Arc::new(NoqLazyRemoteConnectionInner {
2060                endpoint,
2061                addr,
2062                connection: Default::default(),
2063            }))
2064        }
2065    }
2066
2067    impl RemoteConnection for NoqLazyRemoteConnection {
2068        fn clone_boxed(&self) -> Box<dyn RemoteConnection> {
2069            Box::new(self.clone())
2070        }
2071
2072        fn open_bi(
2073            &self,
2074        ) -> BoxFuture<std::result::Result<(noq::SendStream, noq::RecvStream), RequestError>>
2075        {
2076            let this = self.0.clone();
2077            Box::pin(async move {
2078                let mut guard = this.connection.lock().await;
2079                let pair = match guard.as_mut() {
2080                    Some(conn) => {
2081                        // try to reuse the connection
2082                        match conn.open_bi().await {
2083                            Ok(pair) => pair,
2084                            Err(_) => {
2085                                // try with a new connection, just once
2086                                *guard = None;
2087                                connect_and_open_bi(&this.endpoint, &this.addr, guard).await?
2088                            }
2089                        }
2090                    }
2091                    None => connect_and_open_bi(&this.endpoint, &this.addr, guard).await?,
2092                };
2093                Ok(pair)
2094            })
2095        }
2096
2097        fn zero_rtt_rejected(&self) -> BoxFuture<bool> {
2098            Box::pin(async { false })
2099        }
2100    }
2101
2102    async fn connect_and_open_bi(
2103        endpoint: &noq::Endpoint,
2104        addr: &std::net::SocketAddr,
2105        mut guard: tokio::sync::MutexGuard<'_, Option<noq::Connection>>,
2106    ) -> Result<(noq::SendStream, noq::RecvStream), RequestError> {
2107        let conn = endpoint.connect(*addr, "localhost")?.await?;
2108        let (send, recv) = conn.open_bi().await?;
2109        *guard = Some(conn);
2110        Ok((send, recv))
2111    }
2112
2113    /// A connection to a remote service that can be used to send the initial message.
2114    #[derive(Debug)]
2115    pub struct RemoteSender<S>(
2116        noq::SendStream,
2117        noq::RecvStream,
2118        std::marker::PhantomData<S>,
2119    );
2120
2121    /// Serialize a message for sending over the wire.
2122    ///
2123    /// When `S::SPAN_PROPAGATION` is true, the message is wrapped in a tuple with
2124    /// span context: `(Option<SpanContextCarrier>, msg)`.
2125    /// When false, the message is serialized directly.
2126    pub(crate) fn prepare_write<S: Service>(
2127        msg: impl Into<S>,
2128    ) -> Result<SmallVec<[u8; 128]>, WriteError> {
2129        let msg = msg.into();
2130        let mut buf = SmallVec::<[u8; 128]>::new();
2131
2132        if S::SPAN_PROPAGATION {
2133            // Include span context in wire format
2134            let span_ctx = Some(crate::span_propagation::SpanContextCarrier::from_current());
2135            let payload = (span_ctx, msg);
2136            if postcard::experimental::serialized_size(&payload)? as u64 > MAX_MESSAGE_SIZE {
2137                return Err(e!(WriteError::MaxMessageSizeExceeded));
2138            }
2139            buf.write_length_prefixed(&payload)?;
2140        } else {
2141            // Original wire format without span context
2142            if postcard::experimental::serialized_size(&msg)? as u64 > MAX_MESSAGE_SIZE {
2143                return Err(e!(WriteError::MaxMessageSizeExceeded));
2144            }
2145            buf.write_length_prefixed(&msg)?;
2146        }
2147
2148        Ok(buf)
2149    }
2150
2151    impl<S: Service> RemoteSender<S> {
2152        pub fn new(send: noq::SendStream, recv: noq::RecvStream) -> Self {
2153            Self(send, recv, PhantomData)
2154        }
2155
2156        pub async fn write(
2157            self,
2158            msg: impl Into<S>,
2159        ) -> std::result::Result<(noq::SendStream, noq::RecvStream), WriteError> {
2160            let buf = prepare_write(msg)?;
2161            self.write_raw(&buf).await
2162        }
2163
2164        pub(crate) async fn write_raw(
2165            self,
2166            buf: &[u8],
2167        ) -> std::result::Result<(noq::SendStream, noq::RecvStream), WriteError> {
2168            let RemoteSender(mut send, recv, _) = self;
2169            send.write_all(buf).await?;
2170            Ok((send, recv))
2171        }
2172    }
2173
2174    impl<T: DeserializeOwned> From<noq::RecvStream> for oneshot::Receiver<T> {
2175        fn from(mut read: noq::RecvStream) -> Self {
2176            let fut = async move {
2177                let size = read.read_varint_u64().await?.ok_or(io::Error::new(
2178                    io::ErrorKind::UnexpectedEof,
2179                    "failed to read size",
2180                ))?;
2181                if size > MAX_MESSAGE_SIZE {
2182                    read.stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()).ok();
2183                    return Err(e!(oneshot::RecvError::MaxMessageSizeExceeded));
2184                }
2185                let rest = read
2186                    .read_to_end(size as usize)
2187                    .await
2188                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
2189                let msg: T = postcard::from_bytes(&rest)
2190                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
2191                Ok(msg)
2192            };
2193            oneshot::Receiver::from(|| fut)
2194        }
2195    }
2196
2197    impl From<noq::RecvStream> for crate::channel::none::NoReceiver {
2198        fn from(read: noq::RecvStream) -> Self {
2199            drop(read);
2200            Self
2201        }
2202    }
2203
2204    impl<T: RpcMessage> From<noq::RecvStream> for mpsc::Receiver<T> {
2205        fn from(read: noq::RecvStream) -> Self {
2206            mpsc::Receiver::Boxed(Box::new(NoqReceiver {
2207                recv: read,
2208                _marker: PhantomData,
2209            }))
2210        }
2211    }
2212
2213    impl From<noq::SendStream> for NoSender {
2214        fn from(write: noq::SendStream) -> Self {
2215            let _ = write;
2216            NoSender
2217        }
2218    }
2219
2220    impl<T: RpcMessage> From<noq::SendStream> for oneshot::Sender<T> {
2221        fn from(mut writer: noq::SendStream) -> Self {
2222            oneshot::Sender::Boxed(Box::new(move |value| {
2223                Box::pin(async move {
2224                    let size = match postcard::experimental::serialized_size(&value) {
2225                        Ok(size) => size,
2226                        Err(e) => {
2227                            writer.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok();
2228                            return Err(e!(
2229                                SendError::Io,
2230                                io::Error::new(io::ErrorKind::InvalidData, e,)
2231                            ));
2232                        }
2233                    };
2234                    if size as u64 > MAX_MESSAGE_SIZE {
2235                        writer
2236                            .reset(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into())
2237                            .ok();
2238                        return Err(e!(SendError::MaxMessageSizeExceeded));
2239                    }
2240                    // write via a small buffer to avoid allocation for small values
2241                    let mut buf = SmallVec::<[u8; 128]>::new();
2242                    if let Err(e) = buf.write_length_prefixed(value) {
2243                        writer.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok();
2244                        return Err(e.into());
2245                    }
2246                    writer.write_all(&buf).await?;
2247                    Ok(())
2248                })
2249            }))
2250        }
2251    }
2252
2253    impl<T: RpcMessage> From<noq::SendStream> for mpsc::Sender<T> {
2254        fn from(write: noq::SendStream) -> Self {
2255            mpsc::Sender::Boxed(Arc::new(NoqSender(tokio::sync::Mutex::new(
2256                NoqSenderState::Open(NoqSenderInner {
2257                    send: write,
2258                    buffer: SmallVec::new(),
2259                    _marker: PhantomData,
2260                }),
2261            ))))
2262        }
2263    }
2264
2265    struct NoqReceiver<T> {
2266        recv: noq::RecvStream,
2267        _marker: std::marker::PhantomData<T>,
2268    }
2269
2270    impl<T> Debug for NoqReceiver<T> {
2271        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2272            f.debug_struct("NoqReceiver").finish()
2273        }
2274    }
2275
2276    impl<T: RpcMessage> DynReceiver<T> for NoqReceiver<T> {
2277        fn recv(
2278            &mut self,
2279        ) -> Pin<Box<dyn Future<Output = Result<Option<T>, mpsc::RecvError>> + Send + Sync + '_>>
2280        {
2281            Box::pin(async {
2282                let read = &mut self.recv;
2283                let Some(size) = read.read_varint_u64().await? else {
2284                    return Ok(None);
2285                };
2286                if size > MAX_MESSAGE_SIZE {
2287                    self.recv
2288                        .stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into())
2289                        .ok();
2290                    return Err(e!(mpsc::RecvError::MaxMessageSizeExceeded));
2291                }
2292                let mut buf = vec![0; size as usize];
2293                read.read_exact(&mut buf)
2294                    .await
2295                    .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?;
2296                let msg: T = postcard::from_bytes(&buf)
2297                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
2298                Ok(Some(msg))
2299            })
2300        }
2301    }
2302
2303    impl<T> Drop for NoqReceiver<T> {
2304        fn drop(&mut self) {}
2305    }
2306
2307    struct NoqSenderInner<T> {
2308        send: noq::SendStream,
2309        buffer: SmallVec<[u8; 128]>,
2310        _marker: std::marker::PhantomData<T>,
2311    }
2312
2313    impl<T: RpcMessage> NoqSenderInner<T> {
2314        fn send(
2315            &mut self,
2316            value: T,
2317        ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + Sync + '_>> {
2318            Box::pin(async {
2319                let size = match postcard::experimental::serialized_size(&value) {
2320                    Ok(size) => size,
2321                    Err(e) => {
2322                        self.send.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok();
2323                        return Err(e!(
2324                            SendError::Io,
2325                            io::Error::new(io::ErrorKind::InvalidData, e)
2326                        ));
2327                    }
2328                };
2329                if size as u64 > MAX_MESSAGE_SIZE {
2330                    self.send
2331                        .reset(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into())
2332                        .ok();
2333                    return Err(e!(SendError::MaxMessageSizeExceeded));
2334                }
2335                let value = value;
2336                self.buffer.clear();
2337                if let Err(e) = self.buffer.write_length_prefixed(value) {
2338                    self.send.reset(ERROR_CODE_INVALID_POSTCARD.into()).ok();
2339                    return Err(e.into());
2340                }
2341                self.send.write_all(&self.buffer).await?;
2342                self.buffer.clear();
2343                Ok(())
2344            })
2345        }
2346
2347        fn try_send(
2348            &mut self,
2349            value: T,
2350        ) -> Pin<Box<dyn Future<Output = Result<bool, SendError>> + Send + Sync + '_>> {
2351            Box::pin(async {
2352                if postcard::experimental::serialized_size(&value)? as u64 > MAX_MESSAGE_SIZE {
2353                    return Err(e!(SendError::MaxMessageSizeExceeded));
2354                }
2355                // todo: move the non-async part out of the box. Will require a new return type.
2356                let value = value;
2357                self.buffer.clear();
2358                self.buffer.write_length_prefixed(value)?;
2359                let Some(n) = now_or_never(self.send.write(&self.buffer)) else {
2360                    return Ok(false);
2361                };
2362                let n = n?;
2363                self.send.write_all(&self.buffer[n..]).await?;
2364                self.buffer.clear();
2365                Ok(true)
2366            })
2367        }
2368
2369        fn closed(&mut self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + '_>> {
2370            Box::pin(async move {
2371                self.send.stopped().await.ok();
2372            })
2373        }
2374    }
2375
2376    #[derive(Default)]
2377    enum NoqSenderState<T> {
2378        Open(NoqSenderInner<T>),
2379        #[default]
2380        Closed,
2381    }
2382
2383    struct NoqSender<T>(tokio::sync::Mutex<NoqSenderState<T>>);
2384
2385    impl<T> Debug for NoqSender<T> {
2386        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2387            f.debug_struct("NoqSender").finish()
2388        }
2389    }
2390
2391    impl<T: RpcMessage> DynSender<T> for NoqSender<T> {
2392        fn send(
2393            &self,
2394            value: T,
2395        ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
2396            Box::pin(async {
2397                let mut guard = self.0.lock().await;
2398                let sender = std::mem::take(guard.deref_mut());
2399                match sender {
2400                    NoqSenderState::Open(mut sender) => {
2401                        let res = sender.send(value).await;
2402                        if res.is_ok() {
2403                            *guard = NoqSenderState::Open(sender);
2404                        }
2405                        res
2406                    }
2407                    NoqSenderState::Closed => {
2408                        Err(io::Error::from(io::ErrorKind::BrokenPipe).into())
2409                    }
2410                }
2411            })
2412        }
2413
2414        fn try_send(
2415            &self,
2416            value: T,
2417        ) -> Pin<Box<dyn Future<Output = Result<bool, SendError>> + Send + '_>> {
2418            Box::pin(async {
2419                let mut guard = self.0.lock().await;
2420                let sender = std::mem::take(guard.deref_mut());
2421                match sender {
2422                    NoqSenderState::Open(mut sender) => {
2423                        let res = sender.try_send(value).await;
2424                        if res.is_ok() {
2425                            *guard = NoqSenderState::Open(sender);
2426                        }
2427                        res
2428                    }
2429                    NoqSenderState::Closed => {
2430                        Err(io::Error::from(io::ErrorKind::BrokenPipe).into())
2431                    }
2432                }
2433            })
2434        }
2435
2436        fn closed(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + '_>> {
2437            Box::pin(async {
2438                let mut guard = self.0.lock().await;
2439                match guard.deref_mut() {
2440                    NoqSenderState::Open(sender) => sender.closed().await,
2441                    NoqSenderState::Closed => {}
2442                }
2443            })
2444        }
2445
2446        fn is_rpc(&self) -> bool {
2447            true
2448        }
2449    }
2450
2451    /// Type alias for a handler fn for remote requests
2452    pub type Handler<R> = Arc<
2453        dyn Fn(R, noq::RecvStream, noq::SendStream) -> BoxFuture<std::result::Result<(), SendError>>
2454            + Send
2455            + Sync
2456            + 'static,
2457    >;
2458
2459    /// Extension trait to [`Service`] to create a [`Service::Message`] from a [`Service`]
2460    /// and a pair of QUIC streams.
2461    ///
2462    /// This trait is auto-implemented when using the [`crate::rpc_requests`] macro.
2463    pub trait RemoteService: Service + Sized {
2464        /// Returns the message enum for this request by combining `self` (the protocol enum)
2465        /// with a pair of QUIC streams for `tx` and `rx` channels.
2466        fn with_remote_channels(self, rx: noq::RecvStream, tx: noq::SendStream) -> Self::Message;
2467
2468        /// Creates a [`Handler`] that forwards all messages to a [`LocalSender`].
2469        fn remote_handler(local_sender: LocalSender<Self>) -> Handler<Self> {
2470            Arc::new(move |msg, rx, tx| {
2471                // `with_remote_channels` reads the task-local span context installed by
2472                // the dispatch loop, so it must run inside the future (which is polled
2473                // within that scope) rather than eagerly here.
2474                let local_sender = local_sender.clone();
2475                Box::pin(async move {
2476                    let msg = Self::with_remote_channels(msg, rx, tx);
2477                    local_sender.send_raw(msg).await
2478                })
2479            })
2480        }
2481    }
2482
2483    /// Utility function to listen for incoming connections and handle them with the provided handler.
2484    ///
2485    /// The wire format used depends on `S::SPAN_PROPAGATION` - if true, span context is expected.
2486    pub async fn listen<S: Service>(endpoint: noq::Endpoint, handler: Handler<S>) {
2487        let mut request_id = 0u64;
2488        let mut tasks = JoinSet::new();
2489        loop {
2490            let incoming = tokio::select! {
2491                Some(res) = tasks.join_next(), if !tasks.is_empty() => {
2492                    res.expect("irpc connection task panicked");
2493                    continue;
2494                }
2495                incoming = endpoint.accept() => {
2496                    match incoming {
2497                        None => break,
2498                        Some(incoming) => incoming
2499                    }
2500                }
2501            };
2502            let handler = handler.clone();
2503            let fut = async move {
2504                match incoming.await {
2505                    Ok(connection) => match handle_connection(connection, handler).await {
2506                        Err(err) => warn!("connection closed with error: {err:?}"),
2507                        Ok(()) => debug!("connection closed"),
2508                    },
2509                    Err(cause) => {
2510                        warn!("failed to accept connection: {cause:?}");
2511                    }
2512                };
2513            };
2514            let span = error_span!("rpc", id = request_id, remote = tracing::field::Empty);
2515            tasks.spawn(fut.instrument(span));
2516            request_id += 1;
2517        }
2518    }
2519
2520    /// Handles a quic connection with the provided `handler`.
2521    ///
2522    /// This function handles requests for a service `S`. The wire format used depends on
2523    /// `S::SPAN_PROPAGATION` - if true, span context is expected in the wire format.
2524    pub async fn handle_connection<S: Service>(
2525        connection: noq::Connection,
2526        handler: Handler<S>,
2527    ) -> io::Result<()> {
2528        let remote = connection
2529            .path(PathId::ZERO)
2530            .and_then(|p| p.remote_address().ok());
2531        if let Some(remote) = remote {
2532            tracing::Span::current().record("remote", tracing::field::display(remote));
2533        }
2534        debug!("connection accepted");
2535        loop {
2536            let Some((msg, carrier, rx, tx)) = read_request_inner::<S>(&connection).await? else {
2537                return Ok(());
2538            };
2539            crate::span_propagation::scope_remote(carrier, handler(msg, rx, tx)).await?;
2540        }
2541    }
2542
2543    /// Reads a request from a connection and converts it to a message enum.
2544    ///
2545    /// This combines `read_request_raw` with `RemoteService::with_remote_channels`.
2546    pub async fn read_request<S: RemoteService>(
2547        connection: &noq::Connection,
2548    ) -> std::io::Result<Option<S::Message>> {
2549        let Some((msg, carrier, rx, tx)) = read_request_inner::<S>(connection).await? else {
2550            return Ok(None);
2551        };
2552        Ok(Some(
2553            crate::span_propagation::scope_remote(carrier, async move {
2554                S::with_remote_channels(msg, rx, tx)
2555            })
2556            .await,
2557        ))
2558    }
2559
2560    /// Reads a single request from the connection.
2561    ///
2562    /// This accepts a bi-directional stream from the connection and reads and parses the request.
2563    ///
2564    /// When `S::SPAN_PROPAGATION` is true, any propagated span context on the wire is
2565    /// silently dropped. Use [`handle_connection`] (or [`read_request`]) if you need
2566    /// the propagated context to reach the generated handler spans.
2567    ///
2568    /// Returns the parsed request and the stream pair if reading and parsing the request succeeded.
2569    /// Returns None if the remote closed the connection with error code `0`.
2570    /// Returns an error for all other failure cases.
2571    pub async fn read_request_raw<S: Service>(
2572        connection: &noq::Connection,
2573    ) -> std::io::Result<Option<(S, noq::RecvStream, noq::SendStream)>> {
2574        Ok(read_request_inner::<S>(connection)
2575            .await?
2576            .map(|(msg, _carrier, rx, tx)| (msg, rx, tx)))
2577    }
2578
2579    /// Internal: read a request and also return the propagated span context carrier.
2580    ///
2581    /// The carrier is `Some` iff `S::SPAN_PROPAGATION` is true and the remote sent one.
2582    async fn read_request_inner<S: Service>(
2583        connection: &noq::Connection,
2584    ) -> std::io::Result<
2585        Option<(
2586            S,
2587            Option<crate::span_propagation::SpanContextCarrier>,
2588            noq::RecvStream,
2589            noq::SendStream,
2590        )>,
2591    > {
2592        let (send, mut recv) = match connection.accept_bi().await {
2593            Ok((s, r)) => (s, r),
2594            Err(ConnectionError::ApplicationClosed(cause))
2595                if cause.error_code.into_inner() == 0 =>
2596            {
2597                trace!("remote side closed connection {cause:?}");
2598                return Ok(None);
2599            }
2600            Err(cause) => {
2601                warn!("failed to accept bi stream {cause:?}");
2602                return Err(cause.into());
2603            }
2604        };
2605        let size = recv
2606            .read_varint_u64()
2607            .await?
2608            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "failed to read size"))?;
2609        if size > MAX_MESSAGE_SIZE {
2610            connection.close(
2611                ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into(),
2612                b"request exceeded max message size",
2613            );
2614            return Err(e!(mpsc::RecvError::MaxMessageSizeExceeded).into());
2615        }
2616        let mut buf = vec![0; size as usize];
2617        recv.read_exact(&mut buf)
2618            .await
2619            .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?;
2620
2621        let (carrier, msg): (Option<crate::span_propagation::SpanContextCarrier>, S) =
2622            if S::SPAN_PROPAGATION {
2623                postcard::from_bytes(&buf)
2624                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
2625            } else {
2626                let msg = postcard::from_bytes(&buf)
2627                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
2628                (None, msg)
2629            };
2630
2631        Ok(Some((msg, carrier, recv, send)))
2632    }
2633}
2634
2635/// A request to a service. This can be either local or remote.
2636#[derive(Debug)]
2637pub enum Request<L, R> {
2638    /// Local in memory request
2639    Local(L),
2640    /// Remote cross process request
2641    Remote(R),
2642}
2643
2644impl<S: Service> LocalSender<S> {
2645    /// Send a message to the service
2646    pub fn send<T>(
2647        &self,
2648        value: impl Into<WithChannels<T, S>>,
2649    ) -> impl Future<Output = Result<(), SendError>> + Send + 'static
2650    where
2651        T: Channels<S>,
2652        S::Message: From<WithChannels<T, S>>,
2653    {
2654        let value: S::Message = value.into().into();
2655        self.send_raw(value)
2656    }
2657
2658    /// Send a message to the service without the type conversion magic
2659    pub fn send_raw(
2660        &self,
2661        value: S::Message,
2662    ) -> impl Future<Output = Result<(), SendError>> + Send + 'static + use<S> {
2663        let x = self.0.clone();
2664        async move { x.send(value).await }
2665    }
2666}