elfo 0.2.0-alpha.21

An asynchronous distributed actor framework with robust observability
Documentation
#![allow(missing_docs)]
#![cfg(feature = "test-util")]

use std::{collections::HashMap, sync::Arc, time::Duration};

use derive_more::Constructor;
use elfo::{config::AnyConfig, messages, prelude::*, scope, stream::Stream, tracing::TraceId};
use futures::StreamExt;
use parking_lot::Mutex;
use tokio::time;

#[tokio::test(start_paused = true)]
async fn once() {
    #[message]
    #[derive(Constructor)]
    struct Start {
        no: u64,
        override_trace_id: bool,
    }

    #[message]
    #[derive(PartialEq, Eq)]
    struct Finished(u64);

    let group = ActorGroup::new().exec(|mut ctx| async move {
        let mut trace_ids = HashMap::new();

        while let Some(envelope) = ctx.recv().await {
            msg!(match envelope {
                msg @ Start => {
                    let override_trace_id = msg.override_trace_id.then(TraceId::generate);
                    let no = msg.no;

                    let handle = ctx.attach(Stream::once(async move {
                        if let Some(trace_id) = override_trace_id {
                            scope::set_trace_id(trace_id);
                        }

                        time::sleep(Duration::from_millis(no)).await;
                        Finished(no)
                    }));

                    assert!(!handle.is_terminated());
                    let data = (handle, override_trace_id.unwrap_or_else(scope::trace_id));
                    trace_ids.insert(no, data);
                }
                msg @ Finished(no) => {
                    let (handle, expected_trace_id) = trace_ids.remove(&no).unwrap();
                    assert_eq!(scope::trace_id(), expected_trace_id);
                    ctx.send(msg).await.unwrap();
                    assert!(handle.is_terminated());
                }
            });
        }
    });

    let mut proxy = elfo::test::proxy(group, AnyConfig::default()).await;
    assert!(proxy.try_recv().await.is_none());

    proxy.send(Start::new(100, false)).await;
    proxy.send(Start::new(25, true)).await;
    proxy.send(Start::new(50, false)).await;

    assert_msg_eq!(proxy.recv().await, Finished(25));
    proxy.send(Start::new(5, false)).await;

    assert_msg_eq!(proxy.recv().await, Finished(5));
    assert_msg_eq!(proxy.recv().await, Finished(50));
    assert_msg_eq!(proxy.recv().await, Finished(100));
    assert!(proxy.try_recv().await.is_none());

    proxy.send(Start::new(10, true)).await;
    proxy.send(Start::new(15, false)).await;
    assert_msg_eq!(proxy.recv().await, Finished(10));
    assert_msg_eq!(proxy.recv().await, Finished(15));
}

#[tokio::test(start_paused = true)]
async fn from_futures03() {
    #[message]
    #[derive(Constructor)]
    struct Start {
        group: u64,
        count: u32,
    }

    #[message]
    #[derive(PartialEq, Eq, Constructor)]
    struct Produced {
        group: u64,
        no: u32,
    }

    let group = ActorGroup::new().exec(|mut ctx| async move {
        let trace_ids = Arc::new(Mutex::new(HashMap::new()));

        while let Some(envelope) = ctx.recv().await {
            msg!(match envelope {
                msg @ Start => {
                    let trace_ids = trace_ids.clone();

                    let stream = futures::stream::iter(0..msg.count).then(move |no| {
                        let trace_ids = trace_ids.clone();
                        let group = msg.group;

                        async move {
                            let expected_trace_id = if no % 2 == 0 {
                                let trace_id = TraceId::generate();
                                scope::set_trace_id(trace_id);
                                trace_id
                            } else {
                                scope::trace_id()
                            };

                            trace_ids.lock().insert((group, no), expected_trace_id);

                            time::sleep(Duration::from_millis(group)).await;
                            Produced::new(group, no)
                        }
                    });

                    ctx.attach(Stream::from_futures03(stream));
                }
                msg @ Produced => {
                    let expected_trace_id = trace_ids.lock().remove(&(msg.group, msg.no)).unwrap();
                    assert_eq!(scope::trace_id(), expected_trace_id);
                    ctx.send(msg).await.unwrap();
                }
            });
        }
    });

    let mut proxy = elfo::test::proxy(group, AnyConfig::default()).await;
    assert!(proxy.try_recv().await.is_none());

    proxy.send(Start::new(11, 4)).await;
    proxy.send(Start::new(35, 3)).await;
    proxy.send(Start::new(49, 2)).await;

    assert_msg_eq!(proxy.recv().await, Produced::new(11, 0)); // 11
    assert_msg_eq!(proxy.recv().await, Produced::new(11, 1)); // 22
    assert_msg_eq!(proxy.recv().await, Produced::new(11, 2)); // 33
    assert_msg_eq!(proxy.recv().await, Produced::new(35, 0)); // 35
    assert_msg_eq!(proxy.recv().await, Produced::new(11, 3)); // 44
    assert_msg_eq!(proxy.recv().await, Produced::new(49, 0)); // 49
    assert_msg_eq!(proxy.recv().await, Produced::new(35, 1)); // 70
    assert_msg_eq!(proxy.recv().await, Produced::new(49, 1)); // 98
    assert_msg_eq!(proxy.recv().await, Produced::new(35, 2)); // 105
}

#[tokio::test(start_paused = true)]
async fn terminate() {
    #[message]
    struct Start(u64);

    #[message]
    struct Terminate(u64);

    #[message]
    #[derive(PartialEq, Eq)]
    struct Produced(u64);

    let group = ActorGroup::new().exec(|mut ctx| async move {
        let mut handles = HashMap::new();

        while let Some(envelope) = ctx.recv().await {
            msg!(match envelope {
                Start(group) => {
                    let stream = futures::stream::unfold(0, move |no| async move {
                        time::sleep(Duration::from_millis(group)).await;
                        Some((Produced(group), no + 1))
                    });

                    let stream = ctx.attach(Stream::from_futures03(stream));
                    handles.insert(group, stream);
                }
                Terminate(group) => {
                    let handle = handles.remove(&group).unwrap();
                    assert!(!handle.is_terminated());
                    handle.terminate();
                }
                Produced(group) => {
                    ctx.send(Produced(group)).await.unwrap();
                }
            });
        }
    });

    let mut proxy = elfo::test::proxy(group, AnyConfig::default()).await;
    assert!(proxy.try_recv().await.is_none());

    proxy.send(Start(11)).await;
    proxy.send(Start(23)).await;
    proxy.send(Start(35)).await;

    assert_msg_eq!(proxy.recv().await, Produced(11)); // 11
    assert_msg_eq!(proxy.recv().await, Produced(11)); // 22
    assert_msg_eq!(proxy.recv().await, Produced(23)); // 23
    assert_msg_eq!(proxy.recv().await, Produced(11)); // 33
    proxy.send(Terminate(11)).await;
    assert_msg_eq!(proxy.recv().await, Produced(35)); // 35
    assert_msg_eq!(proxy.recv().await, Produced(23)); // 46
    proxy.send(Terminate(23)).await;
    assert_msg_eq!(proxy.recv().await, Produced(35)); // 70
    proxy.send(Terminate(35)).await;

    assert!(proxy.try_recv().await.is_none());
}

#[tokio::test(start_paused = true)]
async fn generate() {
    #[message]
    #[derive(PartialEq, Eq)]
    struct OneProduced(u32);

    #[message]
    #[derive(PartialEq, Eq)]
    struct TwoProduced(u32);

    #[message]
    #[derive(PartialEq, Eq)]
    struct Finished(u32);

    let group = ActorGroup::new().exec(|mut ctx| async move {
        let trace_ids = Arc::new(Mutex::new(HashMap::new()));
        let trace_ids_1 = trace_ids.clone();

        ctx.attach(Stream::generate(|mut emitter| async move {
            trace_ids.lock().insert(0, scope::trace_id());
            trace_ids.lock().insert(1, scope::trace_id());
            emitter.emit(OneProduced(0)).await;
            emitter.emit(OneProduced(1)).await;

            scope::set_trace_id(TraceId::generate());
            trace_ids.lock().insert(2, scope::trace_id());
            trace_ids.lock().insert(3, scope::trace_id());
            emitter.emit(TwoProduced(2)).await;
            emitter.emit(TwoProduced(3)).await;

            scope::set_trace_id(TraceId::generate());
            trace_ids.lock().insert(4, scope::trace_id());
            emitter.emit(Finished(4)).await;
        }));

        while let Some(envelope) = ctx.recv().await {
            msg!(match envelope {
                msg @ OneProduced(no) | msg @ TwoProduced(no) | msg @ Finished(no) => {
                    let expected_trace_id = trace_ids_1.lock().remove(&no).unwrap();
                    assert_eq!(scope::trace_id(), expected_trace_id);
                    ctx.send(msg).await.unwrap();
                }
            });
        }
    });

    let mut proxy = elfo::test::proxy(group, AnyConfig::default()).await;

    assert_msg_eq!(proxy.recv().await, OneProduced(0));
    assert_msg_eq!(proxy.recv().await, OneProduced(1));
    assert_msg_eq!(proxy.recv().await, TwoProduced(2));
    assert_msg_eq!(proxy.recv().await, TwoProduced(3));
    assert_msg_eq!(proxy.recv().await, Finished(4));
}

#[tokio::test(start_paused = true)]
async fn result() {
    #[message]
    #[derive(Constructor)]
    struct Start {
        no: u64,
        fail: bool,
    }

    #[message]
    #[derive(PartialEq, Eq)]
    struct Success(u64);

    #[message]
    #[derive(PartialEq, Eq)]
    struct Failure(u64);

    let group = ActorGroup::new().exec(|mut ctx| async move {
        while let Some(envelope) = ctx.recv().await {
            msg!(match envelope {
                msg @ Start => {
                    ctx.attach(Stream::once(async move {
                        time::sleep(Duration::from_millis(msg.no)).await;

                        if msg.fail {
                            Ok(Failure(msg.no))
                        } else {
                            Err(Success(msg.no))
                        }
                    }));
                }
                msg @ Success | msg @ Failure => {
                    ctx.send(msg).await.unwrap();
                }
            });
        }
    });

    let mut proxy = elfo::test::proxy(group, AnyConfig::default()).await;
    assert!(proxy.try_recv().await.is_none());

    proxy.send(Start::new(10, false)).await;
    proxy.send(Start::new(20, true)).await;

    assert_msg_eq!(proxy.recv().await, Success(10));
    assert_msg_eq!(proxy.recv().await, Failure(20));
}

// Prevents regression after [unicycle#30].
// [unicycle#30]: https://github.com/udoprog/unicycle/issues/30
#[tokio::test(start_paused = true)]
async fn drop_on_terminate() {
    struct Guard(Arc<Mutex<bool>>);

    impl Drop for Guard {
        fn drop(&mut self) {
            *self.0.lock() = true;
        }
    }

    let dropped = Arc::new(Mutex::new(false));
    let dropped_1 = dropped.clone();

    let group = ActorGroup::new().exec(move |mut ctx| {
        let dropped = dropped_1.clone();
        async move {
            ctx.attach(Stream::<messages::Impossible>::once(async move {
                let _guard = Guard(dropped);
                std::future::pending().await
            }));

            while ctx.recv().await.is_some() {}
        }
    });

    let mut proxy = elfo::test::proxy(group, AnyConfig::default()).await;
    proxy.sync().await;
    proxy.send(messages::Terminate::default()).await;
    proxy.finished().await;

    assert!(*dropped.lock());
}