pub mod deterministic;
pub mod mocks;
pub mod tokio;
mod utils;
pub use utils::{reschedule, Handle, Signal, Signaler};
use std::{
future::Future,
net::SocketAddr,
time::{Duration, SystemTime},
};
use thiserror::Error;
/// Errors returned by the runtime abstractions declared in this module.
///
/// The `#[error(...)]` strings are the `Display` text generated by `thiserror`.
#[derive(Error, Debug, PartialEq)]
pub enum Error {
// Task-handle outcomes: the tests in this file expect the handle of an aborted task
// to resolve to `Closed` (`test_spawn_abort`) and the handle of a panicked task to
// resolve to `Exited` (`test_panic_aborts_spawn`).
#[error("exited")]
Exited,
#[error("closed")]
Closed,
// NOTE(review): nothing visible in this file produces `Timeout`; confirm its users.
#[error("timeout")]
Timeout,
// Networking failures. NOTE(review): presumably reported by `Network`, `Listener`,
// `Sink` and `Stream` implementations — confirm against the runtime modules.
#[error("bind failed")]
BindFailed,
#[error("connection failed")]
ConnectionFailed,
#[error("write failed")]
WriteFailed,
#[error("read failed")]
ReadFailed,
#[error("send failed")]
SendFailed,
#[error("recv failed")]
RecvFailed,
// Partition-level storage failures. NOTE(review): the `String` payload is presumably
// the partition name — confirm against the `Storage` implementations.
#[error("partition creation failed: {0}")]
PartitionCreationFailed(String),
/// The tests in this file expect `Storage::scan` to return this after the partition was removed.
#[error("partition missing: {0}")]
PartitionMissing(String),
#[error("partition corrupt: {0}")]
PartitionCorrupt(String),
// Blob-level storage failures, displayed as `{0}/{1}`. NOTE(review): the two payloads
// are presumably the partition and the blob name — confirm against the implementations.
#[error("blob open failed: {0}/{1}")]
BlobOpenFailed(String, String),
#[error("blob missing: {0}/{1}")]
BlobMissing(String, String),
#[error("blob truncate failed: {0}/{1}")]
BlobTruncateFailed(String, String),
#[error("blob sync failed: {0}/{1}")]
BlobSyncFailed(String, String),
#[error("blob close failed: {0}/{1}")]
BlobCloseFailed(String, String),
/// The tests in this file expect `Blob::read_at` to return this when the buffer is
/// longer than the data available in the blob.
#[error("blob insufficient length")]
BlobInsufficientLength,
// NOTE(review): presumably raised when an offset computation would overflow — confirm.
#[error("offset overflow")]
OffsetOverflow,
}
/// Entry point for executing a root future on a runtime.
pub trait Runner {
/// Consumes the runner, executes `f`, and returns the value `f` resolves to.
///
/// NOTE(review): presumably blocks the caller until `f` completes — confirm against
/// the implementations in the `deterministic` and `tokio` modules.
fn start<F>(self, f: F) -> F::Output
where
F: Future + Send + 'static,
F::Output: Send + 'static;
}
/// Interface for launching tasks and coordinating shutdown.
pub trait Spawner: Clone + Send + Sync + 'static {
/// Launches `f` as a task tagged with `label` and returns a [`Handle`] to it.
///
/// The tests in this file await the handle to obtain a `Result<T, Error>` and call
/// `abort()` on it to cancel the task.
fn spawn<F, T>(&self, label: &str, f: F) -> Handle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static;
/// Requests shutdown with the given `value`.
///
/// The tests in this file expect every future obtained from [`Spawner::stopped`] to
/// resolve to this `value`.
fn stop(&self, value: i32);
/// Returns a [`Signal`] future that resolves once [`Spawner::stop`] has been called.
fn stopped(&self) -> Signal;
}
/// Interface for reading the runtime's notion of time and waiting on it.
pub trait Clock: Clone + Send + Sync + 'static {
/// Returns the current time as seen by the runtime.
fn current(&self) -> SystemTime;
/// Returns a future that completes after `duration`; the tests in this file expect
/// at least `duration` to have passed on this clock once it resolves.
fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
/// Returns a future that completes once `deadline` has been reached.
fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
/// Interface for creating listeners and outbound connections.
///
/// `L` is the listener type; `Si` and `St` are the sink and stream halves of a connection.
pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
where
L: Listener<Si, St>,
Si: Sink,
St: Stream,
{
/// Binds to `socket` and resolves to a listener, or to an [`Error`] on failure.
fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;
/// Connects to `socket` and resolves to the `(sink, stream)` pair for the connection.
fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
}
/// Interface for accepting inbound connections.
pub trait Listener<Si, St>: Sync + Send + 'static
where
Si: Sink,
St: Stream,
{
/// Resolves to the next accepted connection as `(address, sink, stream)`.
///
/// NOTE(review): the `SocketAddr` is presumably the remote peer's address — confirm.
fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
}
/// Outbound half of a connection.
pub trait Sink: Sync + Send + 'static {
/// Sends the bytes in `msg`, resolving to `Ok(())` or an [`Error`].
fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
/// Inbound half of a connection.
pub trait Stream: Sync + Send + 'static {
/// Receives bytes into `buf`, resolving to `Ok(())` or an [`Error`].
///
/// NOTE(review): no byte count is returned, so this presumably fills `buf` completely
/// before resolving — confirm against the implementations.
fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
/// Interface for blob storage organized into named partitions.
pub trait Storage<B>: Clone + Send + Sync + 'static
where
B: Blob,
{
/// Opens the blob `name` inside `partition`.
///
/// The tests in this file open blobs that were never written before, so a missing
/// blob is expected to be created rather than reported as an error.
fn open(&self, partition: &str, name: &[u8]) -> impl Future<Output = Result<B, Error>> + Send;
/// Removes a single blob (`Some(name)`) or, as exercised by the tests in this file,
/// the whole partition (`None`).
fn remove(
&self,
partition: &str,
name: Option<&[u8]>,
) -> impl Future<Output = Result<(), Error>> + Send;
/// Lists the names of the blobs in `partition`.
///
/// The tests in this file expect `Error::PartitionMissing` once the partition has been removed.
fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
// `len` is asynchronous and fallible here, so the conventional `is_empty` companion is
// presumably omitted on purpose (NOTE(review): confirm the intent behind this `allow`).
#[allow(clippy::len_without_is_empty)]
/// Handle to a single blob; clones can be used from several tasks (see
/// `test_blob_clone_and_concurrent_read` in this file).
pub trait Blob: Clone + Send + Sync + 'static {
/// Resolves to the current length of the blob in bytes.
fn len(&self) -> impl Future<Output = Result<u64, Error>> + Send;
/// Reads into `buf` starting at `offset`.
///
/// The tests in this file expect `Error::BlobInsufficientLength` when `buf` is longer
/// than the data available in the blob.
fn read_at(
&self,
buf: &mut [u8],
offset: u64,
) -> impl Future<Output = Result<(), Error>> + Send;
/// Writes `buf` starting at `offset`; the tests in this file show that writing at the
/// current end grows the blob and writing inside it leaves the length unchanged.
fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
/// Shrinks the blob to `len` bytes (the tests in this file only exercise shrinking).
fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
/// Synchronizes the blob. NOTE(review): presumably flushes pending writes to durable
/// storage — confirm against the implementations.
fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
/// Consumes this handle and closes the blob.
fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
}
#[cfg(test)]
mod tests {
use super::*;
use commonware_macros::select;
use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::{Arc, Mutex};
use utils::reschedule;
/// Checks that a root future resolving to `Err` has its output returned unchanged by the runner.
fn test_error_future(runner: impl Runner) {
    const MESSAGE: &str = "An error occurred";
    let failing = async { Result::<&'static str, &'static str>::Err(MESSAGE) };
    let outcome = runner.start(failing);
    assert_eq!(outcome, Err(MESSAGE));
}
/// Checks that `Clock::sleep` lets at least the requested duration pass on the runtime clock.
fn test_clock_sleep(runner: impl Runner, context: impl Spawner + Clock) {
    runner.start(async move {
        let nap = Duration::from_millis(10);

        // Bracket the sleep with two readings of the runtime clock.
        let before = context.current();
        context.sleep(nap).await;
        let after = context.current();

        let slept = after.duration_since(before).unwrap();
        assert!(slept >= nap);
    });
}
/// Checks that `Clock::sleep_until` does not resolve before the requested deadline.
///
/// Elapsed time is measured with the runtime's own clock. `SystemTime::elapsed` compares
/// against the real system clock, which makes the assertion pass trivially on a runtime
/// whose clock is simulated (the deterministic clock starts at `UNIX_EPOCH`).
fn test_clock_sleep_until(runner: impl Runner, context: impl Spawner + Clock) {
    runner.start(async move {
        let wait = Duration::from_millis(100);

        // Sleep until a deadline relative to the runtime's current time.
        let now = context.current();
        context.sleep_until(now + wait).await;

        // The runtime clock must have advanced at least up to the deadline.
        let elapsed = context.current().duration_since(now).unwrap();
        assert!(elapsed >= wait);
    });
}
/// Checks that the runner returns once the root future completes, even though a spawned
/// task is still looping forever.
fn test_root_finishes(runner: impl Runner, context: impl Spawner) {
    let root = async move {
        // The handle is dropped right away; the task itself never finishes on its own.
        context.spawn("test", async move {
            loop {
                reschedule().await;
            }
        });
    };
    runner.start(root);
}
fn test_spawn_abort(runner: impl Runner, context: impl Spawner) {
runner.start(async move {
let handle = context.spawn("test", async move {
loop {
reschedule().await;
}
});
handle.abort();
assert_eq!(handle.await, Err(Error::Closed));
});
}
/// Checks that a panic inside the root future propagates out of `Runner::start`.
fn test_panic_aborts_root(runner: impl Runner) {
    // Run the panicking root inside `catch_unwind` so the panic can be observed.
    let attempt = || {
        runner.start(async move {
            panic!("blah");
        });
    };
    let outcome = catch_unwind(AssertUnwindSafe(attempt));
    outcome.unwrap_err();
}
fn test_panic_aborts_spawn(runner: impl Runner, context: impl Spawner) {
let result = runner.start(async move {
let result = context.spawn("test", async move {
panic!("blah");
});
assert_eq!(result.await, Err(Error::Exited));
Result::<(), Error>::Ok(())
});
result.unwrap();
}
/// Checks which branch `select!` runs when one or both branches can complete.
fn test_select(runner: impl Runner) {
    runner.start(async move {
        // Slot recording the value produced by whichever branch ran.
        let chosen = Mutex::new(0);

        // Both branches are immediately ready: the first one listed is expected to win.
        select! {
            first = ready(1) => {
                *chosen.lock().unwrap() = first;
            },
            second = ready(2) => {
                *chosen.lock().unwrap() = second;
            },
        };
        let picked = *chosen.lock().unwrap();
        assert_eq!(picked, 1);

        // The first branch never completes, so the second one must run.
        select! {
            never = std::future::pending::<i32>() => {
                *chosen.lock().unwrap() = never;
            },
            second = ready(2) => {
                *chosen.lock().unwrap() = second;
            },
        };
        let picked = *chosen.lock().unwrap();
        assert_eq!(picked, 2);
    });
}
fn test_select_loop(runner: impl Runner, context: impl Clock) {
runner.start(async move {
let (mut sender, mut receiver) = mpsc::unbounded();
for _ in 0..2 {
select! {
v = receiver.next() => {
panic!("unexpected value: {:?}", v);
},
_ = context.sleep(Duration::from_millis(100)) => {
continue;
},
};
}
sender.send(0).await.unwrap();
sender.send(1).await.unwrap();
select! {
_ = async {} => {
},
v = receiver.next() => {
panic!("unexpected value: {:?}", v);
},
};
for i in 0..2 {
select! {
_ = context.sleep(Duration::from_millis(100)) => {
panic!("timeout");
},
v = receiver.next() => {
assert_eq!(v.unwrap(), i);
},
};
}
});
}
fn test_storage_operations<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
where
B: Blob,
{
runner.start(async move {
let partition = "test_partition";
let name = b"test_blob";
let blob = context
.open(partition, name)
.await
.expect("Failed to open blob");
let data = b"Hello, Storage!";
blob.write_at(data, 0)
.await
.expect("Failed to write to blob");
blob.sync().await.expect("Failed to sync blob");
let mut buffer = vec![0u8; data.len()];
blob.read_at(&mut buffer, 0)
.await
.expect("Failed to read from blob");
assert_eq!(&buffer, data);
let length = blob.len().await.expect("Failed to get blob length");
assert_eq!(length, data.len() as u64);
blob.close().await.expect("Failed to close blob");
let blobs = context
.scan(partition)
.await
.expect("Failed to scan partition");
assert!(blobs.contains(&name.to_vec()));
let blob = context
.open(partition, name)
.await
.expect("Failed to reopen blob");
let mut buffer = vec![0u8; 7];
blob.read_at(&mut buffer, 7)
.await
.expect("Failed to read data");
assert_eq!(&buffer, b"Storage");
blob.close().await.expect("Failed to close blob");
context
.remove(partition, Some(name))
.await
.expect("Failed to remove blob");
let blobs = context
.scan(partition)
.await
.expect("Failed to scan partition");
assert!(!blobs.contains(&name.to_vec()));
context
.remove(partition, None)
.await
.expect("Failed to remove partition");
let result = context.scan(partition).await;
assert!(matches!(result, Err(Error::PartitionMissing(_))));
});
}
fn test_blob_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
where
B: Blob,
{
runner.start(async move {
let partition = "test_partition";
let name = b"test_blob_rw";
let blob = context
.open(partition, name)
.await
.expect("Failed to open blob");
let data1 = b"Hello";
let data2 = b"World";
blob.write_at(data1, 0)
.await
.expect("Failed to write data1");
blob.write_at(data2, 5)
.await
.expect("Failed to write data2");
let length = blob.len().await.expect("Failed to get blob length");
assert_eq!(length, 10);
let mut buffer = vec![0u8; 10];
blob.read_at(&mut buffer, 0)
.await
.expect("Failed to read data");
assert_eq!(&buffer[..5], data1);
assert_eq!(&buffer[5..], data2);
let data3 = b"Store";
blob.write_at(data3, 5)
.await
.expect("Failed to write data3");
let length = blob.len().await.expect("Failed to get blob length");
assert_eq!(length, 10);
blob.truncate(5).await.expect("Failed to truncate blob");
let length = blob.len().await.expect("Failed to get blob length");
assert_eq!(length, 5);
let mut buffer = vec![0u8; 5];
blob.read_at(&mut buffer, 0)
.await
.expect("Failed to read data");
assert_eq!(&buffer[..5], data1);
let mut buffer = vec![0u8; 10];
let result = blob.read_at(&mut buffer, 0).await;
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
blob.close().await.expect("Failed to close blob");
});
}
/// Writes a same-named blob into several partitions with a different layout in each,
/// then checks that every partition kept its own contents.
fn test_many_partition_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
where
    B: Blob,
{
    runner.start(async move {
        let name = b"test_blob_rw";
        let partitions = ["partition1", "partition2", "partition3"];

        // Write phase: the second chunk is shifted by the partition's index.
        for (gap, partition) in partitions.iter().enumerate() {
            let second_offset = 5 + gap as u64;
            let blob = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");
            blob.write_at(b"Hello", 0)
                .await
                .expect("Failed to write data1");
            blob.write_at(b"World", second_offset)
                .await
                .expect("Failed to write data2");
            blob.close().await.expect("Failed to close blob");
        }

        // Read phase: each blob must hold both chunks at the offsets used above.
        for (gap, partition) in partitions.iter().enumerate() {
            let blob = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");
            let mut contents = vec![0u8; 10 + gap];
            blob.read_at(&mut contents, 0)
                .await
                .expect("Failed to read data");
            let (head, tail) = contents.split_at(5 + gap);
            assert_eq!(&head[..5], b"Hello");
            assert_eq!(tail, b"World");
            blob.close().await.expect("Failed to close blob");
        }
    });
}
/// Checks that reading more bytes than a blob holds fails with
/// `Error::BlobInsufficientLength`, for both an empty and a non-empty blob.
fn test_blob_read_past_length<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
where
    B: Blob,
{
    runner.start(async move {
        let partition = "test_partition";
        let name = b"test_blob_rw";
        let blob = context
            .open(partition, name)
            .await
            .expect("Failed to open blob");

        // Freshly created blob: any non-empty read is past the end.
        let mut buffer = vec![0u8; 10];
        let result = blob.read_at(&mut buffer, 0).await;
        assert!(matches!(result, Err(Error::BlobInsufficientLength)));

        // Write 15 bytes, then ask for 20.
        let data = b"Hello, Storage!";
        blob.write_at(data, 0)
            .await
            .expect("Failed to write to blob");
        let mut buffer = vec![0u8; 20];
        let result = blob.read_at(&mut buffer, 0).await;
        assert!(matches!(result, Err(Error::BlobInsufficientLength)));

        // Close explicitly, as the other storage tests do, instead of leaving the
        // blob open when the test ends.
        blob.close().await.expect("Failed to close blob");
    });
}
/// Checks that clones of a blob can be read from separate tasks and that the original
/// handle remains usable afterwards.
fn test_blob_clone_and_concurrent_read<B>(
    runner: impl Runner,
    context: impl Spawner + Storage<B>,
) where
    B: Blob,
{
    runner.start(async move {
        let partition = "test_partition";
        let name = b"test_blob_rw";
        let data = b"Hello, Storage!";

        // Create the blob and persist the payload.
        let blob = context
            .open(partition, name)
            .await
            .expect("Failed to open blob");
        blob.write_at(data, 0)
            .await
            .expect("Failed to write to blob");
        blob.sync().await.expect("Failed to sync blob");

        // Builds a task body that reads the full payload through its own blob handle.
        let reader = move |blob: B| async move {
            let mut buffer = vec![0u8; data.len()];
            blob.read_at(&mut buffer, 0)
                .await
                .expect("Failed to read from blob");
            assert_eq!(&buffer, data);
        };

        // Run two readers on separate clones and wait for both.
        let check1 = context.spawn("test", reader(blob.clone()));
        let check2 = context.spawn("test", reader(blob.clone()));
        let (first, second) = join!(check1, check2);
        assert!(first.is_ok());
        assert!(second.is_ok());

        // The original handle must still read the same data and report the same length.
        let mut buffer = vec![0u8; data.len()];
        blob.read_at(&mut buffer, 0)
            .await
            .expect("Failed to read from blob");
        assert_eq!(&buffer, data);
        let length = blob.len().await.expect("Failed to get blob length");
        assert_eq!(length, data.len() as u64);

        blob.close().await.expect("Failed to close blob");
    });
}
/// Checks that the value passed to `Spawner::stop` reaches tasks waiting on
/// `Spawner::stopped`, whether they await the signal directly or poll it in a `select!` loop.
fn test_shutdown(runner: impl Runner, context: impl Spawner + Clock) {
    let code = 9;
    runner.start(async move {
        // Waiter that simply awaits the stop signal.
        let direct = {
            let context = context.clone();
            context.clone().spawn("before", async move {
                let received = context.stopped().await;
                assert_eq!(received.unwrap(), code);
            })
        };

        // Waiter that keeps doing timed work while watching the same kind of signal.
        let polling = {
            let context = context.clone();
            context.clone().spawn("after", async move {
                let mut signal = context.stopped();
                loop {
                    select! {
                        received = &mut signal => {
                            assert_eq!(received.unwrap(), code);
                            break;
                        },
                        _ = context.sleep(Duration::from_millis(10)) => {
                        },
                    }
                }
            })
        };

        // Give both tasks time to start waiting, then request shutdown.
        context.sleep(Duration::from_millis(50)).await;
        context.stop(code);

        // Both waiters must finish without error.
        let (direct, polling) = join!(direct, polling);
        assert!(direct.is_ok());
        assert!(polling.is_ok());
    });
}
/// Runs `test_error_future` on the deterministic runtime.
#[test]
fn test_deterministic_future() {
    let (runner, ..) = deterministic::Executor::default();
    test_error_future(runner);
}
/// Runs `test_clock_sleep` on the deterministic runtime, after checking that its clock
/// starts at the Unix epoch.
#[test]
fn test_deterministic_clock_sleep() {
    let (runner, context, ..) = deterministic::Executor::default();
    assert_eq!(context.current(), SystemTime::UNIX_EPOCH);
    test_clock_sleep(runner, context);
}
/// Runs `test_clock_sleep_until` on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep_until() {
    let (runner, context, ..) = deterministic::Executor::default();
    test_clock_sleep_until(runner, context);
}
/// Runs `test_root_finishes` on the deterministic runtime.
#[test]
fn test_deterministic_root_finishes() {
    let (runner, context, ..) = deterministic::Executor::default();
    test_root_finishes(runner, context);
}
/// Runs `test_spawn_abort` on the deterministic runtime.
#[test]
fn test_deterministic_spawn_abort() {
    let (runner, context, ..) = deterministic::Executor::default();
    test_spawn_abort(runner, context);
}
/// Runs `test_panic_aborts_root` on the deterministic runtime.
#[test]
fn test_deterministic_panic_aborts_root() {
    let (runner, ..) = deterministic::Executor::default();
    test_panic_aborts_root(runner);
}
/// Runs `test_panic_aborts_spawn` on the deterministic runtime, where the spawned task's
/// panic is expected to surface as a panic of the test itself.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    let (runner, context, ..) = deterministic::Executor::default();
    test_panic_aborts_spawn(runner, context);
}
/// Runs `test_select` on the deterministic runtime.
#[test]
fn test_deterministic_select() {
    let (runner, ..) = deterministic::Executor::default();
    test_select(runner);
}
/// Runs `test_select_loop` on the deterministic runtime.
#[test]
fn test_deterministic_select_loop() {
    let (runner, context, ..) = deterministic::Executor::default();
    test_select_loop(runner, context);
}
/// Runs `test_storage_operations` on the deterministic runtime.
#[test]
fn test_deterministic_storage_operations() {
    let (runner, context, ..) = deterministic::Executor::default();
    test_storage_operations(runner, context);
}
/// Runs `test_blob_read_write` on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_write() {
    let (runner, context, ..) = deterministic::Executor::default();
    test_blob_read_write(runner, context);
}
/// Runs `test_many_partition_read_write` on the deterministic runtime.
#[test]
fn test_deterministic_many_partition_read_write() {
    let (runner, context, ..) = deterministic::Executor::default();
    test_many_partition_read_write(runner, context);
}
/// Runs `test_blob_read_past_length` on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_past_length() {
    let (runner, context, ..) = deterministic::Executor::default();
    test_blob_read_past_length(runner, context);
}
/// Runs `test_blob_clone_and_concurrent_read` on the deterministic runtime, then checks
/// through the metrics registry that no blob is reported as open.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    // Keep our own handle to the registry so the metrics can be inspected afterwards.
    let registry = Arc::new(Mutex::new(Registry::default()));
    let cfg = deterministic::Config {
        registry: registry.clone(),
        ..Default::default()
    };
    let (runner, context, ..) = deterministic::Executor::init(cfg);
    test_blob_clone_and_concurrent_read(runner, context);

    // Render the metrics as text and look for the open-blob gauge at zero.
    let mut metrics = String::new();
    encode(&mut metrics, &registry.lock().unwrap()).unwrap();
    assert!(metrics.contains("open_blobs 0"));
}
/// Runs `test_shutdown` on the deterministic runtime.
#[test]
fn test_deterministic_shutdown() {
    let (runner, context, ..) = deterministic::Executor::default();
    test_shutdown(runner, context);
}
/// Runs `test_error_future` on the tokio runtime.
#[test]
fn test_tokio_error_future() {
    let (runner, ..) = tokio::Executor::default();
    test_error_future(runner);
}
/// Runs `test_clock_sleep` on the tokio runtime.
#[test]
fn test_tokio_clock_sleep() {
    let (runner, context) = tokio::Executor::default();
    test_clock_sleep(runner, context);
}
/// Runs `test_clock_sleep_until` on the tokio runtime.
#[test]
fn test_tokio_clock_sleep_until() {
    let (runner, context) = tokio::Executor::default();
    test_clock_sleep_until(runner, context);
}
/// Runs `test_root_finishes` on the tokio runtime.
#[test]
fn test_tokio_root_finishes() {
    let (runner, context) = tokio::Executor::default();
    test_root_finishes(runner, context);
}
/// Runs `test_spawn_abort` on the tokio runtime.
#[test]
fn test_tokio_spawn_abort() {
    let (runner, context) = tokio::Executor::default();
    test_spawn_abort(runner, context);
}
/// Runs `test_panic_aborts_root` on the tokio runtime.
#[test]
fn test_tokio_panic_aborts_root() {
    let (runner, ..) = tokio::Executor::default();
    test_panic_aborts_root(runner);
}
/// Runs `test_panic_aborts_spawn` on the tokio runtime, where the panic is expected to
/// be reported through the task's handle rather than to abort the test.
#[test]
fn test_tokio_panic_aborts_spawn() {
    let (runner, context) = tokio::Executor::default();
    test_panic_aborts_spawn(runner, context);
}
/// Runs `test_select` on the tokio runtime.
#[test]
fn test_tokio_select() {
    let (runner, ..) = tokio::Executor::default();
    test_select(runner);
}
/// Runs `test_select_loop` on the tokio runtime.
#[test]
fn test_tokio_select_loop() {
    let (runner, context) = tokio::Executor::default();
    test_select_loop(runner, context);
}
/// Runs `test_storage_operations` on the tokio runtime.
#[test]
fn test_tokio_storage_operations() {
    let (runner, context) = tokio::Executor::default();
    test_storage_operations(runner, context);
}
/// Runs `test_blob_read_write` on the tokio runtime.
#[test]
fn test_tokio_blob_read_write() {
    let (runner, context) = tokio::Executor::default();
    test_blob_read_write(runner, context);
}
/// Runs `test_many_partition_read_write` on the tokio runtime.
#[test]
fn test_tokio_many_partition_read_write() {
    let (runner, context) = tokio::Executor::default();
    test_many_partition_read_write(runner, context);
}
/// Runs `test_blob_read_past_length` on the tokio runtime.
#[test]
fn test_tokio_blob_read_past_length() {
    let (runner, context) = tokio::Executor::default();
    test_blob_read_past_length(runner, context);
}
/// Runs `test_blob_clone_and_concurrent_read` on the tokio runtime, then checks through
/// the metrics registry that no blob is reported as open.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    // Keep our own handle to the registry so the metrics can be inspected afterwards.
    let registry = Arc::new(Mutex::new(Registry::default()));
    let cfg = tokio::Config {
        registry: registry.clone(),
        ..Default::default()
    };
    let (runner, context) = tokio::Executor::init(cfg);
    test_blob_clone_and_concurrent_read(runner, context);

    // Render the metrics as text and look for the open-blob gauge at zero.
    let mut metrics = String::new();
    encode(&mut metrics, &registry.lock().unwrap()).unwrap();
    assert!(metrics.contains("open_blobs 0"));
}
/// Runs `test_shutdown` on the tokio runtime.
#[test]
fn test_tokio_shutdown() {
    let (runner, context) = tokio::Executor::default();
    test_shutdown(runner, context);
}
}