use futures::{Async, Future, Poll};
use actor::{Actor, AsyncContext, Supervised};
use address::{channel, Addr};
use arbiter::Arbiter;
use context::Context;
use contextimpl::ContextFut;
use mailbox::DEFAULT_CAPACITY;
use msgs::Execute;
/// Future wrapper around the context future of a supervised actor `A`.
///
/// The `Future` implementation for this type in this file polls `fut` and,
/// whenever it finishes or fails, calls `fut.restart()` and keeps polling for
/// as long as that call returns `true`.
///
/// `A` must implement `Supervised` and use `Context<A>` as its actor context.
pub struct Supervisor<A>
where
A: Supervised + Actor<Context = Context<A>>,
{
/// Context future driving the supervised actor.
fut: ContextFut<A, Context<A>>,
}
impl<A> Supervisor<A>
where
    A: Supervised + Actor<Context = Context<A>>,
{
    /// Builds an actor with `f`, wraps it in a `Supervisor` and spawns it.
    ///
    /// A fresh `Context` is created and passed to `f`, which returns the actor
    /// instance. The context is then turned into its future, wrapped in a
    /// `Supervisor` and handed to `Arbiter::spawn`.
    ///
    /// Returns the address obtained from the context before it was consumed.
    pub fn start<F>(f: F) -> Addr<A>
    where
        F: FnOnce(&mut A::Context) -> A + 'static,
        A: Actor<Context = Context<A>>,
    {
        let mut context = Context::new();
        let actor = f(&mut context);

        // The address has to be taken first: `into_future` consumes the context.
        let address = context.address();

        Arbiter::spawn(Self {
            fut: context.into_future(actor),
        });
        address
    }

    /// Builds and spawns a supervised actor from inside an `Execute` message
    /// sent to the arbiter `sys`.
    ///
    /// A channel with `DEFAULT_CAPACITY` is created up front. The receiving
    /// half moves into the closure, where it backs the actor's `Context`; the
    /// sending half becomes the returned `Addr`. The actor is only constructed
    /// when the closure runs, so the address is returned without waiting for
    /// that to happen.
    ///
    /// `f` must be `Send` because it is moved into the message delivered to `sys`.
    pub fn start_in_arbiter<F>(sys: &Addr<Arbiter>, f: F) -> Addr<A>
    where
        A: Actor<Context = Context<A>>,
        F: FnOnce(&mut Context<A>) -> A + Send + 'static,
    {
        let (sender, receiver) = channel::channel(DEFAULT_CAPACITY);

        // Everything that touches the actor is deferred into this closure.
        let spawn_supervised = move || -> Result<(), ()> {
            let mut context = Context::with_receiver(receiver);
            let actor = f(&mut context);
            Arbiter::spawn(Self {
                fut: context.into_future(actor),
            });
            Ok(())
        };
        sys.do_send(Execute::new(spawn_supervised));

        Addr::new(sender)
    }
}
#[doc(hidden)]
impl<A> Future for Supervisor<A>
where
    A: Supervised + Actor<Context = Context<A>>,
{
    type Item = ();
    type Error = ();

    /// Drives the wrapped context future, restarting it when it ends.
    ///
    /// Returns `NotReady` while the inner future is pending. When the inner
    /// future completes or errors, `restart()` is called on it: a `true`
    /// result means polling continues immediately, a `false` result resolves
    /// the supervisor with `Ready(())`. This method never returns `Err`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            // Still running: nothing to supervise yet.
            if let Ok(Async::NotReady) = self.fut.poll() {
                return Ok(Async::NotReady);
            }

            // The inner future finished or failed; stop for good unless the
            // restart is accepted, otherwise go around and poll again.
            if !self.fut.restart() {
                return Ok(Async::Ready(()));
            }
        }
    }
}