Skip to main content

vox_core/
testing.rs

1//! Test utilities for vox-core. Publicly exported for use by integration
2//! tests in downstream crates.
3
4use moire::sync::mpsc;
5use vox_types::{Backing, Link, LinkRx, LinkTx, LinkTxPermit, WriteSlot};
6
7pub struct BreakableLink {
8    tx: mpsc::Sender<Option<Vec<u8>>>,
9    rx: mpsc::Receiver<Option<Vec<u8>>>,
10}
11
12#[derive(Clone)]
13pub struct BreakHandle {
14    tx: mpsc::Sender<Option<Vec<u8>>>,
15}
16
17pub fn breakable_link_pair(
18    buffer: usize,
19) -> (BreakableLink, BreakHandle, BreakableLink, BreakHandle) {
20    let (tx_a, rx_b) = mpsc::channel("breakable_link.a→b", buffer);
21    let (tx_b, rx_a) = mpsc::channel("breakable_link.b→a", buffer);
22
23    let a_handle = BreakHandle { tx: tx_b.clone() };
24    let b_handle = BreakHandle { tx: tx_a.clone() };
25
26    (
27        BreakableLink { tx: tx_a, rx: rx_a },
28        a_handle,
29        BreakableLink { tx: tx_b, rx: rx_b },
30        b_handle,
31    )
32}
33
34impl BreakHandle {
35    pub async fn close(&self) {
36        let _ = self.tx.send(None).await;
37    }
38}
39
40impl Link for BreakableLink {
41    type Tx = BreakableLinkTx;
42    type Rx = BreakableLinkRx;
43
44    fn split(self) -> (Self::Tx, Self::Rx) {
45        (
46            BreakableLinkTx { tx: self.tx },
47            BreakableLinkRx { rx: self.rx },
48        )
49    }
50}
51
52#[derive(Clone)]
53pub struct BreakableLinkTx {
54    tx: mpsc::Sender<Option<Vec<u8>>>,
55}
56
57pub struct BreakableLinkTxPermit {
58    permit: mpsc::OwnedPermit<Option<Vec<u8>>>,
59}
60
61impl LinkTx for BreakableLinkTx {
62    type Permit = BreakableLinkTxPermit;
63
64    async fn reserve(&self) -> std::io::Result<Self::Permit> {
65        let permit = self.tx.clone().reserve_owned().await.map_err(|_| {
66            std::io::Error::new(std::io::ErrorKind::ConnectionReset, "receiver dropped")
67        })?;
68        Ok(BreakableLinkTxPermit { permit })
69    }
70
71    async fn close(self) -> std::io::Result<()> {
72        drop(self.tx);
73        Ok(())
74    }
75}
76
77pub struct BreakableWriteSlot {
78    buf: Vec<u8>,
79    permit: mpsc::OwnedPermit<Option<Vec<u8>>>,
80}
81
82impl LinkTxPermit for BreakableLinkTxPermit {
83    type Slot = BreakableWriteSlot;
84
85    fn alloc(self, len: usize) -> std::io::Result<Self::Slot> {
86        Ok(BreakableWriteSlot {
87            buf: vec![0u8; len],
88            permit: self.permit,
89        })
90    }
91}
92
93impl WriteSlot for BreakableWriteSlot {
94    fn as_mut_slice(&mut self) -> &mut [u8] {
95        &mut self.buf
96    }
97
98    fn commit(self) {
99        drop(self.permit.send(Some(self.buf)));
100    }
101}
102
103pub struct BreakableLinkRx {
104    rx: mpsc::Receiver<Option<Vec<u8>>>,
105}
106
107#[derive(Debug)]
108pub struct BreakableLinkRxError;
109
110impl std::fmt::Display for BreakableLinkRxError {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        write!(f, "breakable link rx error")
113    }
114}
115
116impl std::error::Error for BreakableLinkRxError {}
117
118impl LinkRx for BreakableLinkRx {
119    type Error = BreakableLinkRxError;
120
121    async fn recv(&mut self) -> Result<Option<Backing>, Self::Error> {
122        match self.rx.recv().await {
123            Some(Some(bytes)) => Ok(Some(Backing::Boxed(bytes.into_boxed_slice()))),
124            Some(None) | None => Ok(None),
125        }
126    }
127}