use std;
use std::thread;
use std::cell::RefCell;
use uuid::Uuid;
use tokio_core::reactor::{Core, Handle};
use futures::sync::oneshot::{channel, Sender};
use actor::{Actor, Handler, AsyncContext};
use address::{Address, SyncAddress};
use context::Context;
use msgs::{Execute, StartActor, StopArbiter};
use message::Response;
use registry::{Registry, SystemRegistry};
use system::{System, RegisterArbiter, UnregisterArbiter};
use queue::sync;
// Per-thread state of the arbiter running on the current thread. Every slot
// starts out as `None` and is filled in by `Arbiter::new`,
// `Arbiter::new_system` or `Arbiter::set_system`.
thread_local!(
// Handle of the event loop (`Core`) created for this thread.
static HND: RefCell<Option<Handle>> = RefCell::new(None);
// Sender half of the oneshot channel whose receiver is driven by `core.run`
// in `Arbiter::new`; the `StopArbiter` handler takes it to deliver the exit code.
static STOP: RefCell<Option<Sender<i32>>> = RefCell::new(None);
// Address of the `Arbiter` actor started on this thread.
static ADDR: RefCell<Option<Address<Arbiter>>> = RefCell::new(None);
// Registry returned by `Arbiter::registry()`.
static REG: RefCell<Option<Registry>> = RefCell::new(None);
// Name returned by `Arbiter::name()`.
static NAME: RefCell<Option<String>> = RefCell::new(None);
// Address of the `System` actor, returned by `Arbiter::system()`.
static SYS: RefCell<Option<SyncAddress<System>>> = RefCell::new(None);
// Address of the arbiter created by `Arbiter::new_system`; spawned arbiter
// threads receive a copy taken from the thread that spawned them.
static SYSARB: RefCell<Option<SyncAddress<Arbiter>>> = RefCell::new(None);
// System name, returned by `Arbiter::system_name()`.
static SYSNAME: RefCell<Option<String>> = RefCell::new(None);
// System registry; spawned arbiter threads store a clone of the spawning
// thread's value.
static SYSREG: RefCell<Option<SystemRegistry>> = RefCell::new(None);
);
/// Actor started on every arbiter thread (see `Arbiter::new` and
/// `Arbiter::new_system`). It handles `StopArbiter`, `StartActor` and
/// `Execute` messages.
pub struct Arbiter {
/// Random (v4) UUID of this arbiter; its `simple()` string form is the key
/// sent to the `System` in `RegisterArbiter` / `UnregisterArbiter`.
id: Uuid,
/// `true` for the arbiter created by `new_system`. Such an arbiter only logs
/// a warning when it receives `StopArbiter` instead of stopping.
sys: bool,
}
impl Actor for Arbiter {
    type Context = Context<Self>;

    /// Announces this arbiter to the `System` actor as soon as it has started.
    ///
    /// The registration key is the `simple()` string form of the arbiter's
    /// UUID; the payload is the arbiter's own address taken from `ctx`.
    ///
    /// Panics (through `Arbiter::system`) when no system address has been
    /// stored for the current thread.
    fn started(&mut self, ctx: &mut Context<Self>) {
        // Resolve the system address first so a missing system is reported
        // before anything else is evaluated.
        let system = Arbiter::system();
        let arbiter_key = self.id.simple().to_string();
        system.send(RegisterArbiter(arbiter_key, ctx.address()));
    }
}
impl Arbiter {
    /// Spawns a new arbiter on a dedicated OS thread and returns its address.
    ///
    /// The new thread creates its own event loop (`Core`), stores copies of
    /// the system-wide values captured on the calling thread (system address,
    /// system name, system arbiter address, system registry), creates a fresh
    /// per-thread `Registry` and then starts an `Arbiter` actor. This call
    /// blocks until the new thread has sent back the actor's address.
    ///
    /// When the event loop finishes, the thread sends `UnregisterArbiter` to
    /// the system.
    ///
    /// # Panics
    ///
    /// Panics if no system is running on the calling thread, if the OS thread
    /// cannot be spawned, or if the thread terminates before reporting the
    /// arbiter address.
    pub fn new<T: ToString>(name: T) -> SyncAddress<Arbiter> {
        let (tx, rx) = std::sync::mpsc::channel();

        let id = Uuid::new_v4();
        // Thread-local system state has to be read here, on the calling
        // thread, and moved into the new thread explicitly.
        let sys = Arbiter::system();
        let sys_name = Arbiter::system_name();
        let sys_arbiter = Arbiter::system_arbiter();
        let sys_registry = Arbiter::system_registry().clone();
        let name =
            format!("arbiter:{:?}:{:?}", id.hyphenated().to_string(), name.to_string());

        let spawned = thread::Builder::new().name(name.clone()).spawn(move || {
            let mut core = Core::new().expect("Can not create event loop for Arbiter");

            let (stop_tx, stop_rx) = channel();
            HND.with(|cell| *cell.borrow_mut() = Some(core.handle()));
            STOP.with(|cell| *cell.borrow_mut() = Some(stop_tx));
            NAME.with(|cell| *cell.borrow_mut() = Some(name));
            // Without this, `Arbiter::registry()` panics on every thread
            // except the one that called `new_system`.
            REG.with(|cell| *cell.borrow_mut() = Some(Registry::new()));

            // system-wide values inherited from the spawning thread
            SYS.with(|cell| *cell.borrow_mut() = Some(sys));
            SYSARB.with(|cell| *cell.borrow_mut() = Some(sys_arbiter));
            SYSNAME.with(|cell| *cell.borrow_mut() = Some(sys_name));
            SYSREG.with(|cell| *cell.borrow_mut() = Some(sys_registry));

            // start the arbiter actor for this thread
            let (addr, saddr) = Actor::start(
                Arbiter {sys: false, id: id});
            ADDR.with(|cell| *cell.borrow_mut() = Some(addr));

            if tx.send(saddr).is_err() {
                error!("Can not start Arbiter, remote side is dead");
            } else {
                // Runs until the `StopArbiter` handler fires the oneshot (or
                // its sender is dropped). The exit code is not used here.
                let _ = core.run(stop_rx);
            }

            // let the system know this arbiter is gone
            Arbiter::system().send(
                UnregisterArbiter(id.simple().to_string()));
        });

        // Report a spawn failure directly; otherwise it would only show up as
        // an opaque `RecvError` below, because the closure (and with it `tx`)
        // is dropped when the thread never starts.
        if let Err(err) = spawned {
            panic!("Can not spawn Arbiter thread: {}", err);
        }

        rx.recv().expect("Arbiter thread terminated before sending its address")
    }

    /// Initialises the current thread as the system arbiter thread and
    /// returns the event loop created for it.
    ///
    /// Stores the event loop handle, a new `Registry`, the given `name` and a
    /// new `SystemRegistry` in thread-local storage, then starts an `Arbiter`
    /// actor flagged as the system arbiter. The system address and system
    /// name are not set here; see `set_system`.
    ///
    /// Panics if the event loop cannot be created.
    pub(crate) fn new_system(name: String) -> Core {
        let core = Core::new().unwrap();
        HND.with(|cell| *cell.borrow_mut() = Some(core.handle()));
        REG.with(|cell| *cell.borrow_mut() = Some(Registry::new()));
        NAME.with(|cell| *cell.borrow_mut() = Some(name));
        SYSREG.with(|cell| *cell.borrow_mut() = Some(SystemRegistry::new()));

        // start the system arbiter actor
        let (addr, sys_addr) = Actor::start(
            Arbiter {sys: true, id: Uuid::new_v4()});
        ADDR.with(|cell| *cell.borrow_mut() = Some(addr));
        SYSARB.with(|cell| *cell.borrow_mut() = Some(sys_addr));

        core
    }

    /// Stores the address and the name of the running `System` for the
    /// current thread.
    pub(crate) fn set_system(addr: SyncAddress<System>, name: String) {
        SYS.with(|cell| *cell.borrow_mut() = Some(addr));
        SYSNAME.with(|cell| *cell.borrow_mut() = Some(name));
    }

    /// Returns the name of the current thread's arbiter.
    ///
    /// Panics if no arbiter has been set up on the current thread.
    pub fn name() -> String {
        NAME.with(|cell| match *cell.borrow() {
            Some(ref name) => name.clone(),
            None => panic!("Arbiter is not running"),
        })
    }

    /// Returns the address of the current thread's `Arbiter` actor.
    ///
    /// Panics if no arbiter has been set up on the current thread.
    pub fn arbiter() -> Address<Arbiter> {
        ADDR.with(|cell| match *cell.borrow() {
            Some(ref addr) => addr.clone(),
            None => panic!("Arbiter is not running"),
        })
    }

    /// Returns the address of the `System` actor known to the current thread.
    ///
    /// Panics if no system address has been stored for the current thread.
    pub fn system() -> SyncAddress<System> {
        SYS.with(|cell| match *cell.borrow() {
            Some(ref addr) => addr.clone(),
            None => panic!("System is not running"),
        })
    }

    /// Returns the address of the system arbiter known to the current thread.
    ///
    /// Panics if no system arbiter address has been stored for the current
    /// thread.
    pub fn system_arbiter() -> SyncAddress<Arbiter> {
        SYSARB.with(|cell| match *cell.borrow() {
            Some(ref addr) => addr.clone(),
            None => panic!("System is not running"),
        })
    }

    /// Returns the name of the system known to the current thread.
    ///
    /// Panics if no system name has been stored for the current thread.
    pub fn system_name() -> String {
        SYSNAME.with(|cell| match *cell.borrow() {
            Some(ref name) => name.clone(),
            None => panic!("System is not running"),
        })
    }

    /// Returns the system registry stored for the current thread.
    ///
    /// Panics if no system registry has been stored for the current thread.
    pub fn system_registry() -> &'static SystemRegistry {
        SYSREG.with(|cell| match *cell.borrow() {
            // SAFETY: extends a borrow of thread-local data to `'static`.
            // This relies on the slot being written once during thread setup
            // and never replaced while the reference is in use, and on the
            // reference not outliving the thread.
            // NOTE(review): the compiler cannot check these conditions.
            Some(ref reg) => unsafe { &*(reg as *const SystemRegistry) },
            None => panic!("System is not running"),
        })
    }

    /// Returns the event loop handle stored for the current thread.
    ///
    /// Panics if no arbiter has been set up on the current thread.
    pub fn handle() -> &'static Handle {
        HND.with(|cell| match *cell.borrow() {
            // SAFETY: same reasoning as in `system_registry` - the slot is
            // assumed to be set once per thread and left untouched afterwards.
            Some(ref h) => unsafe { &*(h as *const Handle) },
            None => panic!("Arbiter is not running"),
        })
    }

    /// Returns the registry stored for the current thread.
    ///
    /// Panics if no registry has been stored for the current thread.
    pub fn registry() -> &'static Registry {
        REG.with(|cell| match *cell.borrow() {
            // SAFETY: same reasoning as in `system_registry` - the slot is
            // assumed to be set once per thread and left untouched afterwards.
            Some(ref reg) => unsafe { &*(reg as *const Registry) },
            None => panic!("System is not running"),
        })
    }

    /// Spawns a new arbiter thread (named with the label `"actor"`) and runs
    /// an actor of type `A` on it.
    ///
    /// `f` is sent to the new arbiter inside an `Execute` message; there it
    /// receives the actor's context and returns the actor instance. The
    /// returned address wraps the sender half of the queue whose receiver is
    /// handed to that context.
    ///
    /// Panics under the same conditions as `Arbiter::new`.
    pub fn start<A, F>(f: F) -> SyncAddress<A>
        where A: Actor<Context=Context<A>>,
              F: FnOnce(&mut A::Context) -> A + Send + 'static
    {
        let (stx, srx) = sync::unbounded();

        // new arbiter thread that will host the actor
        let addr = Arbiter::new("actor");

        // create the actor on the arbiter's thread
        addr.send::<Execute>(
            Execute::new(move || {
                // The context is built around a placeholder actor value
                // because the real actor can only be produced by `f`, which
                // needs the context.
                // NOTE(review): `mem::uninitialized` is deprecated and is
                // undefined behaviour for most types; removing it requires a
                // `Context` constructor that does not take an actor value.
                let mut ctx = Context::with_receiver(
                    unsafe{std::mem::uninitialized()}, srx);
                let act = f(&mut ctx);
                let old = ctx.replace_actor(act);
                // The placeholder was never initialised, so it must not be
                // dropped.
                std::mem::forget(old);
                ctx.run(Arbiter::handle());
                Ok(())
            }));

        SyncAddress::new(stx)
    }
}
impl Handler<StopArbiter> for Arbiter {
    /// Handles a request to stop this arbiter.
    ///
    /// The system arbiter only logs a warning. Any other arbiter takes the
    /// stop sender stored for the current thread (if it is still there) and
    /// sends the exit code carried by the message through it; a send failure
    /// is ignored. An empty response is returned in both cases.
    fn handle(&mut self, msg: StopArbiter, _: &mut Context<Self>) -> Response<Self, StopArbiter>
    {
        if self.sys {
            // The continuation line of this literal is deliberately not
            // indented: any leading whitespace would become part of the message.
            warn!("System arbiter received `StopArbiter` message.
To shutdown system `SystemExit` message should be send to `Address<System>`");
            return Self::empty();
        }

        STOP.with(|cell| {
            // `take` leaves `None` behind, so a second `StopArbiter` is a no-op.
            let pending = cell.borrow_mut().take();
            if let Some(stop_tx) = pending {
                let _ = stop_tx.send(msg.0);
            }
        });
        Self::empty()
    }
}
impl<A: Actor<Context=Context<A>>> Handler<StartActor<A>> for Arbiter {
    /// Invokes the start routine carried by the message and replies with
    /// whatever it returns.
    fn handle(&mut self, msg: StartActor<A>, _: &mut Context<Self>) -> Response<Self, StartActor<A>>
    {
        let started = msg.call();
        Self::reply(started)
    }
}
impl<I, E> Handler<Execute<I, E>> for Arbiter
    where I: Send,
          E: Send,
{
    /// Runs the function carried by the message on this arbiter and maps its
    /// result to a response: a success becomes a reply, a failure becomes an
    /// error reply.
    fn handle(&mut self, msg: Execute<I, E>, _: &mut Context<Self>)
              -> Response<Self, Execute<I, E>>
    {
        match msg.exec() {
            Err(failure) => Self::reply_error(failure),
            Ok(value) => Self::reply(value),
        }
    }
}