supervisor_clear_interval/
supervisor_clear_interval.rs

1use std::time::{Duration, Instant};
2use xactor::{message, Actor, Context, Handler};
3
/// Actor state for the ping demo.
///
/// Remembers when the most recent `Ping` was handled so that the next one can
/// report how much time passed in between.
#[derive(Debug)]
pub struct PingTimer {
    /// Timestamp of the last handled `Ping` (or of construction, before any ping).
    last_ping: Instant,
}
8
9impl Default for PingTimer {
10    fn default() -> Self {
11        PingTimer {
12            last_ping: Instant::now(),
13        }
14    }
15}
16
17#[async_trait::async_trait]
18impl Actor for PingTimer {
19    async fn started(&mut self, ctx: &mut Context<Self>) -> xactor::Result<()> {
20        println!("PingTimer:: started()");
21        ctx.send_interval(Ping, Duration::from_millis(1000));
22        Ok(())
23    }
24
25    /// Called after an actor is stopped.
26    async fn stopped(&mut self, _: &mut Context<Self>) {
27        println!("PingTimer:: stopped()");
28    }
29}
30
31#[message]
32#[derive(Clone)]
33struct Ping;
34
35#[async_trait::async_trait]
36impl Handler<Ping> for PingTimer {
37    async fn handle(&mut self, ctx: &mut Context<Self>, _msg: Ping) {
38        let now = Instant::now();
39        let delta = (now - self.last_ping).as_millis();
40        self.last_ping = now;
41        println!("PingTimer:: Ping {} {:?}", ctx.actor_id(), delta);
42    }
43}
44#[message]
45struct Halt;
46
47#[async_trait::async_trait]
48impl Handler<Halt> for PingTimer {
49    async fn handle(&mut self, ctx: &mut Context<Self>, _msg: Halt) {
50        println!("PingTimer:: received Halt");
51        ctx.stop(None);
52        println!("PingTimer:: stopped");
53    }
54}
55
56#[message]
57struct Panic;
58
59#[async_trait::async_trait]
60impl Handler<Panic> for PingTimer {
61    async fn handle(&mut self, _: &mut Context<Self>, _msg: Panic) {
62        println!("PingTimer:: received Panic");
63        panic!("intentional panic");
64    }
65}
66
67#[xactor::main]
68async fn main() -> Result<(), Box<dyn std::error::Error>> {
69    let service_supervisor = xactor::Supervisor::start(PingTimer::default).await?;
70    let service_addr = service_supervisor.clone();
71
72    let supervisor_task = xactor::spawn(async {
73        service_supervisor.wait_for_stop().await;
74    });
75
76    let send_halt = async {
77        xactor::sleep(Duration::from_millis(5_200)).await;
78        println!("  main  :: sending Halt");
79        service_addr.send(Halt).unwrap();
80    };
81
82    let _ = futures::join!(supervisor_task, send_halt);
83    // run this to see that the interval is not properly stopped if the ctx is stopped
84    // futures::join!(supervisor_task, send_panic); // there is no panic recovery
85
86    Ok(())
87}