easy_schedule/
lib.rs

1use async_trait::async_trait;
2use std::fmt::{self, Debug};
3use time::{Date, OffsetDateTime, Time, macros::format_description};
4use tokio::{
5    select,
6    time::{Duration, Instant, sleep, sleep_until},
7};
8use tokio_util::sync::CancellationToken;
9use tracing::{error, instrument};
10
11pub mod prelude {
12    pub use super::{Notifiable, Scheduler, Skip, Task};
13    pub use async_trait::async_trait;
14    pub use tokio_util::sync::CancellationToken;
15}
16
17#[derive(Debug, Clone)]
18pub enum Skip {
19    /// skip fixed date
20    Date(Date),
21    /// skip date range
22    DateRange(Date, Date),
23    /// skip days
24    ///
25    /// 1: Monday, 2: Tuesday, 3: Wednesday, 4: Thursday, 5: Friday, 6: Saturday, 7: Sunday
26    Day(Vec<u8>),
27    /// skip days range
28    ///
29    /// 1: Monday, 2: Tuesday, 3: Wednesday, 4: Thursday, 5: Friday, 6: Saturday, 7: Sunday
30    DayRange(usize, usize),
31    /// skip fixed time
32    Time(Time),
33    /// skip time range
34    ///
35    /// end must be greater than start
36    TimeRange(Time, Time),
37    /// no skip
38    None,
39}
40
41impl Default for Skip {
42    fn default() -> Self {
43        Self::None
44    }
45}
46
47impl fmt::Display for Skip {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        match self {
50            Skip::Date(date) => write!(f, "date: {}", date),
51            Skip::DateRange(start, end) => write!(f, "date range: {} - {}", start, end),
52            Skip::Day(day) => write!(f, "day: {:?}", day),
53            Skip::DayRange(start, end) => write!(f, "day range: {} - {}", start, end),
54            Skip::Time(time) => write!(f, "time: {}", time),
55            Skip::TimeRange(start, end) => write!(f, "time range: {} - {}", start, end),
56            Skip::None => write!(f, "none"),
57        }
58    }
59}
60
61impl Skip {
62    /// check if the time is skipped
63    pub fn is_skip(&self, time: OffsetDateTime) -> bool {
64        match self {
65            Skip::Date(date) => time.date() == *date,
66            Skip::DateRange(start, end) => time.date() >= *start && time.date() <= *end,
67            Skip::Day(day) => day.contains(&(time.day() + 1)),
68            Skip::DayRange(start, end) => {
69                time.day() + 1 >= *start as u8 && time.day() + 1 <= *end as u8
70            }
71            Skip::Time(time) => time.hour() == time.hour() && time.minute() == time.minute(),
72            Skip::TimeRange(start, end) => {
73                assert!(start < end, "start must be less than end");
74                time.hour() >= start.hour()
75                    && time.hour() <= end.hour()
76                    && time.minute() >= start.minute()
77                    && time.minute() <= end.minute()
78            }
79            Skip::None => false,
80        }
81    }
82}
83
84#[derive(Debug, Clone)]
85pub enum Task {
86    /// wait seconds
87    Wait(u64, Option<Vec<Skip>>),
88    /// interval seconds
89    Interval(u64, Option<Vec<Skip>>),
90    /// at time
91    At(Time, Option<Vec<Skip>>),
92    /// exact time
93    Once(OffsetDateTime),
94}
95
96impl PartialEq for Task {
97    fn eq(&self, other: &Self) -> bool {
98        match (self, other) {
99            (Task::Wait(a, _), Task::Wait(b, _)) => a == b,
100            (Task::Interval(a, _), Task::Interval(b, _)) => a == b,
101            (Task::At(a, _), Task::At(b, _)) => a == b,
102            (Task::Once(a), Task::Once(b)) => a == b,
103            _ => false,
104        }
105    }
106}
107
108impl From<&str> for Task {
109    ///
110    /// - wait=10
111    /// - interval=10
112    /// - at=10:00
113    /// - once=2024-01-01 10:00:00
114    fn from(s: &str) -> Self {
115        let parts = s.split("=").collect::<Vec<&str>>();
116        let task = parts[0];
117        let value = parts[1..].join("");
118        match task {
119            "wait" => {
120                let seconds = value.parse::<u64>().unwrap();
121                Task::Wait(seconds, None)
122            }
123            "interval" => {
124                let seconds = value.parse::<u64>().unwrap();
125                Task::Interval(seconds, None)
126            }
127            "at" => {
128                let format = format_description!("[hour]:[minute]");
129                let time = Time::parse(&value, &format).expect("parse time failed");
130                Task::At(time, None)
131            }
132            "once" => {
133                let format = format_description!(
134                    "[year]-[month]-[day] [hour]:[minute]:[second] [offset_hour sign:mandatory]"
135                );
136                println!("value: {}", value);
137                let datetime =
138                    OffsetDateTime::parse(&value, &format).expect("parse datetime failed");
139                Task::Once(datetime)
140            }
141            _ => Task::Wait(5, None),
142        }
143    }
144}
145
146impl From<String> for Task {
147    fn from(s: String) -> Self {
148        Self::from(s.as_str())
149    }
150}
151
152impl From<&String> for Task {
153    fn from(s: &String) -> Self {
154        Self::from(s.as_str())
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// `&str`, `String` and `&String` inputs all parse to the same task.
    #[test]
    fn test_from_string() {
        let owned = "wait=10".to_string();

        assert_eq!(Task::from("wait=10"), Task::Wait(10, None));
        assert_eq!(Task::from(owned.clone()), Task::Wait(10, None));
        assert_eq!(Task::from(&owned), Task::Wait(10, None));
    }

    #[test]
    fn test_from_string_interval() {
        assert_eq!(Task::from("interval=10"), Task::Interval(10, None));
    }

    #[test]
    fn test_from_string_at() {
        let ten_o_clock = Time::from_hms(10, 0, 0).unwrap();
        assert_eq!(Task::from("at=10:00"), Task::At(ten_o_clock, None));
    }

    /// 10:00 at UTC+8 on 2024-01-01 is 02:00 UTC, i.e. Unix time 1704074400.
    #[test]
    fn test_from_string_once() {
        let expected = OffsetDateTime::from_unix_timestamp(1704074400).unwrap();
        assert_eq!(
            Task::from("once=2024-01-01 10:00:00 +08"),
            Task::Once(expected)
        );
    }
}
193
194impl fmt::Display for Task {
195    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        match self {
197            Task::Wait(wait, skip) => {
198                let skip = skip
199                    .clone()
200                    .unwrap_or_default()
201                    .into_iter()
202                    .map(|s| s.to_string())
203                    .collect::<Vec<String>>()
204                    .join(", ");
205                write!(f, "wait: {} {}", wait, skip)
206            }
207            Task::Interval(interval, skip) => {
208                let skip = skip
209                    .clone()
210                    .unwrap_or_default()
211                    .into_iter()
212                    .map(|s| s.to_string())
213                    .collect::<Vec<String>>()
214                    .join(", ");
215                write!(f, "interval: {} {}", interval, skip)
216            }
217            Task::At(time, skip) => {
218                let skip = skip
219                    .clone()
220                    .unwrap_or_default()
221                    .into_iter()
222                    .map(|s| s.to_string())
223                    .collect::<Vec<String>>()
224                    .join(", ");
225                write!(f, "at: {} {}", time, skip)
226            }
227            Task::Once(time) => write!(f, "once: {}", time),
228        }
229    }
230}
231
232/// a task that can be scheduled
233#[async_trait]
234pub trait Notifiable: Sync + Send {
235    /// get the schedule type
236    fn get_schedule(&self) -> Task;
237
238    /// called when the task is scheduled
239    ///
240    /// Default cancel on first trigger
241    async fn on_time(&self, cancel: CancellationToken) {
242        cancel.cancel();
243    }
244
245    /// called when the task is skipped
246    async fn on_skip(&self, _cancel: CancellationToken) {
247        // do nothing
248    }
249}
250
251pub struct Scheduler {
252    cancel: CancellationToken,
253}
254
255impl Scheduler {
256    /// create a new scheduler
257    pub fn new() -> Self {
258        Self {
259            cancel: CancellationToken::new(),
260        }
261    }
262
263    /// run the task
264    pub async fn run<T: Notifiable + 'static>(&self, task: T) {
265        let schedule = task.get_schedule();
266        let cancel = self.cancel.clone();
267
268        match schedule {
269            Task::Wait(..) => {
270                Scheduler::run_wait(task, cancel.clone()).await;
271            }
272            Task::Interval(..) => {
273                Scheduler::run_interval(task, cancel.clone()).await;
274            }
275            Task::At(..) => {
276                Scheduler::run_at(task, cancel.clone()).await;
277            }
278            Task::Once(..) => {
279                Scheduler::run_once(task, cancel.clone()).await;
280            }
281        }
282    }
283
284    /// stop the scheduler
285    ///
286    /// this will cancel all the tasks
287    pub fn stop(&self) {
288        self.cancel.cancel();
289    }
290
291    /// get the cancel token
292    pub fn get_cancel(&self) -> CancellationToken {
293        self.cancel.clone()
294    }
295}
296
297fn get_next_time(now: OffsetDateTime, time: Time) -> OffsetDateTime {
298    let mut next = now.replace_time(time);
299    if next < now {
300        next = next + time::Duration::days(1);
301    }
302    next
303}
304
305fn get_now() -> Option<OffsetDateTime> {
306    match OffsetDateTime::now_local() {
307        Ok(now) => Some(now),
308        Err(e) => {
309            error!("failed to get local time: {}", e);
310            None
311        }
312    }
313}
314
315impl Scheduler {
316    /// run wait task
317    #[instrument(skip(task, cancel))]
318    async fn run_wait<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
319        if let Task::Wait(wait, skip) = task.get_schedule() {
320            let task_ref = task;
321            tokio::task::spawn(async move {
322                select! {
323                    _ = cancel.cancelled() => {
324                        return;
325                    }
326                    _ = sleep(Duration::from_secs(wait)) => {
327                        tracing::debug!(wait, "wait seconds");
328                    }
329                };
330                if let Some(now) = get_now() {
331                    if let Some(skip) = skip {
332                        if skip.iter().any(|s| s.is_skip(now)) {
333                            task_ref.on_skip(cancel.clone()).await;
334                            return;
335                        }
336                    }
337                    task_ref.on_time(cancel.clone()).await;
338                }
339            });
340        }
341    }
342
343    /// run interval task
344    #[instrument(skip(task, cancel))]
345    async fn run_interval<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
346        if let Task::Interval(interval, skip) = task.get_schedule() {
347            let task_ref = task;
348            tokio::task::spawn(async move {
349                loop {
350                    select! {
351                        _ = cancel.cancelled() => {
352                            return;
353                        }
354                        _ = sleep(Duration::from_secs(interval)) => {
355                            tracing::debug!(interval, "interval");
356                        }
357                    };
358                    if let Some(now) = get_now() {
359                        if let Some(ref skip) = skip {
360                            if skip.iter().any(|s| s.is_skip(now)) {
361                                task_ref.on_skip(cancel.clone()).await;
362                                continue;
363                            }
364                        }
365                        task_ref.on_time(cancel.clone()).await;
366                    }
367                }
368            });
369        }
370    }
371
372    /// run at task
373    #[instrument(skip(task, cancel))]
374    async fn run_at<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
375        if let Task::At(time, skip) = task.get_schedule() {
376            let task_ref = task;
377            tokio::task::spawn(async move {
378                let now = if let Some(now) = get_now() {
379                    now
380                } else {
381                    return;
382                };
383                let mut next = get_next_time(now, time);
384                loop {
385                    let now = if let Some(now) = get_now() {
386                        now
387                    } else {
388                        return;
389                    };
390                    let seconds = (next - now).as_seconds_f64() as u64;
391                    let instant = Instant::now() + Duration::from_secs(seconds);
392                    select! {
393                        _ = cancel.cancelled() => {
394                            return;
395                        }
396                        _ = sleep_until(instant) => {
397                            tracing::debug!("at time");
398                        }
399                    }
400
401                    if let Some(skip) = skip.clone() {
402                        if skip.iter().any(|s| s.is_skip(now)) {
403                            task_ref.on_skip(cancel.clone()).await;
404                            return;
405                        }
406                    }
407
408                    task_ref.on_time(cancel.clone()).await;
409
410                    next += time::Duration::days(1);
411                }
412            });
413        }
414    }
415
416    /// run once task
417    #[instrument(skip(task, cancel))]
418    async fn run_once<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
419        if let Task::Once(next) = task.get_schedule() {
420            let task_ref = task;
421            tokio::task::spawn(async move {
422                if let Some(now) = get_now() {
423                    if next < now {
424                        task_ref.on_skip(cancel.clone()).await;
425                        return;
426                    }
427                    let seconds = (next - now).as_seconds_f64() as u64;
428                    let instant = Instant::now() + Duration::from_secs(seconds);
429
430                    select! {
431                        _ = cancel.cancelled() => {
432                            return;
433                        }
434                        _ = sleep_until(instant) => {
435                            tracing::debug!("once time");
436                        }
437                    }
438                    task_ref.on_time(cancel.clone()).await;
439                }
440            });
441        }
442    }
443}