// vox_core/memory_link.rs

use moire::sync::mpsc;
use vox_types::{Backing, Link, LinkRx, LinkTx};
3
/// One in-process frame: bytes, plus any descriptors moving with it.
///
/// On Unix the item is a `(bytes, fds)` pair. In-process fd "passing" is just
/// an ownership move through the same channel as the bytes — no `SCM_RIGHTS`,
/// and no separate stream that could desync.
#[cfg(unix)]
type MemItem = (Vec<u8>, Vec<std::os::fd::OwnedFd>);
/// One in-process frame.
///
/// On non-Unix targets there are no descriptors to carry, so the item is just
/// the raw bytes.
#[cfg(not(unix))]
type MemItem = Vec<u8>;
13
14/// In-process [`Link`] backed by tokio mpsc channels.
15///
16/// Each direction is an unbounded channel carrying raw bytes (and, on Unix,
17/// any `Fd`s travelling with them) — no serialization, no IO. Useful for
18/// testing Conduits, Session, and anything above the transport layer
19/// without real networking.
20// r[impl transport.memory]
21// r[impl zerocopy.framing.link.memory]
22pub struct MemoryLink {
23    tx: mpsc::Sender<MemItem>,
24    rx: mpsc::Receiver<MemItem>,
25}
26
27/// Create a pair of connected [`MemoryLink`]s.
28///
29/// Returns `(a, b)` where sending on `a` delivers to `b` and vice versa.
30pub fn memory_link_pair(buffer: usize) -> (MemoryLink, MemoryLink) {
31    let (tx_a, rx_b) = mpsc::channel("memory_link.a→b", buffer);
32    let (tx_b, rx_a) = mpsc::channel("memory_link.b→a", buffer);
33
34    let a = MemoryLink { tx: tx_a, rx: rx_a };
35    let b = MemoryLink { tx: tx_b, rx: rx_b };
36
37    (a, b)
38}
39
40impl Link for MemoryLink {
41    type Tx = MemoryLinkTx;
42    type Rx = MemoryLinkRx;
43
44    fn split(self) -> (Self::Tx, Self::Rx) {
45        (
46            MemoryLinkTx { tx: self.tx },
47            MemoryLinkRx {
48                rx: self.rx,
49                #[cfg(unix)]
50                last_fds: Vec::new(),
51            },
52        )
53    }
54}
55
// ---------------------------------------------------------------------------
// Tx
// ---------------------------------------------------------------------------
59
60/// Sending half of a [`MemoryLink`].
61#[derive(Clone)]
62pub struct MemoryLinkTx {
63    tx: mpsc::Sender<MemItem>,
64}
65
66impl MemoryLinkTx {
67    async fn send_item(&self, item: MemItem) -> std::io::Result<()> {
68        let permit = self.tx.clone().reserve_owned().await.map_err(|_| {
69            std::io::Error::new(std::io::ErrorKind::ConnectionReset, "receiver dropped")
70        })?;
71        drop(permit.send(item));
72        Ok(())
73    }
74}
75
76impl LinkTx for MemoryLinkTx {
77    async fn send(&self, bytes: Vec<u8>) -> std::io::Result<()> {
78        #[cfg(unix)]
79        {
80            self.send_item((bytes, Vec::new())).await
81        }
82        #[cfg(not(unix))]
83        {
84            self.send_item(bytes).await
85        }
86    }
87
88    async fn close(self) -> std::io::Result<()> {
89        drop(self.tx);
90        Ok(())
91    }
92
93    #[cfg(unix)]
94    fn supports_fd_passing(&self) -> bool {
95        true
96    }
97
98    #[cfg(unix)]
99    async fn send_with_fds(
100        &self,
101        bytes: Vec<u8>,
102        fds: Vec<std::os::fd::OwnedFd>,
103    ) -> std::io::Result<()> {
104        self.send_item((bytes, fds)).await
105    }
106}
107
// ---------------------------------------------------------------------------
// Rx
// ---------------------------------------------------------------------------
111
112/// Receiving half of a [`MemoryLink`].
113pub struct MemoryLinkRx {
114    rx: mpsc::Receiver<MemItem>,
115    #[cfg(unix)]
116    last_fds: Vec<std::os::fd::OwnedFd>,
117}
118
/// Error type of the receiving half; not expected to ever be produced.
///
/// MemoryLink never fails on recv — the only "error" is channel closed, which
/// `recv` reports as `Ok(None)` rather than as an error value.
#[derive(Debug)]
pub struct MemoryLinkRxError;

impl std::fmt::Display for MemoryLinkRxError {
    /// Write the fixed description of this (unreachable) error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Constant message, so no formatting machinery is needed.
        f.write_str("memory link rx error (unreachable)")
    }
}

impl std::error::Error for MemoryLinkRxError {}
130
131impl LinkRx for MemoryLinkRx {
132    type Error = MemoryLinkRxError;
133
134    async fn recv(&mut self) -> Result<Option<Backing>, Self::Error> {
135        match self.rx.recv().await {
136            #[cfg(unix)]
137            Some((bytes, fds)) => {
138                self.last_fds = fds;
139                Ok(Some(Backing::Boxed(bytes.into_boxed_slice())))
140            }
141            #[cfg(not(unix))]
142            Some(bytes) => Ok(Some(Backing::Boxed(bytes.into_boxed_slice()))),
143            None => Ok(None),
144        }
145    }
146
147    #[cfg(unix)]
148    fn take_frame_fds(&mut self) -> Vec<std::os::fd::OwnedFd> {
149        std::mem::take(&mut self.last_fds)
150    }
151}