1use crate::{
4 Actor, ActorPath, ActorRef, Error, Event, Handler,
5 actor::ChildErrorSender,
6 runner::{ActorRunner, StopHandle, StopSender},
7 sink::Sink,
8};
9
10use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
11use tokio_util::sync::CancellationToken;
12
13use tracing::{Instrument, Span, debug, error, warn};
14
15use std::{
16 any::Any,
17 collections::{HashMap, HashSet},
18 sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21 },
22};
23
24#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum ShutdownReason {
27 Graceful,
29 Crash,
31}
32
33impl ShutdownReason {
34 pub const fn exit_code(&self) -> i32 {
36 match self {
37 Self::Graceful => 0,
38 Self::Crash => 1,
39 }
40 }
41}
42
43pub struct ActorSystem {}
45
46impl ActorSystem {
47 pub fn create(
52 graceful_token: CancellationToken,
53 crash_token: CancellationToken,
54 ) -> (SystemRef, SystemRunner) {
55 let (event_sender, event_receiver) = mpsc::channel(4);
56 let system = SystemRef::new(event_sender, graceful_token, crash_token);
57 let runner = SystemRunner::new(event_receiver);
58 (system, runner)
59 }
60}
61
62#[derive(Debug, Clone)]
64pub enum SystemEvent {
65 ActorError {
67 path: ActorPath,
69 error: Error,
71 },
72 StopSystem(ShutdownReason),
75}
76
77#[derive(Clone)]
82pub struct SystemRef {
83 actors:
86 Arc<RwLock<HashMap<ActorPath, Box<dyn Any + Send + Sync + 'static>>>>,
87 child_index: Arc<RwLock<HashMap<ActorPath, HashSet<ActorPath>>>>,
89
90 helpers: Arc<RwLock<HashMap<String, Box<dyn Any + Send + Sync + 'static>>>>,
93
94 root_senders: Arc<RwLock<HashMap<ActorPath, StopHandle>>>,
96 system_event_sender: broadcast::Sender<SystemEvent>,
98
99 graceful_token: CancellationToken,
101 crash_token: CancellationToken,
103 shutting_down: Arc<AtomicBool>,
105}
106
107impl SystemRef {
108 pub(crate) fn new(
109 event_sender: mpsc::Sender<SystemEvent>,
110 graceful_token: CancellationToken,
111 crash_token: CancellationToken,
112 ) -> Self {
113 let root_senders =
114 Arc::new(RwLock::new(HashMap::<ActorPath, StopHandle>::new()));
115 let child_index = Arc::new(RwLock::new(HashMap::new()));
116 let (system_event_sender, _) = broadcast::channel::<SystemEvent>(256);
117 let shutting_down = Arc::new(AtomicBool::new(false));
118 let root_sender_clone = root_senders.clone();
119 let system_event_sender_clone = system_event_sender.clone();
120 let shutting_down_clone = shutting_down.clone();
121 let graceful_clone = graceful_token.clone();
122 let crash_clone = crash_token.clone();
123
124 tokio::spawn(async move {
125 let reason = tokio::select! {
126 _ = graceful_clone.cancelled() => ShutdownReason::Graceful,
127 _ = crash_clone.cancelled() => ShutdownReason::Crash,
128 };
129 shutting_down_clone.store(true, Ordering::SeqCst);
130 debug!(reason = ?reason, "Stopping actor system");
131 let root_senders = {
132 let mut root_senders = root_sender_clone.write().await;
133 std::mem::take(&mut *root_senders)
136 };
137
138 let mut receivers = Vec::with_capacity(root_senders.len());
140 for (path, handle) in root_senders {
141 let (stop_sender, stop_receiver) = oneshot::channel();
142 if handle.sender().send(Some(stop_sender)).await.is_ok() {
143 receivers.push((path, handle.timeout(), stop_receiver));
144 } else {
145 warn!(path = %path, "Failed to send stop signal to root actor");
146 }
147 }
148
149 for (path, timeout, receiver) in receivers {
151 if let Some(timeout) = timeout {
152 if tokio::time::timeout(timeout, receiver).await.is_err() {
153 warn!(
154 path = %path,
155 timeout_ms = timeout.as_millis(),
156 "Timed out waiting for root actor shutdown acknowledgement"
157 );
158 }
159 } else {
160 let _ = receiver.await;
161 }
162 }
163
164 if let Err(e) = event_sender
165 .send(SystemEvent::StopSystem(reason.clone()))
166 .await
167 {
168 error!(error = %e, "Failed to send StopSystem event");
169 }
170 let _ =
171 system_event_sender_clone.send(SystemEvent::StopSystem(reason));
172 });
173
174 Self {
175 actors: Arc::new(RwLock::new(HashMap::new())),
176 child_index,
177 helpers: Arc::new(RwLock::new(HashMap::new())),
178 graceful_token,
179 crash_token,
180 root_senders,
181 system_event_sender,
182 shutting_down,
183 }
184 }
185
186 fn is_shutting_down(&self) -> bool {
187 self.shutting_down.load(Ordering::SeqCst)
188 || self.graceful_token.is_cancelled()
189 || self.crash_token.is_cancelled()
190 }
191
192 pub fn subscribe_system_events(&self) -> broadcast::Receiver<SystemEvent> {
194 self.system_event_sender.subscribe()
195 }
196
197 pub(crate) fn publish_system_event(&self, event: SystemEvent) {
198 let _ = self.system_event_sender.send(event);
199 }
200
201 async fn index_actor(&self, path: &ActorPath) {
202 let parent = path.parent();
203 self.child_index
204 .write()
205 .await
206 .entry(parent)
207 .or_default()
208 .insert(path.clone());
209 }
210
211 async fn deindex_actor(&self, path: &ActorPath) {
212 let parent = path.parent();
213 let mut child_index = self.child_index.write().await;
214 if let Some(children) = child_index.get_mut(&parent) {
215 children.remove(path);
216 if children.is_empty() {
217 child_index.remove(&parent);
218 }
219 }
220 }
221
222 pub async fn get_actor<A>(
224 &self,
225 path: &ActorPath,
226 ) -> Result<ActorRef<A>, Error>
227 where
228 A: Actor + Handler<A>,
229 {
230 let actors = self.actors.read().await;
231 actors
232 .get(path)
233 .and_then(|any| any.downcast_ref::<ActorRef<A>>().cloned())
234 .ok_or_else(|| Error::NotFound { path: path.clone() })
235 }
236
237 pub(crate) async fn create_actor_path<A>(
238 &self,
239 path: ActorPath,
240 actor: A,
241 parent_error_sender: Option<ChildErrorSender>,
242 span: Span,
243 ) -> Result<(ActorRef<A>, StopSender), Error>
244 where
245 A: Actor + Handler<A>,
246 {
247 if self.is_shutting_down() {
248 debug!(path = %path, "Rejecting actor creation during shutdown");
249 return Err(Error::SystemStopped);
250 }
251
252 let system = self.clone();
254 let is_root = parent_error_sender.is_none();
255 let (mut runner, actor_ref, stop_sender) =
256 ActorRunner::create(path.clone(), actor, parent_error_sender);
257
258 {
261 let mut actors = self.actors.write().await;
262 if actors.contains_key(&path) {
263 debug!(path = %path, "Actor already exists");
264 return Err(Error::Exists { path });
265 }
266 actors.insert(path.clone(), Box::new(actor_ref.clone()));
267 }
268 self.index_actor(&path).await;
269
270 if is_root {
271 let mut root_senders = self.root_senders.write().await;
272 if self.is_shutting_down() {
273 drop(root_senders);
274 self.remove_actor(&path).await;
275 debug!(path = %path, "Rejecting root actor creation after shutdown started");
276 return Err(Error::SystemStopped);
277 }
278 root_senders.insert(
279 path.clone(),
280 StopHandle::new(stop_sender.clone(), A::stop_timeout()),
281 );
282 }
283
284 let (sender, receiver) = oneshot::channel::<bool>();
285
286 let stop_sender_clone = stop_sender.clone();
287 let span_clone = span.clone();
288 let init_handle = tokio::spawn(
289 async move {
290 runner
291 .init(system, stop_sender_clone, Some(sender), span_clone)
292 .await;
293 }
294 .instrument(span),
295 );
296
297 let startup_result = match A::startup_timeout() {
298 Some(timeout) => tokio::time::timeout(timeout, receiver)
299 .await
300 .map_err(|_| timeout),
301 None => Ok(receiver.await),
302 };
303
304 match startup_result {
305 Ok(Ok(true)) => {
306 debug!(path = %path, "Actor initialized successfully");
307 Ok((actor_ref, stop_sender))
308 }
309 Ok(Ok(false)) => {
310 error!(path = %path, "Actor runner failed to initialize");
311 self.remove_actor(&path).await;
312 if is_root {
313 self.root_senders.write().await.remove(&path);
314 }
315 Err(Error::FunctionalCritical {
316 description: format!("Runner can not init {}", path),
317 })
318 }
319 Ok(Err(e)) => {
320 error!(path = %path, error = %e, "Failed to receive initialization signal");
321 self.remove_actor(&path).await;
322 if is_root {
323 self.root_senders.write().await.remove(&path);
324 }
325 Err(Error::FunctionalCritical {
326 description: e.to_string(),
327 })
328 }
329 Err(timeout) => {
330 init_handle.abort();
331 self.remove_actor(&path).await;
332 if is_root {
333 self.root_senders.write().await.remove(&path);
334 }
335 Err(Error::Timeout {
336 ms: timeout.as_millis(),
337 })
338 }
339 }
340 }
341
342 pub async fn create_root_actor<A, I>(
351 &self,
352 name: &str,
353 actor_init: I,
354 ) -> Result<ActorRef<A>, Error>
355 where
356 A: Actor + Handler<A>,
357 I: crate::IntoActor<A>,
358 {
359 let actor = actor_init.into_actor();
360 let path = ActorPath::from("/user") / name;
361 let id = &path.key();
362
363 let (actor_ref, ..) = self
364 .create_actor_path::<A>(
365 path.clone(),
366 actor,
367 None,
368 A::get_span(id, None),
369 )
370 .await?;
371
372 let root_senders = self.root_senders.clone();
375 let watch = actor_ref.clone();
376 let watch_path = path.clone();
377 tokio::spawn(async move {
378 watch.closed().await;
379 root_senders.write().await.remove(&watch_path);
380 });
381
382 Ok(actor_ref)
383 }
384
385 pub(crate) async fn remove_actor(&self, path: &ActorPath) {
386 let mut actors = self.actors.write().await;
387 let removed = actors.remove(path).is_some();
388 drop(actors);
389 if removed {
390 self.deindex_actor(path).await;
391 }
392 }
393
394 pub fn stop_system(&self) {
396 self.shutting_down.store(true, Ordering::SeqCst);
397 self.graceful_token.cancel();
398 }
399
400 pub fn crash_system(&self) {
402 self.shutting_down.store(true, Ordering::SeqCst);
403 self.crash_token.cancel();
404 }
405
406 pub async fn children(&self, path: &ActorPath) -> Vec<ActorPath> {
408 self.child_index
409 .read()
410 .await
411 .get(path)
412 .into_iter()
413 .flat_map(|children| children.iter())
414 .cloned()
415 .collect()
416 }
417
418 pub async fn add_helper<H>(&self, name: &str, helper: H)
420 where
421 H: Any + Send + Sync + Clone + 'static,
422 {
423 let mut helpers = self.helpers.write().await;
424 helpers.insert(name.to_owned(), Box::new(helper));
425 }
426
427 pub async fn get_helper<H>(&self, name: &str) -> Option<H>
429 where
430 H: Any + Send + Sync + Clone + 'static,
431 {
432 let helpers = self.helpers.read().await;
433 helpers
434 .get(name)
435 .and_then(|any| any.downcast_ref::<H>())
436 .cloned()
437 }
438
439 pub async fn run_sink<E>(&self, mut sink: Sink<E>)
441 where
442 E: Event,
443 {
444 tokio::spawn(async move {
445 sink.run().await;
446 });
447 }
448}
449
450pub struct SystemRunner {
452 event_receiver: mpsc::Receiver<SystemEvent>,
454}
455
456impl SystemRunner {
457 pub(crate) const fn new(
458 event_receiver: mpsc::Receiver<SystemEvent>,
459 ) -> Self {
460 Self { event_receiver }
461 }
462
463 pub async fn run(&mut self) -> ShutdownReason {
465 debug!("Running actor system");
466 loop {
467 match self.event_receiver.recv().await {
468 Some(SystemEvent::StopSystem(reason)) => {
469 debug!(reason = ?reason, "Actor system stopped");
470 return reason;
471 }
472 Some(SystemEvent::ActorError { path, error }) => {
473 warn!(path = %path, error = %error, "Ignoring observable ActorError on control channel");
474 }
475 None => {
476 warn!("System event channel closed unexpectedly");
477 return ShutdownReason::Graceful;
478 }
479 }
480 }
481 }
482}
483
484#[cfg(test)]
485mod tests {
486
487 use super::*;
488 use test_log::test;
489
490 #[test(tokio::test)]
491 async fn test_helpers() {
492 let (system, _) = ActorSystem::create(
493 CancellationToken::new(),
494 CancellationToken::new(),
495 );
496 let helper = TestHelper { value: 42 };
497 system.add_helper("test", helper).await;
498 let helper: Option<TestHelper> = system.get_helper("test").await;
499 assert_eq!(helper, Some(TestHelper { value: 42 }));
500 }
501
502 #[derive(Debug, Clone, PartialEq)]
503 pub struct TestHelper {
504 pub value: i32,
505 }
506}