1use async_trait::async_trait;
2use std::fmt::{self, Debug};
3use time::{Date, OffsetDateTime, Time, macros::format_description};
4use tokio::{
5 select,
6 time::{Duration, Instant, sleep, sleep_until},
7};
8use tokio_util::sync::CancellationToken;
9use tracing::{error, instrument};
10
11pub mod prelude {
12 pub use super::{Notifiable, Scheduler, Skip, Task};
13 pub use async_trait::async_trait;
14 pub use tokio_util::sync::CancellationToken;
15}
16
17#[derive(Debug, Clone)]
18pub enum Skip {
19 Date(Date),
21 DateRange(Date, Date),
23 Day(Vec<u8>),
27 DayRange(usize, usize),
31 Time(Time),
33 TimeRange(Time, Time),
37 None,
39}
40
41impl Default for Skip {
42 fn default() -> Self {
43 Self::None
44 }
45}
46
47impl fmt::Display for Skip {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 match self {
50 Skip::Date(date) => write!(f, "date: {}", date),
51 Skip::DateRange(start, end) => write!(f, "date range: {} - {}", start, end),
52 Skip::Day(day) => write!(f, "day: {:?}", day),
53 Skip::DayRange(start, end) => write!(f, "day range: {} - {}", start, end),
54 Skip::Time(time) => write!(f, "time: {}", time),
55 Skip::TimeRange(start, end) => write!(f, "time range: {} - {}", start, end),
56 Skip::None => write!(f, "none"),
57 }
58 }
59}
60
61impl Skip {
62 pub fn is_skip(&self, time: OffsetDateTime) -> bool {
64 match self {
65 Skip::Date(date) => time.date() == *date,
66 Skip::DateRange(start, end) => time.date() >= *start && time.date() <= *end,
67 Skip::Day(day) => day.contains(&(time.day() + 1)),
68 Skip::DayRange(start, end) => {
69 time.day() + 1 >= *start as u8 && time.day() + 1 <= *end as u8
70 }
71 Skip::Time(time) => time.hour() == time.hour() && time.minute() == time.minute(),
72 Skip::TimeRange(start, end) => {
73 assert!(start < end, "start must be less than end");
74 time.hour() >= start.hour()
75 && time.hour() <= end.hour()
76 && time.minute() >= start.minute()
77 && time.minute() <= end.minute()
78 }
79 Skip::None => false,
80 }
81 }
82}
83
84#[derive(Debug, Clone)]
85pub enum Task {
86 Wait(u64, Option<Vec<Skip>>),
88 Interval(u64, Option<Vec<Skip>>),
90 At(Time, Option<Vec<Skip>>),
92 Once(OffsetDateTime),
94}
95
96impl PartialEq for Task {
97 fn eq(&self, other: &Self) -> bool {
98 match (self, other) {
99 (Task::Wait(a, _), Task::Wait(b, _)) => a == b,
100 (Task::Interval(a, _), Task::Interval(b, _)) => a == b,
101 (Task::At(a, _), Task::At(b, _)) => a == b,
102 (Task::Once(a), Task::Once(b)) => a == b,
103 _ => false,
104 }
105 }
106}
107
108impl From<&str> for Task {
109 fn from(s: &str) -> Self {
115 let parts = s.split("=").collect::<Vec<&str>>();
116 let task = parts[0];
117 let value = parts[1..].join("");
118 match task {
119 "wait" => {
120 let seconds = value.parse::<u64>().unwrap();
121 Task::Wait(seconds, None)
122 }
123 "interval" => {
124 let seconds = value.parse::<u64>().unwrap();
125 Task::Interval(seconds, None)
126 }
127 "at" => {
128 let format = format_description!("[hour]:[minute]");
129 let time = Time::parse(&value, &format).expect("parse time failed");
130 Task::At(time, None)
131 }
132 "once" => {
133 let format = format_description!(
134 "[year]-[month]-[day] [hour]:[minute]:[second] [offset_hour sign:mandatory]"
135 );
136 println!("value: {}", value);
137 let datetime =
138 OffsetDateTime::parse(&value, &format).expect("parse datetime failed");
139 Task::Once(datetime)
140 }
141 _ => Task::Wait(5, None),
142 }
143 }
144}
145
146impl From<String> for Task {
147 fn from(s: String) -> Self {
148 Self::from(s.as_str())
149 }
150}
151
152impl From<&String> for Task {
153 fn from(s: &String) -> Self {
154 Self::from(s.as_str())
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// `&str`, `String` and `&String` inputs all parse to the same task.
    #[test]
    fn test_from_string() {
        let expected = Task::Wait(10, None);
        let owned = String::from("wait=10");

        assert_eq!(Task::from("wait=10"), expected);
        assert_eq!(Task::from(owned.clone()), expected);
        assert_eq!(Task::from(&owned), expected);
    }

    #[test]
    fn test_from_string_interval() {
        assert_eq!(Task::from("interval=10"), Task::Interval(10, None));
    }

    #[test]
    fn test_from_string_at() {
        let ten_o_clock = Time::from_hms(10, 0, 0).unwrap();
        assert_eq!(Task::from("at=10:00"), Task::At(ten_o_clock, None));
    }

    #[test]
    fn test_from_string_once() {
        // 10:00 at UTC+8 is 02:00 UTC on the same day, i.e. Unix time 1704074400.
        let expected = OffsetDateTime::from_unix_timestamp(1704074400).unwrap();
        assert_eq!(
            Task::from("once=2024-01-01 10:00:00 +08"),
            Task::Once(expected)
        );
    }
}
193
194impl fmt::Display for Task {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 match self {
197 Task::Wait(wait, skip) => {
198 let skip = skip
199 .clone()
200 .unwrap_or_default()
201 .into_iter()
202 .map(|s| s.to_string())
203 .collect::<Vec<String>>()
204 .join(", ");
205 write!(f, "wait: {} {}", wait, skip)
206 }
207 Task::Interval(interval, skip) => {
208 let skip = skip
209 .clone()
210 .unwrap_or_default()
211 .into_iter()
212 .map(|s| s.to_string())
213 .collect::<Vec<String>>()
214 .join(", ");
215 write!(f, "interval: {} {}", interval, skip)
216 }
217 Task::At(time, skip) => {
218 let skip = skip
219 .clone()
220 .unwrap_or_default()
221 .into_iter()
222 .map(|s| s.to_string())
223 .collect::<Vec<String>>()
224 .join(", ");
225 write!(f, "at: {} {}", time, skip)
226 }
227 Task::Once(time) => write!(f, "once: {}", time),
228 }
229 }
230}
231
232#[async_trait]
234pub trait Notifiable: Sync + Send {
235 fn get_schedule(&self) -> Task;
237
238 async fn on_time(&self, cancel: CancellationToken) {
242 cancel.cancel();
243 }
244
245 async fn on_skip(&self, _cancel: CancellationToken) {
247 }
249}
250
251pub struct Scheduler {
252 cancel: CancellationToken,
253}
254
255impl Scheduler {
256 pub fn new() -> Self {
258 Self {
259 cancel: CancellationToken::new(),
260 }
261 }
262
263 pub async fn run<T: Notifiable + 'static>(&self, task: T) {
265 let schedule = task.get_schedule();
266 let cancel = self.cancel.clone();
267
268 match schedule {
269 Task::Wait(..) => {
270 Scheduler::run_wait(task, cancel.clone()).await;
271 }
272 Task::Interval(..) => {
273 Scheduler::run_interval(task, cancel.clone()).await;
274 }
275 Task::At(..) => {
276 Scheduler::run_at(task, cancel.clone()).await;
277 }
278 Task::Once(..) => {
279 Scheduler::run_once(task, cancel.clone()).await;
280 }
281 }
282 }
283
284 pub fn stop(&self) {
288 self.cancel.cancel();
289 }
290
291 pub fn get_cancel(&self) -> CancellationToken {
293 self.cancel.clone()
294 }
295}
296
297fn get_next_time(now: OffsetDateTime, time: Time) -> OffsetDateTime {
298 let mut next = now.replace_time(time);
299 if next < now {
300 next = next + time::Duration::days(1);
301 }
302 next
303}
304
305fn get_now() -> Option<OffsetDateTime> {
306 match OffsetDateTime::now_local() {
307 Ok(now) => Some(now),
308 Err(e) => {
309 error!("failed to get local time: {}", e);
310 None
311 }
312 }
313}
314
315impl Scheduler {
316 #[instrument(skip(task, cancel))]
318 async fn run_wait<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
319 if let Task::Wait(wait, skip) = task.get_schedule() {
320 let task_ref = task;
321 tokio::task::spawn(async move {
322 select! {
323 _ = cancel.cancelled() => {
324 return;
325 }
326 _ = sleep(Duration::from_secs(wait)) => {
327 tracing::debug!(wait, "wait seconds");
328 }
329 };
330 if let Some(now) = get_now() {
331 if let Some(skip) = skip {
332 if skip.iter().any(|s| s.is_skip(now)) {
333 task_ref.on_skip(cancel.clone()).await;
334 return;
335 }
336 }
337 task_ref.on_time(cancel.clone()).await;
338 }
339 });
340 }
341 }
342
343 #[instrument(skip(task, cancel))]
345 async fn run_interval<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
346 if let Task::Interval(interval, skip) = task.get_schedule() {
347 let task_ref = task;
348 tokio::task::spawn(async move {
349 loop {
350 select! {
351 _ = cancel.cancelled() => {
352 return;
353 }
354 _ = sleep(Duration::from_secs(interval)) => {
355 tracing::debug!(interval, "interval");
356 }
357 };
358 if let Some(now) = get_now() {
359 if let Some(ref skip) = skip {
360 if skip.iter().any(|s| s.is_skip(now)) {
361 task_ref.on_skip(cancel.clone()).await;
362 continue;
363 }
364 }
365 task_ref.on_time(cancel.clone()).await;
366 }
367 }
368 });
369 }
370 }
371
372 #[instrument(skip(task, cancel))]
374 async fn run_at<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
375 if let Task::At(time, skip) = task.get_schedule() {
376 let task_ref = task;
377 tokio::task::spawn(async move {
378 let now = if let Some(now) = get_now() {
379 now
380 } else {
381 return;
382 };
383 let mut next = get_next_time(now, time);
384 loop {
385 let now = if let Some(now) = get_now() {
386 now
387 } else {
388 return;
389 };
390 let seconds = (next - now).as_seconds_f64() as u64;
391 let instant = Instant::now() + Duration::from_secs(seconds);
392 select! {
393 _ = cancel.cancelled() => {
394 return;
395 }
396 _ = sleep_until(instant) => {
397 tracing::debug!("at time");
398 }
399 }
400
401 if let Some(skip) = skip.clone() {
402 if skip.iter().any(|s| s.is_skip(now)) {
403 task_ref.on_skip(cancel.clone()).await;
404 return;
405 }
406 }
407
408 task_ref.on_time(cancel.clone()).await;
409
410 next += time::Duration::days(1);
411 }
412 });
413 }
414 }
415
416 #[instrument(skip(task, cancel))]
418 async fn run_once<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
419 if let Task::Once(next) = task.get_schedule() {
420 let task_ref = task;
421 tokio::task::spawn(async move {
422 if let Some(now) = get_now() {
423 if next < now {
424 task_ref.on_skip(cancel.clone()).await;
425 return;
426 }
427 let seconds = (next - now).as_seconds_f64() as u64;
428 let instant = Instant::now() + Duration::from_secs(seconds);
429
430 select! {
431 _ = cancel.cancelled() => {
432 return;
433 }
434 _ = sleep_until(instant) => {
435 tracing::debug!("once time");
436 }
437 }
438 task_ref.on_time(cancel.clone()).await;
439 }
440 });
441 }
442 }
443}