1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
pub(crate) mod handler;
pub(crate) mod runner;
pub(crate) mod supervision;
use async_trait::async_trait;
use thiserror::Error;
use tokio::time::Duration;
use tokio::sync::{mpsc, oneshot};
mod path;
pub use path::ActorPath;
use supervision::SupervisionStrategy;
use crate::system::{ActorSystem, SystemEvent};
/// The actor context gives a running actor access to its path, as well as the system that
/// is running it.
#[derive(Debug)]
pub struct ActorContext<E: SystemEvent> {
pub path: ActorPath,
pub system: ActorSystem<E>,
}
impl<E: SystemEvent> ActorContext<E> {
/// Create a child actor under this actor.
pub async fn create_child<A: Actor<E>>(
&self,
name: &str,
actor: A,
) -> Result<ActorRef<E, A>, ActorError> {
let path = self.path.clone() / name;
self.system.create_actor_path(path, actor).await
}
/// Retrieve a child actor running under this actor.
pub async fn get_child<A: Actor<E>>(&self, name: &str) -> Option<ActorRef<E, A>> {
let path = self.path.clone() / name;
self.system.get_actor(&path).await
}
/// Retrieve or create a new child under this actor if it does not exist yet
pub async fn get_or_create_child<A, F>(
&self,
name: &str,
actor_fn: F,
) -> Result<ActorRef<E, A>, ActorError>
where
A: Actor<E>,
F: FnOnce() -> A,
{
let path = self.path.clone() / name;
self.system.get_or_create_actor_path(&path, actor_fn).await
}
/// Stops the child actor
pub async fn stop_child(&self, name: &str) {
let path = self.path.clone() / name;
self.system.stop_actor(&path).await;
}
pub(crate) async fn restart<A>(
&mut self,
actor: &mut A,
error: Option<&ActorError>,
) -> Result<(), ActorError>
where
A: Actor<E>,
{
actor.pre_restart(self, error).await
}
}
/// Defines what an actor will receive as its message, and with what it should respond.
pub trait Message: Clone + Send + Sync + 'static {
/// response an actor should give when it receives this message. If no response is
/// required, use `()`.
type Response: Send + Sync + 'static;
}
/// Basic trait for actors. Allows you to define tasks that should be run before
/// actor startup, when an actor restarts, and tasks that should be run after
/// the actor is stopped. It also allows you to define a supervisor strategy
/// that should govern the actor when it fails to start up properly. For
/// example:
/// ```
/// use tiny_tokio_actor::*;
/// use std::time::Duration;
///
/// #[derive(Clone, Debug)]
/// struct TestEvent(String);
///
/// impl SystemEvent for TestEvent {}
///
/// # #[derive(Clone)]
/// # struct Database;
/// # impl Database {
/// # pub fn init() -> Result<Self, std::io::Error> {
/// # Ok(Database)
/// # }
/// # }
///
/// struct MyActor {
/// db: Option<Database>
/// }
///
/// #[async_trait]
/// impl Actor<TestEvent> for MyActor {
///
/// // This actor will stop after 5 seconds of not receiving a message
/// fn timeout() -> Option<Duration> {
/// Some(Duration::from_secs(5))
/// }
///
/// // If it fails to start up, retry the actor 5 times, with a wait period
/// // of 5 seconds before each retry
/// fn supervision_strategy() -> SupervisionStrategy {
/// let strategy = supervision::FixedIntervalStrategy::new(5, Duration::from_secs(5));
/// SupervisionStrategy::Retry(Box::new(strategy))
/// }
///
/// // Initialize the database
/// async fn pre_start(&mut self, _ctx: &mut ActorContext<TestEvent>) -> Result<(), ActorError> {
/// let db = Database::init().map_err(ActorError::new)?;
/// self.db = Some(db);
/// Ok(())
/// }
///
/// // When restarting, log an error message and try starting again
/// async fn pre_restart(&mut self, ctx: &mut ActorContext<TestEvent>, error: Option<&ActorError>) -> Result<(), ActorError> {
/// log::error!("Actor '{}' is restarting due to {:#?}. Restarting...", ctx.path, error);
/// self.pre_start(ctx).await
/// }
/// }
/// ```
#[async_trait]
pub trait Actor<E: SystemEvent>: Send + Sync + 'static {
/// Defines the timeout to set for this actor. An actor will wait for the
/// defined time to receive a message. If no message is received within the
/// timeout time specified, the actor will stop.
///
/// By default this is set to None (no timeout).
fn timeout() -> Option<Duration> {
None
}
/// Defines the supervision strategy to use for this actor. By default it is
/// `Stop` which simply stops the actor if an error occurs at startup. You
/// can also set this to [`SupervisionStrategy::Retry`] with a chosen
/// [`supervision::RetryStrategy`].
fn supervision_strategy() -> SupervisionStrategy {
SupervisionStrategy::Stop
}
/// Override this function if you like to perform initialization of the actor
async fn pre_start(&mut self, _ctx: &mut ActorContext<E>) -> Result<(), ActorError> {
Ok(())
}
/// Override this function if you want to define what should happen when an
/// error occurs in [`Actor::pre_start()`]. By default it simply calls
/// `pre_start()` again, but you can also choose to reinitialize the actor
/// in some other way.
async fn pre_restart(
&mut self,
ctx: &mut ActorContext<E>,
_error: Option<&ActorError>,
) -> Result<(), ActorError> {
self.pre_start(ctx).await
}
/// Override this function if you like to perform work when the actor is stopped
async fn post_stop(&mut self, _ctx: &mut ActorContext<E>) {}
}
/// Defines what the actor does with a message.
#[async_trait]
pub trait Handler<E: SystemEvent, M: Message>: Actor<E> {
async fn handle(&mut self, msg: M, ctx: &mut ActorContext<E>) -> M::Response;
}
/// A clonable actor reference. It basically holds a Sender that can send messages
/// to the mailbox (receiver) of the actor.
pub struct ActorRef<E: SystemEvent, A: Actor<E>> {
path: ActorPath,
sender: mpsc::UnboundedSender<handler::BoxedMessageHandler<E, A>>,
}
impl<E: SystemEvent, A: Actor<E>> Clone for ActorRef<E, A> {
fn clone(&self) -> Self {
Self { path: self.path.clone(), sender: self.sender.clone() }
}
}
impl<E: SystemEvent, A: Actor<E>> ActorRef<E, A> {
/// Get the path of this actor
pub fn path(&self) -> &ActorPath {
&self.path
}
/// Fire and forget sending of messages to this actor.
pub fn tell<M>(&self, msg: M) -> Result<(), ActorError>
where
M: Message,
A: Handler<E, M>,
{
let message = handler::ActorMessage::<M, E, A>::new(msg, None);
if let Err(error) = self.sender.send(Box::new(message)) {
log::error!("Failed to tell message! {}", error.to_string());
Err(ActorError::SendError(error.to_string()))
} else {
Ok(())
}
}
/// Send a message to an actor, expecting a response.
pub async fn ask<M>(&self, msg: M) -> Result<M::Response, ActorError>
where
M: Message,
A: Handler<E, M>,
{
let (response_sender, response_receiver) = oneshot::channel();
let message = handler::ActorMessage::<M, E, A>::new(msg, Some(response_sender));
if let Err(error) = self.sender.send(Box::new(message)) {
log::error!("Failed to ask message! {}", error.to_string());
Err(ActorError::SendError(error.to_string()))
} else {
response_receiver
.await
.map_err(|error| ActorError::SendError(error.to_string()))
}
}
/// Checks if the actor mailbox is still open. If it is closed, the actor
/// is not running.
pub fn is_closed(&self) -> bool {
self.sender.is_closed()
}
pub(crate) fn new(path: ActorPath, sender: handler::MailboxSender<E, A>) -> Self {
ActorRef {
path,
sender,
}
}
}
impl<E: SystemEvent, A: Actor<E>> std::fmt::Debug for ActorRef<E, A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.path)
}
}
#[derive(Error, Debug)]
pub enum ActorError {
#[error("Actor exists")]
Exists(ActorPath),
#[error("Actor creation failed")]
CreateError(String),
#[error("Sending message failed")]
SendError(String),
#[error("Actor runtime error")]
RuntimeError(anyhow::Error),
}
impl ActorError {
pub fn new<E>(error: E) -> Self
where
E: std::error::Error + Send + Sync + 'static,
{
Self::RuntimeError(anyhow::Error::new(error))
}
}