1use std::marker::PhantomData;
2
3#[cfg(not(target_arch = "wasm32"))]
4use facet_core::PtrConst;
5#[cfg(not(target_arch = "wasm32"))]
6use vox_jit::cache::{CompiledDecoder, CompiledEncoder};
7#[cfg(not(target_arch = "wasm32"))]
8use vox_jit::cal::BorrowMode;
9
10use vox_types::{Conduit, ConduitRx, ConduitTx, Link, LinkTx, MaybeSend, MsgFamily, SelfRef};
11
12use crate::MessagePlan;
13
14pub struct BareConduit<F: MsgFamily, L: Link> {
26 link: L,
27 #[cfg(not(target_arch = "wasm32"))]
28 encoder: &'static CompiledEncoder,
29 #[cfg(not(target_arch = "wasm32"))]
30 decoder: &'static CompiledDecoder,
31 #[cfg(target_arch = "wasm32")]
32 message_plan: MessagePlan,
33 _phantom: PhantomData<fn(F) -> F>,
34}
35
36impl<F: MsgFamily, L: Link> BareConduit<F, L> {
37 pub fn new(link: L) -> Self {
39 let identity_plan = vox_postcard::build_identity_plan(F::shape());
40 let registry = vox_types::SchemaRegistry::new();
41 #[cfg(not(target_arch = "wasm32"))]
42 {
43 let runtime = vox_jit::global_runtime();
44 Self::resolve(link, runtime, 0, &identity_plan, ®istry)
45 }
46 #[cfg(target_arch = "wasm32")]
47 {
48 Self::resolve(
49 link,
50 MessagePlan {
51 remote_schema_id: 0,
52 plan: identity_plan,
53 registry,
54 },
55 )
56 }
57 }
58
59 pub fn with_message_plan(link: L, message_plan: MessagePlan) -> Self {
61 #[cfg(not(target_arch = "wasm32"))]
62 {
63 let runtime = vox_jit::global_runtime();
64 Self::resolve(
65 link,
66 runtime,
67 message_plan.remote_schema_id,
68 &message_plan.plan,
69 &message_plan.registry,
70 )
71 }
72 #[cfg(target_arch = "wasm32")]
73 {
74 Self::resolve(link, message_plan)
75 }
76 }
77
78 #[cfg(not(target_arch = "wasm32"))]
79 fn resolve(
80 link: L,
81 runtime: &vox_jit::JitRuntime,
82 remote_schema_id: u64,
83 plan: &vox_postcard::plan::TranslationPlan,
84 registry: &vox_types::SchemaRegistry,
85 ) -> Self {
86 let encoder = runtime
87 .prepare_encoder(F::shape())
88 .expect("JIT encode unavailable for message shape");
89 let decoder = runtime
90 .prepare_decoder(
91 remote_schema_id,
92 F::shape(),
93 plan,
94 registry,
95 BorrowMode::Owned,
96 )
97 .expect("JIT decode unavailable for message shape");
98 Self {
99 link,
100 encoder,
101 decoder,
102 _phantom: PhantomData,
103 }
104 }
105
106 #[cfg(target_arch = "wasm32")]
107 fn resolve(link: L, message_plan: MessagePlan) -> Self {
108 Self {
109 link,
110 message_plan,
111 _phantom: PhantomData,
112 }
113 }
114}
115
116impl<F: MsgFamily, L: Link> Conduit for BareConduit<F, L>
117where
118 L::Tx: MaybeSend + 'static,
119 L::Rx: MaybeSend + 'static,
120{
121 type Msg = F;
122 type Tx = BareConduitTx<F, L::Tx>;
123 type Rx = BareConduitRx<F, L::Rx>;
124
125 fn split(self) -> (Self::Tx, Self::Rx) {
126 let (tx, rx) = self.link.split();
127 (
128 BareConduitTx {
129 link_tx: tx,
130 #[cfg(not(target_arch = "wasm32"))]
131 encoder: self.encoder,
132 _phantom: PhantomData,
133 },
134 BareConduitRx {
135 link_rx: rx,
136 #[cfg(not(target_arch = "wasm32"))]
137 decoder: self.decoder,
138 #[cfg(target_arch = "wasm32")]
139 message_plan: self.message_plan,
140 _phantom: PhantomData,
141 },
142 )
143 }
144}
145
146pub struct BareConduitTx<F: MsgFamily, LTx: LinkTx> {
151 link_tx: LTx,
152 #[cfg(not(target_arch = "wasm32"))]
153 encoder: &'static CompiledEncoder,
154 _phantom: PhantomData<fn(F)>,
155}
156
157impl<F: MsgFamily, LTx: LinkTx + MaybeSend + 'static> ConduitTx for BareConduitTx<F, LTx> {
158 type Msg = F;
159 type Prepared = Vec<u8>;
160 type Error = BareConduitError;
161
162 fn prepare_send(&self, item: F::Msg<'_>) -> Result<Self::Prepared, Self::Error> {
170 #[cfg(not(target_arch = "wasm32"))]
171 {
172 let ptr = PtrConst::new((&raw const item).cast::<u8>());
173 vox_jit::encode_with(self.encoder, ptr).map_err(BareConduitError::Encode)
174 }
175 #[cfg(target_arch = "wasm32")]
176 {
177 vox_postcard::to_vec(&item).map_err(BareConduitError::Encode)
178 }
179 }
180
181 async fn send_prepared(&self, prepared: Self::Prepared) -> Result<(), Self::Error> {
182 self.link_tx
183 .send(prepared)
184 .await
185 .map_err(BareConduitError::Io)
186 }
187
188 async fn close(self) -> std::io::Result<()> {
189 self.link_tx.close().await
190 }
191}
192
193pub struct BareConduitRx<F: MsgFamily, LRx> {
198 link_rx: LRx,
199 #[cfg(not(target_arch = "wasm32"))]
200 decoder: &'static CompiledDecoder,
201 #[cfg(target_arch = "wasm32")]
202 message_plan: MessagePlan,
203 _phantom: PhantomData<fn() -> F>,
204}
205
206impl<F: MsgFamily, LRx> ConduitRx for BareConduitRx<F, LRx>
207where
208 LRx: vox_types::LinkRx + MaybeSend + 'static,
209{
210 type Msg = F;
211 type Error = BareConduitError;
212
213 #[moire::instrument]
215 async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
216 let backing = match self.link_rx.recv().await.map_err(|error| {
217 BareConduitError::Io(std::io::Error::other(format!("link recv failed: {error}")))
218 })? {
219 Some(b) => b,
220 None => return Ok(None),
221 };
222
223 #[cfg(not(target_arch = "wasm32"))]
224 {
225 crate::deserialize_postcard_with_decoder::<F::Msg<'static>>(backing, self.decoder)
226 .map_err(BareConduitError::Decode)
227 .map(Some)
228 }
229 #[cfg(target_arch = "wasm32")]
230 {
231 crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
232 backing,
233 &self.message_plan.plan,
234 &self.message_plan.registry,
235 )
236 .map_err(BareConduitError::Decode)
237 .map(Some)
238 }
239 }
240}
241
242#[derive(Debug)]
247pub enum BareConduitError {
248 Encode(vox_postcard::SerializeError),
249 Decode(vox_postcard::DeserializeError),
250 Io(std::io::Error),
251 LinkDead,
252}
253
254impl std::fmt::Display for BareConduitError {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 match self {
257 Self::Encode(e) => write!(f, "encode error: {e}"),
258 Self::Decode(e) => write!(f, "decode error: {e}"),
259 Self::Io(e) => write!(f, "io error: {e}"),
260 Self::LinkDead => write!(f, "link dead"),
261 }
262 }
263}
264
265impl std::error::Error for BareConduitError {}
266
267#[cfg(test)]
268mod tests {
269 use vox_types::*;
270
271 use super::*;
272 use crate::memory_link_pair;
273
274 #[test]
275 fn connection_reject_with_nonempty_metadata_round_trips() {
276 let rt = tokio::runtime::Builder::new_current_thread()
277 .build()
278 .unwrap();
279 rt.block_on(async { connection_reject_with_nonempty_metadata_inner().await });
280 }
281
282 async fn connection_reject_with_nonempty_metadata_inner() {
283 let (a, b) = memory_link_pair(64);
284 let a_conduit = BareConduit::<MessageFamily, _>::new(a);
285 let b_conduit = BareConduit::<MessageFamily, _>::new(b);
286 let (a_tx, _a_rx) = a_conduit.split();
287 let (_b_tx, mut b_rx) = b_conduit.split();
288
289 let msg = Message {
291 connection_id: ConnectionId(1),
292 payload: MessagePayload::ConnectionReject(ConnectionReject {
293 metadata: vec![MetadataEntry::str(
294 "error",
295 "missing required vox-service metadata",
296 )],
297 }),
298 };
299 let prepared = a_tx.prepare_send(msg).unwrap();
300 a_tx.send_prepared(prepared).await.unwrap();
301
302 let received = b_rx.recv().await.unwrap().unwrap();
304 let msg = received.get();
305 if let MessagePayload::ConnectionReject(reject) = &msg.payload {
306 assert_eq!(reject.metadata.len(), 1, "expected 1 metadata entry");
307 assert_eq!(
308 reject.metadata[0].key.as_ref(),
309 "error",
310 "key mismatch: got {:?}",
311 reject.metadata[0].key
312 );
313 match &reject.metadata[0].value {
314 MetadataValue::String(s) => {
315 assert_eq!(s.as_ref(), "missing required vox-service metadata");
316 }
317 other => panic!("expected String, got {:?}", other),
318 }
319 } else {
320 panic!("expected ConnectionReject, got {:?}", msg.payload);
321 }
322 }
323}