Skip to main content

vox_core/
testing.rs

1//! Test utilities for vox-core. Publicly exported for use by integration
2//! tests in downstream crates.
3
4use std::collections::VecDeque;
5
6use moire::sync::mpsc;
7use vox_types::{Backing, Link, LinkRx, LinkTx};
8
9use crate::{Attachment, LinkSource};
10
11pub struct BreakableLink {
12    tx: mpsc::Sender<Option<Vec<u8>>>,
13    rx: mpsc::Receiver<Option<Vec<u8>>>,
14}
15
16#[derive(Clone)]
17pub struct BreakHandle {
18    tx: mpsc::Sender<Option<Vec<u8>>>,
19}
20
21pub fn breakable_link_pair(
22    buffer: usize,
23) -> (BreakableLink, BreakHandle, BreakableLink, BreakHandle) {
24    let (tx_a, rx_b) = mpsc::channel("breakable_link.a→b", buffer);
25    let (tx_b, rx_a) = mpsc::channel("breakable_link.b→a", buffer);
26
27    let a_handle = BreakHandle { tx: tx_b.clone() };
28    let b_handle = BreakHandle { tx: tx_a.clone() };
29
30    (
31        BreakableLink { tx: tx_a, rx: rx_a },
32        a_handle,
33        BreakableLink { tx: tx_b, rx: rx_b },
34        b_handle,
35    )
36}
37
38impl BreakHandle {
39    pub async fn close(&self) {
40        let _ = self.tx.send(None).await;
41    }
42}
43
44impl Link for BreakableLink {
45    type Tx = BreakableLinkTx;
46    type Rx = BreakableLinkRx;
47
48    fn split(self) -> (Self::Tx, Self::Rx) {
49        (
50            BreakableLinkTx { tx: self.tx },
51            BreakableLinkRx { rx: self.rx },
52        )
53    }
54}
55
56#[derive(Clone)]
57pub struct BreakableLinkTx {
58    tx: mpsc::Sender<Option<Vec<u8>>>,
59}
60
61impl LinkTx for BreakableLinkTx {
62    async fn send(&self, bytes: Vec<u8>) -> std::io::Result<()> {
63        let permit = self.tx.clone().reserve_owned().await.map_err(|_| {
64            std::io::Error::new(std::io::ErrorKind::ConnectionReset, "receiver dropped")
65        })?;
66        drop(permit.send(Some(bytes)));
67        Ok(())
68    }
69
70    async fn close(self) -> std::io::Result<()> {
71        drop(self.tx);
72        Ok(())
73    }
74}
75
76pub struct BreakableLinkRx {
77    rx: mpsc::Receiver<Option<Vec<u8>>>,
78}
79
80#[derive(Debug)]
81pub struct BreakableLinkRxError;
82
83impl std::fmt::Display for BreakableLinkRxError {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        write!(f, "breakable link rx error")
86    }
87}
88
89impl std::error::Error for BreakableLinkRxError {}
90
91impl LinkRx for BreakableLinkRx {
92    type Error = BreakableLinkRxError;
93
94    async fn recv(&mut self) -> Result<Option<Backing>, Self::Error> {
95        match self.rx.recv().await {
96            Some(Some(bytes)) => Ok(Some(Backing::Boxed(bytes.into_boxed_slice()))),
97            Some(None) | None => Ok(None),
98        }
99    }
100}
101
102/// A [`LinkSource`] backed by a fixed list of pre-made links.
103///
104/// Each call to `next_link()` pops the next attachment. When exhausted,
105/// returns an error (which causes the recoverer to give up).
106pub struct TestLinkSource {
107    attachments: VecDeque<Attachment<BreakableLink>>,
108}
109
110impl TestLinkSource {
111    pub fn new(attachments: impl IntoIterator<Item = Attachment<BreakableLink>>) -> Self {
112        Self {
113            attachments: attachments.into_iter().collect(),
114        }
115    }
116}
117
118impl LinkSource for TestLinkSource {
119    type Link = BreakableLink;
120
121    async fn next_link(&mut self) -> std::io::Result<Attachment<Self::Link>> {
122        self.attachments.pop_front().ok_or_else(|| {
123            std::io::Error::new(
124                std::io::ErrorKind::UnexpectedEof,
125                "test link source exhausted",
126            )
127        })
128    }
129}