1use async_trait::async_trait;
2use std::fmt::{self, Debug};
3use time::{Date, OffsetDateTime, Time, UtcOffset, macros::format_description};
4use tokio::{
5 select,
6 time::{Duration, Instant, sleep, sleep_until},
7};
8use tokio_util::sync::CancellationToken;
9use tracing::instrument;
10
11pub mod prelude {
12 pub use super::{Notifiable, Scheduler, Skip, Task};
13 pub use async_trait::async_trait;
14 pub use tokio_util::sync::CancellationToken;
15}
16
17#[derive(Debug, Clone, PartialEq)]
18pub enum Skip {
19 Date(Date),
21 DateRange(Date, Date),
23 Day(Vec<u8>),
27 DayRange(usize, usize),
31 Time(Time),
33 TimeRange(Time, Time),
37 None,
39}
40
41impl Default for Skip {
42 fn default() -> Self {
43 Self::None
44 }
45}
46
47impl fmt::Display for Skip {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 match self {
50 Skip::Date(date) => write!(f, "date: {}", date),
51 Skip::DateRange(start, end) => write!(f, "date range: {} - {}", start, end),
52 Skip::Day(day) => write!(f, "day: {:?}", day),
53 Skip::DayRange(start, end) => write!(f, "day range: {} - {}", start, end),
54 Skip::Time(time) => write!(f, "time: {}", time),
55 Skip::TimeRange(start, end) => write!(f, "time range: {} - {}", start, end),
56 Skip::None => write!(f, "none"),
57 }
58 }
59}
60
61impl Skip {
62 pub fn is_skip(&self, time: OffsetDateTime) -> bool {
64 match self {
65 Skip::Date(date) => time.date() == *date,
66 Skip::DateRange(start, end) => time.date() >= *start && time.date() <= *end,
67 Skip::Day(day) => day.contains(&(time.weekday().number_from_monday())),
68 Skip::DayRange(start, end) => {
69 let weekday = time.weekday().number_from_monday() as usize;
70 weekday >= *start && weekday <= *end
71 }
72 Skip::Time(skip_time) => time.time() == *skip_time,
73 Skip::TimeRange(start, end) => {
74 let current_time = time.time();
75 if start <= end {
76 current_time >= *start && current_time <= *end
78 } else {
79 current_time >= *start || current_time <= *end
81 }
82 }
83 Skip::None => false,
84 }
85 }
86}
87
88#[derive(Debug, Clone)]
89pub enum Task {
90 Wait(u64, Option<Vec<Skip>>),
92 Interval(u64, Option<Vec<Skip>>),
94 At(Time, Option<Vec<Skip>>),
96 Once(OffsetDateTime, Option<Vec<Skip>>),
98}
99
100impl PartialEq for Task {
101 fn eq(&self, other: &Self) -> bool {
102 match (self, other) {
103 (Task::Wait(a, skip_a), Task::Wait(b, skip_b)) => a == b && skip_a == skip_b,
104 (Task::Interval(a, skip_a), Task::Interval(b, skip_b)) => a == b && skip_a == skip_b,
105 (Task::At(a, skip_a), Task::At(b, skip_b)) => a == b && skip_a == skip_b,
106 (Task::Once(a, skip_a), Task::Once(b, skip_b)) => a == b && skip_a == skip_b,
107 _ => false,
108 }
109 }
110}
111
112impl Task {
113 pub fn parse(s: &str) -> Result<Self, String> {
128 let s = s.trim();
129
130 let open_paren = s.find('(').ok_or_else(|| {
132 format!(
133 "Invalid task format: '{}'. Expected format like 'wait(10)'",
134 s
135 )
136 })?;
137
138 let close_paren = s
139 .rfind(')')
140 .ok_or_else(|| format!("Missing closing parenthesis in: '{}'", s))?;
141
142 if close_paren <= open_paren {
143 return Err(format!("Invalid parentheses in: '{}'", s));
144 }
145
146 let function_name = s[..open_paren].trim();
147 let args = s[open_paren + 1..close_paren].trim();
148
149 match function_name {
150 "wait" => {
151 let seconds = args
152 .parse::<u64>()
153 .map_err(|_| format!("Invalid seconds value '{}' in wait({})", args, args))?;
154 Ok(Task::Wait(seconds, None))
155 }
156 "interval" => {
157 let seconds = args.parse::<u64>().map_err(|_| {
158 format!("Invalid seconds value '{}' in interval({})", args, args)
159 })?;
160 Ok(Task::Interval(seconds, None))
161 }
162 "at" => {
163 let format = format_description!("[hour]:[minute]");
164 let time = Time::parse(args, &format).map_err(|_| {
165 format!(
166 "Invalid time format '{}' in at({}). Expected format: HH:MM",
167 args, args
168 )
169 })?;
170 Ok(Task::At(time, None))
171 }
172 "once" => {
173 let format = format_description!(
174 "[year]-[month]-[day] [hour]:[minute]:[second] [offset_hour sign:mandatory]"
175 );
176 let datetime = OffsetDateTime::parse(args, &format)
177 .map_err(|_| format!("Invalid datetime format '{}' in once({}). Expected format: YYYY-MM-DD HH:MM:SS +HH", args, args))?;
178 Ok(Task::Once(datetime, None))
179 }
180 _ => Err(format!(
181 "Unknown task type '{}'. Supported types: wait, interval, at, once",
182 function_name
183 )),
184 }
185 }
186}
187
188impl From<&str> for Task {
189 fn from(s: &str) -> Self {
197 Task::parse(s).unwrap_or_else(|err| {
198 panic!("Failed to parse task from string '{}': {}", s, err);
199 })
200 }
201}
202
203impl From<String> for Task {
204 fn from(s: String) -> Self {
205 Self::from(s.as_str())
206 }
207}
208
209impl From<&String> for Task {
210 fn from(s: &String) -> Self {
211 Self::from(s.as_str())
212 }
213}
214
215#[cfg(test)]
216mod tests {
217 use super::*;
218
219 #[test]
220 fn test_from_string() {
221 let task = Task::from("wait(10)");
222 assert_eq!(task, Task::Wait(10, None));
223 let task = Task::from("wait(10)".to_string());
224 assert_eq!(task, Task::Wait(10, None));
225 let task = Task::from(&"wait(10)".to_string());
226 assert_eq!(task, Task::Wait(10, None));
227 }
228
229 #[test]
230 fn test_from_string_interval() {
231 let task = Task::from("interval(10)");
232 assert_eq!(task, Task::Interval(10, None));
233 }
234
235 #[test]
236 fn test_from_string_at() {
237 let task = Task::from("at(10:00)");
238 assert_eq!(task, Task::At(Time::from_hms(10, 0, 0).unwrap(), None));
239 }
240
241 #[test]
242 fn test_from_string_once() {
243 let task = Task::from("once(2024-01-01 10:00:00 +08)");
244 assert_eq!(
245 task,
246 Task::Once(
247 OffsetDateTime::from_unix_timestamp(1704074400).unwrap(),
248 None
249 )
250 );
251 }
252}
253
254impl fmt::Display for Task {
255 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256 match self {
257 Task::Wait(wait, skip) => {
258 let skip = skip
259 .clone()
260 .unwrap_or_default()
261 .into_iter()
262 .map(|s| s.to_string())
263 .collect::<Vec<String>>()
264 .join(", ");
265 write!(f, "wait: {} {}", wait, skip)
266 }
267 Task::Interval(interval, skip) => {
268 let skip = skip
269 .clone()
270 .unwrap_or_default()
271 .into_iter()
272 .map(|s| s.to_string())
273 .collect::<Vec<String>>()
274 .join(", ");
275 write!(f, "interval: {} {}", interval, skip)
276 }
277 Task::At(time, skip) => {
278 let skip = skip
279 .clone()
280 .unwrap_or_default()
281 .into_iter()
282 .map(|s| s.to_string())
283 .collect::<Vec<String>>()
284 .join(", ");
285 write!(f, "at: {} {}", time, skip)
286 }
287 Task::Once(time, skip) => {
288 let skip = skip
289 .clone()
290 .unwrap_or_default()
291 .into_iter()
292 .map(|s| s.to_string())
293 .collect::<Vec<String>>()
294 .join(", ");
295 write!(f, "once: {} {}", time, skip)
296 }
297 }
298 }
299}
300
301#[async_trait]
303pub trait Notifiable: Sync + Send + Debug {
304 fn get_schedule(&self) -> Task;
306
307 async fn on_time(&self, cancel: CancellationToken) {
311 cancel.cancel();
312 }
313
314 async fn on_skip(&self, _cancel: CancellationToken) {
316 }
318}
319
320pub struct Scheduler {
321 cancel: CancellationToken,
322 timezone_minutes: i16,
323}
324
325impl Scheduler {
326 pub fn new() -> Self {
328 Self::with_timezone(8, 0)
329 }
330
331 pub fn with_timezone(timezone_hours: i8, timezone_minutes: i8) -> Self {
333 Self {
334 cancel: CancellationToken::new(),
335 timezone_minutes: (timezone_hours as i16) * 60 + (timezone_minutes as i16),
336 }
337 }
338
339 pub fn with_timezone_minutes(timezone_minutes: i16) -> Self {
341 Self {
342 cancel: CancellationToken::new(),
343 timezone_minutes,
344 }
345 }
346
347 pub async fn run<T: Notifiable + 'static>(&self, task: T) {
349 let schedule = task.get_schedule();
350 let cancel = self.cancel.clone();
351 let timezone_minutes = self.timezone_minutes;
352
353 match schedule {
354 Task::Wait(..) => {
355 Scheduler::run_wait(task, cancel.clone(), timezone_minutes).await;
356 }
357 Task::Interval(..) => {
358 Scheduler::run_interval(task, cancel.clone(), timezone_minutes).await;
359 }
360 Task::At(..) => {
361 Scheduler::run_at(task, cancel.clone(), timezone_minutes).await;
362 }
363 Task::Once(..) => {
364 Scheduler::run_once(task, cancel.clone(), timezone_minutes).await;
365 }
366 }
367 }
368
369 pub fn stop(&self) {
373 self.cancel.cancel();
374 }
375
376 pub fn get_cancel(&self) -> CancellationToken {
378 self.cancel.clone()
379 }
380}
381
382fn get_next_time(now: OffsetDateTime, time: Time) -> OffsetDateTime {
383 let mut next = now.replace_time(time);
384 if next < now {
385 next = next + time::Duration::days(1);
386 }
387 next
388}
389
390fn get_now(timezone_minutes: i16) -> Result<OffsetDateTime, time::error::ComponentRange> {
391 let hours = timezone_minutes / 60;
392 let minutes = timezone_minutes % 60;
393 let offset = UtcOffset::from_hms(hours as i8, minutes as i8, 0)?;
394 Ok(OffsetDateTime::now_utc().to_offset(offset))
395}
396
397impl Scheduler {
398 #[instrument(skip(cancel))]
400 async fn run_wait<T: Notifiable + 'static>(
401 task: T,
402 cancel: CancellationToken,
403 timezone_minutes: i16,
404 ) {
405 if let Task::Wait(wait, skip) = task.get_schedule() {
406 let task_ref = task;
407 tokio::task::spawn(async move {
408 select! {
409 _ = cancel.cancelled() => {
410 return;
411 }
412 _ = sleep(Duration::from_secs(wait)) => {
413 tracing::debug!(wait, "wait seconds");
414 }
415 };
416 let now = get_now(timezone_minutes).unwrap_or_else(|_| OffsetDateTime::now_utc());
417 if let Some(skip) = skip {
418 if skip.iter().any(|s| s.is_skip(now)) {
419 task_ref.on_skip(cancel.clone()).await;
420 return;
421 }
422 }
423 task_ref.on_time(cancel.clone()).await;
424 });
425 }
426 }
427
428 #[instrument(skip(cancel))]
430 async fn run_interval<T: Notifiable + 'static>(
431 task: T,
432 cancel: CancellationToken,
433 timezone_minutes: i16,
434 ) {
435 if let Task::Interval(interval, skip) = task.get_schedule() {
436 let task_ref = task;
437 tokio::task::spawn(async move {
438 loop {
439 select! {
440 _ = cancel.cancelled() => {
441 return;
442 }
443 _ = sleep(Duration::from_secs(interval)) => {
444 tracing::debug!(interval, "interval");
445 }
446 };
447 let now =
448 get_now(timezone_minutes).unwrap_or_else(|_| OffsetDateTime::now_utc());
449 if let Some(ref skip) = skip {
450 if skip.iter().any(|s| s.is_skip(now)) {
451 task_ref.on_skip(cancel.clone()).await;
452 continue;
453 }
454 }
455 task_ref.on_time(cancel.clone()).await;
456 }
457 });
458 }
459 }
460
461 #[instrument(skip(cancel))]
463 async fn run_at<T: Notifiable + 'static>(
464 task: T,
465 cancel: CancellationToken,
466 timezone_minutes: i16,
467 ) {
468 if let Task::At(time, skip) = task.get_schedule() {
469 let task_ref = task;
470 tokio::task::spawn(async move {
471 let now = get_now(timezone_minutes).unwrap_or_else(|_| OffsetDateTime::now_utc());
472 let mut next = get_next_time(now, time);
473 loop {
474 let now =
475 get_now(timezone_minutes).unwrap_or_else(|_| OffsetDateTime::now_utc());
476 let seconds = (next - now).as_seconds_f64() as u64;
477 let instant = Instant::now() + Duration::from_secs(seconds);
478 select! {
479 _ = cancel.cancelled() => {
480 return;
481 }
482 _ = sleep_until(instant) => {
483 tracing::debug!("at time");
484 }
485 }
486
487 if let Some(skip) = skip.clone() {
488 if skip.iter().any(|s| s.is_skip(next)) {
489 task_ref.on_skip(cancel.clone()).await;
490 next += time::Duration::days(1);
491 continue;
492 }
493 }
494
495 task_ref.on_time(cancel.clone()).await;
496
497 next += time::Duration::days(1);
498 }
499 });
500 }
501 }
502
503 #[instrument(skip(task, cancel))]
505 async fn run_once<T: Notifiable + 'static>(
506 task: T,
507 cancel: CancellationToken,
508 timezone_minutes: i16,
509 ) {
510 if let Task::Once(next, skip) = task.get_schedule() {
511 let task_ref = task;
512 tokio::task::spawn(async move {
513 let now = get_now(timezone_minutes).unwrap_or_else(|_| OffsetDateTime::now_utc());
514 if next < now {
515 task_ref.on_skip(cancel.clone()).await;
516 return;
517 }
518
519 if let Some(skip) = skip {
520 if skip.iter().any(|s| s.is_skip(next)) {
521 task_ref.on_skip(cancel.clone()).await;
522 return;
523 }
524 }
525 let seconds = (next - now).as_seconds_f64();
526 let instant = Instant::now() + Duration::from_secs(seconds as u64);
527
528 select! {
529 _ = cancel.cancelled() => {
530 return;
531 }
532 _ = sleep_until(instant) => {
533 tracing::debug!("once time");
534 }
535 }
536 task_ref.on_time(cancel.clone()).await;
537 });
538 }
539 }
540}