Skip to main content

ave_actors_actor/
actor.rs

1//! Core actor traits, types, and lifecycle hooks.
2
3use crate::{
4    ActorPath, Error,
5    handler::HandleHelper,
6    runner::{InnerAction, InnerSender, StopHandle, StopSender},
7    supervision::SupervisionStrategy,
8    system::SystemRef,
9};
10
11use tokio::sync::{broadcast::Receiver as EventReceiver, mpsc, oneshot};
12
13use async_trait::async_trait;
14
15use serde::{Serialize, de::DeserializeOwned};
16use tracing::Span;
17
18use std::{collections::HashMap, fmt::Debug, time::Duration};
19
/// Execution context passed to actors during message handling and lifecycle hooks.
///
/// Provides access to the actor's path, child management, event emission,
/// and error reporting. The context is created by the actor system and passed
/// as `&mut ActorContext<A>` to all handler and lifecycle methods.
pub struct ActorContext<A: Actor + Handler<A>> {
    /// Sender used by `stop` to deliver a stop request for this actor.
    stop: StopSender,
    /// The path of the actor.
    path: ActorPath,
    /// The actor system.
    system: SystemRef,
    /// Error recorded through `set_error` (for example by `emit_fail`);
    /// `None` while no failure is pending.
    error: Option<Error>,
    /// Sender of `ChildError` messages; a clone is handed to every child
    /// spawned through `create_child`.
    error_sender: ChildErrorSender,
    /// Sender of `InnerAction` messages (events, errors, failures and
    /// child-stopped notifications) produced through this context.
    inner_sender: InnerSender<A>,
    /// Stop handles of the children created through `create_child`, keyed by child path.
    child_senders: HashMap<ActorPath, StopHandle>,

    /// Tracing span of this actor; offered as the parent span when child spans are built.
    span: tracing::Span,
}
42
43impl<A> ActorContext<A>
44where
45    A: Actor + Handler<A>,
46{
47    pub(crate) fn new(
48        stop: StopSender,
49        path: ActorPath,
50        system: SystemRef,
51        error_sender: ChildErrorSender,
52        inner_sender: InnerSender<A>,
53        span: Span,
54    ) -> Self {
55        Self {
56            span,
57            stop,
58            path,
59            system,
60            error: None,
61            error_sender,
62            inner_sender,
63            child_senders: HashMap::new(),
64        }
65    }
66
67    pub(crate) async fn restart(
68        &mut self,
69        actor: &mut A,
70        error: Option<&Error>,
71    ) -> Result<(), Error>
72    where
73        A: Actor,
74    {
75        tracing::warn!(error = ?error, "Actor restarting");
76        let result = actor.pre_restart(self, error).await;
77        if let Err(ref e) = result {
78            tracing::error!(error = %e, "Actor restart failed");
79        }
80        result
81    }
    /// Returns an [`ActorRef`] to this actor by looking up its own path in the system.
    ///
    /// Any error from `SystemRef::get_actor` is returned unchanged (presumably
    /// raised once the actor has been removed from the system — confirm
    /// against `SystemRef`).
    pub async fn reference(&self) -> Result<ActorRef<A>, Error> {
        self.system.get_actor(&self.path).await
    }
86
    /// Returns the hierarchical path that uniquely identifies this actor in the system.
    ///
    /// The path is borrowed from the context; clone it when an owned value is needed.
    pub const fn path(&self) -> &ActorPath {
        &self.path
    }
91
    /// Returns a reference to the actor system this actor belongs to.
    ///
    /// The reference is borrowed from the context.
    pub const fn system(&self) -> &SystemRef {
        &self.system
    }
96
    /// Returns a typed handle to the parent actor, or an error if this is a root actor or the parent has stopped.
    ///
    /// The lookup is performed on the parent of this actor's path, and the
    /// result of `SystemRef::get_actor` is returned unchanged. `P` is chosen
    /// by the caller; presumably it must match the parent's concrete actor
    /// type for the lookup to succeed — confirm against `SystemRef::get_actor`.
    pub async fn get_parent<P: Actor + Handler<P>>(
        &self,
    ) -> Result<ActorRef<P>, Error> {
        self.system.get_actor(&self.path.parent()).await
    }
103
104    pub(crate) async fn stop_childs(&mut self) {
105        let child_count = self.child_senders.len();
106        if child_count > 0 {
107            tracing::debug!(child_count, "Stopping child actors");
108        }
109
110        // Send all stop signals first so all children begin shutdown concurrently.
111        let mut receivers = Vec::with_capacity(child_count);
112        for (path, handle) in std::mem::take(&mut self.child_senders) {
113            let (stop_sender, stop_receiver) = oneshot::channel();
114            if handle.sender().send(Some(stop_sender)).await.is_ok() {
115                receivers.push((path, handle.timeout(), stop_receiver));
116            }
117        }
118
119        // Wait for all confirmations. Children shut down in parallel so the
120        // total wait is max(child_shutdown_time) rather than the sum.
121        for (path, timeout, receiver) in receivers {
122            if let Some(timeout) = timeout {
123                if tokio::time::timeout(timeout, receiver).await.is_err() {
124                    tracing::warn!(
125                        child = %path,
126                        timeout_ms = timeout.as_millis(),
127                        "Timed out waiting for child actor shutdown acknowledgement"
128                    );
129                }
130            } else {
131                let _ = receiver.await;
132            }
133        }
134    }
135
    /// Asks the actor system to remove the actor registered at this context's path.
    pub(crate) async fn remove_actor(&self) {
        self.system.remove_actor(&self.path).await;
    }
139
    /// Sends a stop signal to this actor. Pass `Some(sender)` to receive a confirmation when shutdown completes.
    ///
    /// Delivery is best effort: if the stop channel rejects the signal, the
    /// failure is silently ignored.
    pub async fn stop(&self, sender: Option<oneshot::Sender<()>>) {
        // The send result is intentionally discarded.
        let _ = self.stop.send(sender).await;
    }
144
145    /// Broadcasts `event` to all current subscribers of this actor's event channel.
146    ///
147    /// Returns an error if the broadcast channel is closed (i.e., the actor is stopping).
148    pub async fn publish_event(&self, event: A::Event) -> Result<(), Error> {
149        self.inner_sender
150            .send(InnerAction::Event(event))
151            .await
152            .map_err(|e| {
153                tracing::error!(error = %e, "Failed to publish event");
154                Error::SendEvent {
155                    reason: e.to_string(),
156                }
157            })
158    }
159
160    /// Reports an error to this actor's parent so the parent can invoke `on_child_error`.
161    ///
162    /// Returns an error if the parent channel is no longer reachable.
163    pub async fn emit_error(&mut self, error: Error) -> Result<(), Error> {
164        tracing::warn!(error = %error, "Emitting error");
165        self.inner_sender
166            .send(InnerAction::Error(error))
167            .await
168            .map_err(|e| {
169                tracing::error!(error = %e, "Failed to emit error");
170                Error::Send {
171                    reason: e.to_string(),
172                }
173            })
174    }
175
176    /// Emits a fatal fault, halts message processing, and escalates to the parent via `on_child_fault`.
177    ///
178    /// Returns an error if the escalation channel is no longer reachable.
179    pub async fn emit_fail(&mut self, error: Error) -> Result<(), Error> {
180        tracing::error!(error = %error, "Actor failing");
181        // Store error to stop message handling.
182        self.set_error(error.clone());
183        // Send fail to parent actor.
184        self.inner_sender
185            .send(InnerAction::Fail(error.clone()))
186            .await
187            .map_err(|e| {
188                tracing::error!(error = %e, "Failed to emit fail");
189                Error::Send {
190                    reason: e.to_string(),
191                }
192            })
193    }
194
195    /// Spawns a child actor and registers it under this actor's path.
196    ///
197    /// `name` becomes the last segment of the child's path. Returns an [`ActorRef`]
198    /// to the new child on success, or an error if the actor system is shutting down
199    /// or a child with the same name already exists.
200    pub async fn create_child<C, I>(
201        &mut self,
202        name: &str,
203        actor_init: I,
204    ) -> Result<ActorRef<C>, Error>
205    where
206        C: Actor + Handler<C>,
207        I: crate::IntoActor<C>,
208    {
209        tracing::debug!(child_name = %name, "Creating child actor");
210        let actor = actor_init.into_actor();
211        let path = self.path.clone() / name;
212        let result = self
213            .system
214            .create_actor_path(
215                path.clone(),
216                actor,
217                Some(self.error_sender.clone()),
218                C::get_span(name, Some(self.span.clone())),
219            )
220            .await;
221
222        match result {
223            Ok((actor_ref, stop_sender)) => {
224                let child_path = path.clone();
225                self.child_senders.insert(
226                    path,
227                    StopHandle::new(stop_sender.clone(), C::stop_timeout()),
228                );
229                let inner_sender = self.inner_sender.clone();
230                tokio::spawn(async move {
231                    stop_sender.closed().await;
232                    let _ = inner_sender
233                        .send(InnerAction::ChildStopped(child_path))
234                        .await;
235                });
236                tracing::debug!(child_name = %name, "Child actor created");
237                Ok(actor_ref)
238            }
239            Err(e) => {
240                tracing::debug!(child_name = %name, error = %e, "Failed to create child actor");
241                Err(e)
242            }
243        }
244    }
245
246    pub(crate) fn remove_closed_child(&mut self, child_path: &ActorPath) {
247        let should_remove = self
248            .child_senders
249            .get(child_path)
250            .map(StopHandle::is_closed)
251            .unwrap_or(false);
252        if should_remove {
253            self.child_senders.remove(child_path);
254        }
255    }
256
    /// Looks up a running child actor by its name and returns a typed handle.
    ///
    /// The child's path is this actor's path extended with `name`. The result
    /// of `SystemRef::get_actor` is returned unchanged; per the lookup's
    /// contract this is an error if no child named `name` exists or if the
    /// child's message type does not match the requested actor type `C`.
    pub async fn get_child<C>(&self, name: &str) -> Result<ActorRef<C>, Error>
    where
        C: Actor + Handler<C>,
    {
        let path = self.path.clone() / name;
        self.system.get_actor(&path).await
    }
268
    /// Returns a clone of the currently recorded error, or `None` if none is set.
    pub(crate) fn error(&self) -> Option<Error> {
        self.error.clone()
    }
272
    /// Records `error` on the context, replacing any previously recorded error.
    pub(crate) fn set_error(&mut self, error: Error) {
        self.error = Some(error);
    }
276
    /// Clears the recorded error, if any.
    pub(crate) fn clean_error(&mut self) {
        self.error = None;
    }
280}
281
/// The current lifecycle state of an actor.
///
/// The enum carries no data, so it is `Copy` and can be used as a key in
/// hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ActorLifecycle {
    /// The actor is created.
    Created,
    /// The actor is started.
    Started,
    /// The actor is restarted.
    Restarted,
    /// The actor is failed.
    Failed,
    /// The actor is stopped.
    Stopped,
    /// The actor is terminated.
    Terminated,
}
298
/// The action that a child actor will take when an error occurs.
///
/// The enum carries no data, so it is `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChildAction {
    /// The child actor will stop.
    Stop,
    /// The child actor will restart.
    Restart,
    /// Delegate the action to the child supervision strategy.
    Delegate,
}
309
/// Child error receiver: the receiving half of the `mpsc` channel that carries [`ChildError`] messages.
pub type ChildErrorReceiver = mpsc::Receiver<ChildError>;
312
/// Child error sender: the sending half of the `mpsc` channel that carries [`ChildError`] messages.
pub type ChildErrorSender = mpsc::Sender<ChildError>;
315
316/// Message sent from a child to its parent on error or fault.
317pub enum ChildError {
318    /// Error in child.
319    Error {
320        /// The error that caused the failure.
321        error: Error,
322    },
323    /// Fault in child.
324    Fault {
325        /// The error that caused the failure.
326        error: Error,
327        /// The sender will communicate the action to be carried out to the child.
328        sender: oneshot::Sender<ChildAction>,
329    },
330}
331
/// Defines the identity and associated types of an actor.
///
/// Implement this trait together with [`Handler`] on your actor struct.
/// The actor system uses these associated types to wire up message channels,
/// event broadcasts, and tracing spans.
#[async_trait]
pub trait Actor: Send + Sync + Sized + 'static + Handler<Self> {
    /// The type of messages this actor accepts.
    type Message: Message;

    /// The type of events this actor can broadcast to subscribers.
    type Event: Event;

    /// The type returned by the actor in response to each message.
    type Response: Response;

    /// Creates the tracing span for this actor instance.
    ///
    /// `id` identifies the actor instance (when a child is created through
    /// `ActorContext::create_child`, it is the child's name); `parent_span`
    /// is the parent actor's span, if any.
    /// Return an `info_span!` or similar to attach all actor logs to this span.
    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span;

    /// Maximum time to spend processing critical mailbox messages during
    /// shutdown before dropping them. Defaults to 5 seconds.
    fn mailbox_drain_timeout() -> std::time::Duration {
        std::time::Duration::from_secs(5)
    }

    /// Maximum time to spend draining pending published events during
    /// shutdown before giving up. Defaults to 5 seconds.
    fn event_drain_timeout() -> std::time::Duration {
        std::time::Duration::from_secs(5)
    }

    /// Maximum time to wait for `pre_start` to complete; `None` disables the startup timeout.
    ///
    /// Defaults to `None`.
    fn startup_timeout() -> Option<Duration> {
        None
    }

    /// Maximum time a parent waits for this actor to acknowledge a stop request; `None` disables the stop timeout.
    ///
    /// Defaults to `None`.
    fn stop_timeout() -> Option<Duration> {
        None
    }

    /// Returns the supervision strategy applied when this actor fails at startup.
    ///
    /// Defaults to [`SupervisionStrategy::Stop`].
    fn supervision_strategy() -> SupervisionStrategy {
        SupervisionStrategy::Stop
    }

    /// Called once before the actor begins processing messages.
    ///
    /// Override to initialize resources, spawn child actors, or connect to external
    /// services. Return an error to abort startup; the supervision strategy determines
    /// whether a retry is attempted. The default implementation does nothing
    /// and succeeds.
    async fn pre_start(
        &mut self,
        _context: &mut ActorContext<Self>,
    ) -> Result<(), Error> {
        Ok(())
    }

    /// Called when the actor is about to be restarted after a failure.
    ///
    /// `error` is the error that caused the restart, or `None` if the restart was manual.
    /// The default implementation delegates to `pre_start`, so any initialization
    /// logic defined there runs again on restart.
    async fn pre_restart(
        &mut self,
        ctx: &mut ActorContext<Self>,
        _error: Option<&Error>,
    ) -> Result<(), Error> {
        self.pre_start(ctx).await
    }

    /// Called when the actor is about to stop, before children are stopped.
    ///
    /// Override to flush state, emit a final event, or notify external services.
    /// Errors are logged but do not prevent the actor from stopping. The
    /// default implementation does nothing and succeeds.
    async fn pre_stop(
        &mut self,
        _ctx: &mut ActorContext<Self>,
    ) -> Result<(), Error> {
        Ok(())
    }

    /// Called after all children have stopped and the actor is fully shut down. Override for final cleanup.
    ///
    /// The default implementation does nothing and succeeds.
    async fn post_stop(
        &mut self,
        _ctx: &mut ActorContext<Self>,
    ) -> Result<(), Error> {
        Ok(())
    }

    /// Maps a handler response to an event; call explicitly when you need that conversion.
    ///
    /// The default implementation always fails with [`Error::Functional`]
    /// ("Not implemented"); override it for actors that support the conversion.
    fn from_response(_response: Self::Response) -> Result<Self::Event, Error> {
        Err(Error::Functional {
            description: "Not implemented".to_string(),
        })
    }
}
432
/// Application-defined values that an actor may publish, persist, or apply via `on_event`.
///
/// This is a marker trait with no methods; implementors must be serializable,
/// deserializable, `Debug`, `Clone`, and safe to send and share across threads.
pub trait Event:
    Serialize + DeserializeOwned + Debug + Clone + Send + Sync + 'static
{
}
438
/// Defines the type of value an actor receives as a message.
///
/// Implementors must be `Clone` and safe to send and share across threads.
pub trait Message: Clone + Send + Sync + 'static {
    /// Returns `true` if this message must be processed before the actor stops; defaults to `false`.
    fn is_critical(&self) -> bool {
        false
    }
}
446
/// Defines the type of value an actor returns in response to a message.
///
/// This is a marker trait with no methods.
pub trait Response: Send + Sync + 'static {}
449
// The unit type implements all three marker traits, so it can be used as the
// response, event, or message type of an actor.
impl Response for () {}
impl Event for () {}
impl Message for () {}
453
/// Defines how an actor processes its incoming messages.
///
/// Implement this together with [`Actor`]. The actor system calls
/// `handle_message` for every message delivered to the actor.
#[async_trait]
pub trait Handler<A: Actor + Handler<A>>: Send + Sync {
    /// Processes `msg` sent by `sender` and returns a response.
    ///
    /// `ctx` gives access to the actor's context for spawning children, emitting events,
    /// or reporting errors. Return an error to signal a failure; the error is propagated
    /// back to the caller of [`ActorRef::ask`].
    async fn handle_message(
        &mut self,
        sender: ActorPath,
        msg: A::Message,
        ctx: &mut ActorContext<A>,
    ) -> Result<A::Response, Error>;

    /// Called when the actor wants to apply an event to its own state; not invoked automatically by the runtime.
    ///
    /// The default implementation ignores the event.
    async fn on_event(&mut self, _event: A::Event, _ctx: &mut ActorContext<A>) {
        // Default implementation: intentionally empty.
    }

    /// Called when a child actor reports an error via [`ActorContext::emit_error`].
    ///
    /// Override to inspect `error` and decide whether to escalate it. The default
    /// implementation only logs the error at `error` level.
    async fn on_child_error(
        &mut self,
        error: Error,
        _ctx: &mut ActorContext<A>,
    ) {
        tracing::error!(error = %error, "Child actor error");
    }

    /// Called when a child actor fails unrecoverably (panics or exhausts retries).
    ///
    /// Return [`ChildAction::Stop`] to propagate the failure up to this actor's parent,
    /// [`ChildAction::Restart`] to restart the child, or [`ChildAction::Delegate`]
    /// to let the child's own supervision strategy decide. The default logs the
    /// error and returns `Stop`.
    async fn on_child_fault(
        &mut self,
        error: Error,
        _ctx: &mut ActorContext<A>,
    ) -> ChildAction {
        tracing::error!(error = %error, "Child actor fault, stopping child");
        // Default policy for child faults: stop the child.
        ChildAction::Stop
    }
}
504
/// Typed, cloneable handle to a running actor.
///
/// Use this to send messages with [`ask`](ActorRef::ask), subscribe to events
/// with [`subscribe`](ActorRef::subscribe), or stop the actor with
/// [`ask_stop`](ActorRef::ask_stop) or [`tell_stop`](ActorRef::tell_stop).
/// Cloning an `ActorRef` is cheap — all clones share the same underlying channels.
pub struct ActorRef<A>
where
    A: Actor + Handler<A>,
{
    /// The path of the actor.
    path: ActorPath,
    /// The handle helper; `tell`, `ask`, `is_closed` and `closed` delegate to it.
    sender: HandleHelper<A>,
    /// The actor event receiver; `subscribe` and `clone` derive new receivers
    /// from it via `resubscribe`.
    event_receiver: EventReceiver<<A as Actor>::Event>,
    /// The actor stop sender, used by `ask_stop` and `tell_stop`.
    stop_sender: StopSender,
}
524
525impl<A> ActorRef<A>
526where
527    A: Actor + Handler<A>,
528{
    /// Assembles an `ActorRef` from its parts.
    ///
    /// Note that the parameter order (`stop_sender` before `event_receiver`)
    /// differs from the field declaration order.
    pub const fn new(
        path: ActorPath,
        sender: HandleHelper<A>,
        stop_sender: StopSender,
        event_receiver: EventReceiver<<A as Actor>::Event>,
    ) -> Self {
        Self {
            path,
            sender,
            stop_sender,
            event_receiver,
        }
    }
542
    /// Sends a message to the actor without waiting for a response (fire-and-forget).
    ///
    /// The result of `HandleHelper::tell` is returned unchanged.
    ///
    /// NOTE(review): the path passed alongside the message is this ref's own
    /// path, i.e. the target actor's; presumably it surfaces as `sender` in
    /// `Handler::handle_message` — confirm that this is intended.
    pub async fn tell(&self, message: A::Message) -> Result<(), Error> {
        self.sender.tell(self.path(), message).await
    }
547
    /// Sends `message` to the actor and waits for a response.
    ///
    /// Returns the actor's response on success, or an error if the actor has stopped
    /// or the message channel is full. The result of `HandleHelper::ask` is
    /// returned unchanged.
    ///
    /// NOTE(review): the path passed alongside the message is this ref's own
    /// path, i.e. the target actor's; presumably it surfaces as `sender` in
    /// `Handler::handle_message` — confirm that this is intended.
    pub async fn ask(&self, message: A::Message) -> Result<A::Response, Error> {
        self.sender.ask(self.path(), message).await
    }
555
556    /// Sends `message` and waits up to `timeout` for a response, returning `Error::Timeout` if the deadline is exceeded.
557    pub async fn ask_timeout(
558        &self,
559        message: A::Message,
560        timeout: std::time::Duration,
561    ) -> Result<A::Response, Error> {
562        tokio::time::timeout(timeout, self.sender.ask(self.path(), message))
563            .await
564            .map_err(|_| Error::Timeout {
565                ms: timeout.as_millis(),
566            })?
567    }
568
569    /// Requests the actor to stop gracefully and waits for it to confirm shutdown.
570    ///
571    /// The actor will finish its current message, run `pre_stop` and `post_stop`,
572    /// and stop its children before terminating. Returns an error if the actor has
573    /// already stopped.
574    pub async fn ask_stop(&self) -> Result<(), Error> {
575        tracing::debug!("Stopping actor");
576        let (response_sender, response_receiver) = oneshot::channel();
577
578        if self.stop_sender.send(Some(response_sender)).await.is_err() {
579            Ok(())
580        } else {
581            response_receiver.await.map_err(|error| {
582                tracing::error!(error = %error, "Failed to confirm actor stop");
583                Error::Send {
584                    reason: error.to_string(),
585                }
586            })
587        }
588    }
589
    /// Sends a stop signal without waiting for the actor to confirm shutdown (fire-and-forget).
    ///
    /// No confirmation channel is attached, and a failure to deliver the
    /// signal is silently ignored.
    pub async fn tell_stop(&self) {
        let _ = self.stop_sender.send(None).await;
    }
594
    /// Returns the hierarchical path of this actor.
    ///
    /// The path is cloned on every call.
    pub fn path(&self) -> ActorPath {
        self.path.clone()
    }
599
    /// Returns `true` if the actor's mailbox is closed, meaning the actor has stopped.
    ///
    /// Delegates to `HandleHelper::is_closed`.
    pub fn is_closed(&self) -> bool {
        self.sender.is_closed()
    }
604
    /// Waits until the actor has fully terminated.
    ///
    /// NOTE(review): this delegates to `HandleHelper::close` (not `closed`);
    /// confirm that the helper only waits for the mailbox to close rather
    /// than actively closing it.
    pub async fn closed(&self) {
        self.sender.close().await;
    }
609
    /// Returns a broadcast receiver for this actor's events.
    ///
    /// Each subscriber receives every future event independently. Use this receiver
    /// directly or wrap it in a [`Sink`](crate::Sink) to process events asynchronously.
    ///
    /// The receiver is obtained by re-subscribing the one stored in this
    /// handle, so events published before this call are not delivered to it.
    pub fn subscribe(&self) -> EventReceiver<<A as Actor>::Event> {
        self.event_receiver.resubscribe()
    }
617}
618
// Implemented by hand: the event receiver is re-subscribed rather than
// cloned, so the new handle gets its own receiver that starts at the
// channel's current position.
impl<A> Clone for ActorRef<A>
where
    A: Actor + Handler<A>,
{
    /// Clones the path and the senders, and re-subscribes the event receiver.
    fn clone(&self) -> Self {
        Self {
            path: self.path.clone(),
            sender: self.sender.clone(),
            stop_sender: self.stop_sender.clone(),
            event_receiver: self.event_receiver.resubscribe(),
        }
    }
}
632
#[cfg(test)]
mod test {

    use super::*;
    use test_log::test;

    use crate::sink::{Sink, Subscriber};

    use serde::{Deserialize, Serialize};
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;
    use tracing::info_span;

    /// Minimal actor that accumulates the values it receives.
    #[derive(Debug, Clone)]
    struct TestActor {
        counter: usize,
    }

    impl crate::NotPersistentActor for TestActor {}

    /// Message carrying the amount to add to the counter.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestMessage(usize);

    impl Message for TestMessage {}

    /// Response carrying the counter value after the addition.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestResponse(usize);

    impl Response for TestResponse {}

    /// Event carrying the counter value after the addition.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestEvent(usize);

    impl Event for TestEvent {}

    #[async_trait]
    impl Actor for TestActor {
        type Message = TestMessage;
        type Event = TestEvent;
        type Response = TestResponse;

        fn get_span(
            id: &str,
            _parent_span: Option<tracing::Span>,
        ) -> tracing::Span {
            info_span!("TestActor", id = %id)
        }
    }

    #[async_trait]
    impl Handler<TestActor> for TestActor {
        async fn handle_message(
            &mut self,
            _sender: ActorPath,
            msg: TestMessage,
            ctx: &mut ActorContext<TestActor>,
        ) -> Result<TestResponse, Error> {
            // The test actor is created as a root actor, so a parent lookup
            // must fail.
            if ctx.get_parent::<TestActor>().await.is_ok() {
                panic!("Is not a root actor");
            }

            let value = msg.0;
            self.counter += value;
            ctx.publish_event(TestEvent(self.counter)).await.unwrap();
            Ok(TestResponse(self.counter))
        }
    }

    /// Sink subscriber that checks every observed event has a positive value.
    pub struct TestSubscriber;

    #[async_trait]
    impl Subscriber<TestEvent> for TestSubscriber {
        async fn notify(&self, event: TestEvent) {
            assert!(event.0 > 0);
        }
    }

    /// Sends two messages to a root actor and checks the response and both events.
    #[test(tokio::test)]
    async fn test_actor() {
        let (event_sender, _event_receiver) = mpsc::channel(100);
        let system = SystemRef::new(
            event_sender,
            CancellationToken::new(),
            CancellationToken::new(),
        );
        let actor = TestActor { counter: 0 };
        let actor_ref = system.create_root_actor("test", actor).await.unwrap();

        let sink = Sink::new(actor_ref.subscribe(), TestSubscriber);
        system.run_sink(sink).await;

        // Subscribe before sending anything: a receiver only sees events
        // published after it was created, so subscribing after the first
        // `tell` would make the first assertion depend on task scheduling.
        let mut recv = actor_ref.subscribe();

        actor_ref.tell(TestMessage(10)).await.unwrap();
        let response = actor_ref.ask(TestMessage(10)).await.unwrap();
        assert_eq!(response.0, 20);
        let event = recv.recv().await.unwrap();
        assert_eq!(event.0, 10);
        let event = recv.recv().await.unwrap();
        assert_eq!(event.0, 20);
        actor_ref.ask_stop().await.unwrap();
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}
735}