Skip to main content

vox_core/bare_conduit/
mod.rs

1use std::marker::PhantomData;
2
3use facet_core::{PtrConst, Shape};
4use facet_reflect::Peek;
5
6use vox_types::{Conduit, ConduitRx, ConduitTx, Link, LinkTx, MaybeSend, MsgFamily, SelfRef};
7
8use crate::MessagePlan;
9
10/// Wraps a [`Link`] with postcard serialization. No reconnect, no reliability.
11///
12/// If the link dies, the conduit is dead. For localhost, SHM, or any
13/// transport where reconnect isn't needed.
14///
15/// `F` is a [`MsgFamily`] — it maps lifetimes to concrete message types.
16/// The send path accepts `F::Msg<'a>` (borrowed data serialized in place
17/// via `Peek`). The recv path yields `SelfRef<F::Msg<'static>>` (owned).
18// r[impl conduit.bare]
19// r[impl conduit.typeplan]
20// r[impl zerocopy.framing.conduit.bare]
21pub struct BareConduit<F: MsgFamily, L: Link> {
22    link: L,
23    shape: &'static Shape,
24    message_plan: Option<MessagePlan>,
25    _phantom: PhantomData<fn(F) -> F>,
26}
27
28impl<F: MsgFamily, L: Link> BareConduit<F, L> {
29    /// Create a new BareConduit (identity plan — no schema translation).
30    pub fn new(link: L) -> Self {
31        Self {
32            link,
33            shape: F::shape(),
34            message_plan: None,
35            _phantom: PhantomData,
36        }
37    }
38
39    /// Create a new BareConduit with a pre-built message translation plan.
40    pub fn with_message_plan(link: L, message_plan: MessagePlan) -> Self {
41        Self {
42            link,
43            shape: F::shape(),
44            message_plan: Some(message_plan),
45            _phantom: PhantomData,
46        }
47    }
48}
49
50impl<F: MsgFamily, L: Link> Conduit for BareConduit<F, L>
51where
52    L::Tx: MaybeSend + 'static,
53    L::Rx: MaybeSend + 'static,
54{
55    type Msg = F;
56    type Tx = BareConduitTx<F, L::Tx>;
57    type Rx = BareConduitRx<F, L::Rx>;
58
59    fn split(self) -> (Self::Tx, Self::Rx) {
60        let (tx, rx) = self.link.split();
61        (
62            BareConduitTx {
63                link_tx: tx,
64                shape: self.shape,
65                _phantom: PhantomData,
66            },
67            BareConduitRx {
68                link_rx: rx,
69                message_plan: self.message_plan,
70                _phantom: PhantomData,
71            },
72        )
73    }
74}
75
76// ---------------------------------------------------------------------------
77// Tx
78// ---------------------------------------------------------------------------
79
80pub struct BareConduitTx<F: MsgFamily, LTx: LinkTx> {
81    link_tx: LTx,
82    shape: &'static Shape,
83    _phantom: PhantomData<fn(F)>,
84}
85
86impl<F: MsgFamily, LTx: LinkTx + MaybeSend + 'static> ConduitTx for BareConduitTx<F, LTx> {
87    type Msg = F;
88    type Prepared = Vec<u8>;
89    type Error = BareConduitError;
90
91    fn prepare_send(&self, item: F::Msg<'_>) -> Result<Self::Prepared, Self::Error> {
92        encode_message::<F>(self.shape, item)
93    }
94
95    async fn send_prepared(&self, prepared: Self::Prepared) -> Result<(), Self::Error> {
96        self.link_tx
97            .send(prepared)
98            .await
99            .map_err(BareConduitError::Io)
100    }
101
102    async fn close(self) -> std::io::Result<()> {
103        self.link_tx.close().await
104    }
105}
106
107// r[impl zerocopy.framing.single-pass]
108// r[impl zerocopy.framing.no-double-serialize]
109// r[impl zerocopy.scatter]
110// r[impl zerocopy.scatter.plan]
111// r[impl zerocopy.scatter.plan.size]
112// r[impl zerocopy.scatter.write]
113// r[impl zerocopy.scatter.lifetime]
114fn encode_message<F: MsgFamily>(
115    shape: &'static Shape,
116    item: F::Msg<'_>,
117) -> Result<Vec<u8>, BareConduitError> {
118    #[allow(unsafe_code)]
119    let peek = unsafe { Peek::unchecked_new(PtrConst::new((&raw const item).cast::<u8>()), shape) };
120    let plan = vox_postcard::peek_to_scatter_plan(peek).map_err(BareConduitError::Encode)?;
121    let mut bytes = vec![0u8; plan.total_size()];
122    plan.write_into(&mut bytes);
123    Ok(bytes)
124}
125
126// ---------------------------------------------------------------------------
127// Rx
128// ---------------------------------------------------------------------------
129
130pub struct BareConduitRx<F: MsgFamily, LRx> {
131    link_rx: LRx,
132    message_plan: Option<MessagePlan>,
133    _phantom: PhantomData<fn() -> F>,
134}
135
136impl<F: MsgFamily, LRx> ConduitRx for BareConduitRx<F, LRx>
137where
138    LRx: vox_types::LinkRx + MaybeSend + 'static,
139{
140    type Msg = F;
141    type Error = BareConduitError;
142
143    // r[impl zerocopy.recv]
144    #[moire::instrument]
145    async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
146        let backing = match self.link_rx.recv().await.map_err(|error| {
147            BareConduitError::Io(std::io::Error::other(format!("link recv failed: {error}")))
148        })? {
149            Some(b) => b,
150            None => return Ok(None),
151        };
152
153        match &self.message_plan {
154            Some(plan) => crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
155                backing,
156                &plan.plan,
157                &plan.registry,
158            ),
159            None => crate::deserialize_postcard::<F::Msg<'static>>(backing),
160        }
161        .map_err(BareConduitError::Decode)
162        .map(Some)
163    }
164}
165
166// ---------------------------------------------------------------------------
167// Error
168// ---------------------------------------------------------------------------
169
170#[derive(Debug)]
171pub enum BareConduitError {
172    Encode(vox_postcard::SerializeError),
173    Decode(vox_postcard::DeserializeError),
174    Io(std::io::Error),
175    LinkDead,
176}
177
178impl std::fmt::Display for BareConduitError {
179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180        match self {
181            Self::Encode(e) => write!(f, "encode error: {e}"),
182            Self::Decode(e) => write!(f, "decode error: {e}"),
183            Self::Io(e) => write!(f, "io error: {e}"),
184            Self::LinkDead => write!(f, "link dead"),
185        }
186    }
187}
188
189impl std::error::Error for BareConduitError {}
190
191#[cfg(test)]
192mod tests {
193    use vox_types::*;
194
195    use super::*;
196    use crate::memory_link_pair;
197
198    #[test]
199    fn connection_reject_with_nonempty_metadata_round_trips() {
200        let rt = tokio::runtime::Builder::new_current_thread()
201            .build()
202            .unwrap();
203        rt.block_on(async { connection_reject_with_nonempty_metadata_inner().await });
204    }
205
206    async fn connection_reject_with_nonempty_metadata_inner() {
207        let (a, b) = memory_link_pair(64);
208        let a_conduit = BareConduit::<MessageFamily, _>::new(a);
209        let b_conduit = BareConduit::<MessageFamily, _>::new(b);
210        let (a_tx, _a_rx) = a_conduit.split();
211        let (_b_tx, mut b_rx) = b_conduit.split();
212
213        // Send a ConnectionReject with non-empty metadata
214        let msg = Message {
215            connection_id: ConnectionId(1),
216            payload: MessagePayload::ConnectionReject(ConnectionReject {
217                metadata: vec![MetadataEntry::str(
218                    "error",
219                    "missing required vox-service metadata",
220                )],
221            }),
222        };
223        let prepared = a_tx.prepare_send(msg).unwrap();
224        a_tx.send_prepared(prepared).await.unwrap();
225
226        // Receive and verify
227        let received = b_rx.recv().await.unwrap().unwrap();
228        let msg = received.get();
229        if let MessagePayload::ConnectionReject(reject) = &msg.payload {
230            assert_eq!(reject.metadata.len(), 1, "expected 1 metadata entry");
231            assert_eq!(
232                reject.metadata[0].key.as_ref(),
233                "error",
234                "key mismatch: got {:?}",
235                reject.metadata[0].key
236            );
237            match &reject.metadata[0].value {
238                MetadataValue::String(s) => {
239                    assert_eq!(s.as_ref(), "missing required vox-service metadata");
240                }
241                other => panic!("expected String, got {:?}", other),
242            }
243        } else {
244            panic!("expected ConnectionReject, got {:?}", msg.payload);
245        }
246    }
247}