coerce_rt/actor/scheduler/
mod.rs1use crate::actor::context::ActorHandlerContext;
2use crate::actor::lifecycle::actor_loop;
3use crate::actor::message::{Handler, Message};
4use crate::actor::{Actor, ActorId, ActorRef, BoxedActorRef, GetActorRef};
5
6use std::collections::HashMap;
7use std::marker::PhantomData;
8
9pub mod timer;
10
11pub struct ActorScheduler {
12 actors: HashMap<ActorId, BoxedActorRef>,
13}
14
15impl ActorScheduler {
16 pub fn new() -> ActorRef<ActorScheduler> {
17 start_actor(
18 ActorScheduler {
19 actors: HashMap::new(),
20 },
21 ActorType::Anonymous,
22 None,
23 None,
24 )
25 }
26}
27
28#[async_trait]
29impl Actor for ActorScheduler {}
30
31#[derive(Clone, Copy)]
32pub enum ActorType {
33 Tracked,
34 Anonymous,
35}
36
37impl ActorType {
38 pub fn is_tracked(&self) -> bool {
39 match &self {
40 &ActorType::Tracked => true,
41 _ => false,
42 }
43 }
44
45 pub fn is_anon(&self) -> bool {
46 match &self {
47 &ActorType::Anonymous => true,
48 _ => false,
49 }
50 }
51}
52
53pub struct RegisterActor<A: Actor>(pub A, pub ActorType, pub tokio::sync::oneshot::Sender<bool>)
54where
55 A: 'static + Sync + Send;
56
57impl<A: Actor> Message for RegisterActor<A>
58where
59 A: 'static + Sync + Send,
60{
61 type Result = ActorRef<A>;
62}
63
64pub struct DeregisterActor(pub ActorId);
65
66impl Message for DeregisterActor {
67 type Result = ();
68}
69
70pub struct GetActor<A: Actor>
71where
72 A: 'static + Sync + Send,
73{
74 id: ActorId,
75 _a: PhantomData<A>,
76}
77
78impl<A: Actor> Message for GetActor<A>
79where
80 A: 'static + Sync + Send,
81{
82 type Result = Option<ActorRef<A>>;
83}
84
85impl<A: Actor> GetActor<A>
86where
87 A: 'static + Sync + Send,
88{
89 pub fn new(id: ActorId) -> GetActor<A> {
90 GetActor {
91 id,
92 _a: PhantomData,
93 }
94 }
95}
96
97#[async_trait]
98impl<A: Actor> Handler<RegisterActor<A>> for ActorScheduler
99where
100 A: 'static + Sync + Send,
101{
102 async fn handle(
103 &mut self,
104 message: RegisterActor<A>,
105 ctx: &mut ActorHandlerContext,
106 ) -> ActorRef<A> {
107 let actor_tyoe = message.1;
108 let actor = start_actor(
109 message.0,
110 actor_tyoe,
111 Some(message.2),
112 Some(self.get_ref(ctx)),
113 );
114
115 if actor_tyoe.is_tracked() {
116 let _ = self
117 .actors
118 .insert(actor.id, BoxedActorRef::from(actor.clone()));
119
120 warn!(target: "ActorScheduler", "actor {} registered", actor.id);
121 }
122
123 actor
124 }
125}
126
127#[async_trait]
128impl Handler<DeregisterActor> for ActorScheduler {
129 async fn handle(&mut self, msg: DeregisterActor, _ctx: &mut ActorHandlerContext) -> () {
130 if let Some(a) = self.actors.remove(&msg.0) {
131 trace!(target: "ActorScheduler", "de-registered actor {}", msg.0);
132 } else {
133 warn!(target: "ActorScheduler", "actor {} not found to de-register", msg.0);
134 }
135 }
136}
137
138#[async_trait]
139impl<A: Actor> Handler<GetActor<A>> for ActorScheduler
140where
141 A: 'static + Sync + Send,
142{
143 async fn handle(
144 &mut self,
145 message: GetActor<A>,
146 _ctx: &mut ActorHandlerContext,
147 ) -> Option<ActorRef<A>> {
148 match self.actors.get(&message.id) {
149 Some(actor) => Some(ActorRef::<A>::from(actor.clone())),
150 None => None,
151 }
152 }
153}
154
155fn start_actor<A: Actor>(
156 actor: A,
157 actor_type: ActorType,
158 on_start: Option<tokio::sync::oneshot::Sender<bool>>,
159 scheduler: Option<ActorRef<ActorScheduler>>,
160) -> ActorRef<A>
161where
162 A: 'static + Send + Sync,
163{
164 let id = ActorId::new_v4();
165 let (tx, rx) = tokio::sync::mpsc::channel(128);
166
167 let actor_ref = ActorRef {
168 id: id.clone(),
169 sender: tx,
170 };
171
172 tokio::spawn(actor_loop(
173 id.clone(),
174 actor,
175 actor_type,
176 rx,
177 on_start,
178 actor_ref.clone(),
179 scheduler,
180 ));
181
182 actor_ref
183}