rust_integration_services/schedule/
schedule_receiver.rs1use std::{pin::Pin, sync::Arc};
2
3use chrono::{DateTime, Duration as ChronoDuration, NaiveDate, NaiveTime, Utc};
4use tokio::{signal::unix::{signal, SignalKind}, task::JoinSet, time::sleep};
5use uuid::Uuid;
6
7use crate::{common::event_handler::EventHandler, schedule::schedule_receiver_event::ScheduleReceiverEvent};
8
9use super::schedule_interval::ScheduleInterval;
10
/// Boxed, sendable future produced by a single invocation of a trigger callback.
type TriggerFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Shared, type-erased trigger callback.
///
/// The callback takes the run identifier as a `String` and returns a boxed
/// future that resolves to `()`. It is `Send + Sync` so it can be moved into
/// and called from a spawned task.
type TriggerCallback = Arc<dyn Fn(String) -> TriggerFuture + Send + Sync>;
12
13pub struct ScheduleReceiver {
14 interval: ScheduleInterval,
15 next_run: DateTime<Utc>,
16 event_handler: EventHandler<ScheduleReceiverEvent>,
17 callback_trigger: TriggerCallback,
18}
19
20impl ScheduleReceiver {
21 pub fn new() -> Self {
22 ScheduleReceiver {
23 interval: ScheduleInterval::None,
24 next_run: Utc::now(),
25 event_handler: EventHandler::new(),
26 callback_trigger: Arc::new(|_| Box::pin(async {})),
27 }
28 }
29
30 pub fn start_date(mut self, year: i32, month: u32, day: u32) -> Self {
36 let naive_date = NaiveDate::from_ymd_opt(year, month, day).expect("Not a valid date.");
37 let naive_time = self.next_run.time();
38 let naive_dt = naive_date.and_time(naive_time);
39 self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
40 self
41 }
42
43 pub fn start_time(mut self, hour: u32, minute: u32, second: u32) -> Self {
49 let naive_time = NaiveTime::from_hms_opt(hour, minute, second).expect("Not a valid time.");
50 let naive_date = self.next_run.date_naive();
51 let naive_dt = naive_date.and_time(naive_time);
52 self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
53 self
54 }
55
56 pub fn interval(mut self, interval: ScheduleInterval) -> Self {
58 self.interval = interval;
59 self
60 }
61
62 pub fn on_event<T, Fut>(mut self, handler: T) -> Self
63 where
64 T: Fn(ScheduleReceiverEvent) -> Fut + Send + Sync + 'static,
65 Fut: Future<Output = ()> + Send + 'static,
66 {
67 self.event_handler.init(handler);
68 self
69 }
70
71 pub fn trigger<T, Fut>(mut self, callback: T) -> Self
72 where
73 T: Fn(String) -> Fut + Send + Sync + 'static,
74 Fut: Future<Output = ()> + Send + 'static,
75 {
76 self.callback_trigger = Arc::new(move |uuid| Box::pin(callback(uuid)));
77 self
78 }
79
80 pub async fn receive(mut self) {
81 let mut receiver_join_set = JoinSet::new();
82 let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
83 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
84 let event_broadcast = Arc::new(self.event_handler.broadcast());
85
86 let now_timestamp = Utc::now().timestamp();
87 let next_run_timestamp = self.next_run.timestamp();
88 if next_run_timestamp < now_timestamp {
89 self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
90 }
91
92 receiver_join_set.spawn(async move {
93 loop {
94 let now = Utc::now();
95 if let Ok(duration) = (self.next_run - now).to_std() {
96 sleep(duration).await;
97 }
98
99 if self.interval != ScheduleInterval::None {
100 self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
101 }
102
103 let uuid = Uuid::new_v4().to_string();
104 event_broadcast.send(ScheduleReceiverEvent::Trigger{uuid: uuid.clone()}).ok();
105 let callback_handle = tokio::spawn((self.callback_trigger)(uuid.clone())).await;
106 if let Err(err) = callback_handle {
107 event_broadcast.send(ScheduleReceiverEvent::Error {
108 uuid: uuid.clone(),
109 error: err.to_string()
110 }).ok();
111 }
112
113 if self.interval == ScheduleInterval::None {
114 break;
115 }
116 }
117 });
118
119 loop {
120 tokio::select! {
121 _ = sigterm.recv() => {
122 receiver_join_set.abort_all();
123 break;
124 },
125 _ = sigint.recv() => {
126 receiver_join_set.abort_all();
127 break;
128 },
129 task = receiver_join_set.join_next() => {
130 if task.is_none() {
131 break;
132 }
133 }
134 }
135 }
136
137 self.event_handler.shutdown().await;
138 }
139
140 async fn calculate_next_run(next_run: DateTime<Utc>, interval: ScheduleInterval) -> DateTime<Utc> {
141 let now = Utc::now();
142
143 let interval_duration = match interval {
144 ScheduleInterval::None => return next_run,
145 ScheduleInterval::Seconds(seconds) => ChronoDuration::seconds(seconds),
146 ScheduleInterval::Minutes(minutes) => ChronoDuration::minutes(minutes),
147 ScheduleInterval::Hours(hours) => ChronoDuration::hours(hours),
148 ScheduleInterval::Days(days) => ChronoDuration::days(days),
149 };
150
151 let mut calculated_next_run = next_run.clone();
152 while calculated_next_run < now {
153 calculated_next_run += interval_duration;
154 }
155
156 calculated_next_run
157 }
158}