rust_integration_services/schedule/
schedule_receiver.rs

1use std::{pin::Pin, sync::Arc};
2
3use chrono::{DateTime, Duration as ChronoDuration, NaiveDate, NaiveTime, Utc};
4use tokio::{signal::unix::{signal, SignalKind}, task::JoinSet, time::sleep};
5use uuid::Uuid;
6
7use crate::{common::event_handler::EventHandler, schedule::schedule_receiver_event::ScheduleReceiverEvent};
8
9use super::schedule_interval::ScheduleInterval;
10
11type TriggerCallback = Arc<dyn Fn(String) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
12
13pub struct ScheduleReceiver {
14    interval: ScheduleInterval,
15    next_run: DateTime<Utc>,
16    event_handler: EventHandler<ScheduleReceiverEvent>,
17    event_join_set: JoinSet<()>,
18    callback_trigger: TriggerCallback,
19}
20
21impl ScheduleReceiver {
22    pub fn new() -> Self {
23        ScheduleReceiver {
24            interval: ScheduleInterval::None,
25            next_run: Utc::now(),
26            event_handler: EventHandler::new(),
27            event_join_set: JoinSet::new(),
28            callback_trigger: Arc::new(|_| Box::pin(async {})),
29        }
30    }
31    
32    /// Sets the `UTC` start date for the scheduled task.
33    /// 
34    /// If the provided date is in the past, the scheduler will calculate the next valid future run based on the defined interval.
35    /// 
36    /// Note: Scheduler is using `UTC: Coordinated Universal Time` to avoid daylight saving problems.
37    pub fn start_date(mut self, year: i32, month: u32, day: u32) -> Self {
38        let naive_date = NaiveDate::from_ymd_opt(year, month, day).expect("Not a valid date.");
39        let naive_time = self.next_run.time();
40        let naive_dt = naive_date.and_time(naive_time);
41        self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
42        self
43    }
44    
45    /// Sets the `UTC` start time for the scheduled task.
46    /// 
47    /// If the provided time is in the past, the scheduler will calculate the next valid future run based on the defined interval.
48    /// 
49    /// Note: Scheduler is using `UTC: Coordinated Universal Time` to avoid daylight saving problems.
50    pub fn start_time(mut self, hour: u32, minute: u32, second: u32) -> Self {
51        let naive_time = NaiveTime::from_hms_opt(hour, minute, second).expect("Not a valid time.");
52        let naive_date = self.next_run.date_naive();
53        let naive_dt = naive_date.and_time(naive_time);
54        self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
55        self
56    }
57    
58    /// Sets the interval of how frequently the task should run.
59    pub fn interval(mut self, interval: ScheduleInterval) -> Self {
60        self.interval = interval;
61        self
62    }
63
64    pub fn on_event<T, Fut>(mut self, handler: T) -> Self
65    where
66        T: Fn(ScheduleReceiverEvent) -> Fut + Send + Sync + 'static,
67        Fut: Future<Output = ()> + Send + 'static,
68    {
69        self.event_join_set = self.event_handler.init(handler);
70        self
71    }
72
73    pub fn trigger<T, Fut>(mut self, callback: T) -> Self
74    where
75        T: Fn(String) -> Fut + Send + Sync + 'static,
76        Fut: Future<Output = ()> + Send + 'static,
77    {
78        self.callback_trigger = Arc::new(move |uuid| Box::pin(callback(uuid)));
79        self
80    }
81
82    pub async fn receive(mut self) {
83        let mut receiver_join_set = JoinSet::new();
84        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
85        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
86        let event_broadcast = Arc::new(self.event_handler.broadcast());
87
88        let now_timestamp = Utc::now().timestamp();
89        let next_run_timestamp = self.next_run.timestamp();
90        if next_run_timestamp < now_timestamp {
91            self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
92        }
93
94        receiver_join_set.spawn(async move {
95            loop {
96                let now = Utc::now();
97                if let Ok(duration) = (self.next_run - now).to_std() {
98                    sleep(duration).await;
99                }
100                
101                if self.interval != ScheduleInterval::None {
102                    self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
103                }
104
105                let uuid = Uuid::new_v4().to_string();
106                event_broadcast.send(ScheduleReceiverEvent::Trigger{uuid: uuid.clone()}).ok();
107                let callback_handle = tokio::spawn((self.callback_trigger)(uuid.clone())).await;
108                if let Err(err) = callback_handle {
109                    event_broadcast.send(ScheduleReceiverEvent::Error {
110                        uuid: uuid.clone(),
111                        error: err.to_string()
112                    }).ok();
113                }
114                
115                if self.interval == ScheduleInterval::None {
116                    break;
117                }
118            }
119        });
120
121        loop {
122            tokio::select! {
123                _ = sigterm.recv() => {
124                    receiver_join_set.abort_all();
125                    break;
126                },
127                _ = sigint.recv() => {
128                    receiver_join_set.abort_all();
129                    break;
130                },
131                task = receiver_join_set.join_next() => {
132                    if task.is_none() {
133                        break;
134                    }
135                }
136            }
137        }
138
139        while let Some(_) = self.event_join_set.join_next().await {}
140    }
141
142    async fn calculate_next_run(next_run: DateTime<Utc>, interval: ScheduleInterval) -> DateTime<Utc> {
143        let now = Utc::now();
144
145        let interval_duration = match interval {
146            ScheduleInterval::None => return next_run,
147            ScheduleInterval::Seconds(seconds) => ChronoDuration::seconds(seconds),
148            ScheduleInterval::Minutes(minutes) => ChronoDuration::minutes(minutes),
149            ScheduleInterval::Hours(hours) => ChronoDuration::hours(hours),
150            ScheduleInterval::Days(days) => ChronoDuration::days(days),
151        };
152
153        let mut calculated_next_run = next_run.clone();
154        while calculated_next_run < now {
155            calculated_next_run += interval_duration;
156        }
157
158        calculated_next_run
159    }
160}