use futures::{Async, Future, Poll};
use actor::{Actor, Supervised};
use address::{sync_channel, ActorAddress, Addr, Syn};
use arbiter::Arbiter;
use context::Context;
use mailbox::DEFAULT_CAPACITY;
use msgs::Execute;
/// Future wrapper around the context of a supervised actor.
///
/// A `Supervisor` owns the actor's `Context` and is spawned as a future.
/// Each time that context resolves or reports an error, the supervisor asks
/// the context to `restart()` and only completes once that request is
/// declined.
pub struct Supervisor<A: Supervised + Actor<Context = Context<A>>> {
    /// Execution context of the supervised actor; polled by the `Future`
    /// implementation of this type.
    ctx: Context<A>,
}
impl<A> Supervisor<A>
where
    A: Supervised + Actor<Context = Context<A>>,
{
    /// Creates an actor with the factory `f`, puts it under supervision and
    /// returns an address for it.
    ///
    /// The steps are performed in this order:
    ///
    /// 1. a new context is created with `Context::new(None)`;
    /// 2. `f` is called with mutable access to that context and produces the
    ///    actor;
    /// 3. the address of type `Addr` is obtained via `ActorAddress::get`;
    /// 4. the actor is installed into the context;
    /// 5. a `Supervisor` owning the context is spawned on the handle returned
    ///    by `Arbiter::handle()`.
    ///
    /// Note that the generic parameter `Addr` shadows the imported `Addr`
    /// type inside this function; the caller picks the concrete address kind.
    pub fn start<Addr, F>(f: F) -> Addr
    where
        F: FnOnce(&mut A::Context) -> A + 'static,
        A: Actor<Context = Context<A>> + ActorAddress<A, Addr>,
    {
        let mut context = Context::new(None);
        let actor = f(&mut context);
        // The address is taken before the actor is handed over to the context.
        let address = <A as ActorAddress<A, Addr>>::get(&mut context);
        context.set_actor(actor);

        let supervisor = Supervisor::<A> { ctx: context };
        Arbiter::handle().spawn(supervisor);

        address
    }

    /// Creates a supervised actor by sending an `Execute` message to the
    /// arbiter behind `addr`, and returns a `Syn` address for the new actor.
    ///
    /// A channel of `DEFAULT_CAPACITY` is created up front. Its receiving
    /// half is moved into the closure carried by the `Execute` message, where
    /// it backs the actor's context (`Context::with_receiver`); its sending
    /// half becomes the returned address. Inside the closure the actor is
    /// built by `f`, installed into the context, and the resulting
    /// `Supervisor` is spawned on `Arbiter::handle()`.
    ///
    /// The address is returned right after the message has been handed to
    /// `do_send`; presumably the closure runs later on the target arbiter —
    /// confirm against `Execute`/`Arbiter`.
    pub fn start_in<F>(addr: &Addr<Syn, Arbiter>, f: F) -> Addr<Syn, A>
    where
        A: Actor<Context = Context<A>>,
        F: FnOnce(&mut Context<A>) -> A + Send + 'static,
    {
        let (sender, receiver) = sync_channel::channel(DEFAULT_CAPACITY);

        // Everything that touches the context happens inside this closure.
        let bootstrap = move || -> Result<(), ()> {
            let mut context = Context::with_receiver(None, receiver);
            let actor = f(&mut context);
            context.set_actor(actor);

            let supervisor = Supervisor::<A> { ctx: context };
            Arbiter::handle().spawn(supervisor);
            Ok(())
        };
        addr.do_send(Execute::new(bootstrap));

        Addr::new(sender)
    }
}
#[doc(hidden)]
impl<A> Future for Supervisor<A>
where
    A: Supervised + Actor<Context = Context<A>>,
{
    type Item = ();
    type Error = ();

    /// Drives the supervised context.
    ///
    /// Returns `NotReady` whenever the context is not ready. When the context
    /// resolves or yields an error, `restart()` is called on it: if that
    /// returns `true` the context is polled again straight away, otherwise
    /// the supervisor itself resolves with `()`. This future never returns
    /// an error.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            // Nothing to do yet: propagate the pending state to the executor.
            if let Ok(Async::NotReady) = self.ctx.poll() {
                return Ok(Async::NotReady);
            }

            // The context either completed or failed; both cases are handled
            // identically by attempting a restart.
            let restarted = self.ctx.restart();
            if !restarted {
                return Ok(Async::Ready(()));
            }
        }
    }
}