gw_bin/triggers/
schedule.rs1use super::{Trigger, TriggerError};
2use crate::context::Context;
3use duration_string::DurationString;
4use log::info;
5use std::{
6 collections::HashMap,
7 sync::mpsc::Sender,
8 thread::sleep,
9 time::{Duration, Instant},
10};
11use thiserror::Error;
12
/// Value stored under the `TRIGGER_NAME` key of every context sent by this trigger.
const TRIGGER_NAME: &str = "SCHEDULE";
14
/// A trigger that fires repeatedly with a fixed delay between runs,
/// optionally stopping once a total running time has elapsed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduleTrigger {
    /// Delay between two consecutive triggers.
    duration: Duration,
    /// Total time after which the trigger stops; `None` means no time limit.
    timeout: Option<Duration>,
}
22
/// Errors that can occur while the schedule trigger is running.
#[derive(Debug, Error)]
pub enum ScheduleError {
    /// Sending a context over the channel failed; wraps the underlying `SendError`.
    #[error("cannot trigger changes, receiver hang up")]
    ReceiverHangup(#[from] std::sync::mpsc::SendError<Option<Context>>),
}
30
31impl From<ScheduleError> for TriggerError {
32 fn from(val: ScheduleError) -> Self {
33 match val {
34 ScheduleError::ReceiverHangup(s) => TriggerError::ReceiverHangup(s),
35 }
36 }
37}
38
39impl ScheduleTrigger {
40 pub fn new(duration: Duration) -> Self {
42 Self {
43 duration,
44 timeout: None,
45 }
46 }
47
48 pub fn new_with_timeout(duration: Duration, timeout: Duration) -> Self {
50 Self {
51 duration,
52 timeout: Some(timeout),
53 }
54 }
55
56 pub fn step(
61 &self,
62 tx: Sender<Option<Context>>,
63 final_timeout: Option<Instant>,
64 ) -> Result<bool, ScheduleError> {
65 let next_check = Instant::now() + self.duration;
66
67 let context: Context = HashMap::from([
68 ("TRIGGER_NAME", TRIGGER_NAME.to_string()),
69 (
70 "SCHEDULE_DELAY",
71 DurationString::from(self.duration).to_string(),
72 ),
73 ]);
74 tx.send(Some(context))?;
75
76 if let Some(final_timeout) = final_timeout {
77 if next_check > final_timeout {
78 let until_final_timeout = final_timeout - Instant::now();
79 sleep(until_final_timeout);
80 return Ok(false);
81 }
82 }
83 let until_next_check = next_check - Instant::now();
85 sleep(until_next_check);
86
87 if let Some(final_timeout) = final_timeout {
89 Ok(Instant::now() < final_timeout)
90 } else {
91 Ok(true)
92 }
93 }
94}
95
96impl Trigger for ScheduleTrigger {
97 fn listen(&self, tx: Sender<Option<Context>>) -> Result<(), TriggerError> {
102 let final_timeout = self.timeout.map(|t| Instant::now() + t);
103 info!(
104 "Starting schedule in every {}.",
105 DurationString::new(self.duration)
106 );
107
108 loop {
109 let should_continue = self.step(tx.clone(), final_timeout)?;
110 if !should_continue {
111 break;
112 }
113 }
114
115 Ok(())
116 }
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use crate::triggers::TriggerError;
    use std::{sync::mpsc, time::Instant};

    #[test]
    fn it_should_be_created_from_duration() {
        let trigger = ScheduleTrigger::new(Duration::from_millis(100));
        assert_eq!(Duration::from_millis(100), trigger.duration);
        assert_eq!(None, trigger.timeout);
    }

    #[test]
    fn it_should_be_created_from_duration_and_timeout() {
        let trigger = ScheduleTrigger::new_with_timeout(
            Duration::from_millis(100),
            Duration::from_millis(200),
        );
        assert_eq!(Duration::from_millis(100), trigger.duration);
        assert_eq!(Some(Duration::from_millis(200)), trigger.timeout);
    }

    #[test]
    fn it_should_trigger_every_100_ms() -> Result<(), TriggerError> {
        let trigger = ScheduleTrigger::new(Duration::from_millis(100));
        let (tx, rx) = mpsc::channel::<Option<Context>>();

        for _ in 0..5 {
            let start = Instant::now();

            let should_continue = trigger.step(tx.clone(), None)?;
            assert!(should_continue);

            // `step` has already slept by the time the message is read.
            let msg = rx.recv().unwrap();
            let diff = start.elapsed();
            assert!(
                diff >= Duration::from_millis(95),
                "Diff {} should be later than 95ms.",
                DurationString::from(diff)
            );

            let context = msg.unwrap();
            assert_eq!(TRIGGER_NAME, context.get("TRIGGER_NAME").unwrap());
            assert_eq!("100ms", context.get("SCHEDULE_DELAY").unwrap());
        }

        Ok(())
    }

    #[test]
    fn it_should_not_continue_after_the_timeout() -> Result<(), TriggerError> {
        let trigger = ScheduleTrigger::new(Duration::from_millis(100));
        let (tx, _rx) = mpsc::channel::<Option<Context>>();

        let start = Instant::now();
        let final_timeout = start + Duration::from_millis(350);
        for _ in 0..5 {
            let should_continue = trigger.step(tx.clone(), Some(final_timeout))?;

            // Messages name the 350ms deadline and match what each branch asserts.
            if Instant::now() < final_timeout {
                assert!(
                    should_continue,
                    "Should continue after {} passed, before 350ms.",
                    DurationString::from(start.elapsed())
                );
            } else {
                assert!(
                    !should_continue,
                    "Should not continue after {} passed, after 350ms.",
                    DurationString::from(start.elapsed())
                );
            };
        }

        Ok(())
    }

    #[test]
    fn it_should_not_trigger_on_a_send_error() {
        let trigger = ScheduleTrigger::new(Duration::from_millis(100));
        let (tx, rx) = mpsc::channel::<Option<Context>>();

        // Dropping the receiver makes every subsequent send fail.
        drop(rx);

        let final_timeout = Instant::now() + Duration::from_millis(350);
        let result = trigger.step(tx.clone(), Some(final_timeout));

        assert!(
            matches!(result, Err(ScheduleError::ReceiverHangup(_))),
            "{result:?} should be ReceiverHangup"
        );
    }
}