coerce 0.8.6

Async actor runtime and distributed systems framework
Documentation
//! Actor supervision and child spawning

use std::collections::HashMap;

use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
use crate::actor::scheduler::{start_actor, ActorType};
use crate::actor::system::ActorSystem;
use crate::actor::{
    Actor, ActorId, ActorPath, ActorRefErr, BoxedActorRef, CoreActorRef, LocalActorRef,
};

#[derive(Debug)]
pub struct Supervised {
    pub actor_id: ActorId,
    pub path: ActorPath,
    pub children: HashMap<ActorId, ChildRef>,
}

impl Supervised {
    pub fn new(actor_id: ActorId, path: ActorPath) -> Supervised {
        Self {
            actor_id,
            path,
            children: HashMap::new(),
        }
    }
}

#[derive(Debug, Copy, Clone)]
pub enum ChildType {
    Spawned,
    Attached,
}

#[derive(Debug, Clone)]
pub struct ChildRef {
    child_type: ChildType,
    actor_ref: BoxedActorRef,
}

pub struct Terminated(pub ActorId);

impl Message for Terminated {
    type Result = ();
}

#[async_trait]
impl<A: Actor> Handler<Terminated> for A {
    async fn handle(&mut self, message: Terminated, ctx: &mut ActorContext) {
        if let Some(supervised) = ctx.supervised_mut() {
            supervised.on_child_stopped(&message.0).await;
        }

        self.on_child_stopped(&message.0, ctx).await;
    }
}

impl Supervised {
    pub async fn spawn<A: Actor>(
        &mut self,
        id: ActorId,
        actor: A,
        system: ActorSystem,
        parent_ref: BoxedActorRef,
    ) -> Result<LocalActorRef<A>, ActorRefErr> {
        if let Some(_) = self.children.get(&id) {
            return Err(ActorRefErr::AlreadyExists(id));
        }

        let (tx, rx) = tokio::sync::oneshot::channel();
        let actor_ref = start_actor(
            actor,
            id.clone(),
            ActorType::Anonymous,
            Some(tx),
            Some(system),
            Some(parent_ref),
            self.path.clone(),
        );

        self.children
            .insert(id.clone(), ChildRef::spawned(actor_ref.clone().into()));

        match rx.await {
            Ok(_) => Ok(actor_ref),
            Err(e) => {
                error!("error spawning supervised actor (id={}) {}", &id, e);
                Err(ActorRefErr::ActorStartFailed)
            }
        }
    }

    pub fn spawn_deferred<A: Actor>(
        &mut self,
        id: ActorId,
        actor: A,
        system: ActorSystem,
        parent_ref: BoxedActorRef,
    ) -> Result<LocalActorRef<A>, ActorRefErr> {
        if let Some(_) = self.children.get(&id) {
            return Err(ActorRefErr::AlreadyExists(id));
        }

        let actor_ref = start_actor(
            actor,
            id.clone(),
            ActorType::Anonymous,
            None,
            Some(system),
            Some(parent_ref),
            self.path.clone(),
        );

        self.children
            .insert(id.clone(), ChildRef::spawned(actor_ref.clone().into()));

        Ok(actor_ref)
    }

    pub fn count(&self) -> usize {
        self.children.len()
    }

    pub fn child<A: Actor>(&self, id: &ActorId) -> Option<LocalActorRef<A>> {
        self.children.get(id).and_then(|a| a.actor_ref().as_actor())
    }

    pub fn child_boxed(&self, id: &ActorId) -> Option<BoxedActorRef> {
        self.children.get(id).map(|a| a.actor_ref.clone())
    }

    pub fn attach_child_ref(&mut self, boxed_ref: BoxedActorRef) {
        self.children
            .insert(boxed_ref.actor_id().clone(), ChildRef::attached(boxed_ref));
    }

    pub fn add_child_ref(&mut self, boxed_ref: BoxedActorRef) -> Option<ChildRef> {
        self.children
            .insert(boxed_ref.actor_id().clone(), ChildRef::spawned(boxed_ref))
    }

    pub async fn stop_all(&mut self) {
        let n = self.children.len();
        let stop_results = futures::future::join_all(
            self.children
                .iter()
                .map(|(id, actor)| async move { (id.clone(), actor.actor_ref.stop().await) }),
        )
        .await;

        for (actor_id, stop_result) in stop_results {
            match stop_result {
                Ok(_) => {
                    trace!("actor stopped ({})", actor_id);
                    self.children.remove(&actor_id);
                }
                Err(e) => match e {
                    ActorRefErr::InvalidRef => {
                        warn!("invalid ref, actor_id={} already stopped", &actor_id);
                    }
                    e => {
                        warn!("failed to stop child actor_id={}, err={}", actor_id, e);
                    }
                },
            }
        }

        info!("{} stopped {} child actors", &self.actor_id, n);
    }

    pub async fn on_child_stopped(&mut self, id: &ActorId) {
        if let Some(_) = self.children.remove(id) {
            trace!("child actor (id={}) stopped", id);
        } else {
            trace!("unknown child actor (id={}) stopped", id);
        }
    }
}

impl ChildRef {
    pub fn actor_ref(&self) -> &BoxedActorRef {
        &self.actor_ref
    }

    pub fn is_attached(&self) -> bool {
        matches!(&self.child_type, ChildType::Attached)
    }

    pub fn spawned(actor_ref: BoxedActorRef) -> Self {
        ChildRef {
            child_type: ChildType::Spawned,
            actor_ref,
        }
    }

    pub fn attached(actor_ref: BoxedActorRef) -> Self {
        ChildRef {
            child_type: ChildType::Attached,
            actor_ref,
        }
    }
}