use std::mem;
use futures::{Future, Async, Poll, Stream};
use actor::{Actor, Supervised, ActorContext, AsyncContext};
use arbiter::Arbiter;
use address::{Address, SyncAddress};
use context::{Context, ContextProtocol, AsyncContextApi};
use envelope::Envelope;
use msgs::Execute;
use queue::{sync, unsync};
/// Owns the context of a supervised actor together with the message
/// channels that are addressed to the supervisor itself.
///
/// The supervisor is driven as a `Future`: polling it polls the wrapped
/// context and forwards queued messages to the actor.
pub struct Supervisor<A>
    where A: Supervised + Actor<Context=Context<A>>
{
    /// Context of the supervised actor; swapped for a fresh one on restart.
    ctx: A::Context,
    /// Sender on the context's own unsync channel. The field is only ever
    /// written, never read, hence the `dead_code` allowance.
    /// NOTE(review): presumably held so that channel always has a live
    /// sender - confirm against `Context`.
    #[allow(dead_code)]
    addr: unsync::UnboundedSender<ContextProtocol<A>>,
    /// Receiver for cross-thread messages; `None` until a sync address is
    /// requested (or supplied up front by `start_in`).
    sync_msgs: Option<sync::UnboundedReceiver<Envelope<A>>>,
    /// Receiver for same-thread messages sent to the supervisor.
    unsync_msgs: unsync::UnboundedReceiver<ContextProtocol<A>>,
}
impl<A> Supervisor<A> where A: Supervised + Actor<Context=Context<A>>
{
    /// Creates the actor with `f` inside a new context, wraps it in a
    /// supervisor and spawns that supervisor on `Arbiter::handle()`.
    ///
    /// The returned `Address` is built from a sender on the supervisor's
    /// unsync queue, not on the actor context's own queue.
    pub fn start<F>(f: F) -> Address<A>
        where A: Actor<Context=Context<A>>,
              F: FnOnce(&mut A::Context) -> A + 'static
    {
        // Build the actor inside a fresh context.
        let mut actor_ctx = Context::new(None);
        let ctx_sender = actor_ctx.unsync_sender();
        let actor = f(&mut actor_ctx);
        actor_ctx.set_actor(actor);

        // Wrap the context; the sync queue is created lazily on demand.
        let mut supervisor = Supervisor {
            ctx: actor_ctx,
            addr: ctx_sender,
            sync_msgs: None,
            unsync_msgs: unsync::unbounded(),
        };
        let supervisor_sender = supervisor.unsync_msgs.sender();
        Arbiter::handle().spawn(supervisor);

        Address::new(supervisor_sender)
    }

    /// Asks the arbiter behind `addr` to create the actor and its
    /// supervisor by sending it an `Execute` message.
    ///
    /// The sync channel is created up front, so the returned `SyncAddress`
    /// is available immediately; its receiving half is moved into the
    /// closure and ends up owned by the supervisor.
    pub fn start_in<F>(addr: &SyncAddress<Arbiter>, f: F) -> SyncAddress<A>
        where A: Actor<Context=Context<A>>,
              F: FnOnce(&mut Context<A>) -> A + Send + 'static
    {
        let (sync_tx, sync_rx) = sync::unbounded();

        let spawn_supervisor = move || -> Result<(), ()> {
            // Build the actor inside a fresh context.
            let mut actor_ctx = Context::new(None);
            let ctx_sender = actor_ctx.unsync_sender();
            let actor = f(&mut actor_ctx);
            actor_ctx.set_actor(actor);

            Arbiter::handle().spawn(Supervisor {
                ctx: actor_ctx,
                addr: ctx_sender,
                sync_msgs: Some(sync_rx),
                unsync_msgs: unsync::unbounded(),
            });
            Ok(())
        };
        addr.send(Execute::new(spawn_supervisor));

        SyncAddress::new(sync_tx)
    }

    /// Returns `true` while the unsync queue, or the sync queue if one
    /// exists, reports itself as connected.
    #[inline]
    fn connected(&mut self) -> bool {
        if self.unsync_msgs.connected() {
            return true;
        }
        match self.sync_msgs {
            Some(ref queue) => queue.connected(),
            None => false,
        }
    }

    /// Returns a `SyncAddress` for this supervisor, creating the sync
    /// queue on first use.
    fn sync_address(&mut self) -> SyncAddress<A> {
        // Queue already exists: hand out another sender for it.
        if let Some(ref mut queue) = self.sync_msgs {
            return SyncAddress::new(queue.sender());
        }

        // First request: create the channel and keep the receiving half.
        let (sync_tx, sync_rx) = sync::unbounded();
        self.sync_msgs = Some(sync_rx);
        SyncAddress::new(sync_tx)
    }

    /// Moves the actor out of the current context into a brand-new one,
    /// notifies the new context via `restarting()` and refreshes `addr`
    /// with a sender on the new context.
    fn restart(&mut self) {
        let previous = mem::replace(&mut self.ctx, Context::new(None));
        self.ctx.set_actor(previous.into_inner());
        self.ctx.restarting();
        self.addr = self.ctx.unsync_sender();
    }
}
// `Supervisor` is itself a future: polling it drives the supervised context
// and dispatches the messages queued on the supervisor's own channels.
#[doc(hidden)]
impl<A> Future for Supervisor<A> where A: Supervised + Actor<Context=Context<A>> {
type Item = ();
type Error = ();
// Polls the supervised context, then drains the unsync queue and (if present)
// the sync queue. Resolves with `Ready(())` only when the context has finished
// or failed and no channel reports itself connected; if a channel is still
// connected at that point the actor is restarted instead. This function never
// returns `Err`.
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
'outer: loop {
// Nothing is connected to the supervisor any more: ask the context to stop.
if !self.connected() {
self.ctx.stop();
}
// The transmutes detach these borrows from `self`, so `ctx` (pointing at
// `self.ctx`) and `act` (whatever `ctx.actor()` returns) can be used below
// together with `self` and with each other.
// NOTE(review): no SAFETY justification is given in this file for the
// resulting aliased mutable references - confirm this is sound.
let ctx: &mut Context<A> = unsafe{ mem::transmute(&mut self.ctx) };
let act: &mut A = unsafe{ mem::transmute(ctx.actor()) };
match ctx.poll() {
// Context has not finished: yield if it reports `waiting()`, otherwise
// fall through and process queued messages.
Ok(Async::NotReady) =>
if ctx.waiting() {
return Ok(Async::NotReady)
},
// Context finished or failed.
Ok(Async::Ready(_)) | Err(_) => {
// No connected channel left: the supervisor itself completes.
if !self.connected() {
return Ok(Async::Ready(()))
}
// Otherwise restart the actor and start over; `ctx` and `act` are
// re-derived at the top of the outer loop.
self.restart();
continue 'outer;
}
}
// Stays `true` unless at least one message is handled during this pass.
let mut not_ready = true;
// Drain the unsync queue.
loop {
// Context is no longer alive: go back to the top so it is polled again.
if !ctx.is_alive() {
continue 'outer
}
// Context reports `waiting()`: stop handing it messages and yield.
if ctx.waiting() {
return Ok(Async::NotReady)
}
match self.unsync_msgs.poll() {
Ok(Async::Ready(Some(msg))) => {
not_ready = false;
match msg {
// Reply with a sync address for this supervisor; the result of the
// send is ignored.
ContextProtocol::Upgrade(tx) => {
let _ = tx.send(self.sync_address());
}
// Regular message: let the envelope deliver itself to the actor.
ContextProtocol::Envelope(mut env) => {
env.handle(act, ctx);
}
}
}
// Queue empty, finished or failed: move on to the sync queue.
Ok(Async::NotReady) | Ok(Async::Ready(None)) | Err(_) => break,
}
}
// Drain the sync queue, if one has been created, with the same
// alive/waiting checks before every message.
if let Some(ref mut msgs) = self.sync_msgs {
loop {
if !ctx.is_alive() {
continue 'outer
}
if ctx.waiting() {
return Ok(Async::NotReady)
}
match msgs.poll() {
Ok(Async::Ready(Some(mut env))) => {
not_ready = false;
env.handle(act, ctx);
},
Ok(Async::NotReady) | Ok(Async::Ready(None)) | Err(_) => break,
}
}
}
// No message was handled in this pass: yield. Otherwise run another pass
// of the outer loop.
if not_ready {
return Ok(Async::NotReady)
}
}
}
}
/// One-shot actor factory that is invoked through a `Box`.
///
/// Implementors are consumed by `call` and produce an actor of type `A`
/// from a mutable reference to that actor's context.
trait FnFactory<A: Actor>: 'static where A::Context: AsyncContext<A> {
    /// Consumes the boxed factory and returns the actor it builds.
    ///
    /// `ctx` is the context handed to the factory. The parameter is named
    /// because anonymous trait-method parameters are deprecated and are
    /// rejected from the 2018 edition onwards.
    fn call(self: Box<Self>, ctx: &mut A::Context) -> A;
}
// Any `'static` closure that builds an actor from its context can be used
// as an `FnFactory`.
impl<A, F> FnFactory<A> for F
    where A: Actor,
          A::Context: AsyncContext<A>,
          F: FnOnce(&mut A::Context) -> A + 'static
{
    // `self` must arrive boxed to match the trait signature, so the clippy
    // `boxed_local` lint is silenced here.
    #[cfg_attr(feature="cargo-clippy", allow(boxed_local))]
    fn call(self: Box<Self>, ctx: &mut A::Context) -> A {
        // Move the closure out of its box, then invoke it exactly once.
        let factory = *self;
        factory(ctx)
    }
}