use std::any::Any;
use std::fmt;
use std::future::Future;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::pin::Pin;
use std::task::{self, Poll, Waker};
use heph_inbox::{Manager, ReceiverConnected};
use log::error;
use crate::actor::{self, Actor, NewActor};
use crate::actor_ref::ActorRef;
use crate::supervisor::{Supervisor, SupervisorStrategy};
pub struct ActorFuture<S, NA: NewActor, RT> {
supervisor: S,
new_actor: NA,
inbox: Manager<NA::Message>,
actor: NA::Actor,
rt: RT,
}
impl<S, NA, RT> ActorFuture<S, NA, RT>
where
S: Supervisor<NA>,
NA: NewActor<RuntimeAccess = RT>,
RT: Clone,
{
#[allow(clippy::type_complexity)]
pub fn new(
supervisor: S,
mut new_actor: NA,
argument: NA::Argument,
rt: RT,
) -> Result<(ActorFuture<S, NA, RT>, ActorRef<NA::Message>), NA::Error> {
let (inbox, sender, receiver) = heph_inbox::Manager::new_small_channel();
let actor_ref = ActorRef::local(sender);
let ctx = actor::Context::new(receiver, rt.clone());
let actor = match new_actor.new(ctx, argument) {
Ok(actor) => actor,
Err(err) => return Err(err),
};
let future = ActorFuture {
supervisor,
new_actor,
inbox,
actor,
rt,
};
Ok((future, actor_ref))
}
fn handle_actor_error(&mut self, waker: &Waker, err: <NA::Actor as Actor>::Error) -> Poll<()> {
match self.supervisor.decide(err) {
SupervisorStrategy::Restart(arg) => {
match self.create_new_actor(arg) {
Ok(()) => {
waker.wake_by_ref();
Poll::Pending
}
Err(err) => self.handle_restart_error(waker, err),
}
}
SupervisorStrategy::Stop => Poll::Ready(()),
}
}
fn handle_actor_panic(
&mut self,
waker: &Waker,
panic: Box<dyn Any + Send + 'static>,
) -> Poll<()> {
match self.supervisor.decide_on_panic(panic) {
SupervisorStrategy::Restart(arg) => {
match self.create_new_actor(arg) {
Ok(()) => {
waker.wake_by_ref();
Poll::Pending
}
Err(err) => self.handle_restart_error(waker, err),
}
}
SupervisorStrategy::Stop => Poll::Ready(()),
}
}
fn handle_restart_error(&mut self, waker: &Waker, err: NA::Error) -> Poll<()> {
match self.supervisor.decide_on_restart_error(err) {
SupervisorStrategy::Restart(arg) => {
match self.create_new_actor(arg) {
Ok(()) => {
waker.wake_by_ref();
Poll::Pending
}
Err(err) => {
self.supervisor.second_restart_error(err);
Poll::Ready(())
}
}
}
SupervisorStrategy::Stop => Poll::Ready(()),
}
}
fn create_new_actor(&mut self, arg: NA::Argument) -> Result<(), NA::Error> {
let receiver = self.inbox.new_receiver().unwrap_or_else(inbox_failure);
let ctx = actor::Context::new(receiver, self.rt.clone());
self.new_actor.new(ctx, arg).map(|actor| {
unsafe { Pin::new_unchecked(&mut self.actor) }.set(actor)
})
}
}
impl<S, NA, RT> Future for ActorFuture<S, NA, RT>
where
S: Supervisor<NA>,
NA: NewActor<RuntimeAccess = RT>,
RT: Clone,
{
type Output = ();
fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
let this = unsafe { Pin::get_unchecked_mut(self) };
let mut actor = unsafe { Pin::new_unchecked(&mut this.actor) };
match catch_unwind(AssertUnwindSafe(|| actor.as_mut().try_poll(ctx))) {
Ok(Poll::Ready(Ok(()))) => Poll::Ready(()),
Ok(Poll::Ready(Err(err))) => this.handle_actor_error(ctx.waker(), err),
Ok(Poll::Pending) => Poll::Pending,
Err(panic) => {
let msg = panic_message(&*panic);
error!("actor '{}' panicked at '{}'", NA::name(), msg);
this.handle_actor_panic(ctx.waker(), panic)
}
}
}
}
impl<S, NA, RT> fmt::Debug for ActorFuture<S, NA, RT>
where
S: Supervisor<NA> + fmt::Debug,
NA: NewActor<RuntimeAccess = RT>,
RT: Clone + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ActorFuture")
.field("supervisor", &self.supervisor)
.field("actor", &NA::name())
.field("rt", &self.rt)
.finish()
}
}
fn panic_message<'a>(panic: &'a (dyn Any + Send + 'static)) -> &'a str {
match panic.downcast_ref::<&'static str>() {
Some(s) => *s,
None => match panic.downcast_ref::<String>() {
Some(s) => &**s,
None => "<unknown>",
},
}
}
#[cold]
fn inbox_failure<T>(_: ReceiverConnected) -> T {
panic!("failed to create new receiver for actor's inbox. Was the `actor::Context` leaked?");
}