easy_schedule/
lib.rs

1use async_trait::async_trait;
2use std::fmt::{self, Debug};
3use time::{Date, OffsetDateTime, Time};
4use tokio::{
5    select,
6    time::{Duration, Instant, sleep, sleep_until},
7};
8use tokio_util::sync::CancellationToken;
9use tracing::{error, instrument};
10
11pub mod prelude {
12    pub use super::{Notifiable, Scheduler, Skip, Task};
13    pub use async_trait::async_trait;
14    pub use tokio_util::sync::CancellationToken;
15}
16
17#[derive(Debug, Clone)]
18pub enum Skip {
19    /// skip fixed date
20    Date(Date),
21    /// skip date range
22    DateRange(Date, Date),
23    /// skip days
24    ///
25    /// 1: Monday, 2: Tuesday, 3: Wednesday, 4: Thursday, 5: Friday, 6: Saturday, 7: Sunday
26    Day(Vec<u8>),
27    /// skip days range
28    ///
29    /// 1: Monday, 2: Tuesday, 3: Wednesday, 4: Thursday, 5: Friday, 6: Saturday, 7: Sunday
30    DayRange(usize, usize),
31    /// skip fixed time
32    Time(Time),
33    /// skip time range
34    ///
35    /// end must be greater than start
36    TimeRange(Time, Time),
37    /// no skip
38    None,
39}
40
41impl Default for Skip {
42    fn default() -> Self {
43        Self::None
44    }
45}
46
47impl fmt::Display for Skip {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        match self {
50            Skip::Date(date) => write!(f, "date: {}", date),
51            Skip::DateRange(start, end) => write!(f, "date range: {} - {}", start, end),
52            Skip::Day(day) => write!(f, "day: {:?}", day),
53            Skip::DayRange(start, end) => write!(f, "day range: {} - {}", start, end),
54            Skip::Time(time) => write!(f, "time: {}", time),
55            Skip::TimeRange(start, end) => write!(f, "time range: {} - {}", start, end),
56            Skip::None => write!(f, "none"),
57        }
58    }
59}
60
61impl Skip {
62    /// check if the time is skipped
63    pub fn is_skip(&self, time: OffsetDateTime) -> bool {
64        match self {
65            Skip::Date(date) => time.date() == *date,
66            Skip::DateRange(start, end) => time.date() >= *start && time.date() <= *end,
67            Skip::Day(day) => day.contains(&(time.day() + 1)),
68            Skip::DayRange(start, end) => {
69                time.day() + 1 >= *start as u8 && time.day() + 1 <= *end as u8
70            }
71            Skip::Time(time) => time.hour() == time.hour() && time.minute() == time.minute(),
72            Skip::TimeRange(start, end) => {
73                assert!(start < end, "start must be less than end");
74                time.hour() >= start.hour()
75                    && time.hour() <= end.hour()
76                    && time.minute() >= start.minute()
77                    && time.minute() <= end.minute()
78            }
79            Skip::None => false,
80        }
81    }
82}
83
84#[derive(Debug, Clone)]
85pub enum Task {
86    /// wait seconds
87    Wait(u64, Option<Vec<Skip>>),
88    /// interval seconds
89    Interval(u64, Option<Vec<Skip>>),
90    /// at time
91    At(Time, Option<Vec<Skip>>),
92    /// exact time
93    Once(OffsetDateTime),
94}
95
96impl fmt::Display for Task {
97    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98        match self {
99            Task::Wait(wait, skip) => {
100                let skip = skip
101                    .clone()
102                    .unwrap_or_default()
103                    .into_iter()
104                    .map(|s| s.to_string())
105                    .collect::<Vec<String>>()
106                    .join(", ");
107                write!(f, "wait: {} {}", wait, skip)
108            }
109            Task::Interval(interval, skip) => {
110                let skip = skip
111                    .clone()
112                    .unwrap_or_default()
113                    .into_iter()
114                    .map(|s| s.to_string())
115                    .collect::<Vec<String>>()
116                    .join(", ");
117                write!(f, "interval: {} {}", interval, skip)
118            }
119            Task::At(time, skip) => {
120                let skip = skip
121                    .clone()
122                    .unwrap_or_default()
123                    .into_iter()
124                    .map(|s| s.to_string())
125                    .collect::<Vec<String>>()
126                    .join(", ");
127                write!(f, "at: {} {}", time, skip)
128            }
129            Task::Once(time) => write!(f, "once: {}", time),
130        }
131    }
132}
133
134/// a task that can be scheduled
135#[async_trait]
136pub trait Notifiable: Sync + Send {
137    /// get the schedule type
138    fn get_schedule(&self) -> Task;
139
140    /// called when the task is scheduled
141    async fn on_time(&self, cancel: CancellationToken);
142
143    /// called when the task is skipped
144    async fn on_skip(&self, cancel: CancellationToken);
145}
146
147pub struct Scheduler {
148    cancel: CancellationToken,
149}
150
151impl Scheduler {
152    /// create a new scheduler
153    pub fn new() -> Self {
154        Self {
155            cancel: CancellationToken::new(),
156        }
157    }
158
159    /// run the task
160    pub async fn run<T: Notifiable + 'static>(&self, task: T) {
161        let schedule = task.get_schedule();
162        let cancel = self.cancel.clone();
163
164        match schedule {
165            Task::Wait(..) => {
166                Scheduler::run_wait(task, cancel.clone()).await;
167            }
168            Task::Interval(..) => {
169                Scheduler::run_interval(task, cancel.clone()).await;
170            }
171            Task::At(..) => {
172                Scheduler::run_at(task, cancel.clone()).await;
173            }
174            Task::Once(..) => {
175                Scheduler::run_once(task, cancel.clone()).await;
176            }
177        }
178    }
179
180    /// stop the scheduler
181    ///
182    /// this will cancel all the tasks
183    pub fn stop(&self) {
184        self.cancel.cancel();
185    }
186
187    /// get the cancel token
188    pub fn get_cancel(&self) -> CancellationToken {
189        self.cancel.clone()
190    }
191}
192
193fn get_next_time(now: OffsetDateTime, time: Time) -> OffsetDateTime {
194    let mut next = now.replace_time(time);
195    if next < now {
196        next = next + time::Duration::days(1);
197    }
198    next
199}
200
201fn get_now() -> Option<OffsetDateTime> {
202    match OffsetDateTime::now_local() {
203        Ok(now) => Some(now),
204        Err(e) => {
205            error!("failed to get local time: {}", e);
206            None
207        }
208    }
209}
210
211impl Scheduler {
212    /// run wait task
213    #[instrument(skip(task, cancel))]
214    async fn run_wait<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
215        if let Task::Wait(wait, skip) = task.get_schedule() {
216            let task_ref = task;
217            tokio::task::spawn(async move {
218                select! {
219                    _ = cancel.cancelled() => {
220                        return;
221                    }
222                    _ = sleep(Duration::from_secs(wait)) => {
223                        tracing::debug!(wait, "wait seconds");
224                    }
225                };
226                if let Some(now) = get_now() {
227                    if let Some(skip) = skip {
228                        if skip.iter().any(|s| s.is_skip(now)) {
229                            task_ref.on_skip(cancel.clone()).await;
230                            return;
231                        }
232                    }
233                    task_ref.on_time(cancel.clone()).await;
234                }
235            });
236        }
237    }
238
239    /// run interval task
240    #[instrument(skip(task, cancel))]
241    async fn run_interval<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
242        if let Task::Interval(interval, skip) = task.get_schedule() {
243            let task_ref = task;
244            tokio::task::spawn(async move {
245                loop {
246                    select! {
247                        _ = cancel.cancelled() => {
248                            return;
249                        }
250                        _ = sleep(Duration::from_secs(interval)) => {
251                            tracing::debug!(interval, "interval");
252                        }
253                    };
254                    if let Some(now) = get_now() {
255                        if let Some(ref skip) = skip {
256                            if skip.iter().any(|s| s.is_skip(now)) {
257                                task_ref.on_skip(cancel.clone()).await;
258                                continue;
259                            }
260                        }
261                        task_ref.on_time(cancel.clone()).await;
262                    }
263                }
264            });
265        }
266    }
267
268    /// run at task
269    #[instrument(skip(task, cancel))]
270    async fn run_at<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
271        if let Task::At(time, skip) = task.get_schedule() {
272            let task_ref = task;
273            tokio::task::spawn(async move {
274                let now = if let Some(now) = get_now() {
275                    now
276                } else {
277                    return;
278                };
279                let mut next = get_next_time(now, time);
280                loop {
281                    let now = if let Some(now) = get_now() {
282                        now
283                    } else {
284                        return;
285                    };
286                    let seconds = (next - now).as_seconds_f64() as u64;
287                    let instant = Instant::now() + Duration::from_secs(seconds);
288                    select! {
289                        _ = cancel.cancelled() => {
290                            return;
291                        }
292                        _ = sleep_until(instant) => {
293                            tracing::debug!("at time");
294                        }
295                    }
296
297                    if let Some(skip) = skip.clone() {
298                        if skip.iter().any(|s| s.is_skip(now)) {
299                            task_ref.on_skip(cancel.clone()).await;
300                            return;
301                        }
302                    }
303
304                    task_ref.on_time(cancel.clone()).await;
305
306                    next += time::Duration::days(1);
307                }
308            });
309        }
310    }
311
312    /// run once task
313    #[instrument(skip(task, cancel))]
314    async fn run_once<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
315        if let Task::Once(next) = task.get_schedule() {
316            let task_ref = task;
317            tokio::task::spawn(async move {
318                if let Some(now) = get_now() {
319                    if next < now {
320                        task_ref.on_skip(cancel.clone()).await;
321                        return;
322                    }
323                    let seconds = (next - now).as_seconds_f64() as u64;
324                    let instant = Instant::now() + Duration::from_secs(seconds);
325
326                    select! {
327                        _ = cancel.cancelled() => {
328                            return;
329                        }
330                        _ = sleep_until(instant) => {
331                            tracing::debug!("once time");
332                        }
333                    }
334                    task_ref.on_time(cancel.clone()).await;
335                }
336            });
337        }
338    }
339}