Skip to main content

coerce_rt/actor/scheduler/
mod.rs

1use crate::actor::context::ActorHandlerContext;
2use crate::actor::lifecycle::actor_loop;
3use crate::actor::message::{Handler, Message};
4use crate::actor::{Actor, ActorId, ActorRef, BoxedActorRef, GetActorRef};
5
6use std::collections::HashMap;
7use std::marker::PhantomData;
8
9pub mod timer;
10
11pub struct ActorScheduler {
12    actors: HashMap<ActorId, BoxedActorRef>,
13}
14
15impl ActorScheduler {
16    pub fn new() -> ActorRef<ActorScheduler> {
17        start_actor(
18            ActorScheduler {
19                actors: HashMap::new(),
20            },
21            ActorType::Anonymous,
22            None,
23            None,
24        )
25    }
26}
27
28#[async_trait]
29impl Actor for ActorScheduler {}
30
/// Whether an actor is recorded in the scheduler's registry when it is registered.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ActorType {
    /// The actor is stored in the scheduler's actor map on registration.
    Tracked,
    /// The actor is started without being stored in the scheduler's actor map.
    Anonymous,
}

impl ActorType {
    /// Returns `true` if this is [`ActorType::Tracked`].
    pub fn is_tracked(&self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            ActorType::Tracked => true,
            ActorType::Anonymous => false,
        }
    }

    /// Returns `true` if this is [`ActorType::Anonymous`].
    pub fn is_anon(&self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            ActorType::Anonymous => true,
            ActorType::Tracked => false,
        }
    }
}
52
53pub struct RegisterActor<A: Actor>(pub A, pub ActorType, pub tokio::sync::oneshot::Sender<bool>)
54where
55    A: 'static + Sync + Send;
56
57impl<A: Actor> Message for RegisterActor<A>
58where
59    A: 'static + Sync + Send,
60{
61    type Result = ActorRef<A>;
62}
63
64pub struct DeregisterActor(pub ActorId);
65
66impl Message for DeregisterActor {
67    type Result = ();
68}
69
70pub struct GetActor<A: Actor>
71where
72    A: 'static + Sync + Send,
73{
74    id: ActorId,
75    _a: PhantomData<A>,
76}
77
78impl<A: Actor> Message for GetActor<A>
79where
80    A: 'static + Sync + Send,
81{
82    type Result = Option<ActorRef<A>>;
83}
84
85impl<A: Actor> GetActor<A>
86where
87    A: 'static + Sync + Send,
88{
89    pub fn new(id: ActorId) -> GetActor<A> {
90        GetActor {
91            id,
92            _a: PhantomData,
93        }
94    }
95}
96
97#[async_trait]
98impl<A: Actor> Handler<RegisterActor<A>> for ActorScheduler
99where
100    A: 'static + Sync + Send,
101{
102    async fn handle(
103        &mut self,
104        message: RegisterActor<A>,
105        ctx: &mut ActorHandlerContext,
106    ) -> ActorRef<A> {
107        let actor_tyoe = message.1;
108        let actor = start_actor(
109            message.0,
110            actor_tyoe,
111            Some(message.2),
112            Some(self.get_ref(ctx)),
113        );
114
115        if actor_tyoe.is_tracked() {
116            let _ = self
117                .actors
118                .insert(actor.id, BoxedActorRef::from(actor.clone()));
119
120            warn!(target: "ActorScheduler", "actor {} registered", actor.id);
121        }
122
123        actor
124    }
125}
126
127#[async_trait]
128impl Handler<DeregisterActor> for ActorScheduler {
129    async fn handle(&mut self, msg: DeregisterActor, _ctx: &mut ActorHandlerContext) -> () {
130        if let Some(a) = self.actors.remove(&msg.0) {
131            trace!(target: "ActorScheduler", "de-registered actor {}", msg.0);
132        } else {
133            warn!(target: "ActorScheduler", "actor {} not found to de-register", msg.0);
134        }
135    }
136}
137
138#[async_trait]
139impl<A: Actor> Handler<GetActor<A>> for ActorScheduler
140where
141    A: 'static + Sync + Send,
142{
143    async fn handle(
144        &mut self,
145        message: GetActor<A>,
146        _ctx: &mut ActorHandlerContext,
147    ) -> Option<ActorRef<A>> {
148        match self.actors.get(&message.id) {
149            Some(actor) => Some(ActorRef::<A>::from(actor.clone())),
150            None => None,
151        }
152    }
153}
154
155fn start_actor<A: Actor>(
156    actor: A,
157    actor_type: ActorType,
158    on_start: Option<tokio::sync::oneshot::Sender<bool>>,
159    scheduler: Option<ActorRef<ActorScheduler>>,
160) -> ActorRef<A>
161where
162    A: 'static + Send + Sync,
163{
164    let id = ActorId::new_v4();
165    let (tx, rx) = tokio::sync::mpsc::channel(128);
166
167    let actor_ref = ActorRef {
168        id: id.clone(),
169        sender: tx,
170    };
171
172    tokio::spawn(actor_loop(
173        id.clone(),
174        actor,
175        actor_type,
176        rx,
177        on_start,
178        actor_ref.clone(),
179        scheduler,
180    ));
181
182    actor_ref
183}