// hannibal 0.16.3
//
// A small actor library
// Documentation
#![allow(clippy::unwrap_used)]
use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    time::Duration,
};

use crate::runtime::sleep;

use super::*;
use crate::prelude::*;

/// Test actor that exposes, through a shared flag, whether its `()` handler
/// has run since the actor was last stopped.
#[derive(Debug)]
struct IntervalActor {
    /// Shared with the test body: set to `true` by the `()` handler and reset
    /// to `false` in `stopped`.
    running: Arc<AtomicBool>,
}

impl Actor for IntervalActor {
    async fn stopped(&mut self, _: &mut Context<Self>) {
        self.running.store(false, Ordering::SeqCst);
    }
}

impl Handler<()> for IntervalActor {
    /// Raises the shared `running` flag every time a `()` message is handled.
    async fn handle(&mut self, _ctx: &mut Context<Self>, _: ()) {
        let flag = &self.running;
        flag.store(true, Ordering::SeqCst);
    }
}

/// Spawns an `IntervalActor`, halts it after 300 ms and checks 300 ms later
/// that the shared flag is still cleared.
///
/// `stopped` resets the flag and only the `()` handler sets it, so a raised
/// flag at the end means the handler ran after the actor had been stopped.
#[tokio::test]
async fn stopped_when_actor_stopped() {
    let running = Arc::new(AtomicBool::new(false));
    let actor = IntervalActor {
        running: Arc::clone(&running),
    };
    let addr = actor.spawn();

    // Give the actor some time to run before shutting it down.
    sleep(Duration::from_millis(300)).await;
    addr.halt().await.unwrap();

    // Wait once more so that anything still delivering messages would have
    // had time to reach the handler.
    sleep(Duration::from_millis(300)).await;
    let handler_ran = running.load(Ordering::SeqCst);
    assert!(
        !handler_ran,
        "Handler should not be called after actor is stopped"
    );
}

mod interval_order {
    use super::*;

    /// Checks that handler invocations triggered by an interval never run
    /// concurrently, even when the interval (20 ms) is much shorter than the
    /// time one handler invocation takes (100 ms).
    ///
    /// The actor runs for 300 ms and is then halted; afterwards the test
    /// inspects the flags and the counter the handler maintained.
    #[test_log::test(tokio::test)]
    async fn handlers_never_overlap() {
        use std::sync::{
            Arc,
            atomic::{AtomicBool, AtomicU32},
        };

        // Message produced on every interval tick.
        #[derive(Clone)]
        #[message]
        struct SlowMessage;

        // Actor that records its handler activity in state shared with the test.
        struct NoOverlapActor {
            // `true` while a handler invocation is in progress.
            busy: Arc<AtomicBool>,
            // Raised when a handler starts while `busy` is already set.
            overlapped: Arc<AtomicBool>,
            // Number of handler invocations that got past the overlap check.
            invocations: Arc<AtomicU32>,
        }

        impl Actor for NoOverlapActor {
            async fn started(&mut self, ctx: &mut Context<Self>) -> DynResult<()> {
                // Tick far more often than the handler below can complete.
                let period = Duration::from_millis(20);
                ctx.interval_with(|| SlowMessage, period);
                Ok(())
            }
        }

        impl Handler<SlowMessage> for NoOverlapActor {
            async fn handle(&mut self, _ctx: &mut Context<Self>, _: SlowMessage) {
                // `swap` marks this invocation as in progress and reports
                // whether another one had already set the flag.
                if self.busy.swap(true, Ordering::SeqCst) {
                    self.overlapped.store(true, Ordering::SeqCst);
                    panic!(
                        "Handler overlap detected! A handler started while another was still running."
                    );
                }

                self.invocations.fetch_add(1, Ordering::SeqCst);

                // Simulated slow work, five times longer than the tick period.
                sleep(Duration::from_millis(100)).await;

                self.busy.store(false, Ordering::SeqCst);
            }
        }

        let busy = Arc::new(AtomicBool::new(false));
        let overlapped = Arc::new(AtomicBool::new(false));
        let invocations = Arc::new(AtomicU32::new(0));

        let actor = NoOverlapActor {
            busy: Arc::clone(&busy),
            overlapped: Arc::clone(&overlapped),
            invocations: Arc::clone(&invocations),
        };
        let addr = crate::setup_actor(actor).bounded(10).spawn();

        sleep(Duration::from_millis(300)).await;
        addr.halt().await.unwrap();

        let saw_overlap = overlapped.load(Ordering::SeqCst);
        assert!(!saw_overlap, "Handlers should never overlap");

        let count = invocations.load(Ordering::SeqCst);
        assert!(
            count >= 2,
            "At least 2 handlers should have executed (got {count})",
        );

        let still_busy = busy.load(Ordering::SeqCst);
        assert!(!still_busy, "No handler should be running after halt");
    }
}

mod interval_with {
    use super::*;
    use crate::{TaskHandle, actor::spawnable::Spawnable};

    /// Test actor whose `()` handler is driven by an interval registered with
    /// `Context::interval_with`.
    #[derive(Debug)]
    struct IntervalWithActor {
        /// Set by the `()` handler; cleared in `stopped` and by the
        /// `StopInterval` handler.
        running: Arc<AtomicBool>,
        /// Handle of the interval task created in `started`, kept so the task
        /// can be cancelled without stopping the actor.
        interval: Option<TaskHandle>,
    }

    impl Actor for IntervalWithActor {
        /// Registers a 100 ms interval sending `()` and remembers its handle.
        async fn started(&mut self, ctx: &mut Context<Self>) -> DynResult<()> {
            let handle = ctx.interval_with(|| (), Duration::from_millis(100));
            self.interval = Some(handle);
            Ok(())
        }

        /// Clears the shared flag so that later handler runs become visible.
        async fn stopped(&mut self, _: &mut Context<Self>) {
            self.running.store(false, Ordering::SeqCst);
        }
    }

    impl Handler<()> for IntervalWithActor {
        /// Raises the shared flag on every interval tick.
        async fn handle(&mut self, _: &mut Context<Self>, _: ()) {
            self.running.store(true, Ordering::SeqCst);
        }
    }

    /// Cancelling the interval through its `TaskHandle` must stop the ticks
    /// while the actor itself keeps running.
    #[tokio::test]
    async fn stopped_by_task_handle() {
        #[derive(hannibal_derive::Message)]
        struct StopInterval;

        impl Handler<StopInterval> for IntervalWithActor {
            async fn handle(
                &mut self,
                ctx: &mut Context<Self>,
                _msg: StopInterval,
            ) -> <StopInterval as Message>::Response {
                // Reset the flag first: any tick handled afterwards raises it
                // again and is caught by the assertion below.
                self.running.store(false, Ordering::SeqCst);
                // `take` moves the handle out of the actor, so this works
                // without relying on `TaskHandle: Copy` and no stale handle
                // is kept once the task has been cancelled.
                if let Some(interval) = self.interval.take() {
                    ctx.stop_task(interval);
                }
            }
        }

        let running = Arc::new(AtomicBool::new(false));
        let addr = IntervalWithActor {
            running: Arc::clone(&running),
            interval: None,
        }
        .spawn();
        sleep(Duration::from_millis(300)).await;

        addr.send(StopInterval).await.unwrap();
        sleep(Duration::from_millis(300)).await;
        assert!(
            !running.load(Ordering::SeqCst),
            "Handler should not be called after the interval task is stopped"
        );
    }

    /// Halting the actor must also end its interval: the flag cleared in
    /// `stopped` has to stay cleared afterwards.
    #[tokio::test]
    async fn stopped_when_actor_stopped() {
        let running = Arc::new(AtomicBool::new(false));
        let addr = IntervalWithActor {
            running: Arc::clone(&running),
            interval: None,
        }
        .spawn();
        sleep(Duration::from_millis(300)).await;
        addr.halt().await.unwrap();
        sleep(Duration::from_millis(300)).await;
        assert!(
            !running.load(Ordering::SeqCst),
            "Handler should not be called after actor is stopped"
        );
    }
}

mod delayed_send {
    use super::*;

    /// Test actor that calls `Context::delayed_send` with a `()` message and
    /// a 100 ms duration when it starts.
    #[derive(Debug)]
    struct DelayedSendActor {
        /// Set by the `()` handler and cleared again in `stopped`.
        running: Arc<AtomicBool>,
    }

    impl Actor for DelayedSendActor {
        async fn started(&mut self, ctx: &mut Context<Self>) -> DynResult<()> {
            let delay = Duration::from_millis(100);
            ctx.delayed_send(|| (), delay);
            Ok(())
        }

        /// Clears the shared flag so that later handler runs become visible.
        async fn stopped(&mut self, _ctx: &mut Context<Self>) {
            let flag = &self.running;
            flag.store(false, Ordering::SeqCst);
        }
    }

    impl Handler<()> for DelayedSendActor {
        /// Raises the shared flag when the `()` message is handled.
        async fn handle(&mut self, _ctx: &mut Context<Self>, _: ()) {
            let flag = &self.running;
            flag.store(true, Ordering::SeqCst);
        }
    }

    /// Spawns the actor, halts it after 300 ms and verifies 300 ms later that
    /// the flag reset in `stopped` has not been raised again.
    #[tokio::test]
    async fn stopped_when_actor_stopped() {
        let flag = Arc::new(AtomicBool::new(false));
        let actor = DelayedSendActor {
            running: Arc::clone(&flag),
        };
        let addr = actor.spawn();

        sleep(Duration::from_millis(300)).await;
        addr.halt().await.unwrap();

        sleep(Duration::from_millis(300)).await;
        let handler_ran = flag.load(Ordering::SeqCst);
        assert!(
            !handler_ran,
            "Handler should not be called after actor is stopped"
        );
    }
}