deadqueue 0.2.5

Dead simple async queue
Documentation
#[cfg(feature = "resizable")]
mod tests {

    use std::sync::Arc;

    use deadqueue::resizable::Queue;

    #[tokio::test]
    async fn test_basics() {
        let queue: Queue<usize> = Queue::new(2);
        assert_eq!(queue.len(), 0);
        assert!(queue.try_push(1).is_ok());
        assert_eq!(queue.len(), 1);
        assert!(queue.try_push(2).is_ok());
        assert_eq!(queue.len(), 2);
        assert!(queue.try_push(3).is_err());
        assert_eq!(queue.len(), 2);
        assert!(queue.try_pop().is_some());
        assert_eq!(queue.len(), 1);
        assert!(queue.try_push(3).is_ok());
        assert_eq!(queue.len(), 2);
    }

    #[tokio::test]
    async fn test_available() {
        let queue: Queue<usize> = Queue::new(2);
        assert_eq!(queue.len(), 0);
        assert!(queue.try_push(1).is_ok());
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.available(), 1);
        assert!(queue.try_pop().is_some());
        assert_eq!(queue.len(), 0);
        assert_eq!(queue.available(), 0);
    }

    #[tokio::test]
    async fn test_parallel() {
        let queue: Arc<Queue<usize>> = Arc::new(Queue::new(10));
        let mut futures = Vec::new();
        for _ in 0..100usize {
            let queue = queue.clone();
            futures.push(tokio::spawn(async move {
                for _ in 0..100usize {
                    queue.pop().await;
                }
            }));
        }
        for _ in 0..100usize {
            let queue = queue.clone();
            futures.push(tokio::spawn(async move {
                for i in 0..100usize {
                    queue.push(i).await;
                }
            }));
        }
        for future in futures {
            future.await.unwrap();
        }
        assert_eq!(queue.len(), 0);
    }

    #[tokio::test]
    async fn test_parallel_available() {
        const N: usize = 2;
        let queue: Arc<Queue<usize>> = Arc::new(Queue::new(N));
        let barrier = Arc::new(tokio::sync::Barrier::new(N + 1));
        let mut futures = Vec::new();
        for _ in 0..N {
            let queue = queue.clone();
            let barrier = barrier.clone();
            futures.push(tokio::spawn(async move {
                barrier.wait().await;
                queue.pop().await;
            }));
        }
        barrier.wait().await;
        assert_eq!(queue.len(), 0);
        assert_eq!(queue.available(), -(N as isize));
        for i in 0..N {
            queue.push(i).await;
        }
        for future in futures {
            future.await.unwrap();
        }
        assert_eq!(queue.len(), 0);
        assert_eq!(queue.available(), 0);
    }

    #[tokio::test]
    async fn test_full() {
        let queue: Arc<Queue<usize>> = Arc::new(Queue::new(100));
        let barrier = Arc::new(tokio::sync::Barrier::new(2));
        let future_queue = queue.clone();
        let future_barrier = barrier.clone();
        let future = tokio::spawn(async move {
            future_barrier.wait().await;
            assert_ne!(future_queue.capacity(), future_queue.len());
            future_queue.wait_full().await;
        });
        barrier.wait().await;
        for i in 0..100 {
            queue.push(i).await;
        }
        future.await.unwrap();
        assert_eq!(queue.len(), 100);
    }

    #[tokio::test]
    async fn test_empty() {
        let queue: Arc<Queue<usize>> = Arc::new(Queue::new(100));
        for i in 0..100 {
            queue.push(i).await;
        }
        let barrier = Arc::new(tokio::sync::Barrier::new(2));
        let future_queue = queue.clone();
        let future_barrier = barrier.clone();
        let future = tokio::spawn(async move {
            future_barrier.wait().await;
            assert!(!future_queue.is_empty());
            future_queue.wait_empty().await;
        });
        barrier.wait().await;
        for _ in 0..100 {
            queue.pop().await;
        }
        future.await.unwrap();
        assert_eq!(queue.len(), 0);
    }

    #[tokio::test]
    async fn test_full_deadlock() {
        let queue: Arc<Queue<()>> = Arc::new(Queue::new(2));
        let barrier = Arc::new(tokio::sync::Barrier::new(2));
        let future_queue = queue.clone();
        let future_barrier = barrier.clone();
        let future = tokio::spawn(async move {
            future_barrier.wait().await;
            assert_ne!(future_queue.capacity(), future_queue.len());
            println!("Awaiting full queue");
            future_queue.wait_full().await;
        });
        barrier.wait().await;
        println!("Begin queue operations");
        queue.push(()).await;
        queue.pop().await;
        for _ in 0..2 {
            queue.push(()).await;
        }
        println!("End queue operations");
        future.await.unwrap();
        assert_eq!(queue.len(), 2);
    }

    #[tokio::test]
    async fn test_empty_deadlock() {
        let queue: Arc<Queue<()>> = Arc::new(Queue::new(3));
        for _ in 0..2 {
            queue.push(()).await;
        }
        let barrier = Arc::new(tokio::sync::Barrier::new(2));
        let future_queue = queue.clone();
        let future_barrier = barrier.clone();
        let future = tokio::spawn(async move {
            future_barrier.wait().await;
            assert!(!future_queue.is_empty());
            println!("Awaiting full queue");
            future_queue.wait_empty().await;
        });
        barrier.wait().await;
        println!("Begin queue operations");
        queue.push(()).await;
        for _ in 0..3 {
            queue.pop().await;
        }
        println!("End queue operations");
        future.await.unwrap();
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn test_debug() {
        struct NoDebug {}
        let queue: Queue<NoDebug> = Queue::new(1);
        let _ = format!("{:?}", queue);
    }

    #[tokio::test]
    async fn test_resize_enlarge() {
        let queue: Queue<usize> = Queue::new(0);
        queue.resize(1).await;
        assert_eq!(queue.capacity(), 1);
    }

    #[tokio::test]
    async fn test_resize_shrink() {
        let queue: Queue<usize> = Queue::new(2);
        queue.try_push(0).unwrap();
        queue.resize(1).await;
        assert_eq!(queue.capacity(), 1);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.try_push(42), Err(42));
        queue.resize(0).await;
        assert_eq!(queue.capacity(), 0);
        assert_eq!(queue.len(), 0);
        assert_eq!(queue.try_push(42), Err(42));
    }

    #[tokio::test]
    async fn test_is_full_basic() {
        let queue: Queue<usize> = Queue::new(2);
        assert!(!queue.is_full(), "Should be empty at construction");
        queue.push(1).await;
        assert!(
            !queue.is_full(),
            "Should not be full a one less than capacity"
        );
        queue.push(2).await;
        assert!(queue.is_full(), "Should now be full");
        let _ = queue.pop().await;
        assert!(!queue.is_full(), "Should no longer be full after pop");
    }
}