use std::pin::Pin;
use std::task::{self, Poll};
use actix_rt::Arbiter;
use futures::Future;
use crate::actor::{Actor, AsyncContext, Supervised};
use crate::address::{channel, Addr};
use crate::context::Context;
use crate::contextimpl::ContextFut;
use crate::mailbox::DEFAULT_CAPACITY;
#[derive(Debug)]
pub struct Supervisor<A>
where
A: Supervised + Actor<Context = Context<A>>,
{
fut: ContextFut<A, Context<A>>,
}
impl<A> Supervisor<A>
where
A: Supervised + Actor<Context = Context<A>>,
{
pub fn start<F>(f: F) -> Addr<A>
where
F: FnOnce(&mut A::Context) -> A + 'static,
A: Actor<Context = Context<A>>,
{
let mut ctx = Context::new();
let act = f(&mut ctx);
let addr = ctx.address();
let fut = ctx.into_future(act);
actix_rt::spawn(Self { fut });
addr
}
pub fn start_in_arbiter<F>(sys: &Arbiter, f: F) -> Addr<A>
where
A: Actor<Context = Context<A>>,
F: FnOnce(&mut Context<A>) -> A + Send + 'static,
{
let (tx, rx) = channel::channel(DEFAULT_CAPACITY);
sys.exec_fn(move || {
let mut ctx = Context::with_receiver(rx);
let act = f(&mut ctx);
let fut = ctx.into_future(act);
actix_rt::spawn(Self { fut });
});
Addr::new(tx)
}
}
#[doc(hidden)]
impl<A> Future for Supervisor<A>
where
A: Supervised + Actor<Context = Context<A>>,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
loop {
match unsafe { Pin::new_unchecked(&mut this.fut) }.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(_) => {
if !this.fut.restart() {
return Poll::Ready(());
}
}
}
}
}
}