1use crossbeam_deque::{Injector, Steal};
2use std::{
3 boxed::Box,
4 fmt::{self, Debug},
5 sync::Arc,
6};
7use time::{Date, OffsetDateTime, Time};
8use tokio::{
9 select,
10 time::{Duration, Instant, sleep, sleep_until},
11};
12pub use tokio_util::sync::CancellationToken;
13use tracing::{error, instrument};
14
15#[derive(Debug, Clone)]
16pub enum Skip {
17 Date(Date),
19 DateRange(Date, Date),
21 Day(usize),
25 DayRange(usize, usize),
29 Time(Time),
31 TimeRange(Time, Time),
35 None,
37}
38
39impl Default for Skip {
40 fn default() -> Self {
41 Self::None
42 }
43}
44
45impl fmt::Display for Skip {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 match self {
48 Skip::Date(date) => write!(f, "date: {}", date),
49 Skip::DateRange(start, end) => write!(f, "date range: {} - {}", start, end),
50 Skip::Day(day) => write!(f, "day: {}", day),
51 Skip::DayRange(start, end) => write!(f, "day range: {} - {}", start, end),
52 Skip::Time(time) => write!(f, "time: {}", time),
53 Skip::TimeRange(start, end) => write!(f, "time range: {} - {}", start, end),
54 Skip::None => write!(f, "none"),
55 }
56 }
57}
58
59impl Skip {
60 pub fn is_skip(&self, time: OffsetDateTime) -> bool {
62 match self {
63 Skip::Date(date) => time.date() == *date,
64 Skip::DateRange(start, end) => time.date() >= *start && time.date() <= *end,
65 Skip::Day(day) => time.day() + 1 == *day as u8,
66 Skip::DayRange(start, end) => {
67 time.day() + 1 >= *start as u8 && time.day() + 1 <= *end as u8
68 }
69 Skip::Time(time) => time.hour() == time.hour() && time.minute() == time.minute(),
70 Skip::TimeRange(start, end) => {
71 assert!(start < end, "start must be less than end");
72 time.hour() >= start.hour()
73 && time.hour() <= end.hour()
74 && time.minute() >= start.minute()
75 && time.minute() <= end.minute()
76 }
77 Skip::None => false,
78 }
79 }
80}
81
82#[derive(Debug, Clone)]
83pub enum Task {
84 Wait(u64, Option<Skip>),
86 Interval(u64, Option<Skip>),
88 At(Time, Option<Skip>),
90 Once(OffsetDateTime),
92}
93
94impl fmt::Display for Task {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 match self {
97 Task::Wait(wait, skip) => {
98 write!(f, "wait: {} {}", wait, skip.clone().unwrap_or_default())
99 }
100 Task::Interval(interval, skip) => {
101 write!(
102 f,
103 "interval: {} {}",
104 interval,
105 skip.clone().unwrap_or_default()
106 )
107 }
108 Task::At(time, skip) => {
109 write!(f, "at: {} {}", time, skip.clone().unwrap_or_default())
110 }
111 Task::Once(time) => write!(f, "once: {}", time),
112 }
113 }
114}
115
116pub trait ScheduledTask: Sync + Send {
118 fn get_schedule(&self) -> Task;
120
121 fn on_time(&self, cancel: CancellationToken);
123
124 fn on_skip(&self, cancel: CancellationToken);
126}
127
128pub struct Scheduler {
129 tasks: Injector<Arc<Box<dyn ScheduledTask>>>,
130 cancel: CancellationToken,
131}
132
133impl Scheduler {
134 pub fn new() -> Self {
135 Self {
136 tasks: Injector::new(),
137 cancel: CancellationToken::new(),
138 }
139 }
140
141 pub async fn start(&self) {
143 self.check().await;
144
145 let cancel = self.cancel.clone();
146 select! {
147 _ = cancel.cancelled() => {
148 tracing::debug!("scheduler cancelled");
149 }
150 _ = tokio::signal::ctrl_c() => {
151 tracing::debug!("ctrl+c");
152 if !cancel.is_cancelled() {
153 cancel.cancel();
154 }
155 }
156 }
157 }
158
159 pub async fn add_task(&self, task: Arc<Box<dyn ScheduledTask>>) {
160 self.tasks.push(task);
161 self.check().await;
162 }
163
164 async fn check(&self) {
166 loop {
167 let task = self.tasks.steal();
168 match task {
169 Steal::Success(task) => {
170 let schedule = task.get_schedule();
171 let cancel = self.cancel.clone();
172 let task = task.clone();
173 match schedule {
174 Task::Wait(..) => {
175 Scheduler::run_wait(task, cancel.clone()).await;
176 }
177 Task::Interval(..) => {
178 Scheduler::run_interval(task, cancel.clone()).await;
179 }
180 Task::At(..) => {
181 Scheduler::run_at(task, cancel.clone()).await;
182 }
183 Task::Once(..) => {
184 Scheduler::run_once(task, cancel.clone()).await;
185 }
186 }
187 }
188 Steal::Retry => {
189 break;
190 }
191 Steal::Empty => {
192 break;
193 }
194 }
195 }
196 }
197
198 pub fn stop(&self) {
202 self.cancel.cancel();
203 }
204}
205
206fn get_next_time(now: OffsetDateTime, time: Time) -> OffsetDateTime {
207 let mut next = now.replace_time(time);
208 if next < now {
209 next = next + time::Duration::days(1);
210 }
211 next
212}
213
214fn get_now() -> Option<OffsetDateTime> {
215 match OffsetDateTime::now_local() {
216 Ok(now) => Some(now),
217 Err(e) => {
218 error!("failed to get local time: {}", e);
219 None
220 }
221 }
222}
223
224impl Scheduler {
225 #[instrument(skip(task, cancel))]
227 async fn run_wait(task: Arc<Box<dyn ScheduledTask>>, cancel: CancellationToken) {
228 if let Task::Wait(wait, skip) = task.get_schedule() {
229 tokio::spawn(async move {
230 select! {
231 _ = cancel.cancelled() => {
232 return;
233 }
234 _ = sleep(Duration::from_secs(wait)) => {
235 tracing::debug!(wait, "wait seconds");
236 }
237 };
238 if let Some(now) = get_now() {
239 if let Some(skip) = skip {
240 if skip.is_skip(now) {
241 task.on_skip(cancel.clone());
242 return;
243 }
244 }
245 task.on_time(cancel.clone());
246 }
247 });
248 }
249 }
250
251 #[instrument(skip(task, cancel))]
253 async fn run_interval(task: Arc<Box<dyn ScheduledTask>>, cancel: CancellationToken) {
254 if let Task::Interval(interval, skip) = task.get_schedule() {
255 tokio::spawn(async move {
256 loop {
257 select! {
258 _ = cancel.cancelled() => {
259 return;
260 }
261 _ = sleep(Duration::from_secs(interval)) => {
262 tracing::debug!(interval, "interval");
263 }
264 };
265 if let Some(now) = get_now() {
266 if let Some(ref skip) = skip {
267 if skip.is_skip(now) {
268 task.on_skip(cancel.clone());
269 continue;
270 }
271 }
272 task.on_time(cancel.clone());
273 }
274 }
275 });
276 }
277 }
278
279 #[instrument(skip(task, cancel))]
281 async fn run_at(task: Arc<Box<dyn ScheduledTask>>, cancel: CancellationToken) {
282 if let Task::At(time, skip) = task.get_schedule() {
283 tokio::spawn(async move {
284 let now = if let Some(now) = get_now() {
285 now
286 } else {
287 return;
288 };
289 let mut next = get_next_time(now, time);
290 loop {
291 let now = if let Some(now) = get_now() {
292 now
293 } else {
294 return;
295 };
296 let seconds = (next - now).as_seconds_f64() as u64;
297 let instant = Instant::now() + Duration::from_secs(seconds);
298 select! {
299 _ = cancel.cancelled() => {
300 return;
301 }
302 _ = sleep_until(instant) => {
303 tracing::debug!("at time");
304 }
305 }
306
307 if let Some(skip) = skip.clone() {
308 if skip.is_skip(now) {
309 task.on_skip(cancel.clone());
310 return;
311 }
312 }
313
314 task.on_time(cancel.clone());
315
316 next += time::Duration::days(1);
317 }
318 });
319 }
320 }
321
322 #[instrument(skip(task, cancel))]
324 async fn run_once(task: Arc<Box<dyn ScheduledTask>>, cancel: CancellationToken) {
325 if let Task::Once(next) = task.get_schedule() {
326 tokio::spawn(async move {
327 if let Some(now) = get_now() {
328 if next < now {
329 task.on_skip(cancel.clone());
330 return;
331 }
332 let seconds = (next - now).as_seconds_f64() as u64;
333 let instant = Instant::now() + Duration::from_secs(seconds);
334
335 select! {
336 _ = cancel.cancelled() => {
337 return;
338 }
339 _ = sleep_until(instant) => {
340 tracing::debug!("once time");
341 }
342 }
343 task.on_time(cancel.clone());
344 }
345 });
346 }
347 }
348}