1use std::collections::HashMap;
4
5use crate::actor::context::ActorContext;
6use crate::actor::message::{Handler, Message};
7use crate::actor::scheduler::{start_actor, ActorType};
8use crate::actor::system::ActorSystem;
9use crate::actor::{
10 Actor, ActorId, ActorPath, ActorRefErr, BoxedActorRef, CoreActorRef, LocalActorRef,
11};
12
13#[derive(Debug)]
14pub struct Supervised {
15 pub actor_id: ActorId,
16 pub path: ActorPath,
17 pub children: HashMap<ActorId, ChildRef>,
18}
19
20impl Supervised {
21 pub fn new(actor_id: ActorId, path: ActorPath) -> Supervised {
22 Self {
23 actor_id,
24 path,
25 children: HashMap::new(),
26 }
27 }
28}
29
30#[derive(Debug, Copy, Clone)]
31pub enum ChildType {
32 Spawned,
33 Attached,
34}
35
36#[derive(Debug, Clone)]
37pub struct ChildRef {
38 child_type: ChildType,
39 actor_ref: BoxedActorRef,
40}
41
42pub struct Terminated(pub ActorId);
43
44impl Message for Terminated {
45 type Result = ();
46}
47
48#[async_trait]
49impl<A: Actor> Handler<Terminated> for A {
50 async fn handle(&mut self, message: Terminated, ctx: &mut ActorContext) {
51 if let Some(supervised) = ctx.supervised_mut() {
52 supervised.on_child_stopped(&message.0).await;
53 }
54
55 self.on_child_stopped(&message.0, ctx).await;
56 }
57}
58
59impl Supervised {
60 pub async fn spawn<A: Actor>(
61 &mut self,
62 id: ActorId,
63 actor: A,
64 system: ActorSystem,
65 parent_ref: BoxedActorRef,
66 ) -> Result<LocalActorRef<A>, ActorRefErr> {
67 if let Some(_) = self.children.get(&id) {
68 return Err(ActorRefErr::AlreadyExists(id));
69 }
70
71 let (tx, rx) = tokio::sync::oneshot::channel();
72 let actor_ref = start_actor(
73 actor,
74 id.clone(),
75 ActorType::Anonymous,
76 Some(tx),
77 Some(system),
78 Some(parent_ref),
79 self.path.clone(),
80 );
81
82 self.children
83 .insert(id.clone(), ChildRef::spawned(actor_ref.clone().into()));
84
85 match rx.await {
86 Ok(_) => Ok(actor_ref),
87 Err(e) => {
88 error!("error spawning supervised actor (id={}) {}", &id, e);
89 Err(ActorRefErr::ActorStartFailed)
90 }
91 }
92 }
93
94 pub fn spawn_deferred<A: Actor>(
95 &mut self,
96 id: ActorId,
97 actor: A,
98 system: ActorSystem,
99 parent_ref: BoxedActorRef,
100 ) -> Result<LocalActorRef<A>, ActorRefErr> {
101 if let Some(_) = self.children.get(&id) {
102 return Err(ActorRefErr::AlreadyExists(id));
103 }
104
105 let actor_ref = start_actor(
106 actor,
107 id.clone(),
108 ActorType::Anonymous,
109 None,
110 Some(system),
111 Some(parent_ref),
112 self.path.clone(),
113 );
114
115 self.children
116 .insert(id.clone(), ChildRef::spawned(actor_ref.clone().into()));
117
118 Ok(actor_ref)
119 }
120
121 pub fn count(&self) -> usize {
122 self.children.len()
123 }
124
125 pub fn child<A: Actor>(&self, id: &ActorId) -> Option<LocalActorRef<A>> {
126 self.children.get(id).and_then(|a| a.actor_ref().as_actor())
127 }
128
129 pub fn child_boxed(&self, id: &ActorId) -> Option<BoxedActorRef> {
130 self.children.get(id).map(|a| a.actor_ref.clone())
131 }
132
133 pub fn attach_child_ref(&mut self, boxed_ref: BoxedActorRef) {
134 self.children
135 .insert(boxed_ref.actor_id().clone(), ChildRef::attached(boxed_ref));
136 }
137
138 pub fn add_child_ref(&mut self, boxed_ref: BoxedActorRef) -> Option<ChildRef> {
139 self.children
140 .insert(boxed_ref.actor_id().clone(), ChildRef::spawned(boxed_ref))
141 }
142
143 pub async fn stop_all(&mut self) {
144 let n = self.children.len();
145 let stop_results = futures::future::join_all(
146 self.children
147 .iter()
148 .map(|(id, actor)| async move { (id.clone(), actor.actor_ref.stop().await) }),
149 )
150 .await;
151
152 for (actor_id, stop_result) in stop_results {
153 match stop_result {
154 Ok(_) => {
155 trace!("actor stopped ({})", actor_id);
156 self.children.remove(&actor_id);
157 }
158 Err(e) => match e {
159 ActorRefErr::InvalidRef => {
160 warn!("invalid ref, actor_id={} already stopped", &actor_id);
161 }
162 e => {
163 warn!("failed to stop child actor_id={}, err={}", actor_id, e);
164 }
165 },
166 }
167 }
168
169 trace!("{} stopped {} child actors", &self.actor_id, n);
170 }
171
172 pub async fn on_child_stopped(&mut self, id: &ActorId) {
173 if let Some(_) = self.children.remove(id) {
174 trace!("child actor (id={}) stopped", id);
175 } else {
176 trace!("unknown child actor (id={}) stopped", id);
177 }
178 }
179}
180
181impl ChildRef {
182 pub fn actor_ref(&self) -> &BoxedActorRef {
183 &self.actor_ref
184 }
185
186 pub fn is_attached(&self) -> bool {
187 matches!(&self.child_type, ChildType::Attached)
188 }
189
190 pub fn spawned(actor_ref: BoxedActorRef) -> Self {
191 ChildRef {
192 child_type: ChildType::Spawned,
193 actor_ref,
194 }
195 }
196
197 pub fn attached(actor_ref: BoxedActorRef) -> Self {
198 ChildRef {
199 child_type: ChildType::Attached,
200 actor_ref,
201 }
202 }
203}