// ascolt 0.1.11
//
// Async runtime-agnostic actor framework
// Documentation
use async_trait::async_trait;
use std::{
    fmt::{Debug, Display},
    time::{Duration, Instant},
};

use crate::{
    error::{
        actor::{ActorHandleErrorFailure, ActorInitFailure, ActorRuntimeError, ActorStopFailure},
        handler::BaseHandlerError,
    },
    handler::ActorMessageHandlerTrait,
    log,
    messaging::Receiver,
};

/// Lifecycle commands understood by the actor run loop.
///
/// A command reaches the loop either as an [`ActorMessage::CommandMessage`] sent over the
/// actor's channel or as the value returned from [`ActorTrait::on_error`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommandMessage {
    /// Closes the actor's channel, leaves the run loop and runs the actor's `on_stop` hook.
    StopActor,
    /// Leaves the run loop immediately, without running the actor's `on_stop` hook.
    ForceStopActor,
    /// Runs the actor's `on_stop` hook followed by `init`, then keeps processing messages.
    RestartActor,
}

/// Envelope for everything delivered through an actor's channel: either a control command
/// or a user message together with its delivery metadata.
#[doc(hidden)]
pub enum ActorMessage<M> {
    /// A control command; the run loop acts on it without involving the message handler.
    CommandMessage(CommandMessage),
    /// A user message of type `M`.
    RegularMessage {
        /// Payload handed to the actor's message handler.
        msg: M,
        /// Reference point from which the message's age is measured.
        sent_at: Instant,
        /// Maximum age of the message. If more time than this has elapsed since `sent_at`
        /// when the message is picked up, it is dropped without being handled. `None`
        /// means the message never expires.
        ttl: Option<Duration>,
    },
}

/// Lifecycle hooks for an actor whose message handler fails with errors of type `E`.
///
/// Every hook has a default implementation, so an implementor only overrides what it needs.
/// The hooks are invoked by the actor run loop defined in this module.
#[async_trait]
pub trait ActorTrait<E>
where
    E: Send + Display + Debug + 'static,
{
    /// Prepares the actor for processing messages.
    ///
    /// The run loop calls this once before it receives the first message, and again right
    /// after [`on_stop`](ActorTrait::on_stop) when a [`CommandMessage::RestartActor`] is
    /// processed. The default implementation does nothing.
    ///
    /// # Errors
    ///
    /// Returning an [`ActorInitFailure`] makes the run loop exit with an error.
    async fn init(&mut self) -> Result<(), ActorInitFailure> {
        Ok(())
    }

    /// Releases whatever the actor set up in [`init`](ActorTrait::init).
    ///
    /// The run loop calls this when it exits after the channel has been closed (for example
    /// following [`CommandMessage::StopActor`]) and before re-initialising the actor on
    /// [`CommandMessage::RestartActor`]. It is not called for
    /// [`CommandMessage::ForceStopActor`]. The default implementation does nothing.
    ///
    /// # Errors
    ///
    /// Returning an [`ActorStopFailure`] makes the run loop exit with an error.
    // NOTE(review): no unused binding is visible in this default body, so this `allow` may be
    // unnecessary — confirm against the `async_trait` expansion before removing it.
    #[allow(unused_variables, unused_mut)]
    async fn on_stop(&mut self) -> Result<(), ActorStopFailure> {
        Ok(())
    }

    /// Decides how the actor reacts when its message handler returns an error.
    ///
    /// The returned command, if any, is applied by the run loop in the same way as a command
    /// received over the channel; `Ok(None)` lets the loop carry on with the next message.
    /// The default implementation asks for [`CommandMessage::StopActor`].
    ///
    /// # Errors
    ///
    /// Returning an [`ActorHandleErrorFailure`] makes the run loop exit with an error.
    // The default body ignores `error`, hence the lint allowance.
    #[allow(unused_variables, unused_mut)]
    async fn on_error(
        &mut self,
        error: BaseHandlerError<E>,
    ) -> Result<Option<CommandMessage>, ActorHandleErrorFailure> {
        Ok(Some(CommandMessage::StopActor))
    }
}

/// Drives `actor` with the messages arriving on `rx` until the actor's run loop ends.
///
/// A failure of the run loop is not returned to the caller; it is written to the error log.
/// An informational line is logged once the loop has ended, whether it failed or not.
pub async fn run<A, M, E>(actor: A, rx: Receiver<M>)
where
    M: Send + 'static,
    A: ActorMessageHandlerTrait<M, E> + ActorTrait<E> + Send + Sync + 'static,
    E: Send + Debug + Display + 'static,
{
    match run_actor_loop(actor, rx.rx).await {
        Ok(()) => {}
        Err(error) => {
            log::error(format!("Actor runtime error: {error}"));
        }
    }

    log::info(String::from("Actor task finished - channel closed"));
}

async fn run_actor_loop<A, M, E>(
    mut actor: A,
    rx: async_channel::Receiver<ActorMessage<M>>,
) -> Result<(), ActorRuntimeError>
where
    M: Send + 'static,
    A: ActorMessageHandlerTrait<M, E> + ActorTrait<E> + Send + Sync + 'static,
    E: Send + Debug + Display + 'static,
{
    actor.init().await?;

    loop {
        let msg = rx.recv().await?;

        let command_result = handle_message(&mut actor, msg).await?;

        if let Some(command) = command_result {
            match command {
                CommandMessage::StopActor => {
                    rx.close();
                }
                CommandMessage::ForceStopActor => {
                    return Ok(());
                }
                CommandMessage::RestartActor => {
                    actor.on_stop().await?;

                    actor.init().await?;
                }
            };
        }

        if rx.is_closed() {
            break;
        }
    }

    actor.on_stop().await?;

    Ok(())
}

async fn handle_message<A, M, E>(
    actor: &mut A,
    msg: ActorMessage<M>,
) -> Result<Option<CommandMessage>, ActorHandleErrorFailure>
where
    M: Send + 'static,
    A: Send + Sync + ActorMessageHandlerTrait<M, E> + ActorTrait<E> + 'static,
    E: Send + Debug + Display + 'static,
{
    let result = match msg {
        ActorMessage::CommandMessage(command) => Some(command),

        ActorMessage::RegularMessage { msg, sent_at, ttl } => {
            if match ttl {
                Some(ttl) => sent_at.elapsed() > ttl,
                None => false,
            } {
                return Ok(None);
            }

            let result = actor
                .__handle(msg)
                .await
                .inspect_err(|err| log::error(format!("{err}")));

            match result {
                Ok(_) => None,
                Err(err) => actor.on_error(err).await?,
            }
        }
    };

    Ok(result)
}