rust_integration_services/schedule/
schedule_receiver.rs

1use chrono::{DateTime, Duration as ChronoDuration, NaiveDate, NaiveTime, Utc};
2use tokio::{signal::unix::{signal, SignalKind}, sync::mpsc, task::JoinSet, time::sleep};
3use uuid::Uuid;
4
5use super::schedule_interval::ScheduleInterval;
6
/// Events emitted by a `ScheduleReceiver` and delivered to the handler registered with `on_event`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleReceiverEventSignal {
    /// The schedule fired. The payload is a freshly generated UUID (v4) string identifying this trigger.
    OnTrigger(String),
}
11
12pub struct ScheduleReceiver {
13    interval: ScheduleInterval,
14    next_run: DateTime<Utc>,
15    event_broadcast: mpsc::Sender<ScheduleReceiverEventSignal>,
16    event_receiver: Option<mpsc::Receiver<ScheduleReceiverEventSignal>>,
17    event_join_set: JoinSet<()>,
18}
19
20impl ScheduleReceiver {
21    pub fn new() -> Self {
22        let (event_broadcast, event_receiver) = mpsc::channel(128);
23        ScheduleReceiver {
24            interval: ScheduleInterval::None,
25            next_run: Utc::now(),
26            event_broadcast,
27            event_receiver: Some(event_receiver),
28            event_join_set: JoinSet::new(),
29        }
30    }
31    
32    /// Sets the `UTC` start date for the scheduled task.
33    /// 
34    /// If the provided date is in the past, the scheduler will calculate the next valid future run based on the defined interval.
35    /// 
36    /// Note: Scheduler is using `UTC: Coordinated Universal Time` to avoid daylight saving problems.
37    pub fn start_date(mut self, year: i32, month: u32, day: u32) -> Self {
38        let naive_date = NaiveDate::from_ymd_opt(year, month, day).expect("Not a valid date.");
39        let naive_time = self.next_run.time();
40        let naive_dt = naive_date.and_time(naive_time);
41        self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
42        self
43    }
44    
45    /// Sets the `UTC` start time for the scheduled task.
46    /// 
47    /// If the provided time is in the past, the scheduler will calculate the next valid future run based on the defined interval.
48    /// 
49    /// Note: Scheduler is using `UTC: Coordinated Universal Time` to avoid daylight saving problems.
50    pub fn start_time(mut self, hour: u32, minute: u32, second: u32) -> Self {
51        let naive_time = NaiveTime::from_hms_opt(hour, minute, second).expect("Not a valid time.");
52        let naive_date = self.next_run.date_naive();
53        let naive_dt = naive_date.and_time(naive_time);
54        self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
55        self
56    }
57    
58    /// Sets the interval of how frequently the task should run.
59    pub fn interval(mut self, interval: ScheduleInterval) -> Self {
60        self.interval = interval;
61        self
62    }
63
64    pub fn on_event<T, Fut>(mut self, handler: T) -> Self
65    where
66        T: Fn(ScheduleReceiverEventSignal) -> Fut + Send + Sync + 'static,
67        Fut: Future<Output = ()> + Send + 'static,
68    {
69        let mut receiver = self.event_receiver.unwrap();
70        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver.");
71        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver.");
72        
73        self.event_join_set.spawn(async move {
74            loop {
75                tokio::select! {
76                    _ = sigterm.recv() => break,
77                    _ = sigint.recv() => break,
78                    event = receiver.recv() => {
79                        match event {
80                            Some(event) => handler(event).await,
81                            None => break,
82                        }
83                    }
84                }
85            }
86        });
87        
88        self.event_receiver = None;
89        self
90    }
91
92    pub async fn receive(mut self) -> tokio::io::Result<()> {
93        let mut join_set = JoinSet::new();
94        let mut sigterm = signal(SignalKind::terminate())?;
95        let mut sigint = signal(SignalKind::interrupt())?;
96
97        let now_timestamp = Utc::now().timestamp();
98        let next_run_timestamp = self.next_run.timestamp();
99        if next_run_timestamp < now_timestamp {
100            self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
101        }
102
103        join_set.spawn(async move {
104            loop {
105                let now = Utc::now();
106                if let Ok(duration) = (self.next_run - now).to_std() {
107                    sleep(duration).await;
108                }
109
110                let uuid = Uuid::new_v4().to_string();
111                self.event_broadcast.send(ScheduleReceiverEventSignal::OnTrigger(uuid.to_string())).await.unwrap();
112
113                if self.interval == ScheduleInterval::None {
114                    break;
115                }
116
117                self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
118            }
119        });
120
121        loop {
122            tokio::select! {
123                _ = sigterm.recv() => {
124                    join_set.abort_all();
125                    break;
126                },
127                _ = sigint.recv() => {
128                    join_set.abort_all();
129                    break;
130                },
131                task = join_set.join_next() => {
132                    if task.is_none() {
133                        break;
134                    }
135                }
136            }
137        }
138
139        while let Some(_) = self.event_join_set.join_next().await {}
140
141        Ok(())
142    }
143
144    async fn calculate_next_run(next_run: DateTime<Utc>, interval: ScheduleInterval) -> DateTime<Utc> {
145        let now = Utc::now();
146
147        let interval_duration = match interval {
148            ScheduleInterval::None => return next_run,
149            ScheduleInterval::Seconds(seconds) => ChronoDuration::seconds(seconds),
150            ScheduleInterval::Minutes(minutes) => ChronoDuration::minutes(minutes),
151            ScheduleInterval::Hours(hours) => ChronoDuration::hours(hours),
152            ScheduleInterval::Days(days) => ChronoDuration::days(days),
153        };
154
155        let mut calculated_next_run = next_run.clone();
156        while calculated_next_run < now {
157            calculated_next_run += interval_duration;
158        }
159
160        calculated_next_run
161    }
162}