1use std::collections::VecDeque;
5
6use moire::sync::mpsc;
7use vox_types::{Backing, Link, LinkRx, LinkTx};
8
9use crate::{Attachment, LinkSource};
10
11pub struct BreakableLink {
12 tx: mpsc::Sender<Option<Vec<u8>>>,
13 rx: mpsc::Receiver<Option<Vec<u8>>>,
14}
15
16#[derive(Clone)]
17pub struct BreakHandle {
18 tx: mpsc::Sender<Option<Vec<u8>>>,
19}
20
21pub fn breakable_link_pair(
22 buffer: usize,
23) -> (BreakableLink, BreakHandle, BreakableLink, BreakHandle) {
24 let (tx_a, rx_b) = mpsc::channel("breakable_link.a→b", buffer);
25 let (tx_b, rx_a) = mpsc::channel("breakable_link.b→a", buffer);
26
27 let a_handle = BreakHandle { tx: tx_b.clone() };
28 let b_handle = BreakHandle { tx: tx_a.clone() };
29
30 (
31 BreakableLink { tx: tx_a, rx: rx_a },
32 a_handle,
33 BreakableLink { tx: tx_b, rx: rx_b },
34 b_handle,
35 )
36}
37
38impl BreakHandle {
39 pub async fn close(&self) {
40 let _ = self.tx.send(None).await;
41 }
42}
43
44impl Link for BreakableLink {
45 type Tx = BreakableLinkTx;
46 type Rx = BreakableLinkRx;
47
48 fn split(self) -> (Self::Tx, Self::Rx) {
49 (
50 BreakableLinkTx { tx: self.tx },
51 BreakableLinkRx { rx: self.rx },
52 )
53 }
54}
55
56#[derive(Clone)]
57pub struct BreakableLinkTx {
58 tx: mpsc::Sender<Option<Vec<u8>>>,
59}
60
61impl LinkTx for BreakableLinkTx {
62 async fn send(&self, bytes: Vec<u8>) -> std::io::Result<()> {
63 let permit = self.tx.clone().reserve_owned().await.map_err(|_| {
64 std::io::Error::new(std::io::ErrorKind::ConnectionReset, "receiver dropped")
65 })?;
66 drop(permit.send(Some(bytes)));
67 Ok(())
68 }
69
70 async fn close(self) -> std::io::Result<()> {
71 drop(self.tx);
72 Ok(())
73 }
74}
75
76pub struct BreakableLinkRx {
77 rx: mpsc::Receiver<Option<Vec<u8>>>,
78}
79
/// Error type named by `BreakableLinkRx` as its `LinkRx::Error`.
///
/// A field-less marker type: it carries no detail beyond its fixed message.
#[derive(Debug)]
pub struct BreakableLinkRxError;

impl std::fmt::Display for BreakableLinkRxError {
    /// Writes the fixed, human-readable description of this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("breakable link rx error")
    }
}

// No underlying cause to expose, so the trait's default methods suffice.
impl std::error::Error for BreakableLinkRxError {}
90
91impl LinkRx for BreakableLinkRx {
92 type Error = BreakableLinkRxError;
93
94 async fn recv(&mut self) -> Result<Option<Backing>, Self::Error> {
95 match self.rx.recv().await {
96 Some(Some(bytes)) => Ok(Some(Backing::Boxed(bytes.into_boxed_slice()))),
97 Some(None) | None => Ok(None),
98 }
99 }
100}
101
102pub struct TestLinkSource {
107 attachments: VecDeque<Attachment<BreakableLink>>,
108}
109
110impl TestLinkSource {
111 pub fn new(attachments: impl IntoIterator<Item = Attachment<BreakableLink>>) -> Self {
112 Self {
113 attachments: attachments.into_iter().collect(),
114 }
115 }
116}
117
118impl LinkSource for TestLinkSource {
119 type Link = BreakableLink;
120
121 async fn next_link(&mut self) -> std::io::Result<Attachment<Self::Link>> {
122 self.attachments.pop_front().ok_or_else(|| {
123 std::io::Error::new(
124 std::io::ErrorKind::UnexpectedEof,
125 "test link source exhausted",
126 )
127 })
128 }
129}