1use std::marker::PhantomData;
2
3#[cfg(not(target_arch = "wasm32"))]
4use facet_core::PtrConst;
5#[cfg(not(target_arch = "wasm32"))]
6use vox_jit::cache::{CompiledDecoder, CompiledEncoder};
7#[cfg(not(target_arch = "wasm32"))]
8use vox_jit::cal::BorrowMode;
9
10use vox_types::{Conduit, ConduitRx, ConduitTx, Link, LinkTx, MaybeSend, MsgFamily, SelfRef};
11
12use crate::MessagePlan;
13
/// A [`Conduit`] for message family `F` layered directly over a [`Link`] `L`.
///
/// On every target except `wasm32` the conduit additionally carries the
/// JIT-compiled encoder and, when one could be prepared, decoder for `F`'s
/// message shape.
pub struct BareConduit<F: MsgFamily, L: Link> {
    /// Underlying link; split into its two halves by `Conduit::split`.
    link: L,
    /// Compiled encoder for `F`'s shape (field does not exist on `wasm32`).
    #[cfg(not(target_arch = "wasm32"))]
    encoder: &'static CompiledEncoder,
    /// Compiled decoder, or `None` when the JIT runtime did not provide one
    /// (field does not exist on `wasm32`).
    #[cfg(not(target_arch = "wasm32"))]
    decoder: Option<&'static CompiledDecoder>,
    /// Remote schema id, plan and registry handed to the receive half for decoding.
    message_plan: MessagePlan,
    /// Ties the type to `F` without storing a value of it.
    _phantom: PhantomData<fn(F) -> F>,
}
34
35impl<F: MsgFamily, L: Link> BareConduit<F, L> {
36 pub fn new(link: L) -> Self {
38 let identity_plan = vox_postcard::build_identity_plan(F::shape());
39 Self::resolve(
40 link,
41 MessagePlan {
42 remote_schema_id: 0,
43 plan: identity_plan,
44 registry: vox_types::SchemaRegistry::new(),
45 },
46 )
47 }
48
49 pub fn with_message_plan(link: L, message_plan: MessagePlan) -> Self {
51 Self::resolve(link, message_plan)
52 }
53
54 fn resolve(link: L, message_plan: MessagePlan) -> Self {
55 #[cfg(not(target_arch = "wasm32"))]
56 let runtime = vox_jit::global_runtime();
57 #[cfg(not(target_arch = "wasm32"))]
58 let encoder = runtime
59 .prepare_encoder(F::shape())
60 .expect("JIT encode unavailable for message shape");
61 #[cfg(not(target_arch = "wasm32"))]
62 let decoder = runtime.prepare_decoder(
63 message_plan.remote_schema_id,
64 F::shape(),
65 &message_plan.plan,
66 &message_plan.registry,
67 BorrowMode::Owned,
68 );
69 #[cfg(not(target_arch = "wasm32"))]
70 if decoder.is_none() {
71 tracing::warn!("vox bare conduit message decoder unavailable; falling back");
72 }
73 Self {
74 link,
75 #[cfg(not(target_arch = "wasm32"))]
76 encoder,
77 #[cfg(not(target_arch = "wasm32"))]
78 decoder,
79 message_plan,
80 _phantom: PhantomData,
81 }
82 }
83}
84
85impl<F: MsgFamily, L: Link> Conduit for BareConduit<F, L>
86where
87 L::Tx: MaybeSend + 'static,
88 L::Rx: MaybeSend + 'static,
89{
90 type Msg = F;
91 type Tx = BareConduitTx<F, L::Tx>;
92 type Rx = BareConduitRx<F, L::Rx>;
93
94 fn split(self) -> (Self::Tx, Self::Rx) {
95 let (tx, rx) = self.link.split();
96 (
97 BareConduitTx {
98 link_tx: tx,
99 #[cfg(not(target_arch = "wasm32"))]
100 encoder: self.encoder,
101 _phantom: PhantomData,
102 },
103 BareConduitRx {
104 link_rx: rx,
105 pending_fds: vox_types::FrameFds::default(),
106 #[cfg(not(target_arch = "wasm32"))]
107 decoder: self.decoder,
108 message_plan: self.message_plan,
109 _phantom: PhantomData,
110 },
111 )
112 }
113}
114
/// Sending half of a [`BareConduit`], produced by `Conduit::split`.
pub struct BareConduitTx<F: MsgFamily, LTx: LinkTx> {
    /// Send half of the underlying link.
    link_tx: LTx,
    /// Compiled encoder used by `prepare_send` (field does not exist on `wasm32`).
    #[cfg(not(target_arch = "wasm32"))]
    encoder: &'static CompiledEncoder,
    /// Ties the type to `F` without storing a value of it.
    _phantom: PhantomData<fn(F)>,
}
125
/// An encoded message that is ready to be handed to the link.
pub struct PreparedFrame {
    /// The encoded message bytes.
    pub bytes: Vec<u8>,
    /// The `FrameFds` returned by `vox_types::collect_fds` for the encode call.
    pub fds: vox_types::FrameFds,
}
133
134impl<F: MsgFamily, LTx: LinkTx + MaybeSend + 'static> ConduitTx for BareConduitTx<F, LTx> {
135 type Msg = F;
136 type Prepared = PreparedFrame;
137 type Error = BareConduitError;
138
139 fn prepare_send(&self, item: F::Msg<'_>) -> Result<Self::Prepared, Self::Error> {
147 let encode = || -> Result<Vec<u8>, BareConduitError> {
148 #[cfg(not(target_arch = "wasm32"))]
149 {
150 let ptr = PtrConst::new((&raw const item).cast::<u8>());
151 vox_jit::encode_with(self.encoder, ptr).map_err(BareConduitError::Encode)
152 }
153 #[cfg(target_arch = "wasm32")]
154 {
155 vox_postcard::to_vec(&item).map_err(BareConduitError::Encode)
156 }
157 };
158 let (encoded, fds) = vox_types::collect_fds(encode);
163 Ok(PreparedFrame {
164 bytes: encoded?,
165 fds,
166 })
167 }
168
169 async fn send_prepared(&self, prepared: Self::Prepared) -> Result<(), Self::Error> {
170 let PreparedFrame { bytes, fds } = prepared;
171 if vox_types::frame_fds_len(&fds) > 0 && !self.link_tx.supports_fd_passing() {
172 return Err(BareConduitError::Io(std::io::Error::other(
173 "message carries file descriptors but the transport \
174 cannot pass them",
175 )));
176 }
177 self.link_tx
178 .send_with_fds(bytes, fds)
179 .await
180 .map_err(BareConduitError::Io)
181 }
182
183 async fn close(self) -> std::io::Result<()> {
184 self.link_tx.close().await
185 }
186}
187
/// Receiving half of a [`BareConduit`], produced by `Conduit::split`.
pub struct BareConduitRx<F: MsgFamily, LRx> {
    /// Receive half of the underlying link.
    link_rx: LRx,
    /// Fds taken from the link for the most recently received frame; handed
    /// out (and reset to the default) by `take_frame_fds`.
    pending_fds: vox_types::FrameFds,
    /// Compiled decoder, if one was prepared (field does not exist on `wasm32`).
    #[cfg(not(target_arch = "wasm32"))]
    decoder: Option<&'static CompiledDecoder>,
    /// Plan and registry passed to the deserializer in `recv`.
    message_plan: MessagePlan,
    /// Ties the type to `F` without storing a value of it.
    _phantom: PhantomData<fn() -> F>,
}
202
203impl<F: MsgFamily, LRx> ConduitRx for BareConduitRx<F, LRx>
204where
205 LRx: vox_types::LinkRx + MaybeSend + 'static,
206{
207 type Msg = F;
208 type Error = BareConduitError;
209
210 #[moire::instrument]
212 async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
213 let backing = match self.link_rx.recv().await.map_err(|error| {
214 BareConduitError::Io(std::io::Error::other(format!("link recv failed: {error}")))
215 })? {
216 Some(b) => b,
217 None => return Ok(None),
218 };
219
220 self.pending_fds = self.link_rx.take_frame_fds();
226
227 #[cfg(not(target_arch = "wasm32"))]
228 {
229 crate::deserialize_postcard_with_decoder::<F::Msg<'static>>(
230 backing,
231 self.decoder,
232 &self.message_plan.plan,
233 &self.message_plan.registry,
234 )
235 .map_err(BareConduitError::Decode)
236 .map(Some)
237 }
238 #[cfg(target_arch = "wasm32")]
239 {
240 crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
241 backing,
242 &self.message_plan.plan,
243 &self.message_plan.registry,
244 )
245 .map_err(BareConduitError::Decode)
246 .map(Some)
247 }
248 }
249
250 fn take_frame_fds(&mut self) -> vox_types::FrameFds {
251 std::mem::take(&mut self.pending_fds)
252 }
253}
254
/// Errors produced by the send and receive halves of a `BareConduit`.
#[derive(Debug)]
pub enum BareConduitError {
    /// Encoding an outgoing message failed.
    Encode(vox_postcard::SerializeError),
    /// Deserializing a received frame failed.
    Decode(vox_postcard::DeserializeError),
    /// The link failed to send or receive, or the frame carried file
    /// descriptors over a link that cannot pass them.
    Io(std::io::Error),
    /// NOTE(review): not constructed anywhere in this file; presumably raised
    /// by code elsewhere when the link is gone — confirm against callers.
    LinkDead,
}
266
267impl std::fmt::Display for BareConduitError {
268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 match self {
270 Self::Encode(e) => write!(f, "encode error: {e}"),
271 Self::Decode(e) => write!(f, "decode error: {e}"),
272 Self::Io(e) => write!(f, "io error: {e}"),
273 Self::LinkDead => write!(f, "link dead"),
274 }
275 }
276}
277
// Relies entirely on the trait's default methods (no `source()` override);
// the wrapped error's message is already included by the `Display` impl.
impl std::error::Error for BareConduitError {}
279
#[cfg(test)]
mod tests {
    use vox_types::*;

    use super::*;
    use crate::memory_link_pair;

    /// Drives the async round-trip check on a current-thread tokio runtime.
    #[test]
    fn connection_reject_with_nonempty_metadata_round_trips() {
        tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap()
            .block_on(connection_reject_with_nonempty_metadata_inner());
    }

    /// Sends a `ConnectionReject` carrying one string metadata entry from one
    /// end of an in-memory link pair and checks that the other end decodes the
    /// same key and value.
    async fn connection_reject_with_nonempty_metadata_inner() {
        let (link_a, link_b) = memory_link_pair(64);
        let conduit_a = BareConduit::<MessageFamily, _>::new(link_a);
        let conduit_b = BareConduit::<MessageFamily, _>::new(link_b);
        let (a_tx, _a_rx) = conduit_a.split();
        let (_b_tx, mut b_rx) = conduit_b.split();

        let outgoing = Message {
            connection_id: ConnectionId(1),
            payload: MessagePayload::ConnectionReject(ConnectionReject {
                metadata: vec![MetadataEntry::str(
                    "error",
                    "missing required vox-service metadata",
                )],
            }),
        };
        let frame = a_tx.prepare_send(outgoing).unwrap();
        a_tx.send_prepared(frame).await.unwrap();

        let received = b_rx.recv().await.unwrap().unwrap();
        let msg = received.get();
        let MessagePayload::ConnectionReject(reject) = &msg.payload else {
            panic!("expected ConnectionReject, got {:?}", msg.payload);
        };

        assert_eq!(reject.metadata.len(), 1, "expected 1 metadata entry");
        let entry = &reject.metadata[0];
        assert_eq!(
            entry.key.as_ref(),
            "error",
            "key mismatch: got {:?}",
            entry.key
        );
        match &entry.value {
            MetadataValue::String(s) => {
                assert_eq!(s.as_ref(), "missing required vox-service metadata");
            }
            other => panic!("expected String, got {:?}", other),
        }
    }
}