Skip to main content

vox_core/bare_conduit/
mod.rs

1use std::marker::PhantomData;
2
3#[cfg(not(target_arch = "wasm32"))]
4use facet_core::PtrConst;
5#[cfg(not(target_arch = "wasm32"))]
6use vox_jit::cache::{CompiledDecoder, CompiledEncoder};
7#[cfg(not(target_arch = "wasm32"))]
8use vox_jit::cal::BorrowMode;
9
10use vox_types::{Conduit, ConduitRx, ConduitTx, Link, LinkTx, MaybeSend, MsgFamily, SelfRef};
11
12use crate::MessagePlan;
13
14/// Wraps a [`Link`] with postcard serialization. No reconnect, no reliability.
15///
16/// If the link dies, the conduit is dead. For localhost, SHM, or any
17/// transport where reconnect isn't needed.
18///
19/// `F` is a [`MsgFamily`] — it maps lifetimes to concrete message types.
20/// The send path accepts `F::Msg<'a>` (borrowed data serialized in place
21/// via `Peek`). The recv path yields `SelfRef<F::Msg<'static>>` (owned).
22// r[impl conduit.bare]
23// r[impl conduit.typeplan]
24// r[impl zerocopy.framing.conduit.bare]
25pub struct BareConduit<F: MsgFamily, L: Link> {
26    link: L,
27    #[cfg(not(target_arch = "wasm32"))]
28    encoder: &'static CompiledEncoder,
29    #[cfg(not(target_arch = "wasm32"))]
30    decoder: &'static CompiledDecoder,
31    #[cfg(target_arch = "wasm32")]
32    message_plan: MessagePlan,
33    _phantom: PhantomData<fn(F) -> F>,
34}
35
36impl<F: MsgFamily, L: Link> BareConduit<F, L> {
37    /// Create a new BareConduit (identity plan — no schema translation).
38    pub fn new(link: L) -> Self {
39        let identity_plan = vox_postcard::build_identity_plan(F::shape());
40        let registry = vox_types::SchemaRegistry::new();
41        #[cfg(not(target_arch = "wasm32"))]
42        {
43            let runtime = vox_jit::global_runtime();
44            Self::resolve(link, runtime, 0, &identity_plan, &registry)
45        }
46        #[cfg(target_arch = "wasm32")]
47        {
48            Self::resolve(
49                link,
50                MessagePlan {
51                    remote_schema_id: 0,
52                    plan: identity_plan,
53                    registry,
54                },
55            )
56        }
57    }
58
59    /// Create a new BareConduit with a pre-built message translation plan.
60    pub fn with_message_plan(link: L, message_plan: MessagePlan) -> Self {
61        #[cfg(not(target_arch = "wasm32"))]
62        {
63            let runtime = vox_jit::global_runtime();
64            Self::resolve(
65                link,
66                runtime,
67                message_plan.remote_schema_id,
68                &message_plan.plan,
69                &message_plan.registry,
70            )
71        }
72        #[cfg(target_arch = "wasm32")]
73        {
74            Self::resolve(link, message_plan)
75        }
76    }
77
78    #[cfg(not(target_arch = "wasm32"))]
79    fn resolve(
80        link: L,
81        runtime: &vox_jit::JitRuntime,
82        remote_schema_id: u64,
83        plan: &vox_postcard::plan::TranslationPlan,
84        registry: &vox_types::SchemaRegistry,
85    ) -> Self {
86        let encoder = runtime
87            .prepare_encoder(F::shape())
88            .expect("JIT encode unavailable for message shape");
89        let decoder = runtime
90            .prepare_decoder(
91                remote_schema_id,
92                F::shape(),
93                plan,
94                registry,
95                BorrowMode::Owned,
96            )
97            .expect("JIT decode unavailable for message shape");
98        Self {
99            link,
100            encoder,
101            decoder,
102            _phantom: PhantomData,
103        }
104    }
105
106    #[cfg(target_arch = "wasm32")]
107    fn resolve(link: L, message_plan: MessagePlan) -> Self {
108        Self {
109            link,
110            message_plan,
111            _phantom: PhantomData,
112        }
113    }
114}
115
116impl<F: MsgFamily, L: Link> Conduit for BareConduit<F, L>
117where
118    L::Tx: MaybeSend + 'static,
119    L::Rx: MaybeSend + 'static,
120{
121    type Msg = F;
122    type Tx = BareConduitTx<F, L::Tx>;
123    type Rx = BareConduitRx<F, L::Rx>;
124
125    fn split(self) -> (Self::Tx, Self::Rx) {
126        let (tx, rx) = self.link.split();
127        (
128            BareConduitTx {
129                link_tx: tx,
130                #[cfg(not(target_arch = "wasm32"))]
131                encoder: self.encoder,
132                _phantom: PhantomData,
133            },
134            BareConduitRx {
135                link_rx: rx,
136                #[cfg(not(target_arch = "wasm32"))]
137                decoder: self.decoder,
138                #[cfg(target_arch = "wasm32")]
139                message_plan: self.message_plan,
140                _phantom: PhantomData,
141            },
142        )
143    }
144}
145
146// ---------------------------------------------------------------------------
147// Tx
148// ---------------------------------------------------------------------------
149
150pub struct BareConduitTx<F: MsgFamily, LTx: LinkTx> {
151    link_tx: LTx,
152    #[cfg(not(target_arch = "wasm32"))]
153    encoder: &'static CompiledEncoder,
154    _phantom: PhantomData<fn(F)>,
155}
156
157impl<F: MsgFamily, LTx: LinkTx + MaybeSend + 'static> ConduitTx for BareConduitTx<F, LTx> {
158    type Msg = F;
159    type Prepared = Vec<u8>;
160    type Error = BareConduitError;
161
162    // r[impl zerocopy.framing.single-pass]
163    // r[impl zerocopy.framing.no-double-serialize]
164    // r[impl zerocopy.scatter]
165    // r[impl zerocopy.scatter.plan]
166    // r[impl zerocopy.scatter.plan.size]
167    // r[impl zerocopy.scatter.write]
168    // r[impl zerocopy.scatter.lifetime]
169    fn prepare_send(&self, item: F::Msg<'_>) -> Result<Self::Prepared, Self::Error> {
170        #[cfg(not(target_arch = "wasm32"))]
171        {
172            let ptr = PtrConst::new((&raw const item).cast::<u8>());
173            vox_jit::encode_with(self.encoder, ptr).map_err(BareConduitError::Encode)
174        }
175        #[cfg(target_arch = "wasm32")]
176        {
177            vox_postcard::to_vec(&item).map_err(BareConduitError::Encode)
178        }
179    }
180
181    async fn send_prepared(&self, prepared: Self::Prepared) -> Result<(), Self::Error> {
182        self.link_tx
183            .send(prepared)
184            .await
185            .map_err(BareConduitError::Io)
186    }
187
188    async fn close(self) -> std::io::Result<()> {
189        self.link_tx.close().await
190    }
191}
192
193// ---------------------------------------------------------------------------
194// Rx
195// ---------------------------------------------------------------------------
196
197pub struct BareConduitRx<F: MsgFamily, LRx> {
198    link_rx: LRx,
199    #[cfg(not(target_arch = "wasm32"))]
200    decoder: &'static CompiledDecoder,
201    #[cfg(target_arch = "wasm32")]
202    message_plan: MessagePlan,
203    _phantom: PhantomData<fn() -> F>,
204}
205
206impl<F: MsgFamily, LRx> ConduitRx for BareConduitRx<F, LRx>
207where
208    LRx: vox_types::LinkRx + MaybeSend + 'static,
209{
210    type Msg = F;
211    type Error = BareConduitError;
212
213    // r[impl zerocopy.recv]
214    #[moire::instrument]
215    async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
216        let backing = match self.link_rx.recv().await.map_err(|error| {
217            BareConduitError::Io(std::io::Error::other(format!("link recv failed: {error}")))
218        })? {
219            Some(b) => b,
220            None => return Ok(None),
221        };
222
223        #[cfg(not(target_arch = "wasm32"))]
224        {
225            crate::deserialize_postcard_with_decoder::<F::Msg<'static>>(backing, self.decoder)
226                .map_err(BareConduitError::Decode)
227                .map(Some)
228        }
229        #[cfg(target_arch = "wasm32")]
230        {
231            crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
232                backing,
233                &self.message_plan.plan,
234                &self.message_plan.registry,
235            )
236            .map_err(BareConduitError::Decode)
237            .map(Some)
238        }
239    }
240}
241
242// ---------------------------------------------------------------------------
243// Error
244// ---------------------------------------------------------------------------
245
246#[derive(Debug)]
247pub enum BareConduitError {
248    Encode(vox_postcard::SerializeError),
249    Decode(vox_postcard::DeserializeError),
250    Io(std::io::Error),
251    LinkDead,
252}
253
254impl std::fmt::Display for BareConduitError {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        match self {
257            Self::Encode(e) => write!(f, "encode error: {e}"),
258            Self::Decode(e) => write!(f, "decode error: {e}"),
259            Self::Io(e) => write!(f, "io error: {e}"),
260            Self::LinkDead => write!(f, "link dead"),
261        }
262    }
263}
264
265impl std::error::Error for BareConduitError {}
266
#[cfg(test)]
mod tests {
    use vox_types::*;

    use super::*;
    use crate::memory_link_pair;

    /// Drives the async round-trip check on a current-thread tokio runtime.
    #[test]
    fn connection_reject_with_nonempty_metadata_round_trips() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        runtime.block_on(connection_reject_with_nonempty_metadata_inner());
    }

    /// Sends a `ConnectionReject` carrying one metadata entry from side A and
    /// checks that side B decodes the same key and string value.
    async fn connection_reject_with_nonempty_metadata_inner() {
        let (link_a, link_b) = memory_link_pair(64);
        let (a_tx, _a_rx) = BareConduit::<MessageFamily, _>::new(link_a).split();
        let (_b_tx, mut b_rx) = BareConduit::<MessageFamily, _>::new(link_b).split();

        // Outgoing message: a rejection with non-empty metadata.
        let reject = ConnectionReject {
            metadata: vec![MetadataEntry::str(
                "error",
                "missing required vox-service metadata",
            )],
        };
        let outgoing = Message {
            connection_id: ConnectionId(1),
            payload: MessagePayload::ConnectionReject(reject),
        };
        let prepared = a_tx.prepare_send(outgoing).unwrap();
        a_tx.send_prepared(prepared).await.unwrap();

        // Incoming side: decode and inspect.
        let received = b_rx.recv().await.unwrap().unwrap();
        let msg = received.get();
        let MessagePayload::ConnectionReject(reject) = &msg.payload else {
            panic!("expected ConnectionReject, got {:?}", msg.payload);
        };

        assert_eq!(reject.metadata.len(), 1, "expected 1 metadata entry");
        let entry = &reject.metadata[0];
        assert_eq!(
            entry.key.as_ref(),
            "error",
            "key mismatch: got {:?}",
            entry.key
        );
        match &entry.value {
            MetadataValue::String(text) => {
                assert_eq!(text.as_ref(), "missing required vox-service metadata");
            }
            other => panic!("expected String, got {:?}", other),
        }
    }
}
323}