Skip to main content

actors_rs/system/
timer.rs

1use std::{
2    sync::mpsc,
3    thread,
4    time::{Duration, Instant},
5};
6
7use chrono::{DateTime, Utc};
8use config::Config;
9use uuid::Uuid;
10
11use crate::actor_ref::{ActorRef, BasicActorRef, Sender};
12use crate::{AnyMessage, Message};
13
14pub type TimerRef = mpsc::Sender<Job>;
15
16pub trait Timer {
17    fn schedule<T, M>(
18        &self,
19        initial_delay: Duration,
20        interval: Duration,
21        receiver: ActorRef<M>,
22        sender: Sender,
23        msg: T,
24    ) -> Uuid
25    where
26        T: Message + Into<M>,
27        M: Message;
28
29    fn schedule_once<T, M>(
30        &self,
31        delay: Duration,
32        receiver: ActorRef<M>,
33        sender: Sender,
34        msg: T,
35    ) -> Uuid
36    where
37        T: Message + Into<M>,
38        M: Message;
39
40    fn schedule_at_time<T, M>(
41        &self,
42        time: DateTime<Utc>,
43        receiver: ActorRef<M>,
44        sender: Sender,
45        msg: T,
46    ) -> Uuid
47    where
48        T: Message + Into<M>,
49        M: Message;
50
51    fn cancel_schedule(&self, id: Uuid);
52}
53
54pub enum Job {
55    Once(OnceJob),
56    Repeat(RepeatJob),
57    Cancel(Uuid),
58}
59
60pub struct OnceJob {
61    pub id: Uuid,
62    pub send_at: Instant,
63    pub receiver: BasicActorRef,
64    pub sender: Sender,
65    pub msg: AnyMessage,
66}
67
68impl OnceJob {
69    pub fn send(mut self) {
70        let _ = self.receiver.try_tell_any(&mut self.msg, self.sender);
71    }
72}
73
74pub struct RepeatJob {
75    pub id: Uuid,
76    pub send_at: Instant,
77    pub interval: Duration,
78    pub receiver: BasicActorRef,
79    pub sender: Sender,
80    pub msg: AnyMessage,
81}
82
83impl RepeatJob {
84    pub fn send(&mut self) {
85        let _ = self
86            .receiver
87            .try_tell_any(&mut self.msg, self.sender.clone());
88    }
89}
90
91// Default timer implementation
92
93pub struct BasicTimer {
94    once_jobs: Vec<OnceJob>,
95    repeat_jobs: Vec<RepeatJob>,
96}
97
98impl BasicTimer {
99    #[must_use]
100    pub fn start(cfg: &Config) -> TimerRef {
101        let cfg = BasicTimerConfig::from(cfg);
102
103        let mut process = Self {
104            once_jobs: Vec::new(),
105            repeat_jobs: Vec::new(),
106        };
107
108        let (tx, rx) = mpsc::channel();
109        thread::spawn(move || loop {
110            process.execute_once_jobs();
111            process.execute_repeat_jobs();
112
113            if let Ok(job) = rx.try_recv() {
114                match job {
115                    Job::Cancel(id) => process.cancel(&id),
116                    Job::Once(job) => process.schedule_once(job),
117                    Job::Repeat(job) => process.schedule_repeat(job),
118                }
119            }
120
121            thread::sleep(Duration::from_millis(cfg.frequency_millis));
122        });
123
124        tx
125    }
126
127    pub fn execute_once_jobs(&mut self) {
128        let (send, keep): (Vec<OnceJob>, Vec<OnceJob>) = self
129            .once_jobs
130            .drain(..)
131            .partition(|j| Instant::now() >= j.send_at);
132
133        // send those messages where the 'send_at' time has been reached or elapsed
134        for job in send {
135            job.send();
136        }
137
138        // for those messages that are not to be sent yet, just put them back on the vec
139        for job in keep {
140            self.once_jobs.push(job);
141        }
142    }
143
144    pub fn execute_repeat_jobs(&mut self) {
145        for job in &mut self.repeat_jobs {
146            if Instant::now() >= job.send_at {
147                job.send_at = Instant::now() + job.interval;
148                job.send();
149            }
150        }
151    }
152
153    pub fn cancel(&mut self, id: &Uuid) {
154        // slightly sub optimal way of canceling because we don't know the job type
155        // so need to do the remove on both vecs
156
157        if let Some(pos) = self.once_jobs.iter().position(|job| &job.id == id) {
158            self.once_jobs.remove(pos);
159        }
160
161        if let Some(pos) = self.repeat_jobs.iter().position(|job| &job.id == id) {
162            self.repeat_jobs.remove(pos);
163        }
164    }
165
166    pub fn schedule_once(&mut self, job: OnceJob) {
167        if Instant::now() >= job.send_at {
168            job.send();
169        } else {
170            self.once_jobs.push(job);
171        }
172    }
173
174    pub fn schedule_repeat(&mut self, mut job: RepeatJob) {
175        if Instant::now() >= job.send_at {
176            job.send();
177        }
178        self.repeat_jobs.push(job);
179    }
180}
181
182struct BasicTimerConfig {
183    frequency_millis: u64,
184}
185
186impl<'a> From<&'a Config> for BasicTimerConfig {
187    fn from(config: &Config) -> Self {
188        Self {
189            frequency_millis: config.get::<u64>("scheduler.frequency_millis").unwrap(),
190        }
191    }
192}