easy_schedule/
lib.rs

1use async_trait::async_trait;
2use std::fmt::{self, Debug};
3use time::{Date, OffsetDateTime, Time};
4use tokio::{
5    select,
6    time::{Duration, Instant, sleep, sleep_until},
7};
8use tokio_util::sync::CancellationToken;
9use tracing::{error, instrument};
10
11pub mod prelude {
12    pub use super::{Notifiable, Scheduler, Skip, Task};
13    pub use async_trait::async_trait;
14    pub use tokio_util::sync::CancellationToken;
15}
16
17#[derive(Debug, Clone)]
18pub enum Skip {
19    /// skip fixed date
20    Date(Date),
21    /// skip date range
22    DateRange(Date, Date),
23    /// skip days
24    ///
25    /// 1: Monday, 2: Tuesday, 3: Wednesday, 4: Thursday, 5: Friday, 6: Saturday, 7: Sunday
26    Day(Vec<u8>),
27    /// skip days range
28    ///
29    /// 1: Monday, 2: Tuesday, 3: Wednesday, 4: Thursday, 5: Friday, 6: Saturday, 7: Sunday
30    DayRange(usize, usize),
31    /// skip fixed time
32    Time(Time),
33    /// skip time range
34    ///
35    /// end must be greater than start
36    TimeRange(Time, Time),
37    /// no skip
38    None,
39}
40
41impl Default for Skip {
42    fn default() -> Self {
43        Self::None
44    }
45}
46
47impl fmt::Display for Skip {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        match self {
50            Skip::Date(date) => write!(f, "date: {}", date),
51            Skip::DateRange(start, end) => write!(f, "date range: {} - {}", start, end),
52            Skip::Day(day) => write!(f, "day: {:?}", day),
53            Skip::DayRange(start, end) => write!(f, "day range: {} - {}", start, end),
54            Skip::Time(time) => write!(f, "time: {}", time),
55            Skip::TimeRange(start, end) => write!(f, "time range: {} - {}", start, end),
56            Skip::None => write!(f, "none"),
57        }
58    }
59}
60
61impl Skip {
62    /// check if the time is skipped
63    pub fn is_skip(&self, time: OffsetDateTime) -> bool {
64        match self {
65            Skip::Date(date) => time.date() == *date,
66            Skip::DateRange(start, end) => time.date() >= *start && time.date() <= *end,
67            Skip::Day(day) => day.contains(&(time.day() + 1)),
68            Skip::DayRange(start, end) => {
69                time.day() + 1 >= *start as u8 && time.day() + 1 <= *end as u8
70            }
71            Skip::Time(time) => time.hour() == time.hour() && time.minute() == time.minute(),
72            Skip::TimeRange(start, end) => {
73                assert!(start < end, "start must be less than end");
74                time.hour() >= start.hour()
75                    && time.hour() <= end.hour()
76                    && time.minute() >= start.minute()
77                    && time.minute() <= end.minute()
78            }
79            Skip::None => false,
80        }
81    }
82}
83
84#[derive(Debug, Clone)]
85pub enum Task {
86    /// wait seconds
87    Wait(u64, Option<Vec<Skip>>),
88    /// interval seconds
89    Interval(u64, Option<Vec<Skip>>),
90    /// at time
91    At(Time, Option<Vec<Skip>>),
92    /// exact time
93    Once(OffsetDateTime),
94}
95
96impl fmt::Display for Task {
97    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98        match self {
99            Task::Wait(wait, skip) => {
100                let skip = skip
101                    .clone()
102                    .unwrap_or_default()
103                    .into_iter()
104                    .map(|s| s.to_string())
105                    .collect::<Vec<String>>()
106                    .join(", ");
107                write!(f, "wait: {} {}", wait, skip)
108            }
109            Task::Interval(interval, skip) => {
110                let skip = skip
111                    .clone()
112                    .unwrap_or_default()
113                    .into_iter()
114                    .map(|s| s.to_string())
115                    .collect::<Vec<String>>()
116                    .join(", ");
117                write!(f, "interval: {} {}", interval, skip)
118            }
119            Task::At(time, skip) => {
120                let skip = skip
121                    .clone()
122                    .unwrap_or_default()
123                    .into_iter()
124                    .map(|s| s.to_string())
125                    .collect::<Vec<String>>()
126                    .join(", ");
127                write!(f, "at: {} {}", time, skip)
128            }
129            Task::Once(time) => write!(f, "once: {}", time),
130        }
131    }
132}
133
134/// a task that can be scheduled
135#[async_trait]
136pub trait Notifiable: Sync + Send {
137    /// get the schedule type
138    fn get_schedule(&self) -> Task;
139
140    /// called when the task is scheduled
141    ///
142    /// Default cancel on first trigger
143    async fn on_time(&self, cancel: CancellationToken) {
144        cancel.cancel();
145    }
146
147    /// called when the task is skipped
148    async fn on_skip(&self, _cancel: CancellationToken) {
149        // do nothing
150    }
151}
152
153pub struct Scheduler {
154    cancel: CancellationToken,
155}
156
157impl Scheduler {
158    /// create a new scheduler
159    pub fn new() -> Self {
160        Self {
161            cancel: CancellationToken::new(),
162        }
163    }
164
165    /// run the task
166    pub async fn run<T: Notifiable + 'static>(&self, task: T) {
167        let schedule = task.get_schedule();
168        let cancel = self.cancel.clone();
169
170        match schedule {
171            Task::Wait(..) => {
172                Scheduler::run_wait(task, cancel.clone()).await;
173            }
174            Task::Interval(..) => {
175                Scheduler::run_interval(task, cancel.clone()).await;
176            }
177            Task::At(..) => {
178                Scheduler::run_at(task, cancel.clone()).await;
179            }
180            Task::Once(..) => {
181                Scheduler::run_once(task, cancel.clone()).await;
182            }
183        }
184    }
185
186    /// stop the scheduler
187    ///
188    /// this will cancel all the tasks
189    pub fn stop(&self) {
190        self.cancel.cancel();
191    }
192
193    /// get the cancel token
194    pub fn get_cancel(&self) -> CancellationToken {
195        self.cancel.clone()
196    }
197}
198
199fn get_next_time(now: OffsetDateTime, time: Time) -> OffsetDateTime {
200    let mut next = now.replace_time(time);
201    if next < now {
202        next = next + time::Duration::days(1);
203    }
204    next
205}
206
207fn get_now() -> Option<OffsetDateTime> {
208    match OffsetDateTime::now_local() {
209        Ok(now) => Some(now),
210        Err(e) => {
211            error!("failed to get local time: {}", e);
212            None
213        }
214    }
215}
216
217impl Scheduler {
218    /// run wait task
219    #[instrument(skip(task, cancel))]
220    async fn run_wait<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
221        if let Task::Wait(wait, skip) = task.get_schedule() {
222            let task_ref = task;
223            tokio::task::spawn(async move {
224                select! {
225                    _ = cancel.cancelled() => {
226                        return;
227                    }
228                    _ = sleep(Duration::from_secs(wait)) => {
229                        tracing::debug!(wait, "wait seconds");
230                    }
231                };
232                if let Some(now) = get_now() {
233                    if let Some(skip) = skip {
234                        if skip.iter().any(|s| s.is_skip(now)) {
235                            task_ref.on_skip(cancel.clone()).await;
236                            return;
237                        }
238                    }
239                    task_ref.on_time(cancel.clone()).await;
240                }
241            });
242        }
243    }
244
245    /// run interval task
246    #[instrument(skip(task, cancel))]
247    async fn run_interval<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
248        if let Task::Interval(interval, skip) = task.get_schedule() {
249            let task_ref = task;
250            tokio::task::spawn(async move {
251                loop {
252                    select! {
253                        _ = cancel.cancelled() => {
254                            return;
255                        }
256                        _ = sleep(Duration::from_secs(interval)) => {
257                            tracing::debug!(interval, "interval");
258                        }
259                    };
260                    if let Some(now) = get_now() {
261                        if let Some(ref skip) = skip {
262                            if skip.iter().any(|s| s.is_skip(now)) {
263                                task_ref.on_skip(cancel.clone()).await;
264                                continue;
265                            }
266                        }
267                        task_ref.on_time(cancel.clone()).await;
268                    }
269                }
270            });
271        }
272    }
273
274    /// run at task
275    #[instrument(skip(task, cancel))]
276    async fn run_at<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
277        if let Task::At(time, skip) = task.get_schedule() {
278            let task_ref = task;
279            tokio::task::spawn(async move {
280                let now = if let Some(now) = get_now() {
281                    now
282                } else {
283                    return;
284                };
285                let mut next = get_next_time(now, time);
286                loop {
287                    let now = if let Some(now) = get_now() {
288                        now
289                    } else {
290                        return;
291                    };
292                    let seconds = (next - now).as_seconds_f64() as u64;
293                    let instant = Instant::now() + Duration::from_secs(seconds);
294                    select! {
295                        _ = cancel.cancelled() => {
296                            return;
297                        }
298                        _ = sleep_until(instant) => {
299                            tracing::debug!("at time");
300                        }
301                    }
302
303                    if let Some(skip) = skip.clone() {
304                        if skip.iter().any(|s| s.is_skip(now)) {
305                            task_ref.on_skip(cancel.clone()).await;
306                            return;
307                        }
308                    }
309
310                    task_ref.on_time(cancel.clone()).await;
311
312                    next += time::Duration::days(1);
313                }
314            });
315        }
316    }
317
318    /// run once task
319    #[instrument(skip(task, cancel))]
320    async fn run_once<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
321        if let Task::Once(next) = task.get_schedule() {
322            let task_ref = task;
323            tokio::task::spawn(async move {
324                if let Some(now) = get_now() {
325                    if next < now {
326                        task_ref.on_skip(cancel.clone()).await;
327                        return;
328                    }
329                    let seconds = (next - now).as_seconds_f64() as u64;
330                    let instant = Instant::now() + Duration::from_secs(seconds);
331
332                    select! {
333                        _ = cancel.cancelled() => {
334                            return;
335                        }
336                        _ = sleep_until(instant) => {
337                            tracing::debug!("once time");
338                        }
339                    }
340                    task_ref.on_time(cancel.clone()).await;
341                }
342            });
343        }
344    }
345}