Skip to main content

xactor/
supervisor.rs

1use crate::addr::ActorEvent;
2use crate::runtime::spawn;
3use crate::{Actor, Addr, Context};
4use anyhow::Result;
5use futures::StreamExt;
6
/// Actor supervisor
///
/// A supervisor gives an actor the ability to restart: [`Supervisor::start`]
/// takes a factory closure, and each time the running actor is stopped a new
/// actor instance is built from that factory and replaces the old one.
pub struct Supervisor;
12
13impl Supervisor {
14    /// Start a supervisor
15    ///
16    /// # Examples
17    ///
18    /// ```rust
19    /// use xactor::*;
20    /// use std::time::Duration;
21    ///
22    /// #[message]
23    /// struct Die;
24    ///
25    /// #[message]
26    /// struct Add;
27    ///
28    /// #[message(result = "i32")]
29    /// struct Get;
30    ///
31    /// struct MyActor(i32);
32    ///
33    /// impl Actor for MyActor {}
34    ///
35    /// #[async_trait::async_trait]
36    /// impl Handler<Add> for MyActor {
37    ///     async fn handle(&mut self, ctx: &mut Context<Self>, _: Add) {
38    ///         self.0 += 1;
39    ///     }
40    /// }
41    ///
42    /// #[async_trait::async_trait]
43    /// impl Handler<Get> for MyActor {
44    ///     async fn handle(&mut self, ctx: &mut Context<Self>, _: Get) -> i32 {
45    ///         self.0
46    ///     }
47    /// }
48    ///
49    /// #[async_trait::async_trait]
50    /// impl Handler<Die> for MyActor {
51    ///     async fn handle(&mut self, ctx: &mut Context<Self>, _: Die) {
52    ///         ctx.stop(None);
53    ///     }
54    /// }
55    ///
56    /// #[xactor::main]
57    /// async fn main() -> Result<()> {
58    ///     let mut addr = Supervisor::start(|| MyActor(0)).await?;
59    ///
60    ///     addr.send(Add)?;
61    ///     assert_eq!(addr.call(Get).await?, 1);
62    ///
63    ///     addr.send(Add)?;
64    ///     assert_eq!(addr.call(Get).await?, 2);
65    ///
66    ///     addr.send(Die)?;
67    ///     sleep(Duration::from_secs(1)).await; // Wait for actor restart
68    ///
69    ///     assert_eq!(addr.call(Get).await?, 0);
70    ///     Ok(())
71    /// }
72    /// ```
73    pub async fn start<A, F>(f: F) -> Result<Addr<A>>
74    where
75        A: Actor,
76        F: Fn() -> A + Send + 'static,
77    {
78        let (mut ctx, mut rx, tx) = Context::new(None);
79        let addr = Addr {
80            actor_id: ctx.actor_id(),
81            tx,
82            rx_exit: ctx.rx_exit.clone(),
83        };
84
85        // Create the actor
86        let mut actor = f();
87
88        // Call started
89        actor.started(&mut ctx).await?;
90
91        spawn({
92            async move {
93                'restart_loop: loop {
94                    'event_loop: loop {
95                        match rx.next().await {
96                            None => break 'restart_loop,
97                            Some(ActorEvent::Stop(_err)) => break 'event_loop,
98                            Some(ActorEvent::Exec(f)) => f(&mut actor, &mut ctx).await,
99                            Some(ActorEvent::RemoveStream(id)) => {
100                                if ctx.streams.contains(id) {
101                                    ctx.streams.remove(id);
102                                }
103                            }
104                        }
105                    }
106
107                    actor.stopped(&mut ctx).await;
108                    ctx.abort_streams();
109                    ctx.abort_intervals();
110
111                    actor = f();
112                    actor.started(&mut ctx).await.ok();
113                }
114                actor.stopped(&mut ctx).await;
115                ctx.abort_streams();
116                ctx.abort_intervals();
117            }
118        });
119
120        Ok(addr)
121    }
122}