coerce_rt/actor/
lifecycle.rs1use crate::actor::context::ActorStatus::{Started, Starting, Stopped, Stopping};
2use crate::actor::context::{ActorHandlerContext, ActorStatus};
3use crate::actor::message::{Handler, Message, MessageHandler};
4use crate::actor::scheduler::{ActorScheduler, ActorType, DeregisterActor};
5use crate::actor::{Actor, ActorId, ActorRef};
6
/// Message asking an actor to report its current lifecycle status.
///
/// The reply type is `ActorStatus` (see the `Message` impl for this type).
/// Kept as an empty tuple struct so existing `Status()` call sites still compile.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Status();
8
/// Message asking an actor to begin shutting down.
///
/// The reply type is `ActorStatus` (see the `Message` impl for this type).
/// Kept as an empty tuple struct so existing `Stop()` call sites still compile.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Stop();
10
11impl Message for Status {
12 type Result = ActorStatus;
13}
14
15impl Message for Stop {
16 type Result = ActorStatus;
17}
18
19#[async_trait]
20impl<A> Handler<Status> for A
21where
22 A: 'static + Actor + Sync + Send,
23{
24 async fn handle(&mut self, _message: Status, ctx: &mut ActorHandlerContext) -> ActorStatus {
25 ctx.get_status().clone()
26 }
27}
28
29#[async_trait]
30impl<A: Actor> Handler<Stop> for A
31where
32 A: 'static + Sync + Send,
33{
34 async fn handle(&mut self, _message: Stop, ctx: &mut ActorHandlerContext) -> ActorStatus {
35 ctx.set_status(Stopping);
36
37 Stopping
38 }
39}
40
41pub async fn actor_loop<A: Actor>(
42 id: ActorId,
43 mut actor: A,
44 actor_type: ActorType,
45 mut rx: tokio::sync::mpsc::Receiver<MessageHandler<A>>,
46 on_start: Option<tokio::sync::oneshot::Sender<bool>>,
47 actor_ref: ActorRef<A>,
48 scheduler: Option<ActorRef<ActorScheduler>>,
49) where
50 A: 'static + Send + Sync,
51{
52 let mut ctx = ActorHandlerContext::new(id, Starting, actor_ref.into());
53 trace!(target: "ActorLoop", "[{}] starting", ctx.id());
54
55 actor.started(&mut ctx).await;
56
57 match ctx.get_status() {
58 Stopping => return,
59 _ => {}
60 };
61
62 ctx.set_status(Started);
63
64 trace!(target: "ActorLoop", "[{}] ready", ctx.id());
65
66 if let Some(on_start) = on_start {
67 let _ = on_start.send(true);
68 }
69
70 while let Some(mut msg) = rx.recv().await {
71 trace!(target: "ActorLoop", "[{}] recv", ctx.id());
72
73 msg.handle(&mut actor, &mut ctx).await;
74
75 match ctx.get_status() {
76 Stopping => break,
77 _ => {}
78 }
79 }
80
81 trace!(target: "ActorLoop", "[{}] stopping", ctx.id());
82
83 ctx.set_status(Stopping);
84
85 actor.stopped(&mut ctx).await;
86
87 ctx.set_status(Stopped);
88
89 if actor_type.is_tracked() {
90 if let Some(mut scheduler) = scheduler {
91 scheduler
92 .send(DeregisterActor(id))
93 .await
94 .expect("de-register actor");
95 }
96 }
97}