1use moire::sync::mpsc;
5use vox_types::{Backing, Link, LinkRx, LinkTx, LinkTxPermit, WriteSlot};
6
7pub struct BreakableLink {
8 tx: mpsc::Sender<Option<Vec<u8>>>,
9 rx: mpsc::Receiver<Option<Vec<u8>>>,
10}
11
12#[derive(Clone)]
13pub struct BreakHandle {
14 tx: mpsc::Sender<Option<Vec<u8>>>,
15}
16
17pub fn breakable_link_pair(
18 buffer: usize,
19) -> (BreakableLink, BreakHandle, BreakableLink, BreakHandle) {
20 let (tx_a, rx_b) = mpsc::channel("breakable_link.a→b", buffer);
21 let (tx_b, rx_a) = mpsc::channel("breakable_link.b→a", buffer);
22
23 let a_handle = BreakHandle { tx: tx_b.clone() };
24 let b_handle = BreakHandle { tx: tx_a.clone() };
25
26 (
27 BreakableLink { tx: tx_a, rx: rx_a },
28 a_handle,
29 BreakableLink { tx: tx_b, rx: rx_b },
30 b_handle,
31 )
32}
33
34impl BreakHandle {
35 pub async fn close(&self) {
36 let _ = self.tx.send(None).await;
37 }
38}
39
40impl Link for BreakableLink {
41 type Tx = BreakableLinkTx;
42 type Rx = BreakableLinkRx;
43
44 fn split(self) -> (Self::Tx, Self::Rx) {
45 (
46 BreakableLinkTx { tx: self.tx },
47 BreakableLinkRx { rx: self.rx },
48 )
49 }
50}
51
52#[derive(Clone)]
53pub struct BreakableLinkTx {
54 tx: mpsc::Sender<Option<Vec<u8>>>,
55}
56
57pub struct BreakableLinkTxPermit {
58 permit: mpsc::OwnedPermit<Option<Vec<u8>>>,
59}
60
61impl LinkTx for BreakableLinkTx {
62 type Permit = BreakableLinkTxPermit;
63
64 async fn reserve(&self) -> std::io::Result<Self::Permit> {
65 let permit = self.tx.clone().reserve_owned().await.map_err(|_| {
66 std::io::Error::new(std::io::ErrorKind::ConnectionReset, "receiver dropped")
67 })?;
68 Ok(BreakableLinkTxPermit { permit })
69 }
70
71 async fn close(self) -> std::io::Result<()> {
72 drop(self.tx);
73 Ok(())
74 }
75}
76
77pub struct BreakableWriteSlot {
78 buf: Vec<u8>,
79 permit: mpsc::OwnedPermit<Option<Vec<u8>>>,
80}
81
82impl LinkTxPermit for BreakableLinkTxPermit {
83 type Slot = BreakableWriteSlot;
84
85 fn alloc(self, len: usize) -> std::io::Result<Self::Slot> {
86 Ok(BreakableWriteSlot {
87 buf: vec![0u8; len],
88 permit: self.permit,
89 })
90 }
91}
92
93impl WriteSlot for BreakableWriteSlot {
94 fn as_mut_slice(&mut self) -> &mut [u8] {
95 &mut self.buf
96 }
97
98 fn commit(self) {
99 drop(self.permit.send(Some(self.buf)));
100 }
101}
102
103pub struct BreakableLinkRx {
104 rx: mpsc::Receiver<Option<Vec<u8>>>,
105}
106
107#[derive(Debug)]
108pub struct BreakableLinkRxError;
109
110impl std::fmt::Display for BreakableLinkRxError {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 write!(f, "breakable link rx error")
113 }
114}
115
116impl std::error::Error for BreakableLinkRxError {}
117
118impl LinkRx for BreakableLinkRx {
119 type Error = BreakableLinkRxError;
120
121 async fn recv(&mut self) -> Result<Option<Backing>, Self::Error> {
122 match self.rx.recv().await {
123 Some(Some(bytes)) => Ok(Some(Backing::Boxed(bytes.into_boxed_slice()))),
124 Some(None) | None => Ok(None),
125 }
126 }
127}