1use std::marker::PhantomData;
2
3#[cfg(not(target_arch = "wasm32"))]
4use facet_core::PtrConst;
5#[cfg(not(target_arch = "wasm32"))]
6use vox_jit::cache::{CompiledDecoder, CompiledEncoder};
7#[cfg(not(target_arch = "wasm32"))]
8use vox_jit::cal::BorrowMode;
9
10use vox_types::{Conduit, ConduitRx, ConduitTx, Link, LinkTx, MaybeSend, MsgFamily, SelfRef};
11
12use crate::MessagePlan;
13
14pub struct BareConduit<F: MsgFamily, L: Link> {
26 link: L,
27 #[cfg(not(target_arch = "wasm32"))]
28 encoder: &'static CompiledEncoder,
29 #[cfg(not(target_arch = "wasm32"))]
30 decoder: &'static CompiledDecoder,
31 #[cfg(target_arch = "wasm32")]
32 message_plan: MessagePlan,
33 _phantom: PhantomData<fn(F) -> F>,
34}
35
36impl<F: MsgFamily, L: Link> BareConduit<F, L> {
37 pub fn new(link: L) -> Self {
39 let identity_plan = vox_postcard::build_identity_plan(F::shape());
40 let registry = vox_types::SchemaRegistry::new();
41 #[cfg(not(target_arch = "wasm32"))]
42 {
43 let runtime = vox_jit::global_runtime();
44 Self::resolve(link, runtime, 0, &identity_plan, ®istry)
45 }
46 #[cfg(target_arch = "wasm32")]
47 {
48 Self::resolve(
49 link,
50 MessagePlan {
51 remote_schema_id: 0,
52 plan: identity_plan,
53 registry,
54 },
55 )
56 }
57 }
58
59 pub fn with_message_plan(link: L, message_plan: MessagePlan) -> Self {
61 #[cfg(not(target_arch = "wasm32"))]
62 {
63 let runtime = vox_jit::global_runtime();
64 Self::resolve(
65 link,
66 runtime,
67 message_plan.remote_schema_id,
68 &message_plan.plan,
69 &message_plan.registry,
70 )
71 }
72 #[cfg(target_arch = "wasm32")]
73 {
74 Self::resolve(link, message_plan)
75 }
76 }
77
78 #[cfg(not(target_arch = "wasm32"))]
79 fn resolve(
80 link: L,
81 runtime: &vox_jit::JitRuntime,
82 remote_schema_id: u64,
83 plan: &vox_postcard::plan::TranslationPlan,
84 registry: &vox_types::SchemaRegistry,
85 ) -> Self {
86 let encoder = runtime
87 .prepare_encoder(F::shape())
88 .expect("JIT encode unavailable for message shape");
89 let decoder = runtime
90 .prepare_decoder(
91 remote_schema_id,
92 F::shape(),
93 plan,
94 registry,
95 BorrowMode::Owned,
96 )
97 .expect("JIT decode unavailable for message shape");
98 Self {
99 link,
100 encoder,
101 decoder,
102 _phantom: PhantomData,
103 }
104 }
105
106 #[cfg(target_arch = "wasm32")]
107 fn resolve(link: L, message_plan: MessagePlan) -> Self {
108 Self {
109 link,
110 message_plan,
111 _phantom: PhantomData,
112 }
113 }
114}
115
116impl<F: MsgFamily, L: Link> Conduit for BareConduit<F, L>
117where
118 L::Tx: MaybeSend + 'static,
119 L::Rx: MaybeSend + 'static,
120{
121 type Msg = F;
122 type Tx = BareConduitTx<F, L::Tx>;
123 type Rx = BareConduitRx<F, L::Rx>;
124
125 fn split(self) -> (Self::Tx, Self::Rx) {
126 let (tx, rx) = self.link.split();
127 (
128 BareConduitTx {
129 link_tx: tx,
130 #[cfg(not(target_arch = "wasm32"))]
131 encoder: self.encoder,
132 _phantom: PhantomData,
133 },
134 BareConduitRx {
135 link_rx: rx,
136 pending_fds: vox_types::FrameFds::default(),
137 #[cfg(not(target_arch = "wasm32"))]
138 decoder: self.decoder,
139 #[cfg(target_arch = "wasm32")]
140 message_plan: self.message_plan,
141 _phantom: PhantomData,
142 },
143 )
144 }
145}
146
147pub struct BareConduitTx<F: MsgFamily, LTx: LinkTx> {
152 link_tx: LTx,
153 #[cfg(not(target_arch = "wasm32"))]
154 encoder: &'static CompiledEncoder,
155 _phantom: PhantomData<fn(F)>,
156}
157
158pub struct PreparedFrame {
162 pub bytes: Vec<u8>,
163 pub fds: vox_types::FrameFds,
164}
165
166impl<F: MsgFamily, LTx: LinkTx + MaybeSend + 'static> ConduitTx for BareConduitTx<F, LTx> {
167 type Msg = F;
168 type Prepared = PreparedFrame;
169 type Error = BareConduitError;
170
171 fn prepare_send(&self, item: F::Msg<'_>) -> Result<Self::Prepared, Self::Error> {
179 let encode = || -> Result<Vec<u8>, BareConduitError> {
180 #[cfg(not(target_arch = "wasm32"))]
181 {
182 let ptr = PtrConst::new((&raw const item).cast::<u8>());
183 vox_jit::encode_with(self.encoder, ptr).map_err(BareConduitError::Encode)
184 }
185 #[cfg(target_arch = "wasm32")]
186 {
187 vox_postcard::to_vec(&item).map_err(BareConduitError::Encode)
188 }
189 };
190 let (encoded, fds) = vox_types::collect_fds(encode);
195 Ok(PreparedFrame {
196 bytes: encoded?,
197 fds,
198 })
199 }
200
201 async fn send_prepared(&self, prepared: Self::Prepared) -> Result<(), Self::Error> {
202 let PreparedFrame { bytes, fds } = prepared;
203 if vox_types::frame_fds_len(&fds) > 0 && !self.link_tx.supports_fd_passing() {
204 return Err(BareConduitError::Io(std::io::Error::other(
205 "message carries file descriptors but the transport \
206 cannot pass them",
207 )));
208 }
209 self.link_tx
210 .send_with_fds(bytes, fds)
211 .await
212 .map_err(BareConduitError::Io)
213 }
214
215 async fn close(self) -> std::io::Result<()> {
216 self.link_tx.close().await
217 }
218}
219
220pub struct BareConduitRx<F: MsgFamily, LRx> {
225 link_rx: LRx,
226 pending_fds: vox_types::FrameFds,
229 #[cfg(not(target_arch = "wasm32"))]
230 decoder: &'static CompiledDecoder,
231 #[cfg(target_arch = "wasm32")]
232 message_plan: MessagePlan,
233 _phantom: PhantomData<fn() -> F>,
234}
235
236impl<F: MsgFamily, LRx> ConduitRx for BareConduitRx<F, LRx>
237where
238 LRx: vox_types::LinkRx + MaybeSend + 'static,
239{
240 type Msg = F;
241 type Error = BareConduitError;
242
243 #[moire::instrument]
245 async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
246 let backing = match self.link_rx.recv().await.map_err(|error| {
247 BareConduitError::Io(std::io::Error::other(format!("link recv failed: {error}")))
248 })? {
249 Some(b) => b,
250 None => return Ok(None),
251 };
252
253 self.pending_fds = self.link_rx.take_frame_fds();
259
260 #[cfg(not(target_arch = "wasm32"))]
261 {
262 crate::deserialize_postcard_with_decoder::<F::Msg<'static>>(backing, self.decoder)
263 .map_err(BareConduitError::Decode)
264 .map(Some)
265 }
266 #[cfg(target_arch = "wasm32")]
267 {
268 crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
269 backing,
270 &self.message_plan.plan,
271 &self.message_plan.registry,
272 )
273 .map_err(BareConduitError::Decode)
274 .map(Some)
275 }
276 }
277
278 fn take_frame_fds(&mut self) -> vox_types::FrameFds {
279 std::mem::take(&mut self.pending_fds)
280 }
281}
282
283#[derive(Debug)]
288pub enum BareConduitError {
289 Encode(vox_postcard::SerializeError),
290 Decode(vox_postcard::DeserializeError),
291 Io(std::io::Error),
292 LinkDead,
293}
294
295impl std::fmt::Display for BareConduitError {
296 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297 match self {
298 Self::Encode(e) => write!(f, "encode error: {e}"),
299 Self::Decode(e) => write!(f, "decode error: {e}"),
300 Self::Io(e) => write!(f, "io error: {e}"),
301 Self::LinkDead => write!(f, "link dead"),
302 }
303 }
304}
305
306impl std::error::Error for BareConduitError {}
307
308#[cfg(test)]
309mod tests {
310 use vox_types::*;
311
312 use super::*;
313 use crate::memory_link_pair;
314
315 #[test]
316 fn connection_reject_with_nonempty_metadata_round_trips() {
317 let rt = tokio::runtime::Builder::new_current_thread()
318 .build()
319 .unwrap();
320 rt.block_on(async { connection_reject_with_nonempty_metadata_inner().await });
321 }
322
323 async fn connection_reject_with_nonempty_metadata_inner() {
324 let (a, b) = memory_link_pair(64);
325 let a_conduit = BareConduit::<MessageFamily, _>::new(a);
326 let b_conduit = BareConduit::<MessageFamily, _>::new(b);
327 let (a_tx, _a_rx) = a_conduit.split();
328 let (_b_tx, mut b_rx) = b_conduit.split();
329
330 let msg = Message {
332 connection_id: ConnectionId(1),
333 payload: MessagePayload::ConnectionReject(ConnectionReject {
334 metadata: vec![MetadataEntry::str(
335 "error",
336 "missing required vox-service metadata",
337 )],
338 }),
339 };
340 let prepared = a_tx.prepare_send(msg).unwrap();
341 a_tx.send_prepared(prepared).await.unwrap();
342
343 let received = b_rx.recv().await.unwrap().unwrap();
345 let msg = received.get();
346 if let MessagePayload::ConnectionReject(reject) = &msg.payload {
347 assert_eq!(reject.metadata.len(), 1, "expected 1 metadata entry");
348 assert_eq!(
349 reject.metadata[0].key.as_ref(),
350 "error",
351 "key mismatch: got {:?}",
352 reject.metadata[0].key
353 );
354 match &reject.metadata[0].value {
355 MetadataValue::String(s) => {
356 assert_eq!(s.as_ref(), "missing required vox-service metadata");
357 }
358 other => panic!("expected String, got {:?}", other),
359 }
360 } else {
361 panic!("expected ConnectionReject, got {:?}", msg.payload);
362 }
363 }
364}