1use moire::sync::mpsc;
2use vox_types::{Backing, Link, LinkRx, LinkTx};
3
4pub struct MemoryLink {
12 tx: mpsc::Sender<Vec<u8>>,
13 rx: mpsc::Receiver<Vec<u8>>,
14}
15
16pub fn memory_link_pair(buffer: usize) -> (MemoryLink, MemoryLink) {
20 let (tx_a, rx_b) = mpsc::channel("memory_link.a→b", buffer);
21 let (tx_b, rx_a) = mpsc::channel("memory_link.b→a", buffer);
22
23 let a = MemoryLink { tx: tx_a, rx: rx_a };
24 let b = MemoryLink { tx: tx_b, rx: rx_b };
25
26 (a, b)
27}
28
29impl Link for MemoryLink {
30 type Tx = MemoryLinkTx;
31 type Rx = MemoryLinkRx;
32
33 fn split(self) -> (Self::Tx, Self::Rx) {
34 (MemoryLinkTx { tx: self.tx }, MemoryLinkRx { rx: self.rx })
35 }
36}
37
38#[derive(Clone)]
44pub struct MemoryLinkTx {
45 tx: mpsc::Sender<Vec<u8>>,
46}
47
48impl LinkTx for MemoryLinkTx {
49 async fn send(&self, bytes: Vec<u8>) -> std::io::Result<()> {
50 let permit = self.tx.clone().reserve_owned().await.map_err(|_| {
51 std::io::Error::new(std::io::ErrorKind::ConnectionReset, "receiver dropped")
52 })?;
53 drop(permit.send(bytes));
54 Ok(())
55 }
56
57 async fn close(self) -> std::io::Result<()> {
58 drop(self.tx);
59 Ok(())
60 }
61}
62
63pub struct MemoryLinkRx {
69 rx: mpsc::Receiver<Vec<u8>>,
70}
71
/// Error type reported by the receiving half of a memory link.
///
/// Its `Display` text labels it "unreachable"; it exists so the receive side
/// has a concrete error type to name.
#[derive(Debug)]
pub struct MemoryLinkRxError;

impl std::fmt::Display for MemoryLinkRxError {
    /// Writes the fixed description of this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A constant message needs no formatting machinery.
        f.write_str("memory link rx error (unreachable)")
    }
}

impl std::error::Error for MemoryLinkRxError {}
83
84impl LinkRx for MemoryLinkRx {
85 type Error = MemoryLinkRxError;
86
87 async fn recv(&mut self) -> Result<Option<Backing>, Self::Error> {
88 match self.rx.recv().await {
89 Some(bytes) => Ok(Some(Backing::Boxed(bytes.into_boxed_slice()))),
90 None => Ok(None),
91 }
92 }
93}