ractor_actors 0.5.0

Helpful actors built with Ractor
Documentation
use super::{TimeoutStrategy, WatchdogStats, WATCHDOG_TIMEOUT};
use ractor::concurrency::{Duration, JoinHandle};
use ractor::{
    Actor, ActorCell, ActorId, ActorProcessingErr, ActorRef, MessagingErr, RpcReplyPort,
    SupervisionEvent,
};
use std::collections::HashMap;
use tracing::{debug, info};

pub struct Watchdog;

pub enum WatchdogMsg {
    Register(ActorCell, Duration, TimeoutStrategy),
    Unregister(ActorCell),
    Ping(ActorId, RpcReplyPort<()>),
    Timeout(ActorId),
    Stats(RpcReplyPort<WatchdogStats>),
}

pub struct WatchdogState {
    subjects: HashMap<ActorId, Registration>,
    kills: usize,
    stops: usize,
}

struct Registration {
    actor: ActorCell,
    timeout: Duration,
    timeout_strategy: TimeoutStrategy,
    timer: JoinHandle<Result<(), MessagingErr<WatchdogMsg>>>,
}

#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Actor for Watchdog {
    type Msg = WatchdogMsg;
    type State = WatchdogState;
    type Arguments = ();

    async fn pre_start(
        &self,
        _: ActorRef<Self::Msg>,
        _: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(WatchdogState {
            subjects: HashMap::new(),
            kills: 0,
            stops: 0,
        })
    }

    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            WatchdogMsg::Register(actor, timeout, timeout_strategy) => {
                let id = actor.get_id();

                let timer = myself.send_after(timeout, move || WatchdogMsg::Timeout(id));

                state.subjects.insert(
                    id,
                    Registration {
                        actor,
                        timeout,
                        timeout_strategy,
                        timer,
                    },
                );
                Ok(())
            }
            WatchdogMsg::Unregister(actor) => {
                state.unregister(&actor);
                Ok(())
            }
            WatchdogMsg::Ping(actor, reply) => match state.subjects.get(&actor) {
                Some(Registration { timeout, timer, .. }) => {
                    info!(actor = actor.to_string(), "got ping, rescheduling watchdog");
                    timer.abort();
                    myself.send_after(*timeout, move || WatchdogMsg::Timeout(actor));

                    // Ignoring this, don't want the death of the subject to affect the watchdog
                    let _ = reply.send(());

                    Ok(())
                }
                _ => {
                    state.subjects.remove(&actor);
                    Ok(())
                }
            },
            WatchdogMsg::Timeout(actor) => {
                if let Some(Registration {
                    actor,
                    timeout_strategy,
                    ..
                }) = state.subjects.remove(&actor)
                {
                    match timeout_strategy {
                        TimeoutStrategy::Kill => {
                            info!(
                                actor_id = actor.get_id().to_string(),
                                actor_name = actor.get_name(),
                                "watchdog timeout, killing",
                            );
                            actor.kill();
                            state.kills += 1;
                        }
                        TimeoutStrategy::Stop => {
                            info!(
                                actor_id = actor.get_id().to_string(),
                                actor_name = actor.get_name(),
                                "watchdog timeout, stopping",
                            );
                            actor.stop(Some(WATCHDOG_TIMEOUT.to_string()));
                            state.stops += 1;
                        }
                    }
                };
                Ok(())
            }
            WatchdogMsg::Stats(reply) => reply
                .send(WatchdogStats { kills: state.kills })
                .map_err(ActorProcessingErr::from),
        }
    }

    async fn handle_supervisor_evt(
        &self,
        _: ActorRef<Self::Msg>,
        message: SupervisionEvent,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            SupervisionEvent::ActorTerminated(cell, ..) => {
                debug!(actor = cell.get_id().to_string(), "actor terminated");
                state.unregister(&cell);
                Ok(())
            }
            SupervisionEvent::ActorFailed(cell, ..) => {
                debug!(actor = cell.get_id().to_string(), "actor failed");
                state.unregister(&cell);
                Ok(())
            }
            _ => Ok(()),
        }
    }
}

impl WatchdogState {
    fn unregister(&mut self, cell: &ActorCell) -> Option<ActorCell> {
        debug!(actor = cell.get_id().to_string(), "unregistering");
        self.subjects
            .remove(&cell.get_id())
            .map(|Registration { actor, timer, .. }| {
                timer.abort();
                actor
            })
    }
}