rust_integration_services/schedule/
schedule_receiver.rs

1use std::sync::Arc;
2
3use chrono::{DateTime, Duration as ChronoDuration, NaiveDate, NaiveTime, Utc};
4use tokio::{signal::unix::{signal, SignalKind}, task::JoinSet, time::sleep};
5use uuid::Uuid;
6
7use crate::{common::event_handler::EventHandler, schedule::schedule_receiver_event::ScheduleReceiverEvent};
8
9use super::schedule_interval::ScheduleInterval;
10
11pub struct ScheduleReceiver {
12    interval: ScheduleInterval,
13    next_run: DateTime<Utc>,
14    event_handler: EventHandler<ScheduleReceiverEvent>,
15    event_join_set: JoinSet<()>,
16}
17
18impl ScheduleReceiver {
19    pub fn new() -> Self {
20        ScheduleReceiver {
21            interval: ScheduleInterval::None,
22            next_run: Utc::now(),
23            event_handler: EventHandler::new(),
24            event_join_set: JoinSet::new(),
25        }
26    }
27    
28    /// Sets the `UTC` start date for the scheduled task.
29    /// 
30    /// If the provided date is in the past, the scheduler will calculate the next valid future run based on the defined interval.
31    /// 
32    /// Note: Scheduler is using `UTC: Coordinated Universal Time` to avoid daylight saving problems.
33    pub fn start_date(mut self, year: i32, month: u32, day: u32) -> Self {
34        let naive_date = NaiveDate::from_ymd_opt(year, month, day).expect("Not a valid date.");
35        let naive_time = self.next_run.time();
36        let naive_dt = naive_date.and_time(naive_time);
37        self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
38        self
39    }
40    
41    /// Sets the `UTC` start time for the scheduled task.
42    /// 
43    /// If the provided time is in the past, the scheduler will calculate the next valid future run based on the defined interval.
44    /// 
45    /// Note: Scheduler is using `UTC: Coordinated Universal Time` to avoid daylight saving problems.
46    pub fn start_time(mut self, hour: u32, minute: u32, second: u32) -> Self {
47        let naive_time = NaiveTime::from_hms_opt(hour, minute, second).expect("Not a valid time.");
48        let naive_date = self.next_run.date_naive();
49        let naive_dt = naive_date.and_time(naive_time);
50        self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
51        self
52    }
53    
54    /// Sets the interval of how frequently the task should run.
55    pub fn interval(mut self, interval: ScheduleInterval) -> Self {
56        self.interval = interval;
57        self
58    }
59
60    pub fn on_event<T, Fut>(mut self, handler: T) -> Self
61    where
62        T: Fn(ScheduleReceiverEvent) -> Fut + Send + Sync + 'static,
63        Fut: Future<Output = ()> + Send + 'static,
64    {
65        self.event_join_set = self.event_handler.init(handler);
66        self
67    }
68
69    pub async fn receive(mut self) {
70        let mut receiver_join_set = JoinSet::new();
71        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
72        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
73        let event_broadcast = Arc::new(self.event_handler.broadcast());
74
75        let now_timestamp = Utc::now().timestamp();
76        let next_run_timestamp = self.next_run.timestamp();
77        if next_run_timestamp < now_timestamp {
78            self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
79        }
80
81        receiver_join_set.spawn(async move {
82            loop {
83                let now = Utc::now();
84                if let Ok(duration) = (self.next_run - now).to_std() {
85                    sleep(duration).await;
86                }
87
88                let uuid = Uuid::new_v4().to_string();
89                event_broadcast.send(ScheduleReceiverEvent::OnTrigger(uuid.to_string())).await.unwrap();
90
91                if self.interval == ScheduleInterval::None {
92                    break;
93                }
94
95                self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
96            }
97        });
98
99        loop {
100            tokio::select! {
101                _ = sigterm.recv() => {
102                    receiver_join_set.abort_all();
103                    break;
104                },
105                _ = sigint.recv() => {
106                    receiver_join_set.abort_all();
107                    break;
108                },
109                task = receiver_join_set.join_next() => {
110                    if task.is_none() {
111                        break;
112                    }
113                }
114            }
115        }
116
117        while let Some(_) = self.event_join_set.join_next().await {}
118    }
119
120    async fn calculate_next_run(next_run: DateTime<Utc>, interval: ScheduleInterval) -> DateTime<Utc> {
121        let now = Utc::now();
122
123        let interval_duration = match interval {
124            ScheduleInterval::None => return next_run,
125            ScheduleInterval::Seconds(seconds) => ChronoDuration::seconds(seconds),
126            ScheduleInterval::Minutes(minutes) => ChronoDuration::minutes(minutes),
127            ScheduleInterval::Hours(hours) => ChronoDuration::hours(hours),
128            ScheduleInterval::Days(days) => ChronoDuration::days(days),
129        };
130
131        let mut calculated_next_run = next_run.clone();
132        while calculated_next_run < now {
133            calculated_next_run += interval_duration;
134        }
135
136        calculated_next_run
137    }
138}