Skip to main content

vox_core/bare_conduit/
mod.rs

1use std::marker::PhantomData;
2
3#[cfg(not(target_arch = "wasm32"))]
4use facet_core::PtrConst;
5#[cfg(not(target_arch = "wasm32"))]
6use vox_jit::cache::{CompiledDecoder, CompiledEncoder};
7#[cfg(not(target_arch = "wasm32"))]
8use vox_jit::cal::BorrowMode;
9
10use vox_types::{Conduit, ConduitRx, ConduitTx, Link, LinkTx, MaybeSend, MsgFamily, SelfRef};
11
12use crate::MessagePlan;
13
14/// Wraps a [`Link`] with postcard serialization. No reconnect, no reliability.
15///
16/// If the link dies, the conduit is dead. For localhost, SHM, or any
17/// transport where reconnect isn't needed.
18///
19/// `F` is a [`MsgFamily`] — it maps lifetimes to concrete message types.
20/// The send path accepts `F::Msg<'a>` (borrowed data serialized in place
21/// via `Peek`). The recv path yields `SelfRef<F::Msg<'static>>` (owned).
22// r[impl conduit.bare]
23// r[impl conduit.typeplan]
24// r[impl zerocopy.framing.conduit.bare]
25pub struct BareConduit<F: MsgFamily, L: Link> {
26    link: L,
27    #[cfg(not(target_arch = "wasm32"))]
28    encoder: &'static CompiledEncoder,
29    #[cfg(not(target_arch = "wasm32"))]
30    decoder: &'static CompiledDecoder,
31    #[cfg(target_arch = "wasm32")]
32    message_plan: MessagePlan,
33    _phantom: PhantomData<fn(F) -> F>,
34}
35
36impl<F: MsgFamily, L: Link> BareConduit<F, L> {
37    /// Create a new BareConduit (identity plan — no schema translation).
38    pub fn new(link: L) -> Self {
39        let identity_plan = vox_postcard::build_identity_plan(F::shape());
40        let registry = vox_types::SchemaRegistry::new();
41        #[cfg(not(target_arch = "wasm32"))]
42        {
43            let runtime = vox_jit::global_runtime();
44            Self::resolve(link, runtime, 0, &identity_plan, &registry)
45        }
46        #[cfg(target_arch = "wasm32")]
47        {
48            Self::resolve(
49                link,
50                MessagePlan {
51                    remote_schema_id: 0,
52                    plan: identity_plan,
53                    registry,
54                },
55            )
56        }
57    }
58
59    /// Create a new BareConduit with a pre-built message translation plan.
60    pub fn with_message_plan(link: L, message_plan: MessagePlan) -> Self {
61        #[cfg(not(target_arch = "wasm32"))]
62        {
63            let runtime = vox_jit::global_runtime();
64            Self::resolve(
65                link,
66                runtime,
67                message_plan.remote_schema_id,
68                &message_plan.plan,
69                &message_plan.registry,
70            )
71        }
72        #[cfg(target_arch = "wasm32")]
73        {
74            Self::resolve(link, message_plan)
75        }
76    }
77
78    #[cfg(not(target_arch = "wasm32"))]
79    fn resolve(
80        link: L,
81        runtime: &vox_jit::JitRuntime,
82        remote_schema_id: u64,
83        plan: &vox_postcard::plan::TranslationPlan,
84        registry: &vox_types::SchemaRegistry,
85    ) -> Self {
86        let encoder = runtime
87            .prepare_encoder(F::shape())
88            .expect("JIT encode unavailable for message shape");
89        let decoder = runtime
90            .prepare_decoder(
91                remote_schema_id,
92                F::shape(),
93                plan,
94                registry,
95                BorrowMode::Owned,
96            )
97            .expect("JIT decode unavailable for message shape");
98        Self {
99            link,
100            encoder,
101            decoder,
102            _phantom: PhantomData,
103        }
104    }
105
106    #[cfg(target_arch = "wasm32")]
107    fn resolve(link: L, message_plan: MessagePlan) -> Self {
108        Self {
109            link,
110            message_plan,
111            _phantom: PhantomData,
112        }
113    }
114}
115
116impl<F: MsgFamily, L: Link> Conduit for BareConduit<F, L>
117where
118    L::Tx: MaybeSend + 'static,
119    L::Rx: MaybeSend + 'static,
120{
121    type Msg = F;
122    type Tx = BareConduitTx<F, L::Tx>;
123    type Rx = BareConduitRx<F, L::Rx>;
124
125    fn split(self) -> (Self::Tx, Self::Rx) {
126        let (tx, rx) = self.link.split();
127        (
128            BareConduitTx {
129                link_tx: tx,
130                #[cfg(not(target_arch = "wasm32"))]
131                encoder: self.encoder,
132                _phantom: PhantomData,
133            },
134            BareConduitRx {
135                link_rx: rx,
136                pending_fds: vox_types::FrameFds::default(),
137                #[cfg(not(target_arch = "wasm32"))]
138                decoder: self.decoder,
139                #[cfg(target_arch = "wasm32")]
140                message_plan: self.message_plan,
141                _phantom: PhantomData,
142            },
143        )
144    }
145}
146
147// ---------------------------------------------------------------------------
148// Tx
149// ---------------------------------------------------------------------------
150
151pub struct BareConduitTx<F: MsgFamily, LTx: LinkTx> {
152    link_tx: LTx,
153    #[cfg(not(target_arch = "wasm32"))]
154    encoder: &'static CompiledEncoder,
155    _phantom: PhantomData<fn(F)>,
156}
157
158/// A serialized message plus the file descriptors collected while encoding
159/// it. The descriptors travel out-of-band via `SCM_RIGHTS`; off-Unix
160/// [`FrameFds`](vox_types::FrameFds) is `()`.
161pub struct PreparedFrame {
162    pub bytes: Vec<u8>,
163    pub fds: vox_types::FrameFds,
164}
165
166impl<F: MsgFamily, LTx: LinkTx + MaybeSend + 'static> ConduitTx for BareConduitTx<F, LTx> {
167    type Msg = F;
168    type Prepared = PreparedFrame;
169    type Error = BareConduitError;
170
171    // r[impl zerocopy.framing.single-pass]
172    // r[impl zerocopy.framing.no-double-serialize]
173    // r[impl zerocopy.scatter]
174    // r[impl zerocopy.scatter.plan]
175    // r[impl zerocopy.scatter.plan.size]
176    // r[impl zerocopy.scatter.write]
177    // r[impl zerocopy.scatter.lifetime]
178    fn prepare_send(&self, item: F::Msg<'_>) -> Result<Self::Prepared, Self::Error> {
179        let encode = || -> Result<Vec<u8>, BareConduitError> {
180            #[cfg(not(target_arch = "wasm32"))]
181            {
182                let ptr = PtrConst::new((&raw const item).cast::<u8>());
183                vox_jit::encode_with(self.encoder, ptr).map_err(BareConduitError::Encode)
184            }
185            #[cfg(target_arch = "wasm32")]
186            {
187                vox_postcard::to_vec(&item).map_err(BareConduitError::Encode)
188            }
189        };
190        // Collect any `Fd`s the encoder funnels into the thread-local
191        // collector — same install-around-encode shape as the channel
192        // binder (`with_channel_binder`). Off-Unix this is a pass-through
193        // and `fds` is `()`.
194        let (encoded, fds) = vox_types::collect_fds(encode);
195        Ok(PreparedFrame {
196            bytes: encoded?,
197            fds,
198        })
199    }
200
201    async fn send_prepared(&self, prepared: Self::Prepared) -> Result<(), Self::Error> {
202        let PreparedFrame { bytes, fds } = prepared;
203        if vox_types::frame_fds_len(&fds) > 0 && !self.link_tx.supports_fd_passing() {
204            return Err(BareConduitError::Io(std::io::Error::other(
205                "message carries file descriptors but the transport \
206                 cannot pass them",
207            )));
208        }
209        self.link_tx
210            .send_with_fds(bytes, fds)
211            .await
212            .map_err(BareConduitError::Io)
213    }
214
215    async fn close(self) -> std::io::Result<()> {
216        self.link_tx.close().await
217    }
218}
219
220// ---------------------------------------------------------------------------
221// Rx
222// ---------------------------------------------------------------------------
223
224pub struct BareConduitRx<F: MsgFamily, LRx> {
225    link_rx: LRx,
226    /// Descriptors that arrived with the most recently `recv`'d frame,
227    /// awaiting [`take_frame_fds`](vox_types::ConduitRx::take_frame_fds).
228    pending_fds: vox_types::FrameFds,
229    #[cfg(not(target_arch = "wasm32"))]
230    decoder: &'static CompiledDecoder,
231    #[cfg(target_arch = "wasm32")]
232    message_plan: MessagePlan,
233    _phantom: PhantomData<fn() -> F>,
234}
235
236impl<F: MsgFamily, LRx> ConduitRx for BareConduitRx<F, LRx>
237where
238    LRx: vox_types::LinkRx + MaybeSend + 'static,
239{
240    type Msg = F;
241    type Error = BareConduitError;
242
243    // r[impl zerocopy.recv]
244    #[moire::instrument]
245    async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
246        let backing = match self.link_rx.recv().await.map_err(|error| {
247            BareConduitError::Io(std::io::Error::other(format!("link recv failed: {error}")))
248        })? {
249            Some(b) => b,
250            None => return Ok(None),
251        };
252
253        // Capture this frame's descriptors. `Payload` only *borrows* its
254        // bytes during Message decode — the typed `Fd` is decoded later by
255        // the generated stub — so the fds are threaded out via
256        // `take_frame_fds` (the same rail as the schema tracker) and
257        // installed at that decode site, not here.
258        self.pending_fds = self.link_rx.take_frame_fds();
259
260        #[cfg(not(target_arch = "wasm32"))]
261        {
262            crate::deserialize_postcard_with_decoder::<F::Msg<'static>>(backing, self.decoder)
263                .map_err(BareConduitError::Decode)
264                .map(Some)
265        }
266        #[cfg(target_arch = "wasm32")]
267        {
268            crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
269                backing,
270                &self.message_plan.plan,
271                &self.message_plan.registry,
272            )
273            .map_err(BareConduitError::Decode)
274            .map(Some)
275        }
276    }
277
278    fn take_frame_fds(&mut self) -> vox_types::FrameFds {
279        std::mem::take(&mut self.pending_fds)
280    }
281}
282
283// ---------------------------------------------------------------------------
284// Error
285// ---------------------------------------------------------------------------
286
287#[derive(Debug)]
288pub enum BareConduitError {
289    Encode(vox_postcard::SerializeError),
290    Decode(vox_postcard::DeserializeError),
291    Io(std::io::Error),
292    LinkDead,
293}
294
295impl std::fmt::Display for BareConduitError {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        match self {
298            Self::Encode(e) => write!(f, "encode error: {e}"),
299            Self::Decode(e) => write!(f, "decode error: {e}"),
300            Self::Io(e) => write!(f, "io error: {e}"),
301            Self::LinkDead => write!(f, "link dead"),
302        }
303    }
304}
305
306impl std::error::Error for BareConduitError {}
307
308#[cfg(test)]
309mod tests {
310    use vox_types::*;
311
312    use super::*;
313    use crate::memory_link_pair;
314
315    #[test]
316    fn connection_reject_with_nonempty_metadata_round_trips() {
317        let rt = tokio::runtime::Builder::new_current_thread()
318            .build()
319            .unwrap();
320        rt.block_on(async { connection_reject_with_nonempty_metadata_inner().await });
321    }
322
323    async fn connection_reject_with_nonempty_metadata_inner() {
324        let (a, b) = memory_link_pair(64);
325        let a_conduit = BareConduit::<MessageFamily, _>::new(a);
326        let b_conduit = BareConduit::<MessageFamily, _>::new(b);
327        let (a_tx, _a_rx) = a_conduit.split();
328        let (_b_tx, mut b_rx) = b_conduit.split();
329
330        // Send a ConnectionReject with non-empty metadata
331        let msg = Message {
332            connection_id: ConnectionId(1),
333            payload: MessagePayload::ConnectionReject(ConnectionReject {
334                metadata: vec![MetadataEntry::str(
335                    "error",
336                    "missing required vox-service metadata",
337                )],
338            }),
339        };
340        let prepared = a_tx.prepare_send(msg).unwrap();
341        a_tx.send_prepared(prepared).await.unwrap();
342
343        // Receive and verify
344        let received = b_rx.recv().await.unwrap().unwrap();
345        let msg = received.get();
346        if let MessagePayload::ConnectionReject(reject) = &msg.payload {
347            assert_eq!(reject.metadata.len(), 1, "expected 1 metadata entry");
348            assert_eq!(
349                reject.metadata[0].key.as_ref(),
350                "error",
351                "key mismatch: got {:?}",
352                reject.metadata[0].key
353            );
354            match &reject.metadata[0].value {
355                MetadataValue::String(s) => {
356                    assert_eq!(s.as_ref(), "missing required vox-service metadata");
357                }
358                other => panic!("expected String, got {:?}", other),
359            }
360        } else {
361            panic!("expected ConnectionReject, got {:?}", msg.payload);
362        }
363    }
364}