#![warn(clippy::clone_on_ref_ptr, clippy::todo)]
use flume::{Receiver, RecvError, Selector, Sender, select::SelectError};
use log::*;
use parking_lot::{Mutex, RwLock};
use std::{
any::{TypeId, type_name},
collections::HashMap,
fmt,
ops::Deref,
sync::Arc,
thread,
time::{Duration, Instant},
};
#[cfg(feature = "async")]
pub mod r#async;
pub mod timed;
#[cfg(feature = "async")]
pub use r#async::AsyncActor;
/// Capacity of the bounded channel that carries [`Control`] messages to each actor.
const CONTROL_CHANNEL_CAPACITY: usize = 5;
/// Errors reported by the actor system itself (as opposed to message delivery errors).
#[derive(Debug)]
pub enum ActorError {
    /// The system is not running, so the named actor could not be started.
    SystemStopped { actor_name: &'static str },
    /// A thread for the named actor could not be spawned.
    SpawnFailed { actor_name: &'static str },
    /// An actor thread panicked.
    ActorPanic,
}

impl fmt::Display for ActorError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Every payload is a `&'static str`, so the variants can be matched by value.
        match *self {
            Self::ActorPanic => {
                f.write_str("panic inside an actor thread; see above for more verbose logs")
            },
            Self::SpawnFailed { actor_name } => {
                write!(f, "failed to spawn a thread for the actor {actor_name}")
            },
            Self::SystemStopped { actor_name } => {
                write!(f, "the system is not running; the actor {actor_name} can not be started")
            },
        }
    }
}

impl std::error::Error for ActorError {}
/// Error returned when a message could not be handed over to an actor.
#[derive(Debug, Clone, Copy)]
pub struct SendError {
    /// Name of the actor the message was addressed to.
    pub recipient_name: &'static str,
    /// Priority the message was classified with.
    pub priority: Priority,
    /// Why the delivery failed.
    pub reason: SendErrorReason,
}

impl fmt::Display for SendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { recipient_name, priority, reason } = *self;
        match reason {
            // A disconnected recipient is rendered exactly like `DisconnectedError`.
            SendErrorReason::Disconnected => {
                fmt::Display::fmt(&DisconnectedError { recipient_name, priority }, f)
            },
            SendErrorReason::Full => {
                write!(
                    f,
                    "the capacity of {recipient_name}'s {priority:?}-priority channel is full"
                )
            },
        }
    }
}

impl std::error::Error for SendError {}
/// Error returned by `SystemHandle::publish()`; wraps one [`SendError`] per failed delivery.
#[derive(Debug)]
pub struct PublishError(pub Vec<SendError>);

impl fmt::Display for PublishError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let PublishError(errors) = self;
        // Render every individual failure and list them comma-separated.
        let details = errors.iter().map(|error| error.to_string()).collect::<Vec<_>>().join(", ");
        write!(f, "failed to deliver an event to {} subscribers: {}", errors.len(), details)
    }
}

impl std::error::Error for PublishError {}
/// Error signalling that the recipient of a message no longer exists.
#[derive(Debug, Clone, Copy)]
pub struct DisconnectedError {
    /// Name of the actor the message was addressed to.
    pub recipient_name: &'static str,
    /// Priority the message was classified with (not part of the `Display` output).
    pub priority: Priority,
}

impl fmt::Display for DisconnectedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { recipient_name, .. } = *self;
        write!(f, "the recipient of the message ({}) no longer exists", recipient_name)
    }
}

impl std::error::Error for DisconnectedError {}
#[derive(Debug, Clone, Copy)]
pub enum SendErrorReason {
Full,
Disconnected,
}
impl<M> From<flume::TrySendError<M>> for SendErrorReason {
fn from(orig: flume::TrySendError<M>) -> Self {
match orig {
flume::TrySendError::Full(_) => Self::Full,
flume::TrySendError::Disconnected(_) => Self::Disconnected,
}
}
}
/// The actor system. Actors are spawned through it; it dereferences to its [`SystemHandle`]
/// and calls `shutdown()` when dropped.
#[derive(Default)]
pub struct System {
// Shared state of the system; clones of this handle are given to every actor context.
handle: SystemHandle,
}
/// Callback invoked by `SystemHandle::shutdown()`; an `Err` result is only logged as a warning.
type SystemCallback = Box<dyn Fn() -> Result<(), ActorError> + Send + Sync>;
/// Type-erased delivery callback of one event subscriber; receives the published event as
/// `&dyn Any`.
type EventCallback = Box<dyn Fn(&dyn std::any::Any) -> Result<(), SendError> + Send + Sync>;
/// Optional hooks executed by `SystemHandle::shutdown()`.
#[derive(Default)]
pub struct SystemCallbacks {
/// Called once the shutdown has begun, before the actors are asked to stop.
pub preshutdown: Option<SystemCallback>,
/// Called after the actor threads have been joined, before the system is marked as stopped.
pub postshutdown: Option<SystemCallback>,
}
/// Lifecycle state of the actor system.
#[derive(Debug, Default, PartialEq)]
enum SystemState {
    /// Initial state of a freshly created system.
    #[default]
    Running,
    /// A shutdown has been initiated but has not finished yet.
    ShuttingDown,
    /// The shutdown has completed.
    Stopped,
}

impl SystemState {
    /// Returns `true` only while the system is in the [`SystemState::Running`] state.
    fn is_running(&self) -> bool {
        matches!(self, SystemState::Running)
    }
}
/// Marker trait for types that can be published with `SystemHandle::publish()` and subscribed
/// to by actors.
pub trait Event: Clone + std::any::Any + Send + Sync {}
/// Bookkeeping for the publish/subscribe mechanism.
#[derive(Default)]
struct EventSubscribers {
    /// Delivery callbacks of all subscribers, keyed by the event type.
    events: HashMap<TypeId, Vec<EventCallback>>,
    /// Most recently published value of each event type. A concurrent map, so that it can be
    /// updated through a shared reference.
    last_value_cache: dashmap::DashMap<TypeId, Box<dyn std::any::Any + Send + Sync>>,
}

impl EventSubscribers {
    /// Registers `recipient` to receive every published event of type `E`, converted into `M`.
    fn subscribe_recipient<M: 'static, E: Event + Into<M>>(&mut self, recipient: Recipient<M>) {
        self.events.entry(TypeId::of::<E>()).or_default().push(Box::new(move |published| {
            match published.downcast_ref::<E>() {
                Some(event) => recipient.send(event.clone().into()),
                // Events of any other type are not for this subscriber.
                None => Ok(()),
            }
        }));
    }
}
/// Cloneable handle to the actor system. Apart from `name`, all fields are behind `Arc`s, so
/// clones share the same state.
#[derive(Default, Clone)]
pub struct SystemHandle {
// Name of the system; used as a prefix in log messages.
name: String,
// Actors registered with the system, in the order they were started.
registry: Arc<Mutex<Vec<RegistryEntry>>>,
// Current lifecycle state of the system.
system_state: Arc<RwLock<SystemState>>,
// User-supplied pre-/post-shutdown hooks.
callbacks: Arc<SystemCallbacks>,
// Event subscriptions and the cache of the last published values.
event_subscribers: Arc<RwLock<EventSubscribers>>,
}
/// Per-actor context passed to the actor's callbacks. Dereferences to [`BareContext`].
pub struct Context<M> {
    /// The cloneable part of the context (system handle and the actor's own recipient).
    bare: BareContext<M>,
    /// Deadline for the receive loop, if any; `deadline_passed()` is called once it elapses.
    receive_deadline: Option<Instant>,
}

impl<M> Context<M> {
    /// Creates a context without any receive deadline.
    fn new(system_handle: SystemHandle, myself: Recipient<M>) -> Self {
        let bare = BareContext { system_handle, myself };
        Self { bare, receive_deadline: None }
    }

    /// Returns the currently set receive deadline.
    pub fn deadline(&self) -> &Option<Instant> {
        &self.receive_deadline
    }

    /// Sets (or, with `None`, clears) the receive deadline.
    pub fn set_deadline(&mut self, deadline: Option<Instant>) {
        self.receive_deadline = deadline;
    }

    /// Sets the receive deadline to `timeout` from now, or clears it when `timeout` is `None`.
    pub fn set_timeout(&mut self, timeout: Option<Duration>) {
        let deadline = timeout.map(|duration| Instant::now() + duration);
        self.set_deadline(deadline);
    }
}

impl<M> Deref for Context<M> {
    type Target = BareContext<M>;

    fn deref(&self) -> &BareContext<M> {
        let Self { bare, .. } = self;
        bare
    }
}
/// The cloneable part of an actor's context.
pub struct BareContext<M> {
    /// Handle to the system the actor runs in.
    pub system_handle: SystemHandle,
    /// Recipient that delivers messages to the actor itself.
    pub myself: Recipient<M>,
}

impl<M: 'static> BareContext<M> {
    /// Subscribes this actor to events of type `E`.
    pub fn subscribe<E: Event + Into<M>>(&self) {
        let recipient = self.myself.clone();
        self.system_handle.subscribe_recipient::<M, E>(recipient);
    }

    /// Subscribes this actor to events of type `E`, first delivering the last published value
    /// of `E` (if there is one).
    ///
    /// # Errors
    /// Returns the [`SendError`] produced when delivering the cached value fails.
    pub fn subscribe_and_receive_latest<E: Event + Into<M>>(&self) -> Result<(), SendError> {
        let recipient = self.myself.clone();
        self.system_handle.subscribe_and_receive_latest::<M, E>(recipient)
    }
}

// Implemented by hand so that `M` itself does not have to be `Clone`.
impl<M> Clone for BareContext<M> {
    fn clone(&self) -> Self {
        let Self { system_handle, myself } = self;
        Self { system_handle: system_handle.clone(), myself: myself.clone() }
    }
}
/// Capacities of an actor's two message channels.
#[derive(Clone, Copy, Debug, Default)]
pub struct Capacity {
    /// Capacity of the normal-priority channel.
    pub normal: usize,
    /// Capacity of the high-priority channel.
    pub high: usize,
}

impl From<usize> for Capacity {
    /// Uses the same capacity for both channels.
    fn from(capacity: usize) -> Self {
        let (normal, high) = (capacity, capacity);
        Self { normal, high }
    }
}
/// First stage of the spawn builder: the actor (factory) is known, its address is not yet.
#[must_use = "You must call .with_addr(), .with_capacity(), or .with_default_capacity() to \
              configure this builder"]
pub struct SpawnBuilderWithoutAddress<'a, A: Actor, F: FnOnce() -> A> {
    system: &'a mut System,
    factory: F,
}

impl<'a, A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A>
    SpawnBuilderWithoutAddress<'a, A, F>
{
    /// Runs the actor behind an already existing address.
    pub fn with_addr(self, addr: Addr<A::Message>) -> SpawnBuilderWithAddress<'a, A, F> {
        SpawnBuilderWithAddress { spawn_builder: self, addr }
    }

    /// Runs the actor behind a fresh address with the given channel capacity.
    pub fn with_capacity(self, capacity: impl Into<Capacity>) -> SpawnBuilderWithAddress<'a, A, F> {
        self.with_addr(A::addr_with_capacity(capacity))
    }

    /// Runs the actor behind a fresh address with the actor's default channel capacity.
    pub fn with_default_capacity(self) -> SpawnBuilderWithAddress<'a, A, F> {
        self.with_addr(A::addr())
    }
}
#[must_use = "You must call .spawn() or .run_and_block() to run the actor"]
pub struct SpawnBuilderWithAddress<'a, A: Actor, F: FnOnce() -> A> {
spawn_builder: SpawnBuilderWithoutAddress<'a, A, F>,
addr: Addr<A::Message>,
}
impl<A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A>
SpawnBuilderWithAddress<'_, A, F>
{
pub fn run_and_block(self) -> Result<(), ActorError> {
let factory = self.spawn_builder.factory;
self.spawn_builder.system.block_on(factory(), self.addr)
}
}
impl<A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A + Send + 'static>
SpawnBuilderWithAddress<'_, A, F>
{
pub fn spawn(self) -> Result<Addr<A::Message>, ActorError> {
let builder = self.spawn_builder;
builder.system.spawn_fn_with_addr(builder.factory, self.addr.clone())?;
Ok(self.addr)
}
}
impl System {
    /// Creates a new system named `name` without any shutdown callbacks.
    pub fn new(name: &str) -> Self {
        System::with_callbacks(name, Default::default())
    }

    /// Creates a new system named `name` that invokes `callbacks` during shutdown.
    pub fn with_callbacks(name: &str, callbacks: SystemCallbacks) -> Self {
        Self {
            handle: SystemHandle {
                name: name.to_owned(),
                callbacks: Arc::new(callbacks),
                ..SystemHandle::default()
            },
        }
    }

    /// Starts building an actor from an already constructed `actor` value.
    pub fn prepare<A>(
        &mut self,
        actor: A,
    ) -> SpawnBuilderWithoutAddress<'_, A, impl FnOnce() -> A + use<A>>
    where
        A: Actor,
    {
        SpawnBuilderWithoutAddress { system: self, factory: move || actor }
    }

    /// Starts building an actor that is constructed by `factory`. When spawned, the factory
    /// is called on the new actor thread, so the actor itself does not need to be `Send`.
    pub fn prepare_fn<A, F>(&mut self, factory: F) -> SpawnBuilderWithoutAddress<'_, A, F>
    where
        A: Actor,
        F: FnOnce() -> A + Send,
    {
        SpawnBuilderWithoutAddress { system: self, factory }
    }

    /// Spawns `actor` on a new thread with the default channel capacity and returns its
    /// address.
    ///
    /// # Errors
    /// See [`ActorError::SystemStopped`] and [`ActorError::SpawnFailed`].
    pub fn spawn<A>(&mut self, actor: A) -> Result<Addr<A::Message>, ActorError>
    where
        A: Actor<Context = Context<<A as Actor>::Message>> + Send + 'static,
    {
        self.prepare(actor).with_default_capacity().spawn()
    }

    /// Spawns a thread that constructs the actor with `factory`, calls its `started()` and
    /// then runs its receive loop on `addr`. The thread is added to the registry.
    fn spawn_fn_with_addr<F, A>(
        &mut self,
        factory: F,
        addr: Addr<A::Message>,
    ) -> Result<(), ActorError>
    where
        F: FnOnce() -> A + Send + 'static,
        A: Actor<Context = Context<<A as Actor>::Message>>,
    {
        // The guard lives until this function returns, so the state cannot be switched to
        // "shutting down" (which needs the write lock) before the actor is in the registry.
        let system_state_lock = self.handle.system_state.read();
        if !system_state_lock.is_running() {
            return Err(ActorError::SystemStopped { actor_name: A::name() });
        }

        let system_handle = self.handle.clone();
        let mut context = Context::new(system_handle.clone(), addr.recipient.clone());
        let control_addr = addr.control_tx.clone();

        let thread_handle = thread::Builder::new()
            .name(A::name().into())
            .spawn(move || {
                let mut actor = factory();
                if let Err(error) = actor.started(&mut context) {
                    Self::report_error_shutdown(&system_handle, A::name(), "started()", error);
                    return;
                }
                debug!("[{}] started actor: {}", system_handle.name, A::name());
                Self::run_actor_select_loop(actor, addr, &mut context, &system_handle);
            })
            .map_err(|error| {
                // `ActorError::SpawnFailed` cannot carry the OS error, so log it here instead
                // of dropping it silently.
                error!(
                    "[{}] failed to spawn a thread for the actor {}: {error}",
                    self.handle.name,
                    A::name()
                );
                ActorError::SpawnFailed { actor_name: A::name() }
            })?;

        self.handle
            .registry
            .lock()
            .push(RegistryEntry::BackgroundThread(control_addr, thread_handle));
        Ok(())
    }

    /// Blocks the calling thread until the system has reached the stopped state, polling
    /// the state every 10 ms. Always returns `Ok(())`.
    pub fn run(&mut self) -> Result<(), ActorError> {
        while *self.system_state.read() != SystemState::Stopped {
            thread::sleep(Duration::from_millis(10));
        }
        Ok(())
    }

    /// Runs `actor` on the calling thread and returns once the system has stopped.
    ///
    /// # Errors
    /// Returns [`ActorError::SystemStopped`] if the system is not running.
    fn block_on<A>(&mut self, mut actor: A, addr: Addr<A::Message>) -> Result<(), ActorError>
    where
        A: Actor<Context = Context<<A as Actor>::Message>>,
    {
        if !self.is_running() {
            return Err(ActorError::SystemStopped { actor_name: A::name() });
        }

        let system_handle = &self.handle;
        let mut context = Context::new(system_handle.clone(), addr.recipient.clone());

        // Registered as an in-place entry: shutdown sends it `Control::Stop` but never joins it.
        self.handle
            .registry
            .lock()
            .push(RegistryEntry::InPlace(addr.control_tx.clone(), thread::current()));

        match actor.started(&mut context) {
            Ok(()) => {
                debug!("[{}] started actor: {}", system_handle.name, A::name());
                Self::run_actor_select_loop(actor, addr, &mut context, system_handle);
            },
            Err(error) => Self::report_error_shutdown(system_handle, A::name(), "started()", error),
        }

        // The actor has finished; wait until the whole system has stopped before returning.
        self.run()
    }

    /// The receive loop of an actor. Returns when the actor receives `Control::Stop` or when
    /// one of its `handle()` / `deadline_passed()` callbacks fails.
    fn run_actor_select_loop<A>(
        mut actor: A,
        addr: Addr<A::Message>,
        context: &mut Context<A::Message>,
        system_handle: &SystemHandle,
    ) where
        A: Actor<Context = Context<<A as Actor>::Message>>,
    {
        /// What a single wait on the selector produced.
        enum Received<M> {
            Control(Control),
            Message(M),
            Timeout,
        }

        loop {
            // The selector closures only tag what was received; the actor is invoked
            // afterwards, in the `match` below. The channels are registered in the order
            // control, high priority, normal priority.
            let selector = Selector::new()
                .recv(&addr.control_rx, |msg| match msg {
                    Ok(control) => Received::Control(control),
                    Err(RecvError::Disconnected) => {
                        panic!("We keep control_tx alive through addr, should not happen.")
                    },
                })
                .recv(&addr.priority_rx, |msg| match msg {
                    Ok(msg) => Received::Message(msg),
                    Err(RecvError::Disconnected) => {
                        panic!("We keep priority_tx alive through addr, should not happen.")
                    },
                })
                .recv(&addr.message_rx, |msg| match msg {
                    Ok(msg) => Received::Message(msg),
                    Err(RecvError::Disconnected) => {
                        panic!("We keep message_tx alive through addr, should not happen.")
                    },
                });

            // Wait for something to arrive, bounded by the receive deadline if one is set.
            let received = if let Some(deadline) = context.receive_deadline {
                match selector.wait_deadline(deadline) {
                    Ok(received) => received,
                    Err(SelectError::Timeout) => Received::Timeout,
                }
            } else {
                selector.wait()
            };

            match received {
                Received::Control(Control::Stop) => {
                    // The loop ends here regardless of whether `stopped()` succeeded.
                    if let Err(error) = actor.stopped(context) {
                        Self::report_error_shutdown(system_handle, A::name(), "stopped()", error);
                    }
                    debug!("[{}] stopped actor: {}", system_handle.name, A::name());
                    return;
                },
                Received::Message(msg) => {
                    trace!("[{}] message received by {}", system_handle.name, A::name());
                    if let Err(error) = actor.handle(context, msg) {
                        Self::report_error_shutdown(system_handle, A::name(), "handle()", error);
                        return;
                    }
                },
                Received::Timeout => {
                    // The deadline is cleared before the callback runs, so it fires only
                    // once unless the actor sets a new one.
                    let deadline = context.receive_deadline.take().expect("implied by timeout");
                    if let Err(error) = actor.deadline_passed(context, deadline) {
                        Self::report_error_shutdown(
                            system_handle,
                            A::name(),
                            "deadline_passed()",
                            error,
                        );
                        return;
                    }
                },
            }
        }
    }

    /// Logs an error returned by an actor callback. If the system is still running, the error
    /// also triggers a shutdown of the whole system; otherwise it is only logged as a warning.
    fn report_error_shutdown(
        system_handle: &SystemHandle,
        actor_name: &str,
        action: &str,
        error: impl std::fmt::Display,
    ) {
        let system_name = &system_handle.name;
        if system_handle.system_state.read().is_running() {
            error!(
                "[{system_name}] {actor_name} {action} error: {error:#}. Shutting down the actor \
                 system."
            );
            let _ = system_handle.shutdown();
        } else {
            warn!(
                "[{system_name}] {actor_name} {action} error while shutting down: {error:#}. \
                 Ignoring."
            );
        }
    }
}
impl Drop for System {
fn drop(&mut self) {
self.shutdown().unwrap();
}
}
impl Deref for System {
type Target = SystemHandle;
fn deref(&self) -> &Self::Target {
&self.handle
}
}
impl SystemHandle {
/// Stops all actors of the system.
///
/// Only the first call performs the shutdown: it switches the state to "shutting down", runs
/// the pre-shutdown callback, sends `Control::Stop` to every registered actor, joins the
/// background actor threads, runs the post-shutdown callback and finally marks the system as
/// stopped. Any later call returns `Ok(())` right away, even while the first one is still in
/// progress.
///
/// # Errors
/// Returns [`ActorError::ActorPanic`] if joining at least one actor thread reported a panic.
pub fn shutdown(&self) -> Result<(), ActorError> {
let shutdown_start = Instant::now();
let current_thread = thread::current();
let current_thread_name = current_thread.name().unwrap_or("Unknown thread id");
// The write lock is held only inside this block, to check and switch the state.
{
let mut system_state_lock = self.system_state.write();
if system_state_lock.is_running() {
info!(
"[{}] thread {} shutting down the actor system.",
self.name, current_thread_name,
);
*system_state_lock = SystemState::ShuttingDown;
} else {
trace!(
"[{}] thread {} called system.shutdown() but the system is already shutting \
down or stopped.",
self.name, current_thread_name,
);
return Ok(());
}
}
// A failing callback is only logged; the shutdown carries on.
if let Some(callback) = self.callbacks.preshutdown.as_ref() {
info!("[{}] calling pre-shutdown callback.", self.name);
if let Err(err) = callback() {
warn!("[{}] pre-shutdown callback failed, reason: {}", self.name, err);
}
}
let err_count = {
let mut registry = self.registry.lock();
debug!("[{}] joining {} actor threads.", self.name, registry.len());
// Ask every actor to stop first (newest registry entry first); the threads are joined
// only afterwards, in the loop below.
for entry in registry.iter_mut().rev() {
let actor_name = entry.name();
if let Err(e) = entry.control_addr().send(Control::Stop) {
warn!(
"Couldn't send Control::Stop to {actor_name} to shut it down: {e:#}. \
Ignoring and proceeding."
);
}
}
// Empty the registry and join the background threads, again newest entry first. Only
// joins that reported a panic survive the `filter_map`, so `count()` is the number of
// panicked actor threads.
registry
.drain(..)
.enumerate()
.rev()
.filter_map(|(i, entry)| {
let actor_name = entry.name();
match entry {
RegistryEntry::InPlace(_, _) => {
debug!(
"[{}] [{i}] skipping join of an actor running in-place: \
{actor_name}",
self.name
);
None
},
RegistryEntry::BackgroundThread(_control_addr, thread_handle) => {
// The thread executing this very function is not joined.
if thread_handle.thread().id() == current_thread.id() {
debug!(
"[{}] [{i}] skipping join of the actor thread currently \
executing SystemHandle::shutdown(): {actor_name}",
self.name,
);
return None;
}
debug!("[{}] [{}] joining actor thread: {}", self.name, i, actor_name);
let join_result = thread_handle.join().map_err(|e| {
error!("a panic inside actor thread {actor_name}: {e:?}")
});
debug!("[{}] [{}] joined actor thread: {}", self.name, i, actor_name);
join_result.err()
},
}
})
.count()
};
info!("[{}] system finished shutting down in {:?}.", self.name, shutdown_start.elapsed());
if let Some(callback) = self.callbacks.postshutdown.as_ref() {
info!("[{}] calling post-shutdown callback.", self.name);
if let Err(err) = callback() {
warn!("[{}] post-shutdown callback failed, reason: {}", self.name, err);
}
}
// Marked as stopped only after the post-shutdown callback has run.
*self.system_state.write() = SystemState::Stopped;
if err_count > 0 { Err(ActorError::ActorPanic) } else { Ok(()) }
}
/// Subscribes `recipient` to events of type `E`; each published event is cloned and
/// converted into `M` before it is sent.
pub fn subscribe_recipient<M: 'static, E: Event + Into<M>>(&self, recipient: Recipient<M>) {
let mut event_subscribers = self.event_subscribers.write();
event_subscribers.subscribe_recipient::<M, E>(recipient);
}
/// Like [`SystemHandle::subscribe_recipient`], but first sends the most recently published
/// event of type `E` (if any) to `recipient`.
///
/// # Errors
/// Returns the [`SendError`] if the cached event could not be sent; the recipient is not
/// subscribed in that case.
pub fn subscribe_and_receive_latest<M: 'static, E: Event + Into<M>>(
&self,
recipient: Recipient<M>,
) -> Result<(), SendError> {
// Sending the cached value and registering the subscriber both happen under this one
// write guard, while `publish()` holds the read lock for its whole duration.
let mut event_subscribers = self.event_subscribers.write();
if let Some(last_cached_value) = event_subscribers.last_value_cache.get(&TypeId::of::<E>())
&& let Some(msg) = last_cached_value.downcast_ref::<E>()
{
recipient.send(msg.clone().into())?;
}
event_subscribers.subscribe_recipient::<M, E>(recipient);
Ok(())
}
/// Publishes `event`: stores it as the latest value of its type and delivers it to all
/// subscribers of that type.
///
/// # Errors
/// Delivery is attempted for every subscriber; if any attempt fails, all failures are
/// returned together in a [`PublishError`].
pub fn publish<E: Event>(&self, event: E) -> Result<(), PublishError> {
let event_subscribers = self.event_subscribers.read();
let type_id = TypeId::of::<E>();
// The cache can be updated under the read lock because it is a `DashMap`.
event_subscribers.last_value_cache.insert(type_id, Box::new(event.clone()));
if let Some(subs) = event_subscribers.events.get(&type_id) {
let errors: Vec<SendError> = subs
.iter()
.filter_map(|subscriber_callback| subscriber_callback(&event).err())
.collect();
if !errors.is_empty() {
return Err(PublishError(errors));
}
}
Ok(())
}
/// Returns the name of the system.
pub fn name(&self) -> &str {
&self.name
}
/// Returns `true` while the system is running, i.e. neither shutting down nor stopped.
pub fn is_running(&self) -> bool {
self.system_state.read().is_running()
}
}
/// An actor known to the system, together with the sender of its control channel.
enum RegistryEntry {
    /// Actor running on a thread the system did not spawn; never joined during shutdown.
    InPlace(Sender<Control>, thread::Thread),
    /// Actor running on a thread spawned by the system; joined during shutdown.
    BackgroundThread(Sender<Control>, thread::JoinHandle<()>),
}

impl RegistryEntry {
    /// Returns the name of the thread the actor runs on, or `"unnamed"` if it has none.
    fn name(&self) -> String {
        let actor_thread = match self {
            RegistryEntry::InPlace(_, thread_handle) => thread_handle,
            RegistryEntry::BackgroundThread(_, join_handle) => join_handle.thread(),
        };
        actor_thread.name().unwrap_or("unnamed").to_owned()
    }

    /// Returns the sender of the actor's control channel.
    fn control_addr(&mut self) -> &mut Sender<Control> {
        match self {
            RegistryEntry::InPlace(control_addr, _)
            | RegistryEntry::BackgroundThread(control_addr, _) => control_addr,
        }
    }
}
/// Messages delivered to an actor over its control channel.
pub enum Control {
/// Makes the actor's receive loop call `stopped()` and then finish.
Stop,
}
/// Behaviour of an actor: its message type and the callbacks invoked by the actor system.
pub trait Actor {
/// Type of the messages the actor handles.
type Message: Send + 'static;
/// Error type returned by the callbacks; it is logged using its `Display` implementation.
type Error: std::fmt::Display;
/// Type of the context passed to the callbacks.
type Context;
/// Capacity of the normal-priority channel of addresses created with [`Actor::addr`].
const DEFAULT_CAPACITY_NORMAL: usize = 5;
/// Capacity of the high-priority channel of addresses created with [`Actor::addr`].
const DEFAULT_CAPACITY_HIGH: usize = 5;
/// Name of the actor, used for its thread, in logs and in errors. Defaults to the type name.
fn name() -> &'static str {
type_name::<Self>()
}
/// Decides which channel a message is sent through. Defaults to [`Priority::Normal`].
fn priority(_message: &Self::Message) -> Priority {
Priority::Normal
}
/// Called once, before the actor starts receiving messages. Does nothing by default.
fn started(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
Ok(())
}
/// Called for every received message.
fn handle(
&mut self,
context: &mut Self::Context,
message: Self::Message,
) -> Result<(), Self::Error>;
/// Called when the actor receives `Control::Stop`. Does nothing by default.
fn stopped(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
Ok(())
}
/// Called when the receive deadline set on the context has passed; `_deadline` is the
/// deadline that elapsed. Does nothing by default.
fn deadline_passed(
&mut self,
_context: &mut Self::Context,
_deadline: Instant,
) -> Result<(), Self::Error> {
Ok(())
}
/// Creates an address for this actor type with the default channel capacities.
fn addr() -> Addr<Self::Message> {
let capacity =
Capacity { normal: Self::DEFAULT_CAPACITY_NORMAL, high: Self::DEFAULT_CAPACITY_HIGH };
Self::addr_with_capacity(capacity)
}
/// Creates an address for this actor type with the given channel capacities.
fn addr_with_capacity(capacity: impl Into<Capacity>) -> Addr<Self::Message> {
Addr::new(capacity, Self::name(), Self::priority)
}
}
pub struct Addr<M> {
recipient: Recipient<M>,
priority_rx: Receiver<M>,
message_rx: Receiver<M>,
control_rx: Receiver<Control>,
}
impl<M> Clone for Addr<M> {
fn clone(&self) -> Self {
Self {
recipient: self.recipient.clone(),
priority_rx: self.priority_rx.clone(),
message_rx: self.message_rx.clone(),
control_rx: self.control_rx.clone(),
}
}
}
impl<M> Deref for Addr<M> {
type Target = Recipient<M>;
fn deref(&self) -> &Self::Target {
&self.recipient
}
}
impl<M: Send + 'static> Addr<M> {
fn new(
capacity: impl Into<Capacity>,
name: &'static str,
get_priority: fn(&M) -> Priority,
) -> Self {
let capacity: Capacity = capacity.into();
let (priority_tx, priority_rx) = flume::bounded::<M>(capacity.high);
let (message_tx, message_rx) = flume::bounded::<M>(capacity.normal);
let (control_tx, control_rx) = flume::bounded(CONTROL_CHANNEL_CAPACITY);
let message_tx =
Arc::new(MessageSender { high: priority_tx, normal: message_tx, get_priority, name });
Self {
recipient: Recipient { message_tx, control_tx },
priority_rx,
message_rx,
control_rx,
}
}
}
/// Priority of a message; selects which of the actor's two message channels it is sent through.
#[derive(Clone, Copy, Debug)]
pub enum Priority {
/// The message goes through the normal-priority channel.
Normal,
/// The message goes through the high-priority channel.
High,
}
/// Sending half of an actor's address, accepting messages of type `M`.
pub struct Recipient<M> {
    message_tx: Arc<dyn SenderTrait<M>>,
    control_tx: Sender<Control>,
}

// Implemented by hand so that `M` itself does not have to be `Clone`.
impl<M> Clone for Recipient<M> {
    fn clone(&self) -> Self {
        let message_tx = Arc::clone(&self.message_tx);
        let control_tx = self.control_tx.clone();
        Self { message_tx, control_tx }
    }
}

impl<M> Recipient<M> {
    /// Sends `message` without blocking.
    ///
    /// # Errors
    /// Returns a [`SendError`] if the target channel is full or disconnected.
    pub fn send(&self, message: M) -> Result<(), SendError> {
        self.message_tx.try_send(message)
    }
}

impl<M: 'static> Recipient<M> {
    /// Returns a recipient for the same actor that accepts any `N` convertible into `M`.
    pub fn recipient<N: Into<M>>(&self) -> Recipient<N> {
        // The extra `Arc` layer is what performs the `N` -> `M` conversion on every send.
        let converting_tx: Arc<dyn SenderTrait<N>> = Arc::new(Arc::clone(&self.message_tx));
        Recipient { message_tx: converting_tx, control_tx: self.control_tx.clone() }
    }
}
/// Helpers for results of `Recipient::send()` that treat a full channel as non-fatal.
pub trait SendResultExt {
    /// Calls `func` with the recipient name and priority if the channel was full and turns
    /// that case into `Ok(())`; a disconnected recipient is still reported as an error.
    fn on_full<F: FnOnce(&'static str, Priority)>(self, func: F) -> Result<(), DisconnectedError>;

    /// Like [`SendResultExt::on_full`], but silently ignores a full channel.
    fn ignore_on_full(self) -> Result<(), DisconnectedError>;
}

impl SendResultExt for Result<(), SendError> {
    fn on_full<F: FnOnce(&'static str, Priority)>(
        self,
        callback: F,
    ) -> Result<(), DisconnectedError> {
        let Err(SendError { recipient_name, priority, reason }) = self else {
            return Ok(());
        };
        match reason {
            SendErrorReason::Disconnected => Err(DisconnectedError { recipient_name, priority }),
            SendErrorReason::Full => {
                callback(recipient_name, priority);
                Ok(())
            },
        }
    }

    fn ignore_on_full(self) -> Result<(), DisconnectedError> {
        self.on_full(|_recipient_name, _priority| {})
    }
}
struct MessageSender<M> {
high: Sender<M>,
normal: Sender<M>,
get_priority: fn(&M) -> Priority,
name: &'static str,
}
trait SenderTrait<M>: Send + Sync {
fn try_send(&self, message: M) -> Result<(), SendError>;
}
impl<M: Send> SenderTrait<M> for MessageSender<M> {
fn try_send(&self, message: M) -> Result<(), SendError> {
let priority = (self.get_priority)(&message);
let sender = match priority {
Priority::Normal => &self.normal,
Priority::High => &self.high,
};
sender.try_send(message).map_err(|e| SendError {
reason: e.into(),
recipient_name: self.name,
priority,
})
}
}
impl<M: Into<N>, N> SenderTrait<M> for Arc<dyn SenderTrait<N>> {
fn try_send(&self, message: M) -> Result<(), SendError> {
self.deref().try_send(message.into())
}
}
#[cfg(test)]
mod tests {
use std::{
rc::Rc,
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};
use super::*;
// Minimal actor that prints what happens to it.
struct TestActor;
impl Actor for TestActor {
type Context = Context<Self::Message>;
type Error = String;
type Message = usize;
fn name() -> &'static str {
"TestActor"
}
fn handle(&mut self, _: &mut Self::Context, message: usize) -> Result<(), String> {
println!("message: {message}");
Ok(())
}
fn started(&mut self, _: &mut Self::Context) -> Result<(), String> {
println!("started");
Ok(())
}
fn stopped(&mut self, _: &mut Self::Context) -> Result<(), String> {
println!("stopped");
Ok(())
}
}
// Smoke test: spawn several actors, send a few messages, shut down.
#[test]
fn it_works() {
let mut system = System::new("hi");
let address = system.spawn(TestActor).unwrap();
let _ = system.spawn(TestActor).unwrap();
let _ = system.spawn(TestActor).unwrap();
let _ = system.spawn(TestActor).unwrap();
let _ = system.spawn(TestActor).unwrap();
address.send(1337usize).unwrap();
address.send(666usize).unwrap();
address.send(1usize).unwrap();
thread::sleep(Duration::from_millis(100));
system.shutdown().unwrap();
thread::sleep(Duration::from_millis(100));
}
// A full channel is an error for `send()`, but not once `ignore_on_full()` is applied.
// NOTE(review): the `is_err()` assertion assumes the actor has not yet taken the first
// message out of its capacity-1 channel - looks timing-dependent, confirm.
#[test]
fn test_ignore_on_full() {
let mut system = System::new("hi");
let address = system.prepare(TestActor).with_capacity(1).spawn().unwrap();
address.send(1337usize).unwrap();
assert!(address.send(666usize).is_err());
address.send(666usize).ignore_on_full().unwrap();
thread::sleep(Duration::from_millis(100));
system.shutdown().unwrap();
thread::sleep(Duration::from_millis(100));
}
// An actor that is neither `Send` nor `Sync` can still be spawned through a factory
// (`prepare_fn`) or run on the current thread (`run_and_block`).
#[test]
fn send_constraints() {
#[derive(Default)]
struct LocalActor {
_ensure_not_send_not_sync: Rc<()>,
}
impl Actor for LocalActor {
type Context = Context<Self::Message>;
type Error = String;
type Message = ();
fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), String> {
Ok(())
}
// Shuts the system down right away so that `run_and_block()` below returns.
fn started(&mut self, ctx: &mut Self::Context) -> Result<(), String> {
ctx.system_handle.shutdown().map_err(|e| e.to_string())
}
}
{
let mut system = System::new("send_constraints prepare_fn");
let _ = system.prepare_fn(LocalActor::default).with_default_capacity().spawn().unwrap();
}
{
let mut system = System::new("send_constraints run_and_block");
system.prepare(LocalActor::default()).with_default_capacity().run_and_block().unwrap();
}
}
// Exercises receive deadlines.
// NOTE(review): the assertions rely on sleeps of 10-30 ms - may be flaky on a loaded machine.
#[test]
fn timeouts() {
struct TimeoutActor {
handle_count: Arc<AtomicU32>,
timeout_count: Arc<AtomicU32>,
}
impl Actor for TimeoutActor {
type Context = Context<Self::Message>;
type Error = String;
type Message = Option<Instant>;
// A `Some` message sets a new deadline; a `None` message leaves the current one untouched.
fn handle(
&mut self,
ctx: &mut Self::Context,
msg: Self::Message,
) -> Result<(), String> {
self.handle_count.fetch_add(1, Ordering::SeqCst);
if msg.is_some() {
ctx.receive_deadline = msg;
}
Ok(())
}
fn deadline_passed(&mut self, _: &mut Self::Context, _: Instant) -> Result<(), String> {
self.timeout_count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
let mut system = System::new("timeouts");
let (handle_count, timeout_count) = (Default::default(), Default::default());
let actor = TimeoutActor {
handle_count: Arc::clone(&handle_count),
timeout_count: Arc::clone(&timeout_count),
};
let addr = system.spawn(actor).unwrap();
// A deadline that is already in the past fires right after the message is handled.
addr.send(Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap())).unwrap();
thread::sleep(Duration::from_millis(10));
assert_eq!(handle_count.load(Ordering::SeqCst), 1);
assert_eq!(timeout_count.load(Ordering::SeqCst), 1);
// A deadline 20 ms in the future has not fired after 10 ms, but has after 30 ms.
addr.send(Some(Instant::now() + Duration::from_millis(20))).unwrap();
thread::sleep(Duration::from_millis(10));
assert_eq!(handle_count.load(Ordering::SeqCst), 2);
assert_eq!(timeout_count.load(Ordering::SeqCst), 1);
thread::sleep(Duration::from_millis(20));
assert_eq!(handle_count.load(Ordering::SeqCst), 2);
assert_eq!(timeout_count.load(Ordering::SeqCst), 2);
// Receiving another message (`None`) in the meantime does not cancel a pending deadline.
addr.send(Some(Instant::now() + Duration::from_millis(40))).unwrap();
thread::sleep(Duration::from_millis(20));
assert_eq!(handle_count.load(Ordering::SeqCst), 3);
assert_eq!(timeout_count.load(Ordering::SeqCst), 2);
addr.send(None).unwrap();
thread::sleep(Duration::from_millis(30));
assert_eq!(handle_count.load(Ordering::SeqCst), 4);
assert_eq!(timeout_count.load(Ordering::SeqCst), 3);
system.shutdown().unwrap();
}
// Checks the `Display` and `Debug` output of `SendError` for both failure reasons.
#[test]
fn errors() {
let mut system = System::new("hi");
let low_capacity_actor = TestActor::addr_with_capacity(1);
let stopped_actor = system.spawn(TestActor).unwrap().recipient();
low_capacity_actor.send(9).expect("one message should fit");
let error = low_capacity_actor.send(123).unwrap_err();
assert_eq!(
error.to_string(),
"the capacity of TestActor's Normal-priority channel is full"
);
assert_eq!(
format!("{error:?}"),
r#"SendError { recipient_name: "TestActor", priority: Normal, reason: Full }"#
);
system.shutdown().unwrap();
let error = stopped_actor.send(456usize).unwrap_err();
assert_eq!(error.to_string(), "the recipient of the message (TestActor) no longer exists");
assert_eq!(
format!("{error:?}"),
r#"SendError { recipient_name: "TestActor", priority: Normal, reason: Disconnected }"#
);
}
// High-priority messages queued before the actor starts are handled before the
// normal-priority ones.
#[test]
fn message_priorities() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("trace"))
.try_init()
.ok();
struct PriorityActor {
received: Arc<Mutex<Vec<usize>>>,
}
impl Actor for PriorityActor {
type Context = Context<Self::Message>;
type Error = String;
type Message = usize;
// Records the message and shuts the system down once all 20 messages have arrived.
fn handle(
&mut self,
context: &mut Self::Context,
message: Self::Message,
) -> Result<(), Self::Error> {
let mut received = self.received.lock();
received.push(message);
if received.len() >= 20 {
context.system_handle.shutdown().unwrap();
}
Ok(())
}
fn priority(message: &Self::Message) -> Priority {
if *message >= 10 { Priority::High } else { Priority::Normal }
}
}
let addr = PriorityActor::addr_with_capacity(10);
let received = Arc::new(Mutex::new(Vec::<usize>::new()));
// Fill both channels (0..10 normal, 10..20 high) before the actor runs.
for message in 0..20usize {
addr.send(message).unwrap();
}
let mut system = System::new("priorities");
system
.prepare(PriorityActor { received: Arc::clone(&received) })
.with_addr(addr)
.run_and_block()
.unwrap();
assert_eq!(
*received.lock(),
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
);
}
impl Event for () {}
// An event published before the actor subscribed is still delivered through
// `subscribe_and_receive_latest()`; without it, `run_and_block()` would never return.
#[test]
fn last_cached_event() {
struct Subscriber;
impl Actor for Subscriber {
type Context = Context<Self::Message>;
type Error = String;
type Message = ();
fn started(&mut self, context: &mut Self::Context) -> Result<(), String> {
context.subscribe_and_receive_latest::<Self::Message>().map_err(|e| e.to_string())
}
fn handle(
&mut self,
context: &mut Self::Context,
_: Self::Message,
) -> Result<(), Self::Error> {
println!("Event received!");
context.system_handle.shutdown().unwrap();
Ok(())
}
}
let mut system = System::new("last cached event");
system.publish(()).expect("can publish event");
system
.prepare(Subscriber)
.with_capacity(1)
.run_and_block()
.expect("actor finishes successfully");
}
}