easy_schedule/
lib.rs

1use async_trait::async_trait;
2use std::fmt::{self, Debug};
3use time::{Date, OffsetDateTime, Time, UtcOffset, macros::format_description};
4use tokio::{
5    select,
6    time::{Duration, Instant, sleep, sleep_until},
7};
8use tokio_util::sync::CancellationToken;
9use tracing::instrument;
10
11pub mod prelude {
12    pub use super::{Notifiable, Scheduler, Skip, Task};
13    pub use async_trait::async_trait;
14    pub use tokio_util::sync::CancellationToken;
15}
16
17#[derive(Debug, Clone, PartialEq)]
18pub enum Skip {
19    /// skip fixed date
20    Date(Date),
21    /// skip date range
22    DateRange(Date, Date),
23    /// skip days
24    ///
25    /// 1: Monday, 2: Tuesday, 3: Wednesday, 4: Thursday, 5: Friday, 6: Saturday, 7: Sunday
26    Day(Vec<u8>),
27    /// skip days range
28    ///
29    /// 1: Monday, 2: Tuesday, 3: Wednesday, 4: Thursday, 5: Friday, 6: Saturday, 7: Sunday
30    DayRange(usize, usize),
31    /// skip fixed time
32    Time(Time),
33    /// skip time range
34    ///
35    /// end must be greater than start
36    TimeRange(Time, Time),
37    /// no skip
38    None,
39}
40
41impl Default for Skip {
42    fn default() -> Self {
43        Self::None
44    }
45}
46
47impl fmt::Display for Skip {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        match self {
50            Skip::Date(date) => write!(f, "date: {}", date),
51            Skip::DateRange(start, end) => write!(f, "date range: {} - {}", start, end),
52            Skip::Day(day) => write!(f, "day: {:?}", day),
53            Skip::DayRange(start, end) => write!(f, "day range: {} - {}", start, end),
54            Skip::Time(time) => write!(f, "time: {}", time),
55            Skip::TimeRange(start, end) => write!(f, "time range: {} - {}", start, end),
56            Skip::None => write!(f, "none"),
57        }
58    }
59}
60
61impl Skip {
62    /// check if the time is skipped
63    pub fn is_skip(&self, time: OffsetDateTime) -> bool {
64        match self {
65            Skip::Date(date) => time.date() == *date,
66            Skip::DateRange(start, end) => time.date() >= *start && time.date() <= *end,
67            Skip::Day(day) => day.contains(&(time.weekday().number_from_monday())),
68            Skip::DayRange(start, end) => {
69                let weekday = time.weekday().number_from_monday() as usize;
70                weekday >= *start && weekday <= *end
71            }
72            Skip::Time(skip_time) => time.time() == *skip_time,
73            Skip::TimeRange(start, end) => {
74                let current_time = time.time();
75                if start <= end {
76                    // 同一天内的时间范围
77                    current_time >= *start && current_time <= *end
78                } else {
79                    // 跨日期的时间范围 (如 22:00 - 06:00)
80                    current_time >= *start || current_time <= *end
81                }
82            }
83            Skip::None => false,
84        }
85    }
86}
87
88#[derive(Debug, Clone)]
89pub enum Task {
90    /// wait seconds
91    Wait(u64, Option<Vec<Skip>>),
92    /// interval seconds
93    Interval(u64, Option<Vec<Skip>>),
94    /// at time
95    At(Time, Option<Vec<Skip>>),
96    /// exact time
97    Once(OffsetDateTime, Option<Vec<Skip>>),
98}
99
100impl PartialEq for Task {
101    fn eq(&self, other: &Self) -> bool {
102        match (self, other) {
103            (Task::Wait(a, skip_a), Task::Wait(b, skip_b)) => a == b && skip_a == skip_b,
104            (Task::Interval(a, skip_a), Task::Interval(b, skip_b)) => a == b && skip_a == skip_b,
105            (Task::At(a, skip_a), Task::At(b, skip_b)) => a == b && skip_a == skip_b,
106            (Task::Once(a, skip_a), Task::Once(b, skip_b)) => a == b && skip_a == skip_b,
107            _ => false,
108        }
109    }
110}
111
112impl Task {
113    /// Parse a task from a string with detailed error reporting.
114    ///
115    /// # Examples
116    ///
117    /// ```
118    /// use easy_schedule::Task;
119    ///
120    /// let task = Task::parse("wait(10)").unwrap();
121    ///
122    /// match Task::parse("invalid") {
123    ///     Ok(task) => println!("Success: {}", task),
124    ///     Err(err) => println!("Error: {}", err),
125    /// }
126    /// ```
127    pub fn parse(s: &str) -> Result<Self, String> {
128        let s = s.trim();
129
130        // Find the function name and arguments
131        let open_paren = s.find('(').ok_or_else(|| {
132            format!(
133                "Invalid task format: '{}'. Expected format like 'wait(10)'",
134                s
135            )
136        })?;
137
138        let close_paren = s
139            .rfind(')')
140            .ok_or_else(|| format!("Missing closing parenthesis in: '{}'", s))?;
141
142        if close_paren <= open_paren {
143            return Err(format!("Invalid parentheses in: '{}'", s));
144        }
145
146        let function_name = s[..open_paren].trim();
147        let args = s[open_paren + 1..close_paren].trim();
148
149        match function_name {
150            "wait" => {
151                let seconds = args
152                    .parse::<u64>()
153                    .map_err(|_| format!("Invalid seconds value '{}' in wait({})", args, args))?;
154                Ok(Task::Wait(seconds, None))
155            }
156            "interval" => {
157                let seconds = args.parse::<u64>().map_err(|_| {
158                    format!("Invalid seconds value '{}' in interval({})", args, args)
159                })?;
160                Ok(Task::Interval(seconds, None))
161            }
162            "at" => {
163                let format = format_description!("[hour]:[minute]");
164                let time = Time::parse(args, &format).map_err(|_| {
165                    format!(
166                        "Invalid time format '{}' in at({}). Expected format: HH:MM",
167                        args, args
168                    )
169                })?;
170                Ok(Task::At(time, None))
171            }
172            "once" => {
173                let format = format_description!(
174                    "[year]-[month]-[day] [hour]:[minute]:[second] [offset_hour sign:mandatory]"
175                );
176                let datetime = OffsetDateTime::parse(args, &format)
177                    .map_err(|_| format!("Invalid datetime format '{}' in once({}). Expected format: YYYY-MM-DD HH:MM:SS +HH", args, args))?;
178                Ok(Task::Once(datetime, None))
179            }
180            _ => Err(format!(
181                "Unknown task type '{}'. Supported types: wait, interval, at, once",
182                function_name
183            )),
184        }
185    }
186}
187
188impl From<&str> for Task {
189    /// Parse a task from a string, panicking on parse errors.
190    ///
191    /// For better error handling, consider using `Task::parse()` instead.
192    ///
193    /// # Panics
194    ///
195    /// Panics if the string cannot be parsed as a valid task.
196    fn from(s: &str) -> Self {
197        Task::parse(s).unwrap_or_else(|err| {
198            panic!("Failed to parse task from string '{}': {}", s, err);
199        })
200    }
201}
202
203impl From<String> for Task {
204    fn from(s: String) -> Self {
205        Self::from(s.as_str())
206    }
207}
208
209impl From<&String> for Task {
210    fn from(s: &String) -> Self {
211        Self::from(s.as_str())
212    }
213}
214
215#[cfg(test)]
216mod tests {
217    use super::*;
218
219    #[test]
220    fn test_from_string() {
221        let task = Task::from("wait(10)");
222        assert_eq!(task, Task::Wait(10, None));
223        let task = Task::from("wait(10)".to_string());
224        assert_eq!(task, Task::Wait(10, None));
225        let task = Task::from(&"wait(10)".to_string());
226        assert_eq!(task, Task::Wait(10, None));
227    }
228
229    #[test]
230    fn test_from_string_interval() {
231        let task = Task::from("interval(10)");
232        assert_eq!(task, Task::Interval(10, None));
233    }
234
235    #[test]
236    fn test_from_string_at() {
237        let task = Task::from("at(10:00)");
238        assert_eq!(task, Task::At(Time::from_hms(10, 0, 0).unwrap(), None));
239    }
240
241    #[test]
242    fn test_from_string_once() {
243        let task = Task::from("once(2024-01-01 10:00:00 +08)");
244        assert_eq!(
245            task,
246            Task::Once(
247                OffsetDateTime::from_unix_timestamp(1704074400).unwrap(),
248                None
249            )
250        );
251    }
252}
253
254impl fmt::Display for Task {
255    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256        match self {
257            Task::Wait(wait, skip) => {
258                let skip = skip
259                    .clone()
260                    .unwrap_or_default()
261                    .into_iter()
262                    .map(|s| s.to_string())
263                    .collect::<Vec<String>>()
264                    .join(", ");
265                write!(f, "wait: {} {}", wait, skip)
266            }
267            Task::Interval(interval, skip) => {
268                let skip = skip
269                    .clone()
270                    .unwrap_or_default()
271                    .into_iter()
272                    .map(|s| s.to_string())
273                    .collect::<Vec<String>>()
274                    .join(", ");
275                write!(f, "interval: {} {}", interval, skip)
276            }
277            Task::At(time, skip) => {
278                let skip = skip
279                    .clone()
280                    .unwrap_or_default()
281                    .into_iter()
282                    .map(|s| s.to_string())
283                    .collect::<Vec<String>>()
284                    .join(", ");
285                write!(f, "at: {} {}", time, skip)
286            }
287            Task::Once(time, skip) => {
288                let skip = skip
289                    .clone()
290                    .unwrap_or_default()
291                    .into_iter()
292                    .map(|s| s.to_string())
293                    .collect::<Vec<String>>()
294                    .join(", ");
295                write!(f, "once: {} {}", time, skip)
296            }
297        }
298    }
299}
300
301/// a task that can be scheduled
302#[async_trait]
303pub trait Notifiable: Sync + Send + Debug {
304    /// get the schedule type
305    fn get_schedule(&self) -> Task;
306
307    /// called when the task is scheduled
308    ///
309    /// Default cancel on first trigger
310    async fn on_time(&self, cancel: CancellationToken) {
311        cancel.cancel();
312    }
313
314    /// called when the task is skipped
315    async fn on_skip(&self, _cancel: CancellationToken) {
316        // do nothing
317    }
318}
319
320pub struct Scheduler {
321    cancel: CancellationToken,
322    timezone_minutes: i16,
323}
324
325impl Scheduler {
326    /// create a new scheduler with default timezone (+8)
327    pub fn new() -> Self {
328        Self::with_timezone(8, 0)
329    }
330
331    /// create a new scheduler with specified timezone hours offset
332    pub fn with_timezone(timezone_hours: i8, timezone_minutes: i8) -> Self {
333        Self {
334            cancel: CancellationToken::new(),
335            timezone_minutes: (timezone_hours as i16) * 60 + (timezone_minutes as i16),
336        }
337    }
338
339    /// create a new scheduler with timezone offset in minutes
340    pub fn with_timezone_minutes(timezone_minutes: i16) -> Self {
341        Self {
342            cancel: CancellationToken::new(),
343            timezone_minutes,
344        }
345    }
346
347    /// run the task
348    pub async fn run<T: Notifiable + 'static>(&self, task: T) {
349        let schedule = task.get_schedule();
350        let cancel = self.cancel.clone();
351        let timezone_minutes = self.timezone_minutes;
352
353        match schedule {
354            Task::Wait(..) => {
355                Scheduler::run_wait(task, cancel.clone(), timezone_minutes).await;
356            }
357            Task::Interval(..) => {
358                Scheduler::run_interval(task, cancel.clone(), timezone_minutes).await;
359            }
360            Task::At(..) => {
361                Scheduler::run_at(task, cancel.clone(), timezone_minutes).await;
362            }
363            Task::Once(..) => {
364                Scheduler::run_once(task, cancel.clone(), timezone_minutes).await;
365            }
366        }
367    }
368
369    /// stop the scheduler
370    ///
371    /// this will cancel all the tasks
372    pub fn stop(&self) {
373        self.cancel.cancel();
374    }
375
376    /// get the cancel token
377    pub fn get_cancel(&self) -> CancellationToken {
378        self.cancel.clone()
379    }
380}
381
382fn get_next_time(now: OffsetDateTime, time: Time) -> OffsetDateTime {
383    let mut next = now.replace_time(time);
384    if next < now {
385        next = next + time::Duration::days(1);
386    }
387    next
388}
389
390fn get_now(timezone_minutes: i16) -> Result<OffsetDateTime, time::error::ComponentRange> {
391    let hours = timezone_minutes / 60;
392    let minutes = timezone_minutes % 60;
393    let offset = UtcOffset::from_hms(hours as i8, minutes as i8, 0)?;
394    Ok(OffsetDateTime::now_utc().to_offset(offset))
395}
396
397impl Scheduler {
398    /// run wait task
399    #[instrument(skip(cancel))]
400    async fn run_wait<T: Notifiable + 'static>(
401        task: T,
402        cancel: CancellationToken,
403        timezone_minutes: i16,
404    ) {
405        if let Task::Wait(wait, skip) = task.get_schedule() {
406            let task_ref = task;
407            tokio::task::spawn(async move {
408                select! {
409                    _ = cancel.cancelled() => {
410                        return;
411                    }
412                    _ = sleep(Duration::from_secs(wait)) => {
413                        tracing::debug!(wait, "wait seconds");
414                    }
415                };
416                let now = get_now(timezone_minutes).unwrap_or_else(|_| OffsetDateTime::now_utc());
417                if let Some(skip) = skip {
418                    if skip.iter().any(|s| s.is_skip(now)) {
419                        task_ref.on_skip(cancel.clone()).await;
420                        return;
421                    }
422                }
423                task_ref.on_time(cancel.clone()).await;
424            });
425        }
426    }
427
428    /// run interval task
429    #[instrument(skip(cancel))]
430    async fn run_interval<T: Notifiable + 'static>(
431        task: T,
432        cancel: CancellationToken,
433        timezone_minutes: i16,
434    ) {
435        if let Task::Interval(interval, skip) = task.get_schedule() {
436            let task_ref = task;
437            tokio::task::spawn(async move {
438                loop {
439                    select! {
440                        _ = cancel.cancelled() => {
441                            return;
442                        }
443                        _ = sleep(Duration::from_secs(interval)) => {
444                            tracing::debug!(interval, "interval");
445                        }
446                    };
447                    let now =
448                        get_now(timezone_minutes).unwrap_or_else(|_| OffsetDateTime::now_utc());
449                    if let Some(ref skip) = skip {
450                        if skip.iter().any(|s| s.is_skip(now)) {
451                            task_ref.on_skip(cancel.clone()).await;
452                            continue;
453                        }
454                    }
455                    task_ref.on_time(cancel.clone()).await;
456                }
457            });
458        }
459    }
460
461    /// run at task
462    #[instrument(skip(cancel))]
463    async fn run_at<T: Notifiable + 'static>(
464        task: T,
465        cancel: CancellationToken,
466        timezone_minutes: i16,
467    ) {
468        if let Task::At(time, skip) = task.get_schedule() {
469            let task_ref = task;
470            tokio::task::spawn(async move {
471                let now = get_now(timezone_minutes).unwrap_or_else(|_| OffsetDateTime::now_utc());
472                let mut next = get_next_time(now, time);
473                loop {
474                    let now =
475                        get_now(timezone_minutes).unwrap_or_else(|_| OffsetDateTime::now_utc());
476                    let seconds = (next - now).as_seconds_f64() as u64;
477                    let instant = Instant::now() + Duration::from_secs(seconds);
478                    select! {
479                        _ = cancel.cancelled() => {
480                            return;
481                        }
482                        _ = sleep_until(instant) => {
483                            tracing::debug!("at time");
484                        }
485                    }
486
487                    if let Some(skip) = skip.clone() {
488                        if skip.iter().any(|s| s.is_skip(next)) {
489                            task_ref.on_skip(cancel.clone()).await;
490                            next += time::Duration::days(1);
491                            continue;
492                        }
493                    }
494
495                    task_ref.on_time(cancel.clone()).await;
496
497                    next += time::Duration::days(1);
498                }
499            });
500        }
501    }
502
503    /// run once task
504    #[instrument(skip(task, cancel))]
505    async fn run_once<T: Notifiable + 'static>(
506        task: T,
507        cancel: CancellationToken,
508        timezone_minutes: i16,
509    ) {
510        if let Task::Once(next, skip) = task.get_schedule() {
511            let task_ref = task;
512            tokio::task::spawn(async move {
513                let now = get_now(timezone_minutes).unwrap_or_else(|_| OffsetDateTime::now_utc());
514                if next < now {
515                    task_ref.on_skip(cancel.clone()).await;
516                    return;
517                }
518
519                if let Some(skip) = skip {
520                    if skip.iter().any(|s| s.is_skip(next)) {
521                        task_ref.on_skip(cancel.clone()).await;
522                        return;
523                    }
524                }
525                let seconds = (next - now).as_seconds_f64();
526                let instant = Instant::now() + Duration::from_secs(seconds as u64);
527
528                select! {
529                    _ = cancel.cancelled() => {
530                        return;
531                    }
532                    _ = sleep_until(instant) => {
533                        tracing::debug!("once time");
534                    }
535                }
536                task_ref.on_time(cancel.clone()).await;
537            });
538        }
539    }
540}