tiny_tokio_actor/actor/mod.rs
1pub(crate) mod handler;
2pub(crate) mod runner;
3pub(crate) mod supervision;
4
5use async_trait::async_trait;
6use thiserror::Error;
7
8use tokio::time::Duration;
9use tokio::sync::{mpsc, oneshot};
10
11mod path;
12pub use path::ActorPath;
13
14use supervision::SupervisionStrategy;
15
16use crate::system::{ActorSystem, SystemEvent};
17
18/// The actor context gives a running actor access to its path, as well as the system that
19/// is running it.
20#[derive(Debug)]
21pub struct ActorContext<E: SystemEvent> {
22 pub path: ActorPath,
23 pub system: ActorSystem<E>,
24}
25
26impl<E: SystemEvent> ActorContext<E> {
27 /// Create a child actor under this actor.
28 pub async fn create_child<A: Actor<E>>(
29 &self,
30 name: &str,
31 actor: A,
32 ) -> Result<ActorRef<E, A>, ActorError> {
33 let path = self.path.clone() / name;
34 self.system.create_actor_path(path, actor).await
35 }
36
37 /// Retrieve a child actor running under this actor.
38 pub async fn get_child<A: Actor<E>>(&self, name: &str) -> Option<ActorRef<E, A>> {
39 let path = self.path.clone() / name;
40 self.system.get_actor(&path).await
41 }
42
43 /// Retrieve or create a new child under this actor if it does not exist yet
44 pub async fn get_or_create_child<A, F>(
45 &self,
46 name: &str,
47 actor_fn: F,
48 ) -> Result<ActorRef<E, A>, ActorError>
49 where
50 A: Actor<E>,
51 F: FnOnce() -> A,
52 {
53 let path = self.path.clone() / name;
54 self.system.get_or_create_actor_path(&path, actor_fn).await
55 }
56
57 /// Stops the child actor
58 pub async fn stop_child(&self, name: &str) {
59 let path = self.path.clone() / name;
60 self.system.stop_actor(&path).await;
61 }
62
63 pub(crate) async fn restart<A>(
64 &mut self,
65 actor: &mut A,
66 error: Option<&ActorError>,
67 ) -> Result<(), ActorError>
68 where
69 A: Actor<E>,
70 {
71 actor.pre_restart(self, error).await
72 }
73}
74
/// Describes a message an actor can receive and the type of the reply the
/// actor should give for it.
pub trait Message: Clone + Send + Sync + 'static {
    /// The response an actor should give when it receives this message.
    /// Use `()` when no response is required.
    type Response: Send + Sync + 'static;
}
81
82/// Basic trait for actors. Allows you to define tasks that should be run before
83/// actor startup, when an actor restarts, and tasks that should be run after
84/// the actor is stopped. It also allows you to define a supervisor strategy
85/// that should govern the actor when it fails to start up properly. For
86/// example:
87/// ```
88/// use tiny_tokio_actor::*;
89/// use std::time::Duration;
90///
91/// #[derive(Clone, Debug)]
92/// struct TestEvent(String);
93///
94/// impl SystemEvent for TestEvent {}
95///
96/// # #[derive(Clone)]
97/// # struct Database;
98/// # impl Database {
99/// # pub fn init() -> Result<Self, std::io::Error> {
100/// # Ok(Database)
101/// # }
102/// # }
103///
104/// struct MyActor {
105/// db: Option<Database>
106/// }
107///
108/// #[async_trait]
109/// impl Actor<TestEvent> for MyActor {
110///
111/// // This actor will stop after 5 seconds of not receiving a message
112/// fn timeout() -> Option<Duration> {
113/// Some(Duration::from_secs(5))
114/// }
115///
116/// // If it fails to start up, retry the actor 5 times, with a wait period
117/// // of 5 seconds before each retry
118/// fn supervision_strategy() -> SupervisionStrategy {
119/// let strategy = supervision::FixedIntervalStrategy::new(5, Duration::from_secs(5));
120/// SupervisionStrategy::Retry(Box::new(strategy))
121/// }
122///
123/// // Initialize the database
124/// async fn pre_start(&mut self, _ctx: &mut ActorContext<TestEvent>) -> Result<(), ActorError> {
125/// let db = Database::init().map_err(ActorError::new)?;
126/// self.db = Some(db);
127/// Ok(())
128/// }
129///
130/// // When restarting, log an error message and try starting again
131/// async fn pre_restart(&mut self, ctx: &mut ActorContext<TestEvent>, error: Option<&ActorError>) -> Result<(), ActorError> {
132/// log::error!("Actor '{}' is restarting due to {:#?}. Restarting...", ctx.path, error);
133/// self.pre_start(ctx).await
134/// }
135/// }
136/// ```
137#[async_trait]
138pub trait Actor<E: SystemEvent>: Send + Sync + 'static {
139
140 /// Defines the timeout to set for this actor. An actor will wait for the
141 /// defined time to receive a message. If no message is received within the
142 /// timeout time specified, the actor will stop.
143 ///
144 /// By default this is set to None (no timeout).
145 fn timeout() -> Option<Duration> {
146 None
147 }
148
149 /// Defines the supervision strategy to use for this actor. By default it is
150 /// `Stop` which simply stops the actor if an error occurs at startup. You
151 /// can also set this to [`SupervisionStrategy::Retry`] with a chosen
152 /// [`supervision::RetryStrategy`].
153 fn supervision_strategy() -> SupervisionStrategy {
154 SupervisionStrategy::Stop
155 }
156
157 /// Override this function if you like to perform initialization of the actor
158 async fn pre_start(&mut self, _ctx: &mut ActorContext<E>) -> Result<(), ActorError> {
159 Ok(())
160 }
161
162 /// Override this function if you want to define what should happen when an
163 /// error occurs in [`Actor::pre_start()`]. By default it simply calls
164 /// `pre_start()` again, but you can also choose to reinitialize the actor
165 /// in some other way.
166 async fn pre_restart(
167 &mut self,
168 ctx: &mut ActorContext<E>,
169 _error: Option<&ActorError>,
170 ) -> Result<(), ActorError> {
171 self.pre_start(ctx).await
172 }
173
174 /// Override this function if you like to perform work when the actor is stopped
175 async fn post_stop(&mut self, _ctx: &mut ActorContext<E>) {}
176}
177
178/// Defines what the actor does with a message.
179#[async_trait]
180pub trait Handler<E: SystemEvent, M: Message>: Actor<E> {
181 async fn handle(&mut self, msg: M, ctx: &mut ActorContext<E>) -> M::Response;
182}
183
184/// A clonable actor reference. It basically holds a Sender that can send messages
185/// to the mailbox (receiver) of the actor.
186pub struct ActorRef<E: SystemEvent, A: Actor<E>> {
187 path: ActorPath,
188 sender: mpsc::UnboundedSender<handler::BoxedMessageHandler<E, A>>,
189}
190
191impl<E: SystemEvent, A: Actor<E>> Clone for ActorRef<E, A> {
192 fn clone(&self) -> Self {
193 Self { path: self.path.clone(), sender: self.sender.clone() }
194 }
195}
196
197impl<E: SystemEvent, A: Actor<E>> ActorRef<E, A> {
198 /// Get the path of this actor
199 pub fn path(&self) -> &ActorPath {
200 &self.path
201 }
202
203 /// Fire and forget sending of messages to this actor.
204 pub fn tell<M>(&self, msg: M) -> Result<(), ActorError>
205 where
206 M: Message,
207 A: Handler<E, M>,
208 {
209 let message = handler::ActorMessage::<M, E, A>::new(msg, None);
210 if let Err(error) = self.sender.send(Box::new(message)) {
211 log::error!("Failed to tell message! {}", error.to_string());
212 Err(ActorError::SendError(error.to_string()))
213 } else {
214 Ok(())
215 }
216 }
217
218 /// Send a message to an actor, expecting a response.
219 pub async fn ask<M>(&self, msg: M) -> Result<M::Response, ActorError>
220 where
221 M: Message,
222 A: Handler<E, M>,
223 {
224 let (response_sender, response_receiver) = oneshot::channel();
225 let message = handler::ActorMessage::<M, E, A>::new(msg, Some(response_sender));
226 if let Err(error) = self.sender.send(Box::new(message)) {
227 log::error!("Failed to ask message! {}", error.to_string());
228 Err(ActorError::SendError(error.to_string()))
229 } else {
230 response_receiver
231 .await
232 .map_err(|error| ActorError::SendError(error.to_string()))
233 }
234 }
235
236 /// Checks if the actor mailbox is still open. If it is closed, the actor
237 /// is not running.
238 pub fn is_closed(&self) -> bool {
239 self.sender.is_closed()
240 }
241
242 pub(crate) fn new(path: ActorPath, sender: handler::MailboxSender<E, A>) -> Self {
243 ActorRef {
244 path,
245 sender,
246 }
247 }
248}
249
250impl<E: SystemEvent, A: Actor<E>> std::fmt::Debug for ActorRef<E, A> {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 write!(f, "{}", self.path)
253 }
254}
255
256#[derive(Error, Debug)]
257pub enum ActorError {
258 #[error("Actor exists")]
259 Exists(ActorPath),
260
261 #[error("Actor creation failed")]
262 CreateError(String),
263
264 #[error("Sending message failed")]
265 SendError(String),
266
267 #[error("Actor runtime error")]
268 RuntimeError(anyhow::Error),
269}
270
271impl ActorError {
272 pub fn new<E>(error: E) -> Self
273 where
274 E: std::error::Error + Send + Sync + 'static,
275 {
276 Self::RuntimeError(anyhow::Error::new(error))
277 }
278}