acto 0.8.0

light-weight Actor library for Rust
Documentation
#![cfg(feature = "tokio")]

use crate::{tokio::AcTokio, ActoCell, ActoHandle, ActoInput, ActoRuntime, SupervisionRef};
use std::{pin::Pin, sync::Arc, time::Duration};
use tokio::{sync::oneshot, time::timeout};
use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};

macro_rules! assert_timed {
    ($cond:expr $(,$($arg:tt)+)?) => {
        for _ in 0..300 {
            std::thread::sleep(Duration::from_millis(10));
            if $cond { break; }
        }
        assert!($cond $(,$($arg)*)?);
    };
}

#[test]
fn supervisor_termination() {
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .with_span_events(FmtSpan::ENTER | FmtSpan::CLOSE)
        .with_test_writer()
        .init();
    let sys = AcTokio::new("test", 2).unwrap();

    let probe = Arc::new(());

    let (tx, rx) = oneshot::channel();
    let probe2 = probe.clone();
    let SupervisionRef { me: r, handle: j } =
        sys.spawn_actor::<_, _, _, Arc<()>>("super", move |mut cell| async move {
            let probe3 = probe2.clone();
            let _r = cell.spawn_supervised("c1", move |mut cell: ActoCell<i32, _>| async move {
                cell.recv().await;
                probe3
            });
            tracing::debug!("message");
            let probe3 = probe2.clone();
            let _r = cell.spawn_supervised("c2", move |mut cell: ActoCell<i32, _>| async move {
                cell.recv().await;
                probe3
            });
            println!("x");
            tx.send(()).ok();
            cell.recv().await
        });
    assert_eq!(&r.name()[..11], "super(test/");

    sys.with_rt(|rt| rt.block_on(async { timeout(Duration::from_secs(1), rx).await }))
        .expect("runtime should be there")
        .expect("timed out")
        .expect("channel gone");
    assert_eq!(Arc::strong_count(&probe), 4);

    r.send(());
    let msg = sys
        .with_rt(|rt| rt.block_on(async { timeout(Duration::from_secs(1), j.join()).await }))
        .expect("runtime should be there")
        .expect("timed out")
        .expect("panicked");
    assert_eq!(msg, ActoInput::Message(()));
    assert_timed!(
        Arc::strong_count(&probe) == 1,
        " was {}",
        Arc::strong_count(&probe)
    );
}

#[test]
fn termination_info() {
    let sys = AcTokio::new("test", 2).unwrap();
    let SupervisionRef {
        me: r,
        handle: mut j,
    } = sys.spawn_actor("buh", |mut cell: ActoCell<(), _>| async move {
        loop {
            cell.recv().await;
        }
    });
    assert!(!r.is_gone());
    assert!(!j.is_finished());
    Pin::new(&mut j).abort_pinned();
    assert_timed!(j.is_finished());
    assert!(r.is_gone());
}