use std;
use std::cell::{Cell, RefCell};
use std::thread;
use futures::sync::oneshot::{channel, Sender};
use futures::{future, Future, IntoFuture};
use tokio::executor::current_thread::spawn;
use tokio::runtime::current_thread::Runtime;
use uuid::Uuid;
use actor::Actor;
use address::{channel, Addr, AddressReceiver};
use context::Context;
use handler::Handler;
use mailbox::DEFAULT_CAPACITY;
use msgs::{Execute, StartActor, StopArbiter};
use registry::Registry;
use system::{RegisterArbiter, System, UnregisterArbiter};
// Per-thread state describing the arbiter that owns the current thread.
thread_local!(
// Address of this thread's arbiter. Stored by `new_with_builder` and
// `new_system`; read by `Arbiter::current`.
static ADDR: RefCell<Option<Addr<Arbiter>>> = RefCell::new(None);
// Name of this thread's arbiter; read by `Arbiter::name`.
static NAME: RefCell<Option<String>> = RefCell::new(None);
// Registry belonging to this thread's arbiter; read by `Arbiter::registry`.
static REG: RefCell<Option<Registry>> = RefCell::new(None);
// When `true`, `Arbiter::spawn` hands futures straight to the executor;
// when `false`, it parks them in `Q`.
static RUNNING: Cell<bool> = Cell::new(false);
// Futures queued by `Arbiter::spawn` while `RUNNING` is `false`; drained and
// spawned by `Arbiter::run_system`.
static Q: RefCell<Vec<Box<Future<Item=(), Error=()>>>> = RefCell::new(Vec::new());
);
/// Actor that controls the event loop of the thread it runs on.
///
/// It accepts `StopArbiter`, `StartActor` and `Execute` messages (see the
/// `Handler` implementations in this file).
pub struct Arbiter {
/// One-shot sender used by the `StopArbiter` handler to deliver the exit
/// code. `None` for the arbiter created by `new_system`.
stop: Option<Sender<i32>>,
/// When `true`, dropping the arbiter while its thread is panicking stops
/// the whole `System` with exit code 1.
stop_system_on_panic: bool,
}
/// `Arbiter` is itself an actor running in a plain `Context`.
impl Actor for Arbiter {
type Context = Context<Self>;
}
/// Shuts the whole system down when the arbiter is dropped because its thread
/// is unwinding from a panic and `stop_system_on_panic` was requested.
impl Drop for Arbiter {
    fn drop(&mut self) {
        let must_stop_system = self.stop_system_on_panic && thread::panicking();
        if !must_stop_system {
            return;
        }

        eprintln!("Panic in Arbiter thread, shutting down system.");
        System::current().stop_with_code(1);
    }
}
impl Arbiter {
/// Returns a new `Builder` for configuring an arbiter before starting it.
pub fn builder() -> Builder {
Builder::new()
}
/// Starts a new arbiter with the given name on its own thread and returns
/// its address. All other settings keep their `Builder` defaults.
pub fn new<T: Into<String>>(name: T) -> Addr<Arbiter> {
    Arbiter::builder().name(name).build()
}
/// Spawns a dedicated OS thread running a new arbiter and returns its address.
///
/// The thread is named `arbiter:<hyphenated uuid>:<name>`, where `<name>` is
/// the builder's name or `"actor"` when none was set. On that thread the
/// function creates a `current_thread` runtime, initialises the thread-local
/// arbiter state, starts the `Arbiter` actor, registers it with the system and
/// then blocks until the arbiter's stop channel resolves. Finally the arbiter
/// is unregistered from the system.
///
/// The calling thread blocks until the new thread has published the
/// arbiter's address.
///
/// # Panics
///
/// Panics if the OS thread cannot be spawned, or if the arbiter thread
/// terminates before it has sent back its address.
fn new_with_builder(builder: Builder) -> Addr<Arbiter> {
    let (tx, rx) = std::sync::mpsc::channel();
    let id = Uuid::new_v4();
    let name = format!(
        "arbiter:{}:{}",
        id.hyphenated(),
        // Borrow the default instead of allocating a `String` on every call.
        builder.name.as_ref().map(String::as_str).unwrap_or("actor")
    );
    let sys = System::current();

    let spawned = thread::Builder::new().name(name.clone()).spawn(move || {
        let mut rt = Runtime::new().unwrap();
        let (stop, stop_rx) = channel();

        // Initialise the thread-local arbiter state before any actor runs.
        NAME.with(|cell| *cell.borrow_mut() = Some(name));
        REG.with(|cell| *cell.borrow_mut() = Some(Registry::new()));
        RUNNING.with(|cell| cell.set(true));
        System::set_current(sys);

        // The actor is started from inside the runtime (via `block_on`), not
        // directly from the thread body.
        let addr = rt
            .block_on(future::lazy(move || {
                let addr = Actor::start(Arbiter {
                    stop: Some(stop),
                    stop_system_on_panic: builder.stop_system_on_panic,
                });
                Ok::<_, ()>(addr)
            }))
            .unwrap();
        ADDR.with(|cell| *cell.borrow_mut() = Some(addr.clone()));

        System::current()
            .sys()
            .do_send(RegisterArbiter(id.simple().to_string(), addr.clone()));

        if tx.send(addr).is_err() {
            error!("Can not start Arbiter, remote side is dead");
        } else {
            // Drive the event loop until a stop code arrives or the sender is
            // dropped; the code itself is not used here.
            let _ = rt.block_on(stop_rx);
        }

        System::current()
            .sys()
            .do_send(UnregisterArbiter(id.simple().to_string()));
    });

    // Surface the real OS error instead of failing later with an opaque
    // `RecvError` from the channel below. On success the join handle is
    // dropped, leaving the thread detached.
    if let Err(err) = spawned {
        panic!("Can not spawn Arbiter thread: {}", err);
    }

    rx.recv()
        .expect("Arbiter thread terminated before publishing its address")
}
/// Initialises the current thread as the system arbiter.
///
/// Thread-local name and registry are set and `RUNNING` is cleared, so the
/// arbiter's own future is queued by `Arbiter::spawn` rather than spawned
/// immediately. The arbiter is created without a stop channel and with
/// `stop_system_on_panic` enabled.
pub(crate) fn new_system(rx: AddressReceiver<Arbiter>, name: String) {
    NAME.with(|slot| *slot.borrow_mut() = Some(name));
    REG.with(|slot| *slot.borrow_mut() = Some(Registry::new()));
    RUNNING.with(|flag| flag.set(false));

    let context = Context::with_receiver(rx);
    let arbiter = Arbiter {
        stop: None,
        stop_system_on_panic: true,
    };
    let arbiter_fut = context.into_future(arbiter);
    let address = arbiter_fut.address();

    Arbiter::spawn(arbiter_fut);
    ADDR.with(|slot| *slot.borrow_mut() = Some(address.clone()));
}
/// Marks the current thread's arbiter as running and spawns every future
/// that was queued while it was not.
pub(crate) fn run_system() {
    RUNNING.with(|flag| flag.set(true));
    Q.with(|queue| {
        let mut pending = queue.borrow_mut();
        pending.drain(..).for_each(|fut| {
            spawn(fut);
        });
    });
}
/// Clears the thread-local `RUNNING` flag, so later `Arbiter::spawn` calls on
/// this thread queue their futures instead of spawning them.
pub(crate) fn stop_system() {
RUNNING.with(|cell| cell.set(false));
}
pub fn name() -> String {
NAME.with(|cell| match *cell.borrow() {
Some(ref name) => name.clone(),
None => "Arbiter is not running".into(),
})
}
pub fn current() -> Addr<Arbiter> {
ADDR.with(|cell| match *cell.borrow() {
Some(ref addr) => addr.clone(),
None => panic!("Arbiter is not running"),
})
}
pub fn registry() -> Registry {
REG.with(|cell| match *cell.borrow() {
Some(ref reg) => reg.clone(),
None => panic!("System is not running: {}", Arbiter::name()),
})
}
/// Schedules `future` on the current thread's arbiter.
///
/// While the arbiter is running the future goes straight to the executor;
/// otherwise it is queued and spawned later by `run_system`.
pub fn spawn<F>(future: F)
where
    F: Future<Item = (), Error = ()> + 'static,
{
    let boxed = Box::new(future);
    let running = RUNNING.with(|flag| flag.get());

    if running {
        spawn(boxed);
    } else {
        Q.with(move |queue| queue.borrow_mut().push(boxed));
    }
}
/// Schedules a closure on the current thread's arbiter.
///
/// The closure is wrapped in `future::lazy`, so it runs only when the
/// resulting future is polled; whatever it returns is then driven as a future.
pub fn spawn_fn<F, R>(f: F)
where
    F: FnOnce() -> R + 'static,
    R: IntoFuture<Item = (), Error = ()> + 'static,
{
    let deferred = future::lazy(f);
    Arbiter::spawn(deferred);
}
/// Starts a new arbiter with default settings and builds an actor on it
/// from the factory `f`, returning the actor's address.
pub fn start<A, F>(f: F) -> Addr<A>
where
    A: Actor<Context = Context<A>>,
    F: FnOnce(&mut A::Context) -> A + Send + 'static,
{
    Arbiter::start_with_builder(Arbiter::builder(), f)
}
/// Starts a new arbiter from `builder` and asks it to construct an actor.
///
/// The actor's mailbox channel is created up front, so the returned address
/// exists before the actor does. The receiving half travels to the new
/// arbiter inside an `Execute` message, where the factory `f` builds the
/// actor and its future is spawned.
fn start_with_builder<A, F>(builder: Builder, f: F) -> Addr<A>
where
    A: Actor<Context = Context<A>>,
    F: FnOnce(&mut A::Context) -> A + Send + 'static,
{
    let (sender, receiver) = channel::channel(DEFAULT_CAPACITY);
    let arbiter = Arbiter::new_with_builder(builder);

    let bootstrap: Execute = Execute::new(move || {
        let mut ctx = Context::with_receiver(receiver);
        let actor = f(&mut ctx);
        spawn(ctx.into_future(actor));
        Ok(())
    });
    arbiter.do_send::<Execute>(bootstrap);

    Addr::new(sender)
}
}
/// Stops the arbiter by sending the exit code carried in the message through
/// the one-shot stop channel. Has no effect if the sender is absent or has
/// already been used.
impl Handler<StopArbiter> for Arbiter {
    type Result = ();

    fn handle(&mut self, msg: StopArbiter, _: &mut Context<Self>) {
        let sender = match self.stop.take() {
            Some(sender) => sender,
            None => return,
        };
        // A send failure means the receiving side is already gone, so there
        // is nothing left to stop.
        let _ = sender.send(msg.0);
    }
}
/// Handles `StartActor` by invoking the message's `call()` and returning the
/// resulting actor address.
// NOTE(review): presumably `call()` starts the actor on this arbiter's
// thread — confirm against `msgs::StartActor`.
impl<A> Handler<StartActor<A>> for Arbiter
where
A: Actor<Context = Context<A>>,
{
type Result = Addr<A>;
fn handle(&mut self, msg: StartActor<A>, _: &mut Context<Self>) -> Addr<A> {
msg.call()
}
}
/// Handles `Execute` by running the message's `exec()` inside this handler
/// and returning its `Result` unchanged.
impl<I: Send, E: Send> Handler<Execute<I, E>> for Arbiter {
type Result = Result<I, E>;
fn handle(&mut self, msg: Execute<I, E>, _: &mut Context<Self>) -> Result<I, E> {
msg.exec()
}
}
/// Configuration for a new `Arbiter`, obtained from `Arbiter::builder()`.
pub struct Builder {
/// Optional arbiter name; when `None`, `"actor"` is used in the thread name.
name: Option<String>,
/// Whether a panic in the arbiter's thread should stop the whole `System`.
stop_system_on_panic: bool,
}
impl Builder {
    /// Creates a builder with no name and `stop_system_on_panic` disabled.
    fn new() -> Self {
        Builder {
            name: None,
            stop_system_on_panic: false,
        }
    }

    /// Sets the name of the arbiter; it becomes part of the thread name.
    pub fn name<T: Into<String>>(self, name: T) -> Self {
        Builder {
            name: Some(name.into()),
            ..self
        }
    }

    /// Chooses whether a panic in the arbiter's thread stops the `System`.
    pub fn stop_system_on_panic(self, stop_system_on_panic: bool) -> Self {
        Builder {
            stop_system_on_panic,
            ..self
        }
    }

    /// Consumes the builder, starts the arbiter on a new thread and returns
    /// its address.
    pub fn build(self) -> Addr<Arbiter> {
        Arbiter::new_with_builder(self)
    }

    /// Consumes the builder, starts the arbiter on a new thread and builds an
    /// actor on it from the factory `f`, returning the actor's address.
    pub fn start<A, F>(self, f: F) -> Addr<A>
    where
        A: Actor<Context = Context<A>>,
        F: FnOnce(&mut A::Context) -> A + Send + 'static,
    {
        Arbiter::start_with_builder(self, f)
    }
}