use std::any::Any;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::pin::Pin;
use std::task::{self, Poll};
use heph::actor::{self, Actor, NewActor};
use heph::supervisor::{Supervisor, SupervisorStrategy};
use heph_inbox::{Manager, Receiver};
use log::error;
use crate::access::PrivateAccess;
use crate::process::{panic_message, Process, ProcessId, ProcessResult};
use crate::{self as rt, RuntimeRef, ThreadLocal, ThreadSafe};
/// A process that runs a single actor together with everything needed to
/// restart it.
pub(crate) struct ActorProcess<S, NA>
where
    NA: NewActor,
{
    /// Supervisor consulted when the actor returns an error, panics, or fails
    /// to restart.
    supervisor: S,
    /// `NewActor` implementation used to create a replacement actor on
    /// restart.
    new_actor: NA,
    /// Manager of the actor's inbox; hands out a fresh receiver for each newly
    /// created actor.
    inbox: Manager<NA::Message>,
    /// The actor that is currently being run.
    actor: NA::Actor,
}
impl<S, NA> ActorProcess<S, NA>
where
    S: Supervisor<NA>,
    NA: NewActor,
    NA::RuntimeAccess: RuntimeSupport,
{
    /// Bundles an already created `actor` with its `supervisor`, the
    /// `new_actor` implementation used for restarts and the `inbox` manager.
    pub(crate) const fn new(
        supervisor: S,
        new_actor: NA,
        actor: NA::Actor,
        inbox: Manager<NA::Message>,
    ) -> ActorProcess<S, NA> {
        Self {
            supervisor,
            new_actor,
            inbox,
            actor,
        }
    }

    /// Handles an error returned by the actor.
    ///
    /// Asks the supervisor what to do with `err`. On a restart decision a new
    /// actor is created; if that fails the failure is passed on to
    /// [`handle_restart_error`](Self::handle_restart_error). Returns
    /// `ProcessResult::Pending` after a successful restart and
    /// `ProcessResult::Complete` when the supervisor decides to stop.
    fn handle_actor_error(
        &mut self,
        runtime_ref: &mut RuntimeRef,
        pid: ProcessId,
        err: <NA::Actor as Actor>::Error,
    ) -> ProcessResult {
        let arg = match self.supervisor.decide(err) {
            SupervisorStrategy::Restart(arg) => arg,
            SupervisorStrategy::Stop => return ProcessResult::Complete,
            // NOTE(review): the wildcard arm presumably exists because
            // `SupervisorStrategy` is non-exhaustive; confirm.
            _ => unreachable!(),
        };

        match self.restart_actor(runtime_ref, pid, arg) {
            Ok(()) => ProcessResult::Pending,
            Err(restart_err) => self.handle_restart_error(runtime_ref, pid, restart_err),
        }
    }

    /// Handles a panic raised while running the actor.
    ///
    /// Same flow as [`handle_actor_error`](Self::handle_actor_error), except
    /// that the supervisor is consulted through `decide_on_panic` with the
    /// panic payload.
    fn handle_actor_panic(
        &mut self,
        runtime_ref: &mut RuntimeRef,
        pid: ProcessId,
        panic: Box<dyn Any + Send + 'static>,
    ) -> ProcessResult {
        let arg = match self.supervisor.decide_on_panic(panic) {
            SupervisorStrategy::Restart(arg) => arg,
            SupervisorStrategy::Stop => return ProcessResult::Complete,
            // NOTE(review): the wildcard arm presumably exists because
            // `SupervisorStrategy` is non-exhaustive; confirm.
            _ => unreachable!(),
        };

        match self.restart_actor(runtime_ref, pid, arg) {
            Ok(()) => ProcessResult::Pending,
            Err(restart_err) => self.handle_restart_error(runtime_ref, pid, restart_err),
        }
    }

    /// Handles a failure to create a replacement actor.
    ///
    /// The supervisor gets one more chance to restart the actor. If that
    /// second attempt fails as well, the error is reported via
    /// `second_restart_error` and the process completes.
    fn handle_restart_error(
        &mut self,
        runtime_ref: &mut RuntimeRef,
        pid: ProcessId,
        err: NA::Error,
    ) -> ProcessResult {
        let arg = match self.supervisor.decide_on_restart_error(err) {
            SupervisorStrategy::Restart(arg) => arg,
            SupervisorStrategy::Stop => return ProcessResult::Complete,
            // NOTE(review): the wildcard arm presumably exists because
            // `SupervisorStrategy` is non-exhaustive; confirm.
            _ => unreachable!(),
        };

        if let Err(second_err) = self.restart_actor(runtime_ref, pid, arg) {
            // No further retries: tell the supervisor and give up.
            self.supervisor.second_restart_error(second_err);
            return ProcessResult::Complete;
        }
        ProcessResult::Pending
    }

    /// Creates a replacement actor and, only if that succeeded, marks the
    /// process as ready.
    fn restart_actor(
        &mut self,
        runtime_ref: &mut RuntimeRef,
        pid: ProcessId,
        arg: NA::Argument,
    ) -> Result<(), NA::Error> {
        self.create_new_actor(runtime_ref, pid, arg)?;
        // The new actor has not been polled yet; marking the process ready
        // presumably makes sure it gets run (TODO confirm against the
        // scheduler).
        NA::RuntimeAccess::mark_ready(runtime_ref, pid);
        Ok(())
    }

    /// Creates a new actor from `arg` and replaces the current actor with it.
    ///
    /// The current actor is left untouched if `NewActor::new` returns an
    /// error.
    ///
    /// # Panics
    ///
    /// Panics if no new receiver can be created for the actor's inbox.
    fn create_new_actor(
        &mut self,
        runtime_ref: &mut RuntimeRef,
        pid: ProcessId,
        arg: NA::Argument,
    ) -> Result<(), NA::Error> {
        let receiver = self.inbox.new_receiver().expect(
            "failed to create new receiver for actor's inbox. Was the `actor::Context` leaked?",
        );
        let ctx = NA::RuntimeAccess::new_context(pid, receiver, runtime_ref);
        let replacement = self.new_actor.new(ctx, arg)?;
        // SAFETY: the actor is never moved out of `self`. `Pin::set` runs the
        // old actor's destructor in place before overwriting it with the
        // replacement.
        unsafe { Pin::new_unchecked(&mut self.actor) }.set(replacement);
        Ok(())
    }
}
impl<S, NA> Process for ActorProcess<S, NA>
where
    S: Supervisor<NA>,
    NA: NewActor,
    NA::RuntimeAccess: rt::Access + RuntimeSupport,
{
    /// Returns the name reported by the `NewActor` implementation.
    fn name(&self) -> &'static str {
        NA::name()
    }

    /// Polls the actor once and translates the outcome into a
    /// `ProcessResult`.
    ///
    /// Errors returned by the actor and panics raised while polling it are
    /// handed to the supervisor, which may restart the actor.
    fn run(self: Pin<&mut Self>, runtime_ref: &mut RuntimeRef, pid: ProcessId) -> ProcessResult {
        // SAFETY: the actor is not moved out of the process here; it is only
        // polled through a pinned reference or replaced in place.
        let process = unsafe { Pin::get_unchecked_mut(self) };

        let waker = NA::RuntimeAccess::new_task_waker(runtime_ref, pid);
        let mut cx = task::Context::from_waker(&waker);

        let outcome = {
            // SAFETY: re-pins the actor field of the pinned process; the
            // actor stays at the same location.
            let mut pinned_actor = unsafe { Pin::new_unchecked(&mut process.actor) };
            // Catch panics from the actor so the supervisor can decide what
            // to do instead of unwinding through the caller.
            catch_unwind(AssertUnwindSafe(|| pinned_actor.as_mut().try_poll(&mut cx)))
        };

        let poll = match outcome {
            Ok(poll) => poll,
            Err(panic) => {
                let msg = panic_message(&*panic);
                error!("actor '{}' panicked at '{}'", NA::name(), msg);
                return process.handle_actor_panic(runtime_ref, pid, panic);
            }
        };

        match poll {
            Poll::Pending => ProcessResult::Pending,
            Poll::Ready(Ok(())) => ProcessResult::Complete,
            Poll::Ready(Err(err)) => process.handle_actor_error(runtime_ref, pid, err),
        }
    }
}
/// Runtime-access specific operations that [`ActorProcess`] needs to create
/// and restart actors.
///
/// Implemented for [`ThreadLocal`] and [`ThreadSafe`].
pub(crate) trait RuntimeSupport {
/// Creates a new [`actor::Context`] for the process with id `pid` that
/// receives its messages from `inbox`.
fn new_context<M>(
pid: ProcessId,
inbox: Receiver<M>,
runtime_ref: &mut RuntimeRef,
) -> actor::Context<M, Self>
where
Self: Sized;
/// Marks the process with id `pid` as ready. `ActorProcess` calls this
/// after it successfully restarted its actor.
fn mark_ready(runtime_ref: &mut RuntimeRef, pid: ProcessId);
}
impl RuntimeSupport for ThreadLocal {
    /// Builds a context whose runtime access is a `ThreadLocal` created from
    /// `pid` and a clone of `runtime_ref`.
    fn new_context<M>(
        pid: ProcessId,
        inbox: Receiver<M>,
        runtime_ref: &mut RuntimeRef,
    ) -> actor::Context<M, ThreadLocal> {
        let access = ThreadLocal::new(pid, runtime_ref.clone());
        actor::Context::new(inbox, access)
    }

    /// Forwards to `RuntimeRef::mark_ready_local`.
    fn mark_ready(runtime_ref: &mut RuntimeRef, pid: ProcessId) {
        runtime_ref.mark_ready_local(pid);
    }
}
impl RuntimeSupport for ThreadSafe {
    /// Builds a context whose runtime access is a `ThreadSafe` created from
    /// `pid` and the result of `runtime_ref.clone_shared()`.
    fn new_context<M>(
        pid: ProcessId,
        inbox: Receiver<M>,
        runtime_ref: &mut RuntimeRef,
    ) -> actor::Context<M, ThreadSafe> {
        let access = ThreadSafe::new(pid, runtime_ref.clone_shared());
        actor::Context::new(inbox, access)
    }

    /// Forwards to `RuntimeRef::mark_ready_shared`.
    fn mark_ready(runtime_ref: &mut RuntimeRef, pid: ProcessId) {
        runtime_ref.mark_ready_shared(pid);
    }
}