rust_integration_services/schedule/
schedule_receiver.rs1use std::sync::Arc;
2
3use chrono::{DateTime, Duration as ChronoDuration, NaiveDate, NaiveTime, Utc};
4use tokio::{signal::unix::{signal, SignalKind}, task::JoinSet, time::sleep};
5use uuid::Uuid;
6
7use crate::{common::event_handler::EventHandler, schedule::schedule_receiver_event::ScheduleReceiverEvent};
8
9use super::schedule_interval::ScheduleInterval;
10
11pub struct ScheduleReceiver {
12 interval: ScheduleInterval,
13 next_run: DateTime<Utc>,
14 event_handler: EventHandler<ScheduleReceiverEvent>,
15 event_join_set: JoinSet<()>,
16}
17
18impl ScheduleReceiver {
19 pub fn new() -> Self {
20 ScheduleReceiver {
21 interval: ScheduleInterval::None,
22 next_run: Utc::now(),
23 event_handler: EventHandler::new(),
24 event_join_set: JoinSet::new(),
25 }
26 }
27
28 pub fn start_date(mut self, year: i32, month: u32, day: u32) -> Self {
34 let naive_date = NaiveDate::from_ymd_opt(year, month, day).expect("Not a valid date.");
35 let naive_time = self.next_run.time();
36 let naive_dt = naive_date.and_time(naive_time);
37 self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
38 self
39 }
40
41 pub fn start_time(mut self, hour: u32, minute: u32, second: u32) -> Self {
47 let naive_time = NaiveTime::from_hms_opt(hour, minute, second).expect("Not a valid time.");
48 let naive_date = self.next_run.date_naive();
49 let naive_dt = naive_date.and_time(naive_time);
50 self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
51 self
52 }
53
54 pub fn interval(mut self, interval: ScheduleInterval) -> Self {
56 self.interval = interval;
57 self
58 }
59
60 pub fn on_event<T, Fut>(mut self, handler: T) -> Self
61 where
62 T: Fn(ScheduleReceiverEvent) -> Fut + Send + Sync + 'static,
63 Fut: Future<Output = ()> + Send + 'static,
64 {
65 self.event_join_set = self.event_handler.init(handler);
66 self
67 }
68
69 pub async fn receive(mut self) {
70 let mut receiver_join_set = JoinSet::new();
71 let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver");
72 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver");
73 let event_broadcast = Arc::new(self.event_handler.broadcast());
74
75 let now_timestamp = Utc::now().timestamp();
76 let next_run_timestamp = self.next_run.timestamp();
77 if next_run_timestamp < now_timestamp {
78 self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
79 }
80
81 receiver_join_set.spawn(async move {
82 loop {
83 let now = Utc::now();
84 if let Ok(duration) = (self.next_run - now).to_std() {
85 sleep(duration).await;
86 }
87
88 let uuid = Uuid::new_v4().to_string();
89 event_broadcast.send(ScheduleReceiverEvent::OnTrigger(uuid.to_string())).await.unwrap();
90
91 if self.interval == ScheduleInterval::None {
92 break;
93 }
94
95 self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
96 }
97 });
98
99 loop {
100 tokio::select! {
101 _ = sigterm.recv() => {
102 receiver_join_set.abort_all();
103 break;
104 },
105 _ = sigint.recv() => {
106 receiver_join_set.abort_all();
107 break;
108 },
109 task = receiver_join_set.join_next() => {
110 if task.is_none() {
111 break;
112 }
113 }
114 }
115 }
116
117 while let Some(_) = self.event_join_set.join_next().await {}
118 }
119
120 async fn calculate_next_run(next_run: DateTime<Utc>, interval: ScheduleInterval) -> DateTime<Utc> {
121 let now = Utc::now();
122
123 let interval_duration = match interval {
124 ScheduleInterval::None => return next_run,
125 ScheduleInterval::Seconds(seconds) => ChronoDuration::seconds(seconds),
126 ScheduleInterval::Minutes(minutes) => ChronoDuration::minutes(minutes),
127 ScheduleInterval::Hours(hours) => ChronoDuration::hours(hours),
128 ScheduleInterval::Days(days) => ChronoDuration::days(days),
129 };
130
131 let mut calculated_next_run = next_run.clone();
132 while calculated_next_run < now {
133 calculated_next_run += interval_duration;
134 }
135
136 calculated_next_run
137 }
138}