rs_pkg/cron/
cron.rs

1use crate::{cron::Config, worker::Worker};
2use duration_str;
3use std::{future::Future, sync::Arc, time::Duration};
4use tokio::{
5    sync::{
6        Mutex,
7        broadcast::{self, error::TryRecvError},
8        mpsc::{self, Receiver, Sender, error::SendError},
9    },
10    time::sleep,
11};
12use tracing::debug;
13use tracing::{Instrument, debug_span};
14
15pub struct Cron {
16    name: String,
17    run_after_start: Duration,
18    interval: Duration,
19    interval_after_finish: bool,
20    worker: Arc<Worker<()>>,
21
22    close: Arc<Sender<()>>,
23    done: Arc<Mutex<Receiver<()>>>,
24}
25
26impl Cron {
27    pub fn new(name: &str, cfg: &Config) -> Self {
28        let worker = Worker::new(name, 1);
29        let run_after_start = duration_str::parse(&cfg.run_after_start).unwrap();
30        let interval = duration_str::parse(&cfg.interval).unwrap();
31        let (close, done) = mpsc::channel(1);
32        Self {
33            name: name.to_string(),
34            worker: Arc::new(worker),
35            run_after_start: run_after_start,
36            interval: interval,
37            interval_after_finish: cfg.interval_after_finish,
38            close: Arc::new(close),
39            done: Arc::new(Mutex::new(done)),
40        }
41    }
42
43    pub async fn stop(&self) -> Result<(), SendError<()>> {
44        self.close.send(()).await
45    }
46
47    pub async fn run<F, Fut>(&self, how: F)
48    where
49        F: Fn() -> Fut + Send + Sync + 'static,
50        Fut: Future<Output = ()> + Send + Sync + 'static,
51    {
52        let run_after_start = self.run_after_start;
53        let interval = self.interval;
54        let ticker = crossbeam_channel::tick(self.interval);
55        let wait = self.interval_after_finish;
56        let done = self.done.clone();
57        let worker = self.worker.clone();
58        let how = Arc::new(how);
59        let name = self.name.clone();
60        let (tx, mut rx) = broadcast::channel(1);
61
62        tokio::spawn(
63            async move {
64                let mut guard = done.lock().await;
65                _ = guard.recv().await;
66                guard.close();
67                _ = tx.send(());
68
69                // 关闭 worker
70                _ = worker.stop().await;
71                debug!("CRON STOP - {}", name)
72            }
73            .instrument(debug_span!("exit")),
74        );
75
76        // 启动 worker
77        let how = move |_: ()| {
78            let how = how.clone();
79            async move { how().instrument(debug_span!("run")).await }
80        };
81        _ = self.worker.run(how).instrument(debug_span!("worker")).await;
82
83        let sender = self.worker.get_sender();
84        let name = self.name.clone();
85        tokio::spawn(
86            async move {
87                debug!("CRON START - {}", name);
88                sleep(run_after_start).await;
89                _ = sender.send(()).await;
90                match wait {
91                    false => loop {
92                        match rx.try_recv() {
93                            Ok(_) | Err(TryRecvError::Closed) => {
94                                break;
95                            }
96
97                            _ => {
98                                if let Ok(_) = ticker.recv() {
99                                    _ = sender.send(()).await;
100                                };
101                            }
102                        }
103                    },
104
105                    true => loop {
106                        match rx.try_recv() {
107                            Ok(_) | Err(TryRecvError::Closed) => {
108                                break;
109                            }
110
111                            _ => {
112                                sleep(interval).await;
113                                _ = sender.send(()).await;
114                            }
115                        }
116                    },
117                }
118            }
119            .instrument(debug_span!("run")),
120        );
121    }
122}