Skip to main content

ave_actors_actor/
system.rs

1//! Actor system: creates, manages, and shuts down actors.
2
3use crate::{
4    Actor, ActorPath, ActorRef, Error, Event, Handler,
5    actor::ChildErrorSender,
6    runner::{ActorRunner, StopHandle, StopSender},
7    sink::Sink,
8};
9
10use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
11use tokio_util::sync::CancellationToken;
12
13use tracing::{Instrument, Span, debug, error, warn};
14
15use std::{
16    any::Any,
17    collections::{HashMap, HashSet},
18    sync::{
19        Arc,
20        atomic::{AtomicBool, Ordering},
21    },
22};
23
/// Why the actor system stopped; this is the value returned by [`SystemRunner::run`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ShutdownReason {
    /// Orderly stop (for example after SIGTERM); maps to exit code 0.
    Graceful,
    /// Stop caused by an unrecoverable actor failure; maps to a non-zero exit code.
    Crash,
}

impl ShutdownReason {
    /// Maps the reason to a process exit code: `0` for [`Self::Graceful`],
    /// `1` for [`Self::Crash`].
    pub const fn exit_code(&self) -> i32 {
        match *self {
            Self::Crash => 1,
            Self::Graceful => 0,
        }
    }
}
42
43/// Entry point for building an actor system instance.
44pub struct ActorSystem {}
45
46impl ActorSystem {
47    /// Creates the actor system and returns a `(SystemRef, SystemRunner)` pair.
48    ///
49    /// Cancel `graceful_token` for a graceful shutdown (exit code 0) or `crash_token` for a crash
50    /// shutdown (exit code 1); the reason is reflected in [`SystemRunner::run`]'s return value.
51    pub fn create(
52        graceful_token: CancellationToken,
53        crash_token: CancellationToken,
54    ) -> (SystemRef, SystemRunner) {
55        let (event_sender, event_receiver) = mpsc::channel(4);
56        let system = SystemRef::new(event_sender, graceful_token, crash_token);
57        let runner = SystemRunner::new(event_receiver);
58        (system, runner)
59    }
60}
61
/// System-level events.
///
/// These are published on the observable broadcast channel (see
/// `SystemRef::subscribe_system_events`); `StopSystem` is additionally sent on
/// the control channel consumed by `SystemRunner::run`.
#[derive(Debug, Clone)]
pub enum SystemEvent {
    /// Non-fatal error emitted by a root actor that has no parent to receive it.
    ActorError {
        /// Path of the actor that emitted the error.
        path: ActorPath,
        /// Error emitted by that actor.
        error: Error,
    },
    /// Signals that the actor system should stop.
    /// Carries the reason so the runner can report it to the caller.
    StopSystem(ShutdownReason),
}
76
/// Cloneable, thread-safe handle to the actor system.
///
/// Use this to create root actors, spawn event sinks, and shut down the system.
/// Cloning is cheap — all clones share the same underlying system state,
/// because every mutable registry is held behind an `Arc`.
#[derive(Clone)]
pub struct SystemRef {
    /// Registry of all actors in the system, indexed by their paths.
    /// Uses type erasure (Any) to store heterogeneous actor types; each entry
    /// is a boxed `ActorRef<A>` that lookups downcast back to the concrete type.
    actors:
        Arc<RwLock<HashMap<ActorPath, Box<dyn Any + Send + Sync + 'static>>>>,
    /// Direct-children index to avoid scanning the full actor registry on lookups.
    /// Maps a parent path to the set of paths registered directly beneath it.
    child_index: Arc<RwLock<HashMap<ActorPath, HashSet<ActorPath>>>>,

    /// Registry of helper objects that can be shared across actors.
    /// Helpers can be any type (database connections, configurations, etc.)
    /// and are looked up by name.
    helpers: Arc<RwLock<HashMap<String, Box<dyn Any + Send + Sync + 'static>>>>,

    /// Stop senders for root-level actors to enable coordinated shutdown.
    /// The shutdown task drains this map and signals every entry.
    root_senders: Arc<RwLock<HashMap<ActorPath, StopHandle>>>,
    /// Broadcast bus for observable system-level events such as root actor errors.
    system_event_sender: broadcast::Sender<SystemEvent>,

    /// Cancelled by an external signal (SIGTERM, operator). Exit code 0.
    graceful_token: CancellationToken,
    /// Cancelled by an actor on unrecoverable failure. Exit code 1.
    crash_token: CancellationToken,
    /// Set as soon as shutdown begins; blocks creation of new actors.
    shutting_down: Arc<AtomicBool>,
}
106
107impl SystemRef {
108    pub(crate) fn new(
109        event_sender: mpsc::Sender<SystemEvent>,
110        graceful_token: CancellationToken,
111        crash_token: CancellationToken,
112    ) -> Self {
113        let root_senders =
114            Arc::new(RwLock::new(HashMap::<ActorPath, StopHandle>::new()));
115        let child_index = Arc::new(RwLock::new(HashMap::new()));
116        let (system_event_sender, _) = broadcast::channel::<SystemEvent>(256);
117        let shutting_down = Arc::new(AtomicBool::new(false));
118        let root_sender_clone = root_senders.clone();
119        let system_event_sender_clone = system_event_sender.clone();
120        let shutting_down_clone = shutting_down.clone();
121        let graceful_clone = graceful_token.clone();
122        let crash_clone = crash_token.clone();
123
124        tokio::spawn(async move {
125            let reason = tokio::select! {
126                _ = graceful_clone.cancelled() => ShutdownReason::Graceful,
127                _ = crash_clone.cancelled()   => ShutdownReason::Crash,
128            };
129            shutting_down_clone.store(true, Ordering::SeqCst);
130            debug!(reason = ?reason, "Stopping actor system");
131            let root_senders = {
132                let mut root_senders = root_sender_clone.write().await;
133                // Move the senders out while holding the lock, then release it
134                // before awaiting on stop notifications.
135                std::mem::take(&mut *root_senders)
136            };
137
138            // Send all stop signals first so all root actors begin shutdown concurrently.
139            let mut receivers = Vec::with_capacity(root_senders.len());
140            for (path, handle) in root_senders {
141                let (stop_sender, stop_receiver) = oneshot::channel();
142                if handle.sender().send(Some(stop_sender)).await.is_ok() {
143                    receivers.push((path, handle.timeout(), stop_receiver));
144                } else {
145                    warn!(path = %path, "Failed to send stop signal to root actor");
146                }
147            }
148
149            // Wait for all confirmations in parallel.
150            for (path, timeout, receiver) in receivers {
151                if let Some(timeout) = timeout {
152                    if tokio::time::timeout(timeout, receiver).await.is_err() {
153                        warn!(
154                            path = %path,
155                            timeout_ms = timeout.as_millis(),
156                            "Timed out waiting for root actor shutdown acknowledgement"
157                        );
158                    }
159                } else {
160                    let _ = receiver.await;
161                }
162            }
163
164            if let Err(e) = event_sender
165                .send(SystemEvent::StopSystem(reason.clone()))
166                .await
167            {
168                error!(error = %e, "Failed to send StopSystem event");
169            }
170            let _ =
171                system_event_sender_clone.send(SystemEvent::StopSystem(reason));
172        });
173
174        Self {
175            actors: Arc::new(RwLock::new(HashMap::new())),
176            child_index,
177            helpers: Arc::new(RwLock::new(HashMap::new())),
178            graceful_token,
179            crash_token,
180            root_senders,
181            system_event_sender,
182            shutting_down,
183        }
184    }
185
186    fn is_shutting_down(&self) -> bool {
187        self.shutting_down.load(Ordering::SeqCst)
188            || self.graceful_token.is_cancelled()
189            || self.crash_token.is_cancelled()
190    }
191
192    /// Returns a broadcast receiver for system-level events such as root actor errors and shutdown.
193    pub fn subscribe_system_events(&self) -> broadcast::Receiver<SystemEvent> {
194        self.system_event_sender.subscribe()
195    }
196
197    pub(crate) fn publish_system_event(&self, event: SystemEvent) {
198        let _ = self.system_event_sender.send(event);
199    }
200
201    async fn index_actor(&self, path: &ActorPath) {
202        let parent = path.parent();
203        self.child_index
204            .write()
205            .await
206            .entry(parent)
207            .or_default()
208            .insert(path.clone());
209    }
210
211    async fn deindex_actor(&self, path: &ActorPath) {
212        let parent = path.parent();
213        let mut child_index = self.child_index.write().await;
214        if let Some(children) = child_index.get_mut(&parent) {
215            children.remove(path);
216            if children.is_empty() {
217                child_index.remove(&parent);
218            }
219        }
220    }
221
222    /// Returns the `ActorRef` for the actor at `path`, or `Error::NotFound` if no actor is registered there.
223    pub async fn get_actor<A>(
224        &self,
225        path: &ActorPath,
226    ) -> Result<ActorRef<A>, Error>
227    where
228        A: Actor + Handler<A>,
229    {
230        let actors = self.actors.read().await;
231        actors
232            .get(path)
233            .and_then(|any| any.downcast_ref::<ActorRef<A>>().cloned())
234            .ok_or_else(|| Error::NotFound { path: path.clone() })
235    }
236
237    pub(crate) async fn create_actor_path<A>(
238        &self,
239        path: ActorPath,
240        actor: A,
241        parent_error_sender: Option<ChildErrorSender>,
242        span: Span,
243    ) -> Result<(ActorRef<A>, StopSender), Error>
244    where
245        A: Actor + Handler<A>,
246    {
247        if self.is_shutting_down() {
248            debug!(path = %path, "Rejecting actor creation during shutdown");
249            return Err(Error::SystemStopped);
250        }
251
252        // Create the actor runner and init it.
253        let system = self.clone();
254        let is_root = parent_error_sender.is_none();
255        let (mut runner, actor_ref, stop_sender) =
256            ActorRunner::create(path.clone(), actor, parent_error_sender);
257
258        // Atomically check+insert under the same write lock to avoid
259        // concurrent duplicate creations for the same path.
260        {
261            let mut actors = self.actors.write().await;
262            if actors.contains_key(&path) {
263                debug!(path = %path, "Actor already exists");
264                return Err(Error::Exists { path });
265            }
266            actors.insert(path.clone(), Box::new(actor_ref.clone()));
267        }
268        self.index_actor(&path).await;
269
270        if is_root {
271            let mut root_senders = self.root_senders.write().await;
272            if self.is_shutting_down() {
273                drop(root_senders);
274                self.remove_actor(&path).await;
275                debug!(path = %path, "Rejecting root actor creation after shutdown started");
276                return Err(Error::SystemStopped);
277            }
278            root_senders.insert(
279                path.clone(),
280                StopHandle::new(stop_sender.clone(), A::stop_timeout()),
281            );
282        }
283
284        let (sender, receiver) = oneshot::channel::<bool>();
285
286        let stop_sender_clone = stop_sender.clone();
287        let span_clone = span.clone();
288        let init_handle = tokio::spawn(
289            async move {
290                runner
291                    .init(system, stop_sender_clone, Some(sender), span_clone)
292                    .await;
293            }
294            .instrument(span),
295        );
296
297        let startup_result = match A::startup_timeout() {
298            Some(timeout) => tokio::time::timeout(timeout, receiver)
299                .await
300                .map_err(|_| timeout),
301            None => Ok(receiver.await),
302        };
303
304        match startup_result {
305            Ok(Ok(true)) => {
306                debug!(path = %path, "Actor initialized successfully");
307                Ok((actor_ref, stop_sender))
308            }
309            Ok(Ok(false)) => {
310                error!(path = %path, "Actor runner failed to initialize");
311                self.remove_actor(&path).await;
312                if is_root {
313                    self.root_senders.write().await.remove(&path);
314                }
315                Err(Error::FunctionalCritical {
316                    description: format!("Runner can not init {}", path),
317                })
318            }
319            Ok(Err(e)) => {
320                error!(path = %path, error = %e, "Failed to receive initialization signal");
321                self.remove_actor(&path).await;
322                if is_root {
323                    self.root_senders.write().await.remove(&path);
324                }
325                Err(Error::FunctionalCritical {
326                    description: e.to_string(),
327                })
328            }
329            Err(timeout) => {
330                init_handle.abort();
331                self.remove_actor(&path).await;
332                if is_root {
333                    self.root_senders.write().await.remove(&path);
334                }
335                Err(Error::Timeout {
336                    ms: timeout.as_millis(),
337                })
338            }
339        }
340    }
341
342    /// Spawns a top-level actor under `/user/{name}` and returns an `ActorRef` to it.
343    ///
344    /// `actor_init` can be the actor itself for [`NotPersistentActor`](crate::NotPersistentActor)
345    /// types, or any other value implementing [`IntoActor`](crate::IntoActor), such as
346    /// `InitializedActor<A>` from `ave-actors-store`.
347    ///
348    /// Returns `Error::Exists` if a root actor with the same name already exists, or
349    /// `Error::SystemStopped` if the system is shutting down.
350    pub async fn create_root_actor<A, I>(
351        &self,
352        name: &str,
353        actor_init: I,
354    ) -> Result<ActorRef<A>, Error>
355    where
356        A: Actor + Handler<A>,
357        I: crate::IntoActor<A>,
358    {
359        let actor = actor_init.into_actor();
360        let path = ActorPath::from("/user") / name;
361        let id = &path.key();
362
363        let (actor_ref, ..) = self
364            .create_actor_path::<A>(
365                path.clone(),
366                actor,
367                None,
368                A::get_span(id, None),
369            )
370            .await?;
371
372        // When this root actor fully terminates on its own, remove its stop
373        // sender entry so shutdown only sees live roots.
374        let root_senders = self.root_senders.clone();
375        let watch = actor_ref.clone();
376        let watch_path = path.clone();
377        tokio::spawn(async move {
378            watch.closed().await;
379            root_senders.write().await.remove(&watch_path);
380        });
381
382        Ok(actor_ref)
383    }
384
385    pub(crate) async fn remove_actor(&self, path: &ActorPath) {
386        let mut actors = self.actors.write().await;
387        let removed = actors.remove(path).is_some();
388        drop(actors);
389        if removed {
390            self.deindex_actor(path).await;
391        }
392    }
393
394    /// Initiates graceful shutdown, stopping all root actors and causing [`SystemRunner::run`] to return [`ShutdownReason::Graceful`].
395    pub fn stop_system(&self) {
396        self.shutting_down.store(true, Ordering::SeqCst);
397        self.graceful_token.cancel();
398    }
399
400    /// Initiates a crash shutdown, causing [`SystemRunner::run`] to return [`ShutdownReason::Crash`] with exit code 1.
401    pub fn crash_system(&self) {
402        self.shutting_down.store(true, Ordering::SeqCst);
403        self.crash_token.cancel();
404    }
405
406    /// Returns the paths of all currently registered direct children of the actor at `path`.
407    pub async fn children(&self, path: &ActorPath) -> Vec<ActorPath> {
408        self.child_index
409            .read()
410            .await
411            .get(path)
412            .into_iter()
413            .flat_map(|children| children.iter())
414            .cloned()
415            .collect()
416    }
417
418    /// Stores a shared resource (e.g. a database pool or config object) under `name` for retrieval by any actor.
419    pub async fn add_helper<H>(&self, name: &str, helper: H)
420    where
421        H: Any + Send + Sync + Clone + 'static,
422    {
423        let mut helpers = self.helpers.write().await;
424        helpers.insert(name.to_owned(), Box::new(helper));
425    }
426
427    /// Returns the helper stored under `name`, or `None` if not found or if the type does not match.
428    pub async fn get_helper<H>(&self, name: &str) -> Option<H>
429    where
430        H: Any + Send + Sync + Clone + 'static,
431    {
432        let helpers = self.helpers.read().await;
433        helpers
434            .get(name)
435            .and_then(|any| any.downcast_ref::<H>())
436            .cloned()
437    }
438
439    /// Spawns a [`Sink`] in a background Tokio task so it processes actor events asynchronously.
440    pub async fn run_sink<E>(&self, mut sink: Sink<E>)
441    where
442        E: Event,
443    {
444        tokio::spawn(async move {
445            sink.run().await;
446        });
447    }
448}
449
450/// Drives the actor system event loop; block on [`SystemRunner::run`] to keep the system alive until shutdown.
451pub struct SystemRunner {
452    /// Receiver for system-wide events.
453    event_receiver: mpsc::Receiver<SystemEvent>,
454}
455
456impl SystemRunner {
457    pub(crate) const fn new(
458        event_receiver: mpsc::Receiver<SystemEvent>,
459    ) -> Self {
460        Self { event_receiver }
461    }
462
463    /// Runs the system event loop until shutdown, returning the [`ShutdownReason`] for use as a process exit code.
464    pub async fn run(&mut self) -> ShutdownReason {
465        debug!("Running actor system");
466        loop {
467            match self.event_receiver.recv().await {
468                Some(SystemEvent::StopSystem(reason)) => {
469                    debug!(reason = ?reason, "Actor system stopped");
470                    return reason;
471                }
472                Some(SystemEvent::ActorError { path, error }) => {
473                    warn!(path = %path, error = %error, "Ignoring observable ActorError on control channel");
474                }
475                None => {
476                    warn!("System event channel closed unexpectedly");
477                    return ShutdownReason::Graceful;
478                }
479            }
480        }
481    }
482}
483
#[cfg(test)]
mod tests {

    use super::*;
    use test_log::test;

    /// Checks the helper registry: a stored helper round-trips, an unknown
    /// name yields `None`, and requesting the wrong type yields `None`.
    #[test(tokio::test)]
    async fn test_helpers() {
        let (system, _) = ActorSystem::create(
            CancellationToken::new(),
            CancellationToken::new(),
        );
        let helper = TestHelper { value: 42 };
        system.add_helper("test", helper).await;
        let helper: Option<TestHelper> = system.get_helper("test").await;
        assert_eq!(helper, Some(TestHelper { value: 42 }));

        // A name that was never registered is reported as absent.
        let missing: Option<TestHelper> = system.get_helper("missing").await;
        assert_eq!(missing, None);

        // A registered helper requested as a different type is also absent.
        let wrong_type: Option<String> = system.get_helper("test").await;
        assert_eq!(wrong_type, None);
    }

    /// Minimal cloneable value used to exercise the helper registry.
    #[derive(Debug, Clone, PartialEq)]
    pub struct TestHelper {
        pub value: i32,
    }
}