tiny_tokio_actor/actor/
mod.rs

1pub(crate) mod handler;
2pub(crate) mod runner;
3pub(crate) mod supervision;
4
5use async_trait::async_trait;
6use thiserror::Error;
7
8use tokio::time::Duration;
9use tokio::sync::{mpsc, oneshot};
10
11mod path;
12pub use path::ActorPath;
13
14use supervision::SupervisionStrategy;
15
16use crate::system::{ActorSystem, SystemEvent};
17
18/// The actor context gives a running actor access to its path, as well as the system that
19/// is running it.
20#[derive(Debug)]
21pub struct ActorContext<E: SystemEvent> {
22    pub path: ActorPath,
23    pub system: ActorSystem<E>,
24}
25
26impl<E: SystemEvent> ActorContext<E> {
27    /// Create a child actor under this actor.
28    pub async fn create_child<A: Actor<E>>(
29        &self,
30        name: &str,
31        actor: A,
32    ) -> Result<ActorRef<E, A>, ActorError> {
33        let path = self.path.clone() / name;
34        self.system.create_actor_path(path, actor).await
35    }
36
37    /// Retrieve a child actor running under this actor.
38    pub async fn get_child<A: Actor<E>>(&self, name: &str) -> Option<ActorRef<E, A>> {
39        let path = self.path.clone() / name;
40        self.system.get_actor(&path).await
41    }
42
43    /// Retrieve or create a new child under this actor if it does not exist yet
44    pub async fn get_or_create_child<A, F>(
45        &self,
46        name: &str,
47        actor_fn: F,
48    ) -> Result<ActorRef<E, A>, ActorError>
49    where
50        A: Actor<E>,
51        F: FnOnce() -> A,
52    {
53        let path = self.path.clone() / name;
54        self.system.get_or_create_actor_path(&path, actor_fn).await
55    }
56
57    /// Stops the child actor
58    pub async fn stop_child(&self, name: &str) {
59        let path = self.path.clone() / name;
60        self.system.stop_actor(&path).await;
61    }
62
63    pub(crate) async fn restart<A>(
64        &mut self,
65        actor: &mut A,
66        error: Option<&ActorError>,
67    ) -> Result<(), ActorError>
68    where
69        A: Actor<E>,
70    {
71        actor.pre_restart(self, error).await
72    }
73}
74
75/// Defines what an actor will receive as its message, and with what it should respond.
76pub trait Message: Clone + Send + Sync + 'static {
77    /// response an actor should give when it receives this message. If no response is
78    /// required, use `()`.
79    type Response: Send + Sync + 'static;
80}
81
82/// Basic trait for actors. Allows you to define tasks that should be run before
83/// actor startup, when an actor restarts, and tasks that should be run after
84/// the  actor is stopped. It also allows you to define a supervisor strategy
85/// that should govern the actor when it fails to start up properly. For
86/// example:
87/// ```
88/// use tiny_tokio_actor::*;
89/// use std::time::Duration;
90///
91/// #[derive(Clone, Debug)]
92/// struct TestEvent(String);
93///
94/// impl SystemEvent for TestEvent {}
95///
96/// # #[derive(Clone)]
97/// # struct Database;
98/// # impl Database {
99/// #   pub fn init() -> Result<Self, std::io::Error> {
100/// #       Ok(Database)
101/// #   }
102/// # }
103///
104/// struct MyActor {
105///     db: Option<Database>
106/// }
107///
108/// #[async_trait]
109/// impl Actor<TestEvent> for MyActor {
110///
111///     // This actor will stop after 5 seconds of not receiving a message
112///     fn timeout() -> Option<Duration> {
113///         Some(Duration::from_secs(5))
114///     }
115///
116///     // If it fails to start up, retry the actor 5 times, with a wait period
117///     // of 5 seconds before each retry
118///     fn supervision_strategy() -> SupervisionStrategy {
119///         let strategy = supervision::FixedIntervalStrategy::new(5, Duration::from_secs(5));
120///         SupervisionStrategy::Retry(Box::new(strategy))
121///     }
122///
123///     // Initialize the database
124///     async fn pre_start(&mut self, _ctx: &mut ActorContext<TestEvent>) -> Result<(), ActorError> {
125///         let db = Database::init().map_err(ActorError::new)?;
126///         self.db = Some(db);
127///         Ok(())
128///     }
129///
130///     // When restarting, log an error message and try starting again
131///     async fn pre_restart(&mut self, ctx: &mut ActorContext<TestEvent>, error: Option<&ActorError>) -> Result<(), ActorError> {
132///         log::error!("Actor '{}' is restarting due to {:#?}. Restarting...", ctx.path, error);
133///         self.pre_start(ctx).await
134///     }
135/// }
136/// ```
137#[async_trait]
138pub trait Actor<E: SystemEvent>: Send + Sync + 'static {
139
140    /// Defines the timeout to set for this actor. An actor will wait for the
141    /// defined time to receive a message. If no message is received within the
142    /// timeout time specified, the actor will stop.
143    ///
144    /// By default this is set to None (no timeout).
145    fn timeout() -> Option<Duration> {
146        None
147    }
148
149    /// Defines the supervision strategy to use for this actor. By default it is
150    /// `Stop` which simply stops the actor if an error occurs at startup. You
151    /// can also set this to [`SupervisionStrategy::Retry`] with a chosen
152    /// [`supervision::RetryStrategy`].
153    fn supervision_strategy() -> SupervisionStrategy {
154        SupervisionStrategy::Stop
155    }
156
157    /// Override this function if you like to perform initialization of the actor
158    async fn pre_start(&mut self, _ctx: &mut ActorContext<E>) -> Result<(), ActorError> {
159        Ok(())
160    }
161
162    /// Override this function if you want to define what should happen when an
163    /// error occurs in [`Actor::pre_start()`]. By default it simply calls
164    /// `pre_start()` again, but you can also choose to reinitialize the actor
165    /// in some other way.
166    async fn pre_restart(
167        &mut self,
168        ctx: &mut ActorContext<E>,
169        _error: Option<&ActorError>,
170    ) -> Result<(), ActorError> {
171        self.pre_start(ctx).await
172    }
173
174    /// Override this function if you like to perform work when the actor is stopped
175    async fn post_stop(&mut self, _ctx: &mut ActorContext<E>) {}
176}
177
178/// Defines what the actor does with a message.
179#[async_trait]
180pub trait Handler<E: SystemEvent, M: Message>: Actor<E> {
181    async fn handle(&mut self, msg: M, ctx: &mut ActorContext<E>) -> M::Response;
182}
183
184/// A clonable actor reference. It basically holds a Sender that can send messages
185/// to the mailbox (receiver) of the actor.
186pub struct ActorRef<E: SystemEvent, A: Actor<E>> {
187    path: ActorPath,
188    sender: mpsc::UnboundedSender<handler::BoxedMessageHandler<E, A>>,
189}
190
191impl<E: SystemEvent, A: Actor<E>> Clone for ActorRef<E, A> {
192    fn clone(&self) -> Self {
193        Self { path: self.path.clone(), sender: self.sender.clone() }
194    }
195}
196
197impl<E: SystemEvent, A: Actor<E>> ActorRef<E, A> {
198    /// Get the path of this actor
199    pub fn path(&self) -> &ActorPath {
200        &self.path
201    }
202
203    /// Fire and forget sending of messages to this actor.
204    pub fn tell<M>(&self, msg: M) -> Result<(), ActorError>
205    where
206        M: Message,
207        A: Handler<E, M>,
208    {
209        let message = handler::ActorMessage::<M, E, A>::new(msg, None);
210        if let Err(error) = self.sender.send(Box::new(message)) {
211            log::error!("Failed to tell message! {}", error.to_string());
212            Err(ActorError::SendError(error.to_string()))
213        } else {
214            Ok(())
215        }
216    }
217
218    /// Send a message to an actor, expecting a response.
219    pub async fn ask<M>(&self, msg: M) -> Result<M::Response, ActorError>
220    where
221        M: Message,
222        A: Handler<E, M>,
223    {
224        let (response_sender, response_receiver) = oneshot::channel();
225        let message = handler::ActorMessage::<M, E, A>::new(msg, Some(response_sender));
226        if let Err(error) = self.sender.send(Box::new(message)) {
227            log::error!("Failed to ask message! {}", error.to_string());
228            Err(ActorError::SendError(error.to_string()))
229        } else {
230            response_receiver
231                .await
232                .map_err(|error| ActorError::SendError(error.to_string()))
233        }
234    }
235
236    /// Checks if the actor mailbox is still open. If it is closed, the actor
237    /// is not running.
238    pub fn is_closed(&self) -> bool {
239        self.sender.is_closed()
240    }
241
242    pub(crate) fn new(path: ActorPath, sender: handler::MailboxSender<E, A>) -> Self {
243        ActorRef {
244            path,
245            sender,
246        }
247    }
248}
249
250impl<E: SystemEvent, A: Actor<E>> std::fmt::Debug for ActorRef<E, A> {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        write!(f, "{}", self.path)
253    }
254}
255
256#[derive(Error, Debug)]
257pub enum ActorError {
258    #[error("Actor exists")]
259    Exists(ActorPath),
260
261    #[error("Actor creation failed")]
262    CreateError(String),
263
264    #[error("Sending message failed")]
265    SendError(String),
266
267    #[error("Actor runtime error")]
268    RuntimeError(anyhow::Error),
269}
270
271impl ActorError {
272    pub fn new<E>(error: E) -> Self
273    where
274        E: std::error::Error + Send + Sync + 'static,
275    {
276        Self::RuntimeError(anyhow::Error::new(error))
277    }
278}