async-atomic 0.2.0

Atomics which can be subscribed to and asynchronously notify when updated
Documentation
extern crate std;

use crate::{prelude::*, AsyncAtomic};
use async_std::{
    future::timeout,
    task::{sleep, spawn},
    test as async_test,
};
use core::sync::atomic::AtomicUsize;
use futures::stream::StreamExt;
use std::{sync::Arc, time::Duration, vec::Vec};

const SMALL_TIMEOUT: Duration = Duration::from_millis(10);
const BIG_TIMEOUT: Duration = Duration::from_millis(1000);

#[async_test]
async fn waiting() {
    let sub = Arc::new(AsyncAtomic::<usize>::new(0));
    let val = sub.clone();

    assert!(timeout(SMALL_TIMEOUT, sub.wait(|x| x > 0)).await.is_err());

    spawn(async move {
        sleep(SMALL_TIMEOUT).await;
        assert_eq!(val.fetch_add(1), 0);
    });

    let mut v = None;
    timeout(
        BIG_TIMEOUT,
        sub.wait(|x| {
            if x > 0 {
                v = Some(x);
                true
            } else {
                false
            }
        }),
    )
    .await
    .unwrap();
    assert_eq!(v, Some(1));
}

#[async_test]
async fn concurrent_increment() {
    const COUNT: usize = 256;
    let sub = Arc::new(AsyncAtomic::<usize>::new(0));
    let val = sub.clone();

    for _ in 0..COUNT {
        let val = val.clone();
        spawn(async move {
            sleep(SMALL_TIMEOUT).await;
            val.fetch_add(1);
        });
    }

    timeout(BIG_TIMEOUT, sub.wait(|x| x == COUNT))
        .await
        .unwrap();
}

#[async_test]
async fn ping_pong() {
    const PROD_VAL: usize = 29;
    const CONS_VAL: usize = 17;

    let sub = Arc::new(AsyncAtomic::<usize>::new(0));
    let val = sub.clone();

    spawn({
        let val = val.clone();
        async move {
            for _ in 0..CONS_VAL {
                sleep(SMALL_TIMEOUT).await;
                val.fetch_add(PROD_VAL);
            }
        }
    });

    for _ in 0..PROD_VAL {
        sub.wait_and_update(|x| {
            if x >= CONS_VAL {
                Some(x - CONS_VAL)
            } else {
                None
            }
        })
        .await;
    }

    assert_eq!(val.load(), 0);
}

#[async_test]
async fn static_() {
    static ATOMIC: AsyncAtomic<usize> = AsyncAtomic::from_impl(AtomicUsize::new(0));

    let sub = &ATOMIC;

    assert!(timeout(SMALL_TIMEOUT, sub.wait(|x| x > 0)).await.is_err());

    spawn(async move {
        sleep(SMALL_TIMEOUT).await;
        assert_eq!(ATOMIC.fetch_add(1), 0);
    });

    let mut v = None;
    timeout(
        BIG_TIMEOUT,
        sub.wait(|x| {
            if x > 0 {
                v = Some(x);
                true
            } else {
                false
            }
        }),
    )
    .await
    .unwrap();
    assert_eq!(v, Some(1));
}

#[async_test]
async fn stream() {
    const COUNT: usize = 64;
    let sub = Arc::new(AsyncAtomic::<usize>::new(0));
    let val = sub.clone();

    spawn(async move {
        for _ in 0..COUNT {
            sleep(SMALL_TIMEOUT).await;
            val.fetch_add(1);
        }
    });

    spawn(async move {
        let stream = sub.changed();
        let data = timeout(BIG_TIMEOUT, stream.take(COUNT + 1).collect::<Vec<_>>())
            .await
            .unwrap();
        assert!(data.into_iter().eq(0..=COUNT));
    })
    .await
}