use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use chrono::Utc;
use tokio::task::{JoinHandle, JoinSet};
use tokio_util::sync::CancellationToken;
use crate::error::Result;
use crate::service::{Registry, RegistrySnapshot};
use super::context::CronContext;
use super::handler::CronHandler;
use super::meta::Meta;
use super::schedule::Schedule;
#[non_exhaustive]
pub struct CronOptions {
pub timeout_secs: u64,
}
impl Default for CronOptions {
fn default() -> Self {
Self { timeout_secs: 300 }
}
}
type ErasedCronHandler =
Arc<dyn Fn(CronContext) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
struct CronEntry {
name: String,
schedule: Schedule,
handler: ErasedCronHandler,
timeout_secs: u64,
}
#[must_use]
pub struct SchedulerBuilder {
registry: Arc<RegistrySnapshot>,
entries: Vec<CronEntry>,
}
impl SchedulerBuilder {
pub fn job<H, Args>(self, schedule: &str, handler: H) -> Result<Self>
where
H: CronHandler<Args> + Send + Sync,
{
self.job_with(schedule, handler, CronOptions::default())
}
pub fn job_with<H, Args>(
mut self,
schedule: &str,
handler: H,
options: CronOptions,
) -> Result<Self>
where
H: CronHandler<Args> + Send + Sync,
{
let name = std::any::type_name::<H>().to_string();
let parsed = Schedule::parse(schedule)?;
let erased: ErasedCronHandler = Arc::new(
move |ctx: CronContext| -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
let h = handler.clone();
Box::pin(async move { h.call(ctx).await })
},
);
self.entries.push(CronEntry {
name,
schedule: parsed,
handler: erased,
timeout_secs: options.timeout_secs,
});
Ok(self)
}
pub async fn start(self) -> Scheduler {
let cancel = CancellationToken::new();
let mut handles = Vec::new();
for entry in self.entries {
let handle = tokio::spawn(cron_job_loop(
entry.name,
entry.schedule,
entry.handler,
entry.timeout_secs,
self.registry.clone(),
cancel.clone(),
));
handles.push(handle);
}
Scheduler { cancel, handles }
}
}
pub struct Scheduler {
cancel: CancellationToken,
handles: Vec<JoinHandle<()>>,
}
impl Scheduler {
pub fn builder(registry: &Registry) -> SchedulerBuilder {
SchedulerBuilder {
registry: registry.snapshot(),
entries: Vec::new(),
}
}
}
impl crate::runtime::Task for Scheduler {
async fn shutdown(self) -> Result<()> {
self.cancel.cancel();
let drain = async {
for handle in self.handles {
let _ = handle.await;
}
};
let _ = tokio::time::timeout(Duration::from_secs(30), drain).await;
Ok(())
}
}
async fn cron_job_loop(
name: String,
schedule: Schedule,
handler: ErasedCronHandler,
timeout_secs: u64,
registry: Arc<RegistrySnapshot>,
cancel: CancellationToken,
) {
let running = Arc::new(AtomicBool::new(false));
let timeout_dur = Duration::from_secs(timeout_secs);
let mut handler_tasks = JoinSet::new();
let mut next_tick = match schedule.next_tick(Utc::now()) {
Some(t) => t,
None => {
tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
return;
}
};
loop {
let sleep_duration = (next_tick - Utc::now()).to_std().unwrap_or(Duration::ZERO);
tokio::select! {
_ = cancel.cancelled() => break,
_ = tokio::time::sleep(sleep_duration) => {
while handler_tasks.try_join_next().is_some() {}
if running.load(Ordering::SeqCst) {
tracing::warn!(cron_job = %name, "skipping tick, previous run still active");
next_tick = match schedule.next_tick(Utc::now()) {
Some(t) => t,
None => {
tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
break;
}
};
continue;
}
running.store(true, Ordering::SeqCst);
let deadline = tokio::time::Instant::now() + timeout_dur;
let ctx = CronContext {
registry: registry.clone(),
meta: Meta {
name: name.clone(),
deadline: Some(deadline),
tick: next_tick,
},
};
let running_flag = running.clone();
let handler_clone = handler.clone();
let job_name = name.clone();
handler_tasks.spawn(async move {
let result =
tokio::time::timeout(timeout_dur, (handler_clone)(ctx)).await;
match result {
Ok(Ok(())) => {
tracing::debug!(cron_job = %job_name, "completed");
}
Ok(Err(e)) => {
tracing::error!(cron_job = %job_name, error = %e, "failed");
}
Err(_) => {
tracing::error!(cron_job = %job_name, "timed out");
}
}
running_flag.store(false, Ordering::SeqCst);
});
next_tick = match schedule.next_tick(Utc::now()) {
Some(t) => t,
None => {
tracing::error!(cron_job = %name, "cron expression has no future occurrence; stopping");
break;
}
};
}
}
}
while handler_tasks.join_next().await.is_some() {}
}