revenq 0.4.1

A concurrent, revision-based event queue implementation
Documentation
use revenq::Queue;

fn accumulate<T>(rv: &mut impl Iterator<Item = revenq::RevisionRef<T>>) -> T
where
    T: Copy + std::iter::Sum,
{
    rv.map(|i| *i).sum()
}

fn skip_and_publish<T: Send + 'static>(q: &mut Queue<T>) {
    while q.next().is_some() {}
}

#[test]
fn simple() {
    let mut q = Queue::new();
    q.enqueue(vec![0]);
    skip_and_publish(&mut q);

    let mut l = q.clone();
    let mut marker = Vec::new();
    marker.extend((&mut l).map(|i| (*i).clone()).flatten());
    assert!(marker.is_empty());

    q.enqueue(vec![1]);
    skip_and_publish(&mut q);

    marker.extend((&mut l).map(|i| (*i).clone()).flatten());
    assert_eq!(marker, [1]);
}

#[test]
fn multi() {
    let mut q = Queue::new();
    let l1 = q.clone();
    let mut l2 = q.clone();

    q.enqueue(0);
    q.enqueue(1);
    skip_and_publish(&mut q);

    let mut marker = Vec::new();
    marker.extend(l1.map(|i| *i));
    assert_eq!(marker, [0, 1]);
    marker.clear();
    let mut fi = l2.next().unwrap();
    marker.push(*fi);
    marker.extend(l2.map(|i| *i));
    assert_eq!(marker, [0, 1]);
    // detach fi
    assert!(revenq::RevisionRef::try_detach(&mut fi).is_ok());
    assert_eq!(*fi, 0);
}

#[test]
#[cfg_attr(miri, ignore)]
fn multithreaded() {
    use std::{thread, time::Duration};
    let mut q = Queue::new();

    let spt = |q: &Queue<u32>| {
        let mut lx = q.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            let marker = accumulate(&mut lx);
            assert_eq!(marker, 1);
            thread::sleep(Duration::from_millis(20));
            let marker = accumulate(&mut lx);
            assert_eq!(marker, 2);
            thread::sleep(Duration::from_millis(40));
            let marker: Vec<_> = lx.map(|i| *i).collect();
            assert_eq!(marker, &[3, 4]);
        })
    };

    let th1 = spt(&q);
    let th2 = spt(&q);
    q.enqueue(1);
    skip_and_publish(&mut q);
    thread::sleep(Duration::from_millis(60));
    q.enqueue(2);
    skip_and_publish(&mut q);
    thread::sleep(Duration::from_millis(30));
    q.enqueue(3);
    q.enqueue(4);
    skip_and_publish(&mut q);
    th1.join().unwrap();
    th2.join().unwrap();
}

#[test]
fn mp() {
    let mut q1 = Queue::new();
    let mut q2 = q1.clone();

    let (mut c1, mut c2) = (0, 0);
    q1.enqueue(1);
    c1 += accumulate(&mut q1);
    q2.enqueue(2);
    c2 += accumulate(&mut q2);
    q1.enqueue(3);
    c1 += accumulate(&mut q1);
    q2.enqueue(4);
    c2 += accumulate(&mut q2);
    c1 += accumulate(&mut q1);
    c2 += accumulate(&mut q2);
    assert_eq!(c1, 6);
    assert_eq!(c2, 4);
}

#[test]
#[cfg_attr(miri, ignore)]
fn mtmp() {
    use std::{thread, time::Duration};
    let q1 = Queue::new();
    let q2 = q1.clone();

    let spt = |mut q: Queue<u32>, publiv: Vec<u32>| {
        thread::spawn(move || {
            let mut c = 0;
            for i in publiv {
                q.enqueue(i);
                c += accumulate(&mut q);
                thread::sleep(Duration::from_millis(20));
            }
            c += accumulate(&mut q);
            c
        })
    };

    let th1 = spt(q1, vec![1, 3]);
    let th2 = spt(q2, vec![2, 4]);
    assert_eq!(th1.join().unwrap(), 6);
    assert_eq!(th2.join().unwrap(), 4);
}

#[test]
#[cfg_attr(miri, ignore)]
fn blocking() {
    use std::thread;
    let q1 = Queue::new();
    let q2 = q1.clone();

    let spt = |mut q: Queue<u32>, publiv: Vec<u32>| {
        thread::spawn(move || {
            let mut c = Vec::new();
            let plvl = publiv.len();
            for i in publiv {
                q.enqueue(i);
            }
            futures_lite::future::block_on(async {
                while c.len() < plvl {
                    match q.next_async().await {
                        Some(x) => c.push(*x),
                        None => break,
                    }
                }
            });
            c.extend((&mut q).map(|i| *i));
            c
        })
    };

    let th1 = spt(q1, vec![1, 3]);
    let th2 = spt(q2, vec![2, 4]);
    assert_eq!(th1.join().unwrap(), [2, 4]);
    assert_eq!(th2.join().unwrap(), [1, 3]);
}