use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::panic::{catch_unwind, UnwindSafe};
use std::thread;
#[macro_use]
extern crate log;
#[macro_use(select)]
extern crate crossbeam_channel;
use crossbeam_channel::{unbounded, Receiver, Sender};
pub mod prelude {
pub use crate::{ActorContext, ActorCtl, ActorStatus, Router, RouterCtl, RouterRequest};
pub use crossbeam_channel::{select, unbounded, Receiver, Sender};
}
pub type Message = Box<dyn Any + Send>;
pub type Result<T> = std::result::Result<T, Error>;
pub struct Router {
pub id: String,
pub req: Sender<RouterRequest>,
pub ctl: Sender<RouterCtl>,
handle: thread::JoinHandle<()>,
}
impl Router {
pub fn run(id: &str) -> Self {
let router_id = id.to_owned();
let (req_tx, req) = unbounded::<RouterRequest>();
let (ctl_tx, ctl) = unbounded::<RouterCtl>();
let (router_req, router_ctl) = (req_tx.clone(), ctl_tx.clone());
let handle = thread::spawn(move || {
let mut table: HashMap<String, Actor> = HashMap::new();
let (stat_tx, stat) = unbounded::<ActorStatus>();
debug!("router::{}: started", router_id);
loop {
select! {
recv(req) -> msg => {
match msg {
Ok(request) => {
if let Some(actor) = table.get(&request.id) {
actor.req.send(request.val).unwrap();
} else {
error!("router::{}: actor '{}' not found", router_id, request.id);
}
}
Err(e) => error!("router::{}: error receiving request: {}", router_id, e)
}
},
recv(ctl) -> msg => {
match msg {
Ok(RouterCtl::Spawn { id, f, resp }) => {
debug!("router::{}: spawning actor '{}'", router_id, id);
let (actor_req_tx, actor_req_rx) = unbounded();
let (actor_ctl_tx, actor_ctl_rx) = unbounded();
let ctx = ActorContext::new(
id.clone(),
actor_req_rx,
actor_ctl_rx,
router_req.clone(),
router_ctl.clone(),
stat_tx.clone()
);
let panic_stat_tx = stat_tx.clone();
let actor_id = id.clone();
let handle = thread::spawn(move || {
let actor_status = catch_unwind(move || {
f(ctx);
});
if let Err(_) = actor_status {
panic_stat_tx.send(ActorStatus::Paniced(actor_id)).unwrap();
}
});
table.insert(id.clone(), Actor {
id: id,
req: actor_req_tx,
ctl: actor_ctl_tx,
handle: handle,
});
resp.send(()).unwrap();
},
Ok(RouterCtl::Get { id, resp }) => {
if let Some(actor) = table.get(&id) {
resp.send(Ok(actor.req.clone())).unwrap();
} else {
resp.send(Err(Error::NoSuchActor(id.clone()))).unwrap();
}
}
Ok(RouterCtl::Has { id, resp }) => {
resp.send(table.contains_key(&id)).unwrap();
}
Ok(RouterCtl::StopActor { id, resp }) => {
if let Some(actor) = table.remove(&id) {
debug!("router::{}: stopping actor '{}'", router_id, actor.id);
actor.ctl.send(ActorCtl::Stop).unwrap();
actor.handle.join().unwrap();
resp.send(()).unwrap();
}
}
Ok(RouterCtl::StopActorAsync(id)) => {
if let Some(actor) = table.remove(&id) {
debug!("router::{}: stopping actor '{}'", router_id, actor.id);
actor.ctl.send(ActorCtl::Stop).unwrap();
}
}
Ok(RouterCtl::Shutdown) => {
debug!("router::{}: stopping all actors ...", router_id);
for (_, actor) in table.drain() {
debug!("router::{}: stopping actor '{}'", router_id, actor.id);
actor.ctl.send(ActorCtl::Stop).unwrap();
actor.handle.join().unwrap();
}
break;
}
Err(e) => error!("router::{}: error receiving on ctrl: {}", router_id, e),
}
},
recv(stat) -> msg => {
match msg {
Ok(ActorStatus::Stopped(id)) => {
debug!("router::{}: actor '{}' stopped", router_id, id);
table.remove(&id);
}
Ok(ActorStatus::Paniced(id)) => {
warn!("router::{}: actor '{}' paniced", router_id, id);
table.remove(&id);
}
Err(e) => error!("router::{}: error receiving actor status: {}", router_id, e),
}
},
}
}
debug!("router::{}: stopped", router_id);
});
Self {
id: id.to_owned(),
req: req_tx,
ctl: ctl_tx,
handle: handle,
}
}
pub fn spawn(&self, id: &str, f: Box<ActorFn>) -> Result<()> {
let (spawn, resp) = RouterCtl::spawn(id, f);
match self.ctl.send(spawn) {
Ok(()) => match resp.recv() {
Ok(()) => Ok(()),
Err(e) => Err(Error::recv_error(e)),
},
Err(e) => Err(Error::send_error(e)),
}
}
pub fn has(&self, id: &str) -> Result<bool> {
let (has, resp) = RouterCtl::has(id);
if let Err(e) = self.ctl.send(has) {
return Err(Error::send_error(e));
}
match resp.recv() {
Ok(b) => Ok(b),
Err(e) => Err(Error::recv_error(e)),
}
}
pub fn get(&self, id: &str) -> Result<Sender<Message>> {
let (get, resp) = RouterCtl::get(id);
if let Err(e) = self.ctl.send(get) {
return Err(Error::send_error(e));
}
match resp.recv() {
Ok(sender) => sender,
Err(e) => Err(Error::recv_error(e)),
}
}
pub fn send(&self, id: &str, msg: Message) -> Result<()> {
match self.req.send(RouterRequest::new(id, msg)) {
Ok(()) => Ok(()),
Err(e) => Err(Error::send_error(e)),
}
}
pub fn stop(&self, id: &str) -> Result<()> {
let (stop, resp) = RouterCtl::stop_actor(id);
if let Err(e) = self.ctl.send(stop) {
return Err(Error::send_error(e));
}
if let Err(e) = resp.recv() {
return Err(Error::recv_error(e));
}
Ok(())
}
pub fn stop_async(&self, id: &str) -> Result<()> {
if let Err(e) = self.ctl.send(RouterCtl::stop_actor_async(id)) {
return Err(Error::send_error(e));
}
Ok(())
}
pub fn stop_list(&self, ids: Vec<String>) -> Result<()> {
for id in ids {
if let Err(e) = self.stop(&id) {
return Err(e);
}
}
Ok(())
}
pub fn shutdown(self) {
self.ctl.send(RouterCtl::Shutdown).unwrap();
self.handle.join().unwrap();
}
}
pub struct RouterRequest {
pub id: String,
pub val: Box<dyn Any + Send>,
}
impl RouterRequest {
pub fn new(id: &str, val: Box<dyn Any + Send>) -> Self {
RouterRequest {
id: id.to_owned(),
val: val,
}
}
}
#[macro_export]
macro_rules! send_actor {
($r:ident, $i:expr, $v:expr) => {{
$r.send($i, Box::new($v))
}};
}
pub enum RouterCtl {
Spawn {
id: String,
f: Box<ActorFn>,
resp: Sender<()>,
},
Get {
id: String,
resp: Sender<Result<Sender<Message>>>,
},
Has { id: String, resp: Sender<bool> },
StopActor { id: String, resp: Sender<()> },
StopActorAsync(String),
Shutdown,
}
impl RouterCtl {
pub fn spawn(id: &str, f: Box<ActorFn>) -> (Self, Receiver<()>) {
let (tx, rx) = unbounded();
(
Self::Spawn {
id: id.to_owned(),
f: f,
resp: tx,
},
rx,
)
}
pub fn get(id: &str) -> (Self, Receiver<Result<Sender<Message>>>) {
let (tx, rx) = unbounded();
(
Self::Get {
id: id.to_owned(),
resp: tx,
},
rx,
)
}
pub fn has(id: &str) -> (Self, Receiver<bool>) {
let (tx, rx) = unbounded();
(
Self::Has {
id: id.to_owned(),
resp: tx,
},
rx,
)
}
pub fn stop_actor(id: &str) -> (Self, Receiver<()>) {
let (tx, rx) = unbounded();
(
Self::StopActor {
id: id.to_owned(),
resp: tx,
},
rx,
)
}
pub fn stop_actor_async(id: &str) -> Self {
Self::StopActorAsync(id.to_owned())
}
}
#[macro_export]
macro_rules! spawn_actor {
($r:ident, $i:expr, $f:expr) => {{
$r.spawn($i, Box::new($f))
}};
}
pub struct Actor {
pub id: String,
pub req: Sender<Message>,
pub ctl: Sender<ActorCtl>,
pub handle: thread::JoinHandle<()>,
}
pub type ActorFn = dyn FnOnce(ActorContext) + Send + UnwindSafe + 'static;
pub struct ActorContext {
pub id: String,
pub req: Receiver<Message>,
pub ctl: Receiver<ActorCtl>,
pub router_req: Sender<RouterRequest>,
pub router_ctl: Sender<RouterCtl>,
pub stat: Sender<ActorStatus>,
}
impl ActorContext {
pub fn new(
id: String,
req: Receiver<Message>,
ctl: Receiver<ActorCtl>,
router_req: Sender<RouterRequest>,
router_ctl: Sender<RouterCtl>,
stat: Sender<ActorStatus>,
) -> Self {
Self {
id: id.to_owned(),
req: req,
ctl: ctl,
router_req: router_req,
router_ctl: router_ctl,
stat: stat,
}
}
pub fn spawn(&self, id: &str, f: Box<ActorFn>) -> Result<()> {
let (spawn, resp) = RouterCtl::spawn(id, f);
match self.router_ctl.send(spawn) {
Ok(()) => match resp.recv() {
Ok(()) => Ok(()),
Err(e) => Err(Error::recv_error(e)),
},
Err(e) => Err(Error::send_error(e)),
}
}
pub fn has(&self, id: &str) -> Result<bool> {
let (has, resp) = RouterCtl::has(id);
if let Err(e) = self.router_ctl.send(has) {
return Err(Error::send_error(e));
}
match resp.recv() {
Ok(b) => Ok(b),
Err(e) => Err(Error::recv_error(e)),
}
}
pub fn get(&self, id: &str) -> Result<Sender<Message>> {
let (get, resp) = RouterCtl::get(id);
if let Err(e) = self.router_ctl.send(get) {
return Err(Error::send_error(e));
}
match resp.recv() {
Ok(sender) => sender,
Err(e) => Err(Error::recv_error(e)),
}
}
pub fn send(&self, id: &str, msg: Message) -> Result<()> {
match self.router_req.send(RouterRequest::new(id, msg)) {
Ok(()) => Ok(()),
Err(e) => Err(Error::send_error(e)),
}
}
pub fn stop(&self, id: &str) -> Result<()> {
let (stop, resp) = RouterCtl::stop_actor(id);
if let Err(e) = self.router_ctl.send(stop) {
return Err(Error::send_error(e));
}
match resp.recv() {
Ok(()) => Ok(()),
Err(e) => Err(Error::recv_error(e)),
}
}
pub fn stop_async(&self, id: &str) -> Result<()> {
if let Err(e) = self.router_ctl.send(RouterCtl::stop_actor_async(id)) {
return Err(Error::send_error(e));
}
Ok(())
}
pub fn report_stopped(&self) -> Result<()> {
match self.stat.send(ActorStatus::Stopped(self.id.clone())) {
Ok(_) => Ok(()),
Err(e) => Err(Error::send_error(e)),
}
}
}
pub enum ActorCtl {
Stop,
}
pub enum ActorStatus {
Paniced(String),
Stopped(String),
}
#[derive(Debug)]
pub enum Error {
SendError(String),
RecvError(String),
NoSuchActor(String),
}
impl Error {
pub fn send_error<T>(e: crossbeam_channel::SendError<T>) -> Self {
Self::SendError(e.to_string())
}
pub fn recv_error(e: crossbeam_channel::RecvError) -> Self {
Self::RecvError(e.to_string())
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::SendError(e) => write!(f, "{}", e),
Self::RecvError(e) => write!(f, "{}", e),
Self::NoSuchActor(id) => write!(f, "no such actor: {}", id),
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None
}
}
#[cfg(test)]
mod tests {
use crate::prelude::*;
#[test]
fn basic_router() {
let router = Router::run("test");
assert!(!router.has("foo").unwrap());
spawn_actor!(router, "foo", |ctx: ActorContext| {
ctx.ctl.recv().unwrap();
ctx.report_stopped().unwrap();
})
.unwrap();
assert!(router.has("foo").unwrap());
router.stop("foo").unwrap();
assert!(!router.has("foo").unwrap());
router.shutdown();
}
}