commonware_runtime/
mocks.rs

1//! A mock implementation of a channel that implements the Sink and Stream traits.
2
3use crate::{Error, Sink as SinkTrait, Stream as StreamTrait};
4use bytes::Bytes;
5use futures::channel::oneshot;
6use std::{
7    collections::VecDeque,
8    sync::{Arc, Mutex},
9};
10
11/// A mock channel struct that is used internally by Sink and Stream.
12pub struct Channel {
13    /// Stores the bytes sent by the sink that are not yet read by the stream.
14    buffer: VecDeque<u8>,
15
16    /// If the stream is waiting to read bytes, the waiter stores the number of
17    /// bytes that the stream is waiting for, as well as the oneshot sender that
18    /// the sink uses to send the bytes to the stream directly.
19    waiter: Option<(usize, oneshot::Sender<Bytes>)>,
20}
21
22impl Channel {
23    /// Returns an async-safe Sink/Stream pair that share an underlying buffer of bytes.
24    pub fn init() -> (Sink, Stream) {
25        let channel = Arc::new(Mutex::new(Channel {
26            buffer: VecDeque::new(),
27            waiter: None,
28        }));
29        (
30            Sink {
31                channel: channel.clone(),
32            },
33            Stream { channel },
34        )
35    }
36}
37
38/// A mock sink that implements the Sink trait.
39pub struct Sink {
40    channel: Arc<Mutex<Channel>>,
41}
42
43impl SinkTrait for Sink {
44    async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
45        let (os_send, data) = {
46            let mut channel = self.channel.lock().unwrap();
47            channel.buffer.extend(msg);
48
49            // If there is a waiter and the buffer is large enough,
50            // return the waiter (while clearing the waiter field).
51            // Otherwise, return early.
52            if channel
53                .waiter
54                .as_ref()
55                .is_some_and(|(requested, _)| *requested <= channel.buffer.len())
56            {
57                let (requested, os_send) = channel.waiter.take().unwrap();
58                let data: Vec<u8> = channel.buffer.drain(0..requested).collect();
59                (os_send, Bytes::from(data))
60            } else {
61                return Ok(());
62            }
63        };
64
65        // Resolve the waiter.
66        os_send.send(data).map_err(|_| Error::SendFailed)?;
67        Ok(())
68    }
69}
70
71/// A mock stream that implements the Stream trait.
72pub struct Stream {
73    channel: Arc<Mutex<Channel>>,
74}
75
76impl StreamTrait for Stream {
77    async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
78        let os_recv = {
79            let mut channel = self.channel.lock().unwrap();
80
81            // If the message is fully available in the buffer,
82            // drain the value into buf and return.
83            if channel.buffer.len() >= buf.len() {
84                let b: Vec<u8> = channel.buffer.drain(0..buf.len()).collect();
85                buf.copy_from_slice(&b);
86                return Ok(());
87            }
88
89            // Otherwise, populate the waiter.
90            assert!(channel.waiter.is_none());
91            let (os_send, os_recv) = oneshot::channel();
92            channel.waiter = Some((buf.len(), os_send));
93            os_recv
94        };
95
96        // Wait for the waiter to be resolved.
97        let data = os_recv.await.map_err(|_| Error::RecvFailed)?;
98        buf.copy_from_slice(&data);
99        Ok(())
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic::Executor, Clock, Runner};
    use commonware_macros::select;
    use futures::{executor::block_on, join};
    use std::{thread::sleep, time::Duration};

    /// A message sent in full can be read back in full.
    #[test]
    fn test_send_recv() {
        let (mut sink, mut stream) = Channel::init();

        let data = b"hello world";
        let mut buf = vec![0; data.len()];

        block_on(async {
            sink.send(data).await.unwrap();
            stream.recv(&mut buf).await.unwrap();
        });

        assert_eq!(buf, data);
    }

    /// Reads do not have to line up with writes: two sends are consumed by
    /// three reads of different sizes and the byte order is preserved.
    #[test]
    fn test_send_recv_partial_multiple() {
        let (mut sink, mut stream) = Channel::init();

        let data1 = b"hello";
        let data2 = b"world";
        let mut buf1 = vec![0; data1.len()];
        let mut buf2 = vec![0; data2.len()];

        block_on(async {
            sink.send(data1).await.unwrap();
            sink.send(data2).await.unwrap();
            stream.recv(&mut buf1[0..3]).await.unwrap();
            stream.recv(&mut buf1[3..]).await.unwrap();
            stream.recv(&mut buf2).await.unwrap();
        });

        assert_eq!(buf1, data1);
        assert_eq!(buf2, data2);
    }

    /// A recv that starts before any data exists completes once the send
    /// happens.
    #[test]
    fn test_send_recv_async() {
        let (mut sink, mut stream) = Channel::init();

        let data = b"hello world";
        let mut buf = vec![0; data.len()];

        block_on(async {
            futures::try_join!(stream.recv(&mut buf), async {
                // Delay the send briefly. This is a blocking sleep on the
                // test thread, so it is kept short.
                sleep(Duration::from_millis(10));
                sink.send(data).await
            },)
            .unwrap();
        });

        assert_eq!(buf, data);
    }

    /// If the oneshot sender is dropped before the oneshot receiver is
    /// resolved, recv reports an error.
    #[test]
    fn test_recv_error() {
        let (sink, mut stream) = Channel::init();
        let (executor, _, _) = Executor::default();

        executor.start(async move {
            let mut buf = vec![0; 5];
            let (v, _) = join!(stream.recv(&mut buf), async {
                // Take the waiter and drop it.
                sink.channel.lock().unwrap().waiter.take();
            },);
            assert_eq!(v, Err(Error::RecvFailed));
        });
    }

    /// If a waiter is registered but its oneshot receiver has been dropped,
    /// send reports an error when it tries to hand over the data.
    #[test]
    fn test_send_error() {
        let (mut sink, mut stream) = Channel::init();
        let (executor, runtime, _) = Executor::default();

        executor.start(async move {
            let mut buf = vec![0; 5];

            // Create a waiter using a recv call.
            // But then drop the receiver.
            select! {
                v = stream.recv(&mut buf) => {
                    panic!("unexpected value: {:?}", v);
                },
                _ = runtime.sleep(Duration::from_millis(100)) => {
                    "timeout"
                },
            };
            drop(stream);

            // Try to send a message (longer than the requested amount), but the receiver is dropped.
            let result = sink.send(b"hello world").await;
            assert_eq!(result, Err(Error::SendFailed));
        });
    }

    /// With no data to read, recv just waits; the timeout branch must win.
    #[test]
    fn test_recv_timeout() {
        let (_sink, mut stream) = Channel::init();
        let (executor, runtime, _) = Executor::default();

        executor.start(async move {
            let mut buf = vec![0; 5];
            select! {
                v = stream.recv(&mut buf) => {
                    panic!("unexpected value: {:?}", v);
                },
                _ = runtime.sleep(Duration::from_millis(100)) => {
                    "timeout"
                },
            };
        });
    }
}