Skip to main content

roam_core/bare_conduit/
mod.rs

1use std::marker::PhantomData;
2
3use facet_core::{PtrConst, Shape};
4use facet_reflect::Peek;
5
6use roam_types::{
7    Conduit, ConduitRx, ConduitTx, ConduitTxPermit, Link, LinkTx, LinkTxPermit, MaybeSend,
8    MsgFamily, SelfRef, WriteSlot,
9};
10
11/// Wraps a [`Link`] with postcard serialization. No reconnect, no reliability.
12///
13/// If the link dies, the conduit is dead. For localhost, SHM, or any
14/// transport where reconnect isn't needed.
15///
16/// `F` is a [`MsgFamily`] — it maps lifetimes to concrete message types.
17/// The send path accepts `F::Msg<'a>` (borrowed data serialized in place
18/// via `Peek`). The recv path yields `SelfRef<F::Msg<'static>>` (owned).
19// r[impl conduit.bare]
20// r[impl conduit.typeplan]
21// r[impl zerocopy.framing.conduit.bare]
22pub struct BareConduit<F: MsgFamily, L: Link> {
23    link: L,
24    shape: &'static Shape,
25    _phantom: PhantomData<fn(F) -> F>,
26}
27
28impl<F: MsgFamily, L: Link> BareConduit<F, L> {
29    /// Create a new BareConduit.
30    pub fn new(link: L) -> Self {
31        Self {
32            link,
33            shape: F::shape(),
34            _phantom: PhantomData,
35        }
36    }
37}
38
39impl<F: MsgFamily, L: Link> Conduit for BareConduit<F, L>
40where
41    L::Tx: MaybeSend + 'static,
42    L::Rx: MaybeSend + 'static,
43{
44    type Msg = F;
45    type Tx = BareConduitTx<F, L::Tx>;
46    type Rx = BareConduitRx<F, L::Rx>;
47
48    fn split(self) -> (Self::Tx, Self::Rx) {
49        let (tx, rx) = self.link.split();
50        (
51            BareConduitTx {
52                link_tx: tx,
53                shape: self.shape,
54                _phantom: PhantomData,
55            },
56            BareConduitRx {
57                link_rx: rx,
58                _phantom: PhantomData,
59            },
60        )
61    }
62}
63
64// ---------------------------------------------------------------------------
65// Tx
66// ---------------------------------------------------------------------------
67
68pub struct BareConduitTx<F: MsgFamily, LTx: LinkTx> {
69    link_tx: LTx,
70    shape: &'static Shape,
71    _phantom: PhantomData<fn(F)>,
72}
73
74impl<F: MsgFamily, LTx: LinkTx + MaybeSend + 'static> ConduitTx for BareConduitTx<F, LTx> {
75    type Msg = F;
76    type Permit<'a>
77        = BareConduitPermit<'a, F, LTx>
78    where
79        Self: 'a;
80
81    async fn reserve(&self) -> std::io::Result<Self::Permit<'_>> {
82        let permit = self.link_tx.reserve().await?;
83        Ok(BareConduitPermit {
84            permit,
85            shape: self.shape,
86            _phantom: PhantomData,
87        })
88    }
89
90    async fn close(self) -> std::io::Result<()> {
91        self.link_tx.close().await
92    }
93}
94
95// ---------------------------------------------------------------------------
96// Permit
97// ---------------------------------------------------------------------------
98
99pub struct BareConduitPermit<'a, F: MsgFamily, LTx: LinkTx> {
100    permit: LTx::Permit,
101    shape: &'static Shape,
102    _phantom: PhantomData<fn(F, &'a ())>,
103}
104
105impl<F: MsgFamily, LTx: LinkTx> ConduitTxPermit for BareConduitPermit<'_, F, LTx> {
106    type Msg = F;
107    type Error = BareConduitError;
108
109    // r[impl zerocopy.framing.single-pass]
110    // r[impl zerocopy.framing.no-double-serialize]
111    // r[impl zerocopy.scatter]
112    // r[impl zerocopy.scatter.plan]
113    // r[impl zerocopy.scatter.plan.size]
114    // r[impl zerocopy.scatter.write]
115    // r[impl zerocopy.scatter.lifetime]
116    fn send(self, item: F::Msg<'_>) -> Result<(), Self::Error> {
117        // SAFETY: shape was set from F::shape() at construction time.
118        // The item is a valid instance of F::Msg<'_>, which shares the same
119        // layout and shape as F::Msg<'static>.
120        #[allow(unsafe_code)]
121        let peek = unsafe {
122            Peek::unchecked_new(PtrConst::new((&raw const item).cast::<u8>()), self.shape)
123        };
124        let plan = facet_postcard::peek_to_scatter_plan(peek).map_err(BareConduitError::Encode)?;
125
126        let mut slot = self
127            .permit
128            .alloc(plan.total_size())
129            .map_err(BareConduitError::Io)?;
130        plan.write_into(slot.as_mut_slice())
131            .map_err(BareConduitError::Encode)?;
132        slot.commit();
133        Ok(())
134    }
135}
136
137// ---------------------------------------------------------------------------
138// Rx
139// ---------------------------------------------------------------------------
140
141pub struct BareConduitRx<F: MsgFamily, LRx> {
142    link_rx: LRx,
143    _phantom: PhantomData<fn() -> F>,
144}
145
146impl<F: MsgFamily, LRx> ConduitRx for BareConduitRx<F, LRx>
147where
148    LRx: roam_types::LinkRx + MaybeSend + 'static,
149{
150    type Msg = F;
151    type Error = BareConduitError;
152
153    // r[impl zerocopy.recv]
154    #[moire::instrument]
155    async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
156        let backing = match self.link_rx.recv().await.map_err(|error| {
157            BareConduitError::Io(std::io::Error::other(format!("link recv failed: {error}")))
158        })? {
159            Some(b) => b,
160            None => return Ok(None),
161        };
162
163        crate::deserialize_postcard::<F::Msg<'static>>(backing)
164            .map_err(BareConduitError::Decode)
165            .map(Some)
166    }
167}
168
169// ---------------------------------------------------------------------------
170// Error
171// ---------------------------------------------------------------------------
172
173#[derive(Debug)]
174pub enum BareConduitError {
175    Encode(facet_postcard::SerializeError),
176    Decode(facet_format::DeserializeError),
177    Io(std::io::Error),
178    LinkDead,
179}
180
181impl std::fmt::Display for BareConduitError {
182    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183        match self {
184            Self::Encode(e) => write!(f, "encode error: {e}"),
185            Self::Decode(e) => write!(f, "decode error: {e}"),
186            Self::Io(e) => write!(f, "io error: {e}"),
187            Self::LinkDead => write!(f, "link dead"),
188        }
189    }
190}
191
192impl std::error::Error for BareConduitError {}