roam_core/bare_conduit/
mod.rs1use std::marker::PhantomData;
2
3use facet_core::{PtrConst, Shape};
4use facet_reflect::Peek;
5
6use roam_types::{
7 Conduit, ConduitRx, ConduitTx, ConduitTxPermit, Link, LinkTx, LinkTxPermit, MaybeSend,
8 MsgFamily, SelfRef, WriteSlot,
9};
10
11pub struct BareConduit<F: MsgFamily, L: Link> {
23 link: L,
24 shape: &'static Shape,
25 _phantom: PhantomData<fn(F) -> F>,
26}
27
28impl<F: MsgFamily, L: Link> BareConduit<F, L> {
29 pub fn new(link: L) -> Self {
31 Self {
32 link,
33 shape: F::shape(),
34 _phantom: PhantomData,
35 }
36 }
37}
38
39impl<F: MsgFamily, L: Link> Conduit for BareConduit<F, L>
40where
41 L::Tx: MaybeSend + 'static,
42 L::Rx: MaybeSend + 'static,
43{
44 type Msg = F;
45 type Tx = BareConduitTx<F, L::Tx>;
46 type Rx = BareConduitRx<F, L::Rx>;
47
48 fn split(self) -> (Self::Tx, Self::Rx) {
49 let (tx, rx) = self.link.split();
50 (
51 BareConduitTx {
52 link_tx: tx,
53 shape: self.shape,
54 _phantom: PhantomData,
55 },
56 BareConduitRx {
57 link_rx: rx,
58 _phantom: PhantomData,
59 },
60 )
61 }
62}
63
64pub struct BareConduitTx<F: MsgFamily, LTx: LinkTx> {
69 link_tx: LTx,
70 shape: &'static Shape,
71 _phantom: PhantomData<fn(F)>,
72}
73
74impl<F: MsgFamily, LTx: LinkTx + MaybeSend + 'static> ConduitTx for BareConduitTx<F, LTx> {
75 type Msg = F;
76 type Permit<'a>
77 = BareConduitPermit<'a, F, LTx>
78 where
79 Self: 'a;
80
81 async fn reserve(&self) -> std::io::Result<Self::Permit<'_>> {
82 let permit = self.link_tx.reserve().await?;
83 Ok(BareConduitPermit {
84 permit,
85 shape: self.shape,
86 _phantom: PhantomData,
87 })
88 }
89
90 async fn close(self) -> std::io::Result<()> {
91 self.link_tx.close().await
92 }
93}
94
95pub struct BareConduitPermit<'a, F: MsgFamily, LTx: LinkTx> {
100 permit: LTx::Permit,
101 shape: &'static Shape,
102 _phantom: PhantomData<fn(F, &'a ())>,
103}
104
105impl<F: MsgFamily, LTx: LinkTx> ConduitTxPermit for BareConduitPermit<'_, F, LTx> {
106 type Msg = F;
107 type Error = BareConduitError;
108
109 fn send(self, item: F::Msg<'_>) -> Result<(), Self::Error> {
117 #[allow(unsafe_code)]
121 let peek = unsafe {
122 Peek::unchecked_new(PtrConst::new((&raw const item).cast::<u8>()), self.shape)
123 };
124 let plan = facet_postcard::peek_to_scatter_plan(peek).map_err(BareConduitError::Encode)?;
125
126 let mut slot = self
127 .permit
128 .alloc(plan.total_size())
129 .map_err(BareConduitError::Io)?;
130 plan.write_into(slot.as_mut_slice())
131 .map_err(BareConduitError::Encode)?;
132 slot.commit();
133 Ok(())
134 }
135}
136
137pub struct BareConduitRx<F: MsgFamily, LRx> {
142 link_rx: LRx,
143 _phantom: PhantomData<fn() -> F>,
144}
145
146impl<F: MsgFamily, LRx> ConduitRx for BareConduitRx<F, LRx>
147where
148 LRx: roam_types::LinkRx + MaybeSend + 'static,
149{
150 type Msg = F;
151 type Error = BareConduitError;
152
153 #[moire::instrument]
155 async fn recv(&mut self) -> Result<Option<SelfRef<F::Msg<'static>>>, Self::Error> {
156 let backing = match self.link_rx.recv().await.map_err(|error| {
157 BareConduitError::Io(std::io::Error::other(format!("link recv failed: {error}")))
158 })? {
159 Some(b) => b,
160 None => return Ok(None),
161 };
162
163 crate::deserialize_postcard::<F::Msg<'static>>(backing)
164 .map_err(BareConduitError::Decode)
165 .map(Some)
166 }
167}
168
169#[derive(Debug)]
174pub enum BareConduitError {
175 Encode(facet_postcard::SerializeError),
176 Decode(facet_format::DeserializeError),
177 Io(std::io::Error),
178 LinkDead,
179}
180
181impl std::fmt::Display for BareConduitError {
182 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183 match self {
184 Self::Encode(e) => write!(f, "encode error: {e}"),
185 Self::Decode(e) => write!(f, "decode error: {e}"),
186 Self::Io(e) => write!(f, "io error: {e}"),
187 Self::LinkDead => write!(f, "link dead"),
188 }
189 }
190}
191
192impl std::error::Error for BareConduitError {}