Skip to main content

roam_core/
memory_link.rs

1use moire::sync::mpsc;
2use roam_types::{Backing, Link, LinkRx, LinkTx, LinkTxPermit, WriteSlot};
3
4/// In-process [`Link`] backed by tokio mpsc channels.
5///
6/// Each direction is an unbounded channel carrying `Vec<u8>` — raw bytes,
7/// no serialization, no IO. Useful for testing Conduits, Session, and
8/// anything above the transport layer without real networking.
9// r[impl transport.memory]
10// r[impl zerocopy.framing.link.memory]
11pub struct MemoryLink {
12    tx: mpsc::Sender<Vec<u8>>,
13    rx: mpsc::Receiver<Vec<u8>>,
14}
15
16/// Create a pair of connected [`MemoryLink`]s.
17///
18/// Returns `(a, b)` where sending on `a` delivers to `b` and vice versa.
19pub fn memory_link_pair(buffer: usize) -> (MemoryLink, MemoryLink) {
20    let (tx_a, rx_b) = mpsc::channel("memory_link.a→b", buffer);
21    let (tx_b, rx_a) = mpsc::channel("memory_link.b→a", buffer);
22
23    let a = MemoryLink { tx: tx_a, rx: rx_a };
24    let b = MemoryLink { tx: tx_b, rx: rx_b };
25
26    (a, b)
27}
28
29impl Link for MemoryLink {
30    type Tx = MemoryLinkTx;
31    type Rx = MemoryLinkRx;
32
33    fn split(self) -> (Self::Tx, Self::Rx) {
34        (MemoryLinkTx { tx: self.tx }, MemoryLinkRx { rx: self.rx })
35    }
36}
37
38// ---------------------------------------------------------------------------
39// Tx
40// ---------------------------------------------------------------------------
41
42/// Sending half of a [`MemoryLink`].
43#[derive(Clone)]
44pub struct MemoryLinkTx {
45    tx: mpsc::Sender<Vec<u8>>,
46}
47
48pub struct MemoryLinkTxPermit {
49    permit: mpsc::OwnedPermit<Vec<u8>>,
50}
51
52impl LinkTx for MemoryLinkTx {
53    type Permit = MemoryLinkTxPermit;
54
55    async fn reserve(&self) -> std::io::Result<Self::Permit> {
56        let permit = self.tx.clone().reserve_owned().await.map_err(|_| {
57            std::io::Error::new(std::io::ErrorKind::ConnectionReset, "receiver dropped")
58        })?;
59        Ok(MemoryLinkTxPermit { permit })
60    }
61
62    async fn close(self) -> std::io::Result<()> {
63        drop(self.tx);
64        Ok(())
65    }
66}
67
68impl LinkTxPermit for MemoryLinkTxPermit {
69    type Slot = MemoryWriteSlot;
70
71    fn alloc(self, len: usize) -> std::io::Result<Self::Slot> {
72        Ok(MemoryWriteSlot {
73            buf: vec![0u8; len],
74            permit: self.permit,
75        })
76    }
77}
78
79/// Write slot for [`MemoryLinkTx`].
80///
81/// Holds a `Vec<u8>` buffer and a channel permit. Writing fills the buffer;
82/// commit sends it through the channel.
83pub struct MemoryWriteSlot {
84    buf: Vec<u8>,
85    permit: mpsc::OwnedPermit<Vec<u8>>,
86}
87
88impl WriteSlot for MemoryWriteSlot {
89    fn as_mut_slice(&mut self) -> &mut [u8] {
90        &mut self.buf
91    }
92
93    fn commit(self) {
94        drop(self.permit.send(self.buf));
95    }
96}
97
98// ---------------------------------------------------------------------------
99// Rx
100// ---------------------------------------------------------------------------
101
102/// Receiving half of a [`MemoryLink`].
103pub struct MemoryLinkRx {
104    rx: mpsc::Receiver<Vec<u8>>,
105}
106
/// Error type for receiving on a memory link.
///
/// Receiving never actually fails: the only "error" condition is the channel
/// being closed, and that is reported as `None` rather than through this type.
#[derive(Debug)]
pub struct MemoryLinkRxError;

impl std::fmt::Display for MemoryLinkRxError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("memory link rx error (unreachable)")
    }
}

impl std::error::Error for MemoryLinkRxError {}
118
119impl LinkRx for MemoryLinkRx {
120    type Error = MemoryLinkRxError;
121
122    async fn recv(&mut self) -> Result<Option<Backing>, Self::Error> {
123        match self.rx.recv().await {
124            Some(bytes) => Ok(Some(Backing::Boxed(bytes.into_boxed_slice()))),
125            None => Ok(None),
126        }
127    }
128}