rust_integration_services/schedule/
schedule_receiver.rs

1use std::{pin::Pin, sync::Arc};
2
3use chrono::{DateTime, Duration as ChronoDuration, NaiveDate, NaiveTime, Utc};
4use tokio::{signal::unix::{signal, SignalKind}, task::JoinSet, time::sleep};
5use uuid::Uuid;
6
7use crate::{common::event_handler::EventHandler, schedule::schedule_receiver_event::ScheduleReceiverEvent};
8
9use super::schedule_interval::ScheduleInterval;
10
/// Boxed, sendable future produced by one invocation of a trigger callback.
type TriggerFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Shared, thread-safe callback that takes a run identifier string and returns the future to execute for that run.
type TriggerCallback = Arc<dyn Fn(String) -> TriggerFuture + Send + Sync>;
12
13pub struct ScheduleReceiver {
14    interval: ScheduleInterval,
15    next_run: DateTime<Utc>,
16    event_handler: EventHandler<ScheduleReceiverEvent>,
17    callback_trigger: TriggerCallback,
18}
19
20impl ScheduleReceiver {
21    pub fn new() -> Self {
22        ScheduleReceiver {
23            interval: ScheduleInterval::None,
24            next_run: Utc::now(),
25            event_handler: EventHandler::new(),
26            callback_trigger: Arc::new(|_| Box::pin(async {})),
27        }
28    }
29    
30    /// Sets the `UTC` start date for the scheduled task.
31    /// 
32    /// If the provided date is in the past, the scheduler will calculate the next valid future run based on the defined interval.
33    /// 
34    /// Note: Scheduler is using `UTC: Coordinated Universal Time` to avoid daylight saving problems.
35    pub fn start_date(mut self, year: i32, month: u32, day: u32) -> Self {
36        let naive_date = NaiveDate::from_ymd_opt(year, month, day).expect("Not a valid date.");
37        let naive_time = self.next_run.time();
38        let naive_dt = naive_date.and_time(naive_time);
39        self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
40        self
41    }
42    
43    /// Sets the `UTC` start time for the scheduled task.
44    /// 
45    /// If the provided time is in the past, the scheduler will calculate the next valid future run based on the defined interval.
46    /// 
47    /// Note: Scheduler is using `UTC: Coordinated Universal Time` to avoid daylight saving problems.
48    pub fn start_time(mut self, hour: u32, minute: u32, second: u32) -> Self {
49        let naive_time = NaiveTime::from_hms_opt(hour, minute, second).expect("Not a valid time.");
50        let naive_date = self.next_run.date_naive();
51        let naive_dt = naive_date.and_time(naive_time);
52        self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
53        self
54    }
55    
56    /// Sets the interval of how frequently the task should run.
57    pub fn interval(mut self, interval: ScheduleInterval) -> Self {
58        self.interval = interval;
59        self
60    }
61
62    pub fn on_event<T, Fut>(mut self, handler: T) -> Self
63    where
64        T: Fn(ScheduleReceiverEvent) -> Fut + Send + Sync + 'static,
65        Fut: Future<Output = ()> + Send + 'static,
66    {
67        self.event_handler.init(handler);
68        self
69    }
70
71    pub fn trigger<T, Fut>(mut self, callback: T) -> Self
72    where
73        T: Fn(String) -> Fut + Send + Sync + 'static,
74        Fut: Future<Output = ()> + Send + 'static,
75    {
76        self.callback_trigger = Arc::new(move |uuid| Box::pin(callback(uuid)));
77        self
78    }
79
80    pub async fn receive(mut self) {
81        let mut receiver_join_set = JoinSet::new();
82        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
83        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
84        let event_broadcast = Arc::new(self.event_handler.broadcast());
85
86        let now_timestamp = Utc::now().timestamp();
87        let next_run_timestamp = self.next_run.timestamp();
88        if next_run_timestamp < now_timestamp {
89            self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
90        }
91
92        receiver_join_set.spawn(async move {
93            loop {
94                let now = Utc::now();
95                if let Ok(duration) = (self.next_run - now).to_std() {
96                    sleep(duration).await;
97                }
98                
99                if self.interval != ScheduleInterval::None {
100                    self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
101                }
102
103                let uuid = Uuid::new_v4().to_string();
104                event_broadcast.send(ScheduleReceiverEvent::Trigger{uuid: uuid.clone()}).ok();
105                let callback_handle = tokio::spawn((self.callback_trigger)(uuid.clone())).await;
106                if let Err(err) = callback_handle {
107                    event_broadcast.send(ScheduleReceiverEvent::Error {
108                        uuid: uuid.clone(),
109                        error: err.to_string()
110                    }).ok();
111                }
112                
113                if self.interval == ScheduleInterval::None {
114                    break;
115                }
116            }
117        });
118
119        loop {
120            tokio::select! {
121                _ = sigterm.recv() => {
122                    receiver_join_set.abort_all();
123                    break;
124                },
125                _ = sigint.recv() => {
126                    receiver_join_set.abort_all();
127                    break;
128                },
129                task = receiver_join_set.join_next() => {
130                    if task.is_none() {
131                        break;
132                    }
133                }
134            }
135        }
136
137        self.event_handler.shutdown().await;
138    }
139
140    async fn calculate_next_run(next_run: DateTime<Utc>, interval: ScheduleInterval) -> DateTime<Utc> {
141        let now = Utc::now();
142
143        let interval_duration = match interval {
144            ScheduleInterval::None => return next_run,
145            ScheduleInterval::Seconds(seconds) => ChronoDuration::seconds(seconds),
146            ScheduleInterval::Minutes(minutes) => ChronoDuration::minutes(minutes),
147            ScheduleInterval::Hours(hours) => ChronoDuration::hours(hours),
148            ScheduleInterval::Days(days) => ChronoDuration::days(days),
149        };
150
151        let mut calculated_next_run = next_run.clone();
152        while calculated_next_run < now {
153            calculated_next_run += interval_duration;
154        }
155
156        calculated_next_run
157    }
158}