batch_queue 0.1.0

A single-producer single-consumer Rust queue with smart batching
Documentation
use super::*;
use crate::{receiver::TryRecvError, sender::TrySendError};
use itertools::Itertools;
use std::sync::atomic::AtomicU64;

#[test]
fn capacity() {
    let (mut tx, mut rx) = batch_queue::<u32, 3>(3);

    assert_eq!(tx.try_send(1), Ok(false));
    tx.close_batch();
    assert_eq!(tx.try_send(2), Ok(false));
    assert_eq!(tx.try_send(3), Ok(false));
    assert_eq!(tx.try_send(4), Ok(true));
    assert_eq!(tx.try_send(5), Ok(false));
    tx.close_batch();
    assert_eq!(tx.try_send(6), Err(TrySendError::Full(6)));

    assert_eq!(rx.try_recv_batch(), Ok(vec![1]));
    assert_eq!(rx.try_recv_batch(), Ok(vec![2, 3, 4]));
    assert_eq!(rx.try_recv_batch(), Ok(vec![5]));
    assert_eq!(rx.try_recv_batch(), Err(TryRecvError::Empty));

    assert_eq!(tx.try_send(1), Ok(false));
    tx.close_batch();
    assert_eq!(tx.try_send(2), Ok(false));
    assert_eq!(tx.try_send(3), Ok(false));
    assert_eq!(tx.try_send(4), Ok(true));
    assert_eq!(tx.try_send(5), Ok(false));
    tx.close_batch();
    assert_eq!(tx.try_send(6), Err(TrySendError::Full(6)));

    assert_eq!(rx.try_recv_batch(), Ok(vec![1]));
    assert_eq!(tx.try_send(6), Ok(false));
    assert_eq!(rx.try_recv_batch(), Ok(vec![2, 3, 4]));
    assert_eq!(rx.try_recv_batch(), Ok(vec![5]));
    assert_eq!(rx.try_recv_batch(), Err(TryRecvError::Empty));
    tx.close_batch();
    assert_eq!(rx.try_recv_batch(), Ok(vec![6]));
    assert_eq!(rx.try_recv_batch(), Err(TryRecvError::Empty));

    drop(tx);
    assert_eq!(rx.try_recv_batch(), Err(TryRecvError::Closed));
}

#[test]
fn drop_rx() {
    let (mut tx, rx) = batch_queue::<u32, 2>(3);

    assert_eq!(tx.try_send(1), Ok(false));
    drop(rx);
    assert_eq!(tx.try_send(2), Err(TrySendError::Closed));
}

#[test]
fn drop_items() {
    static POOL: AtomicU64 = AtomicU64::new(1);

    #[derive(Debug, PartialEq)]
    struct D(u64);
    impl D {
        pub fn d(self) -> u64 {
            let ret = self.0;
            std::mem::forget(self);
            ret
        }
    }
    impl Drop for D {
        fn drop(&mut self) {
            POOL.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| Some(v * self.0))
                .ok();
        }
    }

    let (mut tx, mut rx) = batch_queue::<D, 2>(3);

    assert_eq!(tx.try_send(D(2)), Ok(false));
    assert_eq!(tx.try_send(D(3)), Ok(true));
    assert_eq!(tx.try_send(D(5)), Ok(false));
    tx.close_batch();
    assert_eq!(tx.try_send(D(7)), Ok(false));
    assert_eq!(tx.try_send(D(11)), Ok(true));

    assert_eq!(
        rx.try_recv().unwrap().take(1).map(D::d).collect::<Vec<_>>(),
        vec![2]
    );
    assert_eq!(POOL.load(Ordering::Relaxed), 3);

    drop(rx);
    assert_eq!(POOL.load(Ordering::Relaxed), 3);

    assert_eq!(tx.try_send(D(13)), Err(TrySendError::Closed));
    assert_eq!(POOL.load(Ordering::Relaxed), 3 * 13);

    drop(tx);
    assert_eq!(POOL.load(Ordering::Relaxed), 3 * 13 * 5 * 7 * 11);
}

#[tokio::test]
async fn drop_async() {
    let (mut tx, mut rx) = batch_queue::<u32, 5>(3);
    let handle = tokio::spawn(async move {
        tx.send(1).await?;
        Result::<_, Closed>::Ok(())
    });
    assert_eq!(rx.recv_batch().await, Ok(vec![1]));
    assert_eq!(rx.recv_batch().await, Err(Closed));
    handle.await.unwrap().unwrap();

    let (mut tx1, mut rx1) = batch_queue::<u32, 5>(3);
    let (tx2, mut rx2) = batch_queue::<u32, 5>(3);
    let handle = tokio::spawn(async move {
        rx1.recv().await?;
        drop(rx1);
        drop(tx2);
        Result::<_, Closed>::Ok(())
    });
    assert_eq!(tx1.send(1).await, Ok(false));
    assert_eq!(tx1.send(2).await, Ok(false));
    assert_eq!(tx1.send(3).await, Ok(false));
    tx1.close_batch();
    assert_eq!(rx2.recv_batch().await, Err(Closed));
    assert_eq!(tx1.send(4).await, Err(Closed));
    handle.await.unwrap().unwrap();
}

#[tokio::test]
async fn stress() {
    let (mut tx, mut rx) = batch_queue::<u32, 5>(3);
    #[cfg(debug_assertions)]
    const N: u32 = 1_000_000;
    #[cfg(not(debug_assertions))]
    const N: u32 = 10_000_000;

    let handle = tokio::spawn(async move {
        for i in 0..N {
            tx.send(i).await?;
        }
        Result::<_, Closed>::Ok(())
    });

    for i in (0..N).chunks(5).into_iter() {
        assert_eq!(rx.recv_batch().await.unwrap(), i.collect::<Vec<_>>());
    }

    handle.await.unwrap().unwrap();
    assert_eq!(rx.recv_batch().await, Err(Closed));
}

#[tokio::test]
async fn stream() {
    let (tx, mut rx) = batch_queue::<u32, 128>(20);
    #[cfg(debug_assertions)]
    const N: u32 = 10_000_000;
    #[cfg(not(debug_assertions))]
    const N: u32 = 1_000_000_000;

    tokio::spawn(async move {
        let stream = futures::stream::iter(0..N);
        pipe(stream, tx).await;
    });

    let mut x = 0;
    while let Ok(v) = rx.recv().await {
        for y in v {
            assert_eq!(y, x);
            x += 1;
        }
    }
    assert_eq!(x, N);
}