rust_integration_services/schedule/
schedule_receiver.rs1use chrono::{DateTime, Duration as ChronoDuration, NaiveDate, NaiveTime, Utc};
2use tokio::{signal::unix::{signal, SignalKind}, sync::mpsc, task::JoinSet, time::sleep};
3use uuid::Uuid;
4
5use super::schedule_interval::ScheduleInterval;
6
/// Events delivered to the handler registered with `ScheduleReceiver::on_event`.
#[derive(Clone, Debug)]
pub enum ScheduleReceiverEventSignal {
    /// The schedule fired. The payload is an identifier string for this
    /// particular trigger (`ScheduleReceiver::receive` sends a freshly
    /// generated v4 UUID rendered as text).
    OnTrigger(String),
}
11
12pub struct ScheduleReceiver {
13 interval: ScheduleInterval,
14 next_run: DateTime<Utc>,
15 event_broadcast: mpsc::Sender<ScheduleReceiverEventSignal>,
16 event_receiver: Option<mpsc::Receiver<ScheduleReceiverEventSignal>>,
17 event_join_set: JoinSet<()>,
18}
19
20impl ScheduleReceiver {
21 pub fn new() -> Self {
22 let (event_broadcast, event_receiver) = mpsc::channel(128);
23 ScheduleReceiver {
24 interval: ScheduleInterval::None,
25 next_run: Utc::now(),
26 event_broadcast,
27 event_receiver: Some(event_receiver),
28 event_join_set: JoinSet::new(),
29 }
30 }
31
32 pub fn start_date(mut self, year: i32, month: u32, day: u32) -> Self {
38 let naive_date = NaiveDate::from_ymd_opt(year, month, day).expect("Not a valid date.");
39 let naive_time = self.next_run.time();
40 let naive_dt = naive_date.and_time(naive_time);
41 self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
42 self
43 }
44
45 pub fn start_time(mut self, hour: u32, minute: u32, second: u32) -> Self {
51 let naive_time = NaiveTime::from_hms_opt(hour, minute, second).expect("Not a valid time.");
52 let naive_date = self.next_run.date_naive();
53 let naive_dt = naive_date.and_time(naive_time);
54 self.next_run = DateTime::from_naive_utc_and_offset(naive_dt, Utc);
55 self
56 }
57
58 pub fn interval(mut self, interval: ScheduleInterval) -> Self {
60 self.interval = interval;
61 self
62 }
63
64 pub fn on_event<T, Fut>(mut self, handler: T) -> Self
65 where
66 T: Fn(ScheduleReceiverEventSignal) -> Fut + Send + Sync + 'static,
67 Fut: Future<Output = ()> + Send + 'static,
68 {
69 let mut receiver = self.event_receiver.unwrap();
70 let mut sigterm = signal(SignalKind::terminate()).expect("Failed to start SIGTERM signal receiver.");
71 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to start SIGINT signal receiver.");
72
73 self.event_join_set.spawn(async move {
74 loop {
75 tokio::select! {
76 _ = sigterm.recv() => break,
77 _ = sigint.recv() => break,
78 event = receiver.recv() => {
79 match event {
80 Some(event) => handler(event).await,
81 None => break,
82 }
83 }
84 }
85 }
86 });
87
88 self.event_receiver = None;
89 self
90 }
91
92 pub async fn receive(mut self) -> tokio::io::Result<()> {
93 let mut join_set = JoinSet::new();
94 let mut sigterm = signal(SignalKind::terminate())?;
95 let mut sigint = signal(SignalKind::interrupt())?;
96
97 let now_timestamp = Utc::now().timestamp();
98 let next_run_timestamp = self.next_run.timestamp();
99 if next_run_timestamp < now_timestamp {
100 self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
101 }
102
103 join_set.spawn(async move {
104 loop {
105 let now = Utc::now();
106 if let Ok(duration) = (self.next_run - now).to_std() {
107 sleep(duration).await;
108 }
109
110 let uuid = Uuid::new_v4().to_string();
111 self.event_broadcast.send(ScheduleReceiverEventSignal::OnTrigger(uuid.to_string())).await.unwrap();
112
113 if self.interval == ScheduleInterval::None {
114 break;
115 }
116
117 self.next_run = Self::calculate_next_run(self.next_run, self.interval).await;
118 }
119 });
120
121 loop {
122 tokio::select! {
123 _ = sigterm.recv() => {
124 join_set.abort_all();
125 break;
126 },
127 _ = sigint.recv() => {
128 join_set.abort_all();
129 break;
130 },
131 task = join_set.join_next() => {
132 if task.is_none() {
133 break;
134 }
135 }
136 }
137 }
138
139 while let Some(_) = self.event_join_set.join_next().await {}
140
141 Ok(())
142 }
143
144 async fn calculate_next_run(next_run: DateTime<Utc>, interval: ScheduleInterval) -> DateTime<Utc> {
145 let now = Utc::now();
146
147 let interval_duration = match interval {
148 ScheduleInterval::None => return next_run,
149 ScheduleInterval::Seconds(seconds) => ChronoDuration::seconds(seconds),
150 ScheduleInterval::Minutes(minutes) => ChronoDuration::minutes(minutes),
151 ScheduleInterval::Hours(hours) => ChronoDuration::hours(hours),
152 ScheduleInterval::Days(days) => ChronoDuration::days(days),
153 };
154
155 let mut calculated_next_run = next_run.clone();
156 while calculated_next_run < now {
157 calculated_next_run += interval_duration;
158 }
159
160 calculated_next_run
161 }
162}