commonware_runtime/
mocks.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
//! A mock implementation of a channel that implements the Sink and Stream traits.

use crate::{Error, Sink as SinkTrait, Stream as StreamTrait};
use bytes::Bytes;
use futures::channel::oneshot;
use std::{
    collections::VecDeque,
    sync::{Arc, Mutex},
};

/// Shared state behind a mock [`Sink`]/[`Stream`] pair.
///
/// A single `Channel` is wrapped in an `Arc<Mutex<_>>` by [`Channel::init`] and
/// handed to both halves; it is not meant to be used on its own.
pub struct Channel {
    /// Bytes appended by the sink that the stream has not consumed yet,
    /// in the order they were sent.
    buffer: VecDeque<u8>,

    /// Pending read request, if any.
    ///
    /// Set by the stream's `recv` when the buffer holds fewer bytes than it
    /// needs: the `usize` is the number of bytes requested and the oneshot
    /// sender is how the sink's `send` delivers exactly that many bytes once
    /// the buffer has grown large enough.
    waiter: Option<(usize, oneshot::Sender<Bytes>)>,
}

impl Channel {
    /// Creates a connected [`Sink`]/[`Stream`] pair.
    ///
    /// Both halves hold an `Arc<Mutex<Channel>>` pointing at the same freshly
    /// created state: an empty byte buffer and no pending waiter.
    pub fn init() -> (Sink, Stream) {
        let shared = Arc::new(Mutex::new(Self {
            buffer: VecDeque::new(),
            waiter: None,
        }));

        let sink = Sink {
            channel: Arc::clone(&shared),
        };
        let stream = Stream { channel: shared };

        (sink, stream)
    }
}

/// The writing half of a mock channel; implements the `Sink` trait.
///
/// Created together with its [`Stream`] counterpart by [`Channel::init`].
pub struct Sink {
    /// Channel state shared with the paired [`Stream`].
    channel: Arc<Mutex<Channel>>,
}

impl SinkTrait for Sink {
    async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
        let (os_send, data) = {
            let mut channel = self.channel.lock().unwrap();
            channel.buffer.extend(msg);

            // If there is a waiter and the buffer is large enough,
            // return the waiter (while clearing the waiter field).
            // Otherwise, return early.
            if channel
                .waiter
                .as_ref()
                .is_some_and(|(requested, _)| *requested <= channel.buffer.len())
            {
                let (requested, os_send) = channel.waiter.take().unwrap();
                let data: Vec<u8> = channel.buffer.drain(0..requested).collect();
                (os_send, Bytes::from(data))
            } else {
                return Ok(());
            }
        };

        // Resolve the waiter.
        os_send.send(data).map_err(|_| Error::SendFailed)?;
        Ok(())
    }
}

/// The reading half of a mock channel; implements the `Stream` trait.
///
/// Created together with its [`Sink`] counterpart by [`Channel::init`].
pub struct Stream {
    /// Channel state shared with the paired [`Sink`].
    channel: Arc<Mutex<Channel>>,
}

impl StreamTrait for Stream {
    /// Fills `buf` completely with the next `buf.len()` bytes from the channel.
    ///
    /// If enough bytes are already buffered they are copied out immediately.
    /// Otherwise a waiter is registered and the call suspends until the sink
    /// has sent enough data to satisfy the request.
    ///
    /// Dropping the returned future before it completes is safe: the waiter it
    /// left behind is detected and discarded by the next call to `recv`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::RecvFailed`] if the registered waiter is dropped
    /// without ever being resolved.
    ///
    /// # Panics
    ///
    /// Panics if the channel mutex is poisoned, or if a waiter whose receiver
    /// is still alive is already registered.
    async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
        let os_recv = {
            let mut channel = self.channel.lock().unwrap();

            // A previous `recv` future may have been dropped while parked
            // (e.g. it lost a `select!` race). Its waiter is still registered,
            // but nobody listens on the other end. Discard it so it can
            // neither trip the assertion below nor make a later `send` fail
            // and lose bytes while this stream is still alive.
            if channel
                .waiter
                .as_ref()
                .is_some_and(|(_, os_send)| os_send.is_canceled())
            {
                channel.waiter = None;
            }

            // If the message is fully available in the buffer, move the bytes
            // straight into `buf` and return.
            let requested = buf.len();
            if channel.buffer.len() >= requested {
                for (dst, src) in buf.iter_mut().zip(channel.buffer.drain(..requested)) {
                    *dst = src;
                }
                return Ok(());
            }

            // Otherwise, populate the waiter. `recv` takes `&mut self`, so a
            // live waiter at this point would mean a broken invariant.
            assert!(channel.waiter.is_none());
            let (os_send, os_recv) = oneshot::channel();
            channel.waiter = Some((requested, os_send));
            os_recv
        };

        // Wait (with the lock released) for the sink to resolve the waiter.
        let data = os_recv.await.map_err(|_| Error::RecvFailed)?;
        buf.copy_from_slice(&data);
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic::Executor, Clock, Runner};
    use commonware_macros::select;
    use futures::{executor::block_on, join};
    use std::time::Duration;

    // A message sent before the read is returned in full by a single recv.
    #[test]
    fn test_send_recv() {
        let (mut sink, mut stream) = Channel::init();

        let data = b"hello world";
        let mut buf = vec![0; data.len()];

        block_on(async {
            sink.send(data).await.unwrap();
            stream.recv(&mut buf).await.unwrap();
        });

        assert_eq!(buf, data);
    }

    // The channel is a byte stream: reads may split or span the original
    // message boundaries.
    #[test]
    fn test_send_recv_partial_multiple() {
        let (mut sink, mut stream) = Channel::init();

        let data1 = b"hello";
        let data2 = b"world";
        let mut buf1 = vec![0; data1.len()];
        let mut buf2 = vec![0; data2.len()];

        block_on(async {
            sink.send(data1).await.unwrap();
            sink.send(data2).await.unwrap();
            stream.recv(&mut buf1[0..3]).await.unwrap();
            stream.recv(&mut buf1[3..]).await.unwrap();
            stream.recv(&mut buf2).await.unwrap();
        });

        assert_eq!(buf1, data1);
        assert_eq!(buf2, data2);
    }

    // A recv that starts before any data is available is completed by a
    // later send.
    #[test]
    fn test_send_recv_async() {
        let (mut sink, mut stream) = Channel::init();
        let (executor, runtime, _) = Executor::default();

        let data = b"hello world";

        executor.start(async move {
            let mut buf = vec![0; data.len()];

            // Delay the send by awaiting the runtime clock rather than
            // blocking the test thread with `std::thread::sleep`. The recv
            // future is polled first and parks until the send resolves it.
            let delay = runtime.sleep(Duration::from_millis(10_000));
            futures::try_join!(stream.recv(&mut buf), async {
                delay.await;
                sink.send(data).await
            })
            .unwrap();

            assert_eq!(buf, data);
        });
    }

    #[test]
    fn test_recv_error() {
        let (sink, mut stream) = Channel::init();
        let (executor, _, _) = Executor::default();

        // If the oneshot sender is dropped before the oneshot receiver is resolved,
        // the recv function should return an error.
        executor.start(async move {
            let mut buf = vec![0; 5];
            let (v, _) = join!(stream.recv(&mut buf), async {
                // Take the waiter and drop it.
                sink.channel.lock().unwrap().waiter.take();
            },);
            assert_eq!(v, Err(Error::RecvFailed));
        });
    }

    #[test]
    fn test_send_error() {
        let (mut sink, mut stream) = Channel::init();
        let (executor, runtime, _) = Executor::default();

        // If the waiter value has a min, but the oneshot receiver is dropped,
        // the send function should return an error when attempting to send the data.
        executor.start(async move {
            let mut buf = vec![0; 5];

            // Create a waiter using a recv call.
            // But then drop the receiver.
            select! {
                v = stream.recv(&mut buf) => {
                    panic!("unexpected value: {:?}", v);
                },
                _ = runtime.sleep(Duration::from_millis(100)) => {
                    "timeout"
                },
            };
            drop(stream);

            // Try to send a message (longer than the requested amount), but the receiver is dropped.
            let result = sink.send(b"hello world").await;
            assert_eq!(result, Err(Error::SendFailed));
        });
    }

    #[test]
    fn test_recv_timeout() {
        let (_sink, mut stream) = Channel::init();
        let (executor, runtime, _) = Executor::default();

        // If there is no data to read, test that the recv function just blocks. A timeout should return first.
        executor.start(async move {
            let mut buf = vec![0; 5];
            select! {
                v = stream.recv(&mut buf) => {
                    panic!("unexpected value: {:?}", v);
                },
                _ = runtime.sleep(Duration::from_millis(100)) => {
                    "timeout"
                },
            };
        });
    }
}