Skip to main content

vox_core/bare_conduit/
mod.rs

1use std::marker::PhantomData;
2
3use facet_core::{PtrConst, Shape};
4use facet_reflect::Peek;
5
6use vox_types::{
7    Conduit, ConduitRx, ConduitTx, ConduitTxPermit, Link, LinkTx, LinkTxPermit, MaybeSend,
8    MsgFamily, SelfRef, WriteSlot,
9};
10
11use crate::MessagePlan;
12
13/// Wraps a [`Link`] with postcard serialization. No reconnect, no reliability.
14///
15/// If the link dies, the conduit is dead. For localhost, SHM, or any
16/// transport where reconnect isn't needed.
17///
18/// `F` is a [`MsgFamily`] — it maps lifetimes to concrete message types.
19/// The send path accepts `F::Msg<'a>` (borrowed data serialized in place
20/// via `Peek`). The recv path yields `SelfRef<F::Msg<'static>>` (owned).
21// r[impl conduit.bare]
22// r[impl conduit.typeplan]
23// r[impl zerocopy.framing.conduit.bare]
24pub struct BareConduit<F: MsgFamily, L: Link> {
25    link: L,
26    shape: &'static Shape,
27    message_plan: Option<MessagePlan>,
28    _phantom: PhantomData<fn(F) -> F>,
29}
30
31impl<F: MsgFamily, L: Link> BareConduit<F, L> {
32    /// Create a new BareConduit (identity plan — no schema translation).
33    pub fn new(link: L) -> Self {
34        Self {
35            link,
36            shape: F::shape(),
37            message_plan: None,
38            _phantom: PhantomData,
39        }
40    }
41
42    /// Create a new BareConduit with a pre-built message translation plan.
43    pub fn with_message_plan(link: L, message_plan: MessagePlan) -> Self {
44        Self {
45            link,
46            shape: F::shape(),
47            message_plan: Some(message_plan),
48            _phantom: PhantomData,
49        }
50    }
51}
52
53impl<F: MsgFamily, L: Link> Conduit for BareConduit<F, L>
54where
55    L::Tx: MaybeSend + 'static,
56    L::Rx: MaybeSend + 'static,
57{
58    type Msg = F;
59    type Tx = BareConduitTx<F, L::Tx>;
60    type Rx = BareConduitRx<F, L::Rx>;
61
62    fn split(self) -> (Self::Tx, Self::Rx) {
63        let (tx, rx) = self.link.split();
64        (
65            BareConduitTx {
66                link_tx: tx,
67                shape: self.shape,
68                _phantom: PhantomData,
69            },
70            BareConduitRx {
71                link_rx: rx,
72                message_plan: self.message_plan,
73                _phantom: PhantomData,
74            },
75        )
76    }
77}
78
79// ---------------------------------------------------------------------------
80// Tx
81// ---------------------------------------------------------------------------
82
83pub struct BareConduitTx<F: MsgFamily, LTx: LinkTx> {
84    link_tx: LTx,
85    shape: &'static Shape,
86    _phantom: PhantomData<fn(F)>,
87}
88
89impl<F: MsgFamily, LTx: LinkTx + MaybeSend + 'static> ConduitTx for BareConduitTx<F, LTx> {
90    type Msg = F;
91    type Permit<'a>
92        = BareConduitPermit<'a, F, LTx>
93    where
94        Self: 'a;
95
96    async fn reserve(&self) -> std::io::Result<Self::Permit<'_>> {
97        let permit = self.link_tx.reserve().await?;
98        Ok(BareConduitPermit {
99            permit,
100            shape: self.shape,
101            _phantom: PhantomData,
102        })
103    }
104
105    async fn close(self) -> std::io::Result<()> {
106        self.link_tx.close().await
107    }
108}
109
110// ---------------------------------------------------------------------------
111// Permit
112// ---------------------------------------------------------------------------
113
114pub struct BareConduitPermit<'a, F: MsgFamily, LTx: LinkTx> {
115    permit: LTx::Permit,
116    shape: &'static Shape,
117    _phantom: PhantomData<fn(F, &'a ())>,
118}
119
120impl<F: MsgFamily, LTx: LinkTx> ConduitTxPermit for BareConduitPermit<'_, F, LTx> {
121    type Msg = F;
122    type Error = BareConduitError;
123
124    // r[impl zerocopy.framing.single-pass]
125    // r[impl zerocopy.framing.no-double-serialize]
126    // r[impl zerocopy.scatter]
127    // r[impl zerocopy.scatter.plan]
128    // r[impl zerocopy.scatter.plan.size]
129    // r[impl zerocopy.scatter.write]
130    // r[impl zerocopy.scatter.lifetime]
131    fn send(self, item: F::Msg<'_>) -> Result<(), Self::Error> {
132        // SAFETY: shape was set from F::shape() at construction time.
133        // The item is a valid instance of F::Msg<'_>, which shares the same
134        // layout and shape as F::Msg<'static>.
135        #[allow(unsafe_code)]
136        let peek = unsafe {
137            Peek::unchecked_new(PtrConst::new((&raw const item).cast::<u8>()), self.shape)
138        };
139        let plan = vox_postcard::peek_to_scatter_plan(peek).map_err(BareConduitError::Encode)?;
140
141        let mut slot = self
142            .permit
143            .alloc(plan.total_size())
144            .map_err(BareConduitError::Io)?;
145        plan.write_into(slot.as_mut_slice());
146        slot.commit();
147        Ok(())
148    }
149}
150
151// ---------------------------------------------------------------------------
152// Rx
153// ---------------------------------------------------------------------------
154
155pub struct BareConduitRx<F: MsgFamily, LRx> {
156    link_rx: LRx,
157    message_plan: Option<MessagePlan>,
158    _phantom: PhantomData<fn() -> F>,
159}
160
161impl<F: MsgFamily, LRx> ConduitRx for BareConduitRx<F, LRx>
162where
163    LRx: vox_types::LinkRx + MaybeSend + 'static,
164{
165    type Msg = F;
166    type Error = BareConduitError;
167
168    // r[impl zerocopy.recv]
169    #[moire::instrument]
170    async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
171        let backing = match self.link_rx.recv().await.map_err(|error| {
172            BareConduitError::Io(std::io::Error::other(format!("link recv failed: {error}")))
173        })? {
174            Some(b) => b,
175            None => return Ok(None),
176        };
177
178        match &self.message_plan {
179            Some(plan) => crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
180                backing,
181                &plan.plan,
182                &plan.registry,
183            ),
184            None => crate::deserialize_postcard::<F::Msg<'static>>(backing),
185        }
186        .map_err(BareConduitError::Decode)
187        .map(Some)
188    }
189}
190
191// ---------------------------------------------------------------------------
192// Error
193// ---------------------------------------------------------------------------
194
195#[derive(Debug)]
196pub enum BareConduitError {
197    Encode(vox_postcard::SerializeError),
198    Decode(vox_postcard::DeserializeError),
199    Io(std::io::Error),
200    LinkDead,
201}
202
203impl std::fmt::Display for BareConduitError {
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        match self {
206            Self::Encode(e) => write!(f, "encode error: {e}"),
207            Self::Decode(e) => write!(f, "decode error: {e}"),
208            Self::Io(e) => write!(f, "io error: {e}"),
209            Self::LinkDead => write!(f, "link dead"),
210        }
211    }
212}
213
214impl std::error::Error for BareConduitError {}