Skip to main content

coerce_rt/actor/
lifecycle.rs

1use crate::actor::context::ActorStatus::{Started, Starting, Stopped, Stopping};
2use crate::actor::context::{ActorHandlerContext, ActorStatus};
3use crate::actor::message::{Handler, Message, MessageHandler};
4use crate::actor::scheduler::{ActorScheduler, ActorType, DeregisterActor};
5use crate::actor::{Actor, ActorId, ActorRef};
6
7pub struct Status();
8
9pub struct Stop();
10
11impl Message for Status {
12    type Result = ActorStatus;
13}
14
15impl Message for Stop {
16    type Result = ActorStatus;
17}
18
19#[async_trait]
20impl<A> Handler<Status> for A
21where
22    A: 'static + Actor + Sync + Send,
23{
24    async fn handle(&mut self, _message: Status, ctx: &mut ActorHandlerContext) -> ActorStatus {
25        ctx.get_status().clone()
26    }
27}
28
29#[async_trait]
30impl<A: Actor> Handler<Stop> for A
31where
32    A: 'static + Sync + Send,
33{
34    async fn handle(&mut self, _message: Stop, ctx: &mut ActorHandlerContext) -> ActorStatus {
35        ctx.set_status(Stopping);
36
37        Stopping
38    }
39}
40
41pub async fn actor_loop<A: Actor>(
42    id: ActorId,
43    mut actor: A,
44    actor_type: ActorType,
45    mut rx: tokio::sync::mpsc::Receiver<MessageHandler<A>>,
46    on_start: Option<tokio::sync::oneshot::Sender<bool>>,
47    actor_ref: ActorRef<A>,
48    scheduler: Option<ActorRef<ActorScheduler>>,
49) where
50    A: 'static + Send + Sync,
51{
52    let mut ctx = ActorHandlerContext::new(id, Starting, actor_ref.into());
53    trace!(target: "ActorLoop", "[{}] starting", ctx.id());
54
55    actor.started(&mut ctx).await;
56
57    match ctx.get_status() {
58        Stopping => return,
59        _ => {}
60    };
61
62    ctx.set_status(Started);
63
64    trace!(target: "ActorLoop", "[{}] ready", ctx.id());
65
66    if let Some(on_start) = on_start {
67        let _ = on_start.send(true);
68    }
69
70    while let Some(mut msg) = rx.recv().await {
71        trace!(target: "ActorLoop", "[{}] recv", ctx.id());
72
73        msg.handle(&mut actor, &mut ctx).await;
74
75        match ctx.get_status() {
76            Stopping => break,
77            _ => {}
78        }
79    }
80
81    trace!(target: "ActorLoop", "[{}] stopping", ctx.id());
82
83    ctx.set_status(Stopping);
84
85    actor.stopped(&mut ctx).await;
86
87    ctx.set_status(Stopped);
88
89    if actor_type.is_tracked() {
90        if let Some(mut scheduler) = scheduler {
91            scheduler
92                .send(DeregisterActor(id))
93                .await
94                .expect("de-register actor");
95        }
96    }
97}