1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
//! Actor lifecycle and [`ActorLoop`][ActorLoop] implementation

use crate::actor::context::ActorStatus::{Started, Starting, Stopped, Stopping};
use crate::actor::context::{ActorContext, ActorStatus};
use crate::actor::message::{Handler, Message, MessageHandler};
use crate::actor::metrics::ActorMetrics;
use crate::actor::scheduler::{ActorType, DeregisterActor};
use crate::actor::system::ActorSystem;
use crate::actor::{Actor, ActorId, BoxedActorRef, LocalActorRef};

use tokio::sync::mpsc::UnboundedReceiver;
use tracing::Instrument;
use valuable::Valuable;

use tokio::sync::oneshot::Sender;

/// Message that asks an actor for its current lifecycle status.
///
/// Every [`Actor`] handles it through the blanket [`Handler<Status>`] impl in
/// this module, which replies with a clone of the status stored in the actor's
/// context.
pub struct Status;

/// Message that asks an actor to stop.
///
/// The optional oneshot sender is handed unchanged to `ActorContext::stop` by
/// the blanket [`Handler<Stop>`] impl in this module.
// NOTE(review): presumably the sender is signalled once the actor has fully
// stopped — confirm against `ActorContext::stop`.
pub struct Stop(pub Option<Sender<()>>);

/// A [`Status`] query is answered with an [`ActorStatus`] value.
impl Message for Status {
    type Result = ActorStatus;
}

/// Handling a [`Stop`] message produces no result value.
impl Message for Stop {
    type Result = ();
}

/// Blanket [`Status`] handler available on every [`Actor`].
///
/// Replies with a clone of the status currently stored in the actor's
/// context; the actor itself is not touched.
#[async_trait]
impl<A: Actor> Handler<Status> for A {
    async fn handle(&mut self, _message: Status, ctx: &mut ActorContext) -> ActorStatus {
        let current = ctx.get_status();
        current.clone()
    }
}

#[async_trait]
impl<A: Actor> Handler<Stop> for A {
    async fn handle(&mut self, stop: Stop, ctx: &mut ActorContext) {
        ctx.stop(stop.0);
    }
}

/// Field-less type that groups the actor run loop; its behaviour lives in the
/// associated function [`ActorLoop::run`].
pub struct ActorLoop {}

impl ActorLoop {
    /// Drives a single actor from start-up to shutdown.
    ///
    /// Steps, in order:
    /// 1. Build the actor's context in the `Starting` state, attach
    ///    `parent_ref`, and await `actor.started`.
    /// 2. If `started` left the context in `Stopping`, run the stop sequence
    ///    immediately and return. `on_start` is dropped without being
    ///    signalled in that case.
    /// 3. Otherwise mark the context `Started` and signal `on_start`
    ///    (a failed send is ignored).
    /// 4. Handle messages from `receiver` one at a time until `recv()` yields
    ///    `None` or a handler leaves the context in `Stopping`.
    /// 5. Mark the context `Stopping` and run the stop sequence
    ///    (`actor_stopped`).
    ///
    /// When one of the `actor-tracing-*` features is enabled, each message is
    /// handled inside an `actor.recv` span at the matching level.
    pub async fn run<A: Actor>(
        mut actor: A,
        actor_type: ActorType,
        mut receiver: UnboundedReceiver<MessageHandler<A>>,
        mut on_start: Option<Sender<()>>,
        actor_ref: LocalActorRef<A>,
        parent_ref: Option<BoxedActorRef>,
        mut system: Option<ActorSystem>,
    ) {
        let actor_id = actor_ref.actor_id().clone();
        let mut ctx = actor
            .new_context(system.clone(), Starting, actor_ref.clone().into())
            .with_parent(parent_ref);

        trace!("[{}] starting", ctx.full_path());

        actor.started(&mut ctx).await;
        ActorMetrics::incr_actor_created(A::type_name());

        // The `started` hook may already have requested a stop; in that case
        // the message loop is skipped entirely.
        if matches!(ctx.get_status(), Stopping) {
            actor_stopped(&mut actor, actor_type, &mut system, &actor_id, &mut ctx).await;
            return;
        }

        ctx.set_status(Started);

        trace!("[{}] ready", ctx.full_path());

        // Tell whoever is waiting that the actor is ready; a receiver that has
        // gone away is not an error here.
        if let Some(ready_tx) = on_start.take() {
            let _ = ready_tx.send(());
        }

        // Only read by the feature-gated span constructors below.
        let log = ctx.log();
        loop {
            let mut msg = match receiver.recv().await {
                Some(next) => next,
                // `recv()` yields `None` once the channel is closed and drained.
                None => break,
            };

            {
                // At most one of these is expected to be compiled in; if
                // several are, the later declaration shadows the earlier one.
                #[cfg(feature = "actor-tracing-info")]
                let span = tracing::info_span!(
                    "actor.recv",
                    ctx = log.as_value(),
                    message_type = msg.name(),
                );

                #[cfg(feature = "actor-tracing-debug")]
                let span = tracing::debug_span!(
                    "actor.recv",
                    ctx = log.as_value(),
                    message_type = msg.name(),
                );

                #[cfg(feature = "actor-tracing-trace")]
                let span = tracing::trace_span!(
                    "actor.recv",
                    ctx = log.as_value(),
                    message_type = msg.name(),
                );

                trace!("[{}] received {}", ctx.full_path(), msg.name());

                let handling = msg.handle(&mut actor, &mut ctx);

                #[cfg(feature = "actor-tracing")]
                let handling = handling.instrument(span);

                handling.await;

                trace!("[{}] processed {}", ctx.full_path(), msg.name());
            }

            // A handler asked the actor to stop: leave the loop without
            // receiving anything further.
            if matches!(ctx.get_status(), Stopping) {
                break;
            }
        }

        trace!("[{}] stopping", ctx.full_path());

        // Also covers the case where the loop ended because the channel closed.
        ctx.set_status(Stopping);

        actor_stopped(&mut actor, actor_type, &mut system, &actor_id, &mut ctx).await
    }
}

/// Runs the shutdown sequence for an actor.
///
/// In order: awaits the actor's `stopped` hook, marks the context `Stopped`,
/// de-registers the actor from the system scheduler (tracked actors only, and
/// only while the system has not terminated), and finally signals every
/// on-stopped handler taken from the context.
///
/// For tracked actors the `system` slot is emptied (`take`) whether or not
/// de-registration is performed.
///
/// # Panics
///
/// Panics with `"de-register actor"` if sending `DeregisterActor` to the
/// scheduler fails.
async fn actor_stopped<A: Actor>(
    actor: &mut A,
    actor_type: ActorType,
    system: &mut Option<ActorSystem>,
    actor_id: &ActorId,
    ctx: &mut ActorContext,
) {
    actor.stopped(ctx).await;

    ctx.set_status(Stopped);

    if actor_type.is_tracked() {
        // Keep the system only if it is still running.
        let running_system = system.take().filter(|sys| !sys.is_terminated());

        if let Some(running_system) = running_system {
            trace!("de-registering actor {}", &actor_id);

            running_system
                .scheduler()
                .send(DeregisterActor(actor_id.clone()))
                .await
                .expect("de-register actor");
        }
    }

    // Notify every registered waiter; one that has already gone away is ignored.
    for listener in ctx.take_on_stopped_handlers().into_iter().flatten() {
        let _ = listener.send(());
    }
}