mm1-sup 0.7.23

An Erlang-style actor runtime for Rust.
Documentation
use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;

use mm1_ask::Reply;
use mm1_common::errors::chain::StdErrorDisplayChainExt;
use mm1_common::errors::error_kind::HasErrorKind;
use mm1_common::errors::error_of::ErrorOf;
use mm1_common::log;
use mm1_core::context::{ForkErrorKind, RecvErrorKind};
use mm1_core::envelope::dispatch;
use mm1_proto::Message;
use mm1_proto_ask::Request;
use mm1_proto_sup::mixed as m_sup;
use mm1_proto_system::Exited;

use crate::mixed::decider::{Action, Decider};
use crate::mixed::strategy::RestartStrategy;
use crate::mixed::{
    ChildType, ErasedActorFactory, MixedSup, MixedSupContext, spec_builder, sup_child,
};
/// Shorthand for the crate-wide `ChildSpec` with its second type parameter
/// fixed to this module's [`ChildType`]; only `F` stays generic.
type ChildSpec<F> = crate::common::child_spec::ChildSpec<F, ChildType>;

/// Runs the main loop of a "mixed" supervisor on the given context.
///
/// After enabling exit trapping on `ctx`, every child from `sup_spec.children`
/// is registered with the decider produced by `sup_spec.restart_strategy`.
/// The function then repeats two steps:
///
/// 1. ask the decider for its next [`Action`] (if there is one) and carry it
///    out;
/// 2. receive one message and either feed it back into the decider or, for a
///    `GetChildRequest`, reply with the decider's answer.
///
/// Starting and stopping children is delegated to forked contexts that run
/// `sup_child::run` and `sup_child::shutdown` respectively.
///
/// The loop contains no `break`; within this function the only visible ways
/// out are the `?` error paths.
/// NOTE(review): `quit_ok` / `quit_err` presumably terminate the actor —
/// confirm against the context implementation.
///
/// # Errors
///
/// Returns a [`MixedSupError`] when:
/// - registering a child with the decider fails, or the decider fails to
///   produce its next action (`Decider`);
/// - forking the context fails (`Fork`);
/// - receiving a message fails (`Recv`).
///
/// # Panics
///
/// Panics if the decider hands out a `child_id` that is missing from the
/// child-spec map built at start-up.
pub async fn mixed_sup<Runnable, Ctx, RS, CS, K>(
    ctx: &mut Ctx,
    sup_spec: MixedSup<RS, CS>,
) -> Result<(), MixedSupError>
where
    Runnable: Send,
    Ctx: MixedSupContext<Runnable>,
    Ctx: Reply,
    CS: spec_builder::CollectInto<K, Runnable>,
    RS: RestartStrategy<K>,
    K: fmt::Display,
    K: Clone + Hash + Eq,
    K: Message,
    ChildSpec<ErasedActorFactory<Runnable>>: Send + Sync + 'static,
{
    // NOTE(review): presumably this is what makes child terminations show up
    // as `Exited` messages in the dispatch below — confirm.
    ctx.set_trap_exit(true).await;

    let MixedSup {
        restart_strategy,
        children,
    } = sup_spec;
    // Captured once so it can be moved into the forked start/stop tasks.
    let sup_addr = ctx.address();
    let mut decider = restart_strategy.decider();
    // Register every child with the decider and build the id -> spec lookup.
    let children = do_init_children(&mut decider, children)?;

    loop {
        // Step 1: carry out at most one pending decider action per iteration.
        if let Some(action) = decider
            .next_action(ctx.now())
            .map_err(MixedSupError::decider)?
        {
            log::debug!(action = %action, "processing decider action");
            match action {
                Action::Noop => (),
                Action::InitDone => {
                    ctx.init_done(ctx.address()).await;
                },
                Action::Quit { normal_exit } => {
                    if normal_exit {
                        ctx.quit_ok().await;
                    } else {
                        ctx.quit_err(MixedSupError::Escalated).await;
                    }
                },
                Action::Start { child_id } => {
                    // Own the id so it can be moved into the forked task.
                    let child_id = child_id.clone();
                    // Clone the stored spec and apply `produce(())` to its
                    // launcher for this particular start attempt.
                    let child_spec = children
                        .get(&child_id)
                        .expect("child_id provided by the decider")
                        .clone()
                        .map_launcher(|f| f.produce(()));

                    // The start runs on a forked context.
                    // NOTE(review): its outcome presumably comes back as
                    // `Started` / `StartFailed` messages — confirm in `sup_child`.
                    let forked = ctx
                        .fork()
                        .await
                        .map_err(|e| e.kind())
                        .map_err(MixedSupError::Fork)?;
                    forked
                        .run(move |mut ctx| async move { sup_child::run(&mut ctx, sup_addr, child_id, child_spec).await })
                        .await;
                },

                Action::Stop { address, child_id } => {
                    // Use the child's configured stop timeout when a child id
                    // is given; otherwise fall back to the type's default.
                    let stop_timeout = child_id
                        .map(|id| {
                            children
                                .get(id)
                                .expect("child_id provided by the decider")
                                .stop_timeout
                        })
                        .unwrap_or_default();
                    // The shutdown runs on a forked context as well.
                    let forked = ctx
                        .fork()
                        .await
                        .map_err(|e| e.kind())
                        .map_err(MixedSupError::Fork)?;
                    forked
                        .run(move |mut ctx| {
                            async move {
                                sup_child::shutdown(&mut ctx, sup_addr, address, stop_timeout).await
                            }
                        })
                        .await;
                },
            }
        }

        // Step 2: wait for the next message and dispatch on its type.
        let received = ctx.recv().await.map_err(MixedSupError::recv)?;

        dispatch!(match received {
            sup_child::Started::<K> { child_id, address } => {
                log::debug!(child_id = %child_id, address = %address, "started. Linking...");
                ctx.link(address).await;
                decider.started(&child_id, address, ctx.now())
            },
            sup_child::StartFailed::<K> { child_id } => {
                // The failure is only reported to the decider here; what
                // happens next depends on the decider's subsequent actions.
                log::warn!(child_id = %child_id, "failed to start. Initiating shutdown...");
                decider.failed(&child_id, ctx.now());
            },
            sup_child::StopFailed { address, reason } => {
                log::warn!(
                    address = %address, reason = %reason.as_display_chain(),
                    "failed to stop. Initiating shutdown..."
                );
                // Ask the decider to quit with a non-normal exit.
                decider.quit(false);
            },

            Request::<_> {
                header,
                payload: m_sup::GetChildRequest::<K> { child_id },
            } => {
                // A decider lookup error is reported to the requester as
                // `UnknownChild`, carrying the error's display text.
                let reply_with: m_sup::GetChildResponse = match decider.address(&child_id) {
                    Err(reason) => {
                        Err(ErrorOf::new(
                            m_sup::GetChildErrorKind::UnknownChild,
                            reason.to_string(),
                        ))
                    },
                    Ok(address_opt) => Ok(address_opt),
                };
                // Best-effort reply: a delivery failure is deliberately ignored.
                ctx.reply(header, reply_with).await.ok();
            },

            Exited { peer, normal_exit } => {
                log::debug!(peer = %peer, "exited");
                decider.exited(peer, normal_exit, ctx.now());
            },

            // Anything else is logged and dropped.
            unexpected @ _ => log::warn!(msg = ?unexpected, "unexpected message"),
        });
    }
}

/// Errors with which the mixed supervisor ([`mixed_sup`]) terminates.
#[derive(Debug, thiserror::Error)]
pub enum MixedSupError {
    /// The decider requested a non-normal quit; passed to `quit_err` by the
    /// supervisor loop.
    #[error("escalated supervisor failure")]
    Escalated,

    /// A decider operation failed; holds the failure rendered via `Display`.
    #[error("decider: {}", _0)]
    Decider(String),

    /// Receiving a message from the supervisor's context failed.
    #[error("recv: {}", _0)]
    Recv(RecvErrorKind),

    /// Forking the supervisor's context failed.
    #[error("fork: {}", _0)]
    Fork(ForkErrorKind),
}

impl MixedSupError {
    pub fn decider(reason: impl fmt::Display) -> Self {
        Self::Decider(reason.to_string())
    }

    pub fn recv(reason: impl HasErrorKind<RecvErrorKind>) -> Self {
        Self::Recv(reason.kind())
    }
}

/// Flattens the supervisor's child specification, registers each child with
/// the decider, and returns the specs indexed by child key.
///
/// Registration proceeds in the order produced by `collect_into` and stops at
/// the first failure.
///
/// # Errors
///
/// Returns [`MixedSupError::Decider`] carrying the display text of the first
/// error reported by `decider.add`.
fn do_init_children<CS, D, R, K>(
    decider: &mut D,
    children: CS,
) -> Result<HashMap<K, ChildSpec<ErasedActorFactory<R>>>, MixedSupError>
where
    CS: spec_builder::CollectInto<K, R>,
    D: Decider<Key = K>,
    K: Clone + Hash + Eq,
{
    let mut collected = Vec::new();
    children.collect_into(&mut collected);

    // Announce every child to the decider before handing the specs over;
    // `try_for_each` short-circuits on the first registration error.
    collected.iter().try_for_each(|(key, spec)| {
        decider
            .add(key.clone(), spec.child_type)
            .map_err(MixedSupError::decider)
    })?;

    Ok(collected.into_iter().collect())
}