1use async_trait::async_trait;
2use std::fmt::{self, Debug};
3use time::{Date, OffsetDateTime, Time};
4use tokio::{
5 select,
6 time::{Duration, Instant, sleep, sleep_until},
7};
8use tokio_util::sync::CancellationToken;
9use tracing::{error, instrument};
10
11pub mod prelude {
12 pub use super::{Notifiable, Scheduler, Skip, Task};
13 pub use async_trait::async_trait;
14 pub use tokio_util::sync::CancellationToken;
15}
16
17#[derive(Debug, Clone)]
18pub enum Skip {
19 Date(Date),
21 DateRange(Date, Date),
23 Day(Vec<u8>),
27 DayRange(usize, usize),
31 Time(Time),
33 TimeRange(Time, Time),
37 None,
39}
40
41impl Default for Skip {
42 fn default() -> Self {
43 Self::None
44 }
45}
46
47impl fmt::Display for Skip {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 match self {
50 Skip::Date(date) => write!(f, "date: {}", date),
51 Skip::DateRange(start, end) => write!(f, "date range: {} - {}", start, end),
52 Skip::Day(day) => write!(f, "day: {:?}", day),
53 Skip::DayRange(start, end) => write!(f, "day range: {} - {}", start, end),
54 Skip::Time(time) => write!(f, "time: {}", time),
55 Skip::TimeRange(start, end) => write!(f, "time range: {} - {}", start, end),
56 Skip::None => write!(f, "none"),
57 }
58 }
59}
60
61impl Skip {
62 pub fn is_skip(&self, time: OffsetDateTime) -> bool {
64 match self {
65 Skip::Date(date) => time.date() == *date,
66 Skip::DateRange(start, end) => time.date() >= *start && time.date() <= *end,
67 Skip::Day(day) => day.contains(&(time.day() + 1)),
68 Skip::DayRange(start, end) => {
69 time.day() + 1 >= *start as u8 && time.day() + 1 <= *end as u8
70 }
71 Skip::Time(time) => time.hour() == time.hour() && time.minute() == time.minute(),
72 Skip::TimeRange(start, end) => {
73 assert!(start < end, "start must be less than end");
74 time.hour() >= start.hour()
75 && time.hour() <= end.hour()
76 && time.minute() >= start.minute()
77 && time.minute() <= end.minute()
78 }
79 Skip::None => false,
80 }
81 }
82}
83
84#[derive(Debug, Clone)]
85pub enum Task {
86 Wait(u64, Option<Vec<Skip>>),
88 Interval(u64, Option<Vec<Skip>>),
90 At(Time, Option<Vec<Skip>>),
92 Once(OffsetDateTime),
94}
95
96impl fmt::Display for Task {
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 match self {
99 Task::Wait(wait, skip) => {
100 let skip = skip
101 .clone()
102 .unwrap_or_default()
103 .into_iter()
104 .map(|s| s.to_string())
105 .collect::<Vec<String>>()
106 .join(", ");
107 write!(f, "wait: {} {}", wait, skip)
108 }
109 Task::Interval(interval, skip) => {
110 let skip = skip
111 .clone()
112 .unwrap_or_default()
113 .into_iter()
114 .map(|s| s.to_string())
115 .collect::<Vec<String>>()
116 .join(", ");
117 write!(f, "interval: {} {}", interval, skip)
118 }
119 Task::At(time, skip) => {
120 let skip = skip
121 .clone()
122 .unwrap_or_default()
123 .into_iter()
124 .map(|s| s.to_string())
125 .collect::<Vec<String>>()
126 .join(", ");
127 write!(f, "at: {} {}", time, skip)
128 }
129 Task::Once(time) => write!(f, "once: {}", time),
130 }
131 }
132}
133
134#[async_trait]
136pub trait Notifiable: Sync + Send {
137 fn get_schedule(&self) -> Task;
139
140 async fn on_time(&self, cancel: CancellationToken);
142
143 async fn on_skip(&self, cancel: CancellationToken);
145}
146
147pub struct Scheduler {
148 cancel: CancellationToken,
149}
150
151impl Scheduler {
152 pub fn new() -> Self {
154 Self {
155 cancel: CancellationToken::new(),
156 }
157 }
158
159 pub async fn start<T: Notifiable + 'static>(&self, task: T) {
161 let schedule = task.get_schedule();
162 let cancel = self.cancel.clone();
163
164 match schedule {
165 Task::Wait(..) => {
166 Scheduler::run_wait(task, cancel.clone()).await;
167 }
168 Task::Interval(..) => {
169 Scheduler::run_interval(task, cancel.clone()).await;
170 }
171 Task::At(..) => {
172 Scheduler::run_at(task, cancel.clone()).await;
173 }
174 Task::Once(..) => {
175 Scheduler::run_once(task, cancel.clone()).await;
176 }
177 }
178 }
179
180 pub fn stop(&self) {
184 self.cancel.cancel();
185 }
186
187 pub fn get_cancel(&self) -> CancellationToken {
189 self.cancel.clone()
190 }
191}
192
193fn get_next_time(now: OffsetDateTime, time: Time) -> OffsetDateTime {
194 let mut next = now.replace_time(time);
195 if next < now {
196 next = next + time::Duration::days(1);
197 }
198 next
199}
200
201fn get_now() -> Option<OffsetDateTime> {
202 match OffsetDateTime::now_local() {
203 Ok(now) => Some(now),
204 Err(e) => {
205 error!("failed to get local time: {}", e);
206 None
207 }
208 }
209}
210
211impl Scheduler {
212 #[instrument(skip(task, cancel))]
214 async fn run_wait<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
215 if let Task::Wait(wait, skip) = task.get_schedule() {
216 let task_ref = task;
217 tokio::task::spawn(async move {
218 select! {
219 _ = cancel.cancelled() => {
220 return;
221 }
222 _ = sleep(Duration::from_secs(wait)) => {
223 tracing::debug!(wait, "wait seconds");
224 }
225 };
226 if let Some(now) = get_now() {
227 if let Some(skip) = skip {
228 if skip.iter().any(|s| s.is_skip(now)) {
229 task_ref.on_skip(cancel.clone()).await;
230 return;
231 }
232 }
233 task_ref.on_time(cancel.clone()).await;
234 }
235 });
236 }
237 }
238
239 #[instrument(skip(task, cancel))]
241 async fn run_interval<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
242 if let Task::Interval(interval, skip) = task.get_schedule() {
243 let task_ref = task;
244 tokio::task::spawn(async move {
245 loop {
246 select! {
247 _ = cancel.cancelled() => {
248 return;
249 }
250 _ = sleep(Duration::from_secs(interval)) => {
251 tracing::debug!(interval, "interval");
252 }
253 };
254 if let Some(now) = get_now() {
255 if let Some(ref skip) = skip {
256 if skip.iter().any(|s| s.is_skip(now)) {
257 task_ref.on_skip(cancel.clone()).await;
258 continue;
259 }
260 }
261 task_ref.on_time(cancel.clone()).await;
262 }
263 }
264 });
265 }
266 }
267
268 #[instrument(skip(task, cancel))]
270 async fn run_at<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
271 if let Task::At(time, skip) = task.get_schedule() {
272 let task_ref = task;
273 tokio::task::spawn(async move {
274 let now = if let Some(now) = get_now() {
275 now
276 } else {
277 return;
278 };
279 let mut next = get_next_time(now, time);
280 loop {
281 let now = if let Some(now) = get_now() {
282 now
283 } else {
284 return;
285 };
286 let seconds = (next - now).as_seconds_f64() as u64;
287 let instant = Instant::now() + Duration::from_secs(seconds);
288 select! {
289 _ = cancel.cancelled() => {
290 return;
291 }
292 _ = sleep_until(instant) => {
293 tracing::debug!("at time");
294 }
295 }
296
297 if let Some(skip) = skip.clone() {
298 if skip.iter().any(|s| s.is_skip(now)) {
299 task_ref.on_skip(cancel.clone()).await;
300 return;
301 }
302 }
303
304 task_ref.on_time(cancel.clone()).await;
305
306 next += time::Duration::days(1);
307 }
308 });
309 }
310 }
311
312 #[instrument(skip(task, cancel))]
314 async fn run_once<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
315 if let Task::Once(next) = task.get_schedule() {
316 let task_ref = task;
317 tokio::task::spawn(async move {
318 if let Some(now) = get_now() {
319 if next < now {
320 task_ref.on_skip(cancel.clone()).await;
321 return;
322 }
323 let seconds = (next - now).as_seconds_f64() as u64;
324 let instant = Instant::now() + Duration::from_secs(seconds);
325
326 select! {
327 _ = cancel.cancelled() => {
328 return;
329 }
330 _ = sleep_until(instant) => {
331 tracing::debug!("once time");
332 }
333 }
334 task_ref.on_time(cancel.clone()).await;
335 }
336 });
337 }
338 }
339}