gw_bin/triggers/
schedule.rs

1use super::{Trigger, TriggerError};
2use crate::context::Context;
3use duration_string::DurationString;
4use log::info;
5use std::{
6    collections::HashMap,
7    sync::mpsc::Sender,
8    thread::sleep,
9    time::{Duration, Instant},
10};
11use thiserror::Error;
12
13const TRIGGER_NAME: &str = "SCHEDULE";
14
15/// A trigger that runs the checks periodically.
16///
17/// This is running in an infinite loop, triggering every time.
18pub struct ScheduleTrigger {
19    duration: Duration,
20    timeout: Option<Duration>,
21}
22
23/// Custom error describing the error cases for the ScheduleTrigger.
24#[derive(Debug, Error)]
25pub enum ScheduleError {
26    /// Cannot send trigger with Sender. This usually because the receiver is dropped.
27    #[error("cannot trigger changes, receiver hang up")]
28    ReceiverHangup(#[from] std::sync::mpsc::SendError<Option<Context>>),
29}
30
31impl From<ScheduleError> for TriggerError {
32    fn from(val: ScheduleError) -> Self {
33        match val {
34            ScheduleError::ReceiverHangup(s) => TriggerError::ReceiverHangup(s),
35        }
36    }
37}
38
39impl ScheduleTrigger {
40    /// Creates a new ScheduleTrigger with duration.
41    pub fn new(duration: Duration) -> Self {
42        Self {
43            duration,
44            timeout: None,
45        }
46    }
47
48    /// Creates a new ScheduleTrigger with duration and timeout.
49    pub fn new_with_timeout(duration: Duration, timeout: Duration) -> Self {
50        Self {
51            duration,
52            timeout: Some(timeout),
53        }
54    }
55
56    /// Runs one step in the scheduled time process. Returns true, if it should continue,
57    /// returns false in case of an error or a timeout. One step should take exactly the duration.
58    /// In case of an error it terminates or if it will reach the final timeout it will
59    /// wait until the end of the timeout and returns with false.
60    pub fn step(
61        &self,
62        tx: Sender<Option<Context>>,
63        final_timeout: Option<Instant>,
64    ) -> Result<bool, ScheduleError> {
65        let next_check = Instant::now() + self.duration;
66
67        let context: Context = HashMap::from([
68            ("TRIGGER_NAME", TRIGGER_NAME.to_string()),
69            (
70                "SCHEDULE_DELAY",
71                DurationString::from(self.duration).to_string(),
72            ),
73        ]);
74        tx.send(Some(context))?;
75
76        if let Some(final_timeout) = final_timeout {
77            if next_check > final_timeout {
78                let until_final_timeout = final_timeout - Instant::now();
79                sleep(until_final_timeout);
80                return Ok(false);
81            }
82        }
83        // TODO: handle overlaps
84        let until_next_check = next_check - Instant::now();
85        sleep(until_next_check);
86
87        // We should handle if the sleep was too long and it went over the timeout
88        if let Some(final_timeout) = final_timeout {
89            Ok(Instant::now() < final_timeout)
90        } else {
91            Ok(true)
92        }
93    }
94}
95
96impl Trigger for ScheduleTrigger {
97    /// Starts a scheduled trigger on a new thread, starting the steps in a loop.
98    /// Every step triggers and then waits the given duration. In case of an error,
99    /// it terminates or if it will reach the final timeout it will wait until
100    /// the end of the timeout and return.
101    fn listen(&self, tx: Sender<Option<Context>>) -> Result<(), TriggerError> {
102        let final_timeout = self.timeout.map(|t| Instant::now() + t);
103        info!(
104            "Starting schedule in every {}.",
105            DurationString::new(self.duration)
106        );
107
108        loop {
109            let should_continue = self.step(tx.clone(), final_timeout)?;
110            if !should_continue {
111                break;
112            }
113        }
114
115        Ok(())
116    }
117}
118
119#[cfg(test)]
120mod tests {
121    use super::*;
122    use crate::triggers::TriggerError;
123    use std::{sync::mpsc, time::Instant};
124
125    #[test]
126    fn it_should_be_created_from_duration() {
127        let trigger = ScheduleTrigger::new(Duration::from_millis(100));
128        assert_eq!(Duration::from_millis(100), trigger.duration);
129        assert_eq!(None, trigger.timeout);
130    }
131
132    #[test]
133    fn it_should_be_created_from_duration_and_timeout() {
134        let trigger = ScheduleTrigger::new_with_timeout(
135            Duration::from_millis(100),
136            Duration::from_millis(200),
137        );
138        assert_eq!(Duration::from_millis(100), trigger.duration);
139        assert_eq!(Some(Duration::from_millis(200)), trigger.timeout);
140    }
141
142    #[test]
143    fn it_should_trigger_every_100_ms() -> Result<(), TriggerError> {
144        let trigger = ScheduleTrigger::new(Duration::from_millis(100));
145        let (tx, rx) = mpsc::channel::<Option<Context>>();
146
147        for _ in 0..5 {
148            let start = Instant::now();
149
150            let should_continue = trigger.step(tx.clone(), None)?;
151            assert!(should_continue);
152
153            // It should be close to the timings
154            let msg = rx.recv().unwrap();
155            let diff = start.elapsed();
156            assert!(
157                diff >= Duration::from_millis(95),
158                "Diff {} should be later than 95ms.",
159                DurationString::from(diff)
160            );
161
162            // It should contain the hashmap
163            let context = msg.unwrap();
164            assert_eq!(TRIGGER_NAME, context.get("TRIGGER_NAME").unwrap());
165            assert_eq!("100ms", context.get("SCHEDULE_DELAY").unwrap());
166        }
167
168        Ok(())
169    }
170
171    #[test]
172    fn it_should_not_continue_after_the_timeout() -> Result<(), TriggerError> {
173        let trigger = ScheduleTrigger::new(Duration::from_millis(100));
174        let (tx, _rx) = mpsc::channel::<Option<Context>>();
175
176        let start = Instant::now();
177        let final_timeout = start + Duration::from_millis(350);
178        for _ in 0..5 {
179            let should_continue = trigger.step(tx.clone(), Some(final_timeout))?;
180
181            // First three should pass, last two fail
182            if Instant::now() < final_timeout {
183                assert!(
184                    should_continue,
185                    "Should continue after {} passed, before 300ms.",
186                    DurationString::from(start.elapsed())
187                );
188            } else {
189                assert!(
190                    !should_continue,
191                    "Should continue after {} passed, after 300ms.",
192                    DurationString::from(start.elapsed())
193                );
194            };
195        }
196
197        Ok(())
198    }
199
200    #[test]
201    fn it_should_not_trigger_on_a_send_error() {
202        let trigger = ScheduleTrigger::new(Duration::from_millis(100));
203        let (tx, rx) = mpsc::channel::<Option<Context>>();
204
205        // Close receiving end, to create a send error
206        drop(rx);
207
208        let final_timeout = Instant::now() + Duration::from_millis(350);
209        let result = trigger.step(tx.clone(), Some(final_timeout));
210
211        // It should fail, because of ReceiverHangup
212        assert!(
213            matches!(result, Err(ScheduleError::ReceiverHangup(_)),),
214            "{result:?} should be ReceiverHangup"
215        );
216    }
217}