1use crate::{cron::Config, worker::Worker};
2use duration_str;
3use std::{future::Future, sync::Arc, time::Duration};
4use tokio::{
5 sync::{
6 Mutex,
7 broadcast::{self, error::TryRecvError},
8 mpsc::{self, Receiver, Sender, error::SendError},
9 },
10 time::sleep,
11};
12use tracing::debug;
13use tracing::{Instrument, debug_span};
14
/// A periodic job runner: after an initial delay it repeatedly pushes a unit
/// message to a [`Worker`], until [`Cron::stop`] is called.
pub struct Cron {
    /// Name passed to the worker and printed in the start/stop debug logs.
    name: String,
    /// Delay between the scheduler task starting and the first trigger.
    run_after_start: Duration,
    /// Period between subsequent triggers.
    interval: Duration,
    /// Scheduling mode: when `true` the scheduler sleeps a full `interval`
    /// after each trigger has been handed to the worker; when `false`
    /// triggers follow a ticker with period `interval`.
    interval_after_finish: bool,
    /// Worker that receives the `()` triggers and executes the job.
    worker: Arc<Worker<()>>,

    /// Sending half of the shutdown channel; used by `stop`.
    close: Arc<Sender<()>>,
    /// Receiving half of the shutdown channel; awaited by the exit task
    /// spawned in `run`.
    done: Arc<Mutex<Receiver<()>>>,
}
25
26impl Cron {
27 pub fn new(name: &str, cfg: &Config) -> Self {
28 let worker = Worker::new(name, 1);
29 let run_after_start = duration_str::parse(&cfg.run_after_start).unwrap();
30 let interval = duration_str::parse(&cfg.interval).unwrap();
31 let (close, done) = mpsc::channel(1);
32 Self {
33 name: name.to_string(),
34 worker: Arc::new(worker),
35 run_after_start: run_after_start,
36 interval: interval,
37 interval_after_finish: cfg.interval_after_finish,
38 close: Arc::new(close),
39 done: Arc::new(Mutex::new(done)),
40 }
41 }
42
43 pub async fn stop(&self) -> Result<(), SendError<()>> {
44 self.close.send(()).await
45 }
46
47 pub async fn run<F, Fut>(&self, how: F)
48 where
49 F: Fn() -> Fut + Send + Sync + 'static,
50 Fut: Future<Output = ()> + Send + Sync + 'static,
51 {
52 let run_after_start = self.run_after_start;
53 let interval = self.interval;
54 let ticker = crossbeam_channel::tick(self.interval);
55 let wait = self.interval_after_finish;
56 let done = self.done.clone();
57 let worker = self.worker.clone();
58 let how = Arc::new(how);
59 let name = self.name.clone();
60 let (tx, mut rx) = broadcast::channel(1);
61
62 tokio::spawn(
63 async move {
64 let mut guard = done.lock().await;
65 _ = guard.recv().await;
66 guard.close();
67 _ = tx.send(());
68
69 _ = worker.stop().await;
71 debug!("CRON STOP - {}", name)
72 }
73 .instrument(debug_span!("exit")),
74 );
75
76 let how = move |_: ()| {
78 let how = how.clone();
79 async move { how().instrument(debug_span!("run")).await }
80 };
81 _ = self.worker.run(how).instrument(debug_span!("worker")).await;
82
83 let sender = self.worker.get_sender();
84 let name = self.name.clone();
85 tokio::spawn(
86 async move {
87 debug!("CRON START - {}", name);
88 sleep(run_after_start).await;
89 _ = sender.send(()).await;
90 match wait {
91 false => loop {
92 match rx.try_recv() {
93 Ok(_) | Err(TryRecvError::Closed) => {
94 break;
95 }
96
97 _ => {
98 if let Ok(_) = ticker.recv() {
99 _ = sender.send(()).await;
100 };
101 }
102 }
103 },
104
105 true => loop {
106 match rx.try_recv() {
107 Ok(_) | Err(TryRecvError::Closed) => {
108 break;
109 }
110
111 _ => {
112 sleep(interval).await;
113 _ = sender.send(()).await;
114 }
115 }
116 },
117 }
118 }
119 .instrument(debug_span!("run")),
120 );
121 }
122}