pub mod deterministic;
pub mod mocks;
pub mod tokio;
mod utils;
pub use utils::{reschedule, Handle, Signal, Signaler};
use std::{
    future::Future,
    net::SocketAddr,
    time::{Duration, SystemTime},
};
use thiserror::Error;
#[derive(Error, Debug, PartialEq)]
pub enum Error {
    #[error("exited")]
    Exited,
    #[error("closed")]
    Closed,
    #[error("timeout")]
    Timeout,
    #[error("bind failed")]
    BindFailed,
    #[error("connection failed")]
    ConnectionFailed,
    #[error("write failed")]
    WriteFailed,
    #[error("read failed")]
    ReadFailed,
    #[error("send failed")]
    SendFailed,
    #[error("recv failed")]
    RecvFailed,
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    #[error("blob open failed: {0}/{1}")]
    BlobOpenFailed(String, String),
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    #[error("blob truncate failed: {0}/{1}")]
    BlobTruncateFailed(String, String),
    #[error("blob sync failed: {0}/{1}")]
    BlobSyncFailed(String, String),
    #[error("blob close failed: {0}/{1}")]
    BlobCloseFailed(String, String),
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    #[error("offset overflow")]
    OffsetOverflow,
}
pub trait Runner {
    fn start<F>(self, f: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static;
}
pub trait Spawner: Clone + Send + Sync + 'static {
    fn spawn<F, T>(&self, label: &str, f: F) -> Handle<T>
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;
    fn stop(&self, value: i32);
    fn stopped(&self) -> Signal;
}
pub trait Clock: Clone + Send + Sync + 'static {
    fn current(&self) -> SystemTime;
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
where
    L: Listener<Si, St>,
    Si: Sink,
    St: Stream,
{
    fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;
    fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
}
pub trait Listener<Si, St>: Sync + Send + 'static
where
    Si: Sink,
    St: Stream,
{
    fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
}
pub trait Sink: Sync + Send + 'static {
    fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
pub trait Stream: Sync + Send + 'static {
    fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
pub trait Storage<B>: Clone + Send + Sync + 'static
where
    B: Blob,
{
    fn open(&self, partition: &str, name: &[u8]) -> impl Future<Output = Result<B, Error>> + Send;
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
#[allow(clippy::len_without_is_empty)]
pub trait Blob: Clone + Send + Sync + 'static {
    fn len(&self) -> impl Future<Output = Result<u64, Error>> + Send;
    fn read_at(
        &self,
        buf: &mut [u8],
        offset: u64,
    ) -> impl Future<Output = Result<(), Error>> + Send;
    fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
}
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_macros::select;
    use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
    use prometheus_client::encoding::text::encode;
    use prometheus_client::registry::Registry;
    use std::panic::{catch_unwind, AssertUnwindSafe};
    use std::sync::{Arc, Mutex};
    use utils::reschedule;
    fn test_error_future(runner: impl Runner) {
        async fn error_future() -> Result<&'static str, &'static str> {
            Err("An error occurred")
        }
        let result = runner.start(error_future());
        assert_eq!(result, Err("An error occurred"));
    }
    fn test_clock_sleep(runner: impl Runner, context: impl Spawner + Clock) {
        runner.start(async move {
            let start = context.current();
            let sleep_duration = Duration::from_millis(10);
            context.sleep(sleep_duration).await;
            let end = context.current();
            assert!(end.duration_since(start).unwrap() >= sleep_duration);
        });
    }
    fn test_clock_sleep_until(runner: impl Runner, context: impl Spawner + Clock) {
        runner.start(async move {
            let now = context.current();
            context.sleep_until(now + Duration::from_millis(100)).await;
            let elapsed = now.elapsed().unwrap();
            assert!(elapsed >= Duration::from_millis(100));
        });
    }
    fn test_root_finishes(runner: impl Runner, context: impl Spawner) {
        runner.start(async move {
            context.spawn("test", async move {
                loop {
                    reschedule().await;
                }
            });
        });
    }
    fn test_spawn_abort(runner: impl Runner, context: impl Spawner) {
        runner.start(async move {
            let handle = context.spawn("test", async move {
                loop {
                    reschedule().await;
                }
            });
            handle.abort();
            assert_eq!(handle.await, Err(Error::Closed));
        });
    }
    fn test_panic_aborts_root(runner: impl Runner) {
        let result = catch_unwind(AssertUnwindSafe(|| {
            runner.start(async move {
                panic!("blah");
            });
        }));
        result.unwrap_err();
    }
    fn test_panic_aborts_spawn(runner: impl Runner, context: impl Spawner) {
        let result = runner.start(async move {
            let result = context.spawn("test", async move {
                panic!("blah");
            });
            assert_eq!(result.await, Err(Error::Exited));
            Result::<(), Error>::Ok(())
        });
        result.unwrap();
    }
    fn test_select(runner: impl Runner) {
        runner.start(async move {
            let output = Mutex::new(0);
            select! {
                v1 = ready(1) => {
                    *output.lock().unwrap() = v1;
                },
                v2 = ready(2) => {
                    *output.lock().unwrap() = v2;
                },
            };
            assert_eq!(*output.lock().unwrap(), 1);
            select! {
                v1 = std::future::pending::<i32>() => {
                    *output.lock().unwrap() = v1;
                },
                v2 = ready(2) => {
                    *output.lock().unwrap() = v2;
                },
            };
            assert_eq!(*output.lock().unwrap(), 2);
        });
    }
    fn test_select_loop(runner: impl Runner, context: impl Clock) {
        runner.start(async move {
            let (mut sender, mut receiver) = mpsc::unbounded();
            for _ in 0..2 {
                select! {
                    v = receiver.next() => {
                        panic!("unexpected value: {:?}", v);
                    },
                    _ = context.sleep(Duration::from_millis(100)) => {
                        continue;
                    },
                };
            }
            sender.send(0).await.unwrap();
            sender.send(1).await.unwrap();
            select! {
                _ = async {} => {
                    },
                v = receiver.next() => {
                    panic!("unexpected value: {:?}", v);
                },
            };
            for i in 0..2 {
                select! {
                    _ = context.sleep(Duration::from_millis(100)) => {
                        panic!("timeout");
                    },
                    v = receiver.next() => {
                        assert_eq!(v.unwrap(), i);
                    },
                };
            }
        });
    }
    fn test_storage_operations<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
    where
        B: Blob,
    {
        runner.start(async move {
            let partition = "test_partition";
            let name = b"test_blob";
            let blob = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");
            let data = b"Hello, Storage!";
            blob.write_at(data, 0)
                .await
                .expect("Failed to write to blob");
            blob.sync().await.expect("Failed to sync blob");
            let mut buffer = vec![0u8; data.len()];
            blob.read_at(&mut buffer, 0)
                .await
                .expect("Failed to read from blob");
            assert_eq!(&buffer, data);
            let length = blob.len().await.expect("Failed to get blob length");
            assert_eq!(length, data.len() as u64);
            blob.close().await.expect("Failed to close blob");
            let blobs = context
                .scan(partition)
                .await
                .expect("Failed to scan partition");
            assert!(blobs.contains(&name.to_vec()));
            let blob = context
                .open(partition, name)
                .await
                .expect("Failed to reopen blob");
            let mut buffer = vec![0u8; 7];
            blob.read_at(&mut buffer, 7)
                .await
                .expect("Failed to read data");
            assert_eq!(&buffer, b"Storage");
            blob.close().await.expect("Failed to close blob");
            context
                .remove(partition, Some(name))
                .await
                .expect("Failed to remove blob");
            let blobs = context
                .scan(partition)
                .await
                .expect("Failed to scan partition");
            assert!(!blobs.contains(&name.to_vec()));
            context
                .remove(partition, None)
                .await
                .expect("Failed to remove partition");
            let result = context.scan(partition).await;
            assert!(matches!(result, Err(Error::PartitionMissing(_))));
        });
    }
    fn test_blob_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
    where
        B: Blob,
    {
        runner.start(async move {
            let partition = "test_partition";
            let name = b"test_blob_rw";
            let blob = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");
            let data1 = b"Hello";
            let data2 = b"World";
            blob.write_at(data1, 0)
                .await
                .expect("Failed to write data1");
            blob.write_at(data2, 5)
                .await
                .expect("Failed to write data2");
            let length = blob.len().await.expect("Failed to get blob length");
            assert_eq!(length, 10);
            let mut buffer = vec![0u8; 10];
            blob.read_at(&mut buffer, 0)
                .await
                .expect("Failed to read data");
            assert_eq!(&buffer[..5], data1);
            assert_eq!(&buffer[5..], data2);
            let data3 = b"Store";
            blob.write_at(data3, 5)
                .await
                .expect("Failed to write data3");
            let length = blob.len().await.expect("Failed to get blob length");
            assert_eq!(length, 10);
            blob.truncate(5).await.expect("Failed to truncate blob");
            let length = blob.len().await.expect("Failed to get blob length");
            assert_eq!(length, 5);
            let mut buffer = vec![0u8; 5];
            blob.read_at(&mut buffer, 0)
                .await
                .expect("Failed to read data");
            assert_eq!(&buffer[..5], data1);
            let mut buffer = vec![0u8; 10];
            let result = blob.read_at(&mut buffer, 0).await;
            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
            blob.close().await.expect("Failed to close blob");
        });
    }
    fn test_many_partition_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
    where
        B: Blob,
    {
        runner.start(async move {
            let partitions = ["partition1", "partition2", "partition3"];
            let name = b"test_blob_rw";
            for (additional, partition) in partitions.iter().enumerate() {
                let blob = context
                    .open(partition, name)
                    .await
                    .expect("Failed to open blob");
                let data1 = b"Hello";
                let data2 = b"World";
                blob.write_at(data1, 0)
                    .await
                    .expect("Failed to write data1");
                blob.write_at(data2, 5 + additional as u64)
                    .await
                    .expect("Failed to write data2");
                blob.close().await.expect("Failed to close blob");
            }
            for (additional, partition) in partitions.iter().enumerate() {
                let blob = context
                    .open(partition, name)
                    .await
                    .expect("Failed to open blob");
                let mut buffer = vec![0u8; 10 + additional];
                blob.read_at(&mut buffer, 0)
                    .await
                    .expect("Failed to read data");
                assert_eq!(&buffer[..5], b"Hello");
                assert_eq!(&buffer[5 + additional..], b"World");
                blob.close().await.expect("Failed to close blob");
            }
        });
    }
    fn test_blob_read_past_length<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
    where
        B: Blob,
    {
        runner.start(async move {
            let partition = "test_partition";
            let name = b"test_blob_rw";
            let blob = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");
            let mut buffer = vec![0u8; 10];
            let result = blob.read_at(&mut buffer, 0).await;
            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
            let data = b"Hello, Storage!";
            blob.write_at(data, 0)
                .await
                .expect("Failed to write to blob");
            let mut buffer = vec![0u8; 20];
            let result = blob.read_at(&mut buffer, 0).await;
            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
        })
    }
    fn test_blob_clone_and_concurrent_read<B>(
        runner: impl Runner,
        context: impl Spawner + Storage<B>,
    ) where
        B: Blob,
    {
        runner.start(async move {
            let partition = "test_partition";
            let name = b"test_blob_rw";
            let blob = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");
            let data = b"Hello, Storage!";
            blob.write_at(data, 0)
                .await
                .expect("Failed to write to blob");
            blob.sync().await.expect("Failed to sync blob");
            let check1 = context.spawn("test", {
                let blob = blob.clone();
                async move {
                    let mut buffer = vec![0u8; data.len()];
                    blob.read_at(&mut buffer, 0)
                        .await
                        .expect("Failed to read from blob");
                    assert_eq!(&buffer, data);
                }
            });
            let check2 = context.spawn("test", {
                let blob = blob.clone();
                async move {
                    let mut buffer = vec![0u8; data.len()];
                    blob.read_at(&mut buffer, 0)
                        .await
                        .expect("Failed to read from blob");
                    assert_eq!(&buffer, data);
                }
            });
            let result = join!(check1, check2);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());
            let mut buffer = vec![0u8; data.len()];
            blob.read_at(&mut buffer, 0)
                .await
                .expect("Failed to read from blob");
            assert_eq!(&buffer, data);
            let length = blob.len().await.expect("Failed to get blob length");
            assert_eq!(length, data.len() as u64);
            blob.close().await.expect("Failed to close blob");
        });
    }
    fn test_shutdown(runner: impl Runner, context: impl Spawner + Clock) {
        let kill = 9;
        runner.start(async move {
            let before = context.spawn("before", {
                let context = context.clone();
                async move {
                    let sig = context.stopped().await;
                    assert_eq!(sig.unwrap(), kill);
                }
            });
            let after = context.spawn("after", {
                let context = context.clone();
                async move {
                    let mut signal = context.stopped();
                    loop {
                        select! {
                            sig = &mut signal => {
                                assert_eq!(sig.unwrap(), kill);
                                break;
                            },
                            _ = context.sleep(Duration::from_millis(10)) => {
                                },
                        }
                    }
                }
            });
            context.sleep(Duration::from_millis(50)).await;
            context.stop(kill);
            let result = join!(before, after);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());
        });
    }
    #[test]
    fn test_deterministic_future() {
        let (runner, _, _) = deterministic::Executor::default();
        test_error_future(runner);
    }
    #[test]
    fn test_deterministic_clock_sleep() {
        let (executor, runtime, _) = deterministic::Executor::default();
        assert_eq!(runtime.current(), SystemTime::UNIX_EPOCH);
        test_clock_sleep(executor, runtime);
    }
    #[test]
    fn test_deterministic_clock_sleep_until() {
        let (executor, runtime, _) = deterministic::Executor::default();
        test_clock_sleep_until(executor, runtime);
    }
    #[test]
    fn test_deterministic_root_finishes() {
        let (executor, runtime, _) = deterministic::Executor::default();
        test_root_finishes(executor, runtime);
    }
    #[test]
    fn test_deterministic_spawn_abort() {
        let (executor, runtime, _) = deterministic::Executor::default();
        test_spawn_abort(executor, runtime);
    }
    #[test]
    fn test_deterministic_panic_aborts_root() {
        let (runner, _, _) = deterministic::Executor::default();
        test_panic_aborts_root(runner);
    }
    #[test]
    #[should_panic(expected = "blah")]
    fn test_deterministic_panic_aborts_spawn() {
        let (executor, runtime, _) = deterministic::Executor::default();
        test_panic_aborts_spawn(executor, runtime);
    }
    #[test]
    fn test_deterministic_select() {
        let (executor, _, _) = deterministic::Executor::default();
        test_select(executor);
    }
    #[test]
    fn test_deterministic_select_loop() {
        let (executor, runtime, _) = deterministic::Executor::default();
        test_select_loop(executor, runtime);
    }
    #[test]
    fn test_deterministic_storage_operations() {
        let (executor, runtime, _) = deterministic::Executor::default();
        test_storage_operations(executor, runtime);
    }
    #[test]
    fn test_deterministic_blob_read_write() {
        let (executor, runtime, _) = deterministic::Executor::default();
        test_blob_read_write(executor, runtime);
    }
    #[test]
    fn test_deterministic_many_partition_read_write() {
        let (executor, runtime, _) = deterministic::Executor::default();
        test_many_partition_read_write(executor, runtime);
    }
    #[test]
    fn test_deterministic_blob_read_past_length() {
        let (executor, runtime, _) = deterministic::Executor::default();
        test_blob_read_past_length(executor, runtime);
    }
    #[test]
    fn test_deterministic_blob_clone_and_concurrent_read() {
        let cfg = deterministic::Config {
            registry: Arc::new(Mutex::new(Registry::default())),
            ..Default::default()
        };
        let (executor, runtime, _) = deterministic::Executor::init(cfg.clone());
        test_blob_clone_and_concurrent_read(executor, runtime);
        let mut buffer = String::new();
        encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
        assert!(buffer.contains("open_blobs 0"));
    }
    #[test]
    fn test_deterministic_shutdown() {
        let (executor, runtime, _) = deterministic::Executor::default();
        test_shutdown(executor, runtime);
    }
    #[test]
    fn test_tokio_error_future() {
        let (runner, _) = tokio::Executor::default();
        test_error_future(runner);
    }
    #[test]
    fn test_tokio_clock_sleep() {
        let (executor, runtime) = tokio::Executor::default();
        test_clock_sleep(executor, runtime);
    }
    #[test]
    fn test_tokio_clock_sleep_until() {
        let (executor, runtime) = tokio::Executor::default();
        test_clock_sleep_until(executor, runtime);
    }
    #[test]
    fn test_tokio_root_finishes() {
        let (executor, runtime) = tokio::Executor::default();
        test_root_finishes(executor, runtime);
    }
    #[test]
    fn test_tokio_spawn_abort() {
        let (executor, runtime) = tokio::Executor::default();
        test_spawn_abort(executor, runtime);
    }
    #[test]
    fn test_tokio_panic_aborts_root() {
        let (runner, _) = tokio::Executor::default();
        test_panic_aborts_root(runner);
    }
    #[test]
    fn test_tokio_panic_aborts_spawn() {
        let (executor, runtime) = tokio::Executor::default();
        test_panic_aborts_spawn(executor, runtime);
    }
    #[test]
    fn test_tokio_select() {
        let (executor, _) = tokio::Executor::default();
        test_select(executor);
    }
    #[test]
    fn test_tokio_select_loop() {
        let (executor, runtime) = tokio::Executor::default();
        test_select_loop(executor, runtime);
    }
    #[test]
    fn test_tokio_storage_operations() {
        let (executor, runtime) = tokio::Executor::default();
        test_storage_operations(executor, runtime);
    }
    #[test]
    fn test_tokio_blob_read_write() {
        let (executor, runtime) = tokio::Executor::default();
        test_blob_read_write(executor, runtime);
    }
    #[test]
    fn test_tokio_many_partition_read_write() {
        let (executor, runtime) = tokio::Executor::default();
        test_many_partition_read_write(executor, runtime);
    }
    #[test]
    fn test_tokio_blob_read_past_length() {
        let (executor, runtime) = tokio::Executor::default();
        test_blob_read_past_length(executor, runtime);
    }
    #[test]
    fn test_tokio_blob_clone_and_concurrent_read() {
        let cfg = tokio::Config {
            registry: Arc::new(Mutex::new(Registry::default())),
            ..Default::default()
        };
        let (executor, runtime) = tokio::Executor::init(cfg.clone());
        test_blob_clone_and_concurrent_read(executor, runtime);
        let mut buffer = String::new();
        encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
        assert!(buffer.contains("open_blobs 0"));
    }
    #[test]
    fn test_tokio_shutdown() {
        let (executor, runtime) = tokio::Executor::default();
        test_shutdown(executor, runtime);
    }
}