use std::cell::RefCell;
use std::collections::HashMap;
use futures::sync::oneshot::{channel, Receiver, Sender};
use futures::{future, Future};
use tokio::executor::current_thread::spawn;
use tokio::runtime::current_thread::Runtime;
use actor::Actor;
use address::{channel as addr_channel, Addr};
use arbiter::Arbiter;
use context::Context;
use handler::{Handler, Message};
use msgs::{Execute, StopArbiter};
use registry::SystemRegistry;
/// Handle to an actor system.
///
/// A clone holds clones of the same addresses and registry, so every clone
/// refers to the same system.
#[derive(Clone)]
pub struct System {
// Address of the `SystemArbiter` actor; `stop` sends `SystemExit` through it.
sys: Addr<SystemArbiter>,
// Address whose receiving half is handed to `Arbiter::new_system` in `create_runtime`.
arbiter: Addr<Arbiter>,
// Registry constructed from the same arbiter address in `create_runtime`.
registry: SystemRegistry,
}
thread_local! {
    // Per-thread slot for the current `System`; stays `None` until
    // `System::set_current` stores a handle on this thread.
    static CURRENT: RefCell<Option<System>> = RefCell::new(None);
}
impl System {
    /// Creates a new system with the given name and returns the
    /// `SystemRunner` that must be run to drive it.
    ///
    /// The new system is stored as the current system of the calling thread
    /// before this function returns.
    ///
    /// # Panics
    ///
    /// Panics if the tokio runtime cannot be created.
    #[cfg_attr(feature = "cargo-clippy", allow(new_ret_no_self))]
    pub fn new<T: Into<String>>(name: T) -> SystemRunner {
        System::create_runtime(name.into(), || {})
    }

    /// Returns a clone of the system stored for the current thread.
    ///
    /// # Panics
    ///
    /// Panics with `"System is not running"` if no system has been set on
    /// this thread.
    pub fn current() -> System {
        // Share the lookup (and the panic) with `with_current` instead of
        // duplicating the thread-local access.
        System::with_current(|sys| sys.clone())
    }

    /// Stores `sys` as the current system of the calling thread, replacing
    /// any previously stored one.
    #[doc(hidden)]
    pub fn set_current(sys: System) {
        CURRENT.with(|s| {
            *s.borrow_mut() = Some(sys);
        })
    }

    /// Calls `f` with a reference to the current thread's system and returns
    /// its result, without cloning the system.
    ///
    /// The thread-local slot stays immutably borrowed while `f` runs, so `f`
    /// must not call `System::set_current` (that would be a `RefCell`
    /// double-borrow panic).
    ///
    /// # Panics
    ///
    /// Panics with `"System is not running"` if no system has been set on
    /// this thread.
    pub fn with_current<F, R>(f: F) -> R
    where
        F: FnOnce(&System) -> R,
    {
        CURRENT.with(|cell| match *cell.borrow() {
            Some(ref sys) => f(sys),
            None => panic!("System is not running"),
        })
    }

    /// Stops the system with exit code `0`.
    ///
    /// Equivalent to `stop_with_code(0)`.
    pub fn stop(&self) {
        self.stop_with_code(0)
    }

    /// Stops the system, using `code` as the exit code.
    ///
    /// Sends a `SystemExit(code)` message to the system arbiter. The handler
    /// forwards the code to every registered arbiter and to the stop channel
    /// that `SystemRunner::run` waits on, so `run` returns `code`.
    pub fn stop_with_code(&self, code: i32) {
        self.sys.do_send(SystemExit(code));
    }

    /// Returns the address of the `SystemArbiter` actor.
    pub(crate) fn sys(&self) -> &Addr<SystemArbiter> {
        &self.sys
    }

    /// Returns the address of the system's arbiter.
    pub fn arbiter(&self) -> &Addr<Arbiter> {
        &self.arbiter
    }

    /// Returns the system registry.
    pub fn registry(&self) -> &SystemRegistry {
        &self.registry
    }

    /// Creates a system named `"actix"`, invokes `f` inside its runtime, then
    /// runs the system until it is stopped.
    ///
    /// Returns the exit code produced by `SystemRunner::run`.
    ///
    /// # Panics
    ///
    /// Panics if the tokio runtime cannot be created.
    pub fn run<F>(f: F) -> i32
    where
        F: FnOnce() + 'static,
    {
        System::create_runtime("actix".to_owned(), f).run()
    }

    /// Builds the system, its runtime and its `SystemArbiter`, registers the
    /// system as current for this thread, and runs `f` once inside the
    /// runtime.
    ///
    /// # Panics
    ///
    /// Panics if `Runtime::new()` fails.
    fn create_runtime<F>(name: String, f: F) -> SystemRunner
    where
        F: FnOnce() + 'static,
    {
        // One-shot channel: the `SystemExit` handler sends the exit code,
        // the returned `SystemRunner` receives it.
        let (stop_tx, stop) = channel();
        // Address channels for the `SystemArbiter` actor and for the arbiter.
        // NOTE(review): 16 is presumably the mailbox capacity - confirm
        // against `address::channel`.
        let (addr_sender, addr_receiver) = addr_channel::channel(16);
        let (arb_sender, arb_receiver) = addr_channel::channel(16);

        let addr = Addr::new(arb_sender);
        let system = System {
            sys: Addr::new(addr_sender),
            arbiter: addr.clone(),
            registry: SystemRegistry::new(addr),
        };
        // Must be set before the startup closure below runs, so code executed
        // there can already reach the system through the thread-local.
        System::set_current(system.clone());

        let arb = SystemArbiter {
            arbiters: HashMap::new(),
            stop: Some(stop_tx),
        };

        let mut rt = Runtime::new().unwrap();
        // Run the startup steps inside the runtime so `spawn` has an executor.
        let _ = rt.block_on(future::lazy(move || {
            Arbiter::new_system(arb_receiver, name);
            // Start the `SystemArbiter` actor on its receiving half.
            spawn(Context::with_receiver(Some(arb), addr_receiver));
            // User-supplied initialization.
            f();
            Ok::<_, ()>(())
        }));

        SystemRunner { rt, stop }
    }
}
/// Owns the runtime of a `System` and drives it; returned by `System::new`.
#[must_use = "SystemRunner must be run"]
pub struct SystemRunner {
// Single-threaded (`current_thread`) tokio runtime the system runs on.
rt: Runtime,
// Receiving half of the exit-code channel; the `SystemExit` handler sends on the other half.
stop: Receiver<i32>,
}
impl SystemRunner {
    /// Runs the system until a stop code arrives and returns that code.
    ///
    /// Returns `1` when the stop channel is closed without a code having
    /// been sent.
    pub fn run(self) -> i32 {
        let SystemRunner { mut rt, stop } = self;

        // Start the arbiter from inside the runtime.
        let _ = rt.block_on(future::lazy(|| {
            Arbiter::run_system();
            Ok::<(), ()>(())
        }));

        // Block until the exit code is delivered; a dropped sender maps to 1.
        let exit_code = rt.block_on(stop).unwrap_or(1);

        Arbiter::stop_system();
        exit_code
    }

    /// Drives `fut` to completion on the system's runtime and returns its
    /// result.
    ///
    /// `Arbiter::run_system` is invoked before the future is run and
    /// `Arbiter::stop_system` after it, both from inside the runtime.
    pub fn block_on<F, I, E>(&mut self, fut: F) -> Result<I, E>
    where
        F: Future<Item = I, Error = E>,
    {
        let runtime = &mut self.rt;

        let _ = runtime.block_on(future::lazy(|| {
            Arbiter::run_system();
            Ok::<(), ()>(())
        }));

        let outcome = runtime.block_on(fut);

        let _ = runtime.block_on(future::lazy(|| {
            Arbiter::stop_system();
            Ok::<(), ()>(())
        }));

        outcome
    }
}
/// Actor that keeps track of registered arbiters and handles system shutdown.
pub(crate) struct SystemArbiter {
// Sending half of the exit-code channel; taken (leaving `None`) the first time `SystemExit` is handled.
stop: Option<Sender<i32>>,
// Registered arbiter addresses, keyed by the name carried in `RegisterArbiter`.
arbiters: HashMap<String, Addr<Arbiter>>,
}
// `SystemArbiter` is an actor whose handlers receive a `Context<SystemArbiter>`.
impl Actor for SystemArbiter {
type Context = Context<Self>;
}
/// Message asking the `SystemArbiter` to stop the system; the field is the exit code.
struct SystemExit(pub i32);
impl Message for SystemExit {
// Handling this message yields no value.
type Result = ();
}
impl Handler<SystemExit> for SystemArbiter {
type Result = ();
fn handle(&mut self, msg: SystemExit, _: &mut Context<Self>) {
for addr in self.arbiters.values() {
addr.do_send(StopArbiter(msg.0));
}
if let Some(stop) = self.stop.take() {
let _ = stop.send(msg.0);
}
}
}
/// Message that registers an arbiter with the `SystemArbiter`: field 0 is the
/// name used as the key, field 1 is the arbiter's address.
pub(crate) struct RegisterArbiter(pub String, pub Addr<Arbiter>);
#[doc(hidden)]
impl Message for RegisterArbiter {
// Handling this message yields no value.
type Result = ();
}
#[doc(hidden)]
impl Handler<RegisterArbiter> for SystemArbiter {
type Result = ();
fn handle(&mut self, msg: RegisterArbiter, _: &mut Context<Self>) {
self.arbiters.insert(msg.0, msg.1);
}
}
/// Message that removes an arbiter from the `SystemArbiter`; the field is the
/// name the arbiter was registered under.
pub(crate) struct UnregisterArbiter(pub String);
#[doc(hidden)]
impl Message for UnregisterArbiter {
// Handling this message yields no value.
type Result = ();
}
#[doc(hidden)]
impl Handler<UnregisterArbiter> for SystemArbiter {
type Result = ();
fn handle(&mut self, msg: UnregisterArbiter, _: &mut Context<Self>) {
self.arbiters.remove(&msg.0);
}
}
// Lets an `Execute` message be handled by the `SystemArbiter` actor.
impl<I: Send, E: Send> Handler<Execute<I, E>> for SystemArbiter {
    type Result = Result<I, E>;

    /// Calls `exec()` on the received message and returns its result unchanged.
    fn handle(&mut self, task: Execute<I, E>, _ctx: &mut Context<Self>) -> Result<I, E> {
        task.exec()
    }
}