commonware_runtime/
mocks.rs1use crate::{Error, Sink as SinkTrait, Stream as StreamTrait};
4use bytes::Bytes;
5use futures::channel::oneshot;
6use std::{
7 collections::VecDeque,
8 sync::{Arc, Mutex},
9};
10
11pub struct Channel {
13 buffer: VecDeque<u8>,
15
16 waiter: Option<(usize, oneshot::Sender<Bytes>)>,
20}
21
22impl Channel {
23 pub fn init() -> (Sink, Stream) {
25 let channel = Arc::new(Mutex::new(Channel {
26 buffer: VecDeque::new(),
27 waiter: None,
28 }));
29 (
30 Sink {
31 channel: channel.clone(),
32 },
33 Stream { channel },
34 )
35 }
36}
37
38pub struct Sink {
40 channel: Arc<Mutex<Channel>>,
41}
42
43impl SinkTrait for Sink {
44 async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
45 let (os_send, data) = {
46 let mut channel = self.channel.lock().unwrap();
47 channel.buffer.extend(msg);
48
49 if channel
53 .waiter
54 .as_ref()
55 .is_some_and(|(requested, _)| *requested <= channel.buffer.len())
56 {
57 let (requested, os_send) = channel.waiter.take().unwrap();
58 let data: Vec<u8> = channel.buffer.drain(0..requested).collect();
59 (os_send, Bytes::from(data))
60 } else {
61 return Ok(());
62 }
63 };
64
65 os_send.send(data).map_err(|_| Error::SendFailed)?;
67 Ok(())
68 }
69}
70
71pub struct Stream {
73 channel: Arc<Mutex<Channel>>,
74}
75
76impl StreamTrait for Stream {
77 async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
78 let os_recv = {
79 let mut channel = self.channel.lock().unwrap();
80
81 if channel.buffer.len() >= buf.len() {
84 let b: Vec<u8> = channel.buffer.drain(0..buf.len()).collect();
85 buf.copy_from_slice(&b);
86 return Ok(());
87 }
88
89 assert!(channel.waiter.is_none());
91 let (os_send, os_recv) = oneshot::channel();
92 channel.waiter = Some((buf.len(), os_send));
93 os_recv
94 };
95
96 let data = os_recv.await.map_err(|_| Error::RecvFailed)?;
98 buf.copy_from_slice(&data);
99 Ok(())
100 }
101}
102
103#[cfg(test)]
104mod tests {
105 use super::*;
106 use crate::{deterministic::Executor, Clock, Runner};
107 use commonware_macros::select;
108 use futures::{executor::block_on, join};
109 use std::{thread::sleep, time::Duration};
110
111 #[test]
112 fn test_send_recv() {
113 let (mut sink, mut stream) = Channel::init();
114
115 let data = b"hello world";
116 let mut buf = vec![0; data.len()];
117
118 block_on(async {
119 sink.send(data).await.unwrap();
120 stream.recv(&mut buf).await.unwrap();
121 });
122
123 assert_eq!(buf, data);
124 }
125
126 #[test]
127 fn test_send_recv_partial_multiple() {
128 let (mut sink, mut stream) = Channel::init();
129
130 let data1 = b"hello";
131 let data2 = b"world";
132 let mut buf1 = vec![0; data1.len()];
133 let mut buf2 = vec![0; data2.len()];
134
135 block_on(async {
136 sink.send(data1).await.unwrap();
137 sink.send(data2).await.unwrap();
138 stream.recv(&mut buf1[0..3]).await.unwrap();
139 stream.recv(&mut buf1[3..]).await.unwrap();
140 stream.recv(&mut buf2).await.unwrap();
141 });
142
143 assert_eq!(buf1, data1);
144 assert_eq!(buf2, data2);
145 }
146
147 #[test]
148 fn test_send_recv_async() {
149 let (mut sink, mut stream) = Channel::init();
150
151 let data = b"hello world";
152 let mut buf = vec![0; data.len()];
153
154 block_on(async {
155 futures::try_join!(stream.recv(&mut buf), async {
156 sleep(Duration::from_millis(10_000));
157 sink.send(data).await
158 },)
159 .unwrap();
160 });
161
162 assert_eq!(buf, data);
163 }
164
165 #[test]
166 fn test_recv_error() {
167 let (sink, mut stream) = Channel::init();
168 let (executor, _, _) = Executor::default();
169
170 executor.start(async move {
173 let mut buf = vec![0; 5];
174 let (v, _) = join!(stream.recv(&mut buf), async {
175 sink.channel.lock().unwrap().waiter.take();
177 },);
178 assert_eq!(v, Err(Error::RecvFailed));
179 });
180 }
181
182 #[test]
183 fn test_send_error() {
184 let (mut sink, mut stream) = Channel::init();
185 let (executor, runtime, _) = Executor::default();
186
187 executor.start(async move {
190 let mut buf = vec![0; 5];
191
192 select! {
195 v = stream.recv(&mut buf) => {
196 panic!("unexpected value: {:?}", v);
197 },
198 _ = runtime.sleep(Duration::from_millis(100)) => {
199 "timeout"
200 },
201 };
202 drop(stream);
203
204 let result = sink.send(b"hello world").await;
206 assert_eq!(result, Err(Error::SendFailed));
207 });
208 }
209
210 #[test]
211 fn test_recv_timeout() {
212 let (_sink, mut stream) = Channel::init();
213 let (executor, runtime, _) = Executor::default();
214
215 executor.start(async move {
217 let mut buf = vec![0; 5];
218 select! {
219 v = stream.recv(&mut buf) => {
220 panic!("unexpected value: {:?}", v);
221 },
222 _ = runtime.sleep(Duration::from_millis(100)) => {
223 "timeout"
224 },
225 };
226 });
227 }
228}