Skip to main content

ave_actors_actor/
actor.rs

1//! Core actor traits, types, and lifecycle hooks.
2
3use crate::{
4    ActorPath, Error,
5    handler::HandleHelper,
6    runner::{InnerAction, InnerSender, StopHandle, StopSender},
7    supervision::SupervisionStrategy,
8    system::SystemRef,
9};
10
11use tokio::sync::{broadcast::Receiver as EventReceiver, mpsc, oneshot};
12
13use async_trait::async_trait;
14
15use serde::{Serialize, de::DeserializeOwned};
16use tracing::Span;
17
18use std::{collections::HashMap, fmt::Debug, time::Duration};
19
/// Execution context passed to actors during message handling and lifecycle hooks.
///
/// Provides access to the actor's path, child management, event emission,
/// and error reporting. The context is created by the actor system and passed
/// as `&mut ActorContext<A>` to all handler and lifecycle methods.
pub struct ActorContext<A: Actor + Handler<A>> {
    /// Sender used by [`ActorContext::stop`] to request that this actor stops.
    stop: StopSender,
    /// The path of the actor.
    path: ActorPath,
    /// The actor system.
    system: SystemRef,
    /// Error recorded through `set_error` (for example by `emit_fail`); `None` otherwise.
    error: Option<Error>,
    /// Error sender; a clone is handed to every child created through `create_child`.
    error_sender: ChildErrorSender,
    /// Channel on which this context queues `InnerAction`s (events, errors,
    /// faults and child-stopped notices).
    inner_sender: InnerSender<A>,
    /// Stop handles of the children created by this actor, keyed by child path.
    child_senders: HashMap<ActorPath, StopHandle>,

    /// Tracing span of this actor; passed as the parent span when creating children.
    span: tracing::Span,
}
42
43impl<A> ActorContext<A>
44where
45    A: Actor + Handler<A>,
46{
47    pub(crate) fn new(
48        stop: StopSender,
49        path: ActorPath,
50        system: SystemRef,
51        error_sender: ChildErrorSender,
52        inner_sender: InnerSender<A>,
53        span: Span,
54    ) -> Self {
55        Self {
56            span,
57            stop,
58            path,
59            system,
60            error: None,
61            error_sender,
62            inner_sender,
63            child_senders: HashMap::new(),
64        }
65    }
66
67    pub(crate) async fn restart(
68        &mut self,
69        actor: &mut A,
70        error: Option<&Error>,
71    ) -> Result<(), Error>
72    where
73        A: Actor,
74    {
75        tracing::warn!(error = ?error, "Actor restarting");
76        let result = actor.pre_restart(self, error).await;
77        if let Err(ref e) = result {
78            tracing::error!(error = %e, "Actor restart failed");
79        }
80        result
81    }
82    /// Returns an `ActorRef` to this actor, or an error if it has already been removed from the system.
83    pub async fn reference(&self) -> Result<ActorRef<A>, Error> {
84        self.system.get_actor(&self.path).await
85    }
86
    /// Returns a shared reference to the path stored in this context, which
    /// identifies the actor within the system hierarchy.
    pub const fn path(&self) -> &ActorPath {
        &self.path
    }
91
    /// Returns a shared reference to the actor system handle held by this context.
    pub const fn system(&self) -> &SystemRef {
        &self.system
    }
96
97    /// Returns a typed handle to the parent actor, or an error if this is a root actor or the parent has stopped.
98    pub async fn get_parent<P: Actor + Handler<P>>(
99        &self,
100    ) -> Result<ActorRef<P>, Error> {
101        self.system.get_actor(&self.path.parent()).await
102    }
103
104    pub(crate) async fn stop_childs(&mut self) {
105        let child_count = self.child_senders.len();
106        if child_count > 0 {
107            tracing::debug!(child_count, "Stopping child actors");
108        }
109
110        // Send all stop signals first so all children begin shutdown concurrently.
111        let mut receivers = Vec::with_capacity(child_count);
112        for (path, handle) in std::mem::take(&mut self.child_senders) {
113            let (stop_sender, stop_receiver) = oneshot::channel();
114            if handle.sender().send(Some(stop_sender)).await.is_ok() {
115                receivers.push((path, handle.timeout(), stop_receiver));
116            }
117        }
118
119        // Wait for all confirmations. Children shut down in parallel so the
120        // total wait is max(child_shutdown_time) rather than the sum.
121        for (path, timeout, receiver) in receivers {
122            if let Some(timeout) = timeout {
123                if tokio::time::timeout(timeout, receiver).await.is_err() {
124                    tracing::warn!(
125                        child = %path,
126                        timeout_ms = timeout.as_millis(),
127                        "Timed out waiting for child actor shutdown acknowledgement"
128                    );
129                }
130            } else {
131                let _ = receiver.await;
132            }
133        }
134    }
135
136    pub(crate) async fn remove_actor(&self) {
137        self.system.remove_actor(&self.path).await;
138    }
139
140    /// Sends a stop signal to this actor. Pass `Some(sender)` to receive a confirmation when shutdown completes.
141    pub async fn stop(&self, sender: Option<oneshot::Sender<()>>) {
142        let _ = self.stop.send(sender).await;
143    }
144
145    /// Broadcasts `event` to all current subscribers of this actor's event channel.
146    ///
147    /// Returns an error if the broadcast channel is closed (i.e., the actor is stopping).
148    pub async fn publish_event(&self, event: A::Event) -> Result<(), Error> {
149        self.inner_sender
150            .send(InnerAction::Event(event))
151            .await
152            .map_err(|e| {
153                tracing::error!(error = %e, "Failed to publish event");
154                Error::SendEvent {
155                    reason: e.to_string(),
156                }
157            })
158    }
159
160    /// Reports an error to this actor's parent so the parent can invoke `on_child_error`.
161    ///
162    /// Returns an error if the parent channel is no longer reachable.
163    pub async fn emit_error(&mut self, error: Error) -> Result<(), Error> {
164        tracing::warn!(error = %error, "Emitting error");
165        self.inner_sender
166            .send(InnerAction::Error(error))
167            .await
168            .map_err(|e| {
169                tracing::error!(error = %e, "Failed to emit error");
170                Error::Send {
171                    reason: e.to_string(),
172                }
173            })
174    }
175
176    /// Emits a fatal fault, halts message processing, and escalates to the parent via `on_child_fault`.
177    ///
178    /// Returns an error if the escalation channel is no longer reachable.
179    pub async fn emit_fail(&mut self, error: Error) -> Result<(), Error> {
180        tracing::error!(error = %error, "Actor failing");
181        // Store error to stop message handling.
182        self.set_error(error.clone());
183        // Send fail to parent actor.
184        self.inner_sender
185            .send(InnerAction::Fail(error.clone()))
186            .await
187            .map_err(|e| {
188                tracing::error!(error = %e, "Failed to emit fail");
189                Error::Send {
190                    reason: e.to_string(),
191                }
192            })
193    }
194
195    /// Spawns a child actor and registers it under this actor's path.
196    ///
197    /// `name` becomes the last segment of the child's path. Returns an [`ActorRef`]
198    /// to the new child on success, or an error if the actor system is shutting down
199    /// or a child with the same name already exists.
200    pub async fn create_child<C, I>(
201        &mut self,
202        name: &str,
203        actor_init: I,
204    ) -> Result<ActorRef<C>, Error>
205    where
206        C: Actor + Handler<C>,
207        I: crate::IntoActor<C>,
208    {
209        tracing::debug!(child_name = %name, "Creating child actor");
210        let actor = actor_init.into_actor();
211        let path = self.path.clone() / name;
212        let result = self
213            .system
214            .create_actor_path(
215                path.clone(),
216                actor,
217                Some(self.error_sender.clone()),
218                C::get_span(name, Some(self.span.clone())),
219            )
220            .await;
221
222        match result {
223            Ok((actor_ref, stop_sender)) => {
224                let child_path = path.clone();
225                self.child_senders.insert(
226                    path,
227                    StopHandle::new(stop_sender.clone(), C::stop_timeout()),
228                );
229                let inner_sender = self.inner_sender.clone();
230                tokio::spawn(async move {
231                    stop_sender.closed().await;
232                    let _ = inner_sender
233                        .send(InnerAction::ChildStopped(child_path))
234                        .await;
235                });
236                tracing::debug!(child_name = %name, "Child actor created");
237                Ok(actor_ref)
238            }
239            Err(e) => {
240                tracing::debug!(child_name = %name, error = %e, "Failed to create child actor");
241                Err(e)
242            }
243        }
244    }
245
246    pub(crate) fn remove_closed_child(&mut self, child_path: &ActorPath) {
247        let should_remove = self
248            .child_senders
249            .get(child_path)
250            .map(StopHandle::is_closed)
251            .unwrap_or(false);
252        if should_remove {
253            self.child_senders.remove(child_path);
254        }
255    }
256
257    /// Looks up a running child actor by its id and returns a typed handle.
258    ///
259    /// Returns an error if no child with `id` exists or if the child's message
260    /// type does not match the requested actor type `C`.
261    pub async fn get_child<C>(&self, name: &str) -> Result<ActorRef<C>, Error>
262    where
263        C: Actor + Handler<C>,
264    {
265        let path = self.path.clone() / name;
266        self.system.get_actor(&path).await
267    }
268
269    pub(crate) fn error(&self) -> Option<Error> {
270        self.error.clone()
271    }
272
273    pub(crate) fn set_error(&mut self, error: Error) {
274        self.error = Some(error);
275    }
276
277    pub(crate) fn clean_error(&mut self) {
278        self.error = None;
279    }
280}
281
/// The current lifecycle state of an actor.
///
/// Variants can be compared for equality and cloned.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActorLifecycle {
    /// The actor has been created.
    Created,
    /// The actor has started.
    Started,
    /// The actor has been restarted.
    Restarted,
    /// The actor has failed.
    Failed,
    /// The actor has stopped.
    Stopped,
    /// The actor has terminated.
    Terminated,
}
298
/// The action that a child actor will take when an error occurs.
///
/// This is the value returned by [`Handler::on_child_fault`] and carried by the
/// `sender` of [`ChildError::Fault`].
#[derive(Debug, Clone)]
pub enum ChildAction {
    /// The child actor will stop.
    Stop,
    /// The child actor will restart.
    Restart,
    /// Delegate the action to the child supervision strategy.
    Delegate,
}
309
/// Receiving half of the channel that carries [`ChildError`] messages.
pub type ChildErrorReceiver = mpsc::Receiver<ChildError>;
312
/// Sending half of the channel that carries [`ChildError`] messages.
pub type ChildErrorSender = mpsc::Sender<ChildError>;
315
/// Message sent from a child to its parent on error or fault.
///
/// `Error` only carries the error itself, while `Fault` also carries a reply
/// channel on which a [`ChildAction`] is sent back.
pub enum ChildError {
    /// Error in child.
    Error {
        /// The error that caused the failure.
        error: Error,
    },
    /// Fault in child.
    Fault {
        /// The error that caused the failure.
        error: Error,
        /// The sender will communicate the action to be carried out to the child.
        sender: oneshot::Sender<ChildAction>,
    },
}
331
/// Defines the identity and associated types of an actor.
///
/// Implement this trait together with [`Handler`] on your actor struct.
/// The actor system uses these associated types to wire up message channels,
/// event broadcasts, and tracing spans.
#[async_trait]
pub trait Actor: Send + Sync + Sized + 'static + Handler<Self> {
    /// The type of messages this actor accepts.
    type Message: Message;

    /// The type of events this actor can broadcast to subscribers.
    type Event: Event;

    /// The type returned by the actor in response to each message.
    type Response: Response;

    /// Creates the tracing span for this actor instance.
    ///
    /// `id` identifies the actor; when a child is created through
    /// [`ActorContext::create_child`] it is the child's name. `parent_span` is
    /// the parent actor's span, if any.
    /// Return an `info_span!` or similar to attach all actor logs to this span.
    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span;

    /// Maximum time to spend processing critical messages during shutdown before dropping them.
    ///
    /// Defaults to five seconds.
    fn drain_timeout() -> std::time::Duration {
        std::time::Duration::from_secs(5)
    }

    /// Maximum time to wait for `pre_start` to complete; `None` disables the startup timeout.
    ///
    /// Defaults to `None`.
    fn startup_timeout() -> Option<Duration> {
        None
    }

    /// Maximum time a parent waits for this actor to acknowledge a stop request; `None` disables the stop timeout.
    ///
    /// Defaults to `None`. The value is captured in the stop handle that a parent
    /// stores when it creates this actor as a child.
    fn stop_timeout() -> Option<Duration> {
        None
    }

    /// Returns the supervision strategy applied when this actor fails at startup.
    ///
    /// Defaults to [`SupervisionStrategy::Stop`].
    fn supervision_strategy() -> SupervisionStrategy {
        SupervisionStrategy::Stop
    }

    /// Called once before the actor begins processing messages.
    ///
    /// Override to initialize resources, spawn child actors, or connect to external
    /// services. Return an error to abort startup; the supervision strategy determines
    /// whether a retry is attempted. The default implementation does nothing and
    /// returns `Ok(())`.
    async fn pre_start(
        &mut self,
        _context: &mut ActorContext<Self>,
    ) -> Result<(), Error> {
        Ok(())
    }

    /// Called when the actor is about to be restarted after a failure.
    ///
    /// `error` is the error that caused the restart, or `None` if the restart was manual.
    /// The default implementation delegates to `pre_start`, so any initialization
    /// logic defined there runs again on restart.
    async fn pre_restart(
        &mut self,
        ctx: &mut ActorContext<Self>,
        _error: Option<&Error>,
    ) -> Result<(), Error> {
        self.pre_start(ctx).await
    }

    /// Called when the actor is about to stop, before children are stopped.
    ///
    /// Override to flush state, emit a final event, or notify external services.
    /// Errors are logged but do not prevent the actor from stopping.
    /// The default implementation does nothing and returns `Ok(())`.
    async fn pre_stop(
        &mut self,
        _ctx: &mut ActorContext<Self>,
    ) -> Result<(), Error> {
        Ok(())
    }

    /// Called after all children have stopped and the actor is fully shut down. Override for final cleanup.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn post_stop(
        &mut self,
        _ctx: &mut ActorContext<Self>,
    ) -> Result<(), Error> {
        Ok(())
    }

    /// Maps a handler response to an event; call explicitly when you need that conversion.
    ///
    /// The default implementation always fails with `Error::Functional`
    /// ("Not implemented"); override it when a conversion exists.
    fn from_response(_response: Self::Response) -> Result<Self::Event, Error> {
        Err(Error::Functional {
            description: "Not implemented".to_string(),
        })
    }
}
425
/// Application-defined values that an actor may publish, persist, or apply via `on_event`.
///
/// This is a marker trait with no methods; implementors must be serializable,
/// deserializable, debuggable, cloneable, and safe to send and share across threads.
pub trait Event:
    Serialize + DeserializeOwned + Debug + Clone + Send + Sync + 'static
{
}
431
/// Defines the type of value an actor receives as a message.
///
/// Implementors must be cloneable and safe to send and share across threads.
pub trait Message: Clone + Send + Sync + 'static {
    /// Returns `true` if this message must be processed before the actor stops; defaults to `false`.
    fn is_critical(&self) -> bool {
        false
    }
}
439
/// Defines the type of value an actor returns in response to a message.
///
/// This is a marker trait with no methods.
pub trait Response: Send + Sync + 'static {}
442
// The unit type can stand in for any of the three roles, for actors that
// have no meaningful response, event, or message payload.
impl Response for () {}
impl Event for () {}
impl Message for () {}
446
/// Defines how an actor processes its incoming messages.
///
/// Implement this together with [`Actor`]. The actor system calls
/// `handle_message` for every message delivered to the actor.
#[async_trait]
pub trait Handler<A: Actor + Handler<A>>: Send + Sync {
    /// Processes `msg` sent by `sender` and returns a response.
    ///
    /// `ctx` gives access to the actor's context for spawning children, emitting events,
    /// or reporting errors. Return an error to signal a failure; the error is propagated
    /// back to the caller of [`ActorRef::ask`].
    async fn handle_message(
        &mut self,
        sender: ActorPath,
        msg: A::Message,
        ctx: &mut ActorContext<A>,
    ) -> Result<A::Response, Error>;

    /// Called when the actor wants to apply an event to its own state; not invoked automatically by the runtime.
    ///
    /// The default implementation does nothing.
    async fn on_event(&mut self, _event: A::Event, _ctx: &mut ActorContext<A>) {
        // Default implementation.
    }

    /// Called when a child actor reports an error via [`ActorContext::emit_error`].
    ///
    /// Override to inspect `error` and decide whether to escalate it. The default
    /// implementation only logs the error at `error` level.
    async fn on_child_error(
        &mut self,
        error: Error,
        _ctx: &mut ActorContext<A>,
    ) {
        tracing::error!(error = %error, "Child actor error");
    }

    /// Called when a child actor fails unrecoverably (panics or exhausts retries).
    ///
    /// Return [`ChildAction::Stop`] to propagate the failure up to this actor's parent,
    /// [`ChildAction::Restart`] to restart the child, or [`ChildAction::Delegate`]
    /// to let the child's own supervision strategy decide. The default logs the
    /// error and returns `Stop`.
    async fn on_child_fault(
        &mut self,
        error: Error,
        _ctx: &mut ActorContext<A>,
    ) -> ChildAction {
        tracing::error!(error = %error, "Child actor fault, stopping child");
        // Default reaction to a child fault: stop the child.
        ChildAction::Stop
    }
}
497
/// Typed, cloneable handle to a running actor.
///
/// Use this to send messages with [`ask`](ActorRef::ask), subscribe to events
/// with [`subscribe`](ActorRef::subscribe), or stop the actor with
/// [`ask_stop`](ActorRef::ask_stop) or [`tell_stop`](ActorRef::tell_stop).
/// Cloning an `ActorRef` is cheap — all clones share the same underlying channels.
pub struct ActorRef<A>
where
    A: Actor + Handler<A>,
{
    /// The path of the actor.
    path: ActorPath,
    /// Helper through which `tell` and `ask` deliver messages to the actor.
    sender: HandleHelper<A>,
    /// Event receiver kept only as a template: `subscribe` and `clone` call
    /// `resubscribe` on it to produce fresh receivers.
    event_receiver: EventReceiver<<A as Actor>::Event>,
    /// Sender used by `ask_stop` and `tell_stop` to request that the actor stops.
    stop_sender: StopSender,
}
517
518impl<A> ActorRef<A>
519where
520    A: Actor + Handler<A>,
521{
522    pub const fn new(
523        path: ActorPath,
524        sender: HandleHelper<A>,
525        stop_sender: StopSender,
526        event_receiver: EventReceiver<<A as Actor>::Event>,
527    ) -> Self {
528        Self {
529            path,
530            sender,
531            stop_sender,
532            event_receiver,
533        }
534    }
535
536    /// Sends a message to the actor without waiting for a response (fire-and-forget).
537    pub async fn tell(&self, message: A::Message) -> Result<(), Error> {
538        self.sender.tell(self.path(), message).await
539    }
540
541    /// Sends `message` to the actor and waits for a response.
542    ///
543    /// Returns the actor's response on success, or an error if the actor has stopped
544    /// or the message channel is full.
545    pub async fn ask(&self, message: A::Message) -> Result<A::Response, Error> {
546        self.sender.ask(self.path(), message).await
547    }
548
549    /// Sends `message` and waits up to `timeout` for a response, returning `Error::Timeout` if the deadline is exceeded.
550    pub async fn ask_timeout(
551        &self,
552        message: A::Message,
553        timeout: std::time::Duration,
554    ) -> Result<A::Response, Error> {
555        tokio::time::timeout(timeout, self.sender.ask(self.path(), message))
556            .await
557            .map_err(|_| Error::Timeout {
558                ms: timeout.as_millis(),
559            })?
560    }
561
562    /// Requests the actor to stop gracefully and waits for it to confirm shutdown.
563    ///
564    /// The actor will finish its current message, run `pre_stop` and `post_stop`,
565    /// and stop its children before terminating. Returns an error if the actor has
566    /// already stopped.
567    pub async fn ask_stop(&self) -> Result<(), Error> {
568        tracing::debug!("Stopping actor");
569        let (response_sender, response_receiver) = oneshot::channel();
570
571        if self.stop_sender.send(Some(response_sender)).await.is_err() {
572            Ok(())
573        } else {
574            response_receiver.await.map_err(|error| {
575                tracing::error!(error = %error, "Failed to confirm actor stop");
576                Error::Send {
577                    reason: error.to_string(),
578                }
579            })
580        }
581    }
582
583    /// Sends a stop signal without waiting for the actor to confirm shutdown (fire-and-forget).
584    pub async fn tell_stop(&self) {
585        let _ = self.stop_sender.send(None).await;
586    }
587
588    /// Returns the hierarchical path of this actor.
589    pub fn path(&self) -> ActorPath {
590        self.path.clone()
591    }
592
593    /// Returns `true` if the actor's mailbox is closed, meaning the actor has stopped.
594    pub fn is_closed(&self) -> bool {
595        self.sender.is_closed()
596    }
597
598    /// Waits until the actor has fully terminated.
599    pub async fn closed(&self) {
600        self.sender.close().await;
601    }
602
603    /// Returns a broadcast receiver for this actor's events.
604    ///
605    /// Each subscriber receives every future event independently. Use this receiver
606    /// directly or wrap it in a [`Sink`](crate::Sink) to process events asynchronously.
607    pub fn subscribe(&self) -> EventReceiver<<A as Actor>::Event> {
608        self.event_receiver.resubscribe()
609    }
610}
611
612impl<A> Clone for ActorRef<A>
613where
614    A: Actor + Handler<A>,
615{
616    fn clone(&self) -> Self {
617        Self {
618            path: self.path.clone(),
619            sender: self.sender.clone(),
620            stop_sender: self.stop_sender.clone(),
621            event_receiver: self.event_receiver.resubscribe(),
622        }
623    }
624}
625
/// End-to-end test of a root actor: message delivery, responses, event
/// publication to subscribers, and graceful stop.
#[cfg(test)]
mod test {

    use super::*;
    use test_log::test;

    use crate::sink::{Sink, Subscriber};

    use serde::{Deserialize, Serialize};
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;
    use tracing::info_span;

    /// Actor that accumulates the values it receives.
    #[derive(Debug, Clone)]
    struct TestActor {
        counter: usize,
    }

    impl crate::NotPersistentActor for TestActor {}

    /// Message carrying the amount to add to the counter.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestMessage(usize);

    impl Message for TestMessage {}

    /// Response carrying the counter value after the addition.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestResponse(usize);

    impl Response for TestResponse {}

    /// Event carrying the counter value after the addition.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestEvent(usize);

    impl Event for TestEvent {}

    #[async_trait]
    impl Actor for TestActor {
        type Message = TestMessage;
        type Event = TestEvent;
        type Response = TestResponse;

        fn get_span(
            id: &str,
            _parent_span: Option<tracing::Span>,
        ) -> tracing::Span {
            info_span!("TestActor", id = %id)
        }
    }

    #[async_trait]
    impl Handler<TestActor> for TestActor {
        /// Adds the message value to the counter, publishes the new total as an
        /// event, and returns it as the response.
        async fn handle_message(
            &mut self,
            _sender: ActorPath,
            msg: TestMessage,
            ctx: &mut ActorContext<TestActor>,
        ) -> Result<TestResponse, Error> {
            // The test only creates a root actor, so a parent lookup must fail.
            if ctx.get_parent::<TestActor>().await.is_ok() {
                panic!("Is not a root actor");
            }

            let value = msg.0;
            self.counter += value;
            ctx.publish_event(TestEvent(self.counter)).await.unwrap();
            Ok(TestResponse(self.counter))
        }
    }

    /// Sink subscriber that checks every event carries a positive counter.
    pub struct TestSubscriber;

    #[async_trait]
    impl Subscriber<TestEvent> for TestSubscriber {
        async fn notify(&self, event: TestEvent) {
            assert!(event.0 > 0);
        }
    }

    #[test(tokio::test)]
    async fn test_actor() {
        let (event_sender, _event_receiver) = mpsc::channel(100);
        let system = SystemRef::new(
            event_sender,
            CancellationToken::new(),
            CancellationToken::new(),
        );
        let actor = TestActor { counter: 0 };
        let actor_ref = system.create_root_actor("test", actor).await.unwrap();

        let sink = Sink::new(actor_ref.subscribe(), TestSubscriber);
        system.run_sink(sink).await;

        // Subscribe before sending anything: a receiver created afterwards
        // could miss the event produced by the first message, which would make
        // the `10` assertion below fail intermittently.
        let mut recv = actor_ref.subscribe();
        actor_ref.tell(TestMessage(10)).await.unwrap();
        let response = actor_ref.ask(TestMessage(10)).await.unwrap();
        assert_eq!(response.0, 20);
        let event = recv.recv().await.unwrap();
        assert_eq!(event.0, 10);
        let event = recv.recv().await.unwrap();
        assert_eq!(event.0, 20);
        actor_ref.ask_stop().await.unwrap();
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}