elfo 0.1.40

Your next actor system
Documentation
#![cfg(feature = "test-util")]

use std::convert::TryFrom;

use elfo::{config::AnyConfig, prelude::*, scope, stream, trace_id::TraceId};

#[message]
#[derive(PartialEq)]
struct SomeMessage(u32);

#[message]
struct Set(Vec<u32>);

#[message]
struct Replace(Vec<u32>);

#[tokio::test]
async fn it_handles_basic_operations() {
    let group = ActorGroup::new().exec(|ctx| async move {
        let stream = stream::Stream::new(futures::stream::iter(vec![SomeMessage(0)]));

        let mut ctx = ctx.with(&stream);
        let mut prev_trace_id = scope::trace_id();

        while let Some(envelope) = ctx.recv().await {
            assert_ne!(scope::trace_id(), prev_trace_id);
            prev_trace_id = scope::trace_id();

            msg!(match envelope {
                m @ SomeMessage(_) => ctx.send(m).await.unwrap(),
                Set(d) => stream.set(futures::stream::iter(
                    d.into_iter().map(SomeMessage).collect::<Vec<_>>()
                )),
                Replace(d) => {
                    let _ = stream.replace(futures::stream::iter(
                        d.into_iter().map(SomeMessage).collect::<Vec<_>>(),
                    ));
                }
            })
        }
    });

    let mut proxy = elfo::test::proxy(group, AnyConfig::default()).await;
    assert_msg_eq!(proxy.recv().await, SomeMessage(0));
    assert!(proxy.try_recv().is_none());

    proxy.send(Set((1..5).collect())).await;
    for i in 1..5 {
        assert_msg_eq!(proxy.recv().await, SomeMessage(i));
    }
    assert!(proxy.try_recv().is_none());

    proxy.send(Replace((6..8).collect())).await;
    for i in 6..8 {
        assert_msg_eq!(proxy.recv().await, SomeMessage(i));
    }
}

#[tokio::test]
async fn it_restores_trace_id() {
    let group = ActorGroup::new().exec(|ctx| async move {
        let stream = stream::Stream::new(futures::stream::iter(vec![
            (TraceId::try_from(5).unwrap(), SomeMessage(5)),
            (TraceId::try_from(6).unwrap(), SomeMessage(6)),
        ]));

        let mut ctx = ctx.with(&stream);

        while let Some(envelope) = ctx.recv().await {
            msg!(match envelope {
                SomeMessage(x) => {
                    assert_eq!(u64::from(scope::trace_id()), u64::from(x));
                    ctx.send(SomeMessage(x)).await.unwrap()
                }
            })
        }
    });

    let mut proxy = elfo::test::proxy(group, AnyConfig::default()).await;
    assert_msg_eq!(proxy.recv().await, SomeMessage(5));
    assert_msg_eq!(proxy.recv().await, SomeMessage(6));
}

#[message]
#[derive(PartialEq)]
struct SomeMessage2(u32);

#[tokio::test]
async fn it_generates_stream() {
    let group = ActorGroup::new().exec(|ctx| async move {
        let stream = stream::Stream::generate(|mut y| async move {
            y.emit(SomeMessage(1)).await;
            y.emit(SomeMessage2(2)).await;
        });

        let mut ctx = ctx.with(&stream);

        while let Some(envelope) = ctx.recv().await {
            msg!(match envelope {
                SomeMessage(x) => {
                    ctx.send(SomeMessage(x)).await.unwrap()
                }
                SomeMessage2(x) => {
                    ctx.send(SomeMessage2(x)).await.unwrap()
                }
            })
        }
    });

    let mut proxy = elfo::test::proxy(group, AnyConfig::default()).await;
    assert_msg_eq!(proxy.recv().await, SomeMessage(1));
    assert_msg_eq!(proxy.recv().await, SomeMessage2(2));
}