1use moire::sync::mpsc;
2use vox_types::{Backing, Link, LinkRx, LinkTx};
3
/// One queued message on a Unix target: the payload bytes together with
/// the owned file descriptors that travel with them.
#[cfg(unix)]
type MemItem = (Vec<u8>, Vec<std::os::fd::OwnedFd>);
/// One queued message on a non-Unix target: just the payload bytes.
#[cfg(not(unix))]
type MemItem = Vec<u8>;
13
14pub struct MemoryLink {
23 tx: mpsc::Sender<MemItem>,
24 rx: mpsc::Receiver<MemItem>,
25}
26
27pub fn memory_link_pair(buffer: usize) -> (MemoryLink, MemoryLink) {
31 let (tx_a, rx_b) = mpsc::channel("memory_link.a→b", buffer);
32 let (tx_b, rx_a) = mpsc::channel("memory_link.b→a", buffer);
33
34 let a = MemoryLink { tx: tx_a, rx: rx_a };
35 let b = MemoryLink { tx: tx_b, rx: rx_b };
36
37 (a, b)
38}
39
40impl Link for MemoryLink {
41 type Tx = MemoryLinkTx;
42 type Rx = MemoryLinkRx;
43
44 fn split(self) -> (Self::Tx, Self::Rx) {
45 (
46 MemoryLinkTx { tx: self.tx },
47 MemoryLinkRx {
48 rx: self.rx,
49 #[cfg(unix)]
50 last_fds: Vec::new(),
51 },
52 )
53 }
54}
55
56#[derive(Clone)]
62pub struct MemoryLinkTx {
63 tx: mpsc::Sender<MemItem>,
64}
65
66impl MemoryLinkTx {
67 async fn send_item(&self, item: MemItem) -> std::io::Result<()> {
68 let permit = self.tx.clone().reserve_owned().await.map_err(|_| {
69 std::io::Error::new(std::io::ErrorKind::ConnectionReset, "receiver dropped")
70 })?;
71 drop(permit.send(item));
72 Ok(())
73 }
74}
75
76impl LinkTx for MemoryLinkTx {
77 async fn send(&self, bytes: Vec<u8>) -> std::io::Result<()> {
78 #[cfg(unix)]
79 {
80 self.send_item((bytes, Vec::new())).await
81 }
82 #[cfg(not(unix))]
83 {
84 self.send_item(bytes).await
85 }
86 }
87
88 async fn close(self) -> std::io::Result<()> {
89 drop(self.tx);
90 Ok(())
91 }
92
93 #[cfg(unix)]
94 fn supports_fd_passing(&self) -> bool {
95 true
96 }
97
98 #[cfg(unix)]
99 async fn send_with_fds(
100 &self,
101 bytes: Vec<u8>,
102 fds: Vec<std::os::fd::OwnedFd>,
103 ) -> std::io::Result<()> {
104 self.send_item((bytes, fds)).await
105 }
106}
107
108pub struct MemoryLinkRx {
114 rx: mpsc::Receiver<MemItem>,
115 #[cfg(unix)]
116 last_fds: Vec<std::os::fd::OwnedFd>,
117}
118
/// Error type reported by the receiving half of a memory link.
///
/// It carries no data; its display text labels it as unreachable.
#[derive(Debug)]
pub struct MemoryLinkRxError;

impl std::fmt::Display for MemoryLinkRxError {
    /// Writes the fixed description of this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("memory link rx error (unreachable)")
    }
}

impl std::error::Error for MemoryLinkRxError {}
130
131impl LinkRx for MemoryLinkRx {
132 type Error = MemoryLinkRxError;
133
134 async fn recv(&mut self) -> Result<Option<Backing>, Self::Error> {
135 match self.rx.recv().await {
136 #[cfg(unix)]
137 Some((bytes, fds)) => {
138 self.last_fds = fds;
139 Ok(Some(Backing::Boxed(bytes.into_boxed_slice())))
140 }
141 #[cfg(not(unix))]
142 Some(bytes) => Ok(Some(Backing::Boxed(bytes.into_boxed_slice()))),
143 None => Ok(None),
144 }
145 }
146
147 #[cfg(unix)]
148 fn take_frame_fds(&mut self) -> Vec<std::os::fd::OwnedFd> {
149 std::mem::take(&mut self.last_fds)
150 }
151}