zestors 0.1.2

A fast and flexible actor-framework for building fault-tolerant Rust applications
Documentation
use futures::{future::pending, StreamExt};
use std::{collections::HashSet, time::Duration};
use zestors::{
    actor_reference::{ActorRefExt, Transformable},
    actor_type::Inbox,
    messaging::RecvError,
    protocol,
    spawning::{spawn, spawn_many, spawn_many_with, spawn_with, BackPressure, Capacity, Link},
};

#[tokio::test]
async fn spawn_and_abort() {
    let (mut child, _address) = spawn(|_: Inbox<()>| async move {
        let () = pending().await;
    });
    child.abort();
    assert!(child.await.unwrap_err().is_abort());
}

#[tokio::test]
async fn spawn_await_address() {
    let (mut child, address) = spawn(|_: Inbox<()>| async move {
        let () = pending().await;
    });
    child.abort();
    address.await;
}

#[tokio::test]
async fn spawn_and_panic() {
    let (child, _address) = spawn(|_: Inbox<()>| async move { panic!() });
    assert!(child.await.unwrap_err().is_panic());
}

#[tokio::test]
async fn spawn_and_normal_exit() {
    let (child, _address) = spawn(|_: Inbox<()>| async move {});
    assert!(child.await.is_ok());
}

#[tokio::test]
async fn spawn_and_halt() {
    let (child, _address) = spawn(|mut inbox: Inbox<()>| async move {
        assert_eq!(inbox.recv().await.unwrap_err(), RecvError::Halted);
    });
    child.halt();
    assert!(child.await.is_ok());
}

#[tokio::test]
async fn spawn_and_drop() {
    let (child, address) = spawn_with(
        Link::Attached(Duration::from_millis(10).into()),
        Capacity::Bounded(10),
        |mut inbox: Inbox<()>| async move {
            assert_eq!(inbox.recv().await.unwrap_err(), RecvError::Halted);
            let () = pending().await;
        },
    );
    drop(child);
    address.await;
}

#[tokio::test]
async fn spawn_and_drop_detached() {
    let (child, address) = spawn_with(
        Link::Detached,
        Capacity::BackPressure(BackPressure::default()),
        |mut inbox: Inbox<()>| async move {
            assert_eq!(inbox.recv().await.unwrap(), ());
        },
    );
    drop(child);
    tokio::time::sleep(Duration::from_millis(10)).await;
    address.send(()).await.unwrap();
    address.await;
}

#[tokio::test]
async fn base_counts() {
    let (mut child, address) = spawn(|inbox: Inbox<()>| async move {
        pending::<()>().await;
        drop(inbox);
    });
    assert_eq!(child.address_count(), 1);
    assert_eq!(child.process_count(), 1);
    assert_eq!(address.msg_count(), 0);
    child.abort();
}

#[tokio::test]
async fn address_counts() {
    let (mut child, address) = spawn(|inbox: Inbox<()>| async move {
        pending::<()>().await;
        drop(inbox);
    });
    assert_eq!(child.address_count(), 1);
    let address2 = address.clone();
    assert_eq!(child.address_count(), 2);
    drop(address2);
    assert_eq!(child.address_count(), 1);
    child.abort();
}

#[tokio::test]
async fn inbox_counts() {
    let (pool, _address) = spawn_many(0..4, |_, mut inbox: Inbox<()>| async move {
        inbox.recv().await.unwrap_err();
    });
    let mut pool = pool.into_dyn();
    assert_eq!(pool.process_count(), 4);

    pool.halt_some(1);
    tokio::time::sleep(Duration::from_millis(10)).await;
    assert_eq!(pool.process_count(), 3);

    pool.try_spawn_onto(|mut inbox: Inbox<()>| async move {
        inbox.recv().await.unwrap_err();
    })
    .unwrap();
    assert_eq!(pool.process_count(), 4);

    pool.halt_some(2);
    tokio::time::sleep(Duration::from_millis(10)).await;
    assert_eq!(pool.process_count(), 2);

    pool.halt();
    tokio::time::sleep(Duration::from_millis(10)).await;
    assert_eq!(pool.process_count(), 0);
}

#[protocol]
enum U32Protocol {
    U32(u32),
}

#[tokio::test]
async fn pooled_messaging_split() {
    let (pool, address) = spawn_many_with(
        Link::default(),
        Capacity::Bounded(5),
        0..3,
        |_, mut inbox: Inbox<U32Protocol>| async move {
            let mut numbers = Vec::new();
            loop {
                match inbox.recv().await {
                    Ok(U32Protocol::U32(msg)) => {
                        tokio::time::sleep(Duration::from_millis(1)).await;
                        numbers.push(msg);
                    }
                    Err(signal) => match signal {
                        RecvError::Halted => break Ok(numbers),
                        RecvError::ClosedAndEmpty => break Err(()),
                    },
                }
            }
        },
    );

    for i in 0..30 {
        address.send(i).await.unwrap();
    }

    tokio::time::sleep(Duration::from_millis(20)).await;
    address.halt();

    let res = pool
        .map(|e| e.unwrap().unwrap())
        .fold(HashSet::new(), |mut acc, vals| async move {
            assert!(vals.len() >= 9);
            assert!(vals.len() <= 11);
            for val in vals {
                assert!(acc.insert(val));
            }
            acc
        })
        .await;
    assert_eq!(res.len(), 30)
}