use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Context, Result};
use chrono::{Datelike, TimeZone, Timelike, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, Semaphore};
use tokio::io::AsyncWriteExt;
use tokio::time::sleep;
use tracing::{debug, info, trace, warn};
use crate::{
agent::{AgentMessage, AgentRegistry},
channel::{ChannelManager, OutboundMessage},
config::schema::{CronConfig, CronDelivery, CronJobConfig},
};
/// Upper bound on one timer sleep so reload signals and newly due jobs
/// are noticed at least once a minute.
const MAX_TIMER_DELAY_MS: u64 = 60_000;
/// Minimum delay before re-firing when a job is already due.
const MIN_REFIRE_GAP_MS: u64 = 2_000;
/// A job is auto-disabled after this many consecutive failures.
const MAX_CONSECUTIVE_ERRORS: u32 = 5;
/// A `running_at_ms` marker older than this (2h) is treated as stale.
const STUCK_RUN_MS: u64 = 2 * 60 * 60 * 1000;
/// Retry backoff ladder: 30s, 1m, 5m, 15m, 1h.
const ERROR_BACKOFF_MS: [u64; 5] = [
    30_000, 60_000, 300_000, 900_000, 3_600_000, ];

/// Maps a consecutive-error count onto the backoff ladder, clamping at
/// the last rung. A count of 0 is treated the same as 1.
fn error_backoff_ms(consecutive_errors: u32) -> u64 {
    let rung = consecutive_errors.max(1) as usize - 1;
    ERROR_BACKOFF_MS[rung.min(ERROR_BACKOFF_MS.len() - 1)]
}
/// A job schedule, accepted either as a bare cron-expression string or
/// as a tagged object (see [`CronScheduleTagged`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum CronSchedule {
    /// Plain 5-field cron expression, e.g. `"*/5 * * * *"`.
    Flat(String),
    /// Object form carrying a `kind` discriminator.
    Tagged(CronScheduleTagged),
}
/// Object form of a schedule, discriminated by a `kind` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum CronScheduleTagged {
    /// `kind: "cron"` — a 5-field expression, optionally with an IANA
    /// timezone name.
    #[serde(rename = "cron")]
    Nested {
        expr: String,
        #[serde(default)]
        tz: Option<String>,
    },
    /// `kind: "every"` — fixed-interval schedule. `anchor_ms` fixes the
    /// phase of the interval grid; absent, "now" is used when the next
    /// run is computed.
    #[serde(rename = "every")]
    Every {
        #[serde(default, alias = "everyMs")]
        every_ms: Option<u64>,
        #[serde(default, alias = "anchorMs")]
        anchor_ms: Option<u64>,
    },
    /// `kind: "once"` — single shot, either at an absolute time
    /// (`at_ms`, takes precedence) or after a relative delay (`delay_ms`).
    #[serde(rename = "once")]
    Once {
        #[serde(default, alias = "atMs")]
        at_ms: Option<u64>,
        #[serde(default, alias = "delayMs")]
        delay_ms: Option<u64>,
    },
}
impl CronSchedule {
pub fn expr(&self) -> &str {
match self {
CronSchedule::Flat(s) => s,
CronSchedule::Tagged(CronScheduleTagged::Nested { expr, .. }) => expr,
CronSchedule::Tagged(CronScheduleTagged::Every { .. }) => "every",
CronSchedule::Tagged(CronScheduleTagged::Once { .. }) => "once",
}
}
pub fn tz(&self) -> Option<&str> {
match self {
CronSchedule::Flat(_) => None,
CronSchedule::Tagged(CronScheduleTagged::Nested { tz, .. }) => tz.as_deref(),
CronSchedule::Tagged(CronScheduleTagged::Every { .. }) => None,
CronSchedule::Tagged(CronScheduleTagged::Once { .. }) => None,
}
}
pub fn is_once(&self) -> bool {
matches!(self, CronSchedule::Tagged(CronScheduleTagged::Once { .. }))
}
pub fn compute_next_run(&self, from_ms: u64) -> Option<u64> {
match self {
CronSchedule::Flat(expr) => {
crate::cron::compute_next_run_from_expr(expr, from_ms, None)
}
CronSchedule::Tagged(CronScheduleTagged::Nested { expr, tz, .. }) => {
crate::cron::compute_next_run_from_expr(expr, from_ms, tz.as_deref())
}
CronSchedule::Tagged(CronScheduleTagged::Every { every_ms, anchor_ms }) => {
let every_ms = every_ms.unwrap_or(0);
if every_ms == 0 {
return None;
}
let anchor = anchor_ms.unwrap_or(from_ms);
if anchor > from_ms {
Some(anchor)
} else {
let elapsed = from_ms - anchor;
let n = (elapsed / every_ms) + 1;
Some(anchor + n * every_ms)
}
}
CronSchedule::Tagged(CronScheduleTagged::Once { at_ms, delay_ms }) => {
if let Some(at) = at_ms {
if *at > from_ms { Some(*at) } else { None }
} else if let Some(delay) = delay_ms {
let target = from_ms + delay;
if target > from_ms { Some(target) } else { None }
} else {
None
}
}
}
}
}
/// Job payload: either a bare message string or a structured object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum CronPayload {
    /// Plain message text.
    Text(String),
    /// Structured payload. `kind == "systemEvent"` makes the runner
    /// deliver the text directly instead of invoking the agent.
    Structured {
        #[serde(default, alias = "kind")]
        kind: Option<String>,
        // Serialized as "message"; accepts "text" as an input alias.
        #[serde(alias = "text", rename = "message", default)]
        text: Option<String>,
        // Per-job override of the agent timeout, in seconds.
        #[serde(default, alias = "timeoutSeconds")]
        timeout_seconds: Option<u64>,
    },
}
impl CronPayload {
pub fn text(&self) -> &str {
match self {
CronPayload::Text(s) => s,
CronPayload::Structured { text, .. } => text.as_deref().unwrap_or(""),
}
}
}
/// Mutable runtime bookkeeping for a job, persisted alongside it.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CronJobState {
    /// When the last run's result was recorded (epoch ms).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_run_at_ms: Option<u64>,
    /// "ok" or "error" for the last run (kept in sync with `last_status`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_run_status: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_status: Option<String>,
    /// Wall-clock duration of the last run.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_duration_ms: Option<u64>,
    // NOTE(review): not written anywhere in this module — presumably
    // updated elsewhere; confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_delivery_status: Option<String>,
    /// Failures since the last success; drives backoff and auto-disable.
    #[serde(default)]
    pub consecutive_errors: u32,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    /// Next scheduled fire time (epoch ms); `None` = not scheduled.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_run_at_ms: Option<u64>,
    /// Set while a run is in flight; stale markers are cleared on startup.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub running_at_ms: Option<u64>,
}
/// A scheduled job: identity, schedule, message/payload, delivery
/// target, and runtime state.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CronJob {
    pub id: String,
    /// Optional display name; the id is used where this is absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Agent that executes the job ("default" when built from config).
    #[serde(default)]
    pub agent_id: String,
    /// Agent session to use; defaults to "cron:<id>" at run time.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_key: Option<String>,
    pub enabled: bool,
    pub schedule: CronSchedule,
    /// Structured payload; takes precedence over `message`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub payload: Option<CronPayload>,
    /// Legacy flat message text.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
    /// Per-job delivery; falls back to the runner's default delivery.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub delivery: Option<CronDelivery>,
    // NOTE(review): session_target and wake_mode are persisted but not
    // read anywhere in this module — confirm their consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_target: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_mode: Option<String>,
    /// Runtime bookkeeping (next run, errors, running marker).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub state: Option<CronJobState>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub created_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub updated_at_ms: Option<u64>,
}
impl CronJob {
pub fn effective_message(&self) -> &str {
if let Some(ref payload) = self.payload {
return payload.text();
}
self.message.as_deref().unwrap_or("")
}
pub fn cron_expr(&self) -> &str {
self.schedule.expr()
}
pub fn timezone(&self) -> Option<&str> {
self.schedule.tz()
}
}
impl From<&CronJobConfig> for CronJob {
fn from(cfg: &CronJobConfig) -> Self {
let session_key = cfg.session.as_ref().and_then(|v| {
if let serde_json::Value::String(s) = v {
Some(s.clone())
} else {
None
}
});
let schedule = if let Some(ref tz) = cfg.tz {
CronSchedule::Tagged(CronScheduleTagged::Nested {
expr: cfg.schedule.clone(),
tz: Some(tz.clone()),
})
} else {
CronSchedule::Flat(cfg.schedule.clone())
};
Self {
id: cfg.id.clone(),
name: cfg.name.clone(),
agent_id: cfg.agent_id.clone().unwrap_or_else(|| "default".to_string()),
session_key,
enabled: cfg.enabled.unwrap_or(true),
schedule,
payload: None,
message: Some(cfg.message.clone()),
delivery: cfg.delivery.clone(),
session_target: None,
wake_mode: None,
state: None,
created_at_ms: None,
updated_at_ms: None,
}
}
}
/// On-disk wrapper for the persisted job list: `{ version, jobs }`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CronStore {
    version: u32,
    jobs: Vec<CronJob>,
}
impl Default for CronStore {
fn default() -> Self {
Self { version: 1, jobs: Vec::new() }
}
}
/// One line of a job's JSONL run log.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RunLogEntry {
    /// Random UUID identifying this run.
    pub id: String,
    pub job_id: String,
    pub started_at: chrono::DateTime<Utc>,
    pub finished_at: Option<chrono::DateTime<Utc>>,
    pub success: bool,
    // NOTE(review): never populated in this module.
    pub reply_preview: Option<String>,
    pub error: Option<String>,
}
/// Owns the cron scheduling state: the job list, agent/channel handles,
/// persistence paths, and a semaphore capping concurrent runs.
pub struct CronRunner {
    jobs: Vec<CronJob>,
    agents: Arc<AgentRegistry>,
    channels: Arc<ChannelManager>,
    /// Directory holding per-job `<id>.jsonl` run logs.
    run_log_dir: PathBuf,
    /// Path of the persisted job store (`cron_store.json`).
    store_path: PathBuf,
    /// Caps concurrent job executions (4 permits, see `new`).
    semaphore: Arc<Semaphore>,
    /// Fallback delivery when a job has none fully configured.
    default_delivery: Option<CronDelivery>,
    /// Fires to make the timer loop reload jobs from disk.
    reload_tx: broadcast::Sender<()>,
    // NOTE(review): only kept alive in the run task (`let _ = &ws_conns`);
    // no other use in this file — confirm whether it is still needed.
    ws_conns: Arc<crate::ws::ConnRegistry>,
}
impl CronRunner {
pub fn new(
config: &CronConfig,
jobs: Vec<CronJob>,
agents: Arc<AgentRegistry>,
channels: Arc<ChannelManager>,
data_dir: PathBuf,
reload_tx: broadcast::Sender<()>,
ws_conns: Arc<crate::ws::ConnRegistry>,
) -> Self {
let run_log_dir = data_dir.join("cron");
let store_path = data_dir.join("cron_store.json");
if let Err(e) = std::fs::create_dir_all(&run_log_dir) {
tracing::warn!("failed to create cron run log dir: {e}");
}
Self {
jobs,
agents,
channels,
run_log_dir,
store_path,
semaphore: Arc::new(Semaphore::new(4)),
default_delivery: config.default_delivery.clone(),
reload_tx,
ws_conns,
}
}
pub fn jobs(&self) -> &[CronJob] {
&self.jobs
}
    /// Scheduler entry point: normalizes persisted job state, persists
    /// the store, spawns the timer loop, then blocks until Ctrl-C.
    pub async fn run(&self) -> Result<()> {
        info!("cron scheduler starting");
        let mut jobs = self.jobs.clone();
        let now_ms = current_timestamp_ms();
        for job in &mut jobs {
            // Every job gets a state record before the code below mutates it.
            if job.state.is_none() {
                job.state = Some(CronJobState {
                    consecutive_errors: 0,
                    ..Default::default()
                });
            }
            let state = job.state.as_mut().unwrap();
            // A running marker older than STUCK_RUN_MS is assumed to be
            // left over from a crash and is cleared so the job can fire.
            if let Some(running_at) = state.running_at_ms {
                if now_ms - running_at > STUCK_RUN_MS {
                    warn!(job_id = %job.id, "cron: clearing stale running marker");
                    state.running_at_ms = None;
                }
            }
            // Recompute next-run times that are missing or already past.
            if state.next_run_at_ms.is_none() || state.next_run_at_ms.is_some_and(|t| t <= now_ms) {
                let old_ts = state.next_run_at_ms;
                state.next_run_at_ms = job.schedule.compute_next_run(now_ms);
                info!(job_id = %job.id, old = ?old_ts, new = ?state.next_run_at_ms, "cron: recomputed next_run_at_ms");
            }
        }
        if let Err(e) = self.save_store(&jobs).await {
            warn!(err = %e, "cron: failed to save initial store");
        }
        let enabled_count = jobs.iter().filter(|j| j.enabled).count();
        info!(
            total = jobs.len(),
            enabled = enabled_count,
            next_wake = jobs.iter().filter_map(|j| j.state.as_ref().and_then(|s| s.next_run_at_ms)).min().unwrap_or(0),
            "cron scheduler started"
        );
        // `running` lets Ctrl-C stop the spawned timer loop cooperatively.
        let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
        let running_clone = Arc::clone(&running);
        let semaphore = Arc::clone(&self.semaphore);
        let reload_rx = self.reload_tx.subscribe();
        let runner = self.clone();
        let timer_handle = tokio::spawn(async move {
            runner.timer_loop(jobs, running_clone, semaphore, reload_rx).await;
        });
        tokio::signal::ctrl_c().await?;
        info!("cron scheduler shutting down");
        running.store(false, std::sync::atomic::Ordering::SeqCst);
        // Give the loop a moment to observe the flag before joining it.
        sleep(Duration::from_millis(100)).await;
        timer_handle.await.ok();
        info!("cron scheduler stopped");
        Ok(())
    }
    /// Core scheduling loop. Each iteration: prune expired one-shot
    /// jobs, sleep until the earliest `next_run_at_ms` (or a reload
    /// signal), reload + merge jobs when signalled, spawn due jobs under
    /// the concurrency semaphore, then drain finished-run results and
    /// persist updated state.
    async fn timer_loop(
        &self,
        mut jobs: Vec<CronJob>,
        running: Arc<std::sync::atomic::AtomicBool>,
        semaphore: Arc<Semaphore>,
        mut reload_rx: broadcast::Receiver<()>,
    ) {
        // Completed runs report (job_id, success, duration_ms, started_at_ms, error).
        let (result_tx, mut result_rx) = tokio::sync::mpsc::channel::<(String, bool, u64, u64, Option<String>)>(64);
        // Per-running-job cancellation flags, keyed by job id.
        let mut cancel_flags: HashMap<String, Arc<std::sync::atomic::AtomicBool>> = HashMap::new();
        loop {
            if !running.load(std::sync::atomic::Ordering::SeqCst) {
                break;
            }
            let now_ms = current_timestamp_ms();
            let next_wake = jobs
                .iter()
                .filter(|j| j.enabled)
                .filter_map(|j| j.state.as_ref().and_then(|s| s.next_run_at_ms))
                .min();
            // One-shot jobs more than 5 minutes past due are dropped.
            let expired_threshold_ms = 5 * 60 * 1000;
            let before_len = jobs.len();
            jobs.retain(|j| {
                if !j.schedule.is_once() || !j.enabled { return true; }
                if let Some(state) = &j.state {
                    if let Some(next_at) = state.next_run_at_ms {
                        if now_ms > next_at + expired_threshold_ms {
                            info!(job_id = %j.id, name = ?j.name, "cron: removing expired once job (past due by {}s)", (now_ms - next_at) / 1000);
                            return false;
                        }
                    }
                }
                true
            });
            if jobs.len() < before_len {
                if let Err(e) = self.save_store(&jobs).await {
                    warn!(err = %e, "cron: failed to persist after expired job cleanup");
                }
            }
            debug!(next_wake = next_wake.unwrap_or(0), now_ms, "cron: timer tick");
            // Sleep until the next due time, capped at MAX_TIMER_DELAY_MS so
            // reloads and newly added jobs are picked up at least each minute.
            let delay_ms = match next_wake {
                Some(next_wake) => {
                    let delay = next_wake.saturating_sub(now_ms);
                    if delay == 0 {
                        MIN_REFIRE_GAP_MS
                    } else {
                        delay.min(MAX_TIMER_DELAY_MS)
                    }
                }
                None => {
                    debug!("cron: no jobs scheduled, waiting {}ms", MAX_TIMER_DELAY_MS);
                    MAX_TIMER_DELAY_MS
                }
            };
            let reload_triggered = tokio::select! {
                _ = sleep(Duration::from_millis(delay_ms)) => {
                    false
                }
                result = reload_rx.recv() => {
                    match result {
                        Ok(()) => true,
                        Err(broadcast::error::RecvError::Closed) => {
                            // Sender gone: the runner is shutting down.
                            return;
                        }
                        Err(broadcast::error::RecvError::Lagged(_)) => {
                            // Missed signals still mean "reload at least once".
                            true
                        }
                    }
                }
            };
            if !running.load(std::sync::atomic::Ordering::SeqCst) {
                break;
            }
            if reload_triggered {
                let old_count = jobs.len();
                let new_jobs = crate::cron::load_cron_jobs();
                let file_count = new_jobs.len();
                let disabled_in_file: Vec<_> = new_jobs.iter()
                    .filter(|j| !j.enabled)
                    .map(|j| (&j.id, j.enabled))
                    .collect();
                info!(old_count, new_count = new_jobs.len(), file_count, disabled=?disabled_in_file, "cron: reload triggered, reloading from file");
                jobs = self.merge_jobs(&jobs, new_jobs, now_ms);
                let disabled_after_merge: Vec<_> = jobs.iter()
                    .filter(|j| !j.enabled)
                    .map(|j| (&j.id, j.enabled))
                    .collect();
                info!(after_merge_count = jobs.len(), disabled=?disabled_after_merge, "cron: merge complete");
                // Cancel in-flight runs whose job was deleted or disabled.
                let active_ids: std::collections::HashSet<&str> = jobs.iter()
                    .filter(|j| j.enabled)
                    .map(|j| j.id.as_str())
                    .collect();
                let to_cancel: Vec<String> = cancel_flags.keys()
                    .filter(|id| !active_ids.contains(id.as_str()))
                    .cloned()
                    .collect();
                for id in &to_cancel {
                    if let Some(flag) = cancel_flags.remove(id) {
                        flag.store(true, std::sync::atomic::Ordering::SeqCst);
                        info!(job_id = id, "cron: cancelled running job (deleted/disabled)");
                    }
                }
                if let Err(e) = self.save_store(&jobs).await {
                    warn!(err = %e, "cron: failed to save store after reload");
                }
                info!(old_count, new_count = jobs.len(), file_count, "cron jobs reloaded");
                continue;
            }
            // Jobs that are enabled, due, and not already running.
            let due: Vec<_> = jobs
                .iter_mut()
                .filter(|j| {
                    j.enabled
                        && j.state
                            .as_ref()
                            .and_then(|s| s.next_run_at_ms)
                            .map(|t| t <= now_ms)
                            .unwrap_or(false)
                        && j.state
                            .as_ref()
                            .and_then(|s| s.running_at_ms)
                            .is_none()
                })
                .map(|j| j.id.clone())
                .collect();
            if !due.is_empty() {
                // Diagnostic only: due-but-disabled jobs are never run.
                let disabled_due: Vec<_> = jobs.iter()
                    .filter(|j| !j.enabled && j.state.as_ref().and_then(|s| s.next_run_at_ms).map(|t| t <= now_ms).unwrap_or(false))
                    .map(|j| j.id.clone())
                    .collect();
                if !disabled_due.is_empty() {
                    warn!(job_ids = ?disabled_due, "cron: these jobs are due but disabled!");
                }
            }
            if due.is_empty() {
                continue;
            }
            info!(count = due.len(), "cron: {} jobs due", due.len());
            for job_id in due {
                // Concurrency cap: waits here when 4 jobs already run.
                let permit = semaphore.clone().acquire_owned().await.ok();
                if permit.is_none() {
                    // acquire_owned only errs when the semaphore is closed.
                    break;
                }
                let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) else {
                    continue;
                };
                let started_at = current_timestamp_ms();
                if let Some(state) = job.state.as_mut() {
                    state.running_at_ms = Some(started_at);
                }
                let permit = permit.expect("permit checked above");
                let cancelled = Arc::new(std::sync::atomic::AtomicBool::new(false));
                cancel_flags.insert(job.id.clone(), Arc::clone(&cancelled));
                // Clone everything the spawned task needs to own.
                let job = job.clone();
                let agents = Arc::clone(&self.agents);
                let channels = Arc::clone(&self.channels);
                let run_log_dir = self.run_log_dir.clone();
                let default_delivery = self.default_delivery.clone();
                let ws_conns = Arc::clone(&self.ws_conns);
                let handle = tokio::spawn(async move {
                    let start_time = current_timestamp_ms();
                    let job_started_at = started_at;
                    let prev_consecutive_errors = job.state.as_ref().map(|s| s.consecutive_errors).unwrap_or(0);
                    info!(job_id = %job.id, "cron job triggered");
                    // "systemEvent" payloads skip the agent: their text IS the result.
                    let result: Result<String> = if job.payload.as_ref().and_then(|p| match p {
                        CronPayload::Structured { kind, .. } => kind.as_deref(),
                        _ => None,
                    }) == Some("systemEvent") {
                        Ok(job.effective_message().to_owned())
                    } else {
                        // Race the agent run against a once-per-second poll of
                        // the cancellation flag.
                        tokio::select! {
                            r = run_cron_job(&job, &agents) => r,
                            _ = async {
                                loop {
                                    tokio::time::sleep(Duration::from_secs(1)).await;
                                    if cancelled.load(std::sync::atomic::Ordering::SeqCst) {
                                        info!(job_id = %job.id, "cron job cancelled");
                                        break;
                                    }
                                }
                            } => {
                                Err(anyhow::anyhow!("job cancelled"))
                            }
                        }
                    };
                    let duration_ms = current_timestamp_ms() - start_time;
                    // Release the concurrency slot before delivery I/O.
                    drop(permit);
                    // Non-empty agent output is delivered verbatim; otherwise a
                    // synthesized success/failure summary is sent.
                    let delivery_text = match &result {
                        Ok(output) if !output.trim().is_empty() => {
                            output.clone()
                        }
                        Ok(_) => {
                            let job_name = job.name.as_deref().unwrap_or(&job.id);
                            format!(
                                "✅ 定时任务执行完成\n\n**任务**: {}\n**耗时**: {}秒",
                                job_name,
                                duration_ms / 1000
                            )
                        }
                        Err(e) => {
                            let job_name = job.name.as_deref().unwrap_or(&job.id);
                            let consecutive = prev_consecutive_errors + 1;
                            let backoff = error_backoff_ms(consecutive);
                            let will_disable = consecutive >= MAX_CONSECUTIVE_ERRORS;
                            let backoff_text = if backoff < 60_000 {
                                format!("{}秒", backoff / 1000)
                            } else if backoff < 3_600_000 {
                                format!("{}分钟", backoff / 60_000)
                            } else {
                                format!("{}小时", backoff / 3_600_000)
                            };
                            if will_disable {
                                format!(
                                    "❌ 定时任务执行失败\n\n**任务**: {}\n**连续失败**: {} 次\n**错误**: {}\n\n⚠️ 任务已被自动禁用,请检查配置后手动启用。",
                                    job_name,
                                    consecutive,
                                    e
                                )
                            } else {
                                format!(
                                    "❌ 定时任务执行失败\n\n**任务**: {}\n**连续失败**: {} 次\n**下次重试**: {}后\n**错误**: {}",
                                    job_name,
                                    consecutive,
                                    backoff_text,
                                    e
                                )
                            }
                        }
                    };
                    // Keep-alive reference; ws_conns is otherwise unused here.
                    let _ = &ws_conns;
                    // Delivery runs detached so a slow channel can't delay
                    // state bookkeeping.
                    let delivery_channels = Arc::clone(&channels);
                    let delivery_job = job.clone();
                    let delivery_text = delivery_text;
                    let delivery_default = default_delivery.clone();
                    tokio::spawn(async move {
                        if let Err(e) = send_delivery(
                            &delivery_channels,
                            &delivery_job,
                            &delivery_default,
                            &delivery_text,
                        )
                        .await
                        {
                            warn!(job_id = %delivery_job.id, %e, "delivery failed");
                        }
                    });
                    let entry = build_run_log_entry(
                        &job,
                        result.is_ok(),
                        result.as_ref().err().map(|e| anyhow::anyhow!("{e}")),
                    );
                    if let Err(e) = write_run_log(&run_log_dir, &job.id, entry).await {
                        tracing::warn!(job_id = %job.id, "failed to write cron run log: {e}");
                    }
                    let error_msg = result.as_ref().err().map(|e| e.to_string());
                    (job.id, result.is_ok(), duration_ms, job_started_at, error_msg)
                });
                // Forward the run's outcome into result_rx without blocking
                // this loop on the job itself.
                let result_tx = result_tx.clone();
                tokio::spawn(async move {
                    let result = handle.await;
                    if let Ok(r) = result {
                        let _ = result_tx.send(r).await;
                    }
                });
            }
            // Drain whatever run results have already arrived; job state is
            // only mutated here, on the loop's own copy of `jobs`.
            while let Ok((job_id, success, duration_ms, started_at, error_msg)) = result_rx.try_recv() {
                cancel_flags.remove(&job_id);
                if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
                    if let Some(state) = job.state.as_mut() {
                        state.running_at_ms = None;
                        state.last_run_at_ms = Some(current_timestamp_ms());
                        state.last_duration_ms = Some(duration_ms);
                        let completion_time = started_at + duration_ms;
                        if success {
                            state.consecutive_errors = 0;
                            state.last_run_status = Some("ok".to_string());
                            state.last_status = Some("ok".to_string());
                            state.last_error = None;
                            if job.schedule.is_once() {
                                // Disabled here; removed by the retain below.
                                info!(job_id = %job.id, "cron: one-shot job completed, marking for removal");
                                state.next_run_at_ms = None;
                                job.enabled = false;
                            } else {
                                state.next_run_at_ms = job.schedule.compute_next_run(completion_time);
                            }
                        } else {
                            state.consecutive_errors += 1;
                            state.last_run_status = Some("error".to_string());
                            state.last_status = Some("error".to_string());
                            state.last_error = error_msg;
                            // Push the next run out to at least the backoff time.
                            let backoff = error_backoff_ms(state.consecutive_errors);
                            let backoff_next = completion_time + backoff;
                            let normal_next = job.schedule.compute_next_run(completion_time);
                            state.next_run_at_ms = Some(normal_next.map(|n| n.max(backoff_next)).unwrap_or(backoff_next));
                            info!(
                                job_id = %job.id,
                                consecutive_errors = state.consecutive_errors,
                                backoff_ms = backoff,
                                next_run_at_ms = state.next_run_at_ms,
                                "cron: applying error backoff"
                            );
                            if state.consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
                                warn!(
                                    job_id = %job.id,
                                    consecutive_errors = state.consecutive_errors,
                                    "cron: disabling job after repeated failures"
                                );
                                job.enabled = false;
                            }
                        }
                    }
                }
            }
            // Completed one-shots were disabled above; drop them now.
            let before = jobs.len();
            jobs.retain(|j| !(j.schedule.is_once() && !j.enabled));
            if jobs.len() < before {
                info!(removed = before - jobs.len(), "cron: cleaned up completed one-shot jobs");
            }
            if let Err(e) = self.save_store(&jobs).await {
                warn!(err = %e, "cron: failed to persist state");
            }
        }
    }
    /// Runs a single job immediately, bypassing its schedule. Takes a
    /// concurrency permit, executes (or short-circuits "systemEvent"
    /// payloads), delivers the result, and appends a run-log entry.
    /// Unlike the timer loop, this does NOT update the job's persisted
    /// state (no backoff / auto-disable bookkeeping).
    pub async fn trigger(&self, job_id: &str) -> Result<()> {
        let job = self
            .jobs
            .iter()
            .find(|j| j.id == job_id)
            .with_context(|| format!("cron job not found: {job_id}"))?;
        info!(job_id = %job.id, "manually triggering cron job");
        let _permit = self.semaphore.acquire().await?;
        let prev_consecutive_errors = job.state.as_ref().map(|s| s.consecutive_errors).unwrap_or(0);
        // "systemEvent" payloads deliver their text without invoking the agent.
        let result: Result<String> = if job.payload.as_ref().and_then(|p| match p {
            CronPayload::Structured { kind, .. } => kind.as_deref(),
            _ => None,
        }) == Some("systemEvent") {
            Ok(job.effective_message().to_owned())
        } else {
            run_cron_job(job, &self.agents).await
        };
        let success = result.is_ok();
        // Non-empty output is delivered verbatim, otherwise a summary.
        let delivery_text = match &result {
            Ok(output) if !output.trim().is_empty() => output.clone(),
            Ok(_) => {
                let job_name = job.name.as_deref().unwrap_or(&job.id);
                format!("✅ 定时任务执行完成\n\n**任务**: {}", job_name)
            }
            Err(e) => {
                let job_name = job.name.as_deref().unwrap_or(&job.id);
                let consecutive = prev_consecutive_errors + 1;
                format!(
                    "❌ 定时任务执行失败\n\n**任务**: {}\n**连续失败**: {} 次\n**错误**: {}",
                    job_name, consecutive, e
                )
            }
        };
        if let Err(e) =
            send_delivery(&self.channels, job, &self.default_delivery, &delivery_text).await
        {
            warn!(job_id = %job.id, %e, "delivery failed");
        }
        let log_err = if success {
            None
        } else {
            result.as_ref().err().map(|e| anyhow::anyhow!("{e:#}"))
        };
        let entry = build_run_log_entry(job, success, log_err);
        write_run_log(&self.run_log_dir, &job.id, entry).await?;
        // Propagate the job's own failure to the caller.
        result.map(|_| ())
    }
fn merge_jobs(&self, old_jobs: &[CronJob], new_jobs: Vec<CronJob>, now_ms: u64) -> Vec<CronJob> {
let mut result = Vec::with_capacity(new_jobs.len());
for mut new_job in new_jobs {
if let Some(old_job) = old_jobs.iter().find(|j| j.id == new_job.id) {
new_job.state = old_job.state.clone();
} else {
if new_job.state.is_none() {
new_job.state = Some(CronJobState {
consecutive_errors: 0,
..Default::default()
});
}
}
if let Some(ref mut state) = new_job.state {
if state.next_run_at_ms.is_none() {
state.next_run_at_ms = new_job.schedule.compute_next_run(now_ms);
}
}
result.push(new_job);
}
result
}
async fn save_store(&self, jobs: &[CronJob]) -> Result<()> {
let store = CronStore {
version: 1,
jobs: jobs.to_vec(),
};
let json = serde_json::to_string_pretty(&store)?;
let tmp = format!("{}.tmp", self.store_path.display());
tokio::fs::write(&tmp, &json).await?;
tokio::fs::rename(&tmp, &self.store_path).await?;
Ok(())
}
}
impl Clone for CronRunner {
    /// Shared handles (`Arc`s, broadcast sender) are cheap refcount
    /// bumps; the job list and paths are deep copies.
    fn clone(&self) -> Self {
        let Self {
            jobs,
            agents,
            channels,
            run_log_dir,
            store_path,
            semaphore,
            default_delivery,
            reload_tx,
            ws_conns,
        } = self;
        Self {
            jobs: jobs.clone(),
            agents: Arc::clone(agents),
            channels: Arc::clone(channels),
            run_log_dir: run_log_dir.clone(),
            store_path: store_path.clone(),
            semaphore: Arc::clone(semaphore),
            default_delivery: default_delivery.clone(),
            reload_tx: reload_tx.clone(),
            ws_conns: Arc::clone(ws_conns),
        }
    }
}
/// Tests a single numeric cron field pattern against `value`.
///
/// Supported forms: `*`, `*/step`, an exact number, a range `a-b`, a
/// stepped range `a-b/step`, and comma-separated lists combining any of
/// these (e.g. `1-5,10,*/15`). Unparsable patterns match nothing.
///
/// NOTE(review): `*/step` matches `value % step == 0`. For 0-based
/// fields (minute, hour) this equals Vixie cron; for 1-based fields
/// (day-of-month, month) Vixie cron steps from the field minimum
/// instead — confirm whether that divergence matters here.
fn field_matches(field: &str, value: u32) -> bool {
    if field == "*" {
        return true;
    }
    // Lists first, so each element gets the full pattern syntax.
    if field.contains(',') {
        return field.split(',').any(|part| field_matches(part.trim(), value));
    }
    if let Some(step) = field.strip_prefix("*/") {
        return step
            .parse::<u32>()
            .map(|n| n > 0 && value % n == 0)
            .unwrap_or(false);
    }
    // Ranges, optionally with a "/step" suffix: "a-b" or "a-b/s".
    let (range, step) = match field.split_once('/') {
        Some((range, raw_step)) => match raw_step.parse::<u32>() {
            Ok(n) if n > 0 => (range, n),
            _ => return false,
        },
        None => (field, 1),
    };
    if range.contains('-') {
        if let Some((start, end)) = range.split_once('-') {
            if let (Ok(start), Ok(end)) = (start.parse::<u32>(), end.parse::<u32>()) {
                return value >= start && value <= end && (value - start) % step == 0;
            }
        }
        return false;
    }
    // Exact match. Note: a "/step" on a bare number ("5/2") is not a
    // range and is rejected, matching the previous behavior.
    field.parse::<u32>().map(|v| v == value).unwrap_or(false)
}
/// Day-of-week variant of `field_matches`.
///
/// `dow` arrives as 0-6 with 0 = Sunday (chrono's
/// `num_days_from_sunday`). Per crontab(5) the field may also use 7 for
/// Sunday, so `7`, ranges ending at 7 (e.g. `5-7`), and list members
/// containing 7 all match `dow == 0`.
fn dow_matches(field: &str, dow: u32) -> bool {
    if field == "*" {
        return true;
    }
    if field.contains(',') {
        return field.split(',').any(|part| dow_matches(part.trim(), dow));
    }
    if let Some(step) = field.strip_prefix("*/") {
        return step
            .parse::<u32>()
            .map(|n| n > 0 && dow % n == 0)
            .unwrap_or(false);
    }
    if field.contains('-') {
        let parts: Vec<&str> = field.split('-').collect();
        if parts.len() == 2 {
            if let (Ok(start), Ok(end)) = (parts[0].parse::<u32>(), parts[1].parse::<u32>()) {
                let in_range = |d: u32| d >= start && d <= end;
                // Sunday may be addressed as either 0 or 7.
                return in_range(dow) || (dow == 0 && in_range(7));
            }
        }
        return false;
    }
    match field.parse::<u32>() {
        // 7 is an alias for Sunday (0).
        Ok(v) => v == dow || (v == 7 && dow == 0),
        Err(_) => false,
    }
}
/// Computes the next fire time (epoch ms, UTC) strictly after `from_ms`
/// for a 5-field cron expression, scanning minute-by-minute in the
/// requested timezone (falling back to the system timezone).
///
/// Supports the syntax subset implemented by `field_matches` /
/// `dow_matches`. Returns `None` for malformed expressions, an
/// unrepresentable timestamp, or when no match exists within ~1 year.
fn compute_next_run_from_expr(cron_expr: &str, from_ms: u64, tz: Option<&str>) -> Option<u64> {
    let fields: Vec<&str> = cron_expr.split_whitespace().collect();
    if fields.len() != 5 {
        warn!(expr = %cron_expr, "cron: expression must have exactly 5 fields");
        return None;
    }
    // minute hour day-of-month month day-of-week
    let [min_f, hr_f, dom_f, mon_f, dow_f] = fields[..] else {
        return None;
    };
    let utc_dt = match chrono::DateTime::from_timestamp_millis(from_ms as i64) {
        Some(dt) => dt,
        None => return None,
    };
    // Unknown tz names silently fall back to the system timezone.
    let tz_opt: Option<chrono_tz::Tz> = tz.and_then(|tz_str| tz_str.parse().ok());
    let tz_for_search: chrono_tz::Tz = tz_opt.unwrap_or_else(crate::config::system_tz);
    let local_now = utc_dt.with_timezone(&tz_for_search);
    // Start the scan at the next whole minute after `from_ms`.
    let mut cand = local_now
        .with_second(0).expect("second 0 always valid")
        .with_nanosecond(0).expect("nanosecond 0 always valid");
    cand += chrono::Duration::minutes(1);
    let max_cand = cand + chrono::Duration::days(366);
    while cand < max_cand {
        let dow = cand.date_naive().weekday().num_days_from_sunday();
        let m = field_matches(mon_f, cand.month());
        let d = field_matches(dom_f, cand.day());
        let w = dow_matches(dow_f, dow);
        if !(m && d && w) {
            // Date fields don't match: skip straight to the next local
            // midnight. When that midnight is ambiguous or nonexistent
            // (DST transition), fall back to adding 24h instead.
            cand = (cand.date_naive() + chrono::Days::new(1))
                .and_hms_opt(0, 0, 0)
                .and_then(|naive| cand.timezone().from_local_datetime(&naive).single())
                .unwrap_or_else(|| cand + chrono::Duration::days(1));
            continue;
        }
        let h = field_matches(hr_f, cand.hour());
        let mi = field_matches(min_f, cand.minute());
        trace!(expr=%cron_expr, dow, "searching: {} m={} d={} w={} h={} mi={}", cand.date_naive(), m, d, w, h, mi);
        if h && mi {
            let utc_cand = cand.with_timezone(&chrono::Utc);
            debug!(expr=%cron_expr, "MATCH: {} (UTC: {})", cand, utc_cand);
            return Some(utc_cand.timestamp_millis() as u64);
        }
        cand += chrono::Duration::minutes(1);
    }
    warn!(expr = %cron_expr, "cron: no next run found within 1 year");
    None
}
/// Milliseconds since the UNIX epoch, as `u64`.
///
/// Panics if the system clock reads earlier than 1970; that is treated
/// as a broken environment rather than a recoverable error.
fn current_timestamp_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch");
    elapsed.as_millis() as u64
}
/// Executes one job by sending its message to the configured agent and
/// awaiting the reply, with a per-job timeout (default 300s, overridable
/// via a structured payload's `timeout_seconds`).
///
/// On timeout, the session's abort flag is raised so the agent stops
/// work, and the returned error includes a snapshot of the agent's live
/// status when the status lock is free. Returns the agent's reply text,
/// or "" when the reply is empty.
async fn run_cron_job(job: &CronJob, agents: &AgentRegistry) -> Result<String> {
    // Default session is per-job: "cron:<job id>".
    let session_key = job
        .session_key
        .clone()
        .unwrap_or_else(|| format!("cron:{}", job.id));
    let handle = agents
        .get(&job.agent_id)
        .with_context(|| format!("agent not found: {}", job.agent_id))?;
    let timeout_secs = job
        .payload
        .as_ref()
        .and_then(|p| match p {
            CronPayload::Structured { timeout_seconds, .. } => *timeout_seconds,
            CronPayload::Text(_) => None,
        })
        .unwrap_or(300);
    // Shared abort flag for this session, created on first use.
    let abort_flag = {
        let mut flags = handle.abort_flags.write()
            .expect("abort_flags lock poisoned");
        flags
            .entry(session_key.clone())
            .or_insert_with(|| Arc::new(std::sync::atomic::AtomicBool::new(false)))
            .clone()
    };
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    let msg = AgentMessage {
        session_key: session_key.clone(),
        text: job.effective_message().to_owned(),
        channel: "cron".to_string(),
        peer_id: format!("cron:{}", job.id),
        chat_id: String::new(),
        reply_tx,
        extra_tools: vec![],
        images: vec![],
        files: vec![],
    };
    handle.tx.send(msg).await.context("agent inbox closed")?;
    let reply = tokio::time::timeout(Duration::from_secs(timeout_secs), reply_rx)
        .await
        .map_err(|_| {
            // Timeout path: tell the agent to abort, then build a
            // diagnostic error enriched with its live status if the
            // status lock can be taken without blocking.
            abort_flag.store(true, std::sync::atomic::Ordering::SeqCst);
            warn!(job_id = %job.id, session = %session_key, "cron: timeout fired, aborting agent");
            let agent_status = handle
                .live_status
                .try_read()
                .map(|s| {
                    // Task description is capped at 100 chars for the log.
                    let task = if s.current_task.is_empty() {
                        "none".to_string()
                    } else {
                        s.current_task.chars().take(100).collect::<String>()
                    };
                    let tools = if s.tool_history.is_empty() {
                        "none".to_string()
                    } else {
                        s.tool_history.join(", ")
                    };
                    format!(
                        " (state: {}, task: \"{}\", tools called: [{}])",
                        s.state, task, tools
                    )
                })
                .unwrap_or_default();
            anyhow!("cron job timed out after {}s{}", timeout_secs, agent_status)
        })?
        .context("agent dropped reply channel")?;
    // Clear the flag so later runs on this session aren't pre-aborted.
    abort_flag.store(false, std::sync::atomic::Ordering::SeqCst);
    // NOTE(review): `is_empty` is a field on the reply type, not a
    // method — confirm against the agent reply definition.
    if reply.is_empty {
        debug!(job_id = %job.id, "cron job returned no output");
        Ok(String::new())
    } else {
        info!(job_id = %job.id, len = reply.text.len(), "cron job completed");
        Ok(reply.text)
    }
}
/// Delivers a job's result text over a channel.
///
/// Resolution order: the job's own delivery (only when it names both a
/// channel and a target), then the configured `default_delivery`, else
/// the result is discarded. Most failure modes (missing channel/target,
/// mode "none", unknown channel) are logged and swallowed; only an
/// actual channel send error propagates — and even that is swallowed
/// when the delivery is marked `best_effort`.
async fn send_delivery(
    channels: &ChannelManager,
    job: &CronJob,
    default_delivery: &Option<CronDelivery>,
    output_text: &str,
) -> Result<()> {
    let delivery = match &job.delivery {
        // Job-level delivery must be fully specified to win.
        Some(d) if d.channel.is_some() && d.to.is_some() => {
            debug!(job_id = %job.id, "cron: using job-level delivery");
            d
        }
        Some(_) | None => match default_delivery {
            Some(d) => {
                debug!(job_id = %job.id, mode = ?d.mode, channel = ?d.channel, to = ?d.to, "cron: using default_delivery");
                d
            }
            None => {
                info!(job_id = %job.id, name = ?job.name, "cron: no delivery configured, result discarded. Set delivery on the job or configure default_delivery in cron config.");
                return Ok(());
            }
        },
    };
    let mode = delivery.mode.as_deref().unwrap_or("none");
    if mode == "none" {
        debug!(job_id = %job.id, "cron: delivery mode is 'none', skipping");
        return Ok(());
    }
    let channel_name = match &delivery.channel {
        Some(c) => c,
        None => {
            warn!(job_id = %job.id, "cron: delivery channel not specified");
            return Ok(());
        }
    };
    let to = match &delivery.to {
        Some(t) => t,
        None => {
            warn!(job_id = %job.id, "cron: delivery target 'to' not specified");
            return Ok(());
        }
    };
    let text = output_text.trim();
    // NOTE(review): this guard is unreachable as written — reaching this
    // point requires a delivery to have been resolved above, which needs
    // either job.delivery (fully specified) or default_delivery to be Some.
    if text.is_empty() && default_delivery.is_none() && job.delivery.is_none() {
        debug!(job_id = %job.id, "cron: output text is empty and no delivery configured");
        return Ok(());
    }
    let channel = match channels.get(channel_name) {
        Some(ch) => ch,
        None => {
            warn!(job_id = %job.id, channel = %channel_name, "cron: channel not found in ChannelManager");
            return Ok(());
        }
    };
    info!(job_id = %job.id, channel = %channel_name, to = %to, text_len = text.len(), "cron: sending delivery");
    let msg = OutboundMessage {
        target_id: to.clone(),
        is_group: false,
        text: text.to_owned(),
        reply_to: delivery.thread_id.clone(),
        images: vec![],
        files: vec![],
        channel: Some(channel_name.to_owned()),
    };
    match channel.send(msg).await {
        Ok(()) => {
            info!(job_id = %job.id, channel = %channel_name, to = %to, "cron delivery sent successfully");
            Ok(())
        }
        Err(e) => {
            // best_effort deliveries log the failure but report success.
            if delivery.best_effort.unwrap_or(false) {
                warn!(job_id = %job.id, error = %e, "cron delivery failed (best_effort)");
                Ok(())
            } else {
                Err(e)
            }
        }
    }
}
/// Builds a run-log record for a finished execution. Both timestamps
/// are taken at call time (callers invoke this at completion), and the
/// reply preview is not captured here.
fn build_run_log_entry(
    job: &CronJob,
    success: bool,
    error: Option<anyhow::Error>,
) -> RunLogEntry {
    let error_text = error.map(|err| err.to_string());
    RunLogEntry {
        id: uuid::Uuid::new_v4().to_string(),
        job_id: job.id.clone(),
        started_at: Utc::now(),
        finished_at: Some(Utc::now()),
        success,
        reply_preview: None,
        error: error_text,
    }
}
/// Appends one JSONL-encoded entry to `<log_dir>/<job_id>.jsonl`,
/// creating the file on first use.
async fn write_run_log(log_dir: &std::path::Path, job_id: &str, entry: RunLogEntry) -> Result<()> {
    let path = log_dir.join(format!("{job_id}.jsonl"));
    let mut line = serde_json::to_string(&entry)?;
    line.push('\n');
    let mut file = tokio::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(&path)
        .await?;
    file.write_all(line.as_bytes()).await?;
    Ok(())
}
/// Reads jobs from `<cron_dir>/jobs.json`, accepting either a bare
/// array or a `{ "jobs": [...] }` wrapper.
///
/// Intentionally best-effort: unreadable files, invalid JSON, and
/// unparsable entries all degrade silently to an empty/partial list.
/// Jobs are returned sorted by creation time (missing timestamps first).
pub async fn read_jobs_from_file(cron_dir: PathBuf) -> Result<Vec<CronJob>> {
    let jobs_path = cron_dir.join("jobs.json");
    let raw = tokio::fs::read_to_string(&jobs_path)
        .await
        .unwrap_or_else(|_| "[]".to_owned());
    let value: serde_json::Value =
        serde_json::from_str(&raw).unwrap_or_else(|_| serde_json::Value::Array(vec![]));
    let entries = match value.get("jobs").and_then(|v| v.as_array()) {
        Some(arr) => arr.clone(),
        None => value.as_array().cloned().unwrap_or_default(),
    };
    let mut jobs: Vec<CronJob> = entries
        .iter()
        .filter_map(|entry| serde_json::from_value(entry.clone()).ok())
        .collect();
    jobs.sort_by_key(|job| job.created_at_ms.unwrap_or(0));
    Ok(jobs)
}
/// Canonical path of the cron job store: `<config base>/cron.json5`.
pub fn resolve_cron_store_path() -> PathBuf {
    crate::config::loader::base_dir().join("cron.json5")
}
/// Loads all cron jobs from the store file, migrating the legacy
/// `cron/jobs.json` location to `cron.json5` on first sight.
///
/// Accepts JSON5 or plain JSON, as either a bare array or a
/// `{ "jobs": [...] }` wrapper. Entries that fail to deserialize are
/// logged and skipped; all I/O errors degrade to an empty list.
pub fn load_cron_jobs() -> Vec<CronJob> {
    let source = resolve_cron_store_path();
    if !source.exists() {
        // One-time migration: copy the legacy file into place, then
        // best-effort remove it and its (now empty) directory.
        let base = crate::config::loader::base_dir();
        let legacy = base.join("cron").join("jobs.json");
        if legacy.exists() {
            info!(from = %legacy.display(), to = %source.display(), "migrating legacy cron/jobs.json to cron.json5");
            if let Err(e) = std::fs::copy(&legacy, &source) {
                warn!(err = %e, "failed to migrate legacy cron/jobs.json");
            } else {
                if let Err(e) = std::fs::remove_file(&legacy) {
                    tracing::debug!("failed to remove legacy cron file: {e}");
                }
                if let Err(e) = std::fs::remove_dir(base.join("cron")) {
                    tracing::debug!("failed to remove legacy cron dir: {e}");
                }
            }
        }
    }
    if !source.exists() {
        return Vec::new();
    }
    let raw = match std::fs::read_to_string(&source) {
        Ok(raw) => raw,
        Err(_) => return Vec::new(),
    };
    // JSON5 first, then strict JSON, then give up with a null value.
    let parsed: serde_json::Value = json5::from_str(&raw).or_else(|_| serde_json::from_str(&raw)).unwrap_or_default();
    let jobs_array = if let Some(arr) = parsed.get("jobs").and_then(|v| v.as_array()) {
        arr.clone()
    } else if parsed.is_array() {
        parsed.as_array().cloned().unwrap_or_default()
    } else {
        Vec::new()
    };
    let total = jobs_array.len();
    let jobs: Vec<CronJob> = jobs_array
        .iter()
        .filter_map(|v| match serde_json::from_value::<CronJob>(v.clone()) {
            Ok(job) => Some(job),
            Err(e) => {
                // Skip the bad entry but keep loading the rest.
                warn!(err = %e, job_json = %serde_json::to_string_pretty(&v).unwrap_or_default(), "failed to parse cron job");
                None
            }
        })
        .collect();
    let loaded = jobs.len();
    if loaded < total {
        warn!(file = %source.display(), total, loaded, "some cron jobs failed to parse");
    }
    jobs
}
pub fn save_cron_jobs(jobs: &[CronJob]) -> anyhow::Result<()> {
let cron_file = resolve_cron_store_path();
debug!(path = %cron_file.display(), "cron: saving jobs to file");
let store = serde_json::json!({
"version": 1,
"jobs": jobs,
});
let json = serde_json::to_string_pretty(&store)
.context("failed to serialize cron jobs to JSON")?;
if let Some(parent) = cron_file.parent() {
std::fs::create_dir_all(parent).context("failed to create cron directory")?;
}
std::fs::write(&cron_file, json).context("failed to write cron jobs file")?;
Ok(())
}
// NOTE(review): not referenced within this file; presumably taken by
// callers around load_cron_jobs/save_cron_jobs read-modify-write
// cycles — confirm at the call sites.
pub static CRON_FILE_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
#[cfg(test)]
mod cron_validate_tests {
    use super::validate_cron_expr;
    #[test]
    fn accepts_common_patterns() {
        // Representative valid expressions: steps, fixed times, ranges.
        for ok in ["*/5 * * * *", "0 17 * * *", "30 8 * * 1-5", "0 9 1 * *"] {
            assert!(validate_cron_expr(ok).is_ok(), "should accept '{}'", ok);
        }
    }
    #[test]
    fn rejects_empty() {
        // Whitespace-only input trims down to empty and is rejected too.
        assert!(validate_cron_expr("").is_err());
        assert!(validate_cron_expr(" ").is_err());
    }
    #[test]
    fn rejects_four_fields_with_hint() {
        // "017" should trigger the fused-numbers hint suggesting "0 17".
        let err = validate_cron_expr("017 * * *").unwrap_err();
        assert!(err.contains("5 fields"), "err = {err}");
        assert!(err.contains("0 17"), "should hint at '0 17': {err}");
    }
    #[test]
    fn rejects_garbage() {
        assert!(validate_cron_expr("not a cron").is_err());
    }
}
/// Validates a 5-field cron expression, returning a human-readable
/// error message on failure.
///
/// Checks, in order: non-empty; exactly five whitespace-separated
/// fields (with a "missing space" hint for inputs like "017 * * *");
/// and finally that a next run can actually be computed from now.
pub fn validate_cron_expr(expr: &str) -> Result<(), String> {
    let trimmed = expr.trim();
    if trimmed.is_empty() {
        return Err("cron expression is empty".to_owned());
    }
    let fields: Vec<&str> = trimmed.split_whitespace().collect();
    if fields.len() != 5 {
        // Common typo: two leading numbers fused together ("017").
        let looks_fused = fields.len() == 4
            && fields[0].len() >= 2
            && fields[0].chars().all(|c| c.is_ascii_digit());
        let hint = if looks_fused {
            let n = fields[0];
            format!(
                " — looks like a missing space: '{}' could be '{} {}' which makes 5 fields (e.g. '0 17 * * *' for 5pm daily)",
                n,
                &n[..1],
                &n[1..]
            )
        } else {
            String::new()
        };
        return Err(format!(
            "cron expression must have exactly 5 fields separated by spaces \
            (minute hour day month weekday), got {} field(s): '{}'{}",
            fields.len(),
            trimmed,
            hint
        ));
    }
    // Final check: the expression must produce a concrete next run.
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0);
    if compute_next_run_from_expr(trimmed, now, None).is_none() {
        return Err(format!(
            "cron expression '{}' could not be parsed. Valid examples: \
            '*/5 * * * *' (every 5 min), '0 17 * * *' (5pm daily), \
            '0 9 * * 1' (9am Mondays)",
            trimmed
        ));
    }
    Ok(())
}