1use async_trait::async_trait;
2use std::fmt::{self, Debug};
3use time::{Date, OffsetDateTime, Time};
4use tokio::{
5 select,
6 time::{Duration, Instant, sleep, sleep_until},
7};
8use tokio_util::sync::CancellationToken;
9use tracing::{error, instrument};
10
/// Convenience re-exports.
///
/// Re-exports the scheduler types (`Notifiable`, `Scheduler`, `Skip`, `Task`)
/// together with the `async_trait` attribute and `CancellationToken`, which are
/// the items needed to implement and run a task.
pub mod prelude {
    pub use super::{Notifiable, Scheduler, Skip, Task};
    pub use async_trait::async_trait;
    pub use tokio_util::sync::CancellationToken;
}
16
/// A rule describing moments at which a scheduled task should not fire.
///
/// Rules are evaluated by `Skip::is_skip`; see that method for the exact
/// matching semantics of each variant.
#[derive(Debug, Clone)]
pub enum Skip {
    /// A single calendar date.
    Date(Date),
    /// A span of calendar dates, given as `(start, end)`.
    DateRange(Date, Date),
    /// A list of day numbers.
    ///
    /// NOTE(review): `is_skip` compares these against `OffsetDateTime::day() + 1`;
    /// confirm whether day-of-month or weekday numbering is intended.
    Day(Vec<u8>),
    /// A span of day numbers, given as `(start, end)`; compared the same way as
    /// the values of `Skip::Day`.
    DayRange(usize, usize),
    /// A single time of day.
    Time(Time),
    /// A span of times of day, given as `(start, end)`.
    TimeRange(Time, Time),
    /// No rule; never matches.
    None,
}
40
41impl Default for Skip {
42 fn default() -> Self {
43 Self::None
44 }
45}
46
47impl fmt::Display for Skip {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 match self {
50 Skip::Date(date) => write!(f, "date: {}", date),
51 Skip::DateRange(start, end) => write!(f, "date range: {} - {}", start, end),
52 Skip::Day(day) => write!(f, "day: {:?}", day),
53 Skip::DayRange(start, end) => write!(f, "day range: {} - {}", start, end),
54 Skip::Time(time) => write!(f, "time: {}", time),
55 Skip::TimeRange(start, end) => write!(f, "time range: {} - {}", start, end),
56 Skip::None => write!(f, "none"),
57 }
58 }
59}
60
61impl Skip {
62 pub fn is_skip(&self, time: OffsetDateTime) -> bool {
64 match self {
65 Skip::Date(date) => time.date() == *date,
66 Skip::DateRange(start, end) => time.date() >= *start && time.date() <= *end,
67 Skip::Day(day) => day.contains(&(time.day() + 1)),
68 Skip::DayRange(start, end) => {
69 time.day() + 1 >= *start as u8 && time.day() + 1 <= *end as u8
70 }
71 Skip::Time(time) => time.hour() == time.hour() && time.minute() == time.minute(),
72 Skip::TimeRange(start, end) => {
73 assert!(start < end, "start must be less than end");
74 time.hour() >= start.hour()
75 && time.hour() <= end.hour()
76 && time.minute() >= start.minute()
77 && time.minute() <= end.minute()
78 }
79 Skip::None => false,
80 }
81 }
82}
83
/// The schedule a `Notifiable` task asks the `Scheduler` to follow.
///
/// Where present, the optional `Vec<Skip>` lists rules that divert a firing to
/// `Notifiable::on_skip` instead of `Notifiable::on_time` when any rule matches.
#[derive(Debug, Clone)]
pub enum Task {
    /// Fire once after waiting the given number of seconds.
    Wait(u64, Option<Vec<Skip>>),
    /// Fire repeatedly, sleeping the given number of seconds before each firing.
    Interval(u64, Option<Vec<Skip>>),
    /// Fire at the given time of day; after a firing the target advances by one day.
    At(Time, Option<Vec<Skip>>),
    /// Fire once at the given date and time; if that moment is already past when the
    /// task is started, `on_skip` is called instead.
    Once(OffsetDateTime),
}
95
96impl fmt::Display for Task {
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 match self {
99 Task::Wait(wait, skip) => {
100 let skip = skip
101 .clone()
102 .unwrap_or_default()
103 .into_iter()
104 .map(|s| s.to_string())
105 .collect::<Vec<String>>()
106 .join(", ");
107 write!(f, "wait: {} {}", wait, skip)
108 }
109 Task::Interval(interval, skip) => {
110 let skip = skip
111 .clone()
112 .unwrap_or_default()
113 .into_iter()
114 .map(|s| s.to_string())
115 .collect::<Vec<String>>()
116 .join(", ");
117 write!(f, "interval: {} {}", interval, skip)
118 }
119 Task::At(time, skip) => {
120 let skip = skip
121 .clone()
122 .unwrap_or_default()
123 .into_iter()
124 .map(|s| s.to_string())
125 .collect::<Vec<String>>()
126 .join(", ");
127 write!(f, "at: {} {}", time, skip)
128 }
129 Task::Once(time) => write!(f, "once: {}", time),
130 }
131 }
132}
133
/// A unit of work that can be driven by a `Scheduler`.
///
/// Implementors report their schedule through `get_schedule` and receive
/// callbacks when the schedule fires or a firing is skipped.
#[async_trait]
pub trait Notifiable: Sync + Send {
    /// Returns the schedule describing when this task should fire.
    fn get_schedule(&self) -> Task;

    /// Called when the scheduled moment is reached and no skip rule matched.
    ///
    /// `cancel` is a clone of the token the task was started with. The default
    /// implementation cancels that token.
    async fn on_time(&self, cancel: CancellationToken) {
        cancel.cancel();
    }

    /// Called instead of `on_time` when a skip rule matched, or when a `Task::Once`
    /// moment was already past at start. The default implementation does nothing.
    async fn on_skip(&self, _cancel: CancellationToken) {
    }
}
152
/// Starts `Notifiable` tasks and owns the cancellation token handed to each of them.
pub struct Scheduler {
    /// Token cloned into every started task; cancelled by `Scheduler::stop`.
    cancel: CancellationToken,
}
156
157impl Scheduler {
158 pub fn new() -> Self {
160 Self {
161 cancel: CancellationToken::new(),
162 }
163 }
164
165 pub async fn run<T: Notifiable + 'static>(&self, task: T) {
167 let schedule = task.get_schedule();
168 let cancel = self.cancel.clone();
169
170 match schedule {
171 Task::Wait(..) => {
172 Scheduler::run_wait(task, cancel.clone()).await;
173 }
174 Task::Interval(..) => {
175 Scheduler::run_interval(task, cancel.clone()).await;
176 }
177 Task::At(..) => {
178 Scheduler::run_at(task, cancel.clone()).await;
179 }
180 Task::Once(..) => {
181 Scheduler::run_once(task, cancel.clone()).await;
182 }
183 }
184 }
185
186 pub fn stop(&self) {
190 self.cancel.cancel();
191 }
192
193 pub fn get_cancel(&self) -> CancellationToken {
195 self.cancel.clone()
196 }
197}
198
199fn get_next_time(now: OffsetDateTime, time: Time) -> OffsetDateTime {
200 let mut next = now.replace_time(time);
201 if next < now {
202 next = next + time::Duration::days(1);
203 }
204 next
205}
206
207fn get_now() -> Option<OffsetDateTime> {
208 match OffsetDateTime::now_local() {
209 Ok(now) => Some(now),
210 Err(e) => {
211 error!("failed to get local time: {}", e);
212 None
213 }
214 }
215}
216
217impl Scheduler {
218 #[instrument(skip(task, cancel))]
220 async fn run_wait<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
221 if let Task::Wait(wait, skip) = task.get_schedule() {
222 let task_ref = task;
223 tokio::task::spawn(async move {
224 select! {
225 _ = cancel.cancelled() => {
226 return;
227 }
228 _ = sleep(Duration::from_secs(wait)) => {
229 tracing::debug!(wait, "wait seconds");
230 }
231 };
232 if let Some(now) = get_now() {
233 if let Some(skip) = skip {
234 if skip.iter().any(|s| s.is_skip(now)) {
235 task_ref.on_skip(cancel.clone()).await;
236 return;
237 }
238 }
239 task_ref.on_time(cancel.clone()).await;
240 }
241 });
242 }
243 }
244
245 #[instrument(skip(task, cancel))]
247 async fn run_interval<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
248 if let Task::Interval(interval, skip) = task.get_schedule() {
249 let task_ref = task;
250 tokio::task::spawn(async move {
251 loop {
252 select! {
253 _ = cancel.cancelled() => {
254 return;
255 }
256 _ = sleep(Duration::from_secs(interval)) => {
257 tracing::debug!(interval, "interval");
258 }
259 };
260 if let Some(now) = get_now() {
261 if let Some(ref skip) = skip {
262 if skip.iter().any(|s| s.is_skip(now)) {
263 task_ref.on_skip(cancel.clone()).await;
264 continue;
265 }
266 }
267 task_ref.on_time(cancel.clone()).await;
268 }
269 }
270 });
271 }
272 }
273
274 #[instrument(skip(task, cancel))]
276 async fn run_at<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
277 if let Task::At(time, skip) = task.get_schedule() {
278 let task_ref = task;
279 tokio::task::spawn(async move {
280 let now = if let Some(now) = get_now() {
281 now
282 } else {
283 return;
284 };
285 let mut next = get_next_time(now, time);
286 loop {
287 let now = if let Some(now) = get_now() {
288 now
289 } else {
290 return;
291 };
292 let seconds = (next - now).as_seconds_f64() as u64;
293 let instant = Instant::now() + Duration::from_secs(seconds);
294 select! {
295 _ = cancel.cancelled() => {
296 return;
297 }
298 _ = sleep_until(instant) => {
299 tracing::debug!("at time");
300 }
301 }
302
303 if let Some(skip) = skip.clone() {
304 if skip.iter().any(|s| s.is_skip(now)) {
305 task_ref.on_skip(cancel.clone()).await;
306 return;
307 }
308 }
309
310 task_ref.on_time(cancel.clone()).await;
311
312 next += time::Duration::days(1);
313 }
314 });
315 }
316 }
317
318 #[instrument(skip(task, cancel))]
320 async fn run_once<T: Notifiable + 'static>(task: T, cancel: CancellationToken) {
321 if let Task::Once(next) = task.get_schedule() {
322 let task_ref = task;
323 tokio::task::spawn(async move {
324 if let Some(now) = get_now() {
325 if next < now {
326 task_ref.on_skip(cancel.clone()).await;
327 return;
328 }
329 let seconds = (next - now).as_seconds_f64() as u64;
330 let instant = Instant::now() + Duration::from_secs(seconds);
331
332 select! {
333 _ = cancel.cancelled() => {
334 return;
335 }
336 _ = sleep_until(instant) => {
337 tracing::debug!("once time");
338 }
339 }
340 task_ref.on_time(cancel.clone()).await;
341 }
342 });
343 }
344 }
345}