rs_pkg/cron/
cron.rs

1use crate::{cron::CronConfig, worker::Worker};
2use duration_str;
3use std::{future::Future, sync::Arc, time::Duration};
4use tokio::{
5    sync::{
6        Mutex,
7        mpsc::{self, Receiver, Sender, error::SendError},
8    },
9    time::{Instant, sleep, sleep_until},
10};
11use tracing::debug;
12use tracing::{Instrument, debug_span};
13
14#[derive(Clone)]
15pub struct Cron {
16    name: String,
17    run_after_start: Duration,
18    interval: Duration,
19    interval_after_finish: bool,
20    worker: Arc<Worker<()>>,
21
22    close: Arc<Sender<()>>,
23    done: Arc<Mutex<Receiver<()>>>,
24}
25
26impl Cron {
27    pub fn new(name: &str, cfg: &CronConfig) -> Self {
28        let worker = Worker::new(name, 1);
29        let run_after_start = duration_str::parse(&cfg.run_after_start).unwrap();
30        let interval = duration_str::parse(&cfg.interval).unwrap();
31        let (close, done) = mpsc::channel(1);
32        Self {
33            name: name.to_string(),
34            worker: Arc::new(worker),
35            run_after_start: run_after_start,
36            interval: interval,
37            interval_after_finish: cfg.interval_after_finish,
38            close: Arc::new(close),
39            done: Arc::new(Mutex::new(done)),
40        }
41    }
42
43    pub async fn stop(&self) -> Result<(), SendError<()>> {
44        self.close.send(()).await
45    }
46
47    pub async fn run<F, Fut>(&self, how: F)
48    where
49        F: Fn() -> Fut + Send + Sync + 'static,
50        Fut: Future<Output = ()> + Send + Sync + 'static,
51    {
52        let run_after_start = self.run_after_start;
53        let interval = self.interval;
54        let wait = self.interval_after_finish;
55        let done = self.done.clone();
56        let handler_worker = self.worker.clone();
57        let tick_worker = Arc::new(Worker::new("Ticker", 1));
58        // let (tx, mut rx) = broadcast::channel(1);
59        let how = Arc::new(how);
60
61        let sender = self.worker.get_sender();
62        let tick_sender = sender.clone();
63        tick_worker
64            .run(move |t: Instant| {
65                let sender = tick_sender.clone();
66                async move {
67                    sleep_until(t).await;
68                    _ = sender.send(()).await;
69                }
70            })
71            .await;
72
73        let how_tick_worker = tick_worker.clone();
74        // 启动 worker
75        let how = move |_: ()| {
76            let how = how.clone();
77            let tick_worker = how_tick_worker.clone();
78            async move {
79                how()
80                    // .instrument(debug_span!("run"))
81                    .await;
82                let t = Instant::now().checked_add(interval);
83                if wait {
84                    if let Some(t) = t {
85                        _ = tick_worker.send(t).await;
86                    } else {
87                        sleep(interval).await;
88                        _ = tick_worker.send(Instant::now()).await;
89                    }
90                }
91            }
92        };
93
94        let name = self.name.clone();
95        let cron_ticker_worker = tick_worker.clone();
96        tokio::spawn(
97            async move {
98                let tick_worker = cron_ticker_worker.clone();
99                let sender = sender.clone();
100                debug!("CRON START - {}", name);
101                sleep(run_after_start).await;
102                _ = sender.send(()).await;
103
104                let mut t = Instant::now();
105                while !wait {
106                    if let Some(tt) = t.checked_add(interval) {
107                        t = tt;
108                        _ = tick_worker.send(t).await;
109                    } else {
110                        sleep(interval).await;
111                        _ = tick_worker.send(Instant::now()).await;
112                    }
113                }
114            }
115            .instrument(debug_span!("start")),
116        );
117
118        _ = self.worker.run(how).instrument(debug_span!("worker")).await;
119
120        let name = self.name.clone();
121        tokio::spawn(
122            async move {
123                let mut guard = done.lock().await;
124                _ = guard.recv().await;
125                guard.close();
126                // _ = tx.send(());
127
128                // 关闭 worker
129                _ = handler_worker.stop().await;
130                _ = tick_worker.stop().await;
131                debug!("CRON STOP - {}", name)
132            }
133            .instrument(debug_span!("exit")),
134        );
135    }
136}