pub mod deterministic;
pub mod mocks;
pub mod tokio;
mod utils;
pub use utils::{reschedule, Handle};
use bytes::Bytes;
use std::{
future::Future,
net::SocketAddr,
time::{Duration, SystemTime},
};
use thiserror::Error;
/// Errors returned by the runtime traits declared in this module.
///
/// Every fallible trait method below resolves to `Result<_, Error>`. Variants that
/// carry `String` payloads interpolate them into the display message given by the
/// adjacent `#[error(...)]` attribute.
#[derive(Error, Debug, PartialEq)]
pub enum Error {
/// Displayed as "exited". The tests in this file expect this value when awaiting
/// the handle of a spawned task that panicked.
#[error("exited")]
Exited,
/// Displayed as "closed". The tests in this file expect this value when awaiting
/// a handle after calling `abort()` on it.
#[error("closed")]
Closed,
/// Displayed as "timeout".
#[error("timeout")]
Timeout,
/// Displayed as "bind failed" (presumably surfaced by `Network::bind` — confirm
/// against the implementations).
#[error("bind failed")]
BindFailed,
/// Displayed as "connection failed" (presumably surfaced by `Network::dial` —
/// confirm against the implementations).
#[error("connection failed")]
ConnectionFailed,
/// Displayed as "write failed".
#[error("write failed")]
WriteFailed,
/// Displayed as "read failed".
#[error("read failed")]
ReadFailed,
/// Displayed as "partition creation failed: {0}"; the payload is presumably the
/// partition name — confirm against the implementations.
#[error("partition creation failed: {0}")]
PartitionCreationFailed(String),
/// Displayed as "partition missing: {0}". The tests in this file expect this
/// variant from `Storage::scan` after the partition has been removed.
#[error("partition missing: {0}")]
PartitionMissing(String),
/// Displayed as "partition corrupt: {0}".
#[error("partition corrupt: {0}")]
PartitionCorrupt(String),
// NOTE(review): the two payloads of the blob variants below are rendered as
// `{0}/{1}`; presumably partition then blob name — confirm against the
// implementations that construct them.
/// Displayed as "blob open failed: {0}/{1}".
#[error("blob open failed: {0}/{1}")]
BlobOpenFailed(String, String),
/// Displayed as "blob missing: {0}/{1}".
#[error("blob missing: {0}/{1}")]
BlobMissing(String, String),
/// Displayed as "blob truncate failed: {0}/{1}".
#[error("blob truncate failed: {0}/{1}")]
BlobTruncateFailed(String, String),
/// Displayed as "blob sync failed: {0}/{1}".
#[error("blob sync failed: {0}/{1}")]
BlobSyncFailed(String, String),
/// Displayed as "blob close failed: {0}/{1}".
#[error("blob close failed: {0}/{1}")]
BlobCloseFailed(String, String),
}
/// Entry point for executing a root future on a runtime.
pub trait Runner {
/// Consumes the runner, executes `f`, and returns the future's output.
///
/// `f` and its output must be `Send + 'static`.
///
/// NOTE(review): the tests in this file expect a panic raised inside `f` to
/// propagate to the caller of `start`.
fn start<F>(self, f: F) -> F::Output
where
F: Future + Send + 'static,
F::Output: Send + 'static;
}
/// Capability to launch additional tasks on a runtime.
pub trait Spawner: Clone + Send + Sync + 'static {
/// Launches `f` as a task associated with `label` and returns a `Handle` for it.
///
/// The tests in this file await the handle to obtain the task's result and call
/// `abort()` on it to cancel the task. How `label` is used is not visible here
/// (presumably for diagnostics — confirm against the implementations).
fn spawn<F, T>(&self, label: &str, f: F) -> Handle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static;
}
/// Source of the current time and of timer futures.
pub trait Clock: Clone + Send + Sync + 'static {
/// Returns the current time as reported by this clock.
fn current(&self) -> SystemTime;
/// Returns a future that completes after `duration`.
///
/// The tests in this file expect `current()` to have advanced by at least
/// `duration` once the future resolves.
fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
/// Returns a future that completes once `deadline` has been reached
/// (presumably as measured by `current()` — confirm against the implementations).
fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
/// Capability to accept and establish connections.
///
/// Generic over the listener type `L` and the sink/stream halves (`Si`, `St`) that
/// make up a connection.
pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
where
L: Listener<Si, St>,
Si: Sink,
St: Stream,
{
/// Binds to `socket` and resolves to a listener, or an `Error` on failure.
fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;
/// Dials `socket` and resolves to the `(sink, stream)` pair of the new
/// connection, or an `Error` on failure.
fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
}
/// Source of inbound connections, as produced by `Network::bind`.
pub trait Listener<Si, St>: Sync + Send + 'static
where
Si: Sink,
St: Stream,
{
/// Resolves to the next inbound connection as `(address, sink, stream)`, or an
/// `Error` on failure. The address is presumably the remote peer's — confirm
/// against the implementations.
fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
}
/// Outbound half of a connection.
pub trait Sink: Sync + Send + 'static {
/// Sends `msg`, resolving to `Ok(())` on success or an `Error` on failure.
fn send(&mut self, msg: Bytes) -> impl Future<Output = Result<(), Error>> + Send;
}
/// Inbound half of a connection.
pub trait Stream: Sync + Send + 'static {
/// Resolves to the next received message as `Bytes`, or an `Error` on failure.
fn recv(&mut self) -> impl Future<Output = Result<Bytes, Error>> + Send;
}
/// Access to named blobs (`B`) grouped into partitions.
pub trait Storage<B>: Clone + Send + Sync + 'static
where
B: Blob,
{
/// Opens the blob `name` inside `partition` and resolves to a handle for it.
///
/// The tests in this file open blobs and partitions that were never explicitly
/// created and expect that to succeed.
fn open(
&mut self,
partition: &str,
name: &[u8],
) -> impl Future<Output = Result<B, Error>> + Send;
/// Removes data from `partition`.
///
/// As exercised by the tests in this file: `Some(name)` removes that single
/// blob, while `None` removes the partition itself (a later `scan` of it yields
/// `Error::PartitionMissing`).
fn remove(
&mut self,
partition: &str,
name: Option<&[u8]>,
) -> impl Future<Output = Result<(), Error>> + Send;
/// Resolves to the names of the blobs currently stored in `partition`.
fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
/// A single stored object that can be read and written at arbitrary offsets.
// `len` here is an async, fallible query, so clippy's expectation of a companion
// `is_empty` is silenced (presumably for that reason — confirm with the authors).
#[allow(clippy::len_without_is_empty)]
pub trait Blob: Send + Sync + 'static {
/// Resolves to the current length of the blob in bytes.
fn len(&self) -> impl Future<Output = Result<usize, Error>> + Send;
/// Reads into `buf` starting at `offset` and resolves to the number of bytes read.
///
/// As exercised by the tests in this file, the count may be smaller than
/// `buf.len()` when fewer bytes remain, and is `0` when `offset` is at the end
/// of the blob (in which case `buf` is left untouched).
fn read_at(
&mut self,
buf: &mut [u8],
offset: usize,
) -> impl Future<Output = Result<usize, Error>> + Send;
/// Writes all of `buf` into the blob starting at `offset`.
fn write_at(
&mut self,
buf: &[u8],
offset: usize,
) -> impl Future<Output = Result<(), Error>> + Send;
/// Shortens the blob to `len` bytes; the tests expect `len()` to report the new
/// size afterwards. Behavior when `len` exceeds the current size is not visible
/// here.
fn truncate(&mut self, len: usize) -> impl Future<Output = Result<(), Error>> + Send;
/// Synchronizes the blob (presumably flushing pending writes to durable storage
/// — confirm against the implementations).
fn sync(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
/// Closes the blob. The tests in this file reopen the blob through
/// `Storage::open` after closing it.
fn close(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
}
#[cfg(test)]
mod tests {
use super::*;
use commonware_macros::select;
use core::panic;
use futures::future::ready;
use futures::{channel::mpsc, SinkExt, StreamExt};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::Mutex;
use utils::reschedule;
/// Checks that a root future resolving to `Err` has that value handed back
/// unchanged by `Runner::start`.
fn test_error_future(runner: impl Runner) {
    // Root task that fails immediately with a fixed message.
    let failing = async { Err::<&'static str, &'static str>("An error occurred") };

    let outcome = runner.start(failing);
    assert_eq!(outcome, Err("An error occurred"));
}
/// Checks that `Clock::sleep` does not resolve before the requested duration has
/// passed according to the same clock.
fn test_clock_sleep(runner: impl Runner, context: impl Spawner + Clock) {
    runner.start(async move {
        let pause = Duration::from_millis(10);

        // Sample the clock on both sides of the sleep.
        let before = context.current();
        context.sleep(pause).await;
        let after = context.current();

        let slept = after.duration_since(before).unwrap();
        assert!(slept >= pause);
    });
}
/// Checks that `Clock::sleep_until` does not resolve before the requested deadline
/// has been reached according to the same clock.
///
/// Elapsed time is measured with `context.current()` rather than
/// `SystemTime::elapsed()`. The latter consults the operating system clock, so for
/// a runtime whose clock is virtual (the deterministic runtime starts at
/// `UNIX_EPOCH`) the assertion would always hold and the check could never fail.
fn test_clock_sleep_until(runner: impl Runner, context: impl Spawner + Clock) {
    runner.start(async move {
        // Sleep until a deadline 100ms past the runtime's current time.
        let start = context.current();
        let deadline = start + Duration::from_millis(100);
        context.sleep_until(deadline).await;

        // The runtime's own clock must have advanced at least that far.
        let elapsed = context.current().duration_since(start).unwrap();
        assert!(elapsed >= Duration::from_millis(100));
    });
}
/// Checks that `Runner::start` returns once the root future completes, even though
/// a spawned task that never finishes is still outstanding.
fn test_root_finishes(runner: impl Runner, context: impl Spawner) {
    runner.start(async move {
        // Background task that yields forever and never resolves.
        let spin_forever = async move {
            loop {
                reschedule().await;
            }
        };
        context.spawn("test", spin_forever);
    });
}
fn test_spawn_abort(runner: impl Runner, context: impl Spawner) {
runner.start(async move {
let handle = context.spawn("test", async move {
loop {
reschedule().await;
}
});
handle.abort();
assert_eq!(handle.await, Err(Error::Closed));
});
}
/// Checks that a panic inside the root future unwinds out of `Runner::start`.
fn test_panic_aborts_root(runner: impl Runner) {
    // The runner is not required to be unwind-safe, so assert it explicitly.
    let attempt = AssertUnwindSafe(move || {
        runner.start(async move {
            panic!("blah");
        });
    });

    let outcome = catch_unwind(attempt);
    outcome.unwrap_err();
}
fn test_panic_aborts_spawn(runner: impl Runner, context: impl Spawner) {
let result = runner.start(async move {
let result = context.spawn("test", async move {
panic!("blah");
});
assert_eq!(result.await, Err(Error::Exited));
Result::<(), Error>::Ok(())
});
result.unwrap();
}
/// Exercises `select!` with two branches: when both are ready the first listed
/// branch is expected to run, and a pending branch is expected to be skipped in
/// favor of a ready one.
fn test_select(runner: impl Runner) {
    runner.start(async move {
        // Records which branch ran.
        let chosen = Mutex::new(0);

        // Both branches are immediately ready.
        select! {
            first = ready(1) => {
                *chosen.lock().unwrap() = first;
            },
            second = ready(2) => {
                *chosen.lock().unwrap() = second;
            },
        };
        assert_eq!(*chosen.lock().unwrap(), 1);

        // The first branch never resolves, so the second must run.
        select! {
            first = std::future::pending::<i32>() => {
                *chosen.lock().unwrap() = first;
            },
            second = ready(2) => {
                *chosen.lock().unwrap() = second;
            },
        };
        assert_eq!(*chosen.lock().unwrap(), 2);
    });
}
/// Exercises `select!` inside loops against a channel receiver and a timer.
fn test_select_loop(runner: impl Runner, context: impl Clock) {
    runner.start(async move {
        let (mut tx, mut rx) = mpsc::unbounded();

        // Nothing has been sent yet, so the timer branch must run both times.
        for _ in 0..2 {
            select! {
                item = rx.next() => {
                    panic!("unexpected value: {:?}", item);
                },
                _ = context.sleep(Duration::from_millis(100)) => {
                    continue;
                },
            };
        }

        // Queue two values for the receiver.
        tx.send(0).await.unwrap();
        tx.send(1).await.unwrap();

        // An immediately-ready first branch must run even though the receiver
        // now has data, leaving the queued values in place.
        select! {
            _ = async {} => {
            },
            item = rx.next() => {
                panic!("unexpected value: {:?}", item);
            },
        };

        // The queued values must now arrive, in order, before the timer fires.
        for expected in 0..2 {
            select! {
                _ = context.sleep(Duration::from_millis(100)) => {
                    panic!("timeout");
                },
                item = rx.next() => {
                    assert_eq!(item.unwrap(), expected);
                },
            };
        }
    });
}
fn test_storage_operations<B>(runner: impl Runner, mut context: impl Spawner + Storage<B>)
where
B: Blob,
{
runner.start(async move {
let partition = "test_partition";
let name = b"test_blob";
let mut blob = context
.open(partition, name)
.await
.expect("Failed to open blob");
let data = b"Hello, Storage!";
blob.write_at(data, 0)
.await
.expect("Failed to write to blob");
blob.sync().await.expect("Failed to sync blob");
let mut buffer = vec![0u8; data.len()];
blob.read_at(&mut buffer, 0)
.await
.expect("Failed to read from blob");
assert_eq!(&buffer, data);
let length = blob.len().await.expect("Failed to get blob length");
assert_eq!(length, data.len());
blob.close().await.expect("Failed to close blob");
let blobs = context
.scan(partition)
.await
.expect("Failed to scan partition");
assert!(blobs.contains(&name.to_vec()));
let mut blob = context
.open(partition, name)
.await
.expect("Failed to reopen blob");
let mut buffer = vec![0u8; 7];
blob.read_at(&mut buffer, 7)
.await
.expect("Failed to read data");
assert_eq!(&buffer, b"Storage");
blob.close().await.expect("Failed to close blob");
context
.remove(partition, Some(name))
.await
.expect("Failed to remove blob");
let blobs = context
.scan(partition)
.await
.expect("Failed to scan partition");
assert!(!blobs.contains(&name.to_vec()));
context
.remove(partition, None)
.await
.expect("Failed to remove partition");
let result = context.scan(partition).await;
assert!(matches!(result, Err(Error::PartitionMissing(_))));
});
}
/// Checks offset-based writes, reads at and past the end of the data, and
/// truncation on a single blob.
fn test_blob_read_write<B>(runner: impl Runner, mut context: impl Spawner + Storage<B>)
where
    B: Blob,
{
    runner.start(async move {
        let partition = "test_partition";
        let name = b"test_blob_rw";
        let head = b"Hello";
        let tail = b"World";

        let mut handle = context
            .open(partition, name)
            .await
            .expect("Failed to open blob");

        // Two back-to-back writes at adjacent offsets.
        handle
            .write_at(head, 0)
            .await
            .expect("Failed to write data1");
        handle
            .write_at(tail, 5)
            .await
            .expect("Failed to write data2");

        // A full read returns both pieces.
        let mut scratch = vec![0u8; 10];
        let count = handle
            .read_at(&mut scratch, 0)
            .await
            .expect("Failed to read data");
        assert_eq!(count, 10);
        assert_eq!(&scratch[..5], head);
        assert_eq!(&scratch[5..], tail);

        // Reading at the end yields nothing and leaves the buffer untouched.
        let mut scratch = vec![0u8; 10];
        let count = handle
            .read_at(&mut scratch, 10)
            .await
            .expect("Failed to read data");
        assert_eq!(count, 0);
        assert_eq!(&scratch, &[0u8; 10]);

        // Truncation shrinks the reported length ...
        handle.truncate(5).await.expect("Failed to truncate blob");
        let size = handle.len().await.expect("Failed to get blob length");
        assert_eq!(size, 5);

        // ... and an oversized read now returns only the surviving prefix.
        let mut scratch = vec![0u8; 10];
        let count = handle
            .read_at(&mut scratch, 0)
            .await
            .expect("Failed to read data");
        assert_eq!(count, 5);
        assert_eq!(&scratch[..5], head);

        handle.close().await.expect("Failed to close blob");
    });
}
/// Writes a same-named blob into several partitions, each with a different layout,
/// then checks that every partition returns its own contents.
fn test_many_partition_read_write<B>(
    runner: impl Runner,
    mut context: impl Spawner + Storage<B>,
) where
    B: Blob,
{
    runner.start(async move {
        let partitions = ["partition1", "partition2", "partition3"];
        let name = b"test_blob_rw";

        // Write phase: the second chunk is shifted by the partition's index so
        // that each partition's blob is distinguishable.
        for (shift, partition) in partitions.iter().enumerate() {
            let head = b"Hello";
            let tail = b"World";
            let mut handle = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");
            handle
                .write_at(head, 0)
                .await
                .expect("Failed to write data1");
            handle
                .write_at(tail, 5 + shift)
                .await
                .expect("Failed to write data2");
            handle.close().await.expect("Failed to close blob");
        }

        // Read phase: each partition must hold the layout written for it.
        for (shift, partition) in partitions.iter().enumerate() {
            let mut handle = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");
            let mut scratch = vec![0u8; 10 + shift];
            handle
                .read_at(&mut scratch, 0)
                .await
                .expect("Failed to read data");
            assert_eq!(&scratch[..5], b"Hello");
            assert_eq!(&scratch[5 + shift..], b"World");
            handle.close().await.expect("Failed to close blob");
        }
    });
}
/// Runs the error-propagation scenario on the deterministic executor.
#[test]
fn test_deterministic_future() {
    let (runner, _, _) = deterministic::Executor::default();
    test_error_future(runner);
}

/// Runs the sleep scenario on the deterministic executor, first checking that its
/// clock starts at the Unix epoch.
#[test]
fn test_deterministic_clock_sleep() {
    let (runner, context, _) = deterministic::Executor::default();
    assert_eq!(context.current(), SystemTime::UNIX_EPOCH);
    test_clock_sleep(runner, context);
}

/// Runs the sleep-until scenario on the deterministic executor.
#[test]
fn test_deterministic_clock_sleep_until() {
    let (runner, context, _) = deterministic::Executor::default();
    test_clock_sleep_until(runner, context);
}

/// Runs the root-completion scenario on the deterministic executor.
#[test]
fn test_deterministic_root_finishes() {
    let (runner, context, _) = deterministic::Executor::default();
    test_root_finishes(runner, context);
}

/// Runs the abort scenario on the deterministic executor.
#[test]
fn test_deterministic_spawn_abort() {
    let (runner, context, _) = deterministic::Executor::default();
    test_spawn_abort(runner, context);
}

/// Runs the root-panic scenario on the deterministic executor.
#[test]
fn test_deterministic_panic_aborts_root() {
    let (runner, _, _) = deterministic::Executor::default();
    test_panic_aborts_root(runner);
}

/// Runs the spawned-panic scenario on the deterministic executor, where the panic
/// is expected to surface from the test itself.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    let (runner, context, _) = deterministic::Executor::default();
    test_panic_aborts_spawn(runner, context);
}

/// Runs the two-branch `select!` scenario on the deterministic executor.
#[test]
fn test_deterministic_select() {
    let (runner, _, _) = deterministic::Executor::default();
    test_select(runner);
}

/// Runs the looping `select!` scenario on the deterministic executor.
#[test]
fn test_deterministic_select_loop() {
    let (runner, context, _) = deterministic::Executor::default();
    test_select_loop(runner, context);
}

/// Runs the storage lifecycle scenario on the deterministic executor.
#[test]
fn test_deterministic_storage_operations() {
    let (runner, context, _) = deterministic::Executor::default();
    test_storage_operations(runner, context);
}

/// Runs the blob read/write scenario on the deterministic executor.
#[test]
fn test_deterministic_blob_read_write() {
    let (runner, context, _) = deterministic::Executor::default();
    test_blob_read_write(runner, context);
}

/// Runs the multi-partition scenario on the deterministic executor.
#[test]
fn test_deterministic_many_partition_read_write() {
    let (runner, context, _) = deterministic::Executor::default();
    test_many_partition_read_write(runner, context);
}
/// Runs the error-propagation scenario on the tokio executor.
#[test]
fn test_tokio_error_future() {
    let (runner, _) = tokio::Executor::default();
    test_error_future(runner);
}

/// Runs the sleep scenario on the tokio executor.
#[test]
fn test_tokio_clock_sleep() {
    let (runner, context) = tokio::Executor::default();
    test_clock_sleep(runner, context);
}

/// Runs the sleep-until scenario on the tokio executor.
#[test]
fn test_tokio_clock_sleep_until() {
    let (runner, context) = tokio::Executor::default();
    test_clock_sleep_until(runner, context);
}

/// Runs the root-completion scenario on the tokio executor.
#[test]
fn test_tokio_root_finishes() {
    let (runner, context) = tokio::Executor::default();
    test_root_finishes(runner, context);
}

/// Runs the abort scenario on the tokio executor.
#[test]
fn test_tokio_spawn_abort() {
    let (runner, context) = tokio::Executor::default();
    test_spawn_abort(runner, context);
}

/// Runs the root-panic scenario on the tokio executor.
#[test]
fn test_tokio_panic_aborts_root() {
    let (runner, _) = tokio::Executor::default();
    test_panic_aborts_root(runner);
}

/// Runs the spawned-panic scenario on the tokio executor, where the panic is
/// expected to be reported through the task handle.
#[test]
fn test_tokio_panic_aborts_spawn() {
    let (runner, context) = tokio::Executor::default();
    test_panic_aborts_spawn(runner, context);
}

/// Runs the two-branch `select!` scenario on the tokio executor.
#[test]
fn test_tokio_select() {
    let (runner, _) = tokio::Executor::default();
    test_select(runner);
}

/// Runs the looping `select!` scenario on the tokio executor.
#[test]
fn test_tokio_select_loop() {
    let (runner, context) = tokio::Executor::default();
    test_select_loop(runner, context);
}

/// Runs the storage lifecycle scenario on the tokio executor.
#[test]
fn test_tokio_storage_operations() {
    let (runner, context) = tokio::Executor::default();
    test_storage_operations(runner, context);
}

/// Runs the blob read/write scenario on the tokio executor.
#[test]
fn test_tokio_blob_read_write() {
    let (runner, context) = tokio::Executor::default();
    test_blob_read_write(runner, context);
}

/// Runs the multi-partition scenario on the tokio executor.
#[test]
fn test_tokio_many_partition_read_write() {
    let (runner, context) = tokio::Executor::default();
    test_many_partition_read_write(runner, context);
}
}