Skip to main content

vox_core/bare_conduit/
mod.rs

1use std::marker::PhantomData;
2
3#[cfg(not(target_arch = "wasm32"))]
4use facet_core::PtrConst;
5#[cfg(not(target_arch = "wasm32"))]
6use vox_jit::cache::{CompiledDecoder, CompiledEncoder};
7#[cfg(not(target_arch = "wasm32"))]
8use vox_jit::cal::BorrowMode;
9
10use vox_types::{Conduit, ConduitRx, ConduitTx, Link, LinkTx, MaybeSend, MsgFamily, SelfRef};
11
12use crate::MessagePlan;
13
14/// Wraps a [`Link`] with postcard serialization. No reconnect, no reliability.
15///
16/// If the link dies, the conduit is dead. For localhost, SHM, or any
17/// transport where reconnect isn't needed.
18///
19/// `F` is a [`MsgFamily`] — it maps lifetimes to concrete message types.
20/// The send path accepts `F::Msg<'a>` (borrowed data serialized in place
21/// via `Peek`). The recv path yields `SelfRef<F::Msg<'static>>` (owned).
22// r[impl conduit.bare]
23// r[impl conduit.typeplan]
24// r[impl zerocopy.framing.conduit.bare]
25pub struct BareConduit<F: MsgFamily, L: Link> {
26    link: L,
27    #[cfg(not(target_arch = "wasm32"))]
28    encoder: &'static CompiledEncoder,
29    #[cfg(not(target_arch = "wasm32"))]
30    decoder: Option<&'static CompiledDecoder>,
31    message_plan: MessagePlan,
32    _phantom: PhantomData<fn(F) -> F>,
33}
34
35impl<F: MsgFamily, L: Link> BareConduit<F, L> {
36    /// Create a new BareConduit (identity plan — no schema translation).
37    pub fn new(link: L) -> Self {
38        let identity_plan = vox_postcard::build_identity_plan(F::shape());
39        Self::resolve(
40            link,
41            MessagePlan {
42                remote_schema_id: 0,
43                plan: identity_plan,
44                registry: vox_types::SchemaRegistry::new(),
45            },
46        )
47    }
48
49    /// Create a new BareConduit with a pre-built message translation plan.
50    pub fn with_message_plan(link: L, message_plan: MessagePlan) -> Self {
51        Self::resolve(link, message_plan)
52    }
53
54    fn resolve(link: L, message_plan: MessagePlan) -> Self {
55        #[cfg(not(target_arch = "wasm32"))]
56        let runtime = vox_jit::global_runtime();
57        #[cfg(not(target_arch = "wasm32"))]
58        let encoder = runtime
59            .prepare_encoder(F::shape())
60            .expect("JIT encode unavailable for message shape");
61        #[cfg(not(target_arch = "wasm32"))]
62        let decoder = runtime.prepare_decoder(
63            message_plan.remote_schema_id,
64            F::shape(),
65            &message_plan.plan,
66            &message_plan.registry,
67            BorrowMode::Owned,
68        );
69        #[cfg(not(target_arch = "wasm32"))]
70        if decoder.is_none() {
71            tracing::warn!("vox bare conduit message decoder unavailable; falling back");
72        }
73        Self {
74            link,
75            #[cfg(not(target_arch = "wasm32"))]
76            encoder,
77            #[cfg(not(target_arch = "wasm32"))]
78            decoder,
79            message_plan,
80            _phantom: PhantomData,
81        }
82    }
83}
84
85impl<F: MsgFamily, L: Link> Conduit for BareConduit<F, L>
86where
87    L::Tx: MaybeSend + 'static,
88    L::Rx: MaybeSend + 'static,
89{
90    type Msg = F;
91    type Tx = BareConduitTx<F, L::Tx>;
92    type Rx = BareConduitRx<F, L::Rx>;
93
94    fn split(self) -> (Self::Tx, Self::Rx) {
95        let (tx, rx) = self.link.split();
96        (
97            BareConduitTx {
98                link_tx: tx,
99                #[cfg(not(target_arch = "wasm32"))]
100                encoder: self.encoder,
101                _phantom: PhantomData,
102            },
103            BareConduitRx {
104                link_rx: rx,
105                pending_fds: vox_types::FrameFds::default(),
106                #[cfg(not(target_arch = "wasm32"))]
107                decoder: self.decoder,
108                message_plan: self.message_plan,
109                _phantom: PhantomData,
110            },
111        )
112    }
113}
114
115// ---------------------------------------------------------------------------
116// Tx
117// ---------------------------------------------------------------------------
118
119pub struct BareConduitTx<F: MsgFamily, LTx: LinkTx> {
120    link_tx: LTx,
121    #[cfg(not(target_arch = "wasm32"))]
122    encoder: &'static CompiledEncoder,
123    _phantom: PhantomData<fn(F)>,
124}
125
126/// A serialized message plus the file descriptors collected while encoding
127/// it. The descriptors travel out-of-band via `SCM_RIGHTS`; off-Unix
128/// [`FrameFds`](vox_types::FrameFds) is `()`.
129pub struct PreparedFrame {
130    pub bytes: Vec<u8>,
131    pub fds: vox_types::FrameFds,
132}
133
134impl<F: MsgFamily, LTx: LinkTx + MaybeSend + 'static> ConduitTx for BareConduitTx<F, LTx> {
135    type Msg = F;
136    type Prepared = PreparedFrame;
137    type Error = BareConduitError;
138
139    // r[impl zerocopy.framing.single-pass]
140    // r[impl zerocopy.framing.no-double-serialize]
141    // r[impl zerocopy.scatter]
142    // r[impl zerocopy.scatter.plan]
143    // r[impl zerocopy.scatter.plan.size]
144    // r[impl zerocopy.scatter.write]
145    // r[impl zerocopy.scatter.lifetime]
146    fn prepare_send(&self, item: F::Msg<'_>) -> Result<Self::Prepared, Self::Error> {
147        let encode = || -> Result<Vec<u8>, BareConduitError> {
148            #[cfg(not(target_arch = "wasm32"))]
149            {
150                let ptr = PtrConst::new((&raw const item).cast::<u8>());
151                vox_jit::encode_with(self.encoder, ptr).map_err(BareConduitError::Encode)
152            }
153            #[cfg(target_arch = "wasm32")]
154            {
155                vox_postcard::to_vec(&item).map_err(BareConduitError::Encode)
156            }
157        };
158        // Collect any `Fd`s the encoder funnels into the thread-local
159        // collector — same install-around-encode shape as the channel
160        // binder (`with_channel_binder`). Off-Unix this is a pass-through
161        // and `fds` is `()`.
162        let (encoded, fds) = vox_types::collect_fds(encode);
163        Ok(PreparedFrame {
164            bytes: encoded?,
165            fds,
166        })
167    }
168
169    async fn send_prepared(&self, prepared: Self::Prepared) -> Result<(), Self::Error> {
170        let PreparedFrame { bytes, fds } = prepared;
171        if vox_types::frame_fds_len(&fds) > 0 && !self.link_tx.supports_fd_passing() {
172            return Err(BareConduitError::Io(std::io::Error::other(
173                "message carries file descriptors but the transport \
174                 cannot pass them",
175            )));
176        }
177        self.link_tx
178            .send_with_fds(bytes, fds)
179            .await
180            .map_err(BareConduitError::Io)
181    }
182
183    async fn close(self) -> std::io::Result<()> {
184        self.link_tx.close().await
185    }
186}
187
188// ---------------------------------------------------------------------------
189// Rx
190// ---------------------------------------------------------------------------
191
192pub struct BareConduitRx<F: MsgFamily, LRx> {
193    link_rx: LRx,
194    /// Descriptors that arrived with the most recently `recv`'d frame,
195    /// awaiting [`take_frame_fds`](vox_types::ConduitRx::take_frame_fds).
196    pending_fds: vox_types::FrameFds,
197    #[cfg(not(target_arch = "wasm32"))]
198    decoder: Option<&'static CompiledDecoder>,
199    message_plan: MessagePlan,
200    _phantom: PhantomData<fn() -> F>,
201}
202
203impl<F: MsgFamily, LRx> ConduitRx for BareConduitRx<F, LRx>
204where
205    LRx: vox_types::LinkRx + MaybeSend + 'static,
206{
207    type Msg = F;
208    type Error = BareConduitError;
209
210    // r[impl zerocopy.recv]
211    #[moire::instrument]
212    async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
213        let backing = match self.link_rx.recv().await.map_err(|error| {
214            BareConduitError::Io(std::io::Error::other(format!("link recv failed: {error}")))
215        })? {
216            Some(b) => b,
217            None => return Ok(None),
218        };
219
220        // Capture this frame's descriptors. `Payload` only *borrows* its
221        // bytes during Message decode — the typed `Fd` is decoded later by
222        // the generated stub — so the fds are threaded out via
223        // `take_frame_fds` (the same rail as the schema tracker) and
224        // installed at that decode site, not here.
225        self.pending_fds = self.link_rx.take_frame_fds();
226
227        #[cfg(not(target_arch = "wasm32"))]
228        {
229            crate::deserialize_postcard_with_decoder::<F::Msg<'static>>(
230                backing,
231                self.decoder,
232                &self.message_plan.plan,
233                &self.message_plan.registry,
234            )
235            .map_err(BareConduitError::Decode)
236            .map(Some)
237        }
238        #[cfg(target_arch = "wasm32")]
239        {
240            crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
241                backing,
242                &self.message_plan.plan,
243                &self.message_plan.registry,
244            )
245            .map_err(BareConduitError::Decode)
246            .map(Some)
247        }
248    }
249
250    fn take_frame_fds(&mut self) -> vox_types::FrameFds {
251        std::mem::take(&mut self.pending_fds)
252    }
253}
254
255// ---------------------------------------------------------------------------
256// Error
257// ---------------------------------------------------------------------------
258
259#[derive(Debug)]
260pub enum BareConduitError {
261    Encode(vox_postcard::SerializeError),
262    Decode(vox_postcard::DeserializeError),
263    Io(std::io::Error),
264    LinkDead,
265}
266
267impl std::fmt::Display for BareConduitError {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        match self {
270            Self::Encode(e) => write!(f, "encode error: {e}"),
271            Self::Decode(e) => write!(f, "decode error: {e}"),
272            Self::Io(e) => write!(f, "io error: {e}"),
273            Self::LinkDead => write!(f, "link dead"),
274        }
275    }
276}
277
278impl std::error::Error for BareConduitError {}
279
#[cfg(test)]
mod tests {
    use vox_types::*;

    use super::*;
    use crate::memory_link_pair;

    /// Drives the async round-trip check on a current-thread Tokio runtime.
    #[test]
    fn connection_reject_with_nonempty_metadata_round_trips() {
        tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap()
            .block_on(connection_reject_with_nonempty_metadata_inner());
    }

    /// Sends a `ConnectionReject` carrying one metadata entry from side A and
    /// checks that side B decodes the same key and string value.
    async fn connection_reject_with_nonempty_metadata_inner() {
        let (link_a, link_b) = memory_link_pair(64);
        let a_conduit = BareConduit::<MessageFamily, _>::new(link_a);
        let b_conduit = BareConduit::<MessageFamily, _>::new(link_b);
        // The unused halves stay bound to named variables so they live until
        // the end of the test.
        let (a_tx, _a_rx) = a_conduit.split();
        let (_b_tx, mut b_rx) = b_conduit.split();

        // Outgoing message: a ConnectionReject with non-empty metadata.
        let outgoing = Message {
            connection_id: ConnectionId(1),
            payload: MessagePayload::ConnectionReject(ConnectionReject {
                metadata: vec![MetadataEntry::str(
                    "error",
                    "missing required vox-service metadata",
                )],
            }),
        };
        let frame = a_tx.prepare_send(outgoing).unwrap();
        a_tx.send_prepared(frame).await.unwrap();

        // Receive on the other side and verify the decoded contents.
        let received = b_rx.recv().await.unwrap().unwrap();
        let msg = received.get();
        let MessagePayload::ConnectionReject(reject) = &msg.payload else {
            panic!("expected ConnectionReject, got {:?}", msg.payload);
        };

        assert_eq!(reject.metadata.len(), 1, "expected 1 metadata entry");
        let entry = &reject.metadata[0];
        assert_eq!(
            entry.key.as_ref(),
            "error",
            "key mismatch: got {:?}",
            entry.key
        );

        let MetadataValue::String(s) = &entry.value else {
            panic!("expected String, got {:?}", entry.value);
        };
        assert_eq!(s.as_ref(), "missing required vox-service metadata");
    }
}