use futures::sync::oneshot::{channel, Sender};
use std;
use std::cell::RefCell;
use std::thread;
use tokio_core::reactor::{Core, Handle};
use uuid::Uuid;
use actor::{Actor, AsyncContext};
use address::{sync_channel, Addr, Syn, Unsync};
use context::Context;
use handler::Handler;
use mailbox::DEFAULT_CAPACITY;
use msgs::{Execute, StartActor, StopArbiter};
use registry::{Registry, SystemRegistry};
use system::{RegisterArbiter, System, UnregisterArbiter};
thread_local!(
static HND: RefCell<Option<Handle>> = RefCell::new(None);
static STOP: RefCell<Option<Sender<i32>>> = RefCell::new(None);
static ADDR: RefCell<Option<Addr<Unsync, Arbiter>>> = RefCell::new(None);
static REG: RefCell<Option<Registry>> = RefCell::new(None);
static NAME: RefCell<Option<String>> = RefCell::new(None);
static SYS: RefCell<Option<Addr<Syn, System>>> = RefCell::new(None);
static SYSARB: RefCell<Option<Addr<Syn, Arbiter>>> = RefCell::new(None);
static SYSNAME: RefCell<Option<String>> = RefCell::new(None);
static SYSREG: RefCell<Option<SystemRegistry>> = RefCell::new(None);
);
pub struct Arbiter {
id: Uuid,
sys: bool,
}
impl Actor for Arbiter {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
if !self.sys {
Arbiter::system().do_send(RegisterArbiter(
self.id.simple().to_string(),
ctx.address(),
));
}
}
}
impl Arbiter {
pub fn new<T: Into<String>>(name: T) -> Addr<Syn, Arbiter> {
let (tx, rx) = std::sync::mpsc::channel();
let id = Uuid::new_v4();
let sys = Arbiter::system();
let sys_name = Arbiter::system_name();
let sys_arbiter = Arbiter::system_arbiter();
let sys_registry = Arbiter::system_registry().clone();
let name = format!(
"arbiter:{:?}:{:?}",
id.hyphenated().to_string(),
name.into()
);
let _ = thread::Builder::new()
.name(name.clone())
.spawn(move || {
let mut core = Core::new().unwrap();
let (stop_tx, stop_rx) = channel();
HND.with(|cell| *cell.borrow_mut() = Some(core.handle()));
STOP.with(|cell| *cell.borrow_mut() = Some(stop_tx));
NAME.with(|cell| *cell.borrow_mut() = Some(name));
REG.with(|cell| *cell.borrow_mut() = Some(Registry::new()));
SYS.with(|cell| *cell.borrow_mut() = Some(sys));
SYSARB.with(|cell| *cell.borrow_mut() = Some(sys_arbiter));
SYSNAME.with(|cell| *cell.borrow_mut() = Some(sys_name));
SYSREG.with(|cell| *cell.borrow_mut() = Some(sys_registry));
let (addr, saddr) = Actor::start(Arbiter { id, sys: false });
ADDR.with(|cell| *cell.borrow_mut() = Some(addr));
if tx.send(saddr).is_err() {
error!("Can not start Arbiter, remote side is dead");
} else {
let _ = match core.run(stop_rx) {
Ok(code) => code,
Err(_) => 1,
};
}
Arbiter::system().do_send(UnregisterArbiter(id.simple().to_string()));
});
rx.recv().unwrap()
}
pub(crate) fn new_system(name: String) -> Core {
let core = Core::new().unwrap();
HND.with(|cell| *cell.borrow_mut() = Some(core.handle()));
REG.with(|cell| *cell.borrow_mut() = Some(Registry::new()));
NAME.with(|cell| *cell.borrow_mut() = Some(name));
SYSREG.with(|cell| *cell.borrow_mut() = Some(SystemRegistry::new()));
let (addr, sys_addr) = Actor::start(Arbiter {
sys: true,
id: Uuid::new_v4(),
});
ADDR.with(|cell| *cell.borrow_mut() = Some(addr));
SYSARB.with(|cell| *cell.borrow_mut() = Some(sys_addr));
core
}
pub(crate) fn set_system(addr: Addr<Syn, System>, name: String) {
SYS.with(|cell| *cell.borrow_mut() = Some(addr));
SYSNAME.with(|cell| *cell.borrow_mut() = Some(name));
}
pub fn name() -> String {
NAME.with(|cell| match *cell.borrow() {
Some(ref name) => name.clone(),
None => "Arbiter is not running".into(),
})
}
pub fn arbiter() -> Addr<Unsync, Arbiter> {
ADDR.with(|cell| match *cell.borrow() {
Some(ref addr) => addr.clone(),
None => panic!("Arbiter is not running"),
})
}
pub fn system() -> Addr<Syn, System> {
SYS.with(|cell| match *cell.borrow() {
Some(ref addr) => addr.clone(),
None => panic!("System is not running"),
})
}
pub fn system_arbiter() -> Addr<Syn, Arbiter> {
SYSARB.with(|cell| match *cell.borrow() {
Some(ref addr) => addr.clone(),
None => panic!("System is not running"),
})
}
pub fn system_name() -> String {
SYSNAME.with(|cell| match *cell.borrow() {
Some(ref name) => name.clone(),
None => panic!("System is not running"),
})
}
pub fn system_registry() -> &'static SystemRegistry {
SYSREG.with(|cell| match *cell.borrow() {
Some(ref reg) => unsafe { std::mem::transmute(reg) },
None => panic!("System is not running"),
})
}
pub fn handle() -> &'static Handle {
HND.with(|cell| match *cell.borrow() {
Some(ref h) => unsafe { std::mem::transmute(h) },
None => panic!("Arbiter is not running"),
})
}
pub fn registry() -> &'static Registry {
REG.with(|cell| match *cell.borrow() {
Some(ref reg) => unsafe { std::mem::transmute(reg) },
None => panic!("System is not running: {}", Arbiter::name()),
})
}
pub fn start<A, F>(f: F) -> Addr<Syn, A>
where
A: Actor<Context = Context<A>>,
F: FnOnce(&mut A::Context) -> A + Send + 'static,
{
let (stx, srx) = sync_channel::channel(DEFAULT_CAPACITY);
let addr = Arbiter::new("actor");
addr.do_send::<Execute>(Execute::new(move || {
let mut ctx = Context::with_receiver(None, srx);
let act = f(&mut ctx);
ctx.set_actor(act);
ctx.run(Arbiter::handle());
Ok(())
}));
Addr::new(stx)
}
}
impl Handler<StopArbiter> for Arbiter {
type Result = ();
fn handle(&mut self, msg: StopArbiter, _: &mut Context<Self>) {
if self.sys {
warn!(
"System arbiter received `StopArbiter` message.
To shutdown system, `SystemExit` message should be
send to `Addr<Syn, System>`"
);
} else {
STOP.with(|cell| {
if let Some(stop) = cell.borrow_mut().take() {
let _ = stop.send(msg.0);
}
});
}
}
}
impl<A> Handler<StartActor<A>> for Arbiter
where
A: Actor<Context = Context<A>>,
{
type Result = Addr<Syn, A>;
fn handle(&mut self, msg: StartActor<A>, _: &mut Context<Self>) -> Addr<Syn, A> {
msg.call()
}
}
impl<I: Send, E: Send> Handler<Execute<I, E>> for Arbiter {
type Result = Result<I, E>;
fn handle(&mut self, msg: Execute<I, E>, _: &mut Context<Self>) -> Result<I, E> {
msg.exec()
}
}