use std::{
path::PathBuf,
sync::{Arc, OnceLock},
};
use anyhow::Context;
use chrono::{Datelike, TimeZone, Timelike, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tracing::{debug, info, trace, warn};
use rsclaw_config::schema::{CronDelivery, CronJobConfig};
/// Retry delays in milliseconds, indexed by "consecutive errors minus one":
/// 30 s, 1 min, 5 min, 15 min, 1 h.
const ERROR_BACKOFF_MS: [u64; 5] = [30_000, 60_000, 300_000, 900_000, 3_600_000];

/// Returns the backoff delay (ms) for the given number of consecutive errors.
///
/// Zero and one errors both map to the first table entry; anything past the
/// end of the table is clamped to the last (largest) entry.
pub fn error_backoff_ms(consecutive_errors: u32) -> u64 {
    let last_slot = ERROR_BACKOFF_MS.len() - 1;
    let wanted_slot = consecutive_errors.saturating_sub(1) as usize;
    if wanted_slot > last_slot {
        ERROR_BACKOFF_MS[last_slot]
    } else {
        ERROR_BACKOFF_MS[wanted_slot]
    }
}
/// Schedule of a cron job, accepted in two serialized shapes.
///
/// The enum is `untagged`, so a plain string deserializes as [`CronSchedule::Flat`]
/// and an object carrying a `kind` field deserializes as [`CronSchedule::Tagged`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum CronSchedule {
/// A bare cron expression string; evaluated without an explicit timezone.
Flat(String),
/// A structured schedule object; see [`CronScheduleTagged`].
Tagged(CronScheduleTagged),
}
/// Structured schedule variants, discriminated by the serialized `kind` field
/// (`"cron"`, `"every"` or `"once"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum CronScheduleTagged {
/// `kind = "cron"`: a cron expression with an optional timezone name.
#[serde(rename = "cron")]
Nested {
/// The cron expression text.
expr: String,
/// Optional timezone name; absent means `None`.
#[serde(default)]
tz: Option<String>,
},
/// `kind = "every"`: a fixed-interval schedule.
#[serde(rename = "every")]
Every {
/// Interval in milliseconds (also accepted as `everyMs`). Missing or zero
/// makes `compute_next_run` return `None`.
#[serde(default, alias = "everyMs")]
every_ms: Option<u64>,
/// Optional anchor timestamp in milliseconds (also accepted as `anchorMs`);
/// `compute_next_run` uses its `from_ms` argument when this is absent.
#[serde(default, alias = "anchorMs")]
anchor_ms: Option<u64>,
},
/// `kind = "once"`: a single run, either at an absolute time or after a delay.
#[serde(rename = "once")]
Once {
/// Absolute run time in milliseconds (also accepted as `atMs`); takes
/// precedence over `delay_ms` in `compute_next_run`.
#[serde(default, alias = "atMs")]
at_ms: Option<u64>,
/// Delay in milliseconds added to `from_ms` (also accepted as `delayMs`).
#[serde(default, alias = "delayMs")]
delay_ms: Option<u64>,
},
}
impl CronSchedule {
    /// Returns the cron expression text, or the placeholder `"every"` / `"once"`
    /// for the non-cron schedule kinds.
    pub fn expr(&self) -> &str {
        match self {
            CronSchedule::Flat(s) => s,
            CronSchedule::Tagged(CronScheduleTagged::Nested { expr, .. }) => expr,
            CronSchedule::Tagged(CronScheduleTagged::Every { .. }) => "every",
            CronSchedule::Tagged(CronScheduleTagged::Once { .. }) => "once",
        }
    }

    /// Returns the explicit timezone name, which only the `Nested` (cron)
    /// variant can carry.
    pub fn tz(&self) -> Option<&str> {
        match self {
            CronSchedule::Flat(_) => None,
            CronSchedule::Tagged(CronScheduleTagged::Nested { tz, .. }) => tz.as_deref(),
            CronSchedule::Tagged(CronScheduleTagged::Every { .. }) => None,
            CronSchedule::Tagged(CronScheduleTagged::Once { .. }) => None,
        }
    }

    /// Returns `true` for the one-shot (`Once`) schedule kind.
    pub fn is_once(&self) -> bool {
        matches!(self, CronSchedule::Tagged(CronScheduleTagged::Once { .. }))
    }

    /// Computes the next run time (epoch milliseconds) strictly after `from_ms`.
    ///
    /// Returns `None` when the schedule can never fire again: an unparsable
    /// cron expression, a missing/zero interval, a one-shot time that is not in
    /// the future, or a result that would not fit in a `u64`.
    pub fn compute_next_run(&self, from_ms: u64) -> Option<u64> {
        match self {
            CronSchedule::Flat(expr) => compute_next_run_from_expr(expr, from_ms, None),
            CronSchedule::Tagged(CronScheduleTagged::Nested { expr, tz, .. }) => {
                compute_next_run_from_expr(expr, from_ms, tz.as_deref())
            }
            CronSchedule::Tagged(CronScheduleTagged::Every {
                every_ms,
                anchor_ms,
            }) => {
                let interval = every_ms.unwrap_or(0);
                if interval == 0 {
                    return None;
                }
                let anchor = anchor_ms.unwrap_or(from_ms);
                if anchor > from_ms {
                    return Some(anchor);
                }
                // First multiple of `interval` past the anchor that lies strictly
                // after `from_ms`. Checked arithmetic: values near u64::MAX must
                // yield `None` rather than panic (debug) or wrap to a past
                // timestamp (release).
                let periods = ((from_ms - anchor) / interval).checked_add(1)?;
                periods
                    .checked_mul(interval)
                    .and_then(|offset| anchor.checked_add(offset))
            }
            CronSchedule::Tagged(CronScheduleTagged::Once { at_ms, delay_ms }) => {
                match (at_ms, delay_ms) {
                    // An absolute time wins over a delay when both are present.
                    (Some(at), _) => (*at > from_ms).then_some(*at),
                    // Overflow and a zero delay both mean "no future run".
                    (None, Some(delay)) => from_ms
                        .checked_add(*delay)
                        .filter(|target| *target > from_ms),
                    (None, None) => None,
                }
            }
        }
    }
}
/// Message payload of a cron job: either a bare string or a structured object.
///
/// The enum is `untagged`, so the serialized shape alone selects the variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum CronPayload {
/// The payload is just the message text.
Text(String),
/// The payload is an object with optional fields.
Structured {
/// Optional payload kind string. NOTE(review): the `kind` alias repeats the
/// field's own name — confirm whether a different alias was intended.
#[serde(default, alias = "kind")]
kind: Option<String>,
/// Message text; serialized as `message`, and also accepted as `text` on input.
#[serde(alias = "text", rename = "message", default)]
text: Option<String>,
/// Optional timeout in seconds (also accepted as `timeoutSeconds`).
#[serde(default, alias = "timeoutSeconds")]
timeout_seconds: Option<u64>,
/// Optional summarize flag; `CronPayload::summarize` treats absence as `false`.
#[serde(default)]
summarize: Option<bool>,
},
}
impl CronPayload {
    /// Message text carried by the payload; an absent structured text yields `""`.
    pub fn text(&self) -> &str {
        let body = match self {
            CronPayload::Structured { text, .. } => text.as_deref(),
            CronPayload::Text(plain) => Some(plain.as_str()),
        };
        body.unwrap_or("")
    }

    /// Whether summarization was requested. Only a structured payload with
    /// `summarize: Some(true)` answers `true`.
    pub fn summarize(&self) -> bool {
        matches!(
            self,
            CronPayload::Structured {
                summarize: Some(true),
                ..
            }
        )
    }
}
/// Rotation state for a job whose message cycles through a list of items.
///
/// Used by `CronJob::render_message` and `CronJob::advance_iter`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CronIter {
/// The items to rotate through.
pub items: Vec<String>,
/// Position of the current item; consumers reduce it modulo `items.len()`.
/// Defaults to 0 when absent from the serialized form.
#[serde(default)]
pub cursor: usize,
}
/// Runtime bookkeeping for a job, serialized with camelCase keys.
///
/// Every `Option` field is omitted from the output when `None`; all fields
/// default when missing on input.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CronJobState {
/// Timestamp (ms) of the last run.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_run_at_ms: Option<u64>,
/// Status string of the last run.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_run_status: Option<String>,
/// Another last-status string. NOTE(review): how this differs from
/// `last_run_status` is not visible in this file — confirm against the writer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_status: Option<String>,
/// Duration (ms) of the last run.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_duration_ms: Option<u64>,
/// Status string of the last delivery.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_delivery_status: Option<String>,
/// Number of consecutive errors; always serialized.
/// Presumably the input to `error_backoff_ms` — verify against the scheduler.
#[serde(default)]
pub consecutive_errors: u32,
/// Text of the last error.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
/// Timestamp (ms) of the next planned run.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub next_run_at_ms: Option<u64>,
/// Timestamp (ms) recorded while a run is in progress (presumably its start
/// time — confirm with the scheduler).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub running_at_ms: Option<u64>,
}
/// A persisted cron job definition plus its runtime state, serialized with
/// camelCase keys. `Option` fields are omitted from the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CronJob {
/// Job identifier; used as the key when jobs are stored in redb.
pub id: String,
/// Optional display name.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// Agent identifier; defaults to the empty string when absent on input.
#[serde(default)]
pub agent_id: String,
/// Optional session key.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_key: Option<String>,
/// Whether the job is enabled. Required on input (no serde default).
pub enabled: bool,
/// When the job runs.
pub schedule: CronSchedule,
/// Optional payload; when present it takes precedence over `message`
/// (see `effective_message`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub payload: Option<CronPayload>,
/// Optional plain message text, used when `payload` is absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
/// Optional delivery configuration.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub delivery: Option<CronDelivery>,
/// Optional session target string; not interpreted in this file.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_target: Option<String>,
/// Optional wake mode string; not interpreted in this file.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wake_mode: Option<String>,
/// Runtime state; ignored by `cron_jobs_config_equal`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub state: Option<CronJobState>,
/// Optional item rotation used by `render_message` / `advance_iter`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub iter: Option<CronIter>,
/// Creation timestamp (ms); ignored by `cron_jobs_config_equal`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub created_at_ms: Option<u64>,
/// Last-update timestamp (ms); ignored by `cron_jobs_config_equal`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub updated_at_ms: Option<u64>,
}
impl CronJob {
pub fn effective_message(&self) -> &str {
if let Some(ref payload) = self.payload {
return payload.text();
}
self.message.as_deref().unwrap_or("")
}
pub fn cron_expr(&self) -> &str {
self.schedule.expr()
}
pub fn timezone(&self) -> Option<&str> {
self.schedule.tz()
}
pub fn render_message(&self) -> String {
let raw = self.effective_message();
let Some(iter) = self.iter.as_ref() else {
return raw.to_owned();
};
if iter.items.is_empty() {
return raw.to_owned();
}
let n = iter.items.len();
let cur = iter.cursor % n;
let nxt = (cur + 1) % n;
raw.replace("{current}", &iter.items[cur])
.replace("{next}", &iter.items[nxt])
.replace("{index}", &(cur + 1).to_string())
.replace("{total}", &n.to_string())
}
pub fn advance_iter(&mut self) -> Option<usize> {
let iter = self.iter.as_mut()?;
if iter.items.is_empty() {
return None;
}
iter.cursor = (iter.cursor + 1) % iter.items.len();
Some(iter.cursor)
}
pub fn bake_message(&mut self, text: String) {
if let Some(payload) = self.payload.as_mut() {
match payload {
CronPayload::Text(s) => *s = text,
CronPayload::Structured { text: t, .. } => *t = Some(text),
}
} else {
self.message = Some(text);
}
}
}
impl From<&CronJobConfig> for CronJob {
fn from(cfg: &CronJobConfig) -> Self {
let session_key = cfg.session.as_ref().and_then(|v| {
if let serde_json::Value::String(s) = v {
Some(s.clone())
} else {
None
}
});
let schedule = if let Some(ref tz) = cfg.tz {
CronSchedule::Tagged(CronScheduleTagged::Nested {
expr: cfg.schedule.clone(),
tz: Some(tz.clone()),
})
} else {
CronSchedule::Flat(cfg.schedule.clone())
};
Self {
id: cfg.id.clone(),
name: cfg.name.clone(),
agent_id: cfg
.agent_id
.clone()
.unwrap_or_else(|| "default".to_string()),
session_key,
enabled: cfg.enabled.unwrap_or(true),
schedule,
payload: None,
message: Some(cfg.message.clone()),
delivery: cfg.delivery.clone(),
session_target: None,
wake_mode: None,
state: None,
iter: None,
created_at_ms: None,
updated_at_ms: None,
}
}
}
/// Serialized container for the job list: a version number plus the jobs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronStore {
/// Format version; the `Default` impl sets it to 1.
pub version: u32,
/// All stored jobs.
pub jobs: Vec<CronJob>,
}
impl Default for CronStore {
fn default() -> Self {
Self {
version: 1,
jobs: Vec::new(),
}
}
}
/// One record of a job execution, serialized with camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RunLogEntry {
/// Identifier of this log entry (`build_run_log_entry` uses a v4 UUID string).
pub id: String,
/// Identifier of the job that ran.
pub job_id: String,
/// When the run started (UTC).
pub started_at: chrono::DateTime<Utc>,
/// When the run finished (UTC), if it has.
pub finished_at: Option<chrono::DateTime<Utc>>,
/// Whether the run succeeded.
pub success: bool,
/// Optional preview of the reply.
pub reply_preview: Option<String>,
/// Error text when the run failed.
pub error: Option<String>,
}
pub fn cron_jobs_config_equal(a: &CronJob, b: &CronJob) -> bool {
let mut a_v = match serde_json::to_value(a) {
Ok(v) => v,
Err(_) => return false,
};
let mut b_v = match serde_json::to_value(b) {
Ok(v) => v,
Err(_) => return false,
};
for v in [&mut a_v, &mut b_v] {
if let Some(obj) = v.as_object_mut() {
obj.remove("state");
obj.remove("createdAtMs");
obj.remove("updatedAtMs");
}
}
a_v == b_v
}
/// Tests one cron field against a concrete value.
///
/// Supported forms: `*`, `*/n` (value divisible by a non-zero `n`),
/// comma-separated lists of any supported form, inclusive ranges `a-b`, and a
/// single number. Anything else never matches.
fn field_matches(field: &str, value: u32) -> bool {
    if field == "*" {
        return true;
    }

    // `*/n`: an unparsable step falls through to the remaining forms.
    let step = field
        .strip_prefix("*/")
        .and_then(|raw| raw.parse::<u32>().ok());
    if let Some(n) = step {
        return n > 0 && value % n == 0;
    }

    if field.contains(',') {
        for part in field.split(',') {
            if field_matches(part.trim(), value) {
                return true;
            }
        }
        return false;
    }

    // `a-b`: only accepted when both bounds are plain numbers.
    if let Some((low, high)) = field.split_once('-') {
        if let (Ok(low), Ok(high)) = (low.parse::<u32>(), high.parse::<u32>()) {
            return low <= value && value <= high;
        }
    }

    matches!(field.parse::<u32>(), Ok(exact) if exact == value)
}
/// Tests the day-of-week cron field against `dow`, where `dow` counts days
/// from Sunday (`0` = Sunday … `6` = Saturday).
///
/// Supported forms: `*`, `*/n`, comma-separated lists, inclusive ranges `a-b`,
/// and a single number. Following the usual crontab convention, `7` is
/// accepted as an alias for Sunday, so `7`, `5-7` and `1,7` all match a Sunday
/// even though `dow` itself never exceeds 6.
fn dow_matches(field: &str, dow: u32) -> bool {
    if field == "*" {
        return true;
    }
    if let Some(step) = field.strip_prefix("*/") {
        if let Ok(n) = step.parse::<u32>() {
            return n > 0 && dow % n == 0;
        }
    }
    if field.contains(',') {
        return field.split(',').any(|part| dow_matches(part.trim(), dow));
    }

    // On a Sunday the field may spell the day either as 0 or as 7.
    let is_sunday = dow == 0;

    if let Some((start, end)) = field.split_once('-') {
        if let (Ok(start), Ok(end)) = (start.parse::<u32>(), end.parse::<u32>()) {
            let in_range = |day: u32| (start..=end).contains(&day);
            return in_range(dow) || (is_sunday && in_range(7));
        }
    }

    match field.parse::<u32>() {
        Ok(day) => day == dow || (is_sunday && day == 7),
        Err(_) => false,
    }
}
/// Finds the next time a 5-field cron expression fires strictly after `from_ms`.
///
/// * `cron_expr` — whitespace-separated `minute hour day-of-month month day-of-week`.
/// * `from_ms` — reference time in epoch milliseconds.
/// * `tz` — optional timezone name; `None` or an unparsable name falls back to
///   `rsclaw_config::system_tz`.
///
/// Returns the match as epoch milliseconds (UTC), or `None` when the expression
/// does not have exactly five fields, `from_ms` cannot be converted to a
/// timestamp, or nothing matches within 366 days of the starting minute.
pub fn compute_next_run_from_expr(cron_expr: &str, from_ms: u64, tz: Option<&str>) -> Option<u64> {
let fields: Vec<&str> = cron_expr.split_whitespace().collect();
if fields.len() != 5 {
warn!(expr = %cron_expr, "cron: expression must have exactly 5 fields");
return None;
}
let [min_f, hr_f, dom_f, mon_f, dow_f] = fields[..] else {
return None;
};
let utc_dt = match chrono::DateTime::from_timestamp_millis(from_ms as i64) {
Some(dt) => dt,
None => return None,
};
// An invalid timezone string is dropped silently here (`.ok()`), so the
// search then runs in the system timezone.
let tz_opt: Option<chrono_tz::Tz> = tz.and_then(|tz_str| tz_str.parse().ok());
let tz_for_search: chrono_tz::Tz = tz_opt.unwrap_or_else(rsclaw_config::system_tz);
let local_now = utc_dt.with_timezone(&tz_for_search);
// Truncate to the whole minute, then step forward one minute so the result
// is always after `from_ms`.
let mut cand = local_now
.with_second(0)
.expect("second 0 always valid")
.with_nanosecond(0)
.expect("nanosecond 0 always valid");
cand += chrono::Duration::minutes(1);
// Upper bound of the search window.
let max_cand = cand + chrono::Duration::days(366);
while cand < max_cand {
let dow = cand.date_naive().weekday().num_days_from_sunday();
let m = field_matches(mon_f, cand.month());
let d = field_matches(dom_f, cand.day());
let w = dow_matches(dow_f, dow);
// Month, day-of-month and day-of-week must ALL match.
// NOTE(review): classic cron ORs day-of-month and day-of-week when both are
// restricted — confirm the AND here is intended.
if !(m && d && w) {
// Wrong day: jump to local midnight of the next day. If that local time
// is ambiguous or missing (`.single()` is `None`), add 24 hours instead.
cand = (cand.date_naive() + chrono::Days::new(1))
.and_hms_opt(0, 0, 0)
.and_then(|naive| cand.timezone().from_local_datetime(&naive).single())
.unwrap_or_else(|| cand + chrono::Duration::days(1));
continue;
}
let h = field_matches(hr_f, cand.hour());
let mi = field_matches(min_f, cand.minute());
trace!(expr=%cron_expr, dow, "searching: {} m={} d={} w={} h={} mi={}", cand.date_naive(), m, d, w, h, mi);
if h && mi {
let utc_cand = cand.with_timezone(&chrono::Utc);
debug!(expr=%cron_expr, "MATCH: {} (UTC: {})", cand, utc_cand);
return Some(utc_cand.timestamp_millis() as u64);
}
// Right day, wrong time: advance minute by minute.
cand += chrono::Duration::minutes(1);
}
warn!(expr = %cron_expr, "cron: no next run found within 1 year");
None
}
/// Current wall-clock time as milliseconds since the UNIX epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch.
pub fn current_timestamp_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch");
    since_epoch.as_millis() as u64
}
/// Creates a finished [`RunLogEntry`] for `job`.
///
/// The entry gets a fresh v4 UUID, start and finish times both taken from the
/// clock at call time, no reply preview, and the stringified `error` if any.
pub fn build_run_log_entry(
    job: &CronJob,
    success: bool,
    error: Option<anyhow::Error>,
) -> RunLogEntry {
    let id = uuid::Uuid::new_v4().to_string();
    let job_id = job.id.clone();
    let started_at = Utc::now();
    let finished_at = Some(Utc::now());
    let error = error.map(|cause| cause.to_string());

    RunLogEntry {
        id,
        job_id,
        started_at,
        finished_at,
        success,
        reply_preview: None,
        error,
    }
}
pub fn extract_saved_files_content(output: &str) -> String {
use std::collections::HashSet;
let patterns = [
r"报告已保存[:\s]+([^\n]+)",
r"文件已保存[:\s]+([^\n]+)",
r"saved to[:\s]+([^\n]+)",
r"output saved[:\s]+([^\n]+)",
r"保存到[:\s]+([^\n]+)",
];
let mut seen_paths: HashSet<String> = HashSet::new();
let mut contents: Vec<String> = Vec::new();
for pattern in &patterns {
let re = regex::Regex::new(pattern).unwrap();
for cap in re.captures_iter(output) {
let path = cap[1].trim();
if seen_paths.contains(path) {
continue;
}
seen_paths.insert(path.to_string());
if let Ok(content) = std::fs::read_to_string(path) {
contents.push(format!("[FILE: {}]\n{}", path, content));
}
}
}
contents.join("\n\n---\n\n")
}
/// Path of the cron job file: `cron.json5` inside the configuration base directory.
pub fn resolve_cron_store_path() -> PathBuf {
    rsclaw_config::loader::base_dir().join("cron.json5")
}
/// Loads all cron jobs, preferring redb when a store has been bound.
///
/// Returns the jobs plus an "ok" flag. With redb, entries that fail to
/// deserialize are skipped and the flag is `true`. Without a bound store, or
/// when listing redb fails, the result of `load_cron_jobs_from_file` is
/// returned as-is.
pub fn load_cron_jobs() -> (Vec<CronJob>, bool) {
    let Some(store) = cron_store() else {
        return load_cron_jobs_from_file();
    };

    let entries = match store.cron_list() {
        Ok(entries) => entries,
        Err(e) => {
            warn!(err = %e, "cron: redb load failed; falling back to file");
            return load_cron_jobs_from_file();
        }
    };

    let mut jobs: Vec<CronJob> = Vec::new();
    for (_, json) in entries {
        if let Ok(job) = serde_json::from_str::<CronJob>(&json) {
            jobs.push(job);
        }
    }
    (jobs, true)
}
/// Persists the full job list.
///
/// With a bound redb store, the store's contents are replaced by `jobs` (jobs
/// that fail to serialize are left out) and the file is then refreshed via
/// `export_cron_jobs_to_file`. Without a store, the jobs are written to the
/// cron file through a `.tmp` file followed by a rename.
///
/// # Errors
///
/// Returns an error when the redb replace fails, or — in file mode — when
/// serialization, directory creation, the write, or the rename fails.
pub fn save_cron_jobs(jobs: &[CronJob]) -> anyhow::Result<()> {
    match cron_store() {
        Some(store) => {
            let mut entries: Vec<(String, String)> = Vec::new();
            for job in jobs {
                if let Ok(serialized) = serde_json::to_string(job) {
                    entries.push((job.id.clone(), serialized));
                }
            }
            store
                .cron_bulk_replace(&entries)
                .context("redb cron_bulk_replace failed")?;
            export_cron_jobs_to_file(jobs);
            Ok(())
        }
        None => {
            let cron_file = resolve_cron_store_path();
            let document = serde_json::json!({ "version": 1, "jobs": jobs });
            let json = serde_json::to_string_pretty(&document)
                .context("failed to serialize cron jobs to JSON")?;
            if let Some(dir) = cron_file.parent() {
                std::fs::create_dir_all(dir).context("failed to create cron directory")?;
            }
            // Write next to the target, then rename over it.
            let tmp = format!("{}.tmp", cron_file.display());
            std::fs::write(&tmp, json).context("failed to write cron jobs tmp file")?;
            std::fs::rename(&tmp, &cron_file).context("failed to rename cron jobs file")?;
            Ok(())
        }
    }
}
/// Process-wide async mutex for the cron file.
/// NOTE(review): nothing in this file acquires it; presumably callers hold it
/// around load/save sequences — confirm at the call sites.
pub static CRON_FILE_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
/// The redb store bound by `init_cron_store`; unset until that call succeeds.
static CRON_STORE: std::sync::OnceLock<std::sync::Arc<rsclaw_store::RedbStore>> =
std::sync::OnceLock::new();
/// Binds `store` as the process-wide cron store and reconciles the cron file
/// into it.
///
/// Only the first call has any effect; later calls return without doing
/// anything because the store is already set.
pub fn init_cron_store(store: std::sync::Arc<rsclaw_store::RedbStore>) {
    let newly_bound = CRON_STORE.set(Arc::clone(&store)).is_ok();
    if newly_bound {
        let count = reconcile_file_to_redb_on_boot(&store);
        info!(count, "cron: storage bound to redb (post-reconcile)");
    }
}
pub fn cron_store() -> Option<std::sync::Arc<rsclaw_store::RedbStore>> {
CRON_STORE.get().cloned()
}
/// Makes redb mirror the cron file at boot, while keeping runtime state.
///
/// The file supplies the set of jobs and their configuration; for each job
/// that already exists in redb, the `state` stored in redb is carried over.
/// redb is then replaced wholesale with the merged list.
///
/// Returns the number of jobs in redb afterwards. When the file failed to
/// parse, or the replace fails, redb is left untouched and the count of the
/// jobs that were readable from redb is returned.
pub fn reconcile_file_to_redb_on_boot(store: &rsclaw_store::RedbStore) -> usize {
    let (file_jobs, file_ok) = load_cron_jobs_from_file();
    let redb_existing: std::collections::HashMap<String, CronJob> = match store.cron_list() {
        Ok(entries) => entries
            .into_iter()
            .filter_map(|(id, json)| serde_json::from_str::<CronJob>(&json).ok().map(|j| (id, j)))
            .collect(),
        Err(e) => {
            warn!(err = %e, "cron: redb cron_list failed during boot reconcile");
            std::collections::HashMap::new()
        }
    };

    // A broken file must never wipe what redb already has.
    if !file_ok {
        warn!(
            "cron: cron.json5 parse failed at boot; redb left untouched ({} jobs)",
            redb_existing.len()
        );
        return redb_existing.len();
    }
    if file_jobs.is_empty() && redb_existing.is_empty() {
        return 0;
    }

    let mut merged: Vec<(String, String)> = Vec::with_capacity(file_jobs.len());
    for mut file_job in file_jobs {
        // Config comes from the file, runtime state from redb.
        if let Some(redb_job) = redb_existing.get(&file_job.id) {
            file_job.state = redb_job.state.clone();
        }
        let json = match serde_json::to_string(&file_job) {
            Ok(s) => s,
            Err(e) => {
                warn!(err = %e, job_id = %file_job.id, "cron: serialize failed during reconcile");
                continue;
            }
        };
        merged.push((file_job.id.clone(), json));
    }

    if let Err(e) = store.cron_bulk_replace(&merged) {
        warn!(err = %e, "cron: reconcile bulk_replace failed");
        return redb_existing.len();
    }

    // Index the merged ids once so the "removed" count is a set lookup per
    // redb key rather than a scan of `merged` for each one.
    let merged_ids: std::collections::HashSet<&str> =
        merged.iter().map(|(id, _)| id.as_str()).collect();
    let added = merged
        .iter()
        .filter(|(id, _)| !redb_existing.contains_key(id))
        .count();
    let removed = redb_existing
        .keys()
        .filter(|id| !merged_ids.contains(id.as_str()))
        .count();
    if added > 0 || removed > 0 {
        info!(
            total = merged.len(),
            added, removed, "cron: boot reconcile cron.json5 -> redb"
        );
    }
    merged.len()
}
/// Loads cron jobs from `cron.json5`, migrating a legacy `cron/jobs.json`
/// first when the new file does not exist yet.
///
/// Returns the jobs plus an "ok" flag:
///
/// * missing or blank file → `(vec![], true)`;
/// * file exists but cannot be read, or cannot be parsed → `(vec![], false)`;
/// * some job entries fail to deserialize → the good jobs and `false`;
/// * otherwise → all jobs and `true`.
///
/// The document may be either `{ "jobs": [...] }` or a bare array; anything
/// else is treated as an empty job list.
pub fn load_cron_jobs_from_file() -> (Vec<CronJob>, bool) {
    let source = resolve_cron_store_path();
    if !source.exists() {
        let base = rsclaw_config::loader::base_dir();
        let legacy = base.join("cron").join("jobs.json");
        if legacy.exists() {
            info!(from = %legacy.display(), to = %source.display(), "migrating legacy cron/jobs.json to cron.json5");
            if let Err(e) = std::fs::copy(&legacy, &source) {
                warn!(err = %e, "failed to migrate legacy cron/jobs.json");
            } else {
                // Cleanup is best-effort; the copy already succeeded.
                if let Err(e) = std::fs::remove_file(&legacy) {
                    tracing::debug!("failed to remove legacy cron file: {e}");
                }
                if let Err(e) = std::fs::remove_dir(base.join("cron")) {
                    tracing::debug!("failed to remove legacy cron dir: {e}");
                }
            }
        }
    }
    if !source.exists() {
        return (Vec::new(), true);
    }

    let raw = match std::fs::read_to_string(&source) {
        Ok(raw) => raw,
        // Deleted between the existence check and the read: same as "no file".
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return (Vec::new(), true),
        // Any other failure means the file exists but its contents are unknown.
        // Reporting "ok" here would let callers treat it as an empty job list
        // and overwrite good data, so signal failure instead.
        Err(e) => {
            warn!(file = %source.display(), err = %e, "cron.json5 read failed - keeping original file");
            return (Vec::new(), false);
        }
    };
    if raw.trim().is_empty() {
        return (Vec::new(), true);
    }

    // Try JSON5 first, then strict JSON.
    let parsed = match json5::from_str::<serde_json::Value>(&raw)
        .or_else(|_| serde_json::from_str::<serde_json::Value>(&raw))
    {
        Ok(value) => value,
        Err(_) => {
            warn!(file = %source.display(), "cron.json5 parse failed - keeping original file");
            return (Vec::new(), false);
        }
    };

    let jobs_array: Vec<serde_json::Value> = parsed
        .get("jobs")
        .and_then(|v| v.as_array())
        .or_else(|| parsed.as_array())
        .cloned()
        .unwrap_or_default();

    let total = jobs_array.len();
    let jobs: Vec<CronJob> = jobs_array
        .into_iter()
        .filter_map(|v| serde_json::from_value::<CronJob>(v).ok())
        .collect();
    let loaded = jobs.len();
    if loaded < total {
        warn!(
            file = %source.display(),
            skipped = total - loaded,
            "cron.json5 contains job entries that failed to deserialize"
        );
    }
    (jobs, loaded == total)
}
/// Best-effort export of `jobs` to the cron file.
///
/// The jobs are written as `{ "version": 1, "jobs": [...] }` to a `.tmp` file
/// which is then renamed over the target. Failures are logged and otherwise
/// ignored; nothing is returned to the caller.
pub fn export_cron_jobs_to_file(jobs: &[CronJob]) {
    let cron_file = resolve_cron_store_path();
    let document = serde_json::json!({ "version": 1, "jobs": jobs });
    let json = match serde_json::to_string_pretty(&document) {
        Err(e) => {
            warn!(err = %e, "cron: export serialize failed");
            return;
        }
        Ok(text) => text,
    };

    // Directory creation errors are deliberately ignored; the write below
    // reports the problem if the directory is really missing.
    if let Some(dir) = cron_file.parent() {
        let _ = std::fs::create_dir_all(dir);
    }

    let tmp = format!("{}.tmp", cron_file.display());
    match std::fs::write(&tmp, &json) {
        Err(e) => {
            warn!(err = %e, "cron: export write failed");
        }
        Ok(()) => {
            if let Err(e) = std::fs::rename(&tmp, &cron_file) {
                warn!(err = %e, "cron: export rename failed");
            }
        }
    }
}
/// Broadcast sender used by `trigger_reload`; set once by `install_reload_sender`.
static CRON_RELOAD_TX: OnceLock<broadcast::Sender<()>> = OnceLock::new();
/// Registers the broadcast sender that `trigger_reload` signals on.
///
/// Only the first sender is kept; a later call logs a warning and its sender
/// is dropped.
pub fn install_reload_sender(tx: broadcast::Sender<()>) {
    if let Err(_rejected) = CRON_RELOAD_TX.set(tx) {
        warn!("cron: reload sender already installed, ignoring duplicate install");
    }
}
/// Sends a reload signal on the installed broadcast sender.
///
/// Returns `true` when the send succeeded, and `false` when no sender is
/// installed or the send failed.
pub fn trigger_reload() -> bool {
    CRON_RELOAD_TX
        .get()
        .map_or(false, |tx| tx.send(()).is_ok())
}
/// Checks that `expr` is a usable 5-field cron expression.
///
/// # Errors
///
/// Returns a human-readable message when the expression is empty, does not
/// have exactly five whitespace-separated fields (with a "missing space" hint
/// when the first of four fields is an all-digit string of length ≥ 2), or
/// when no upcoming run can be computed for it.
pub fn validate_cron_expr(expr: &str) -> Result<(), String> {
    let trimmed = expr.trim();
    if trimmed.is_empty() {
        return Err("cron expression is empty".to_owned());
    }

    let fields: Vec<&str> = trimmed.split_whitespace().collect();
    let field_count = fields.len();
    if field_count != 5 {
        // e.g. "017 * * *" was probably meant to be "0 17 * * *".
        let looks_fused = field_count == 4
            && fields[0].len() >= 2
            && fields[0].bytes().all(|b| b.is_ascii_digit());
        let hint = if looks_fused {
            let fused = fields[0];
            // All bytes are ASCII digits, so splitting at byte 1 is safe.
            let (minute, hour) = fused.split_at(1);
            format!(
                " — looks like a missing space: '{}' could be '{} {}' which makes 5 fields (e.g. '0 17 * * *' for 5pm daily)",
                fused, minute, hour
            )
        } else {
            String::new()
        };
        return Err(format!(
            "cron expression must have exactly 5 fields separated by spaces \
             (minute hour day month weekday), got {} field(s): '{}'{}",
            field_count, trimmed, hint
        ));
    }

    // A clock before the epoch degrades to 0 rather than failing validation.
    let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    };

    match compute_next_run_from_expr(trimmed, now, None) {
        Some(_) => Ok(()),
        None => Err(format!(
            "cron expression '{}' could not be parsed. Valid examples: \
             '*/5 * * * *' (every 5 min), '0 17 * * *' (5pm daily), \
             '0 9 * * 1' (9am Mondays)",
            trimmed
        )),
    }
}
// Tests for `cron_jobs_config_equal`: configuration fields must affect
// equality, runtime fields (`state`, timestamps) must not.
#[cfg(test)]
mod cron_config_equal_tests {
use super::*;
// Builds a minimal enabled job with a flat schedule and a plain message.
fn job(id: &str, expr: &str, msg: &str) -> CronJob {
CronJob {
id: id.to_string(),
name: Some(id.to_string()),
agent_id: "default".to_string(),
session_key: None,
enabled: true,
schedule: CronSchedule::Flat(expr.to_string()),
payload: None,
message: Some(msg.to_string()),
delivery: None,
session_target: None,
wake_mode: None,
state: None,
iter: None,
created_at_ms: Some(1_000),
updated_at_ms: Some(1_000),
}
}
#[test]
fn identical_jobs_equal() {
let a = job("j1", "*/5 * * * *", "ping");
let b = job("j1", "*/5 * * * *", "ping");
assert!(cron_jobs_config_equal(&a, &b));
}
#[test]
fn different_message_not_equal() {
let a = job("j1", "*/5 * * * *", "ping");
let b = job("j1", "*/5 * * * *", "pong");
assert!(!cron_jobs_config_equal(&a, &b));
}
#[test]
fn different_schedule_not_equal() {
let a = job("j1", "*/5 * * * *", "ping");
let b = job("j1", "*/30 * * * *", "ping");
assert!(!cron_jobs_config_equal(&a, &b));
}
// Runtime state is stripped before comparison.
#[test]
fn state_diff_still_equal() {
let mut a = job("j1", "*/5 * * * *", "ping");
let mut b = job("j1", "*/5 * * * *", "ping");
a.state = Some(CronJobState {
consecutive_errors: 0,
..Default::default()
});
b.state = Some(CronJobState {
consecutive_errors: 7,
last_error: Some("boom".to_string()),
next_run_at_ms: Some(99_999),
..Default::default()
});
assert!(cron_jobs_config_equal(&a, &b));
}
// `updatedAtMs` is stripped before comparison.
#[test]
fn updated_at_diff_still_equal() {
let mut a = job("j1", "*/5 * * * *", "ping");
let mut b = job("j1", "*/5 * * * *", "ping");
a.updated_at_ms = Some(1_000);
b.updated_at_ms = Some(2_000);
assert!(cron_jobs_config_equal(&a, &b));
}
// `enabled` is configuration, so it must affect equality.
#[test]
fn enabled_diff_not_equal() {
let a = job("j1", "*/5 * * * *", "ping");
let mut b = job("j1", "*/5 * * * *", "ping");
b.enabled = false;
assert!(!cron_jobs_config_equal(&a, &b));
}
}
// Tests for the item-rotation feature: placeholder rendering, cursor
// advancement, message baking, and cursor persistence through serde.
#[cfg(test)]
mod cron_iter_tests {
use super::*;
// A job with a plain message and no iterator.
fn bare_job(msg: &str) -> CronJob {
CronJob {
id: "rot".into(),
name: None,
agent_id: "default".into(),
session_key: None,
enabled: true,
schedule: CronSchedule::Flat("* * * * *".into()),
payload: None,
message: Some(msg.into()),
delivery: None,
session_target: None,
wake_mode: None,
state: None,
iter: None,
created_at_ms: None,
updated_at_ms: None,
}
}
// A `bare_job` with an iterator over `items` positioned at `cursor`.
fn iter_job(items: &[&str], cursor: usize, msg: &str) -> CronJob {
let mut j = bare_job(msg);
j.iter = Some(CronIter {
items: items.iter().map(|s| s.to_string()).collect(),
cursor,
});
j
}
#[test]
fn render_substitutes_current_and_next() {
let j = iter_job(
&["东京", "曼谷", "迪拜"],
0,
"查询{current}天气,下一次:{next}",
);
assert_eq!(j.render_message(), "查询东京天气,下一次:曼谷");
}
// `{index}` is 1-based: cursor 1 renders as "2".
#[test]
fn render_index_and_total_one_based() {
let j = iter_job(&["a", "b", "c"], 1, "{index}/{total}: {current}");
assert_eq!(j.render_message(), "2/3: b");
}
#[test]
fn next_wraps_around_at_end() {
let j = iter_job(&["a", "b", "c"], 2, "{current}->{next}");
assert_eq!(j.render_message(), "c->a");
}
#[test]
fn advance_wraps_and_reports_new_cursor() {
let mut j = iter_job(&["x", "y"], 1, "{current}");
assert_eq!(j.advance_iter(), Some(0));
assert_eq!(j.iter.as_ref().unwrap().cursor, 0);
}
// Without an iterator the placeholders are left untouched.
#[test]
fn render_without_iter_returns_raw() {
let mut j = bare_job("hello {current}");
assert!(j.iter.is_none());
assert_eq!(j.render_message(), "hello {current}");
assert_eq!(j.advance_iter(), None);
}
#[test]
fn empty_items_falls_back_to_raw() {
let j = iter_job(&[], 0, "x={current}");
assert_eq!(j.render_message(), "x={current}");
}
// When a payload exists, both rendering and baking go through the payload
// text rather than `message`.
#[test]
fn bake_overwrites_payload_then_message() {
let mut j = iter_job(&["a", "b"], 0, "ignored");
j.payload = Some(CronPayload::Structured {
kind: Some("agentTurn".into()),
text: Some("查询{current}".into()),
timeout_seconds: None,
summarize: None,
});
let rendered = j.render_message();
assert_eq!(rendered, "查询a");
j.bake_message(rendered);
assert_eq!(j.effective_message(), "查询a");
}
#[test]
fn iter_cursor_survives_serde_roundtrip() {
let mut j = iter_job(&["东京", "曼谷", "迪拜"], 0, "查询{current}");
let rendered = j.render_message();
assert_eq!(rendered, "查询东京");
assert_eq!(j.advance_iter(), Some(1));
let json = serde_json::to_string(&j).expect("serialize");
let restored: CronJob = serde_json::from_str(&json).expect("deserialize");
let iter = restored.iter.as_ref().expect("iter must round-trip");
assert_eq!(iter.cursor, 1, "cursor must survive serde roundtrip");
assert_eq!(iter.items, vec!["东京", "曼谷", "迪拜"]);
assert_eq!(restored.render_message(), "查询曼谷");
}
// Writes an advanced job to a temp file (tmp + rename) and reads it back to
// check that the advanced cursor is what lands on disk.
#[tokio::test]
async fn iter_cursor_persists_to_disk_before_dispatch() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("cron.json5");
let mut job = iter_job(&["a", "b", "c"], 0, "do {current}");
let rendered = job.render_message();
assert_eq!(rendered, "do a");
assert_eq!(job.advance_iter(), Some(1));
// Local stand-in for the on-disk document shape.
#[derive(serde::Serialize, serde::Deserialize)]
struct Store {
version: u32,
jobs: Vec<CronJob>,
}
let store = Store {
version: 1,
jobs: vec![job],
};
let json = serde_json::to_string_pretty(&store).expect("serialize");
let tmp_path = format!("{}.tmp", path.display());
tokio::fs::write(&tmp_path, &json).await.expect("write tmp");
tokio::fs::rename(&tmp_path, &path).await.expect("rename");
let bytes = tokio::fs::read(&path).await.expect("read");
let restored: Store = serde_json::from_slice(&bytes).expect("deserialize");
assert_eq!(restored.jobs.len(), 1);
let iter = restored.jobs[0].iter.as_ref().expect("iter present");
assert_eq!(
iter.cursor, 1,
"cursor must persist before dispatch returns"
);
assert_eq!(
restored.jobs[0].render_message(),
"do b",
"next fire post-restart must pick the next item, not replay 'a'"
);
}
}
// Tests for `validate_cron_expr`: accepted patterns, empty input, the
// "missing space" hint, and non-cron text.
#[cfg(test)]
mod cron_validate_tests {
use super::validate_cron_expr;
#[test]
fn accepts_common_patterns() {
for ok in ["*/5 * * * *", "0 17 * * *", "30 8 * * 1-5", "0 9 1 * *"] {
assert!(validate_cron_expr(ok).is_ok(), "should accept '{}'", ok);
}
}
#[test]
fn rejects_empty() {
assert!(validate_cron_expr("").is_err());
assert!(validate_cron_expr(" ").is_err());
}
// "017 * * *" has four fields with an all-digit first field, so the error
// must suggest splitting it into "0 17".
#[test]
fn rejects_four_fields_with_hint() {
let err = validate_cron_expr("017 * * *").unwrap_err();
assert!(err.contains("5 fields"), "err = {err}");
assert!(err.contains("0 17"), "should hint at '0 17': {err}");
}
#[test]
fn rejects_garbage() {
assert!(validate_cron_expr("not a cron").is_err());
}
}