use futures::{Async, Future, Poll};
use actor::{Actor, AsyncContext, Supervised};
use address::{channel, Addr};
use arbiter::Arbiter;
use context::Context;
use mailbox::DEFAULT_CAPACITY;
use msgs::Execute;
/// Future that owns an actor's execution context and drives it.
///
/// When polled, the supervisor polls the wrapped context; once the context
/// finishes or returns an error it calls `restart` on the context and keeps
/// polling for as long as that call returns `true`.
///
/// Instances are created through `Supervisor::start` or
/// `Supervisor::start_in_arbiter`, both of which hand the supervisor to
/// `Arbiter::spawn`.
pub struct Supervisor<A>
where
A: Supervised + Actor<Context = Context<A>>,
{
/// Execution context of the supervised actor (always a `Context<A>`).
ctx: A::Context,
}
impl<A> Supervisor<A>
where
    A: Supervised + Actor<Context = Context<A>>,
{
    /// Starts a supervised actor and returns its address.
    ///
    /// A fresh context is created, `f` is called with that context to build
    /// the actor, and the resulting supervisor is passed to `Arbiter::spawn`.
    ///
    /// The returned address is obtained from the context before the context
    /// is moved into the supervisor.
    pub fn start<F>(f: F) -> Addr<A>
    where
        F: FnOnce(&mut A::Context) -> A + 'static,
        A: Actor<Context = Context<A>>,
    {
        let mut context = Context::new(None);
        let actor = f(&mut context);
        // The address must be taken while we still own the context.
        let address = context.address();
        context.set_actor(actor);

        let supervisor = Self { ctx: context };
        Arbiter::spawn(supervisor);
        address
    }

    /// Starts a supervised actor from inside the arbiter addressed by `sys`.
    ///
    /// A channel with `DEFAULT_CAPACITY` is created up front. An `Execute`
    /// message is then sent to `sys`; its closure builds the context around
    /// the receiving half of the channel, calls `f` to construct the actor,
    /// and spawns the supervisor with `Arbiter::spawn`.
    ///
    /// The returned address wraps the sending half of the channel and is
    /// returned right after the message is handed to `do_send`; nothing here
    /// waits for, or inspects, the closure's result.
    pub fn start_in_arbiter<F>(sys: &Addr<Arbiter>, f: F) -> Addr<A>
    where
        A: Actor<Context = Context<A>>,
        F: FnOnce(&mut Context<A>) -> A + Send + 'static,
    {
        let (sender, receiver) = channel::channel(DEFAULT_CAPACITY);

        // Everything that touches the context happens inside this closure,
        // which is shipped to the target arbiter as an `Execute` message.
        let bootstrap = move || -> Result<(), ()> {
            let mut context = Context::with_receiver(None, receiver);
            let actor = f(&mut context);
            context.set_actor(actor);
            Arbiter::spawn(Self { ctx: context });
            Ok(())
        };
        sys.do_send(Execute::new(bootstrap));

        Addr::new(sender)
    }
}
#[doc(hidden)]
impl<A> Future for Supervisor<A>
where
    A: Supervised + Actor<Context = Context<A>>,
{
    type Item = ();
    type Error = ();

    /// Drives the wrapped context, restarting it whenever it terminates.
    ///
    /// Returns `NotReady` as soon as the context reports `NotReady`. If the
    /// context completes or fails, `restart` is called on it: a `true` result
    /// means the context is polled again, a `false` result completes this
    /// future with `Ready(())`. This future never yields an error.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            // Still running: nothing to supervise yet.
            if let Ok(Async::NotReady) = self.ctx.poll() {
                return Ok(Async::NotReady);
            }

            // The context finished (successfully or with an error); stop
            // supervising once it refuses to restart.
            if !self.ctx.restart() {
                return Ok(Async::Ready(()));
            }
        }
    }
}