1use crate::{cron::CronConfig, worker::Worker};
2use duration_str;
3use std::{future::Future, sync::Arc, time::Duration};
4use tokio::{
5 sync::{
6 Mutex,
7 mpsc::{self, Receiver, Sender, error::SendError},
8 },
9 time::{Instant, sleep, sleep_until},
10};
11use tracing::debug;
12use tracing::{Instrument, debug_span};
13
14#[derive(Clone)]
15pub struct Cron {
16 name: String,
17 run_after_start: Duration,
18 interval: Duration,
19 interval_after_finish: bool,
20 worker: Arc<Worker<()>>,
21
22 close: Arc<Sender<()>>,
23 done: Arc<Mutex<Receiver<()>>>,
24}
25
26impl Cron {
27 pub fn new(name: &str, cfg: &CronConfig) -> Self {
28 let worker = Worker::new(name, 1);
29 let run_after_start = duration_str::parse(&cfg.run_after_start).unwrap();
30 let interval = duration_str::parse(&cfg.interval).unwrap();
31 let (close, done) = mpsc::channel(1);
32 Self {
33 name: name.to_string(),
34 worker: Arc::new(worker),
35 run_after_start: run_after_start,
36 interval: interval,
37 interval_after_finish: cfg.interval_after_finish,
38 close: Arc::new(close),
39 done: Arc::new(Mutex::new(done)),
40 }
41 }
42
43 pub async fn stop(&self) -> Result<(), SendError<()>> {
44 self.close.send(()).await
45 }
46
47 pub async fn run<F, Fut>(&self, how: F)
48 where
49 F: Fn() -> Fut + Send + Sync + 'static,
50 Fut: Future<Output = ()> + Send + Sync + 'static,
51 {
52 let run_after_start = self.run_after_start;
53 let interval = self.interval;
54 let wait = self.interval_after_finish;
55 let done = self.done.clone();
56 let handler_worker = self.worker.clone();
57 let tick_worker = Arc::new(Worker::new("Ticker", 1));
58 let how = Arc::new(how);
60
61 let sender = self.worker.get_sender();
62 let tick_sender = sender.clone();
63 tick_worker
64 .run(move |t: Instant| {
65 let sender = tick_sender.clone();
66 async move {
67 sleep_until(t).await;
68 _ = sender.send(()).await;
69 }
70 })
71 .await;
72
73 let how_tick_worker = tick_worker.clone();
74 let how = move |_: ()| {
76 let how = how.clone();
77 let tick_worker = how_tick_worker.clone();
78 async move {
79 how()
80 .await;
82 let t = Instant::now().checked_add(interval);
83 if wait {
84 if let Some(t) = t {
85 _ = tick_worker.send(t).await;
86 } else {
87 sleep(interval).await;
88 _ = tick_worker.send(Instant::now()).await;
89 }
90 }
91 }
92 };
93
94 let name = self.name.clone();
95 let cron_ticker_worker = tick_worker.clone();
96 tokio::spawn(
97 async move {
98 let tick_worker = cron_ticker_worker.clone();
99 let sender = sender.clone();
100 debug!("CRON START - {}", name);
101 sleep(run_after_start).await;
102 _ = sender.send(()).await;
103
104 let mut t = Instant::now();
105 while !wait {
106 if let Some(tt) = t.checked_add(interval) {
107 t = tt;
108 _ = tick_worker.send(t).await;
109 } else {
110 sleep(interval).await;
111 _ = tick_worker.send(Instant::now()).await;
112 }
113 }
114 }
115 .instrument(debug_span!("start")),
116 );
117
118 _ = self.worker.run(how).instrument(debug_span!("worker")).await;
119
120 let name = self.name.clone();
121 tokio::spawn(
122 async move {
123 let mut guard = done.lock().await;
124 _ = guard.recv().await;
125 guard.close();
126 _ = handler_worker.stop().await;
130 _ = tick_worker.stop().await;
131 debug!("CRON STOP - {}", name)
132 }
133 .instrument(debug_span!("exit")),
134 );
135 }
136}