xactor/supervisor.rs
1use crate::addr::ActorEvent;
2use crate::runtime::spawn;
3use crate::{Actor, Addr, Context};
4use anyhow::Result;
5use futures::StreamExt;
6
/// Actor supervisor.
///
/// A supervisor gives an actor the ability to be restarted. When the
/// supervised actor is stopped, a fresh instance is created from the factory
/// passed to [`Supervisor::start`] and takes the place of the old one.
#[derive(Debug)]
pub struct Supervisor;
12
13impl Supervisor {
14 /// Start a supervisor
15 ///
16 /// # Examples
17 ///
18 /// ```rust
19 /// use xactor::*;
20 /// use std::time::Duration;
21 ///
22 /// #[message]
23 /// struct Die;
24 ///
25 /// #[message]
26 /// struct Add;
27 ///
28 /// #[message(result = "i32")]
29 /// struct Get;
30 ///
31 /// struct MyActor(i32);
32 ///
33 /// impl Actor for MyActor {}
34 ///
35 /// #[async_trait::async_trait]
36 /// impl Handler<Add> for MyActor {
37 /// async fn handle(&mut self, ctx: &mut Context<Self>, _: Add) {
38 /// self.0 += 1;
39 /// }
40 /// }
41 ///
42 /// #[async_trait::async_trait]
43 /// impl Handler<Get> for MyActor {
44 /// async fn handle(&mut self, ctx: &mut Context<Self>, _: Get) -> i32 {
45 /// self.0
46 /// }
47 /// }
48 ///
49 /// #[async_trait::async_trait]
50 /// impl Handler<Die> for MyActor {
51 /// async fn handle(&mut self, ctx: &mut Context<Self>, _: Die) {
52 /// ctx.stop(None);
53 /// }
54 /// }
55 ///
56 /// #[xactor::main]
57 /// async fn main() -> Result<()> {
58 /// let mut addr = Supervisor::start(|| MyActor(0)).await?;
59 ///
60 /// addr.send(Add)?;
61 /// assert_eq!(addr.call(Get).await?, 1);
62 ///
63 /// addr.send(Add)?;
64 /// assert_eq!(addr.call(Get).await?, 2);
65 ///
66 /// addr.send(Die)?;
67 /// sleep(Duration::from_secs(1)).await; // Wait for actor restart
68 ///
69 /// assert_eq!(addr.call(Get).await?, 0);
70 /// Ok(())
71 /// }
72 /// ```
73 pub async fn start<A, F>(f: F) -> Result<Addr<A>>
74 where
75 A: Actor,
76 F: Fn() -> A + Send + 'static,
77 {
78 let (mut ctx, mut rx, tx) = Context::new(None);
79 let addr = Addr {
80 actor_id: ctx.actor_id(),
81 tx,
82 rx_exit: ctx.rx_exit.clone(),
83 };
84
85 // Create the actor
86 let mut actor = f();
87
88 // Call started
89 actor.started(&mut ctx).await?;
90
91 spawn({
92 async move {
93 'restart_loop: loop {
94 'event_loop: loop {
95 match rx.next().await {
96 None => break 'restart_loop,
97 Some(ActorEvent::Stop(_err)) => break 'event_loop,
98 Some(ActorEvent::Exec(f)) => f(&mut actor, &mut ctx).await,
99 Some(ActorEvent::RemoveStream(id)) => {
100 if ctx.streams.contains(id) {
101 ctx.streams.remove(id);
102 }
103 }
104 }
105 }
106
107 actor.stopped(&mut ctx).await;
108 ctx.abort_streams();
109 ctx.abort_intervals();
110
111 actor = f();
112 actor.started(&mut ctx).await.ok();
113 }
114 actor.stopped(&mut ctx).await;
115 ctx.abort_streams();
116 ctx.abort_intervals();
117 }
118 });
119
120 Ok(addr)
121 }
122}