use crate::{cron::CronConfig, worker::Worker};
use duration_str;
use std::{future::Future, sync::Arc, time::Duration};
use tokio::{
sync::{
Mutex,
mpsc::{self, Receiver, Sender, error::SendError},
},
time::{Instant, sleep, sleep_until},
};
use tracing::debug;
use tracing::{Instrument, debug_span};
#[derive(Clone)]
pub struct Cron {
name: String,
run_after_start: Duration,
interval: Duration,
interval_after_finish: bool,
worker: Arc<Worker<()>>,
close: Arc<Sender<()>>,
done: Arc<Mutex<Receiver<()>>>,
}
impl Cron {
pub fn new(name: &str, cfg: &CronConfig) -> Self {
let worker = Worker::new(name, 1);
let run_after_start = duration_str::parse(&cfg.run_after_start).unwrap();
let interval = duration_str::parse(&cfg.interval).unwrap();
let (close, done) = mpsc::channel(1);
Self {
name: name.to_string(),
worker: Arc::new(worker),
run_after_start: run_after_start,
interval: interval,
interval_after_finish: cfg.interval_after_finish,
close: Arc::new(close),
done: Arc::new(Mutex::new(done)),
}
}
pub async fn stop(&self) -> Result<(), SendError<()>> {
self.close.send(()).await
}
pub async fn run<F, Fut>(&self, how: F)
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + Sync + 'static,
{
let run_after_start = self.run_after_start;
let interval = self.interval;
let wait = self.interval_after_finish;
let done = self.done.clone();
let handler_worker = self.worker.clone();
let tick_worker = Arc::new(Worker::new("Ticker", 1));
let how = Arc::new(how);
let sender = self.worker.get_sender();
let tick_sender = sender.clone();
tick_worker
.run(move |t: Instant| {
let sender = tick_sender.clone();
async move {
sleep_until(t).await;
_ = sender.send(()).await;
}
})
.await;
let how_tick_worker = tick_worker.clone();
let how = move |_: ()| {
let how = how.clone();
let tick_worker = how_tick_worker.clone();
async move {
how()
.await;
let t = Instant::now().checked_add(interval);
if wait {
if let Some(t) = t {
_ = tick_worker.send(t).await;
} else {
sleep(interval).await;
_ = tick_worker.send(Instant::now()).await;
}
}
}
};
let name = self.name.clone();
let cron_ticker_worker = tick_worker.clone();
tokio::spawn(
async move {
let tick_worker = cron_ticker_worker.clone();
let sender = sender.clone();
debug!("CRON START - {}", name);
sleep(run_after_start).await;
_ = sender.send(()).await;
let mut t = Instant::now();
while !wait {
if let Some(tt) = t.checked_add(interval) {
t = tt;
_ = tick_worker.send(t).await;
} else {
sleep(interval).await;
_ = tick_worker.send(Instant::now()).await;
}
}
}
.instrument(debug_span!("start")),
);
_ = self.worker.run(how).instrument(debug_span!("worker")).await;
let name = self.name.clone();
tokio::spawn(
async move {
let mut guard = done.lock().await;
_ = guard.recv().await;
guard.close();
_ = handler_worker.stop().await;
_ = tick_worker.stop().await;
debug!("CRON STOP - {}", name)
}
.instrument(debug_span!("exit")),
);
}
}