use std::{
collections::{HashMap, HashSet},
path::PathBuf,
sync::Arc,
time::Duration,
};
use anyhow::{Context, Result, anyhow};
use rsclaw_agent::{AgentMessage, AgentRegistry};
use rsclaw_channel::{ChannelManager, OutboundMessage};
use rsclaw_config::schema::{CronConfig, CronDelivery};
pub use rsclaw_cron::{
CRON_FILE_LOCK, CronIter, CronJob, CronJobState, CronPayload, CronSchedule, CronScheduleTagged,
CronStore, RunLogEntry, build_run_log_entry, compute_next_run_from_expr,
cron_jobs_config_equal, cron_store, current_timestamp_ms, error_backoff_ms,
export_cron_jobs_to_file, extract_saved_files_content, init_cron_store, install_reload_sender,
load_cron_jobs, load_cron_jobs_from_file, reconcile_file_to_redb_on_boot,
resolve_cron_store_path, save_cron_jobs, trigger_reload, validate_cron_expr,
};
use tokio::{
io::AsyncWriteExt,
sync::{Semaphore, broadcast},
time::sleep,
};
use tracing::{debug, info, warn};
/// Upper bound (ms) on a single timer-loop sleep; also the sleep used when no
/// job has a scheduled run.
const MAX_TIMER_DELAY_MS: u64 = 60_000;
/// Sleep (ms) used by the timer loop when the earliest job is already due.
const MIN_REFIRE_GAP_MS: u64 = 2_000;
/// A job is disabled once its consecutive error count reaches this value.
const MAX_CONSECUTIVE_ERRORS: u32 = 5;
/// A persisted `running_at_ms` marker older than this (ms, i.e. two hours) is
/// treated as stale and cleared when the scheduler starts.
const STUCK_RUN_MS: u64 = 2 * 60 * 60 * 1000;
/// Error text used as a sentinel for runs cancelled by a reload; it is compared
/// by string equality, so it must stay unique among error messages.
const CANCEL_BY_RELOAD: &str = "cron: cancelled by reload";
/// Cron scheduler: owns the configured jobs plus the shared handles needed to
/// execute them, deliver their output and persist their state.
pub struct CronRunner {
/// Jobs handed to the constructor; `run` clones them as the scheduler's
/// working set and `trigger` looks jobs up here.
jobs: Vec<CronJob>,
/// Registry used to look up the agent that executes a job.
agents: Arc<AgentRegistry>,
/// Agent ids whose jobs are awaited without a timeout.
daemon_agent_ids: Vec<String>,
/// Channel manager used to deliver job output.
channels: Arc<ChannelManager>,
/// Directory for per-job run logs (`<data_dir>/cron`).
run_log_dir: PathBuf,
/// Path of the JSON store file used when no cron store is available.
store_path: PathBuf,
/// Limits how many jobs execute at the same time.
semaphore: Arc<Semaphore>,
/// Delivery settings used for jobs that do not define their own.
default_delivery: Option<CronDelivery>,
/// Broadcast sender the timer loop subscribes to for reload requests.
reload_tx: broadcast::Sender<()>,
/// WebSocket connection registry; held by the runner and by each spawned job task.
ws_conns: Arc<crate::ws::ConnRegistry>,
/// Optional shutdown coordinator; when absent, `run` waits for Ctrl-C instead.
shutdown: Option<crate::gateway::ShutdownCoordinator>,
/// When true, `save_store` persists nothing.
parse_failed: bool,
}
impl CronRunner {
/// Creates a runner with `parse_failed` set to `false` and no shutdown
/// coordinator attached.
///
/// This is a convenience wrapper around [`CronRunner::new_with_shutdown`].
pub fn new(
    config: &CronConfig,
    jobs: Vec<CronJob>,
    agents: Arc<AgentRegistry>,
    channels: Arc<ChannelManager>,
    data_dir: PathBuf,
    reload_tx: broadcast::Sender<()>,
    ws_conns: Arc<crate::ws::ConnRegistry>,
) -> Self {
    let parse_failed = false;
    let shutdown = None;
    Self::new_with_shutdown(
        config,
        jobs,
        parse_failed,
        agents,
        channels,
        data_dir,
        reload_tx,
        ws_conns,
        shutdown,
    )
}
pub fn new_with_shutdown(
config: &CronConfig,
jobs: Vec<CronJob>,
parse_failed: bool,
agents: Arc<AgentRegistry>,
channels: Arc<ChannelManager>,
data_dir: PathBuf,
reload_tx: broadcast::Sender<()>,
ws_conns: Arc<crate::ws::ConnRegistry>,
shutdown: Option<crate::gateway::ShutdownCoordinator>,
) -> Self {
let run_log_dir = data_dir.join("cron");
let store_path = resolve_cron_store_path();
if let Err(e) = std::fs::create_dir_all(&run_log_dir) {
tracing::warn!("failed to create cron run log dir: {e}");
}
Self {
jobs,
agents,
channels,
run_log_dir,
store_path,
semaphore: Arc::new(Semaphore::new(4)),
default_delivery: config.default_delivery.clone(),
reload_tx,
ws_conns,
shutdown,
parse_failed,
daemon_agent_ids: Vec::new(),
}
}
/// Builder-style setter: returns the runner with its daemon agent id list
/// replaced by `ids`.
#[must_use]
pub fn with_daemon_agent_ids(self, ids: Vec<String>) -> Self {
    let mut runner = self;
    runner.daemon_agent_ids = ids;
    runner
}
/// Returns the jobs this runner was constructed with.
pub fn jobs(&self) -> &[CronJob] {
    self.jobs.as_slice()
}
pub fn parse_failed(&self) -> bool {
self.parse_failed
}
pub(crate) async fn save_store(&self, jobs: &[CronJob]) -> Result<()> {
if self.parse_failed {
return Ok(());
}
if let Some(store) = cron_store() {
for mem_job in jobs {
let merged = match store.cron_get(&mem_job.id) {
Ok(Some(json)) => match serde_json::from_str::<CronJob>(&json) {
Ok(mut redb_job) => {
redb_job.state = mem_job.state.clone();
if !mem_job.enabled {
redb_job.enabled = false;
}
let sched_changed = serde_json::to_string(&redb_job.schedule).ok()
!= serde_json::to_string(&mem_job.schedule).ok();
if sched_changed {
redb_job.schedule = mem_job.schedule.clone();
if let Some(st) = redb_job.state.as_mut() {
st.next_run_at_ms = None;
}
}
redb_job
}
Err(e) => {
warn!(err = %e, job_id = %mem_job.id, "cron: redb job parse failed; using memory version");
mem_job.clone()
}
},
Ok(None) => {
mem_job.clone()
}
Err(e) => {
warn!(err = %e, job_id = %mem_job.id, "cron: redb cron_get failed; skipping");
continue;
}
};
let json = match serde_json::to_string(&merged) {
Ok(s) => s,
Err(e) => {
warn!(err = %e, job_id = %mem_job.id, "cron: serialize failed");
continue;
}
};
if let Err(e) = store.cron_put(&merged.id, &json) {
warn!(err = %e, job_id = %merged.id, "cron: redb cron_put failed");
}
}
if let Ok(entries) = store.cron_list() {
let exported: Vec<CronJob> = entries
.into_iter()
.filter_map(|(_, j)| serde_json::from_str(&j).ok())
.collect();
tokio::task::spawn_blocking(move || {
export_cron_jobs_to_file(&exported);
});
}
return Ok(());
}
let store_data = CronStore {
version: 1,
jobs: jobs.to_vec(),
};
let json = serde_json::to_string_pretty(&store_data)?;
let tmp = format!("{}.tmp", self.store_path.display());
tokio::fs::write(&tmp, &json).await?;
tokio::fs::rename(&tmp, &self.store_path).await?;
Ok(())
}
/// Starts the scheduler and blocks until shutdown.
///
/// Startup work on a copy of the configured jobs:
/// 1. every job gets a state record if it has none;
/// 2. a `running_at_ms` marker older than `STUCK_RUN_MS` is cleared;
/// 3. a missing or already-past `next_run_at_ms` is recomputed from now;
/// 4. disabled one-shot jobs are dropped;
/// 5. the result is persisted (unless the runner is in parse-failed mode).
///
/// The timer loop then runs in a spawned task while this function waits for
/// the shutdown coordinator, or for Ctrl-C when none is configured. The loop
/// only re-checks the stop flag between sleeps, so the final join can take
/// as long as the sleep it is currently in.
///
/// # Errors
///
/// Returns an error if waiting for Ctrl-C fails.
pub async fn run(&self) -> Result<()> {
    info!("cron scheduler starting");
    let mut jobs = self.jobs.clone();
    let now_ms = current_timestamp_ms();
    for job in &mut jobs {
        let state = job.state.get_or_insert_with(|| CronJobState {
            consecutive_errors: 0,
            ..Default::default()
        });
        if let Some(running_at) = state.running_at_ms {
            // Saturating: a marker that is ahead of the current clock must
            // not underflow (panic in debug builds, wrap in release builds).
            if now_ms.saturating_sub(running_at) > STUCK_RUN_MS {
                warn!(job_id = %job.id, "cron: clearing stale running marker");
                state.running_at_ms = None;
            }
        }
        let needs_recompute = state.next_run_at_ms.map_or(true, |t| t <= now_ms);
        if needs_recompute {
            let old_ts = state.next_run_at_ms;
            state.next_run_at_ms = job.schedule.compute_next_run(now_ms);
            info!(job_id = %job.id, old = ?old_ts, new = ?state.next_run_at_ms, "cron: recomputed next_run_at_ms");
        }
    }

    // A disabled one-shot job is dropped from the working set here.
    let zombies_before = jobs.len();
    jobs.retain(|j| !(j.schedule.is_once() && !j.enabled));
    if jobs.len() < zombies_before {
        info!(
            removed = zombies_before - jobs.len(),
            "cron: cleaned up zombie one-shot jobs at startup"
        );
    }

    if !self.parse_failed {
        if let Err(e) = self.save_store(&jobs).await {
            warn!(err = %e, "cron: failed to save initial store");
        }
    } else {
        warn!(
            "cron: parse failed - all saves disabled until cron.json5 syntax errors are fixed"
        );
    }

    let enabled_count = jobs.iter().filter(|j| j.enabled).count();
    info!(
        total = jobs.len(),
        enabled = enabled_count,
        next_wake = jobs
            .iter()
            .filter_map(|j| j.state.as_ref().and_then(|s| s.next_run_at_ms))
            .min()
            .unwrap_or(0),
        "cron scheduler started"
    );

    // `running` is the stop flag shared with the timer task.
    let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
    let running_clone = Arc::clone(&running);
    let semaphore = Arc::clone(&self.semaphore);
    let reload_rx = self.reload_tx.subscribe();
    let runner = self.clone();
    let timer_handle = tokio::spawn(async move {
        runner
            .timer_loop(jobs, running_clone, semaphore, reload_rx)
            .await;
    });

    if let Some(sd) = self.shutdown.clone() {
        sd.notified().await;
    } else {
        tokio::signal::ctrl_c().await?;
    }

    info!("cron scheduler shutting down");
    running.store(false, std::sync::atomic::Ordering::SeqCst);
    sleep(Duration::from_millis(100)).await;
    timer_handle.await.ok();
    info!("cron scheduler stopped");
    Ok(())
}
async fn timer_loop(
&self,
mut jobs: Vec<CronJob>,
running: Arc<std::sync::atomic::AtomicBool>,
semaphore: Arc<Semaphore>,
mut reload_rx: broadcast::Receiver<()>,
) {
let (result_tx, mut result_rx) =
tokio::sync::mpsc::channel::<(String, bool, u64, u64, Option<String>)>(64);
let mut cancel_flags: HashMap<String, Arc<std::sync::atomic::AtomicBool>> = HashMap::new();
let orphan_count = jobs
.iter_mut()
.filter(|j| j.state.as_ref().and_then(|s| s.running_at_ms).is_some())
.count();
if orphan_count > 0 {
warn!(
count = orphan_count,
"cron: clearing orphaned running_at_ms states from previous run"
);
for job in jobs.iter_mut() {
if let Some(state) = job.state.as_mut() {
if state.running_at_ms.is_some() {
info!(job_id = %job.id, "cron: clearing orphaned running_at_ms");
state.running_at_ms = None;
if state.next_run_at_ms.is_none()
|| state
.next_run_at_ms
.map(|t| t <= current_timestamp_ms())
.unwrap_or(true)
{
state.next_run_at_ms =
job.schedule.compute_next_run(current_timestamp_ms());
}
}
}
}
if let Err(e) = self.save_store(&jobs).await {
warn!(err = %e, "cron: failed to save store after clearing orphaned states");
}
}
loop {
if !running.load(std::sync::atomic::Ordering::SeqCst) {
break;
}
if let Some(s) = &self.shutdown {
if s.is_draining() {
info!("cron scheduler: drain signaled, stopping job dispatch");
break;
}
}
let now_ms = current_timestamp_ms();
let next_wake_job = jobs
.iter()
.filter(|j| j.enabled)
.filter_map(|j| {
j.state
.as_ref()
.and_then(|s| s.next_run_at_ms)
.map(|t| (t, &j.id, &j.name))
})
.min_by_key(|(t, _, _)| *t);
let next_wake = next_wake_job.map(|(t, _, _)| t);
let expired_threshold_ms = 5 * 60 * 1000;
let before_len = jobs.len();
jobs.retain(|j| {
if !j.schedule.is_once() || !j.enabled { return true; }
if let Some(state) = &j.state {
if let Some(next_at) = state.next_run_at_ms {
if now_ms > next_at + expired_threshold_ms {
info!(job_id = %j.id, name = ?j.name, "cron: removing expired once job (past due by {}s)", (now_ms - next_at) / 1000);
return false;
}
}
}
true
});
if jobs.len() < before_len {
if let Err(e) = self.save_store(&jobs).await {
warn!(err = %e, "cron: failed to persist after expired job cleanup");
}
}
debug!(
next_wake = next_wake.unwrap_or(0),
now_ms, "cron: timer tick"
);
let delay_ms = match next_wake {
Some(next_wake) => {
let delay = next_wake.saturating_sub(now_ms);
if delay == 0 {
MIN_REFIRE_GAP_MS
} else {
delay.min(MAX_TIMER_DELAY_MS)
}
}
None => {
debug!("cron: no jobs scheduled, waiting {}ms", MAX_TIMER_DELAY_MS);
MAX_TIMER_DELAY_MS
}
};
let mut reload_triggered = tokio::select! {
_ = sleep(Duration::from_millis(delay_ms)) => {
false
}
result = reload_rx.recv() => {
match result {
Ok(()) => true,
Err(broadcast::error::RecvError::Closed) => {
return;
}
Err(broadcast::error::RecvError::Lagged(_)) => {
true
}
}
}
};
loop {
match reload_rx.try_recv() {
Ok(()) => reload_triggered = true,
Err(broadcast::error::TryRecvError::Lagged(_)) => reload_triggered = true,
Err(_) => break,
}
}
if !running.load(std::sync::atomic::Ordering::SeqCst) {
break;
}
if reload_triggered {
let old_count = jobs.len();
let (new_jobs, parse_ok) = load_cron_jobs();
if !parse_ok {
warn!(
old_count,
"cron: reload skipped - cron.json5 has syntax errors, fix before modifying"
);
continue;
}
let file_count = new_jobs.len();
let disabled_in_file: Vec<_> = new_jobs
.iter()
.filter(|j| !j.enabled)
.map(|j| (&j.id, j.enabled))
.collect();
info!(old_count, new_count = new_jobs.len(), file_count, disabled=?disabled_in_file, "cron: reload triggered, reloading from file");
let (merged_jobs, modified_ids) = self.merge_jobs(&jobs, new_jobs, now_ms);
jobs = merged_jobs;
let disabled_after_merge: Vec<_> = jobs
.iter()
.filter(|j| !j.enabled)
.map(|j| (&j.id, j.enabled))
.collect();
info!(after_merge_count = jobs.len(), disabled=?disabled_after_merge, modified=?modified_ids, "cron: merge complete");
let active_unchanged: HashSet<&str> = jobs
.iter()
.filter(|j| j.enabled && !modified_ids.contains(&j.id))
.map(|j| j.id.as_str())
.collect();
let to_cancel: Vec<String> = cancel_flags
.keys()
.filter(|id| !active_unchanged.contains(id.as_str()))
.cloned()
.collect();
for id in &to_cancel {
if let Some(flag) = cancel_flags.remove(id) {
flag.store(true, std::sync::atomic::Ordering::SeqCst);
let reason = if modified_ids.contains(id) {
"modified"
} else {
"deleted/disabled"
};
info!(job_id = id, reason, "cron: cancelled running job");
}
}
if let Err(e) = self.save_store(&jobs).await {
warn!(err = %e, "cron: failed to save store after reload");
}
info!(
old_count,
new_count = jobs.len(),
file_count,
"cron jobs reloaded"
);
continue;
}
let mut collected_count = 0;
while let Ok((job_id, success, duration_ms, started_at, error_msg)) =
result_rx.try_recv()
{
collected_count += 1;
info!(job_id = %job_id, success, duration_ms, "cron: collected job result via try_recv");
cancel_flags.remove(&job_id);
if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
if let Some(state) = job.state.as_mut() {
state.running_at_ms = None;
state.last_run_at_ms = Some(current_timestamp_ms());
state.last_duration_ms = Some(duration_ms);
let completion_time = started_at + duration_ms;
if success {
state.consecutive_errors = 0;
state.last_run_status = Some("ok".to_string());
state.last_status = Some("ok".to_string());
state.last_error = None;
if job.schedule.is_once() {
info!(job_id = %job.id, "cron: one-shot job completed, marking for removal");
state.next_run_at_ms = None;
job.enabled = false;
} else {
state.next_run_at_ms =
job.schedule.compute_next_run(completion_time);
}
info!(job_id = %job.id, next_run_at_ms = state.next_run_at_ms, "cron: updated next_run_at_ms after success");
} else if error_msg.as_deref() == Some(CANCEL_BY_RELOAD) {
state.last_run_status = Some("cancelled".to_string());
state.last_status = Some("cancelled".to_string());
state.last_error = error_msg;
info!(
job_id = %job.id,
next_run_at_ms = state.next_run_at_ms,
"cron: run cancelled by reload (config changed / disabled / deleted)"
);
} else {
state.consecutive_errors += 1;
state.last_run_status = Some("error".to_string());
state.last_status = Some("error".to_string());
state.last_error = error_msg;
let backoff = error_backoff_ms(state.consecutive_errors);
let backoff_next = completion_time + backoff;
let normal_next = job.schedule.compute_next_run(completion_time);
state.next_run_at_ms = Some(
normal_next
.map(|n| n.max(backoff_next))
.unwrap_or(backoff_next),
);
info!(
job_id = %job.id,
consecutive_errors = state.consecutive_errors,
backoff_ms = backoff,
next_run_at_ms = state.next_run_at_ms,
"cron: applying error backoff"
);
if state.consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
warn!(
job_id = %job.id,
consecutive_errors = state.consecutive_errors,
"cron: disabling job after repeated failures"
);
job.enabled = false;
}
}
}
}
}
if collected_count > 0 {
if let Err(e) = self.save_store(&jobs).await {
warn!(err = %e, "cron: failed to save store after collecting results");
}
}
let due: Vec<_> = jobs
.iter_mut()
.filter(|j| {
j.enabled
&& j.state
.as_ref()
.and_then(|s| s.next_run_at_ms)
.map(|t| t <= now_ms)
.unwrap_or(false)
&& j.state.as_ref().and_then(|s| s.running_at_ms).is_none()
})
.map(|j| j.id.clone())
.collect();
if !due.is_empty() {
let disabled_due: Vec<_> = jobs
.iter()
.filter(|j| {
!j.enabled
&& j.state
.as_ref()
.and_then(|s| s.next_run_at_ms)
.map(|t| t <= now_ms)
.unwrap_or(false)
})
.map(|j| j.id.clone())
.collect();
if !disabled_due.is_empty() {
warn!(job_ids = ?disabled_due, "cron: these jobs are due but disabled!");
}
}
if due.is_empty() {
continue;
}
info!(count = due.len(), "cron: {} jobs due", due.len());
for job_id in due {
let permit = semaphore.clone().acquire_owned().await.ok();
if permit.is_none() {
break;
}
if let Some(s) = &self.shutdown {
if s.is_draining() {
info!(
"cron scheduler: drain signaled during permit await, dropping job {}",
job_id
);
drop(permit);
break;
}
}
let started_at = current_timestamp_ms();
let (rendered_text, mut job) = {
let Some(job_ref) = jobs.iter_mut().find(|j| j.id == job_id) else {
continue;
};
if let Some(state) = job_ref.state.as_mut() {
state.running_at_ms = Some(started_at);
}
let rendered = if job_ref.iter.is_some() {
let r = job_ref.render_message();
if job_ref.advance_iter().is_none() {
tracing::warn!(
job_id = %job_ref.id,
"cron: iter set but items list is empty; cursor not advanced"
);
}
Some(r)
} else {
None
};
(rendered, job_ref.clone())
};
if let Some(text) = rendered_text {
if let Err(e) = self.save_store(&jobs).await {
warn!(error = %e, job_id, "cron: failed to persist iter cursor; the next run may repeat the same item");
}
job.bake_message(text);
}
let permit = permit.expect("permit checked above");
let cancelled = Arc::new(std::sync::atomic::AtomicBool::new(false));
cancel_flags.insert(job.id.clone(), Arc::clone(&cancelled));
let job_id_for_log = job.id.clone(); let agents = Arc::clone(&self.agents);
let daemon_agent_ids = self.daemon_agent_ids.clone();
let channels = Arc::clone(&self.channels);
let run_log_dir = self.run_log_dir.clone();
let default_delivery = self.default_delivery.clone();
let ws_conns = Arc::clone(&self.ws_conns);
let inflight_guard = self.shutdown.as_ref().map(|s| s.begin_work());
let handle = tokio::spawn(async move {
let _inflight_guard = inflight_guard;
let start_time = current_timestamp_ms();
let job_started_at = started_at;
let prev_consecutive_errors = job
.state
.as_ref()
.map(|s| s.consecutive_errors)
.unwrap_or(0);
info!(job_id = %job.id, "cron job triggered");
let result: Result<String> = if job.payload.as_ref().and_then(|p| match p {
CronPayload::Structured { kind, .. } => kind.as_deref(),
_ => None,
}) == Some("systemEvent")
{
Ok(job.effective_message().to_owned())
} else if job.payload.as_ref().and_then(|p| match p {
CronPayload::Structured { kind, .. } => kind.as_deref(),
_ => None,
}) == Some("execCommand")
{
run_exec_command(
job.effective_message(),
job.payload.as_ref().and_then(|p| match p {
CronPayload::Structured {
timeout_seconds, ..
} => *timeout_seconds,
_ => None,
}),
job.payload.as_ref().map(|p| p.summarize()).unwrap_or(false),
&job,
&agents,
)
.await
} else {
tokio::select! {
r = run_cron_job(&job, &agents, &daemon_agent_ids) => r,
_ = async {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
if cancelled.load(std::sync::atomic::Ordering::SeqCst) {
info!(job_id = %job.id, "cron job cancelled");
break;
}
}
} => {
Err(anyhow::anyhow!(CANCEL_BY_RELOAD))
}
}
};
let duration_ms = current_timestamp_ms() - start_time;
drop(permit);
let delivery_text: Option<String> = match &result {
Ok(output) if !output.trim().is_empty() => {
Some(output.clone())
}
Ok(_) => {
let job_name = job.name.as_deref().unwrap_or(&job.id);
let seconds = (duration_ms / 1000).to_string();
Some(rsclaw_i18n::t_fmt(
"cron_run_success",
rsclaw_i18n::default_lang(),
&[("name", job_name), ("seconds", &seconds)],
))
}
Err(e) if e.to_string() == CANCEL_BY_RELOAD => {
None
}
Err(e) => {
let job_name = job.name.as_deref().unwrap_or(&job.id);
let consecutive = prev_consecutive_errors + 1;
let backoff = error_backoff_ms(consecutive);
let will_disable = consecutive >= MAX_CONSECUTIVE_ERRORS;
let backoff_text = if backoff < 60_000 {
format!("{}秒", backoff / 1000)
} else if backoff < 3_600_000 {
format!("{}分钟", backoff / 60_000)
} else {
format!("{}小时", backoff / 3_600_000)
};
let consecutive_str = consecutive.to_string();
let error_str = e.to_string();
Some(if will_disable {
rsclaw_i18n::t_fmt(
"cron_run_failed_disabled",
rsclaw_i18n::default_lang(),
&[
("name", job_name),
("consecutive", &consecutive_str),
("error", &error_str),
],
)
} else {
rsclaw_i18n::t_fmt(
"cron_run_failed_retry",
rsclaw_i18n::default_lang(),
&[
("name", job_name),
("consecutive", &consecutive_str),
("backoff", &backoff_text),
("error", &error_str),
],
)
})
}
};
let _ = &ws_conns;
if let Some(delivery_text) = delivery_text {
let delivery_channels = Arc::clone(&channels);
let delivery_agents = Arc::clone(&agents);
let delivery_job = job.clone();
let delivery_default = default_delivery.clone();
tokio::spawn(async move {
if let Err(e) = send_delivery(
&delivery_channels,
&delivery_agents,
&delivery_job,
&delivery_default,
&delivery_text,
)
.await
{
warn!(job_id = %delivery_job.id, %e, "delivery failed");
}
});
}
let entry = build_run_log_entry(
&job,
result.is_ok(),
result.as_ref().err().map(|e| anyhow::anyhow!("{e}")),
);
if let Err(e) = write_run_log(&run_log_dir, &job.id, entry).await {
tracing::warn!(job_id = %job.id, "failed to write cron run log: {e}");
}
let error_msg = result.as_ref().err().map(|e| e.to_string());
(
job.id,
result.is_ok(),
duration_ms,
job_started_at,
error_msg,
)
});
let result_tx = result_tx.clone();
tokio::spawn(async move {
let result = handle.await;
match result {
Ok(r) => {
tracing::info!(job_id = %job_id_for_log, success = r.1, duration_ms = r.2, "cron: result sender got result, sending to channel");
if let Err(e) = result_tx.send(r).await {
tracing::warn!(job_id = %job_id_for_log, "cron: failed to send result to channel: {}", e);
}
}
Err(e) => {
tracing::warn!(job_id = %job_id_for_log, "cron: handle.await failed (spawn error): {}", e);
}
}
});
}
let before = jobs.len();
jobs.retain(|j| !(j.schedule.is_once() && !j.enabled));
if jobs.len() < before {
info!(
removed = before - jobs.len(),
"cron: cleaned up completed one-shot jobs"
);
if let Err(e) = self.save_store(&jobs).await {
warn!(err = %e, "cron: failed to save store after removing one-shot jobs");
}
}
}
}
/// Runs the job with id `job_id` once, right now, outside the schedule.
///
/// The run takes one slot of the shared semaphore. The outcome (or a
/// localized success/failure notice) is delivered through `send_delivery`,
/// a run-log entry is written, and the job's own result is returned.
///
/// # Errors
///
/// Fails when the job id is unknown, the semaphore is closed, the run-log
/// entry cannot be written, or the job itself fails.
pub async fn trigger(&self, job_id: &str) -> Result<()> {
    let job = self
        .jobs
        .iter()
        .find(|candidate| candidate.id == job_id)
        .with_context(|| format!("cron job not found: {job_id}"))?;
    info!(job_id = %job.id, "manually triggering cron job");

    let _permit = self.semaphore.acquire().await?;
    let _inflight_guard = self.shutdown.as_ref().map(|s| s.begin_work());
    let prev_consecutive_errors = match job.state.as_ref() {
        Some(state) => state.consecutive_errors,
        None => 0,
    };

    // Only structured payloads carry a kind; it selects how the job runs.
    let payload_kind = job.payload.as_ref().and_then(|p| match p {
        CronPayload::Structured { kind, .. } => kind.as_deref(),
        _ => None,
    });
    let result: Result<String> = match payload_kind {
        Some("systemEvent") => Ok(job.effective_message().to_owned()),
        Some("execCommand") => {
            let timeout = job.payload.as_ref().and_then(|p| match p {
                CronPayload::Structured {
                    timeout_seconds, ..
                } => *timeout_seconds,
                _ => None,
            });
            let summarize = job.payload.as_ref().map(|p| p.summarize()).unwrap_or(false);
            run_exec_command(
                job.effective_message(),
                timeout,
                summarize,
                job,
                &self.agents,
            )
            .await
        }
        _ => run_cron_job(job, &self.agents, &self.daemon_agent_ids).await,
    };
    let success = result.is_ok();

    let job_name = job.name.as_deref().unwrap_or(&job.id);
    let delivery_text = match &result {
        Ok(output) if !output.trim().is_empty() => output.clone(),
        Ok(_) => rsclaw_i18n::t_fmt(
            "cron_run_success_no_duration",
            rsclaw_i18n::default_lang(),
            &[("name", job_name)],
        ),
        Err(e) => {
            let consecutive_str = (prev_consecutive_errors + 1).to_string();
            let error_str = e.to_string();
            rsclaw_i18n::t_fmt(
                "cron_run_failed_manual",
                rsclaw_i18n::default_lang(),
                &[
                    ("name", job_name),
                    ("consecutive", &consecutive_str),
                    ("error", &error_str),
                ],
            )
        }
    };

    let delivered = send_delivery(
        &self.channels,
        &self.agents,
        job,
        &self.default_delivery,
        &delivery_text,
    )
    .await;
    if let Err(e) = delivered {
        warn!(job_id = %job.id, %e, "delivery failed");
    }

    let log_err = match &result {
        Err(e) if !success => Some(anyhow::anyhow!("{e:#}")),
        _ => None,
    };
    let entry = build_run_log_entry(job, success, log_err);
    write_run_log(&self.run_log_dir, &job.id, entry).await?;
    result.map(|_| ())
}
/// Merges freshly loaded jobs with the jobs currently held in memory.
///
/// The result contains exactly the jobs in `new_jobs`, in their order.
/// A job that already existed (same id) inherits the old runtime state;
/// a new job gets a default state if it has none. `next_run_at_ms` is
/// recomputed from `now_ms` when the schedule changed, and filled in when
/// it is missing.
///
/// Returns the merged jobs together with the ids of existing jobs whose
/// configuration differs according to `cron_jobs_config_equal`.
fn merge_jobs(
    &self,
    old_jobs: &[CronJob],
    new_jobs: Vec<CronJob>,
    now_ms: u64,
) -> (Vec<CronJob>, HashSet<String>) {
    let mut changed_ids: HashSet<String> = HashSet::new();

    let merged: Vec<CronJob> = new_jobs
        .into_iter()
        .map(|mut incoming| {
            let previous = old_jobs.iter().find(|j| j.id == incoming.id);
            let schedule_changed = match previous {
                Some(prev) => {
                    let differs = serde_json::to_value(&prev.schedule).ok()
                        != serde_json::to_value(&incoming.schedule).ok();
                    if !cron_jobs_config_equal(prev, &incoming) {
                        changed_ids.insert(incoming.id.clone());
                    }
                    // Runtime state always carries over from the old job.
                    incoming.state = prev.state.clone();
                    differs
                }
                None => {
                    if incoming.state.is_none() {
                        incoming.state = Some(CronJobState {
                            consecutive_errors: 0,
                            ..Default::default()
                        });
                    }
                    false
                }
            };

            if let Some(state) = incoming.state.as_mut() {
                if schedule_changed {
                    let next = incoming.schedule.compute_next_run(now_ms);
                    debug!(
                        job_id = %incoming.id,
                        old_next = ?state.next_run_at_ms,
                        new_next = ?next,
                        "cron: schedule changed, recomputing next_run_at_ms"
                    );
                    state.next_run_at_ms = next;
                } else if state.next_run_at_ms.is_none() {
                    state.next_run_at_ms = incoming.schedule.compute_next_run(now_ms);
                }
            }
            incoming
        })
        .collect();

    (merged, changed_ids)
}
}
/// Clones the runner: `Arc` handles are shared with the original, plain
/// data fields are copied.
impl Clone for CronRunner {
    fn clone(&self) -> Self {
        // Destructuring makes the compiler flag any field added later.
        let Self {
            jobs,
            agents,
            daemon_agent_ids,
            channels,
            run_log_dir,
            store_path,
            semaphore,
            default_delivery,
            reload_tx,
            ws_conns,
            shutdown,
            parse_failed,
        } = self;
        Self {
            jobs: jobs.clone(),
            agents: Arc::clone(agents),
            daemon_agent_ids: daemon_agent_ids.clone(),
            channels: Arc::clone(channels),
            run_log_dir: run_log_dir.clone(),
            store_path: store_path.clone(),
            semaphore: Arc::clone(semaphore),
            default_delivery: default_delivery.clone(),
            reload_tx: reload_tx.clone(),
            ws_conns: Arc::clone(ws_conns),
            shutdown: shutdown.clone(),
            parse_failed: *parse_failed,
        }
    }
}
/// Executes one cron job through its agent and returns the reply text.
///
/// * The session key is the job's own, or `cron:<job id>` when unset.
/// * Jobs of daemon agents are awaited without a timeout; other jobs use the
///   payload's `timeout_seconds`, defaulting to 300 seconds. On timeout the
///   session's abort flag is raised and an error is returned.
/// * A message starting with `/` is first offered to the local preparser;
///   if it handles the message, its reply is returned without contacting
///   the agent.
/// * A reply that reports a non-zero exit code, either as an
///   `[exit code: N]` line or as a JSON object with an `exit_code` field,
///   is turned into an error.
///
/// # Errors
///
/// Fails when the agent is unknown, its inbox is closed, the reply channel
/// is dropped, the run times out, or the reply reports a non-zero exit code.
///
/// # Panics
///
/// Panics if the agent's `abort_flags` lock is poisoned.
async fn run_cron_job(
    job: &CronJob,
    agents: &AgentRegistry,
    daemon_agent_ids: &[String],
) -> Result<String> {
    let session_key = job
        .session_key
        .clone()
        .unwrap_or_else(|| format!("cron:{}", job.id));
    let handle = agents
        .get(&job.agent_id)
        .with_context(|| format!("agent not found: {}", job.agent_id))?;

    // A timeout of 0 means "wait indefinitely".
    let timeout_secs = if daemon_agent_ids.iter().any(|id| id == &job.agent_id) {
        0
    } else {
        job.payload
            .as_ref()
            .and_then(|p| match p {
                CronPayload::Structured {
                    timeout_seconds, ..
                } => *timeout_seconds,
                CronPayload::Text(_) => None,
            })
            .unwrap_or(300)
    };

    // Fetch (or create) the abort flag registered for this session.
    // NOTE(review): the flag is reused across runs and is left raised after
    // a timeout — confirm the agent resets it before the next run.
    let abort_flag = {
        let mut flags = handle
            .abort_flags
            .write()
            .expect("abort_flags lock poisoned");
        flags
            .entry(session_key.clone())
            .or_insert_with(|| Arc::new(std::sync::atomic::AtomicBool::new(false)))
            .clone()
    };

    let job_text = job.effective_message();
    if job_text.starts_with('/') {
        let (preparse_channel, preparse_peer) = match job.delivery.as_ref() {
            Some(d) => (
                d.channel.as_deref().unwrap_or(""),
                d.to.as_ref().and_then(|t| t.head()).unwrap_or(""),
            ),
            None => ("", ""),
        };
        if let Some(reply) = crate::gateway::preparse::try_preparse_locally(
            job_text,
            handle.as_ref(),
            preparse_channel,
            preparse_peer,
            crate::gateway::preparse::PreparseOrigin::Cron,
        )
        .await
        {
            abort_flag.store(false, std::sync::atomic::Ordering::SeqCst);
            if reply.text.is_empty() && reply.images.is_empty() {
                info!(job_id = %job.id, "cron job handled silently by preparse");
                return Ok(String::new());
            }
            info!(
                job_id = %job.id,
                len = reply.text.len(),
                "cron job handled by preparse short-circuit"
            );
            return Ok(reply.text);
        }
    }

    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    let msg = AgentMessage {
        session_key: session_key.clone(),
        text: job_text.to_owned(),
        channel: "cron".to_string(),
        peer_id: format!("cron:{}", job.id),
        chat_id: String::new(),
        reply_tx,
        task_id: None,
        context_id: None,
        event_tx: None,
        cancel_token: None,
        input_request_tx: None,
        extra_tools: vec![],
        images: vec![],
        files: vec![],
        account: None,
    };
    handle.tx.send(msg).await.context("agent inbox closed")?;

    let reply = if timeout_secs == 0 {
        reply_rx.await.context("agent dropped reply channel")?
    } else {
        tokio::time::timeout(Duration::from_secs(timeout_secs), reply_rx)
            .await
            .map_err(|_| {
                // Raise the abort flag and report what the agent was doing.
                abort_flag.store(true, std::sync::atomic::Ordering::SeqCst);
                warn!(job_id = %job.id, session = %session_key, "cron: timeout fired, aborting agent");
                let agent_status = handle
                    .live_status
                    .try_read()
                    .map(|s| {
                        let task = if s.current_task.is_empty() {
                            "none".to_string()
                        } else {
                            s.current_task.chars().take(100).collect::<String>()
                        };
                        let tools = if s.tool_history.is_empty() {
                            "none".to_string()
                        } else {
                            s.tool_history.join(", ")
                        };
                        format!(
                            " (state: {}, task: \"{}\", tools called: [{}])",
                            s.state, task, tools
                        )
                    })
                    .unwrap_or_default();
                anyhow!("cron job timed out after {}s{}", timeout_secs, agent_status)
            })?
            .context("agent dropped reply channel")?
    };
    abort_flag.store(false, std::sync::atomic::Ordering::SeqCst);

    if reply.is_empty {
        debug!(job_id = %job.id, "cron job returned no output");
        return Ok(String::new());
    }

    let text: &str = &reply.text;

    // Plain-text form: the last line carrying an "[exit code: N]" marker.
    let marker_code = text
        .lines()
        .rev()
        .find(|line| line.contains("[exit code:"))
        .and_then(parse_exit_code_marker);
    if let Some(code) = marker_code {
        if code != 0 {
            let error_detail = text
                .lines()
                .filter(|l| !l.contains("[exit code:"))
                .collect::<Vec<_>>()
                .join("\n");
            let error_msg = if error_detail.is_empty() {
                "command failed with no output".to_string()
            } else {
                error_detail
            };
            info!(job_id = %job.id, exit_code = code, "cron job exec failed");
            return Err(anyhow!("command exit_code={}, error: {}", code, error_msg));
        }
    }

    // JSON form: an object with `exit_code` and optional `stderr`/`stdout`.
    if let Ok(json) = serde_json::from_str::<serde_json::Value>(text) {
        if let Some(exit_code) = json.get("exit_code").and_then(|v| v.as_i64()) {
            if exit_code != 0 {
                let stderr = json.get("stderr").and_then(|v| v.as_str()).unwrap_or("");
                let stdout = json.get("stdout").and_then(|v| v.as_str()).unwrap_or("");
                let error_detail = if !stderr.is_empty() {
                    stderr
                } else if !stdout.is_empty() {
                    stdout
                } else {
                    "command failed with no output"
                };
                info!(job_id = %job.id, exit_code, "cron job exec failed");
                return Err(anyhow!(
                    "command exit_code={}, error: {}",
                    exit_code,
                    error_detail
                ));
            }
        }
    }

    info!(job_id = %job.id, len = reply.text.len(), "cron job completed");
    Ok(reply.text)
}

/// Extracts `N` from a line containing `[exit code: N]`.
///
/// The number is read from the text that follows the marker, so earlier
/// colons on the same line (for example `stderr: boom [exit code: 2]`) and
/// text after the closing bracket do not hide the code.
fn parse_exit_code_marker(line: &str) -> Option<i64> {
    let (_, after_marker) = line.split_once("[exit code:")?;
    after_marker.split(']').next()?.trim().parse::<i64>().ok()
}
/// Special `to` entry that expands to the most recent conversation on each
/// channel bound to the job's agent.
const TO_AGENT_CHANNELS: &str = "agent.channels";
/// Number of delivery targets processed per batch.
const DELIVERY_BATCH: usize = 10;
/// Finds the most recently active session of `agent_id` and derives a
/// delivery target from its key.
///
/// Candidate keys either start with `agent:<agent_id>:` or contain
/// `:a2a:<agent_id>`, and must contain a `direct` or `group` token. The
/// third token is taken as the channel; when the marker is the fifth token,
/// the fourth token is taken as the account. `channel` and `account`, when
/// given, filter the candidates (the account filter only applies to keys
/// that carry an account). Among the remaining keys the one with the highest
/// `last_active` wins; on ties the first one seen is kept.
///
/// Returns `(channel, account, peer, is_group)`, or `None` when no store is
/// available, no key qualifies, or the winning key lacks a channel or peer
/// token.
fn resolve_recent_session_target(
    agent_id: &str,
    channel: Option<&str>,
    account: Option<&str>,
) -> Option<(String, Option<String>, String, bool)> {
    let is_marker = |token: &&str| *token == "direct" || *token == "group";

    let store = cron_store()?;
    let session_keys = store.list_sessions().ok()?;
    let own_prefix = format!("agent:{agent_id}:");
    let a2a_suffix = format!(":a2a:{agent_id}");

    let mut newest: Option<(i64, String)> = None;
    for session_key in &session_keys {
        let belongs_to_agent =
            session_key.starts_with(&own_prefix) || session_key.contains(&a2a_suffix);
        if !belongs_to_agent {
            continue;
        }
        let tokens: Vec<&str> = session_key.split(':').collect();
        let marker_pos = match tokens.iter().position(is_marker) {
            Some(pos) => pos,
            None => continue,
        };
        if let Some(wanted_channel) = channel {
            if tokens.get(2).copied() != Some(wanted_channel) {
                continue;
            }
        }
        // Only keys with the marker at index 4 carry an account token.
        let key_account = (marker_pos == 4).then(|| tokens.get(3).copied()).flatten();
        if matches!((account, key_account), (Some(want), Some(have)) if want != have) {
            continue;
        }
        let last_active = store
            .get_session_meta(session_key)
            .ok()
            .flatten()
            .map(|meta| meta.last_active)
            .unwrap_or(0);
        let is_newer = match newest.as_ref() {
            Some((best_so_far, _)) => last_active > *best_so_far,
            None => true,
        };
        if is_newer {
            newest = Some((last_active, session_key.clone()));
        }
    }

    // Re-parse the winning key into its delivery target parts.
    let winning_key = newest.map(|(_, key)| key)?;
    let tokens: Vec<&str> = winning_key.split(':').collect();
    let marker_pos = tokens.iter().position(is_marker)?;
    let target_channel = tokens.get(2)?.to_string();
    let target_account = if marker_pos == 4 {
        tokens.get(3).map(|s| s.to_string())
    } else {
        None
    };
    let is_group = tokens[marker_pos] == "group";
    let peer = tokens.get(marker_pos + 1)?.to_string();
    Some((target_channel, target_account, peer, is_group))
}
/// Heuristic: a target id denotes a group chat when it starts with `oc_`
/// or ends with `@chatroom`.
fn target_is_group(id: &str) -> bool {
    let has_group_prefix = id.strip_prefix("oc_").is_some();
    let has_chatroom_suffix = id.strip_suffix("@chatroom").is_some();
    has_group_prefix || has_chatroom_suffix
}
/// Expands a delivery configuration into concrete send targets, each a
/// `(channel, account, recipient, is_group)` tuple.
///
/// Three cases, depending on the `to` list:
/// * it contains `agent.channels`: one target per channel binding of the
///   agent (falling back to `delivery.channel` when the agent has none),
///   using the most recent conversation on that binding; bindings without
///   one are skipped. Any other listed recipients are added on
///   `delivery.channel` when it is set.
/// * it lists explicit recipients: one target each on `delivery.channel`;
///   without a channel nothing is returned.
/// * it is empty: the agent's most recent conversation, if any.
pub(crate) fn resolve_delivery_targets(
    agents: &AgentRegistry,
    agent_id: &str,
    delivery: &CronDelivery,
) -> Vec<(String, Option<String>, String, bool)> {
    let fallback_account = delivery.account_id.clone();
    let recipients: Vec<String> = match delivery.to.as_ref() {
        Some(to) => to.to_chain(),
        None => Vec::new(),
    };
    let mut resolved: Vec<(String, Option<String>, String, bool)> = Vec::new();

    // Case 1: expand the agent's channel bindings.
    if recipients.iter().any(|s| s == TO_AGENT_CHANNELS) {
        let bindings: Vec<String> = agents
            .get(agent_id)
            .ok()
            .and_then(|h| h.config.channels.clone())
            .filter(|v| !v.is_empty())
            .or_else(|| delivery.channel.clone().map(|c| vec![c]))
            .unwrap_or_default();
        for bound_entry in &bindings {
            // A binding is either "channel" or "channel:account".
            let (bound_channel, bound_account) = match bound_entry.split_once(':') {
                Some((c, a)) => (c, Some(a)),
                None => (bound_entry.as_str(), None),
            };
            let Some((rch, key_acct, peer, is_group)) =
                resolve_recent_session_target(agent_id, Some(bound_channel), bound_account)
            else {
                warn!(agent = %agent_id, binding = %bound_entry, "cron: agent.channels — no recent conversation for binding, skipping");
                continue;
            };
            // Account precedence: binding, then session key, then default.
            let send_acct = bound_account
                .map(|s| s.to_string())
                .or(key_acct)
                .or_else(|| fallback_account.clone());
            resolved.push((rch, send_acct, peer, is_group));
        }
        if let Some(ch) = &delivery.channel {
            let explicit = recipients
                .iter()
                .filter(|s| *s != TO_AGENT_CHANNELS)
                .map(|to| {
                    (
                        ch.clone(),
                        fallback_account.clone(),
                        to.clone(),
                        target_is_group(to),
                    )
                });
            resolved.extend(explicit);
        }
        return resolved;
    }

    // Case 3: no recipients at all, fall back to the latest conversation.
    if recipients.is_empty() {
        match resolve_recent_session_target(agent_id, None, None) {
            Some((rch, key_acct, peer, is_group)) => {
                let send_acct = key_acct.or_else(|| fallback_account.clone());
                resolved.push((rch, send_acct, peer, is_group));
            }
            None => {
                info!(agent = %agent_id, "cron: no `to` set and no recent conversation found; discarding result");
            }
        }
        return resolved;
    }

    // Case 2: explicit recipients on the configured channel.
    let Some(ch) = delivery.channel.clone() else {
        warn!(agent = %agent_id, "cron: delivery channel not specified");
        return resolved;
    };
    for to in recipients {
        let is_group = target_is_group(&to);
        resolved.push((ch.clone(), fallback_account.clone(), to, is_group));
    }
    resolved
}
/// Delivers a cron job's output text to every resolved delivery target.
///
/// The job's own `delivery` takes precedence over `default_delivery`.
/// Nothing is sent (and `Ok(())` is returned) when:
/// * neither delivery config is present,
/// * the delivery mode is unset or `"none"`,
/// * the trimmed output text is empty, or
/// * no targets could be resolved.
///
/// Targets are sent in batches of `DELIVERY_BATCH`, each batch concurrently.
/// A channel name of `"ws"` is mapped to `"desktop"` before lookup; a channel
/// missing from the `ChannelManager` is logged and skipped.
///
/// # Errors
///
/// Returns the first send error of a batch unless `best_effort` is set on the
/// delivery config, in which case failures are only logged. On error, later
/// batches are not attempted.
async fn send_delivery(
    channels: &ChannelManager,
    agents: &AgentRegistry,
    job: &CronJob,
    default_delivery: &Option<CronDelivery>,
    output_text: &str,
) -> Result<()> {
    let Some(delivery) = job.delivery.as_ref().or(default_delivery.as_ref()) else {
        info!(job_id = %job.id, name = ?job.name, "cron: no delivery configured, result discarded. Set delivery on the job or configure default_delivery in cron config.");
        return Ok(());
    };
    let mode = delivery.mode.as_deref().unwrap_or("none");
    if mode == "none" {
        debug!(job_id = %job.id, "cron: delivery mode is 'none', skipping");
        return Ok(());
    }
    let text = output_text.trim();
    // The previous guard also required both delivery configs to be absent,
    // which is impossible past the early return above, so blank messages were
    // dispatched. Outbound messages carry no images/files here, so an empty
    // text means there is nothing to deliver.
    if text.is_empty() {
        debug!(job_id = %job.id, "cron: output text is empty, skipping delivery");
        return Ok(());
    }
    let thread = delivery.thread_id.clone();
    let best_effort = delivery.best_effort.unwrap_or(false);
    let targets = resolve_delivery_targets(agents, &job.agent_id, delivery);
    if targets.is_empty() {
        return Ok(());
    }
    info!(job_id = %job.id, recipients = targets.len(), text_len = text.len(), "cron: sending delivery");
    for chunk in targets.chunks(DELIVERY_BATCH) {
        let futs = chunk.iter().map(|(channel_name, account, to, is_group)| {
            // "ws" targets are served by the channel registered as "desktop".
            let resolved_channel = if channel_name == "ws" {
                "desktop".to_string()
            } else {
                channel_name.clone()
            };
            let ch = channels.get(&resolved_channel);
            let msg = OutboundMessage {
                target_id: to.clone(),
                is_group: *is_group,
                text: text.to_owned(),
                reply_to: thread.clone(),
                images: vec![],
                files: vec![],
                channel: Some(resolved_channel.clone()),
                account: account.clone(),
            };
            let job_id = job.id.clone();
            let to_log = to.clone();
            async move {
                match ch {
                    Some(c) => match c.send(msg).await {
                        Ok(()) => {
                            info!(job_id = %job_id, channel = %resolved_channel, to = %to_log, "cron delivery sent successfully");
                            Ok(())
                        }
                        Err(e) => Err(e),
                    },
                    None => {
                        // An unknown channel is not treated as a delivery failure.
                        warn!(job_id = %job_id, channel = %resolved_channel, "cron: channel not found in ChannelManager");
                        Ok(())
                    }
                }
            }
        });
        // Every send in the batch runs to completion before results are inspected.
        let results = futures::future::join_all(futs).await;
        for r in results {
            if let Err(e) = r {
                if best_effort {
                    warn!(job_id = %job.id, error = %e, "cron delivery failed (best_effort)");
                } else {
                    return Err(e);
                }
            }
        }
    }
    Ok(())
}
async fn run_exec_command(
command: &str,
timeout_secs: Option<u64>,
summarize: bool,
job: &CronJob,
agents: &AgentRegistry,
) -> Result<String> {
let exec_timeout = Duration::from_secs(timeout_secs.unwrap_or(120));
let task_id = format!("cron:{}:{}", job.id, chrono::Utc::now().timestamp_millis());
let (shell, shell_args) = if cfg!(target_os = "windows") {
("powershell", vec!["-NoProfile", "-Command"])
} else {
("sh", vec!["-c"])
};
let mut cmd = tokio::process::Command::new(shell);
cmd.args(&shell_args)
.arg(command)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.kill_on_drop(true);
#[cfg(windows)]
{
use std::os::windows::process::CommandExt;
cmd.creation_flags(0x08000000);
}
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
tracing::info!(task_id = %task_id, command = %command, "cron exec: spawning background task");
let tid = task_id.clone();
let cmd_timeout = exec_timeout;
tokio::spawn(async move {
let started_at = std::time::Instant::now();
let result = tokio::time::timeout(cmd_timeout, cmd.output()).await;
let (exit_code, stdout, stderr) = match result {
Ok(Ok(output)) => {
let exit_code = output.status.code();
let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
(exit_code, stdout, stderr)
}
Ok(Err(e)) => {
tracing::error!(task_id = %tid, "cron exec background spawn failed: {}", e);
(None, String::new(), format!("spawn error: {}", e))
}
Err(_) => {
tracing::warn!(task_id = %tid, timeout_secs = cmd_timeout.as_secs(), "cron exec background timed out");
(
None,
String::new(),
format!("timed out after {} seconds", cmd_timeout.as_secs()),
)
}
};
let completed_at = std::time::Instant::now();
tracing::info!(
task_id = %tid,
exit_code = ?exit_code,
stdout_len = stdout.len(),
stderr_len = stderr.len(),
elapsed_ms = (completed_at - started_at).as_millis(),
"cron exec background completed"
);
let _ = result_tx.send((exit_code, stdout, stderr));
});
let (exit_code, stdout, stderr) = result_rx
.await
.map_err(|_| anyhow!("background exec channel closed"))?;
let exit_code = exit_code.unwrap_or(-1);
if exit_code != 0 {
let error_msg = if !stderr.is_empty() {
stderr
} else if !stdout.is_empty() {
stdout
} else {
"command failed with no output".to_string()
};
return Err(anyhow!(
"command exit_code={}, error: {}",
exit_code,
error_msg
));
}
let raw_output = if !stdout.is_empty() {
stdout
} else if !stderr.is_empty() {
stderr
} else {
"command succeeded with no output".to_string()
};
let saved_files_content = extract_saved_files_content(&raw_output);
let full_output = if saved_files_content.is_empty() {
raw_output.clone()
} else {
format!(
"{}\n\n---\n\n[FULL CONTENT OF SAVED REPORT FILES]\n{}\n\n[NOTE] The above is the full report the script saved. Base your summary on this content; don't omit key information.",
raw_output, saved_files_content
)
};
if summarize {
let summarize_agent_id = if agents.get("_summarizer").is_ok() {
"_summarizer"
} else {
&job.agent_id
};
let session_key = job
.session_key
.clone()
.unwrap_or_else(|| format!("cron:{}", job.id));
let handle = agents
.get(summarize_agent_id)
.with_context(|| format!("agent not found: {}", summarize_agent_id))?;
let summarize_prompt = format!(
"[CRON TASK EXECUTION RESULT — NO FABRICATION]\n\
Below is the real output of a script execution.\n\
\n\
[HARD RULES — MUST FOLLOW]\n\
1. You may ONLY summarize information that is already in the output below; do not add anything not present.\n\
2. If the output contains a \"FULL CONTENT OF SAVED REPORT FILES\" section, base your summary on that full content and do not omit key information.\n\
3. If the output has no concrete data (e.g. stock counts, prices), do not invent numbers.\n\
4. If the output is empty or only contains errors, honestly report \"script execution failed\" or \"no output\".\n\
5. Do not claim actions like \"done\", \"found\", \"executed\" — you only summarize, you did not execute anything.\n\
6. Return the summary text directly; do not return HEARTBEAT_OK.\n\
\n\
[OUTPUT]\n\
```\n{}\n\
```\n\
\n\
Summarize strictly per the rules above. Violating any rule counts as deception.",
full_output
);
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
let msg = AgentMessage {
session_key: format!("summarize:{}", session_key),
text: summarize_prompt,
channel: "cron".to_string(),
peer_id: format!("cron:{}", job.id),
chat_id: String::new(),
reply_tx,
task_id: None,
context_id: None,
event_tx: None,
cancel_token: None,
input_request_tx: None,
extra_tools: vec![], images: vec![],
files: vec![],
account: None,
};
handle.tx.send(msg).await.context("agent inbox closed")?;
let summary_timeout = Duration::from_secs(300);
match tokio::time::timeout(summary_timeout, reply_rx).await {
Ok(Ok(reply)) => {
if reply.is_empty {
Ok(raw_output)
} else {
Ok(reply.text)
}
}
Ok(Err(_)) => {
tracing::warn!(job_id = %job.id, "summarize: agent dropped reply, using raw output");
Ok(raw_output)
}
Err(_) => {
tracing::warn!(job_id = %job.id, timeout_secs = summary_timeout.as_secs(), "summarize: timed out, using raw output");
Ok(raw_output)
}
}
} else {
if saved_files_content.is_empty() {
Ok(raw_output)
} else {
Ok(saved_files_content)
}
}
}
/// Appends one run-log entry to `<log_dir>/<job_id>.jsonl`.
///
/// The entry is serialized to a single JSON line. The file is created if it
/// does not exist; the directory itself is not created here.
///
/// # Errors
///
/// Returns an error if serialization, opening the file, or writing fails.
async fn write_run_log(log_dir: &std::path::Path, job_id: &str, entry: RunLogEntry) -> Result<()> {
    let target = log_dir.join(format!("{job_id}.jsonl"));

    // Serialize before touching the filesystem so a bad entry creates no file.
    let mut record = serde_json::to_string(&entry)?;
    record.push('\n');

    let mut options = tokio::fs::OpenOptions::new();
    options.create(true).append(true);
    let mut log_file = options.open(&target).await?;
    log_file.write_all(record.as_bytes()).await?;
    Ok(())
}
pub async fn read_jobs_from_file(cron_dir: PathBuf) -> Result<Vec<CronJob>> {
let jobs_path = cron_dir.join("jobs.json");
let data = tokio::fs::read_to_string(&jobs_path)
.await
.unwrap_or_else(|_| "[]".to_owned());
let wrapper: serde_json::Value =
serde_json::from_str(&data).unwrap_or_else(|_| serde_json::Value::Array(vec![]));
let jobs_array = if let Some(arr) = wrapper.get("jobs").and_then(|v| v.as_array()) {
arr.clone()
} else if wrapper.is_array() {
wrapper.as_array().cloned().unwrap_or_default()
} else {
vec![]
};
let mut jobs: Vec<CronJob> = jobs_array
.iter()
.filter_map(|v| serde_json::from_value(v.clone()).ok())
.collect();
jobs.sort_by_key(|j| j.created_at_ms.unwrap_or(0));
Ok(jobs)
}