coerce 0.8.6

Async actor runtime and distributed systems framework
Documentation
//! Actor lifecycle and [`ActorLoop`][ActorLoop] implementation

use crate::actor::context::ActorStatus::{Started, Starting, Stopped, Stopping};
use crate::actor::context::{ActorContext, ActorStatus};
use crate::actor::message::{Handler, Message, MessageHandler};
use crate::actor::metrics::ActorMetrics;
use crate::actor::scheduler::{ActorType, DeregisterActor};
use crate::actor::system::ActorSystem;
use crate::actor::{Actor, ActorId, BoxedActorRef, LocalActorRef};

use tokio::sync::mpsc::UnboundedReceiver;
use tracing::Instrument;
use valuable::Valuable;

use tokio::sync::oneshot::Sender;

pub struct Status;

pub struct Stop(pub Option<Sender<()>>);

impl Message for Status {
    type Result = ActorStatus;
}

impl Message for Stop {
    type Result = ();
}

#[async_trait]
impl<A> Handler<Status> for A
where
    A: Actor,
{
    async fn handle(&mut self, _message: Status, ctx: &mut ActorContext) -> ActorStatus {
        ctx.get_status().clone()
    }
}

#[async_trait]
impl<A: Actor> Handler<Stop> for A {
    async fn handle(&mut self, stop: Stop, ctx: &mut ActorContext) {
        ctx.stop(stop.0);
    }
}

pub struct ActorLoop {}

impl ActorLoop {
    pub async fn run<A: Actor>(
        mut actor: A,
        actor_type: ActorType,
        mut receiver: UnboundedReceiver<MessageHandler<A>>,
        mut on_start: Option<Sender<()>>,
        actor_ref: LocalActorRef<A>,
        parent_ref: Option<BoxedActorRef>,
        mut system: Option<ActorSystem>,
    ) {
        let actor_id = actor_ref.actor_id().clone();
        let mut ctx = actor
            .new_context(system.clone(), Starting, actor_ref.clone().into())
            .with_parent(parent_ref);

        trace!("[{}] starting", ctx.full_path());

        actor.started(&mut ctx).await;
        ActorMetrics::incr_actor_created(A::type_name());

        if ctx.get_status() == &Stopping {
            return actor_stopped(&mut actor, actor_type, &mut system, &actor_id, &mut ctx).await;
        }

        ctx.set_status(Started);

        trace!("[{}] ready", ctx.full_path());

        if let Some(on_start) = on_start.take() {
            let _ = on_start.send(());
        }

        let log = ctx.log();
        while let Some(mut msg) = receiver.recv().await {
            {
                #[cfg(feature = "actor-tracing-info")]
                let span = tracing::info_span!(
                    "actor.recv",
                    ctx = log.as_value(),
                    message_type = msg.name(),
                );

                #[cfg(feature = "actor-tracing-debug")]
                let span = tracing::debug_span!(
                    "actor.recv",
                    ctx = log.as_value(),
                    message_type = msg.name(),
                );

                #[cfg(feature = "actor-tracing-trace")]
                let span = tracing::trace_span!(
                    "actor.recv",
                    ctx = log.as_value(),
                    message_type = msg.name(),
                );

                trace!("[{}] received {}", ctx.full_path(), msg.name(),);

                let handle_fut = msg.handle(&mut actor, &mut ctx);

                #[cfg(feature = "actor-tracing")]
                let handle_fut = handle_fut.instrument(span);

                handle_fut.await;

                trace!("[{}] processed {}", ctx.full_path(), msg.name());
            }

            if ctx.get_status() == &Stopping {
                break;
            }
        }

        trace!("[{}] stopping", ctx.full_path());

        ctx.set_status(Stopping);

        actor_stopped(&mut actor, actor_type, &mut system, &actor_id, &mut ctx).await
    }
}

async fn actor_stopped<A: Actor>(
    actor: &mut A,
    actor_type: ActorType,
    system: &mut Option<ActorSystem>,
    actor_id: &ActorId,
    mut ctx: &mut ActorContext,
) {
    actor.stopped(&mut ctx).await;

    ctx.set_status(Stopped);

    if actor_type.is_tracked() {
        if let Some(system) = system.take() {
            if !system.is_terminated() {
                trace!("de-registering actor {}", &actor_id);

                system
                    .scheduler()
                    .send(DeregisterActor(actor_id.clone()))
                    .await
                    .expect("de-register actor");
            }
        }
    }

    if let Some(on_stopped_handlers) = ctx.take_on_stopped_handlers() {
        for sender in on_stopped_handlers {
            let _ = sender.send(());
        }
    }
}