#![warn(clippy::all)]
use crossbeam_channel::{self as channel, select, Receiver, Sender};
use log::*;
use parking_lot::{Mutex, RwLock};
use std::{fmt, ops::Deref, sync::Arc, thread, time::Duration};
#[cfg(test)]
pub mod testing;
// Capacity used for an actor's message channel when the caller does not
// request one, and for every actor's control channel.
static DEFAULT_CHANNEL_CAPACITY: usize = 5;
/// Errors reported by the actor system itself (as opposed to errors returned
/// by an actor's own message handler).
#[derive(Debug)]
pub enum ActorError {
    /// The actor could not be started because the system is shutting down or
    /// already stopped.
    SystemStopped { actor_name: &'static str },
    /// The actor's message channel was disconnected while it was still running.
    ChannelDisconnected { actor_name: &'static str },
    /// The operating system thread for the actor could not be created.
    SpawnFailed { actor_name: &'static str },
    /// At least one actor thread panicked or finished with an error; details
    /// are emitted through the logger at the point of failure.
    ActorPanic,
}

impl fmt::Display for ActorError {
    /// Writes the human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            Self::SystemStopped { actor_name } => write!(
                f,
                "The system is not running. The actor {} can not be started.",
                actor_name
            ),
            Self::ChannelDisconnected { actor_name } => write!(
                f,
                "The message channel is disconnected for the actor {}.",
                actor_name
            ),
            Self::SpawnFailed { actor_name } => {
                write!(f, "Failed to spawn a thread for the actor {}.", actor_name)
            },
            Self::ActorPanic => {
                f.write_str("A panic inside an actor thread. See above for more verbose logs.")
            },
        }
    }
}

impl std::error::Error for ActorError {}
/// Reasons why a non-blocking send to an actor can fail.
#[derive(Debug)]
pub enum SendError {
    /// The channel had no free slot for the message.
    Full,
    /// The receiving side of the channel no longer exists.
    Disconnected,
}

impl fmt::Display for SendError {
    /// Writes the human-readable description; the `Disconnected` text is
    /// shared with [`DisconnectedError`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Full => f.write_str("The channel's capacity is full."),
            Self::Disconnected => fmt::Display::fmt(&DisconnectedError, f),
        }
    }
}

impl std::error::Error for SendError {}

/// Error left over once the "channel is full" case of a [`SendError`] has
/// been handled: the recipient is gone.
#[derive(Debug)]
pub struct DisconnectedError;

impl fmt::Display for DisconnectedError {
    /// Writes the human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("The recipient of the message no longer exists.")
    }
}

impl std::error::Error for DisconnectedError {}
impl<M> From<channel::TrySendError<M>> for SendError {
fn from(orig: channel::TrySendError<M>) -> Self {
match orig {
channel::TrySendError::Full(_) => Self::Full,
channel::TrySendError::Disconnected(_) => Self::Disconnected,
}
}
}
/// Owner of an actor system: the value on which actors are prepared and
/// spawned.
///
/// It dereferences to [`SystemHandle`], and dropping it shuts the system down.
#[derive(Default)]
pub struct System {
// Shared state of the system; a clone is handed to every actor's `Context`.
handle: SystemHandle,
}
// Hook invoked during shutdown. A returned `Err` is logged as a warning by
// `SystemHandle::shutdown` and does not abort the shutdown.
type SystemCallback = Box<dyn Fn() -> Result<(), ActorError> + Send + Sync>;
/// Optional hooks that [`SystemHandle::shutdown`] invokes.
#[derive(Default)]
pub struct SystemCallbacks {
/// Called once the shutdown has begun, before any actor is asked to stop.
pub preshutdown: Option<SystemCallback>,
/// Called after the actor threads have been joined, before the system is
/// marked as stopped.
pub postshutdown: Option<SystemCallback>,
}
/// Lifecycle phase of an actor system.
#[derive(Debug, PartialEq)]
enum SystemState {
    /// Actors can be started.
    Running,
    /// A shutdown is in progress; new actors are rejected.
    ShuttingDown,
    /// The shutdown has completed.
    Stopped,
}

impl Default for SystemState {
    /// A newly created system starts in the `Running` state.
    fn default() -> Self {
        Self::Running
    }
}
/// Cloneable handle to the shared state of a [`System`].
///
/// Clones share the same registry, lifecycle state and callbacks through
/// `Arc`s.
#[derive(Default, Clone)]
pub struct SystemHandle {
// Name of the system; used as a prefix in log messages.
name: String,
// One entry per started actor: its control sender and, for actors running
// on their own thread, the thread's join handle.
registry: Arc<Mutex<Vec<RegistryEntry>>>,
// Current lifecycle phase of the system.
system_state: Arc<RwLock<SystemState>>,
// User-supplied shutdown hooks.
callbacks: Arc<SystemCallbacks>,
}
/// Execution context passed to every [`Actor`] callback.
pub struct Context<A: Actor + ?Sized> {
/// Handle to the system the actor is running in.
pub system_handle: SystemHandle,
/// Address of the actor receiving the callback.
pub myself: Addr<A>,
}
/// Builder returned by `System::prepare` / `System::prepare_fn` that
/// configures an actor before it is started.
///
/// Nothing runs until `.spawn()` or `.run_and_block()` is called, hence the
/// `#[must_use]` (its message names the builder's real finishing methods).
#[must_use = "You must call .spawn() or .run_and_block() to run this actor"]
pub struct SpawnBuilder<'a, A: Actor, F: FnOnce() -> A> {
    // System the actor will be registered with.
    system: &'a mut System,
    // Requested message channel capacity; `None` selects the default.
    capacity: Option<usize>,
    // Pre-made address to use instead of creating a fresh one.
    addr: Option<Addr<A>>,
    // Produces the actor value when the actor is started.
    factory: F,
}
impl<'a, A: 'static + Actor, F: FnOnce() -> A> SpawnBuilder<'a, A, F> {
    /// Sets the capacity of the actor's message channel.
    ///
    /// Has no effect when an address is supplied through `with_addr`, because
    /// that address already owns its channel.
    pub fn with_capacity(mut self, capacity: usize) -> Self {
        self.capacity = Some(capacity);
        self
    }

    /// Uses an existing address for the actor instead of creating a new one.
    pub fn with_addr(mut self, addr: Addr<A>) -> Self {
        self.addr = Some(addr);
        self
    }

    /// Builds the actor and runs it on the calling thread.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the system when the actor cannot be
    /// started or its event loop fails.
    pub fn run_and_block(self) -> Result<(), ActorError> {
        let Self { system, capacity, addr, factory } = self;
        let addr = match addr {
            Some(existing) => existing,
            None => Addr::with_capacity(capacity.unwrap_or(DEFAULT_CHANNEL_CAPACITY)),
        };
        system.block_on(factory(), addr)
    }
}
impl<'a, A: 'static + Actor, F: FnOnce() -> A + Send + 'static> SpawnBuilder<'a, A, F> {
    /// Starts the actor on a new background thread and returns its address.
    ///
    /// The factory is sent to the new thread and invoked there.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the system when the actor cannot be
    /// started.
    pub fn spawn(self) -> Result<Addr<A>, ActorError> {
        let Self { system, capacity, addr, factory } = self;
        let addr = addr.unwrap_or_else(|| {
            Addr::with_capacity(capacity.unwrap_or(DEFAULT_CHANNEL_CAPACITY))
        });
        system.spawn_fn_with_addr(factory, addr.clone())?;
        Ok(addr)
    }
}
impl System {
    /// Creates a running actor system called `name` without shutdown callbacks.
    pub fn new(name: &str) -> Self {
        System::with_callbacks(name, Default::default())
    }

    /// Creates a running actor system called `name` that invokes `callbacks`
    /// during shutdown.
    pub fn with_callbacks(name: &str, callbacks: SystemCallbacks) -> Self {
        Self {
            handle: SystemHandle {
                name: name.to_owned(),
                callbacks: Arc::new(callbacks),
                ..SystemHandle::default()
            },
        }
    }

    /// Starts configuring an actor from an already constructed value.
    ///
    /// Finish with `.spawn()` or `.run_and_block()` on the returned builder.
    pub fn prepare<A>(&mut self, actor: A) -> SpawnBuilder<A, impl FnOnce() -> A>
    where
        A: Actor + 'static,
    {
        SpawnBuilder { system: self, capacity: None, addr: None, factory: move || actor }
    }

    /// Starts configuring an actor that is constructed by `factory`.
    ///
    /// When the builder is finished with `.spawn()`, the factory runs on the
    /// newly created thread, so only the factory (not the actor) has to be
    /// `Send`.
    pub fn prepare_fn<A, F>(&mut self, factory: F) -> SpawnBuilder<A, F>
    where
        A: Actor + 'static,
        F: FnOnce() -> A + Send + 'static,
    {
        SpawnBuilder { system: self, capacity: None, addr: None, factory }
    }

    /// Spawns `actor` on a background thread with default settings and returns
    /// its address.
    ///
    /// # Errors
    ///
    /// Returns `ActorError::SystemStopped` when the system is not running and
    /// `ActorError::SpawnFailed` when the thread cannot be created.
    pub fn spawn<A>(&mut self, actor: A) -> Result<Addr<A>, ActorError>
    where
        A: Actor + Send + 'static,
    {
        self.prepare(actor).spawn()
    }

    /// Spawns a background thread that builds the actor with `factory` and
    /// runs its event loop on `addr`, then records the thread in the registry.
    fn spawn_fn_with_addr<F, A>(&mut self, factory: F, addr: Addr<A>) -> Result<(), ActorError>
    where
        F: FnOnce() -> A + Send + 'static,
        A: Actor + 'static,
    {
        // The read guard is deliberately kept until the end of the function:
        // `shutdown()` needs the write lock to leave the `Running` state, so it
        // cannot start draining the registry before this actor is registered.
        let system_state_lock = self.handle.system_state.read();
        match *system_state_lock {
            SystemState::ShuttingDown | SystemState::Stopped => {
                return Err(ActorError::SystemStopped { actor_name: A::name() });
            },
            SystemState::Running => {},
        }

        let system_handle = self.handle.clone();
        let context = Context { system_handle: system_handle.clone(), myself: addr.clone() };
        let control_addr = addr.control_tx.clone();

        let thread_handle = thread::Builder::new()
            .name(A::name().into())
            .spawn(move || {
                let mut actor = factory();
                actor.started(&context);
                debug!("[{}] started actor: {}", system_handle.name, A::name());

                let actor_result =
                    Self::run_actor_select_loop(actor, addr, &context, &system_handle);
                if let Err(err) = &actor_result {
                    error!("run_actor_select_loop returned an error: {}", err);
                }
                actor_result
            })
            .map_err(|_| ActorError::SpawnFailed { actor_name: A::name() })?;

        self.handle
            .registry
            .lock()
            .push(RegistryEntry::BackgroundThread(control_addr, thread_handle));
        Ok(())
    }

    /// Blocks the calling thread until the system has reached the `Stopped`
    /// state, polling every 10 milliseconds. Always returns `Ok(())`.
    pub fn run(&mut self) -> Result<(), ActorError> {
        while *self.system_state.read() != SystemState::Stopped {
            thread::sleep(Duration::from_millis(10));
        }
        Ok(())
    }

    /// Runs `actor` on the calling thread until its event loop ends, then
    /// waits for the system to finish shutting down.
    ///
    /// # Errors
    ///
    /// Returns `ActorError::SystemStopped` when the system is not running, or
    /// the error produced by the actor's event loop.
    fn block_on<A>(&mut self, mut actor: A, addr: Addr<A>) -> Result<(), ActorError>
    where
        A: Actor,
    {
        {
            // Check the state and register the actor under a single read
            // guard, mirroring `spawn_fn_with_addr`. Without the guard a
            // concurrent `shutdown()` could drain the registry between the
            // check and the push, and this actor would never be sent
            // `Control::Stop`.
            let system_state_lock = self.handle.system_state.read();
            if *system_state_lock != SystemState::Running {
                return Err(ActorError::SystemStopped { actor_name: A::name() });
            }
            self.handle
                .registry
                .lock()
                .push(RegistryEntry::CurrentThread(addr.control_tx.clone()));
        }

        let system_handle = &self.handle;
        let context = Context { system_handle: system_handle.clone(), myself: addr.clone() };

        actor.started(&context);
        debug!("[{}] started actor: {}", system_handle.name, A::name());
        Self::run_actor_select_loop(actor, addr, &context, system_handle)?;

        // The loop may end before other actors have been joined; keep the
        // caller blocked until the whole system is stopped.
        self.run()
    }

    /// Event loop of a single actor: waits on the control and message channels
    /// and dispatches whatever arrives first.
    ///
    /// The loop ends when
    /// * `Control::Stop` arrives (`stopped()` is called, returns `Ok`),
    /// * the control channel is disconnected (returns `Ok`),
    /// * the message handler returns an error (the whole system is shut down,
    ///   `stopped()` is not called on this path, returns `Ok`), or
    /// * the message channel is disconnected (returns
    ///   `ActorError::ChannelDisconnected`).
    fn run_actor_select_loop<A>(
        mut actor: A,
        addr: Addr<A>,
        context: &Context<A>,
        system_handle: &SystemHandle,
    ) -> Result<(), ActorError>
    where
        A: Actor,
    {
        loop {
            select! {
                recv(addr.control_rx) -> msg => {
                    match msg {
                        Ok(Control::Stop) => {
                            actor.stopped(context);
                            debug!("[{}] stopped actor: {}", system_handle.name, A::name());
                            return Ok(());
                        },
                        Err(_) => {
                            error!("[{}] control channel empty and disconnected. ending actor thread.", A::name());
                            return Ok(());
                        }
                    }
                },
                recv(addr.message_rx) -> msg => {
                    match msg {
                        Ok(msg) => {
                            trace!("[{}] message received by {}", system_handle.name, A::name());
                            if let Err(err) = actor.handle(context, msg) {
                                error!("{} error: {:?}", A::name(), err);
                                // A failing handler takes the whole system down;
                                // the shutdown result is intentionally ignored.
                                let _ = context.system_handle.shutdown();
                                return Ok(());
                            }
                        },
                        Err(_) => {
                            return Err(ActorError::ChannelDisconnected{actor_name:A::name()});
                        }
                    }
                },
            };
        }
    }
}
impl Drop for System {
fn drop(&mut self) {
self.shutdown().unwrap();
}
}
// Lets callers use every `SystemHandle` method (for example `shutdown()` and
// `is_running()`) directly on a `System`.
impl Deref for System {
type Target = SystemHandle;
// Returns the handle owned by this system.
fn deref(&self) -> &Self::Target {
&self.handle
}
}
impl SystemHandle {
pub fn shutdown(&self) -> Result<(), ActorError> {
let current_thread = thread::current();
let current_thread_name = current_thread.name().unwrap_or("Unknown thread id");
info!("Thread [{}] shutting down the actor system", current_thread_name);
{
let mut system_state_lock = self.system_state.write();
match *system_state_lock {
SystemState::ShuttingDown | SystemState::Stopped => {
debug!("Thread [{}] called system.shutdown() but the system is already shutting down or stopped", current_thread_name);
return Ok(());
},
SystemState::Running => {
debug!(
"Thread [{}] setting the system_state value to ShuttingDown",
current_thread_name
);
*system_state_lock = SystemState::ShuttingDown;
},
}
}
info!("[{}] system shutting down.", self.name);
if let Some(callback) = self.callbacks.preshutdown.as_ref() {
info!("[{}] calling pre-shutdown callback.", self.name);
if let Err(err) = callback() {
warn!("[{}] pre-shutdown callback failed, reason: {}", self.name, err);
}
}
let err_count = {
let mut registry = self.registry.lock();
debug!("[{}] joining {} actor threads.", self.name, registry.len());
registry
.drain(..)
.rev()
.enumerate()
.filter_map(|(i, mut entry)| {
let actor_name = entry.name();
if let Err(e) = entry.control_addr().send(Control::Stop) {
warn!("control channel is closed: {} ({})", actor_name, e);
}
match entry {
RegistryEntry::CurrentThread(_) => None,
RegistryEntry::BackgroundThread(_control_addr, thread_handle) => {
if thread_handle.thread().id() == current_thread.id() {
return None;
}
debug!("[{}] [{}] joining actor thread: {}", self.name, i, actor_name);
let join_result = thread_handle
.join()
.map_err(|e| {
error!("a panic inside actor thread {}: {:?}", actor_name, e)
})
.and_then(|actor_result| {
actor_result.map_err(|e| {
error!(
"actor thread {} returned an error: {:?}",
actor_name, e
)
})
});
debug!("[{}] [{}] joined actor thread: {}", self.name, i, actor_name);
join_result.err()
},
}
})
.count()
};
info!("[{}] system finished shutting down.", self.name);
if let Some(callback) = self.callbacks.postshutdown.as_ref() {
info!("[{}] calling post-shutdown callback.", self.name);
if let Err(err) = callback() {
warn!("[{}] post-shutdown callback failed, reason: {}", self.name, err);
}
}
*self.system_state.write() = SystemState::Stopped;
if err_count > 0 {
Err(ActorError::ActorPanic)
} else {
Ok(())
}
}
pub fn is_running(&self) -> bool {
*self.system_state.read() == SystemState::Running
}
}
/// Bookkeeping record for one started actor.
enum RegistryEntry {
    /// Actor started with `block_on`; only its control sender is kept.
    CurrentThread(Sender<Control>),
    /// Actor running on its own thread: control sender plus the join handle.
    BackgroundThread(Sender<Control>, thread::JoinHandle<Result<(), ActorError>>),
}

impl RegistryEntry {
    /// Returns a thread name to use in log messages, or `"unnamed"`.
    ///
    /// For `BackgroundThread` this is the name of the actor's thread. For
    /// `CurrentThread` it is the name of the thread that calls this method.
    fn name(&self) -> String {
        let describe = |t: &thread::Thread| t.name().unwrap_or("unnamed").to_owned();
        match self {
            Self::CurrentThread(_) => describe(&thread::current()),
            Self::BackgroundThread(_, join_handle) => describe(join_handle.thread()),
        }
    }

    /// Returns the sender of the actor's control channel.
    fn control_addr(&mut self) -> &mut Sender<Control> {
        match self {
            Self::CurrentThread(sender) | Self::BackgroundThread(sender, _) => sender,
        }
    }
}
/// Messages delivered over an actor's control channel.
pub enum Control {
/// Asks the actor's event loop to call `stopped()` and finish.
Stop,
}
/// Behaviour of an actor: a value that processes messages one at a time
/// inside an event loop run by the system.
pub trait Actor {
/// Type of the messages this actor receives.
type Message: Send + 'static;
/// Error returned by `handle`; it is logged with its `Debug` representation.
type Error: std::fmt::Debug;
/// Processes one message.
///
/// Returning an `Err` makes the event loop log the error, shut the whole
/// system down and end this actor.
fn handle(
&mut self,
context: &Context<Self>,
message: Self::Message,
) -> Result<(), Self::Error>;
/// Name of the actor; used in log messages and as the name of the thread of
/// a spawned actor.
fn name() -> &'static str;
/// Called once before the event loop starts. The default does nothing.
fn started(&mut self, _context: &Context<Self>) {}
/// Called when the event loop receives `Control::Stop`. The default does
/// nothing.
fn stopped(&mut self, _context: &Context<Self>) {}
}
/// Address of an actor: the sending side used by clients plus the receiving
/// ends that the actor's event loop reads from.
///
/// Dereferences to [`Recipient`], so messages can be sent directly on it.
pub struct Addr<A: Actor + ?Sized> {
// Sending side of both the message and the control channel.
recipient: Recipient<A::Message>,
// Receiving end of the message channel.
message_rx: Receiver<A::Message>,
// Receiving end of the control channel.
control_rx: Receiver<Control>,
}
impl<A: Actor> Default for Addr<A> {
// Creates an address whose message channel has the default capacity.
fn default() -> Self {
Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
}
}
impl<A: Actor> Clone for Addr<A> {
fn clone(&self) -> Self {
Self {
recipient: self.recipient.clone(),
message_rx: self.message_rx.clone(),
control_rx: self.control_rx.clone(),
}
}
}
// Exposes the `Recipient` API (for example `send`) directly on `Addr`.
impl<A, M> Deref for Addr<A>
where
A: Actor<Message = M>,
{
type Target = Recipient<M>;
// Returns the recipient stored in this address.
fn deref(&self) -> &Self::Target {
&self.recipient
}
}
impl<A: Actor> Addr<A> {
pub fn with_capacity(capacity: usize) -> Self {
let (message_tx, message_rx) = channel::bounded::<A::Message>(capacity);
let (control_tx, control_rx) = channel::bounded(DEFAULT_CHANNEL_CAPACITY);
let message_tx = Arc::new(message_tx);
Self { recipient: Recipient { message_tx, control_tx }, message_rx, control_rx }
}
pub fn recipient<M: Into<A::Message>>(&self) -> Recipient<M> {
Recipient {
message_tx: Arc::new(self.recipient.message_tx.clone()),
control_tx: self.recipient.control_tx.clone(),
}
}
}
/// Sending side of an actor's channels, typed by the message `M` it accepts.
pub struct Recipient<M> {
// Type-erased sender of the message channel.
message_tx: Arc<dyn SenderTrait<M>>,
// Sender of the actor's control channel.
control_tx: Sender<Control>,
}
impl<M> Clone for Recipient<M> {
fn clone(&self) -> Self {
Self { message_tx: self.message_tx.clone(), control_tx: self.control_tx.clone() }
}
}
impl<M> Recipient<M> {
    /// Tries to enqueue `message` without blocking.
    ///
    /// # Errors
    ///
    /// Returns `SendError::Full` when the channel has no free slot and
    /// `SendError::Disconnected` when the receiving side is gone.
    pub fn send(&self, message: M) -> Result<(), SendError> {
        match self.message_tx.try_send(message) {
            Ok(()) => Ok(()),
            Err(err) => Err(SendError::from(err)),
        }
    }

    /// Returns how many more messages the channel can currently take, or
    /// `None` when the underlying sender reports no capacity limit.
    pub fn remaining_capacity(&self) -> Option<usize> {
        let sender = &self.message_tx as &dyn SenderTrait<M>;
        let capacity = sender.capacity()?;
        Some(capacity - sender.len())
    }
}
/// Helpers for handling the "channel is full" outcome of a send separately
/// from the "recipient is gone" outcome.
pub trait SendResultExt {
    /// Runs `func` when the send failed because the channel was full and
    /// treats that case as success; a disconnected recipient stays an error.
    fn on_full<F: FnOnce()>(self, func: F) -> Result<(), DisconnectedError>;

    /// Treats a full channel as success without doing anything.
    fn ignore_on_full(self) -> Result<(), DisconnectedError>;
}

impl SendResultExt for Result<(), SendError> {
    fn on_full<F: FnOnce()>(self, callback: F) -> Result<(), DisconnectedError> {
        match self {
            Ok(()) => Ok(()),
            Err(SendError::Full) => {
                callback();
                Ok(())
            },
            Err(SendError::Disconnected) => Err(DisconnectedError),
        }
    }

    fn ignore_on_full(self) -> Result<(), DisconnectedError> {
        self.on_full(|| {})
    }
}
// Internal abstraction over "something messages of type `M` can be sent to";
// it lets `Recipient` store a type-erased sender.
trait SenderTrait<M>: Send + Sync {
// Attempts to send `message`; failures are reported as `SendError`.
fn try_send(&self, message: M) -> Result<(), SendError>;
// Number of messages currently reported by the underlying sender.
fn len(&self) -> usize;
// Capacity reported by the underlying sender, if any.
fn capacity(&self) -> Option<usize>;
}
impl<M: Send> SenderTrait<M> for Sender<M> {
fn try_send(&self, message: M) -> Result<(), SendError> {
self.try_send(message).map_err(SendError::from)
}
fn len(&self) -> usize {
self.len()
}
fn capacity(&self) -> Option<usize> {
self.capacity()
}
}
/// Adapter that accepts messages of type `M`, converts them into `N` and
/// forwards them to the wrapped sender.
impl<M: Into<N>, N> SenderTrait<M> for Arc<dyn SenderTrait<N>> {
    fn try_send(&self, message: M) -> Result<(), SendError> {
        let converted: N = message.into();
        (**self).try_send(converted)
    }

    fn len(&self) -> usize {
        (**self).len()
    }

    fn capacity(&self) -> Option<usize> {
        (**self).capacity()
    }
}
#[cfg(test)]
mod tests {
use std::{rc::Rc, time::Duration};
use super::*;
// Minimal actor that prints every lifecycle event and message it sees.
struct TestActor;
impl Actor for TestActor {
type Error = ();
type Message = usize;
fn name() -> &'static str {
"TestActor"
}
fn handle(&mut self, _: &Context<Self>, message: usize) -> Result<(), ()> {
println!("message: {}", message);
Ok(())
}
fn started(&mut self, _: &Context<Self>) {
println!("started");
}
fn stopped(&mut self, _: &Context<Self>) {
println!("stopped");
}
}
// Smoke test: spawns several actors, sends a few messages to the first one
// and shuts the system down. The sleeps give the background threads time to
// run; nothing beyond "no call fails" is asserted.
#[test]
fn it_works() {
let mut system = System::new("hi");
let address = system.spawn(TestActor).unwrap();
let _ = system.spawn(TestActor).unwrap();
let _ = system.spawn(TestActor).unwrap();
let _ = system.spawn(TestActor).unwrap();
let _ = system.spawn(TestActor).unwrap();
address.send(1337usize).unwrap();
address.send(666usize).unwrap();
address.send(1usize).unwrap();
thread::sleep(Duration::from_millis(100));
system.shutdown().unwrap();
thread::sleep(Duration::from_millis(100));
}
// Checks at compile time which `Send` bounds the two start-up paths need,
// using an actor that is not `Send` because it holds an `Rc`.
#[test]
fn send_constraints() {
#[derive(Default)]
struct LocalActor(Rc<()>);
impl Actor for LocalActor {
type Error = ();
type Message = ();
fn name() -> &'static str {
"LocalActor"
}
fn handle(&mut self, _: &Context<Self>, _: ()) -> Result<(), ()> {
Ok(())
}
// Shuts the system down immediately so that `run_and_block` below returns.
fn started(&mut self, ctx: &Context<Self>) {
ctx.system_handle.shutdown().unwrap();
}
}
let mut system = System::new("main");
// Accepted: the actor is built by the factory on the new thread, so only
// the factory has to be `Send`.
let _ = system.prepare_fn(LocalActor::default).spawn().unwrap();
// Accepted: the actor stays on the current thread.
// NOTE(review): the actor spawned above shuts the system down from its own
// thread; if that happens before this call checks the state, the `unwrap`
// would see `SystemStopped` - looks like a possible flake, confirm.
let _ = system.prepare(LocalActor::default()).run_and_block().unwrap();
system.shutdown().unwrap();
}
}