easy_schedule/
lib.rs

1use crossbeam_deque::{Injector, Steal};
2use std::{
3    boxed::Box,
4    fmt::{self, Debug},
5    sync::Arc,
6};
7use time::{Date, OffsetDateTime, Time};
8use tokio::{
9    select,
10    time::{Duration, Instant, sleep, sleep_until},
11};
12pub use tokio_util::sync::CancellationToken;
13use tracing::{error, instrument};
14
15#[derive(Debug, Clone)]
16pub enum Skip {
17    /// skip fixed date
18    Date(Date),
19    /// skip date range
20    DateRange(Date, Date),
21    /// skip days
22    ///
23    /// 1: Monday, 2: Tuesday, 3: Wednesday, 4: Thursday, 5: Friday, 6: Saturday, 7: Sunday
24    Day(usize),
25    /// skip days range
26    ///
27    /// 1: Monday, 2: Tuesday, 3: Wednesday, 4: Thursday, 5: Friday, 6: Saturday, 7: Sunday
28    DayRange(usize, usize),
29    /// skip fixed time
30    Time(Time),
31    /// skip time range
32    ///
33    /// end must be greater than start
34    TimeRange(Time, Time),
35    /// no skip
36    None,
37}
38
39impl Default for Skip {
40    fn default() -> Self {
41        Self::None
42    }
43}
44
45impl fmt::Display for Skip {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        match self {
48            Skip::Date(date) => write!(f, "date: {}", date),
49            Skip::DateRange(start, end) => write!(f, "date range: {} - {}", start, end),
50            Skip::Day(day) => write!(f, "day: {}", day),
51            Skip::DayRange(start, end) => write!(f, "day range: {} - {}", start, end),
52            Skip::Time(time) => write!(f, "time: {}", time),
53            Skip::TimeRange(start, end) => write!(f, "time range: {} - {}", start, end),
54            Skip::None => write!(f, "none"),
55        }
56    }
57}
58
59impl Skip {
60    /// check if the time is skipped
61    pub fn is_skip(&self, time: OffsetDateTime) -> bool {
62        match self {
63            Skip::Date(date) => time.date() == *date,
64            Skip::DateRange(start, end) => time.date() >= *start && time.date() <= *end,
65            Skip::Day(day) => time.day() + 1 == *day as u8,
66            Skip::DayRange(start, end) => {
67                time.day() + 1 >= *start as u8 && time.day() + 1 <= *end as u8
68            }
69            Skip::Time(time) => time.hour() == time.hour() && time.minute() == time.minute(),
70            Skip::TimeRange(start, end) => {
71                assert!(start < end, "start must be less than end");
72                time.hour() >= start.hour()
73                    && time.hour() <= end.hour()
74                    && time.minute() >= start.minute()
75                    && time.minute() <= end.minute()
76            }
77            Skip::None => false,
78        }
79    }
80}
81
82#[derive(Debug, Clone)]
83pub enum Task {
84    /// wait seconds
85    Wait(u64, Option<Skip>),
86    /// interval seconds
87    Interval(u64, Option<Skip>),
88    /// at time
89    At(Time, Option<Skip>),
90    /// exact time
91    Once(OffsetDateTime),
92}
93
94impl fmt::Display for Task {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        match self {
97            Task::Wait(wait, skip) => {
98                write!(f, "wait: {} {}", wait, skip.clone().unwrap_or_default())
99            }
100            Task::Interval(interval, skip) => {
101                write!(
102                    f,
103                    "interval: {} {}",
104                    interval,
105                    skip.clone().unwrap_or_default()
106                )
107            }
108            Task::At(time, skip) => {
109                write!(f, "at: {} {}", time, skip.clone().unwrap_or_default())
110            }
111            Task::Once(time) => write!(f, "once: {}", time),
112        }
113    }
114}
115
116/// a task that can be scheduled
117pub trait ScheduledTask: Sync + Send {
118    /// get the schedule type
119    fn get_schedule(&self) -> Task;
120
121    /// called when the task is scheduled
122    fn on_time(&self, cancel: CancellationToken);
123
124    /// called when the task is skipped
125    fn on_skip(&self, cancel: CancellationToken);
126}
127
128pub struct Scheduler {
129    tasks: Injector<Arc<Box<dyn ScheduledTask>>>,
130    cancel: CancellationToken,
131}
132
133impl Scheduler {
134    pub fn new() -> Self {
135        Self {
136            tasks: Injector::new(),
137            cancel: CancellationToken::new(),
138        }
139    }
140
141    /// start the scheduler
142    pub async fn start(&self) {
143        self.check().await;
144
145        let cancel = self.cancel.clone();
146        select! {
147            _ = cancel.cancelled() => {
148                tracing::debug!("scheduler cancelled");
149            }
150            _ = tokio::signal::ctrl_c() => {
151                tracing::debug!("ctrl+c");
152                if !cancel.is_cancelled() {
153                    cancel.cancel();
154                }
155            }
156        }
157    }
158
159    pub async fn add_task(&self, task: Arc<Box<dyn ScheduledTask>>) {
160        self.tasks.push(task);
161        self.check().await;
162    }
163
164    /// 检查当前任务
165    async fn check(&self) {
166        loop {
167            let task = self.tasks.steal();
168            match task {
169                Steal::Success(task) => {
170                    let schedule = task.get_schedule();
171                    let cancel = self.cancel.clone();
172                    let task = task.clone();
173                    match schedule {
174                        Task::Wait(..) => {
175                            Scheduler::run_wait(task, cancel.clone()).await;
176                        }
177                        Task::Interval(..) => {
178                            Scheduler::run_interval(task, cancel.clone()).await;
179                        }
180                        Task::At(..) => {
181                            Scheduler::run_at(task, cancel.clone()).await;
182                        }
183                        Task::Once(..) => {
184                            Scheduler::run_once(task, cancel.clone()).await;
185                        }
186                    }
187                }
188                Steal::Retry => {
189                    break;
190                }
191                Steal::Empty => {
192                    break;
193                }
194            }
195        }
196    }
197
198    /// stop the scheduler
199    ///
200    /// this will cancel all the tasks
201    pub fn stop(&self) {
202        self.cancel.cancel();
203    }
204}
205
206fn get_next_time(now: OffsetDateTime, time: Time) -> OffsetDateTime {
207    let mut next = now.replace_time(time);
208    if next < now {
209        next = next + time::Duration::days(1);
210    }
211    next
212}
213
214fn get_now() -> Option<OffsetDateTime> {
215    match OffsetDateTime::now_local() {
216        Ok(now) => Some(now),
217        Err(e) => {
218            error!("failed to get local time: {}", e);
219            None
220        }
221    }
222}
223
224impl Scheduler {
225    /// run wait task
226    #[instrument(skip(task, cancel))]
227    async fn run_wait(task: Arc<Box<dyn ScheduledTask>>, cancel: CancellationToken) {
228        if let Task::Wait(wait, skip) = task.get_schedule() {
229            tokio::spawn(async move {
230                select! {
231                    _ = cancel.cancelled() => {
232                        return;
233                    }
234                    _ = sleep(Duration::from_secs(wait)) => {
235                        tracing::debug!(wait, "wait seconds");
236                    }
237                };
238                if let Some(now) = get_now() {
239                    if let Some(skip) = skip {
240                        if skip.is_skip(now) {
241                            task.on_skip(cancel.clone());
242                            return;
243                        }
244                    }
245                    task.on_time(cancel.clone());
246                }
247            });
248        }
249    }
250
251    /// run interval task
252    #[instrument(skip(task, cancel))]
253    async fn run_interval(task: Arc<Box<dyn ScheduledTask>>, cancel: CancellationToken) {
254        if let Task::Interval(interval, skip) = task.get_schedule() {
255            tokio::spawn(async move {
256                loop {
257                    select! {
258                        _ = cancel.cancelled() => {
259                            return;
260                        }
261                        _ = sleep(Duration::from_secs(interval)) => {
262                            tracing::debug!(interval, "interval");
263                        }
264                    };
265                    if let Some(now) = get_now() {
266                        if let Some(ref skip) = skip {
267                            if skip.is_skip(now) {
268                                task.on_skip(cancel.clone());
269                                continue;
270                            }
271                        }
272                        task.on_time(cancel.clone());
273                    }
274                }
275            });
276        }
277    }
278
279    /// run at task
280    #[instrument(skip(task, cancel))]
281    async fn run_at(task: Arc<Box<dyn ScheduledTask>>, cancel: CancellationToken) {
282        if let Task::At(time, skip) = task.get_schedule() {
283            tokio::spawn(async move {
284                let now = if let Some(now) = get_now() {
285                    now
286                } else {
287                    return;
288                };
289                let mut next = get_next_time(now, time);
290                loop {
291                    let now = if let Some(now) = get_now() {
292                        now
293                    } else {
294                        return;
295                    };
296                    let seconds = (next - now).as_seconds_f64() as u64;
297                    let instant = Instant::now() + Duration::from_secs(seconds);
298                    select! {
299                        _ = cancel.cancelled() => {
300                            return;
301                        }
302                        _ = sleep_until(instant) => {
303                            tracing::debug!("at time");
304                        }
305                    }
306
307                    if let Some(skip) = skip.clone() {
308                        if skip.is_skip(now) {
309                            task.on_skip(cancel.clone());
310                            return;
311                        }
312                    }
313
314                    task.on_time(cancel.clone());
315
316                    next += time::Duration::days(1);
317                }
318            });
319        }
320    }
321
322    /// run once task
323    #[instrument(skip(task, cancel))]
324    async fn run_once(task: Arc<Box<dyn ScheduledTask>>, cancel: CancellationToken) {
325        if let Task::Once(next) = task.get_schedule() {
326            tokio::spawn(async move {
327                if let Some(now) = get_now() {
328                    if next < now {
329                        task.on_skip(cancel.clone());
330                        return;
331                    }
332                    let seconds = (next - now).as_seconds_f64() as u64;
333                    let instant = Instant::now() + Duration::from_secs(seconds);
334
335                    select! {
336                        _ = cancel.cancelled() => {
337                            return;
338                        }
339                        _ = sleep_until(instant) => {
340                            tracing::debug!("once time");
341                        }
342                    }
343                    task.on_time(cancel.clone());
344                }
345            });
346        }
347    }
348}