// rust_integration_services/schedule/schedule_receiver.rs
use std::{pin::Pin, sync::Arc};

use chrono::{DateTime, Duration as ChronoDuration, NaiveDate, NaiveTime, Utc};
use tokio::{signal::unix::{signal, SignalKind}, task::JoinSet, time::sleep};
use uuid::Uuid;

use crate::{common::event_handler::EventHandler, schedule::schedule_receiver_event::ScheduleReceiverEvent};

use super::schedule_interval::ScheduleInterval;

/// Boxed, pinned, sendable future returned by a trigger callback.
type TriggerFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Shared, thread-safe trigger callback.
///
/// The callback receives a `String` identifier and returns the boxed future
/// that performs the work for that run.
type TriggerCallback = Arc<dyn Fn(String) -> TriggerFuture + Send + Sync>;

13pub struct ScheduleReceiver {
14 interval: ScheduleInterval,
15 next_run: DateTime<Utc>,
16 event_handler: EventHandler<ScheduleReceiverEvent>,
17 event_join_set: JoinSet<()>,
18 callback_trigger: TriggerCallback,
19}
20
21impl ScheduleReceiver {
22 pub fn new() -> Self {
23 ScheduleReceiver {
24 interval: ScheduleInterval::None,
25 next_run: Utc::now(),
26 event_handler: EventHandler::new(),
27 event_join_set: JoinSet::new(),
28 callback_trigger: Arc::new(|_| Box::pin(async {})),
29 }
30 }
31
32 pub fn start_date(mut self, year: i32, month: u32, day: u32) -> Self {
38 let naive_date = NaiveDate::from_ymd_opt(year, month, day).expect("Not a valid date.");
39 let naive_time = self.next_run.time();
40 let naive_dt = naive_date.and_time(naive_time);
41 self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
42 self
43 }
44
45 pub fn start_time(mut self, hour: u32, minute: u32, second: u32) -> Self {
51 let naive_time = NaiveTime::from_hms_opt(hour, minute, second).expect("Not a valid time.");
52 let naive_date = self.next_run.date_naive();
53 let naive_dt = naive_date.and_time(naive_time);
54 self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
55 self
56 }
57
58 pub fn interval(mut self, interval: ScheduleInterval) -> Self {
60 self.interval = interval;
61 self
62 }
63
64 pub fn on_event<T, Fut>(mut self, handler: T) -> Self
65 where
66 T: Fn(ScheduleReceiverEvent) -> Fut + Send + Sync + 'static,
67 Fut: Future<Output = ()> + Send + 'static,
68 {
69 self.event_join_set = self.event_handler.init(handler);
70 self
71 }
72
73 pub fn trigger<T, Fut>(mut self, callback: T) -> Self
74 where
75 T: Fn(String) -> Fut + Send + Sync + 'static,
76 Fut: Future<Output = ()> + Send + 'static,
77 {
78 self.callback_trigger = Arc::new(move |uuid| Box::pin(callback(uuid)));
79 self
80 }
81
82 pub async fn receive(mut self) {
83 let mut receiver_join_set = JoinSet::new();
84 let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
85 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
86 let event_broadcast = Arc::new(self.event_handler.broadcast());
87
88 let now_timestamp = Utc::now().timestamp();
89 let next_run_timestamp = self.next_run.timestamp();
90 if next_run_timestamp < now_timestamp {
91 self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
92 }
93
94 receiver_join_set.spawn(async move {
95 loop {
96 let now = Utc::now();
97 if let Ok(duration) = (self.next_run - now).to_std() {
98 sleep(duration).await;
99 }
100
101 if self.interval != ScheduleInterval::None {
102 self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
103 }
104
105 let uuid = Uuid::new_v4().to_string();
106 event_broadcast.send(ScheduleReceiverEvent::Trigger{uuid: uuid.clone()}).ok();
107 let callback_handle = tokio::spawn((self.callback_trigger)(uuid.clone())).await;
108 if let Err(err) = callback_handle {
109 event_broadcast.send(ScheduleReceiverEvent::Error {
110 uuid: uuid.clone(),
111 error: err.to_string()
112 }).ok();
113 }
114
115 if self.interval == ScheduleInterval::None {
116 break;
117 }
118 }
119 });
120
121 loop {
122 tokio::select! {
123 _ = sigterm.recv() => {
124 receiver_join_set.abort_all();
125 break;
126 },
127 _ = sigint.recv() => {
128 receiver_join_set.abort_all();
129 break;
130 },
131 task = receiver_join_set.join_next() => {
132 if task.is_none() {
133 break;
134 }
135 }
136 }
137 }
138
139 while let Some(_) = self.event_join_set.join_next().await {}
140 }
141
142 async fn calculate_next_run(next_run: DateTime<Utc>, interval: ScheduleInterval) -> DateTime<Utc> {
143 let now = Utc::now();
144
145 let interval_duration = match interval {
146 ScheduleInterval::None => return next_run,
147 ScheduleInterval::Seconds(seconds) => ChronoDuration::seconds(seconds),
148 ScheduleInterval::Minutes(minutes) => ChronoDuration::minutes(minutes),
149 ScheduleInterval::Hours(hours) => ChronoDuration::hours(hours),
150 ScheduleInterval::Days(days) => ChronoDuration::days(days),
151 };
152
153 let mut calculated_next_run = next_run.clone();
154 while calculated_next_run < now {
155 calculated_next_run += interval_duration;
156 }
157
158 calculated_next_run
159 }
160}