elfo 0.2.0-alpha.21

An asynchronous distributed actor framework with robust observability
Documentation
#![allow(missing_docs)]
#![cfg(feature = "test-util")]

use std::time::Duration;

use elfo::{config::AnyConfig, prelude::*, scope, test::Proxy, time::Interval};
use tokio::time::{sleep, Instant};

fn ms(millis: u64) -> Duration {
    Duration::from_millis(millis)
}

#[tokio::test(start_paused = true)]
async fn multiple() {
    #[message]
    struct Start(Duration);

    #[message]
    #[derive(PartialEq, Eq)]
    struct Tick(Duration);

    let group = ActorGroup::new().exec(|mut ctx| async move {
        let mut trace_ids = Vec::new();

        while let Some(envelope) = ctx.recv().await {
            msg!(match envelope {
                Start(duration) => {
                    trace_ids.push(scope::trace_id());
                    ctx.attach(Interval::new(Tick(duration))).start(duration);
                }
                msg @ Tick => {
                    assert!(!trace_ids.contains(&scope::trace_id()));
                    ctx.send(msg).await.unwrap();
                }
            });
        }
    });

    let mut proxy = elfo::test::proxy(group, AnyConfig::default()).await;
    assert!(proxy.try_recv().await.is_none());

    proxy.send(Start(ms(11))).await;
    proxy.send(Start(ms(35))).await;
    proxy.send(Start(ms(49))).await;

    assert_msg_eq!(proxy.recv().await, Tick(ms(11))); // 11
    assert_msg_eq!(proxy.recv().await, Tick(ms(11))); // 22
    assert_msg_eq!(proxy.recv().await, Tick(ms(11))); // 33
    assert_msg_eq!(proxy.recv().await, Tick(ms(35))); // 35
    assert_msg_eq!(proxy.recv().await, Tick(ms(11))); // 44
    assert_msg_eq!(proxy.recv().await, Tick(ms(49))); // 49
    assert_msg_eq!(proxy.recv().await, Tick(ms(11))); // 55
    assert_msg_eq!(proxy.recv().await, Tick(ms(11))); // 66
    assert_msg_eq!(proxy.recv().await, Tick(ms(35))); // 70
    assert_msg_eq!(proxy.recv().await, Tick(ms(11))); // 77
    assert_msg_eq!(proxy.recv().await, Tick(ms(11))); // 88
    assert_msg_eq!(proxy.recv().await, Tick(ms(49))); // 98
}

// Checks that ticks can be requests, responses aren't produced.
#[tokio::test(start_paused = true)]
async fn request() {
    #[message(ret = u32)]
    #[derive(PartialEq, Eq)]
    struct Tick(u32);

    let group = ActorGroup::new().exec(|mut ctx| async move {
        let interval = ctx.attach(Interval::new(Tick(0)));
        interval.start(ms(10));

        while let Some(envelope) = ctx.recv().await {
            msg!(match envelope {
                (msg @ Tick(no), token) => {
                    ctx.respond(token, no); // does nothing
                    let next_no = ctx.request(msg).resolve().await.unwrap();
                    interval.set_message(Tick(next_no));
                }
            });
        }
    });

    let mut proxy = elfo::test::proxy(group, AnyConfig::default()).await;

    for i in 0..5 {
        msg!(match proxy.recv().await {
            (Tick(no), token) => {
                assert_eq!(no, i);
                proxy.respond(token, no + 1);
            }
            _ => unreachable!(),
        });
    }
}

#[message]
struct Start(Duration);

#[message]
struct StartAfter(Duration, Duration);

#[message]
struct Stop;

#[message]
struct SetPeriod(Duration);

#[message]
struct SetMessage(u32);

#[message]
#[derive(PartialEq, Eq)]
struct Tick(u32);

#[message]
struct Terminate;

fn sample() -> Blueprint {
    ActorGroup::new().exec(move |mut ctx| async move {
        let mut interval = Some(ctx.attach(Interval::new(Tick(0))));

        while let Some(envelope) = ctx.recv().await {
            msg!(match envelope {
                Start(period) => {
                    interval.as_ref().unwrap().start(period);
                }
                StartAfter(delay, period) => {
                    interval.as_ref().unwrap().start_after(delay, period);
                }
                Stop => {
                    interval.as_ref().unwrap().stop();
                }
                SetPeriod(period) => {
                    interval.as_ref().unwrap().set_period(period)
                }
                SetMessage(no) => {
                    interval.as_ref().unwrap().set_message(Tick(no));
                }
                Terminate => {
                    interval.take().unwrap().terminate();
                }
                msg @ Tick => {
                    ctx.send(msg).await.unwrap();
                }
            });
        }
    })
}

struct Checker {
    proxy: Proxy,
    prev_time: Instant,
}

impl Checker {
    fn new(proxy: Proxy) -> Self {
        Self {
            proxy,
            prev_time: Instant::now(),
        }
    }

    async fn send<M: elfo::Message>(&self, message: M) {
        self.proxy.send(message).await;
    }

    async fn tick(&mut self, expected_elapsed: Duration, expected_no: u32) {
        assert_msg_eq!(self.proxy.recv().await, Tick(expected_no));

        let now = Instant::now();
        let elapsed = now - self.prev_time;
        assert_eq!(elapsed, expected_elapsed);

        self.prev_time = now;
    }
}

#[tokio::test(start_paused = true)]
async fn start_after() {
    let proxy = elfo::test::proxy(sample(), AnyConfig::default()).await;
    let mut checker = Checker::new(proxy);

    checker.send(StartAfter(ms(15), ms(10))).await;
    checker.tick(ms(15), 0).await;
    checker.tick(ms(10), 0).await;
    checker.tick(ms(10), 0).await;
}

#[tokio::test(start_paused = true)]
async fn restart() {
    let proxy = elfo::test::proxy(sample(), AnyConfig::default()).await;
    let mut checker = Checker::new(proxy);

    checker.send(Start(ms(10))).await;
    checker.tick(ms(10), 0).await;

    sleep(ms(5)).await;
    checker.send(Start(ms(13))).await;
    checker.tick(ms(18), 0).await;
    checker.tick(ms(13), 0).await;
}

#[tokio::test(start_paused = true)]
async fn stop_start() {
    let proxy = elfo::test::proxy(sample(), AnyConfig::default()).await;
    let mut checker = Checker::new(proxy);

    checker.send(Start(ms(10))).await;
    checker.tick(ms(10), 0).await;

    sleep(ms(5)).await;
    checker.send(Stop).await;

    sleep(ms(40)).await;
    checker.send(Start(ms(13))).await;
    checker.tick(ms(58), 0).await;
    checker.tick(ms(13), 0).await;
}

#[tokio::test(start_paused = true)]
async fn set_message() {
    let proxy = elfo::test::proxy(sample(), AnyConfig::default()).await;
    let mut checker = Checker::new(proxy);

    checker.send(Start(ms(10))).await;
    checker.tick(ms(10), 0).await;
    checker.send(SetMessage(1)).await;
    checker.tick(ms(10), 1).await;
    checker.send(SetMessage(2)).await;
    checker.tick(ms(10), 2).await;
}

#[tokio::test(start_paused = true)]
async fn set_period() {
    let proxy = elfo::test::proxy(sample(), AnyConfig::default()).await;
    let mut checker = Checker::new(proxy);

    checker.send(Start(ms(10))).await;
    checker.tick(ms(10), 0).await;

    sleep(ms(7)).await;
    checker.send(SetPeriod(ms(15))).await;
    checker.tick(ms(15), 0).await;

    sleep(ms(10)).await;
    checker.send(SetPeriod(ms(5))).await;
    checker.tick(ms(10), 0).await;
}

#[tokio::test(start_paused = true)]
async fn burst() {
    let proxy = elfo::test::proxy(sample(), AnyConfig::default()).await;
    let mut checker = Checker::new(proxy);

    checker.send(Start(ms(10))).await;
    sleep(ms(35)).await;

    checker.tick(ms(35), 0).await;
    checker.tick(ms(0), 0).await;
    checker.tick(ms(0), 0).await;
    checker.tick(ms(5), 0).await;
    checker.tick(ms(10), 0).await;
}

#[tokio::test(start_paused = true)]
async fn terminate() {
    let mut proxy = elfo::test::proxy(sample(), AnyConfig::default()).await;

    proxy.send(Start(ms(10))).await;
    assert_msg_eq!(proxy.recv().await, Tick(0));

    proxy.send(Terminate).await;
    proxy.sync().await;
    assert!(proxy.try_recv().await.is_none());
}