1use crossbeam_deque::Worker;
2use std::{
3 boxed::Box,
4 fmt::{self, Debug},
5 sync::Arc,
6};
7use time::{Date, OffsetDateTime, Time};
8use tokio::{
9 select,
10 time::{Duration, Instant, sleep, sleep_until},
11};
12use tokio_util::sync::CancellationToken;
13use tracing::{error, instrument};
14
15#[derive(Debug, Clone)]
16pub enum Skip {
17 Date(Date),
19 DateRange(Date, Date),
21 Day(usize),
25 DayRange(usize, usize),
29 Time(Time),
31 TimeRange(Time, Time),
35 None,
37}
38
39impl Default for Skip {
40 fn default() -> Self {
41 Self::None
42 }
43}
44
45impl fmt::Display for Skip {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 match self {
48 Skip::Date(date) => write!(f, "date: {}", date),
49 Skip::DateRange(start, end) => write!(f, "date range: {} - {}", start, end),
50 Skip::Day(day) => write!(f, "day: {}", day),
51 Skip::DayRange(start, end) => write!(f, "day range: {} - {}", start, end),
52 Skip::Time(time) => write!(f, "time: {}", time),
53 Skip::TimeRange(start, end) => write!(f, "time range: {} - {}", start, end),
54 Skip::None => write!(f, "none"),
55 }
56 }
57}
58
59impl Skip {
60 pub fn is_skip(&self, time: OffsetDateTime) -> bool {
62 match self {
63 Skip::Date(date) => time.date() == *date,
64 Skip::DateRange(start, end) => time.date() >= *start && time.date() <= *end,
65 Skip::Day(day) => time.day() + 1 == *day as u8,
66 Skip::DayRange(start, end) => {
67 time.day() + 1 >= *start as u8 && time.day() + 1 <= *end as u8
68 }
69 Skip::Time(time) => time.hour() == time.hour() && time.minute() == time.minute(),
70 Skip::TimeRange(start, end) => {
71 assert!(start < end, "start must be less than end");
72 time.hour() >= start.hour()
73 && time.hour() <= end.hour()
74 && time.minute() >= start.minute()
75 && time.minute() <= end.minute()
76 }
77 Skip::None => false,
78 }
79 }
80}
81
82#[derive(Debug, Clone)]
83pub enum Task {
84 Wait(u64, Option<Skip>),
86 Interval(u64, Option<Skip>),
88 At(Time, Option<Skip>),
90 Once(OffsetDateTime),
92}
93
94impl fmt::Display for Task {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 match self {
97 Task::Wait(wait, skip) => {
98 write!(f, "wait: {} {}", wait, skip.clone().unwrap_or_default())
99 }
100 Task::Interval(interval, skip) => {
101 write!(
102 f,
103 "interval: {} {}",
104 interval,
105 skip.clone().unwrap_or_default()
106 )
107 }
108 Task::At(time, skip) => {
109 write!(f, "at: {} {}", time, skip.clone().unwrap_or_default())
110 }
111 Task::Once(time) => write!(f, "once: {}", time),
112 }
113 }
114}
115
116pub trait ScheduledTask: Debug + Sync + Send {
118 fn get_schedule(&self) -> Task;
120
121 fn on_time(&self);
123
124 fn on_skip(&self);
126}
127
128pub struct Scheduler {
129 tasks: Worker<Arc<Box<dyn ScheduledTask>>>,
130 cancel: CancellationToken,
131}
132
133impl Scheduler {
134 pub fn new() -> Self {
135 Self {
136 tasks: Worker::new_fifo(),
137 cancel: CancellationToken::new(),
138 }
139 }
140
141 pub async fn start(&self) {
143 self.check().await;
144
145 let cancel = self.cancel.clone();
146 select! {
147 _ = cancel.cancelled() => {
148 tracing::debug!("scheduler cancelled");
149 }
150 _ = tokio::signal::ctrl_c() => {
151 tracing::debug!("ctrl+c");
152 if !cancel.is_cancelled() {
153 cancel.cancel();
154 }
155 }
156 }
157 }
158
159 pub async fn add_task(&self, task: Arc<Box<dyn ScheduledTask>>) {
160 self.tasks.push(task);
161 self.check().await;
162 }
163
164 async fn check(&self) {
166 while let Some(task) = self.tasks.pop() {
167 let schedule = task.get_schedule();
168 let cancel = self.cancel.clone();
169 let task = task.clone();
170 match schedule {
171 Task::Wait(..) => {
172 Scheduler::run_wait(task, cancel.clone()).await;
173 }
174 Task::Interval(..) => {
175 Scheduler::run_interval(task, cancel.clone()).await;
176 }
177 Task::At(..) => {
178 Scheduler::run_at(task, cancel.clone()).await;
179 }
180 Task::Once(..) => {
181 Scheduler::run_once(task, cancel.clone()).await;
182 }
183 }
184 }
185 }
186
187 pub fn stop(&self) {
191 self.cancel.cancel();
192 }
193}
194
195fn get_next_time(now: OffsetDateTime, time: Time) -> OffsetDateTime {
196 let mut next = now.replace_time(time);
197 if next < now {
198 next = next + time::Duration::days(1);
199 }
200 next
201}
202
203fn get_now() -> Option<OffsetDateTime> {
204 match OffsetDateTime::now_local() {
205 Ok(now) => Some(now),
206 Err(e) => {
207 error!("failed to get local time: {}", e);
208 None
209 }
210 }
211}
212
213impl Scheduler {
214 #[instrument(skip(task, cancel))]
216 async fn run_wait(task: Arc<Box<dyn ScheduledTask>>, cancel: CancellationToken) {
217 if let Task::Wait(wait, skip) = task.get_schedule() {
218 tokio::spawn(async move {
219 select! {
220 _ = cancel.cancelled() => {
221 return;
222 }
223 _ = sleep(Duration::from_secs(wait)) => {
224 tracing::debug!(wait, "wait seconds");
225 }
226 };
227 if let Some(now) = get_now() {
228 if let Some(skip) = skip {
229 if skip.is_skip(now) {
230 task.on_skip();
231 return;
232 }
233
234 task.on_time();
235 }
236 }
237 });
238 }
239 }
240
241 #[instrument(skip(task, cancel))]
243 async fn run_interval(task: Arc<Box<dyn ScheduledTask>>, cancel: CancellationToken) {
244 if let Task::Interval(interval, skip) = task.get_schedule() {
245 tokio::spawn(async move {
246 loop {
247 select! {
248 _ = cancel.cancelled() => {
249 return;
250 }
251 _ = sleep(Duration::from_secs(interval)) => {
252 tracing::debug!(interval, "interval");
253 }
254 };
255 if let Some(now) = get_now() {
256 if let Some(ref skip) = skip {
257 if skip.is_skip(now) {
258 task.on_skip();
259 continue;
260 }
261 task.on_time();
262 }
263 }
264 }
265 });
266 }
267 }
268
269 #[instrument(skip(task, cancel))]
271 async fn run_at(task: Arc<Box<dyn ScheduledTask>>, cancel: CancellationToken) {
272 if let Task::At(time, skip) = task.get_schedule() {
273 tokio::spawn(async move {
274 let now = if let Some(now) = get_now() {
275 now
276 } else {
277 return;
278 };
279 let mut next = get_next_time(now, time);
280 loop {
281 let now = if let Some(now) = get_now() {
282 now
283 } else {
284 return;
285 };
286 let seconds = (next - now).as_seconds_f64() as u64;
287 let instant = Instant::now() + Duration::from_secs(seconds);
288 select! {
289 _ = cancel.cancelled() => {
290 return;
291 }
292 _ = sleep_until(instant) => {
293 tracing::debug!("at time");
294 }
295 }
296
297 if let Some(skip) = skip.clone() {
298 if skip.is_skip(now) {
299 task.on_skip();
300 return;
301 }
302 }
303
304 task.on_time();
305
306 next += time::Duration::days(1);
307 }
308 });
309 }
310 }
311
312 #[instrument(skip(task, cancel))]
314 async fn run_once(task: Arc<Box<dyn ScheduledTask>>, cancel: CancellationToken) {
315 if let Task::Once(next) = task.get_schedule() {
316 tokio::spawn(async move {
317 if let Some(now) = get_now() {
318 if next < now {
319 task.on_skip();
320 return;
321 }
322 let seconds = (next - now).as_seconds_f64() as u64;
323 let instant = Instant::now() + Duration::from_secs(seconds);
324
325 select! {
326 _ = cancel.cancelled() => {
327 return;
328 }
329 _ = sleep_until(instant) => {
330 tracing::debug!("once time");
331 }
332 }
333 task.on_time();
334 }
335 });
336 }
337 }
338}