coerce/actor/
supervised.rs

1//! Actor supervision and child spawning
2
3use std::collections::HashMap;
4
5use crate::actor::context::ActorContext;
6use crate::actor::message::{Handler, Message};
7use crate::actor::scheduler::{start_actor, ActorType};
8use crate::actor::system::ActorSystem;
9use crate::actor::{
10    Actor, ActorId, ActorPath, ActorRefErr, BoxedActorRef, CoreActorRef, LocalActorRef,
11};
12
13#[derive(Debug)]
14pub struct Supervised {
15    pub actor_id: ActorId,
16    pub path: ActorPath,
17    pub children: HashMap<ActorId, ChildRef>,
18}
19
20impl Supervised {
21    pub fn new(actor_id: ActorId, path: ActorPath) -> Supervised {
22        Self {
23            actor_id,
24            path,
25            children: HashMap::new(),
26        }
27    }
28}
29
30#[derive(Debug, Copy, Clone)]
31pub enum ChildType {
32    Spawned,
33    Attached,
34}
35
36#[derive(Debug, Clone)]
37pub struct ChildRef {
38    child_type: ChildType,
39    actor_ref: BoxedActorRef,
40}
41
42pub struct Terminated(pub ActorId);
43
44impl Message for Terminated {
45    type Result = ();
46}
47
48#[async_trait]
49impl<A: Actor> Handler<Terminated> for A {
50    async fn handle(&mut self, message: Terminated, ctx: &mut ActorContext) {
51        if let Some(supervised) = ctx.supervised_mut() {
52            supervised.on_child_stopped(&message.0).await;
53        }
54
55        self.on_child_stopped(&message.0, ctx).await;
56    }
57}
58
59impl Supervised {
60    pub async fn spawn<A: Actor>(
61        &mut self,
62        id: ActorId,
63        actor: A,
64        system: ActorSystem,
65        parent_ref: BoxedActorRef,
66    ) -> Result<LocalActorRef<A>, ActorRefErr> {
67        if let Some(_) = self.children.get(&id) {
68            return Err(ActorRefErr::AlreadyExists(id));
69        }
70
71        let (tx, rx) = tokio::sync::oneshot::channel();
72        let actor_ref = start_actor(
73            actor,
74            id.clone(),
75            ActorType::Anonymous,
76            Some(tx),
77            Some(system),
78            Some(parent_ref),
79            self.path.clone(),
80        );
81
82        self.children
83            .insert(id.clone(), ChildRef::spawned(actor_ref.clone().into()));
84
85        match rx.await {
86            Ok(_) => Ok(actor_ref),
87            Err(e) => {
88                error!("error spawning supervised actor (id={}) {}", &id, e);
89                Err(ActorRefErr::ActorStartFailed)
90            }
91        }
92    }
93
94    pub fn spawn_deferred<A: Actor>(
95        &mut self,
96        id: ActorId,
97        actor: A,
98        system: ActorSystem,
99        parent_ref: BoxedActorRef,
100    ) -> Result<LocalActorRef<A>, ActorRefErr> {
101        if let Some(_) = self.children.get(&id) {
102            return Err(ActorRefErr::AlreadyExists(id));
103        }
104
105        let actor_ref = start_actor(
106            actor,
107            id.clone(),
108            ActorType::Anonymous,
109            None,
110            Some(system),
111            Some(parent_ref),
112            self.path.clone(),
113        );
114
115        self.children
116            .insert(id.clone(), ChildRef::spawned(actor_ref.clone().into()));
117
118        Ok(actor_ref)
119    }
120
121    pub fn count(&self) -> usize {
122        self.children.len()
123    }
124
125    pub fn child<A: Actor>(&self, id: &ActorId) -> Option<LocalActorRef<A>> {
126        self.children.get(id).and_then(|a| a.actor_ref().as_actor())
127    }
128
129    pub fn child_boxed(&self, id: &ActorId) -> Option<BoxedActorRef> {
130        self.children.get(id).map(|a| a.actor_ref.clone())
131    }
132
133    pub fn attach_child_ref(&mut self, boxed_ref: BoxedActorRef) {
134        self.children
135            .insert(boxed_ref.actor_id().clone(), ChildRef::attached(boxed_ref));
136    }
137
138    pub fn add_child_ref(&mut self, boxed_ref: BoxedActorRef) -> Option<ChildRef> {
139        self.children
140            .insert(boxed_ref.actor_id().clone(), ChildRef::spawned(boxed_ref))
141    }
142
143    pub async fn stop_all(&mut self) {
144        let n = self.children.len();
145        let stop_results = futures::future::join_all(
146            self.children
147                .iter()
148                .map(|(id, actor)| async move { (id.clone(), actor.actor_ref.stop().await) }),
149        )
150        .await;
151
152        for (actor_id, stop_result) in stop_results {
153            match stop_result {
154                Ok(_) => {
155                    trace!("actor stopped ({})", actor_id);
156                    self.children.remove(&actor_id);
157                }
158                Err(e) => match e {
159                    ActorRefErr::InvalidRef => {
160                        warn!("invalid ref, actor_id={} already stopped", &actor_id);
161                    }
162                    e => {
163                        warn!("failed to stop child actor_id={}, err={}", actor_id, e);
164                    }
165                },
166            }
167        }
168
169        trace!("{} stopped {} child actors", &self.actor_id, n);
170    }
171
172    pub async fn on_child_stopped(&mut self, id: &ActorId) {
173        if let Some(_) = self.children.remove(id) {
174            trace!("child actor (id={}) stopped", id);
175        } else {
176            trace!("unknown child actor (id={}) stopped", id);
177        }
178    }
179}
180
181impl ChildRef {
182    pub fn actor_ref(&self) -> &BoxedActorRef {
183        &self.actor_ref
184    }
185
186    pub fn is_attached(&self) -> bool {
187        matches!(&self.child_type, ChildType::Attached)
188    }
189
190    pub fn spawned(actor_ref: BoxedActorRef) -> Self {
191        ChildRef {
192            child_type: ChildType::Spawned,
193            actor_ref,
194        }
195    }
196
197    pub fn attached(actor_ref: BoxedActorRef) -> Self {
198        ChildRef {
199            child_type: ChildType::Attached,
200            actor_ref,
201        }
202    }
203}