Skip to main content

roam_types/
conduit.rs

1#![allow(async_fn_in_trait)]
2
3use std::future::Future;
4
5use facet::Facet;
6use facet_core::Shape;
7
8use crate::{MaybeSend, RpcPlan, SelfRef};
9
10/// Maps a lifetime to a concrete message type.
11///
12/// Rust doesn't have higher-kinded types, so this trait bridges the gap:
13/// `F::Msg<'a>` gives you the message type for any lifetime `'a`.
14///
15/// The send path uses `Msg<'a>` (borrowed data serialized in place).
16/// The recv path uses `Msg<'static>` (owned, via `SelfRef`).
17pub trait MsgFamily: 'static {
18    type Msg<'a>: Facet<'a> + 'a;
19
20    fn shape() -> &'static Shape {
21        <Self::Msg<'static> as Facet<'static>>::SHAPE
22    }
23
24    fn rpc_plan() -> &'static RpcPlan {
25        RpcPlan::for_shape(Self::shape())
26    }
27}
28
29/// Bidirectional typed transport. Wraps a [`Link`](crate::Link) and owns serialization.
30///
31/// Uses a `MsgFamily` so that the same type family serves both sides:
32/// - Send: `MsgFamily::Msg<'a>` for any `'a` (borrowed data serialized in place)
33/// - Recv: `MsgFamily::Msg<'static>` (owned, via `SelfRef`)
34///
35/// Two implementations:
36/// - `BareConduit`: Link + postcard. If the link dies, it's dead.
37/// - `StableConduit`: Link + postcard + seq/ack/replay. Handles reconnect
38///   transparently. Replay buffer stores encoded bytes (no clone needed).
39// r[impl conduit]
40pub trait Conduit {
41    type Msg: MsgFamily;
42    type Tx: ConduitTx<Msg = Self::Msg>;
43    type Rx: ConduitRx<Msg = Self::Msg>;
44
45    // r[impl conduit.split]
46    fn split(self) -> (Self::Tx, Self::Rx);
47}
48
49/// Sending half of a [`Conduit`].
50///
51/// Permit-based: `reserve()` is the backpressure point, `permit.send()`
52/// serializes and writes.
53// r[impl conduit.permit]
54pub trait ConduitTx {
55    type Msg: MsgFamily;
56    type Permit<'a>: for<'m> ConduitTxPermit<Msg = Self::Msg> + MaybeSend
57    where
58        Self: 'a;
59
60    /// Reserve capacity for one outbound message.
61    ///
62    /// Backpressure lives here — this may block waiting for:
63    /// - StableConduit: replay buffer capacity (bounded outstanding)
64    /// - Flow control from the peer
65    ///
66    /// Dropping the permit without sending releases the reservation.
67    fn reserve(&self) -> impl Future<Output = std::io::Result<Self::Permit<'_>>> + MaybeSend + '_;
68
69    /// Graceful close of the outbound direction.
70    async fn close(self) -> std::io::Result<()>
71    where
72        Self: Sized;
73}
74
75/// Permit for sending exactly one message through a [`ConduitTx`].
76// r[impl conduit.permit.send]
77// r[impl zerocopy.framing.conduit]
78pub trait ConduitTxPermit {
79    type Msg: MsgFamily;
80    type Error: std::error::Error + MaybeSend + 'static;
81
82    fn send(self, item: <Self::Msg as MsgFamily>::Msg<'_>) -> Result<(), Self::Error>;
83}
84
85/// Receiving half of a [`Conduit`].
86///
87/// Yields decoded values as [`SelfRef<Msg<'static>>`](SelfRef) (value + backing storage).
88/// Uses a precomputed `TypePlanCore` for fast plan-driven deserialization.
89pub trait ConduitRx {
90    type Msg: MsgFamily;
91    type Error: std::error::Error + MaybeSend + 'static;
92
93    /// Receive and decode the next message.
94    ///
95    /// Returns `Ok(None)` when the peer has closed.
96    #[allow(clippy::type_complexity)]
97    fn recv(
98        &mut self,
99    ) -> impl Future<
100        Output = Result<Option<SelfRef<<Self::Msg as MsgFamily>::Msg<'static>>>, Self::Error>,
101    > + MaybeSend
102    + '_;
103}
104
105/// Yields new conduits from inbound connections.
106pub trait ConduitAcceptor {
107    type Conduit: Conduit;
108
109    async fn accept(&mut self) -> std::io::Result<Self::Conduit>;
110}
111
/// Whether the session is acting as initiator or acceptor.
///
/// A plain, fieldless `Copy` enum. It also derives `Hash` so a role can be
/// used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SessionRole {
    /// The session is acting as the initiator.
    Initiator,
    /// The session is acting as the acceptor.
    Acceptor,
}