actors_rs/system/
timer.rs1use std::{
2 sync::mpsc,
3 thread,
4 time::{Duration, Instant},
5};
6
7use chrono::{DateTime, Utc};
8use config::Config;
9use uuid::Uuid;
10
11use crate::actor_ref::{ActorRef, BasicActorRef, Sender};
12use crate::{AnyMessage, Message};
13
14pub type TimerRef = mpsc::Sender<Job>;
15
16pub trait Timer {
17 fn schedule<T, M>(
18 &self,
19 initial_delay: Duration,
20 interval: Duration,
21 receiver: ActorRef<M>,
22 sender: Sender,
23 msg: T,
24 ) -> Uuid
25 where
26 T: Message + Into<M>,
27 M: Message;
28
29 fn schedule_once<T, M>(
30 &self,
31 delay: Duration,
32 receiver: ActorRef<M>,
33 sender: Sender,
34 msg: T,
35 ) -> Uuid
36 where
37 T: Message + Into<M>,
38 M: Message;
39
40 fn schedule_at_time<T, M>(
41 &self,
42 time: DateTime<Utc>,
43 receiver: ActorRef<M>,
44 sender: Sender,
45 msg: T,
46 ) -> Uuid
47 where
48 T: Message + Into<M>,
49 M: Message;
50
51 fn cancel_schedule(&self, id: Uuid);
52}
53
54pub enum Job {
55 Once(OnceJob),
56 Repeat(RepeatJob),
57 Cancel(Uuid),
58}
59
60pub struct OnceJob {
61 pub id: Uuid,
62 pub send_at: Instant,
63 pub receiver: BasicActorRef,
64 pub sender: Sender,
65 pub msg: AnyMessage,
66}
67
68impl OnceJob {
69 pub fn send(mut self) {
70 let _ = self.receiver.try_tell_any(&mut self.msg, self.sender);
71 }
72}
73
74pub struct RepeatJob {
75 pub id: Uuid,
76 pub send_at: Instant,
77 pub interval: Duration,
78 pub receiver: BasicActorRef,
79 pub sender: Sender,
80 pub msg: AnyMessage,
81}
82
83impl RepeatJob {
84 pub fn send(&mut self) {
85 let _ = self
86 .receiver
87 .try_tell_any(&mut self.msg, self.sender.clone());
88 }
89}
90
91pub struct BasicTimer {
94 once_jobs: Vec<OnceJob>,
95 repeat_jobs: Vec<RepeatJob>,
96}
97
98impl BasicTimer {
99 #[must_use]
100 pub fn start(cfg: &Config) -> TimerRef {
101 let cfg = BasicTimerConfig::from(cfg);
102
103 let mut process = Self {
104 once_jobs: Vec::new(),
105 repeat_jobs: Vec::new(),
106 };
107
108 let (tx, rx) = mpsc::channel();
109 thread::spawn(move || loop {
110 process.execute_once_jobs();
111 process.execute_repeat_jobs();
112
113 if let Ok(job) = rx.try_recv() {
114 match job {
115 Job::Cancel(id) => process.cancel(&id),
116 Job::Once(job) => process.schedule_once(job),
117 Job::Repeat(job) => process.schedule_repeat(job),
118 }
119 }
120
121 thread::sleep(Duration::from_millis(cfg.frequency_millis));
122 });
123
124 tx
125 }
126
127 pub fn execute_once_jobs(&mut self) {
128 let (send, keep): (Vec<OnceJob>, Vec<OnceJob>) = self
129 .once_jobs
130 .drain(..)
131 .partition(|j| Instant::now() >= j.send_at);
132
133 for job in send {
135 job.send();
136 }
137
138 for job in keep {
140 self.once_jobs.push(job);
141 }
142 }
143
144 pub fn execute_repeat_jobs(&mut self) {
145 for job in &mut self.repeat_jobs {
146 if Instant::now() >= job.send_at {
147 job.send_at = Instant::now() + job.interval;
148 job.send();
149 }
150 }
151 }
152
153 pub fn cancel(&mut self, id: &Uuid) {
154 if let Some(pos) = self.once_jobs.iter().position(|job| &job.id == id) {
158 self.once_jobs.remove(pos);
159 }
160
161 if let Some(pos) = self.repeat_jobs.iter().position(|job| &job.id == id) {
162 self.repeat_jobs.remove(pos);
163 }
164 }
165
166 pub fn schedule_once(&mut self, job: OnceJob) {
167 if Instant::now() >= job.send_at {
168 job.send();
169 } else {
170 self.once_jobs.push(job);
171 }
172 }
173
174 pub fn schedule_repeat(&mut self, mut job: RepeatJob) {
175 if Instant::now() >= job.send_at {
176 job.send();
177 }
178 self.repeat_jobs.push(job);
179 }
180}
181
182struct BasicTimerConfig {
183 frequency_millis: u64,
184}
185
186impl<'a> From<&'a Config> for BasicTimerConfig {
187 fn from(config: &Config) -> Self {
188 Self {
189 frequency_millis: config.get::<u64>("scheduler.frequency_millis").unwrap(),
190 }
191 }
192}