commonware_runtime/mocks.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
//! A mock implementation of a channel that implements the Sink and Stream traits.
use crate::{Error, Sink as SinkTrait, Stream as StreamTrait};
use bytes::Bytes;
use futures::channel::oneshot;
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};
/// A mock channel struct that is used internally by Sink and Stream.
pub struct Channel {
/// Stores the bytes sent by the sink that are not yet read by the stream.
buffer: VecDeque<u8>,
/// If the stream is waiting to read bytes, the waiter stores the number of
/// bytes that the stream is waiting for, as well as the oneshot sender that
/// the sink uses to send the bytes to the stream directly.
waiter: Option<(usize, oneshot::Sender<Bytes>)>,
}
impl Channel {
/// Returns an async-safe Sink/Stream pair that share an underlying buffer of bytes.
pub fn init() -> (Sink, Stream) {
let channel = Arc::new(Mutex::new(Channel {
buffer: VecDeque::new(),
waiter: None,
}));
(
Sink {
channel: channel.clone(),
},
Stream { channel },
)
}
}
/// A mock sink that implements the Sink trait.
pub struct Sink {
channel: Arc<Mutex<Channel>>,
}
impl SinkTrait for Sink {
async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
let (os_send, data) = {
let mut channel = self.channel.lock().unwrap();
channel.buffer.extend(msg);
// If there is a waiter and the buffer is large enough,
// return the waiter (while clearing the waiter field).
// Otherwise, return early.
if channel
.waiter
.as_ref()
.is_some_and(|(requested, _)| *requested <= channel.buffer.len())
{
let (requested, os_send) = channel.waiter.take().unwrap();
let data: Vec<u8> = channel.buffer.drain(0..requested).collect();
(os_send, Bytes::from(data))
} else {
return Ok(());
}
};
// Resolve the waiter.
os_send.send(data).map_err(|_| Error::SendFailed)?;
Ok(())
}
}
/// A mock stream that implements the Stream trait.
pub struct Stream {
channel: Arc<Mutex<Channel>>,
}
impl StreamTrait for Stream {
async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
let os_recv = {
let mut channel = self.channel.lock().unwrap();
// If the message is fully available in the buffer,
// drain the value into buf and return.
if channel.buffer.len() >= buf.len() {
let b: Vec<u8> = channel.buffer.drain(0..buf.len()).collect();
buf.copy_from_slice(&b);
return Ok(());
}
// Otherwise, populate the waiter.
assert!(channel.waiter.is_none());
let (os_send, os_recv) = oneshot::channel();
channel.waiter = Some((buf.len(), os_send));
os_recv
};
// Wait for the waiter to be resolved.
let data = os_recv.await.map_err(|_| Error::RecvFailed)?;
buf.copy_from_slice(&data);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{deterministic::Executor, Clock, Runner};
use commonware_macros::select;
use futures::{executor::block_on, join};
use std::{thread::sleep, time::Duration};
#[test]
fn test_send_recv() {
let (mut sink, mut stream) = Channel::init();
let data = b"hello world";
let mut buf = vec![0; data.len()];
block_on(async {
sink.send(data).await.unwrap();
stream.recv(&mut buf).await.unwrap();
});
assert_eq!(buf, data);
}
#[test]
fn test_send_recv_partial_multiple() {
let (mut sink, mut stream) = Channel::init();
let data1 = b"hello";
let data2 = b"world";
let mut buf1 = vec![0; data1.len()];
let mut buf2 = vec![0; data2.len()];
block_on(async {
sink.send(data1).await.unwrap();
sink.send(data2).await.unwrap();
stream.recv(&mut buf1[0..3]).await.unwrap();
stream.recv(&mut buf1[3..]).await.unwrap();
stream.recv(&mut buf2).await.unwrap();
});
assert_eq!(buf1, data1);
assert_eq!(buf2, data2);
}
#[test]
fn test_send_recv_async() {
let (mut sink, mut stream) = Channel::init();
let data = b"hello world";
let mut buf = vec![0; data.len()];
block_on(async {
futures::try_join!(stream.recv(&mut buf), async {
sleep(Duration::from_millis(10_000));
sink.send(data).await
},)
.unwrap();
});
assert_eq!(buf, data);
}
#[test]
fn test_recv_error() {
let (sink, mut stream) = Channel::init();
let (executor, _, _) = Executor::default();
// If the oneshot sender is dropped before the oneshot receiver is resolved,
// the recv function should return an error.
executor.start(async move {
let mut buf = vec![0; 5];
let (v, _) = join!(stream.recv(&mut buf), async {
// Take the waiter and drop it.
sink.channel.lock().unwrap().waiter.take();
},);
assert_eq!(v, Err(Error::RecvFailed));
});
}
#[test]
fn test_send_error() {
let (mut sink, mut stream) = Channel::init();
let (executor, runtime, _) = Executor::default();
// If the waiter value has a min, but the oneshot receiver is dropped,
// the send function should return an error when attempting to send the data.
executor.start(async move {
let mut buf = vec![0; 5];
// Create a waiter using a recv call.
// But then drop the receiver.
select! {
v = stream.recv(&mut buf) => {
panic!("unexpected value: {:?}", v);
},
_ = runtime.sleep(Duration::from_millis(100)) => {
"timeout"
},
};
drop(stream);
// Try to send a message (longer than the requested amount), but the receiver is dropped.
let result = sink.send(b"hello world").await;
assert_eq!(result, Err(Error::SendFailed));
});
}
#[test]
fn test_recv_timeout() {
let (_sink, mut stream) = Channel::init();
let (executor, runtime, _) = Executor::default();
// If there is no data to read, test that the recv function just blocks. A timeout should return first.
executor.start(async move {
let mut buf = vec![0; 5];
select! {
v = stream.recv(&mut buf) => {
panic!("unexpected value: {:?}", v);
},
_ = runtime.sleep(Duration::from_millis(100)) => {
"timeout"
},
};
});
}
}