1use std::marker::PhantomData;
2
3use facet_core::{PtrConst, Shape};
4use facet_reflect::Peek;
5
6use vox_types::{Conduit, ConduitRx, ConduitTx, Link, LinkTx, MaybeSend, MsgFamily, SelfRef};
7
8use crate::MessagePlan;
9
10pub struct BareConduit<F: MsgFamily, L: Link> {
22 link: L,
23 shape: &'static Shape,
24 message_plan: Option<MessagePlan>,
25 _phantom: PhantomData<fn(F) -> F>,
26}
27
28impl<F: MsgFamily, L: Link> BareConduit<F, L> {
29 pub fn new(link: L) -> Self {
31 Self {
32 link,
33 shape: F::shape(),
34 message_plan: None,
35 _phantom: PhantomData,
36 }
37 }
38
39 pub fn with_message_plan(link: L, message_plan: MessagePlan) -> Self {
41 Self {
42 link,
43 shape: F::shape(),
44 message_plan: Some(message_plan),
45 _phantom: PhantomData,
46 }
47 }
48}
49
50impl<F: MsgFamily, L: Link> Conduit for BareConduit<F, L>
51where
52 L::Tx: MaybeSend + 'static,
53 L::Rx: MaybeSend + 'static,
54{
55 type Msg = F;
56 type Tx = BareConduitTx<F, L::Tx>;
57 type Rx = BareConduitRx<F, L::Rx>;
58
59 fn split(self) -> (Self::Tx, Self::Rx) {
60 let (tx, rx) = self.link.split();
61 (
62 BareConduitTx {
63 link_tx: tx,
64 shape: self.shape,
65 _phantom: PhantomData,
66 },
67 BareConduitRx {
68 link_rx: rx,
69 message_plan: self.message_plan,
70 _phantom: PhantomData,
71 },
72 )
73 }
74}
75
76pub struct BareConduitTx<F: MsgFamily, LTx: LinkTx> {
81 link_tx: LTx,
82 shape: &'static Shape,
83 _phantom: PhantomData<fn(F)>,
84}
85
86impl<F: MsgFamily, LTx: LinkTx + MaybeSend + 'static> ConduitTx for BareConduitTx<F, LTx> {
87 type Msg = F;
88 type Prepared = Vec<u8>;
89 type Error = BareConduitError;
90
91 fn prepare_send(&self, item: F::Msg<'_>) -> Result<Self::Prepared, Self::Error> {
92 encode_message::<F>(self.shape, item)
93 }
94
95 async fn send_prepared(&self, prepared: Self::Prepared) -> Result<(), Self::Error> {
96 self.link_tx
97 .send(prepared)
98 .await
99 .map_err(BareConduitError::Io)
100 }
101
102 async fn close(self) -> std::io::Result<()> {
103 self.link_tx.close().await
104 }
105}
106
107fn encode_message<F: MsgFamily>(
115 shape: &'static Shape,
116 item: F::Msg<'_>,
117) -> Result<Vec<u8>, BareConduitError> {
118 #[allow(unsafe_code)]
119 let peek = unsafe { Peek::unchecked_new(PtrConst::new((&raw const item).cast::<u8>()), shape) };
120 let plan = vox_postcard::peek_to_scatter_plan(peek).map_err(BareConduitError::Encode)?;
121 let mut bytes = vec![0u8; plan.total_size()];
122 plan.write_into(&mut bytes);
123 Ok(bytes)
124}
125
126pub struct BareConduitRx<F: MsgFamily, LRx> {
131 link_rx: LRx,
132 message_plan: Option<MessagePlan>,
133 _phantom: PhantomData<fn() -> F>,
134}
135
136impl<F: MsgFamily, LRx> ConduitRx for BareConduitRx<F, LRx>
137where
138 LRx: vox_types::LinkRx + MaybeSend + 'static,
139{
140 type Msg = F;
141 type Error = BareConduitError;
142
143 #[moire::instrument]
145 async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
146 let backing = match self.link_rx.recv().await.map_err(|error| {
147 BareConduitError::Io(std::io::Error::other(format!("link recv failed: {error}")))
148 })? {
149 Some(b) => b,
150 None => return Ok(None),
151 };
152
153 match &self.message_plan {
154 Some(plan) => crate::deserialize_postcard_with_plan::<F::Msg<'static>>(
155 backing,
156 &plan.plan,
157 &plan.registry,
158 ),
159 None => crate::deserialize_postcard::<F::Msg<'static>>(backing),
160 }
161 .map_err(BareConduitError::Decode)
162 .map(Some)
163 }
164}
165
166#[derive(Debug)]
171pub enum BareConduitError {
172 Encode(vox_postcard::SerializeError),
173 Decode(vox_postcard::DeserializeError),
174 Io(std::io::Error),
175 LinkDead,
176}
177
178impl std::fmt::Display for BareConduitError {
179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180 match self {
181 Self::Encode(e) => write!(f, "encode error: {e}"),
182 Self::Decode(e) => write!(f, "decode error: {e}"),
183 Self::Io(e) => write!(f, "io error: {e}"),
184 Self::LinkDead => write!(f, "link dead"),
185 }
186 }
187}
188
189impl std::error::Error for BareConduitError {}
190
191#[cfg(test)]
192mod tests {
193 use vox_types::*;
194
195 use super::*;
196 use crate::memory_link_pair;
197
198 #[test]
199 fn connection_reject_with_nonempty_metadata_round_trips() {
200 let rt = tokio::runtime::Builder::new_current_thread()
201 .build()
202 .unwrap();
203 rt.block_on(async { connection_reject_with_nonempty_metadata_inner().await });
204 }
205
206 async fn connection_reject_with_nonempty_metadata_inner() {
207 let (a, b) = memory_link_pair(64);
208 let a_conduit = BareConduit::<MessageFamily, _>::new(a);
209 let b_conduit = BareConduit::<MessageFamily, _>::new(b);
210 let (a_tx, _a_rx) = a_conduit.split();
211 let (_b_tx, mut b_rx) = b_conduit.split();
212
213 let msg = Message {
215 connection_id: ConnectionId(1),
216 payload: MessagePayload::ConnectionReject(ConnectionReject {
217 metadata: vec![MetadataEntry::str(
218 "error",
219 "missing required vox-service metadata",
220 )],
221 }),
222 };
223 let prepared = a_tx.prepare_send(msg).unwrap();
224 a_tx.send_prepared(prepared).await.unwrap();
225
226 let received = b_rx.recv().await.unwrap().unwrap();
228 let msg = received.get();
229 if let MessagePayload::ConnectionReject(reject) = &msg.payload {
230 assert_eq!(reject.metadata.len(), 1, "expected 1 metadata entry");
231 assert_eq!(
232 reject.metadata[0].key.as_ref(),
233 "error",
234 "key mismatch: got {:?}",
235 reject.metadata[0].key
236 );
237 match &reject.metadata[0].value {
238 MetadataValue::String(s) => {
239 assert_eq!(s.as_ref(), "missing required vox-service metadata");
240 }
241 other => panic!("expected String, got {:?}", other),
242 }
243 } else {
244 panic!("expected ConnectionReject, got {:?}", msg.payload);
245 }
246 }
247}