use futures::{Future, Async, Poll};
use actor::{Actor, Supervised};
use arbiter::Arbiter;
use address::{sync_channel, ActorAddress, Addr, Syn};
use context::Context;
use mailbox::DEFAULT_CAPACITY;
use msgs::Execute;
/// Owns the execution context of a supervised actor and drives it as a future.
///
/// The `Future` implementation for this type polls the owned context and,
/// whenever the context finishes or fails, asks it to restart before deciding
/// whether to complete.
pub struct Supervisor<A>
where
    A: Supervised + Actor<Context = Context<A>>,
{
    /// Context of the supervised actor; polled and restarted by the supervisor.
    ctx: A::Context,
}
impl<A> Supervisor<A>
where
    A: Supervised + Actor<Context = Context<A>>,
{
    /// Starts a supervised actor and returns its address.
    ///
    /// A fresh context is created and handed to `f`, which builds the actor.
    /// The address is obtained from the context through `ActorAddress::get`
    /// before the actor is installed into the context. The resulting
    /// supervisor is then spawned on the handle returned by
    /// `Arbiter::handle()`.
    ///
    /// Note that the generic parameter `Addr` shadows the imported `Addr`
    /// type inside this function; the concrete address type is chosen by the
    /// caller through the `ActorAddress<A, Addr>` bound.
    pub fn start<Addr, F>(f: F) -> Addr
    where
        F: FnOnce(&mut A::Context) -> A + 'static,
        A: Actor<Context = Context<A>> + ActorAddress<A, Addr>,
    {
        let mut ctx = Context::new(None);
        let actor = f(&mut ctx);
        // The address is taken from the context before the actor is stored in it.
        let address = <A as ActorAddress<A, Addr>>::get(&mut ctx);
        ctx.set_actor(actor);

        let supervisor = Self { ctx };
        Arbiter::handle().spawn(supervisor);

        address
    }

    /// Starts a supervised actor via the arbiter behind `addr` and returns a
    /// `Syn` address for it.
    ///
    /// A sync channel with `DEFAULT_CAPACITY` is created up front. The
    /// receiving half is moved into an `Execute` message sent to `addr`; when
    /// that message's closure runs it builds the context around the receiver,
    /// constructs the actor with `f`, and spawns the supervisor via
    /// `Arbiter::handle()`. The returned address wraps the sending half and is
    /// produced immediately, without waiting for the closure to run.
    pub fn start_in<F>(addr: &Addr<Syn, Arbiter>, f: F) -> Addr<Syn, A>
    where
        A: Actor<Context = Context<A>>,
        F: FnOnce(&mut Context<A>) -> A + Send + 'static,
    {
        let (sender, receiver) = sync_channel::channel(DEFAULT_CAPACITY);

        // Work item carried by the `Execute` message to the target arbiter.
        let launch = move || -> Result<(), ()> {
            let mut ctx = Context::with_receiver(None, receiver);
            let actor = f(&mut ctx);
            ctx.set_actor(actor);
            Arbiter::handle().spawn(Self { ctx });
            Ok(())
        };
        addr.do_send(Execute::new(launch));

        Addr::new(sender)
    }
}
#[doc(hidden)]
impl<A> Future for Supervisor<A>
where
    A: Supervised + Actor<Context = Context<A>>,
{
    type Item = ();
    type Error = ();

    /// Drives the supervised context.
    ///
    /// Returns `NotReady` while the context is still pending. When the
    /// context completes or errors, it is asked to restart: on success the
    /// context is polled again right away, otherwise the supervisor itself
    /// resolves with `Ready(())`. This future never yields an error.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            // Still running: nothing to supervise yet.
            if let Ok(Async::NotReady) = self.ctx.poll() {
                return Ok(Async::NotReady);
            }

            // The context either finished or failed; both cases are handled
            // the same way. Stop supervising once a restart is refused.
            if !self.ctx.restart() {
                return Ok(Async::Ready(()));
            }
        }
    }
}