use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use chrono::{DateTime, Datelike, Duration, Timelike, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::{error, info};
use uuid::Uuid;
use crate::bus::{InboundMessage, MessageBus};
use crate::error::{Result, ZeptoError};
/// When a job fires. Serialized with an adjacent `kind` tag, e.g.
/// `{"kind":"every","every_ms":60000}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum CronSchedule {
    /// One-shot: fire once at this Unix-epoch millisecond timestamp.
    At { at_ms: i64 },
    /// Fixed interval: fire every `every_ms` milliseconds.
    Every { every_ms: i64 },
    /// Five-field cron expression (minute hour dom month dow), evaluated in UTC.
    Cron { expr: String },
}
/// What a firing job dispatches: an inbound message injected onto the bus
/// as if it had arrived from the given channel/chat.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronPayload {
    /// Message text to deliver.
    pub message: String,
    /// Channel name the message claims to originate from (e.g. "cli").
    pub channel: String,
    /// Chat/conversation id within that channel.
    pub chat_id: String,
}
/// Mutable runtime bookkeeping for a job, persisted alongside it.
/// All timestamps are Unix-epoch milliseconds.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CronJobState {
    /// When the job should next fire; `None` means not scheduled.
    pub next_run_at_ms: Option<i64>,
    /// Start time of the most recent dispatch attempt.
    pub last_run_at_ms: Option<i64>,
    /// "ok" or "error" for the most recent attempt.
    pub last_status: Option<String>,
    /// Error text of the most recent failed attempt, if any.
    pub last_error: Option<String>,
    /// Failures since the last success; drives the error backoff.
    #[serde(default)]
    pub consecutive_errors: u32,
    /// Wall-clock duration of the most recent dispatch attempt.
    pub last_duration_ms: Option<i64>,
}
/// A scheduled job: schedule, dispatch payload, and run-state bookkeeping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronJob {
    /// Short identifier (first 8 chars of a v4 UUID; see `add_job`).
    pub id: String,
    /// Human-readable job name.
    pub name: String,
    /// Disabled jobs are never dispatched by the tick loop.
    pub enabled: bool,
    /// When the job fires.
    pub schedule: CronSchedule,
    /// What the job dispatches when it fires.
    pub payload: CronPayload,
    /// Runtime bookkeeping (next run, last status, backoff counter).
    pub state: CronJobState,
    /// Creation time, epoch ms.
    pub created_at_ms: i64,
    /// Last modification time, epoch ms.
    pub updated_at_ms: i64,
    /// Remove the job from the store after a successful one-shot run.
    pub delete_after_run: bool,
}
/// On-disk JSON document holding every job; `version` exists to allow
/// future schema migrations.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CronStore {
    version: u32,
    jobs: Vec<CronJob>,
}
impl Default for CronStore {
fn default() -> Self {
Self {
version: 1,
jobs: Vec::new(),
}
}
}
/// Current wall-clock time as milliseconds since the Unix epoch (UTC).
fn now_ms() -> i64 {
    let now = Utc::now();
    now.timestamp_millis()
}
// Upper bound on a single bus publish during a tick. Kept tiny under test
// so the timeout path is fast to exercise.
#[cfg(test)]
const DEFAULT_DISPATCH_TIMEOUT_MS: u64 = 50;
#[cfg(not(test))]
const DEFAULT_DISPATCH_TIMEOUT_MS: u64 = 5_000;
// Escalating retry delays after consecutive dispatch failures:
// 30s, 1m, 5m, 15m, capped at 1h (indexed by error_backoff_ms).
const ERROR_BACKOFF_SCHEDULE_MS: [i64; 5] = [
    30_000, 60_000, 5 * 60_000, 15 * 60_000, 60 * 60_000,
];
/// Backoff delay in ms for the given consecutive-error count.
/// Zero errors means no delay; otherwise index the schedule, saturating
/// at its last (largest) entry.
fn error_backoff_ms(consecutive_errors: u32) -> i64 {
    match consecutive_errors {
        0 => 0,
        n => {
            let slot = ((n - 1) as usize).min(ERROR_BACKOFF_SCHEDULE_MS.len() - 1);
            ERROR_BACKOFF_SCHEDULE_MS[slot]
        }
    }
}
/// Parses one cron field into the set of matching values within `[min, max]`.
///
/// Supported syntax (parts are comma-separable, e.g. `"1,10-20/5"`):
/// - `*`      every value in the range
/// - `*/s`    every `s`-th value starting at `min`
/// - `a`      a single value
/// - `a-b`    an inclusive range
/// - `a-b/s`  every `s`-th value of an inclusive range
///
/// Returns `None` for malformed fields, out-of-range values, inverted
/// ranges, or a zero step.
fn parse_cron_field(field: &str, min: u32, max: u32) -> Option<Vec<u32>> {
    let mut values = Vec::new();
    for part in field.split(',') {
        // Split off an optional "/step" suffix; the default step is 1.
        let (range_part, step) = match part.split_once('/') {
            Some((range, step_str)) => {
                let step = step_str.parse::<u32>().ok()?;
                if step == 0 {
                    return None;
                }
                (range, step)
            }
            None => (part, 1),
        };
        // Resolve the base span: "*", "a-b", or a single value.
        let (lo, hi) = if range_part == "*" {
            (min, max)
        } else if let Some((a, b)) = range_part.split_once('-') {
            let lo = a.parse::<u32>().ok()?;
            let hi = b.parse::<u32>().ok()?;
            if lo > hi || !(min..=max).contains(&lo) || !(min..=max).contains(&hi) {
                return None;
            }
            (lo, hi)
        } else {
            let v = range_part.parse::<u32>().ok()?;
            if !(min..=max).contains(&v) {
                return None;
            }
            (v, v)
        };
        values.extend((lo..=hi).step_by(step as usize));
    }
    if values.is_empty() {
        None
    } else {
        Some(values)
    }
}
/// Computes the next fire time (epoch ms, UTC) for a 5-field cron
/// expression — minute hour day-of-month month day-of-week — strictly
/// after `now`, by scanning minute-by-minute up to ~366 days ahead.
/// Returns `None` for malformed expressions or when no minute matches
/// within that horizon.
///
/// NOTE(review): day-of-month and day-of-week are combined with AND here;
/// classic Vixie cron ORs them when both fields are restricted — confirm
/// this deviation is intended before relying on dom+dow expressions.
fn next_run_from_cron_expr(expr: &str, now: i64) -> Option<i64> {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    if fields.len() != 5 {
        return None;
    }
    let minutes = parse_cron_field(fields[0], 0, 59)?;
    let hours = parse_cron_field(fields[1], 0, 23)?;
    let dom = parse_cron_field(fields[2], 1, 31)?;
    let month = parse_cron_field(fields[3], 1, 12)?;
    // 0 = Sunday .. 6 = Saturday; the common alias 7 = Sunday is rejected.
    let dow = parse_cron_field(fields[4], 0, 6)?;
    // Start scanning at the top of the minute after `now`.
    let mut candidate = DateTime::from_timestamp_millis(now)?
        .with_second(0)?
        .with_nanosecond(0)?
        + Duration::minutes(1);
    // 366-day horizon covers leap years, so yearly dates are still found.
    let limit = candidate + Duration::days(366);
    while candidate <= limit {
        let m = candidate.minute();
        let h = candidate.hour();
        let d = candidate.day();
        let mon = candidate.month();
        let wd = candidate.weekday().num_days_from_sunday();
        if minutes.contains(&m)
            && hours.contains(&h)
            && dom.contains(&d)
            && month.contains(&mon)
            && dow.contains(&wd)
        {
            return Some(candidate.timestamp_millis());
        }
        candidate += Duration::minutes(1);
    }
    None
}
/// True when `expr` is a parseable 5-field cron expression with at least
/// one upcoming fire time within the ~366-day search horizon.
pub fn is_valid_cron_expr(expr: &str) -> bool {
    next_run_from_cron_expr(expr, now_ms()).is_some()
}
/// Next fire time (epoch ms) for `schedule` relative to `now`, or `None`
/// when the schedule can never fire again (past one-shot, non-positive
/// interval, or a cron expression with no upcoming match).
fn next_run_at(schedule: &CronSchedule, now: i64) -> Option<i64> {
    match schedule {
        // One-shot: only valid while the target instant is still ahead.
        CronSchedule::At { at_ms } => (*at_ms > now).then_some(*at_ms),
        // Interval: one full period from now; zero/negative periods never fire.
        CronSchedule::Every { every_ms } => (*every_ms > 0).then_some(now + *every_ms),
        // Cron expression: delegate to the minute scanner.
        CronSchedule::Cron { expr } => next_run_from_cron_expr(expr, now),
    }
}
/// A pseudo-random delay in `[0, max_ms)`, derived from the sub-second
/// nanoseconds of the current wall clock (no RNG dependency). Returns
/// zero when `max_ms` is 0 or the clock is before the Unix epoch.
fn jitter_delay(max_ms: u64) -> std::time::Duration {
    if max_ms == 0 {
        return std::time::Duration::ZERO;
    }
    let nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.subsec_nanos() as u64,
        Err(_) => 0,
    };
    std::time::Duration::from_millis(nanos % max_ms)
}
/// Policy for jobs whose scheduled time passed while the service was down.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum OnMiss {
    /// Drop the missed occurrence and just schedule the next one (default).
    #[default]
    Skip,
    /// Dispatch the missed occurrence once at startup, then reschedule.
    RunOnce,
}
/// Background scheduler that persists jobs to a JSON file and dispatches
/// due payloads onto the message bus once per second.
pub struct CronService {
    /// Path of the JSON persistence file.
    store_path: PathBuf,
    /// In-memory job store, shared with the spawned tick task.
    store: Arc<RwLock<CronStore>>,
    /// Bus that due job payloads are published to.
    bus: Arc<MessageBus>,
    /// True while the tick loop should keep running.
    running: Arc<AtomicBool>,
    /// Handle of the spawned tick task, if started.
    handle: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Max random delay (ms) inserted before each dispatch; 0 disables.
    jitter_ms: u64,
}
impl CronService {
    /// Creates a cron service with no dispatch jitter.
    pub fn new(store_path: PathBuf, bus: Arc<MessageBus>) -> Self {
        Self::with_jitter(store_path, bus, 0)
    }

    /// Creates a cron service that sleeps a pseudo-random `0..jitter_ms`
    /// delay before each dispatch, spreading simultaneous job firings.
    pub fn with_jitter(store_path: PathBuf, bus: Arc<MessageBus>, jitter_ms: u64) -> Self {
        Self {
            store_path,
            store: Arc::new(RwLock::new(CronStore::default())),
            bus,
            running: Arc::new(AtomicBool::new(false)),
            handle: Arc::new(RwLock::new(None)),
            jitter_ms,
        }
    }

    /// Loads persisted jobs, applies the `on_miss` policy to schedules that
    /// elapsed while the service was down, then spawns the 1 Hz tick loop.
    /// Idempotent: calling while already running returns `Ok(())`.
    ///
    /// # Errors
    /// Fails if the store cannot be read, parsed, or persisted.
    pub async fn start(&self, on_miss: &OnMiss) -> Result<()> {
        if self.running.swap(true, Ordering::SeqCst) {
            return Ok(());
        }
        // BUGFIX: an early `?` used to leave `running` stuck at true after a
        // failed start, making every subsequent start() a silent no-op. Reset
        // the flag on any initialization error so start() can be retried.
        match self.start_inner(on_miss).await {
            Ok(()) => Ok(()),
            Err(e) => {
                self.running.store(false, Ordering::SeqCst);
                Err(e)
            }
        }
    }

    /// Performs the actual startup work; separated out so `start` can reset
    /// the `running` flag when any step here fails.
    async fn start_inner(&self, on_miss: &OnMiss) -> Result<()> {
        let loaded = self.load_store().await?;
        let missed_payloads: Vec<CronPayload>;
        {
            let mut store = self.store.write().await;
            *store = loaded;
            let now = now_ms();
            let mut missed: Vec<CronPayload> = Vec::new();
            for job in &mut store.jobs {
                if job.enabled {
                    if let Some(next) = job.state.next_run_at_ms {
                        if next <= now {
                            // Schedule elapsed while we were down: apply policy.
                            match on_miss {
                                OnMiss::Skip => {
                                    info!(job_id = %job.id, job_name = %job.name, "Skipping missed schedule");
                                }
                                OnMiss::RunOnce => {
                                    info!(job_id = %job.id, job_name = %job.name, "Queueing missed schedule for immediate run");
                                    missed.push(job.payload.clone());
                                }
                            }
                            job.state.next_run_at_ms = next_run_at(&job.schedule, now);
                        }
                    } else {
                        // Enabled job without a schedule yet: compute one.
                        job.state.next_run_at_ms = next_run_at(&job.schedule, now);
                    }
                }
            }
            missed_payloads = missed;
        }
        // Dispatch missed jobs (RunOnce policy) outside the store lock.
        for payload in &missed_payloads {
            let inbound =
                InboundMessage::new(&payload.channel, "cron", &payload.chat_id, &payload.message);
            if let Err(e) = self.bus.publish_inbound(inbound).await {
                error!("Failed to dispatch missed job: {}", e);
            }
        }
        self.save_store().await?;
        let store = Arc::clone(&self.store);
        let store_path = self.store_path.clone();
        let bus = Arc::clone(&self.bus);
        let running = Arc::clone(&self.running);
        let jitter_ms = self.jitter_ms;
        let handle = tokio::spawn(async move {
            info!("Cron service started");
            // Poll once per second until stop() clears the flag (or aborts us).
            while running.load(Ordering::SeqCst) {
                if let Err(err) = tick(&store, &store_path, &bus, jitter_ms).await {
                    error!("Cron tick failed: {}", err);
                }
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        });
        let mut h = self.handle.write().await;
        *h = Some(handle);
        Ok(())
    }

    /// Signals the tick loop to stop and aborts the background task.
    pub async fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
        let mut h = self.handle.write().await;
        if let Some(handle) = h.take() {
            handle.abort();
        }
    }

    /// Adds a new enabled job, computes its first run time, persists the
    /// store, and returns the stored job (with its generated id).
    pub async fn add_job(
        &self,
        name: String,
        schedule: CronSchedule,
        payload: CronPayload,
        delete_after_run: bool,
    ) -> Result<CronJob> {
        let now = now_ms();
        let job = CronJob {
            // Short 8-char id taken from a v4 UUID; collisions are unlikely
            // at expected job counts.
            id: Uuid::new_v4().to_string().chars().take(8).collect(),
            name,
            enabled: true,
            schedule: schedule.clone(),
            payload,
            state: CronJobState {
                next_run_at_ms: next_run_at(&schedule, now),
                ..Default::default()
            },
            created_at_ms: now,
            updated_at_ms: now,
            delete_after_run,
        };
        {
            let mut store = self.store.write().await;
            store.jobs.push(job.clone());
        }
        self.save_store().await?;
        Ok(job)
    }

    /// Lists jobs sorted by next run time (unscheduled jobs last).
    /// Disabled jobs are included only when `include_disabled` is set.
    pub async fn list_jobs(&self, include_disabled: bool) -> Vec<CronJob> {
        let store = self.store.read().await;
        let mut jobs: Vec<CronJob> = store
            .jobs
            .iter()
            .filter(|job| include_disabled || job.enabled)
            .cloned()
            .collect();
        jobs.sort_by_key(|job| job.state.next_run_at_ms.unwrap_or(i64::MAX));
        jobs
    }

    /// Removes a job by id, persisting only when something was actually
    /// removed. Returns whether a job was deleted.
    pub async fn remove_job(&self, job_id: &str) -> Result<bool> {
        let removed = {
            let mut store = self.store.write().await;
            let before = store.jobs.len();
            store.jobs.retain(|job| job.id != job_id);
            store.jobs.len() < before
        };
        if removed {
            self.save_store().await?;
        }
        Ok(removed)
    }

    /// Reads the store file; a missing file yields an empty default store.
    async fn load_store(&self) -> Result<CronStore> {
        if !self.store_path.exists() {
            return Ok(CronStore::default());
        }
        let content = tokio::fs::read_to_string(&self.store_path).await?;
        let store = serde_json::from_str::<CronStore>(&content)?;
        Ok(store)
    }

    /// Serializes the store to pretty JSON and writes it to disk, creating
    /// parent directories as needed.
    async fn save_store(&self) -> Result<()> {
        if let Some(parent) = self.store_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        let json = {
            // Serialize under the read lock; write to disk outside it.
            let store = self.store.read().await;
            serde_json::to_string_pretty(&*store)?
        };
        tokio::fs::write(&self.store_path, json).await?;
        Ok(())
    }
}
/// Best-effort shutdown on drop: clearing the flag lets the spawned tick
/// loop exit on its next 1-second check. Drop cannot await or abort the
/// task handle; call `stop()` for an immediate shutdown.
impl Drop for CronService {
    fn drop(&mut self) {
        self.running.store(false, Ordering::SeqCst);
    }
}
/// One scheduler pass: dispatches every enabled job whose `next_run_at_ms`
/// has arrived, records per-job results (status, error, duration, backoff),
/// prunes successfully-completed one-shot jobs flagged `delete_after_run`,
/// and persists the store to `store_path`.
async fn tick(
    store: &Arc<RwLock<CronStore>>,
    store_path: &PathBuf,
    bus: &Arc<MessageBus>,
    jitter_ms: u64,
) -> Result<()> {
    let now = now_ms();
    // Snapshot due jobs under a short read lock; dispatch happens unlocked.
    let due_jobs: Vec<CronJob> = {
        let store_guard = store.read().await;
        store_guard
            .jobs
            .iter()
            .filter(|job| {
                job.enabled && job.state.next_run_at_ms.map(|n| n <= now).unwrap_or(false)
            })
            .cloned()
            .collect()
    };
    if due_jobs.is_empty() {
        return Ok(());
    }
    // (job_id, ok, error, started_at_ms, ended_at_ms) per dispatch attempt.
    let mut results: Vec<(String, bool, Option<String>, i64, i64)> = Vec::new();
    for job in &due_jobs {
        let started_at = now_ms();
        let inbound = InboundMessage::new(
            &job.payload.channel,
            "cron",
            &job.payload.chat_id,
            &job.payload.message,
        );
        // Optional random delay to spread simultaneous firings.
        if jitter_ms > 0 {
            tokio::time::sleep(jitter_delay(jitter_ms)).await;
        }
        // Bound the publish so a full/unresponsive bus cannot stall the loop.
        let send_result = tokio::time::timeout(
            std::time::Duration::from_millis(DEFAULT_DISPATCH_TIMEOUT_MS),
            bus.publish_inbound(inbound),
        )
        .await;
        let ended_at = now_ms();
        match send_result {
            Ok(Ok(())) => results.push((job.id.clone(), true, None, started_at, ended_at)),
            Ok(Err(e)) => results.push((
                job.id.clone(),
                false,
                Some(e.to_string()),
                started_at,
                ended_at,
            )),
            Err(_) => results.push((
                job.id.clone(),
                false,
                Some("cron dispatch timed out".to_string()),
                started_at,
                ended_at,
            )),
        }
    }
    {
        // Apply results and reschedule under a single write lock.
        let mut store_guard = store.write().await;
        for (job_id, ok, err, started_at, ended_at) in results {
            if let Some(job) = store_guard.jobs.iter_mut().find(|j| j.id == job_id) {
                job.state.last_run_at_ms = Some(started_at);
                job.state.last_duration_ms = Some((ended_at - started_at).max(0));
                job.state.last_status = Some(if ok { "ok" } else { "error" }.to_string());
                job.state.last_error = err;
                job.updated_at_ms = ended_at;
                if ok {
                    job.state.consecutive_errors = 0;
                } else {
                    job.state.consecutive_errors = job.state.consecutive_errors.saturating_add(1);
                }
                match job.schedule {
                    CronSchedule::At { .. } => {
                        // One-shots never reschedule, even after an error.
                        job.enabled = false;
                        job.state.next_run_at_ms = None;
                    }
                    _ => {
                        if ok {
                            job.state.next_run_at_ms = next_run_at(&job.schedule, ended_at);
                        } else {
                            // After a failure, never run earlier than the error
                            // backoff allows: take the later of schedule and backoff.
                            let base_next = next_run_at(&job.schedule, ended_at).unwrap_or(
                                ended_at + error_backoff_ms(job.state.consecutive_errors),
                            );
                            let backoff_next =
                                ended_at + error_backoff_ms(job.state.consecutive_errors);
                            job.state.next_run_at_ms = Some(base_next.max(backoff_next));
                        }
                    }
                }
            }
        }
        // Drop one-shot delete_after_run jobs only after a successful run.
        store_guard.jobs.retain(|job| {
            let should_remove = matches!(job.schedule, CronSchedule::At { .. })
                && job.delete_after_run
                && !job.enabled
                && job.state.last_status.as_deref() == Some("ok");
            !should_remove
        });
    }
    // Persist: serialize under a read lock, write to disk outside it.
    let json = {
        let store_guard = store.read().await;
        serde_json::to_string_pretty(&*store_guard)?
    };
    if let Some(parent) = store_path.parent() {
        tokio::fs::create_dir_all(parent).await?;
    }
    tokio::fs::write(store_path, json).await?;
    Ok(())
}
/// Parses a user-supplied "at" datetime into epoch milliseconds.
/// Accepts RFC3339 (offset honored) or a bare `YYYY-MM-DDTHH:MM:SS`,
/// which is interpreted as UTC.
pub fn parse_at_datetime_ms(input: &str) -> Result<i64> {
    // Prefer RFC3339, which carries an explicit offset.
    if let Ok(dt) = DateTime::parse_from_rfc3339(input) {
        return Ok(dt.timestamp_millis());
    }
    // Fall back to an offset-less local format, assumed to be UTC.
    chrono::NaiveDateTime::parse_from_str(input, "%Y-%m-%dT%H:%M:%S")
        .map(|naive| naive.and_utc().timestamp_millis())
        .map_err(|_| {
            ZeptoError::Tool(format!(
                "Invalid 'at' datetime '{}'. Use RFC3339 or YYYY-MM-DDTHH:MM:SS",
                input
            ))
        })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bus::MessageBus;
    use tempfile::tempdir;

    // `Every` schedules fire exactly one interval after `now`.
    #[test]
    fn test_next_run_at_every() {
        let now = 1_000;
        let next = next_run_at(&CronSchedule::Every { every_ms: 500 }, now).unwrap();
        assert_eq!(next, 1_500);
    }

    // RFC3339 input parses to a positive epoch-ms value.
    #[test]
    fn test_parse_at_datetime_ms_rfc3339() {
        let ms = parse_at_datetime_ms("2026-02-12T12:34:56Z").unwrap();
        assert!(ms > 0);
    }

    // add -> list -> remove round-trip against a temp-dir store file.
    #[tokio::test]
    async fn test_add_list_remove_job() {
        let temp = tempdir().unwrap();
        let service = CronService::new(temp.path().join("jobs.json"), Arc::new(MessageBus::new()));
        let job = service
            .add_job(
                "test".to_string(),
                CronSchedule::Every { every_ms: 1_000 },
                CronPayload {
                    message: "hello".to_string(),
                    channel: "cli".to_string(),
                    chat_id: "cli".to_string(),
                },
                false,
            )
            .await
            .unwrap();
        let jobs = service.list_jobs(true).await;
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].id, job.id);
        let removed = service.remove_job(&job.id).await.unwrap();
        assert!(removed);
        assert!(service.list_jobs(true).await.is_empty());
    }

    // A zero max produces no jitter at all.
    #[test]
    fn test_jitter_delay_zero() {
        let d = jitter_delay(0);
        assert_eq!(d, std::time::Duration::ZERO);
    }

    // Jitter is strictly below the requested maximum.
    #[test]
    fn test_jitter_delay_bounded() {
        let max_ms = 500;
        let d = jitter_delay(max_ms);
        assert!(d < std::time::Duration::from_millis(max_ms));
    }

    // with_jitter stores the configured jitter value.
    #[test]
    fn test_cron_service_with_jitter() {
        let temp = tempdir().unwrap();
        let service = CronService::with_jitter(
            temp.path().join("jobs.json"),
            Arc::new(MessageBus::new()),
            250,
        );
        assert_eq!(service.jitter_ms, 250);
    }

    // Default missed-schedule policy is Skip.
    #[test]
    fn test_on_miss_default_is_skip() {
        let policy = OnMiss::default();
        assert_eq!(policy, OnMiss::Skip);
    }

    // OnMiss serializes as snake_case strings and round-trips.
    #[test]
    fn test_on_miss_serde_roundtrip() {
        let skip_json = serde_json::to_string(&OnMiss::Skip).unwrap();
        assert_eq!(skip_json, r#""skip""#);
        let run_once_json = serde_json::to_string(&OnMiss::RunOnce).unwrap();
        assert_eq!(run_once_json, r#""run_once""#);
        let parsed: OnMiss = serde_json::from_str(r#""run_once""#).unwrap();
        assert_eq!(parsed, OnMiss::RunOnce);
    }

    // Skip policy: a missed job is rescheduled but not dispatched.
    #[tokio::test]
    async fn test_start_skip_missed_jobs() {
        let temp = tempdir().unwrap();
        let bus = Arc::new(MessageBus::new());
        let store_path = temp.path().join("jobs.json");
        // Pre-seed a store with a job whose next run is far in the past.
        let json = serde_json::json!({
            "version": 1,
            "jobs": [{
                "id": "missed1",
                "name": "missed job",
                "enabled": true,
                "schedule": { "kind": "every", "every_ms": 60000 },
                "payload": { "message": "check", "channel": "cli", "chat_id": "cli" },
                "state": { "next_run_at_ms": 1 },
                "created_at_ms": 1,
                "updated_at_ms": 1,
                "delete_after_run": false
            }]
        });
        tokio::fs::write(&store_path, serde_json::to_string_pretty(&json).unwrap())
            .await
            .unwrap();
        let service = CronService::new(store_path, bus);
        service.start(&OnMiss::Skip).await.unwrap();
        service.stop().await;
        let jobs = service.list_jobs(true).await;
        assert_eq!(jobs.len(), 1);
        let next = jobs[0].state.next_run_at_ms.unwrap();
        assert!(
            next > now_ms() - 5000,
            "next_run should be in the future after skip"
        );
    }

    // RunOnce policy: the missed payload is dispatched once at startup
    // and the job is rescheduled.
    #[tokio::test]
    async fn test_start_run_once_missed_jobs() {
        let temp = tempdir().unwrap();
        let bus = Arc::new(MessageBus::new());
        let store_path = temp.path().join("jobs.json");
        let json = serde_json::json!({
            "version": 1,
            "jobs": [{
                "id": "missed2",
                "name": "missed run_once job",
                "enabled": true,
                "schedule": { "kind": "every", "every_ms": 60000 },
                "payload": { "message": "run_once_check", "channel": "cli", "chat_id": "cli" },
                "state": { "next_run_at_ms": 1 },
                "created_at_ms": 1,
                "updated_at_ms": 1,
                "delete_after_run": false
            }]
        });
        tokio::fs::write(&store_path, serde_json::to_string_pretty(&json).unwrap())
            .await
            .unwrap();
        let service = CronService::new(store_path, bus.clone());
        service.start(&OnMiss::RunOnce).await.unwrap();
        service.stop().await;
        let msg = tokio::time::timeout(std::time::Duration::from_secs(2), bus.consume_inbound())
            .await
            .expect("should receive dispatched missed job within timeout")
            .expect("bus should have a message");
        assert_eq!(msg.content, "run_once_check");
        let jobs = service.list_jobs(true).await;
        assert_eq!(jobs.len(), 1);
        let next = jobs[0].state.next_run_at_ms.unwrap();
        assert!(
            next > now_ms() - 5000,
            "next_run should be in the future after run_once"
        );
    }

    // Backoff schedule indexes 30s/1m/5m/... and saturates at 1h.
    #[test]
    fn test_error_backoff_schedule() {
        assert_eq!(error_backoff_ms(0), 0);
        assert_eq!(error_backoff_ms(1), 30_000);
        assert_eq!(error_backoff_ms(2), 60_000);
        assert_eq!(error_backoff_ms(3), 300_000);
        assert_eq!(error_backoff_ms(10), 3_600_000);
    }

    // With a buffer of 1, the first job fills the bus so the second job's
    // publish times out; the timed-out job must record an error and get a
    // backoff-delayed next run.
    #[tokio::test]
    async fn test_tick_timeout_applies_error_backoff() {
        let temp = tempdir().unwrap();
        let bus = Arc::new(MessageBus::with_buffer_size(1));
        let store = Arc::new(RwLock::new(CronStore {
            version: 1,
            jobs: vec![
                CronJob {
                    id: "fill".to_string(),
                    name: "fill queue".to_string(),
                    enabled: true,
                    schedule: CronSchedule::Every { every_ms: 1_000 },
                    payload: CronPayload {
                        message: "fill".to_string(),
                        channel: "cli".to_string(),
                        chat_id: "cli".to_string(),
                    },
                    state: CronJobState {
                        next_run_at_ms: Some(now_ms() - 1),
                        ..Default::default()
                    },
                    created_at_ms: now_ms(),
                    updated_at_ms: now_ms(),
                    delete_after_run: false,
                },
                CronJob {
                    id: "timeout".to_string(),
                    name: "should timeout".to_string(),
                    enabled: true,
                    schedule: CronSchedule::Every { every_ms: 1_000 },
                    payload: CronPayload {
                        message: "timeout".to_string(),
                        channel: "cli".to_string(),
                        chat_id: "cli".to_string(),
                    },
                    state: CronJobState {
                        next_run_at_ms: Some(now_ms() - 1),
                        ..Default::default()
                    },
                    created_at_ms: now_ms(),
                    updated_at_ms: now_ms(),
                    delete_after_run: false,
                },
            ],
        }));
        let store_path = temp.path().join("jobs.json");
        tick(&store, &store_path, &bus, 0).await.unwrap();
        let store_guard = store.read().await;
        let timed_out = store_guard
            .jobs
            .iter()
            .find(|j| j.id == "timeout")
            .expect("timeout job");
        assert_eq!(timed_out.state.last_status.as_deref(), Some("error"));
        assert_eq!(timed_out.state.consecutive_errors, 1);
        let last_run = timed_out.state.last_run_at_ms.expect("last_run_at_ms");
        let duration = timed_out.state.last_duration_ms.unwrap_or(0);
        let ended_at = last_run + duration;
        let next = timed_out
            .state
            .next_run_at_ms
            .expect("next_run_at_ms should be set");
        assert!(
            next >= ended_at + 29_000,
            "expected backoff >= ~30s, got next={} ended_at={}",
            next,
            ended_at
        );
    }

    // A delete_after_run one-shot that errors (bus full -> timeout) is
    // disabled but NOT removed from the store.
    #[tokio::test]
    async fn test_delete_after_run_at_job_not_removed_on_error() {
        let temp = tempdir().unwrap();
        let bus = Arc::new(MessageBus::with_buffer_size(1));
        let store = Arc::new(RwLock::new(CronStore {
            version: 1,
            jobs: vec![
                CronJob {
                    id: "fill".to_string(),
                    name: "fill queue".to_string(),
                    enabled: true,
                    schedule: CronSchedule::Every { every_ms: 1_000 },
                    payload: CronPayload {
                        message: "fill".to_string(),
                        channel: "cli".to_string(),
                        chat_id: "cli".to_string(),
                    },
                    state: CronJobState {
                        next_run_at_ms: Some(now_ms() - 1),
                        ..Default::default()
                    },
                    created_at_ms: now_ms(),
                    updated_at_ms: now_ms(),
                    delete_after_run: false,
                },
                CronJob {
                    id: "atdel".to_string(),
                    name: "one-shot delete".to_string(),
                    enabled: true,
                    schedule: CronSchedule::At {
                        at_ms: now_ms() - 1,
                    },
                    payload: CronPayload {
                        message: "one-shot".to_string(),
                        channel: "cli".to_string(),
                        chat_id: "cli".to_string(),
                    },
                    state: CronJobState {
                        next_run_at_ms: Some(now_ms() - 1),
                        ..Default::default()
                    },
                    created_at_ms: now_ms(),
                    updated_at_ms: now_ms(),
                    delete_after_run: true,
                },
            ],
        }));
        let store_path = temp.path().join("jobs.json");
        tick(&store, &store_path, &bus, 0).await.unwrap();
        let store_guard = store.read().await;
        let job = store_guard.jobs.iter().find(|j| j.id == "atdel").unwrap();
        assert_eq!(job.state.last_status.as_deref(), Some("error"));
        assert!(!job.enabled, "one-shot should be disabled after run");
        assert!(
            job.state.next_run_at_ms.is_none(),
            "one-shot should not be rescheduled after error"
        );
    }

    // A delete_after_run one-shot that succeeds is removed from the store.
    #[tokio::test]
    async fn test_one_shot_job_removed_after_success() {
        let temp = tempdir().unwrap();
        let bus = Arc::new(MessageBus::new());
        let store = Arc::new(RwLock::new(CronStore {
            version: 1,
            jobs: vec![CronJob {
                id: "oneshot-ok".to_string(),
                name: "one-shot success".to_string(),
                enabled: true,
                schedule: CronSchedule::At {
                    at_ms: now_ms() - 1,
                },
                payload: CronPayload {
                    message: "hello".to_string(),
                    channel: "cli".to_string(),
                    chat_id: "cli".to_string(),
                },
                state: CronJobState {
                    next_run_at_ms: Some(now_ms() - 1),
                    ..Default::default()
                },
                created_at_ms: now_ms(),
                updated_at_ms: now_ms(),
                delete_after_run: true,
            }],
        }));
        let store_path = temp.path().join("jobs.json");
        assert_eq!(store.read().await.jobs.len(), 1);
        tick(&store, &store_path, &bus, 0).await.unwrap();
        let store_guard = store.read().await;
        assert!(
            store_guard.jobs.is_empty(),
            "one-shot job with delete_after_run=true should be removed after successful dispatch"
        );
    }
}