use futures::{Future, Async, Poll, Stream};
use actor::{Actor, Supervised, AsyncContext};
use arbiter::Arbiter;
use address::{Address, SyncAddress};
use context::{Context, AsyncContextApi};
use contextcells::ContextProtocol;
use envelope::Envelope;
use msgs::Execute;
use queue::{sync, unsync};
pub struct Supervisor<A: Supervised> where A: Actor<Context=Context<A>> {
cell: Option<ActorCell<A>>,
factory: Option<Box<FnFactory<A>>>,
msgs: unsync::UnboundedReceiver<ContextProtocol<A>>,
sync_msgs: sync::UnboundedReceiver<Envelope<A>>,
msg: Option<ContextProtocol<A>>,
sync_msg: Option<Envelope<A>>,
sync_alive: bool,
}
struct ActorCell<A: Supervised> {
ctx: A::Context,
addr: unsync::UnboundedSender<ContextProtocol<A>>,
}
impl<A> Supervisor<A> where A: Supervised + Actor<Context=Context<A>>
{
pub fn start<F>(lazy: bool, f: F) -> (Address<A>, SyncAddress<A>)
where A: Actor<Context=Context<A>>,
F: FnOnce(&mut A::Context) -> A + 'static
{
let (cell, factory) = if !lazy {
let mut ctx = Context::new(None);
let addr = ctx.unsync_sender();
let act = f(&mut ctx);
ctx.set_actor(act);
(Some(ActorCell{ctx: ctx, addr: addr}), None)
} else {
let f: Box<FnFactory<A>> = Box::new(f);
(None, Some(f))
};
let rx = unsync::unbounded();
let (stx, srx) = sync::unbounded();
let mut supervisor = Supervisor {
cell: cell,
factory: factory,
msgs: rx,
sync_msgs: srx,
msg: None,
sync_msg: None,
sync_alive: true,
};
let addr = Address::new(supervisor.msgs.sender());
let saddr = SyncAddress::new(stx);
Arbiter::handle().spawn(supervisor);
(addr, saddr)
}
pub fn start_in<F>(addr: &SyncAddress<Arbiter>, lazy: bool, f: F) -> Option<SyncAddress<A>>
where A: Actor<Context=Context<A>>,
F: FnOnce(&mut Context<A>) -> A + Send + 'static
{
if addr.connected() {
let (tx, rx) = sync::unbounded();
addr.send(Execute::new(move || -> Result<(), ()> {
let (cell, factory) = if lazy {
let mut ctx = Context::new(None);
let addr = ctx.unsync_sender();
let act = f(&mut ctx);
ctx.set_actor(act);
(Some(ActorCell{ctx: ctx, addr: addr}), None)
} else {
let f: Box<FnFactory<A>> = Box::new(f);
(None, Some(f))
};
let lrx = unsync::unbounded();
let supervisor = Supervisor {
cell: cell,
factory: factory,
msgs: lrx,
sync_msgs: rx,
msg: None,
sync_msg: None,
sync_alive: true,
};
Arbiter::handle().spawn(supervisor);
Ok(())
}));
if addr.connected() {
Some(SyncAddress::new(tx))
} else {
None
}
} else {
None
}
}
fn get_cell(&mut self) -> &mut ActorCell<A> {
if self.cell.is_none() {
let f = self.factory.take().expect("Should be available");
let mut ctx = Context::new(None);
let addr = ctx.unsync_sender();
let act = f.call(&mut ctx);
ctx.set_actor(act);
self.cell = Some(ActorCell {ctx: ctx, addr: addr});
}
self.cell.as_mut().unwrap()
}
fn restart(&mut self) {
let cell = self.cell.take().unwrap();
let mut ctx = Context::new(None);
let addr = ctx.unsync_sender();
ctx.set_actor(cell.ctx.into_inner());
ctx.restarting();
self.cell = Some(ActorCell {ctx: ctx, addr: addr});
}
}
#[doc(hidden)]
impl<A> Future for Supervisor<A> where A: Supervised + Actor<Context=Context<A>>
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
if self.cell.is_some() {
match self.get_cell().ctx.poll() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) | Err(_) => {
self.restart();
}
}
}
let mut not_ready = true;
if let Some(msg) = self.msg.take() {
match msg {
ContextProtocol::Upgrade(_) => (),
msg => {
if let Err(msg) = self.get_cell().addr.unbounded_send(msg) {
self.msg = Some(msg.into_inner());
}
}
}
}
if self.msg.is_none() {
match self.msgs.poll() {
Ok(Async::Ready(Some(msg))) => {
not_ready = false;
match msg {
ContextProtocol::Upgrade(tx) => {
self.sync_alive = true;
let _ = tx.send(SyncAddress::new(self.sync_msgs.sender()));
}
msg => {
if let Err(msg) = self.get_cell().addr.unbounded_send(msg) {
self.msg = Some(msg.into_inner());
}
},
}
}
Ok(Async::NotReady) | Ok(Async::Ready(None)) | Err(_) => (),
}
}
if let Some(msg) = self.sync_msg.take() {
if let Err(msg) = self.get_cell().addr.unbounded_send(
ContextProtocol::Envelope(msg))
{
if let ContextProtocol::Envelope(msg) = msg.into_inner() {
self.sync_msg = Some(msg);
}
}
}
if self.sync_msg.is_none() {
match self.sync_msgs.poll() {
Ok(Async::Ready(Some(msg))) => {
not_ready = false;
if let Err(msg) = self.get_cell().addr.unbounded_send(
ContextProtocol::Envelope(msg))
{
if let ContextProtocol::Envelope(msg) = msg.into_inner() {
self.sync_msg = Some(msg);
}
}
},
Ok(Async::NotReady) => (),
Ok(Async::Ready(None)) | Err(_) => {
self.sync_alive = false
}
}
}
if not_ready {
return Ok(Async::NotReady)
}
match self.get_cell().ctx.poll() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) | Err(_) => {
if !self.get_cell().ctx.alive() &&
!self.msgs.connected() &&
!self.sync_alive
{
return Ok(Async::Ready(()))
}
self.restart();
}
}
}
}
}
trait FnFactory<A: Actor>: 'static where A::Context: AsyncContext<A> {
fn call(self: Box<Self>, &mut A::Context) -> A;
}
impl<A: Actor, F: FnOnce(&mut A::Context) -> A + 'static> FnFactory<A> for F
where A::Context: AsyncContext<A>
{
#[cfg_attr(feature="cargo-clippy", allow(boxed_local))]
fn call(self: Box<Self>, ctx: &mut A::Context) -> A {
(*self)(ctx)
}
}