1use crate::config::Config;
2use crate::cron::{
3 CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType, Schedule, SessionTarget,
4 next_run_for_schedule, schedule_cron_expression, validate_delivery_config, validate_schedule,
5};
6use anyhow::{Context, Result};
7use chrono::{DateTime, Utc};
8use rusqlite::types::{FromSqlResult, ValueRef};
9use rusqlite::{Connection, params};
10use uuid::Uuid;
11
12const MAX_CRON_OUTPUT_BYTES: usize = 16 * 1024;
13const TRUNCATED_OUTPUT_MARKER: &str = "\n...[truncated]";
14
15impl rusqlite::types::FromSql for JobType {
16 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
17 let text = value.as_str()?;
18 JobType::try_from(text).map_err(|e| rusqlite::types::FromSqlError::Other(e.into()))
19 }
20}
21
22pub fn add_job(config: &Config, expression: &str, command: &str) -> Result<CronJob> {
23 let schedule = Schedule::Cron {
24 expr: expression.to_string(),
25 tz: None,
26 };
27 add_shell_job(config, None, schedule, command, None)
28}
29
30pub fn add_shell_job(
31 config: &Config,
32 name: Option<String>,
33 schedule: Schedule,
34 command: &str,
35 delivery: Option<DeliveryConfig>,
36) -> Result<CronJob> {
37 let now = Utc::now();
38 validate_schedule(&schedule, now)?;
39 validate_delivery_config(delivery.as_ref())?;
40 let next_run = next_run_for_schedule(&schedule, now)?;
41 let id = Uuid::new_v4().to_string();
42 let expression = schedule_cron_expression(&schedule).unwrap_or_default();
43 let schedule_json = serde_json::to_string(&schedule)?;
44 let delivery = delivery.unwrap_or_default();
45
46 let delete_after_run = matches!(schedule, Schedule::At { .. });
47
48 with_connection(config, |conn| {
49 conn.execute(
50 "INSERT INTO cron_jobs (
51 id, expression, command, schedule, job_type, prompt, name, session_target, model,
52 enabled, delivery, delete_after_run, created_at, next_run
53 ) VALUES (?1, ?2, ?3, ?4, 'shell', NULL, ?5, 'isolated', NULL, 1, ?6, ?7, ?8, ?9)",
54 params![
55 id,
56 expression,
57 command,
58 schedule_json,
59 name,
60 serde_json::to_string(&delivery)?,
61 if delete_after_run { 1 } else { 0 },
62 now.to_rfc3339(),
63 next_run.to_rfc3339(),
64 ],
65 )
66 .context("Failed to insert cron shell job")?;
67 Ok(())
68 })?;
69
70 get_job(config, &id)
71}
72
73#[allow(clippy::too_many_arguments)]
74pub fn add_agent_job(
75 config: &Config,
76 name: Option<String>,
77 schedule: Schedule,
78 prompt: &str,
79 session_target: SessionTarget,
80 model: Option<String>,
81 delivery: Option<DeliveryConfig>,
82 delete_after_run: bool,
83 allowed_tools: Option<Vec<String>>,
84) -> Result<CronJob> {
85 let now = Utc::now();
86 validate_schedule(&schedule, now)?;
87 validate_delivery_config(delivery.as_ref())?;
88 let next_run = next_run_for_schedule(&schedule, now)?;
89 let id = Uuid::new_v4().to_string();
90 let expression = schedule_cron_expression(&schedule).unwrap_or_default();
91 let schedule_json = serde_json::to_string(&schedule)?;
92 let delivery = delivery.unwrap_or_default();
93
94 with_connection(config, |conn| {
95 conn.execute(
96 "INSERT INTO cron_jobs (
97 id, expression, command, schedule, job_type, prompt, name, session_target, model,
98 enabled, delivery, delete_after_run, allowed_tools, created_at, next_run
99 ) VALUES (?1, ?2, '', ?3, 'agent', ?4, ?5, ?6, ?7, 1, ?8, ?9, ?10, ?11, ?12)",
100 params![
101 id,
102 expression,
103 schedule_json,
104 prompt,
105 name,
106 session_target.as_str(),
107 model,
108 serde_json::to_string(&delivery)?,
109 if delete_after_run { 1 } else { 0 },
110 encode_allowed_tools(allowed_tools.as_ref())?,
111 now.to_rfc3339(),
112 next_run.to_rfc3339(),
113 ],
114 )
115 .context("Failed to insert cron agent job")?;
116 Ok(())
117 })?;
118
119 get_job(config, &id)
120}
121
122pub fn list_jobs(config: &Config) -> Result<Vec<CronJob>> {
123 with_connection(config, |conn| {
124 let mut stmt = conn.prepare(
125 "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
126 enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
127 allowed_tools, source
128 FROM cron_jobs ORDER BY next_run ASC",
129 )?;
130
131 let rows = stmt.query_map([], map_cron_job_row)?;
132
133 let mut jobs = Vec::new();
134 for row in rows {
135 jobs.push(row?);
136 }
137 Ok(jobs)
138 })
139}
140
141pub fn get_job(config: &Config, job_id: &str) -> Result<CronJob> {
142 with_connection(config, |conn| {
143 let mut stmt = conn.prepare(
144 "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
145 enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
146 allowed_tools, source
147 FROM cron_jobs WHERE id = ?1",
148 )?;
149
150 let mut rows = stmt.query(params![job_id])?;
151 if let Some(row) = rows.next()? {
152 map_cron_job_row(row).map_err(Into::into)
153 } else {
154 anyhow::bail!("Cron job '{job_id}' not found")
155 }
156 })
157}
158
159pub fn remove_job(config: &Config, id: &str) -> Result<()> {
160 let changed = with_connection(config, |conn| {
161 conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![id])
162 .context("Failed to delete cron job")
163 })?;
164
165 if changed == 0 {
166 anyhow::bail!("Cron job '{id}' not found");
167 }
168
169 println!("✅ Removed cron job {id}");
170 Ok(())
171}
172
173pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
174 let lim = i64::try_from(config.scheduler.max_tasks.max(1))
175 .context("Scheduler max_tasks overflows i64")?;
176 with_connection(config, |conn| {
177 let mut stmt = conn.prepare(
178 "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
179 enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
180 allowed_tools, source
181 FROM cron_jobs
182 WHERE enabled = 1 AND next_run <= ?1
183 ORDER BY next_run ASC
184 LIMIT ?2",
185 )?;
186
187 let rows = stmt.query_map(params![now.to_rfc3339(), lim], map_cron_job_row)?;
188
189 let mut jobs = Vec::new();
190 for row in rows {
191 match row {
192 Ok(job) => jobs.push(job),
193 Err(e) => tracing::warn!("Skipping cron job with unparseable row data: {e}"),
194 }
195 }
196 Ok(jobs)
197 })
198}
199
200pub fn all_overdue_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
206 with_connection(config, |conn| {
207 let mut stmt = conn.prepare(
208 "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
209 enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
210 allowed_tools, source
211 FROM cron_jobs
212 WHERE enabled = 1 AND next_run <= ?1
213 ORDER BY next_run ASC",
214 )?;
215
216 let rows = stmt.query_map(params![now.to_rfc3339()], map_cron_job_row)?;
217
218 let mut jobs = Vec::new();
219 for row in rows {
220 match row {
221 Ok(job) => jobs.push(job),
222 Err(e) => tracing::warn!("Skipping cron job with unparseable row data: {e}"),
223 }
224 }
225 Ok(jobs)
226 })
227}
228
229pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<CronJob> {
230 let mut job = get_job(config, job_id)?;
231 let mut schedule_changed = false;
232
233 if let Some(schedule) = patch.schedule {
234 validate_schedule(&schedule, Utc::now())?;
235 job.schedule = schedule;
236 job.expression = schedule_cron_expression(&job.schedule).unwrap_or_default();
237 schedule_changed = true;
238 }
239 if let Some(command) = patch.command {
240 job.command = command;
241 }
242 if let Some(prompt) = patch.prompt {
243 job.prompt = Some(prompt);
244 }
245 if let Some(name) = patch.name {
246 job.name = Some(name);
247 }
248 if let Some(enabled) = patch.enabled {
249 job.enabled = enabled;
250 }
251 if let Some(delivery) = patch.delivery {
252 job.delivery = delivery;
253 }
254 if let Some(model) = patch.model {
255 job.model = Some(model);
256 }
257 if let Some(target) = patch.session_target {
258 job.session_target = target;
259 }
260 if let Some(delete_after_run) = patch.delete_after_run {
261 job.delete_after_run = delete_after_run;
262 }
263 if let Some(allowed_tools) = patch.allowed_tools {
264 if allowed_tools.is_empty() {
267 job.allowed_tools = None;
268 } else {
269 job.allowed_tools = Some(allowed_tools);
270 }
271 }
272
273 if schedule_changed {
274 job.next_run = next_run_for_schedule(&job.schedule, Utc::now())?;
275 }
276
277 with_connection(config, |conn| {
278 conn.execute(
279 "UPDATE cron_jobs
280 SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4, prompt = ?5, name = ?6,
281 session_target = ?7, model = ?8, enabled = ?9, delivery = ?10, delete_after_run = ?11,
282 allowed_tools = ?12, next_run = ?13
283 WHERE id = ?14",
284 params![
285 job.expression,
286 job.command,
287 serde_json::to_string(&job.schedule)?,
288 <JobType as Into<&str>>::into(job.job_type).to_string(),
289 job.prompt,
290 job.name,
291 job.session_target.as_str(),
292 job.model,
293 if job.enabled { 1 } else { 0 },
294 serde_json::to_string(&job.delivery)?,
295 if job.delete_after_run { 1 } else { 0 },
296 encode_allowed_tools(job.allowed_tools.as_ref())?,
297 job.next_run.to_rfc3339(),
298 job.id,
299 ],
300 )
301 .context("Failed to update cron job")?;
302 Ok(())
303 })?;
304
305 get_job(config, job_id)
306}
307
308pub fn record_last_run(
309 config: &Config,
310 job_id: &str,
311 finished_at: DateTime<Utc>,
312 success: bool,
313 output: &str,
314) -> Result<()> {
315 let status = if success { "ok" } else { "error" };
316 let bounded_output = truncate_cron_output(output);
317 with_connection(config, |conn| {
318 conn.execute(
319 "UPDATE cron_jobs
320 SET last_run = ?1, last_status = ?2, last_output = ?3
321 WHERE id = ?4",
322 params![finished_at.to_rfc3339(), status, bounded_output, job_id],
323 )
324 .context("Failed to update cron last run fields")?;
325 Ok(())
326 })
327}
328
329pub fn reschedule_after_run(
330 config: &Config,
331 job: &CronJob,
332 success: bool,
333 output: &str,
334) -> Result<()> {
335 let now = Utc::now();
336 let status = if success { "ok" } else { "error" };
337 let bounded_output = truncate_cron_output(output);
338
339 if matches!(job.schedule, Schedule::At { .. }) {
342 with_connection(config, |conn| {
343 conn.execute(
344 "UPDATE cron_jobs
345 SET enabled = 0, last_run = ?1, last_status = ?2, last_output = ?3
346 WHERE id = ?4",
347 params![now.to_rfc3339(), status, bounded_output, job.id],
348 )
349 .context("Failed to disable completed one-shot cron job")?;
350 Ok(())
351 })
352 } else {
353 let next_run = next_run_for_schedule(&job.schedule, now)?;
354 with_connection(config, |conn| {
355 conn.execute(
356 "UPDATE cron_jobs
357 SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4
358 WHERE id = ?5",
359 params![
360 next_run.to_rfc3339(),
361 now.to_rfc3339(),
362 status,
363 bounded_output,
364 job.id
365 ],
366 )
367 .context("Failed to update cron job run state")?;
368 Ok(())
369 })
370 }
371}
372
373pub fn record_run(
374 config: &Config,
375 job_id: &str,
376 started_at: DateTime<Utc>,
377 finished_at: DateTime<Utc>,
378 status: &str,
379 output: Option<&str>,
380 duration_ms: i64,
381) -> Result<()> {
382 let bounded_output = output.map(truncate_cron_output);
383 with_connection(config, |conn| {
384 let tx = conn.unchecked_transaction()?;
388
389 tx.execute(
390 "INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms)
391 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
392 params![
393 job_id,
394 started_at.to_rfc3339(),
395 finished_at.to_rfc3339(),
396 status,
397 bounded_output.as_deref(),
398 duration_ms,
399 ],
400 )
401 .context("Failed to insert cron run")?;
402
403 let keep = i64::from(config.cron.max_run_history.max(1));
404 tx.execute(
405 "DELETE FROM cron_runs
406 WHERE job_id = ?1
407 AND id NOT IN (
408 SELECT id FROM cron_runs
409 WHERE job_id = ?1
410 ORDER BY started_at DESC, id DESC
411 LIMIT ?2
412 )",
413 params![job_id, keep],
414 )
415 .context("Failed to prune cron run history")?;
416
417 tx.commit()
418 .context("Failed to commit cron run transaction")?;
419 Ok(())
420 })
421}
422
423fn truncate_cron_output(output: &str) -> String {
424 if output.len() <= MAX_CRON_OUTPUT_BYTES {
425 return output.to_string();
426 }
427
428 if MAX_CRON_OUTPUT_BYTES <= TRUNCATED_OUTPUT_MARKER.len() {
429 return TRUNCATED_OUTPUT_MARKER.to_string();
430 }
431
432 let mut cutoff = MAX_CRON_OUTPUT_BYTES - TRUNCATED_OUTPUT_MARKER.len();
433 while cutoff > 0 && !output.is_char_boundary(cutoff) {
434 cutoff -= 1;
435 }
436
437 let mut truncated = output[..cutoff].to_string();
438 truncated.push_str(TRUNCATED_OUTPUT_MARKER);
439 truncated
440}
441
442pub fn list_runs(config: &Config, job_id: &str, limit: usize) -> Result<Vec<CronRun>> {
443 with_connection(config, |conn| {
444 let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?;
445 let mut stmt = conn.prepare(
446 "SELECT id, job_id, started_at, finished_at, status, output, duration_ms
447 FROM cron_runs
448 WHERE job_id = ?1
449 ORDER BY started_at DESC, id DESC
450 LIMIT ?2",
451 )?;
452
453 let rows = stmt.query_map(params![job_id, lim], |row| {
454 Ok(CronRun {
455 id: row.get(0)?,
456 job_id: row.get(1)?,
457 started_at: parse_rfc3339(&row.get::<_, String>(2)?)
458 .map_err(sql_conversion_error)?,
459 finished_at: parse_rfc3339(&row.get::<_, String>(3)?)
460 .map_err(sql_conversion_error)?,
461 status: row.get(4)?,
462 output: row.get(5)?,
463 duration_ms: row.get(6)?,
464 })
465 })?;
466
467 let mut runs = Vec::new();
468 for row in rows {
469 runs.push(row?);
470 }
471 Ok(runs)
472 })
473}
474
475fn parse_rfc3339(raw: &str) -> Result<DateTime<Utc>> {
476 let parsed = DateTime::parse_from_rfc3339(raw)
477 .with_context(|| format!("Invalid RFC3339 timestamp in cron DB: {raw}"))?;
478 Ok(parsed.with_timezone(&Utc))
479}
480
481fn sql_conversion_error(err: anyhow::Error) -> rusqlite::Error {
482 rusqlite::Error::ToSqlConversionFailure(err.into())
483}
484
485fn map_cron_job_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CronJob> {
486 let expression: String = row.get(1)?;
487 let schedule_raw: Option<String> = row.get(3)?;
488 let schedule =
489 decode_schedule(schedule_raw.as_deref(), &expression).map_err(sql_conversion_error)?;
490
491 let delivery_raw: Option<String> = row.get(10)?;
492 let delivery = decode_delivery(delivery_raw.as_deref()).map_err(sql_conversion_error)?;
493
494 let next_run_raw: String = row.get(13)?;
495 let last_run_raw: Option<String> = row.get(14)?;
496 let created_at_raw: String = row.get(12)?;
497 let allowed_tools_raw: Option<String> = row.get(17)?;
498 let source: Option<String> = row.get(18)?;
499
500 Ok(CronJob {
501 id: row.get(0)?,
502 expression,
503 schedule,
504 command: row.get(2)?,
505 job_type: row.get(4)?,
506 prompt: row.get(5)?,
507 name: row.get(6)?,
508 session_target: SessionTarget::parse(&row.get::<_, String>(7)?),
509 model: row.get(8)?,
510 enabled: row.get::<_, i64>(9)? != 0,
511 delivery,
512 delete_after_run: row.get::<_, i64>(11)? != 0,
513 source: source.unwrap_or_else(|| "imperative".to_string()),
514 created_at: parse_rfc3339(&created_at_raw).map_err(sql_conversion_error)?,
515 next_run: parse_rfc3339(&next_run_raw).map_err(sql_conversion_error)?,
516 last_run: match last_run_raw {
517 Some(raw) => Some(parse_rfc3339(&raw).map_err(sql_conversion_error)?),
518 None => None,
519 },
520 last_status: row.get(15)?,
521 last_output: row.get(16)?,
522 allowed_tools: decode_allowed_tools(allowed_tools_raw.as_deref())
523 .map_err(sql_conversion_error)?,
524 })
525}
526
527fn decode_schedule(schedule_raw: Option<&str>, expression: &str) -> Result<Schedule> {
528 if let Some(raw) = schedule_raw {
529 let trimmed = raw.trim();
530 if !trimmed.is_empty() {
531 return serde_json::from_str(trimmed)
532 .with_context(|| format!("Failed to parse cron schedule JSON: {trimmed}"));
533 }
534 }
535
536 if expression.trim().is_empty() {
537 anyhow::bail!("Missing schedule and legacy expression for cron job")
538 }
539
540 Ok(Schedule::Cron {
541 expr: expression.to_string(),
542 tz: None,
543 })
544}
545
546fn decode_delivery(delivery_raw: Option<&str>) -> Result<DeliveryConfig> {
547 if let Some(raw) = delivery_raw {
548 let trimmed = raw.trim();
549 if !trimmed.is_empty() {
550 return serde_json::from_str(trimmed)
551 .with_context(|| format!("Failed to parse cron delivery JSON: {trimmed}"));
552 }
553 }
554 Ok(DeliveryConfig::default())
555}
556
557fn encode_allowed_tools(allowed_tools: Option<&Vec<String>>) -> Result<Option<String>> {
558 allowed_tools
559 .map(serde_json::to_string)
560 .transpose()
561 .context("Failed to serialize cron allowed_tools")
562}
563
564fn decode_allowed_tools(raw: Option<&str>) -> Result<Option<Vec<String>>> {
565 if let Some(raw) = raw {
566 let trimmed = raw.trim();
567 if !trimmed.is_empty() {
568 return serde_json::from_str(trimmed)
569 .map(Some)
570 .with_context(|| format!("Failed to parse cron allowed_tools JSON: {trimmed}"));
571 }
572 }
573 Ok(None)
574}
575
576pub fn sync_declarative_jobs(
585 config: &Config,
586 decls: &[crate::config::schema::CronJobDecl],
587) -> Result<()> {
588 use crate::config::schema::CronScheduleDecl;
589
590 if decls.is_empty() {
591 with_connection(config, |conn| {
594 let deleted = conn
595 .execute("DELETE FROM cron_jobs WHERE source = 'declarative'", [])
596 .context("Failed to remove stale declarative cron jobs")?;
597 if deleted > 0 {
598 tracing::info!(
599 count = deleted,
600 "Removed declarative cron jobs no longer in config"
601 );
602 }
603 Ok(())
604 })?;
605 return Ok(());
606 }
607
608 for decl in decls {
610 validate_decl(decl)?;
611 }
612
613 let now = Utc::now();
614
615 with_connection(config, |conn| {
616 let config_ids: std::collections::HashSet<&str> =
618 decls.iter().map(|d| d.id.as_str()).collect();
619
620 {
622 let mut stmt = conn.prepare("SELECT id FROM cron_jobs WHERE source = 'declarative'")?;
623 let db_ids: Vec<String> = stmt
624 .query_map([], |row| row.get(0))?
625 .filter_map(|r| r.ok())
626 .collect();
627
628 for db_id in &db_ids {
629 if !config_ids.contains(db_id.as_str()) {
630 conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![db_id])
631 .with_context(|| {
632 format!("Failed to remove stale declarative cron job '{db_id}'")
633 })?;
634 tracing::info!(
635 job_id = %db_id,
636 "Removed declarative cron job no longer in config"
637 );
638 }
639 }
640 }
641
642 for decl in decls {
643 let schedule = convert_schedule_decl(&decl.schedule)?;
644 let expression = schedule_cron_expression(&schedule).unwrap_or_default();
645 let schedule_json = serde_json::to_string(&schedule)?;
646 let job_type = &decl.job_type;
647 let session_target = decl.session_target.as_deref().unwrap_or("isolated");
648 let delivery = match &decl.delivery {
649 Some(d) => convert_delivery_decl(d),
650 None => DeliveryConfig::default(),
651 };
652 let delivery_json = serde_json::to_string(&delivery)?;
653 let allowed_tools_json = encode_allowed_tools(decl.allowed_tools.as_ref())?;
654 let command = decl.command.as_deref().unwrap_or("");
655 let delete_after_run = matches!(decl.schedule, CronScheduleDecl::At { .. });
656
657 let exists: bool = conn
659 .prepare("SELECT COUNT(*) FROM cron_jobs WHERE id = ?1")?
660 .query_row(params![decl.id], |row| row.get::<_, i64>(0))
661 .map(|c| c > 0)
662 .unwrap_or(false);
663
664 if exists {
665 let current_schedule_raw: Option<String> = conn
669 .prepare("SELECT schedule FROM cron_jobs WHERE id = ?1")?
670 .query_row(params![decl.id], |row| row.get(0))
671 .ok();
672
673 let schedule_changed = current_schedule_raw.as_deref() != Some(&schedule_json);
674
675 if schedule_changed {
676 let next_run = next_run_for_schedule(&schedule, now)?;
677 conn.execute(
678 "UPDATE cron_jobs
679 SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4,
680 prompt = ?5, name = ?6, session_target = ?7, model = ?8,
681 enabled = ?9, delivery = ?10, delete_after_run = ?11,
682 allowed_tools = ?12, source = 'declarative', next_run = ?13
683 WHERE id = ?14",
684 params![
685 expression,
686 command,
687 schedule_json,
688 job_type,
689 decl.prompt,
690 decl.name,
691 session_target,
692 decl.model,
693 if decl.enabled { 1 } else { 0 },
694 delivery_json,
695 if delete_after_run { 1 } else { 0 },
696 allowed_tools_json,
697 next_run.to_rfc3339(),
698 decl.id,
699 ],
700 )
701 .with_context(|| {
702 format!("Failed to update declarative cron job '{}'", decl.id)
703 })?;
704 } else {
705 conn.execute(
706 "UPDATE cron_jobs
707 SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4,
708 prompt = ?5, name = ?6, session_target = ?7, model = ?8,
709 enabled = ?9, delivery = ?10, delete_after_run = ?11,
710 allowed_tools = ?12, source = 'declarative'
711 WHERE id = ?13",
712 params![
713 expression,
714 command,
715 schedule_json,
716 job_type,
717 decl.prompt,
718 decl.name,
719 session_target,
720 decl.model,
721 if decl.enabled { 1 } else { 0 },
722 delivery_json,
723 if delete_after_run { 1 } else { 0 },
724 allowed_tools_json,
725 decl.id,
726 ],
727 )
728 .with_context(|| {
729 format!("Failed to update declarative cron job '{}'", decl.id)
730 })?;
731 }
732
733 tracing::debug!(job_id = %decl.id, "Updated declarative cron job");
734 } else {
735 let next_run = next_run_for_schedule(&schedule, now)?;
737 conn.execute(
738 "INSERT INTO cron_jobs (
739 id, expression, command, schedule, job_type, prompt, name,
740 session_target, model, enabled, delivery, delete_after_run,
741 allowed_tools, source, created_at, next_run
742 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, 'declarative', ?14, ?15)",
743 params![
744 decl.id,
745 expression,
746 command,
747 schedule_json,
748 job_type,
749 decl.prompt,
750 decl.name,
751 session_target,
752 decl.model,
753 if decl.enabled { 1 } else { 0 },
754 delivery_json,
755 if delete_after_run { 1 } else { 0 },
756 allowed_tools_json,
757 now.to_rfc3339(),
758 next_run.to_rfc3339(),
759 ],
760 )
761 .with_context(|| {
762 format!(
763 "Failed to insert declarative cron job '{}'",
764 decl.id
765 )
766 })?;
767
768 tracing::info!(job_id = %decl.id, "Inserted declarative cron job from config");
769 }
770 }
771
772 Ok(())
773 })
774}
775
/// Turns an arbitrary name into an identifier fragment: every
/// non-alphanumeric character becomes `_`, and the result is lowercased.
fn slug(s: &str) -> String {
    let mut replaced = String::with_capacity(s.len());
    for ch in s.chars() {
        replaced.push(if ch.is_alphanumeric() { ch } else { '_' });
    }
    replaced.to_lowercase()
}
783
784pub fn sync_workflow_cron_jobs(
790 config: &Config,
791 workflows: &[(String, String, Option<String>)], ) -> Result<()> {
793 if workflows.is_empty() {
794 with_connection(config, |conn| {
796 let deleted = conn
797 .execute("DELETE FROM cron_jobs WHERE source = 'workflow'", [])
798 .context("Failed to remove stale workflow cron jobs")?;
799 if deleted > 0 {
800 tracing::info!(
801 count = deleted,
802 "Removed workflow cron jobs no longer present"
803 );
804 }
805 Ok(())
806 })?;
807 return Ok(());
808 }
809
810 let now = Utc::now();
811
812 with_connection(config, |conn| {
813 let expected_ids: Vec<String> = workflows
815 .iter()
816 .enumerate()
817 .map(|(idx, (name, _expr, _tz))| format!("__wf_cron_{}_{}", slug(name), idx))
818 .collect();
819
820 {
822 let mut stmt = conn.prepare("SELECT id FROM cron_jobs WHERE source = 'workflow'")?;
823 let db_ids: Vec<String> = stmt
824 .query_map([], |row| row.get(0))?
825 .filter_map(|r| r.ok())
826 .collect();
827
828 for old_id in &db_ids {
829 if !expected_ids.contains(old_id) {
830 conn.execute(
831 "DELETE FROM cron_jobs WHERE id = ?1 AND source = 'workflow'",
832 params![old_id],
833 )
834 .with_context(|| {
835 format!("Failed to remove stale workflow cron job '{old_id}'")
836 })?;
837 tracing::info!(
838 job_id = %old_id,
839 "Removed stale workflow cron job"
840 );
841 }
842 }
843 }
844
845 for (idx, (name, expr, tz)) in workflows.iter().enumerate() {
847 let job_id = format!("__wf_cron_{}_{}", slug(name), idx);
848
849 let schedule = Schedule::Cron {
851 expr: expr.clone(),
852 tz: tz.clone(),
853 };
854 let schedule_json = serde_json::to_string(&schedule)?;
855
856 let next_run = match next_run_for_schedule(&schedule, now) {
858 Ok(t) => t,
859 Err(e) => {
860 tracing::warn!(
861 workflow = %name,
862 expr = %expr,
863 "Invalid cron expr for workflow trigger, skipping: {e}"
864 );
865 continue;
866 }
867 };
868
869 let exists: bool = conn
870 .prepare("SELECT COUNT(*) FROM cron_jobs WHERE id = ?1")?
871 .query_row(params![job_id], |row| row.get::<_, i64>(0))
872 .map(|c| c > 0)
873 .unwrap_or(false);
874
875 if exists {
876 let current_schedule_raw: Option<String> = conn
878 .prepare("SELECT schedule FROM cron_jobs WHERE id = ?1")?
879 .query_row(params![job_id], |row| row.get(0))
880 .ok();
881
882 let schedule_changed = current_schedule_raw.as_deref() != Some(&schedule_json);
883
884 if schedule_changed {
885 conn.execute(
886 "UPDATE cron_jobs
887 SET expression = ?1, schedule = ?2, next_run = ?3,
888 command = ?4, name = ?5, source = 'workflow'
889 WHERE id = ?6 AND source = 'workflow'",
890 params![
891 expr,
892 schedule_json,
893 next_run.to_rfc3339(),
894 name,
895 format!("Workflow: {name}"),
896 job_id,
897 ],
898 )
899 .with_context(|| format!("Failed to update workflow cron job '{job_id}'"))?;
900 } else {
901 conn.execute(
902 "UPDATE cron_jobs
903 SET command = ?1, name = ?2, source = 'workflow'
904 WHERE id = ?3 AND source = 'workflow'",
905 params![name, format!("Workflow: {name}"), job_id],
906 )
907 .with_context(|| format!("Failed to update workflow cron job '{job_id}'"))?;
908 }
909
910 tracing::debug!(
911 job_id = %job_id,
912 expr = %expr,
913 "Updated workflow cron job"
914 );
915 } else {
916 conn.execute(
918 "INSERT INTO cron_jobs (
919 id, expression, command, schedule, job_type, name,
920 enabled, source, created_at, next_run,
921 session_target, delete_after_run
922 ) VALUES (?1, ?2, ?3, ?4, 'workflow', ?5, 1, 'workflow', ?6, ?7, 'isolated', 0)",
923 params![
924 job_id,
925 expr,
926 name,
927 schedule_json,
928 format!("Workflow: {name}"),
929 now.to_rfc3339(),
930 next_run.to_rfc3339(),
931 ],
932 )
933 .with_context(|| {
934 format!("Failed to insert workflow cron job '{job_id}' for {name}")
935 })?;
936
937 tracing::info!(
938 job_id = %job_id,
939 workflow = %name,
940 expr = %expr,
941 "Inserted workflow cron job"
942 );
943 }
944 }
945
946 Ok(())
947 })
948}
949
950pub fn remove_workflow_cron_jobs(config: &Config, workflow_name: &str) -> Result<()> {
955 let prefix = format!("__wf_cron_{}_%", slug(workflow_name));
956 with_connection(config, |conn| {
957 let deleted = conn
958 .execute(
959 "DELETE FROM cron_jobs WHERE source = 'workflow' AND id LIKE ?1",
960 params![prefix],
961 )
962 .with_context(|| {
963 format!("Failed to remove workflow cron jobs for '{workflow_name}'")
964 })?;
965 if deleted > 0 {
966 tracing::info!(
967 workflow = %workflow_name,
968 count = deleted,
969 "Removed cron jobs for workflow"
970 );
971 }
972 Ok(())
973 })
974}
975
976fn validate_decl(decl: &crate::config::schema::CronJobDecl) -> Result<()> {
978 if decl.id.trim().is_empty() {
979 anyhow::bail!("Declarative cron job has empty id");
980 }
981
982 match decl.job_type.to_lowercase().as_str() {
983 "shell" => {
984 if decl
985 .command
986 .as_deref()
987 .map_or(true, |c| c.trim().is_empty())
988 {
989 anyhow::bail!(
990 "Declarative cron job '{}': shell job requires a non-empty 'command'",
991 decl.id
992 );
993 }
994 }
995 "agent" => {
996 if decl.prompt.as_deref().map_or(true, |p| p.trim().is_empty()) {
997 anyhow::bail!(
998 "Declarative cron job '{}': agent job requires a non-empty 'prompt'",
999 decl.id
1000 );
1001 }
1002 }
1003 "workflow" => {
1004 if decl
1007 .command
1008 .as_deref()
1009 .map_or(true, |c| c.trim().is_empty())
1010 {
1011 anyhow::bail!(
1012 "Declarative cron job '{}': workflow job requires a non-empty 'command' (workflow name)",
1013 decl.id
1014 );
1015 }
1016 }
1017 other => {
1018 anyhow::bail!(
1019 "Declarative cron job '{}': invalid job_type '{}', expected 'shell', 'agent', or 'workflow'",
1020 decl.id,
1021 other
1022 );
1023 }
1024 }
1025
1026 Ok(())
1027}
1028
1029fn convert_schedule_decl(decl: &crate::config::schema::CronScheduleDecl) -> Result<Schedule> {
1031 use crate::config::schema::CronScheduleDecl;
1032 match decl {
1033 CronScheduleDecl::Cron { expr, tz } => Ok(Schedule::Cron {
1034 expr: expr.clone(),
1035 tz: tz.clone(),
1036 }),
1037 CronScheduleDecl::Every { every_ms } => Ok(Schedule::Every {
1038 every_ms: *every_ms,
1039 }),
1040 CronScheduleDecl::At { at } => {
1041 let parsed = DateTime::parse_from_rfc3339(at)
1042 .with_context(|| {
1043 format!("Invalid RFC3339 timestamp in declarative cron 'at': {at}")
1044 })?
1045 .with_timezone(&Utc);
1046 Ok(Schedule::At { at: parsed })
1047 }
1048 }
1049}
1050
1051fn convert_delivery_decl(decl: &crate::config::schema::DeliveryConfigDecl) -> DeliveryConfig {
1053 DeliveryConfig {
1054 mode: decl.mode.clone(),
1055 channel: decl.channel.clone(),
1056 to: decl.to.clone(),
1057 best_effort: decl.best_effort,
1058 }
1059}
1060
1061fn add_column_if_missing(conn: &Connection, name: &str, sql_type: &str) -> Result<()> {
1062 let mut stmt = conn.prepare("PRAGMA table_info(cron_jobs)")?;
1063 let mut rows = stmt.query([])?;
1064 while let Some(row) = rows.next()? {
1065 let col_name: String = row.get(1)?;
1066 if col_name == name {
1067 return Ok(());
1068 }
1069 }
1070 drop(rows);
1072 drop(stmt);
1073
1074 match conn.execute(
1077 &format!("ALTER TABLE cron_jobs ADD COLUMN {name} {sql_type}"),
1078 [],
1079 ) {
1080 Ok(_) => Ok(()),
1081 Err(rusqlite::Error::SqliteFailure(err, Some(ref msg)))
1082 if msg.contains("duplicate column name") =>
1083 {
1084 tracing::debug!("Column cron_jobs.{name} already exists (concurrent migration): {err}");
1085 Ok(())
1086 }
1087 Err(e) => Err(e).with_context(|| format!("Failed to add cron_jobs.{name}")),
1088 }
1089}
1090
1091fn with_connection<T>(config: &Config, f: impl FnOnce(&Connection) -> Result<T>) -> Result<T> {
1092 let db_path = config.workspace_dir.join("cron").join("jobs.db");
1093 if let Some(parent) = db_path.parent() {
1094 std::fs::create_dir_all(parent)
1095 .with_context(|| format!("Failed to create cron directory: {}", parent.display()))?;
1096 }
1097
1098 let conn = Connection::open(&db_path)
1099 .with_context(|| format!("Failed to open cron DB: {}", db_path.display()))?;
1100
1101 conn.execute_batch(
1102 "PRAGMA foreign_keys = ON;
1103 CREATE TABLE IF NOT EXISTS cron_jobs (
1104 id TEXT PRIMARY KEY,
1105 expression TEXT NOT NULL,
1106 command TEXT NOT NULL,
1107 schedule TEXT,
1108 job_type TEXT NOT NULL DEFAULT 'shell',
1109 prompt TEXT,
1110 name TEXT,
1111 session_target TEXT NOT NULL DEFAULT 'isolated',
1112 model TEXT,
1113 enabled INTEGER NOT NULL DEFAULT 1,
1114 delivery TEXT,
1115 delete_after_run INTEGER NOT NULL DEFAULT 0,
1116 allowed_tools TEXT,
1117 created_at TEXT NOT NULL,
1118 next_run TEXT NOT NULL,
1119 last_run TEXT,
1120 last_status TEXT,
1121 last_output TEXT
1122 );
1123 CREATE INDEX IF NOT EXISTS idx_cron_jobs_next_run ON cron_jobs(next_run);
1124
1125 CREATE TABLE IF NOT EXISTS cron_runs (
1126 id INTEGER PRIMARY KEY AUTOINCREMENT,
1127 job_id TEXT NOT NULL,
1128 started_at TEXT NOT NULL,
1129 finished_at TEXT NOT NULL,
1130 status TEXT NOT NULL,
1131 output TEXT,
1132 duration_ms INTEGER,
1133 FOREIGN KEY (job_id) REFERENCES cron_jobs(id) ON DELETE CASCADE
1134 );
1135 CREATE INDEX IF NOT EXISTS idx_cron_runs_job_id ON cron_runs(job_id);
1136 CREATE INDEX IF NOT EXISTS idx_cron_runs_started_at ON cron_runs(started_at);
1137 CREATE INDEX IF NOT EXISTS idx_cron_runs_job_started ON cron_runs(job_id, started_at);",
1138 )
1139 .context("Failed to initialize cron schema")?;
1140
1141 add_column_if_missing(&conn, "schedule", "TEXT")?;
1142 add_column_if_missing(&conn, "job_type", "TEXT NOT NULL DEFAULT 'shell'")?;
1143 add_column_if_missing(&conn, "prompt", "TEXT")?;
1144 add_column_if_missing(&conn, "name", "TEXT")?;
1145 add_column_if_missing(&conn, "session_target", "TEXT NOT NULL DEFAULT 'isolated'")?;
1146 add_column_if_missing(&conn, "model", "TEXT")?;
1147 add_column_if_missing(&conn, "enabled", "INTEGER NOT NULL DEFAULT 1")?;
1148 add_column_if_missing(&conn, "delivery", "TEXT")?;
1149 add_column_if_missing(&conn, "delete_after_run", "INTEGER NOT NULL DEFAULT 0")?;
1150 add_column_if_missing(&conn, "allowed_tools", "TEXT")?;
1151 add_column_if_missing(&conn, "source", "TEXT DEFAULT 'imperative'")?;
1152
1153 f(&conn)
1154}
1155
1156#[cfg(test)]
1157mod tests {
1158 use super::*;
1159 use crate::config::Config;
1160 use chrono::Duration as ChronoDuration;
1161 use tempfile::TempDir;
1162
1163 fn test_config(tmp: &TempDir) -> Config {
1164 let config = Config {
1165 workspace_dir: tmp.path().join("workspace"),
1166 config_path: tmp.path().join("config.toml"),
1167 ..Config::default()
1168 };
1169 std::fs::create_dir_all(&config.workspace_dir).unwrap();
1170 config
1171 }
1172
1173 #[test]
1174 fn add_job_accepts_five_field_expression() {
1175 let tmp = TempDir::new().unwrap();
1176 let config = test_config(&tmp);
1177
1178 let job = add_job(&config, "*/5 * * * *", "echo ok").unwrap();
1179 assert_eq!(job.expression, "*/5 * * * *");
1180 assert_eq!(job.command, "echo ok");
1181 assert!(matches!(job.schedule, Schedule::Cron { .. }));
1182 }
1183
1184 #[test]
1185 fn add_shell_job_marks_at_schedule_for_auto_delete() {
1186 let tmp = TempDir::new().unwrap();
1187 let config = test_config(&tmp);
1188
1189 let one_shot = add_shell_job(
1190 &config,
1191 None,
1192 Schedule::At {
1193 at: Utc::now() + ChronoDuration::minutes(10),
1194 },
1195 "echo once",
1196 None,
1197 )
1198 .unwrap();
1199 assert!(one_shot.delete_after_run);
1200
1201 let recurring = add_shell_job(
1202 &config,
1203 None,
1204 Schedule::Every { every_ms: 60_000 },
1205 "echo recurring",
1206 None,
1207 )
1208 .unwrap();
1209 assert!(!recurring.delete_after_run);
1210 }
1211
1212 #[test]
1213 fn add_shell_job_persists_delivery() {
1214 let tmp = TempDir::new().unwrap();
1215 let config = test_config(&tmp);
1216
1217 let job = add_shell_job(
1218 &config,
1219 Some("deliver-shell".into()),
1220 Schedule::Cron {
1221 expr: "*/5 * * * *".into(),
1222 tz: None,
1223 },
1224 "echo delivered",
1225 Some(DeliveryConfig {
1226 mode: "announce".into(),
1227 channel: Some("discord".into()),
1228 to: Some("1234567890".into()),
1229 best_effort: true,
1230 }),
1231 )
1232 .unwrap();
1233
1234 assert_eq!(job.delivery.mode, "announce");
1235 assert_eq!(job.delivery.channel.as_deref(), Some("discord"));
1236 assert_eq!(job.delivery.to.as_deref(), Some("1234567890"));
1237
1238 let stored = get_job(&config, &job.id).unwrap();
1239 assert_eq!(stored.delivery.mode, "announce");
1240 assert_eq!(stored.delivery.channel.as_deref(), Some("discord"));
1241 assert_eq!(stored.delivery.to.as_deref(), Some("1234567890"));
1242 }
1243
1244 #[test]
1245 fn add_agent_job_rejects_invalid_announce_delivery() {
1246 let tmp = TempDir::new().unwrap();
1247 let config = test_config(&tmp);
1248
1249 let err = add_agent_job(
1250 &config,
1251 Some("deliver-agent".into()),
1252 Schedule::Cron {
1253 expr: "*/5 * * * *".into(),
1254 tz: None,
1255 },
1256 "summarize logs",
1257 SessionTarget::Isolated,
1258 None,
1259 Some(DeliveryConfig {
1260 mode: "announce".into(),
1261 channel: Some("discord".into()),
1262 to: None,
1263 best_effort: true,
1264 }),
1265 false,
1266 None,
1267 )
1268 .unwrap_err();
1269
1270 assert!(err.to_string().contains("delivery.to is required"));
1271 }
1272
1273 #[test]
1274 fn add_shell_job_rejects_invalid_delivery_mode() {
1275 let tmp = TempDir::new().unwrap();
1276 let config = test_config(&tmp);
1277
1278 let err = add_shell_job(
1279 &config,
1280 Some("deliver-shell".into()),
1281 Schedule::Cron {
1282 expr: "*/5 * * * *".into(),
1283 tz: None,
1284 },
1285 "echo delivered",
1286 Some(DeliveryConfig {
1287 mode: "annouce".into(),
1288 channel: Some("discord".into()),
1289 to: Some("1234567890".into()),
1290 best_effort: true,
1291 }),
1292 )
1293 .unwrap_err();
1294
1295 assert!(err.to_string().contains("unsupported delivery mode"));
1296 }
1297
1298 #[test]
1299 fn add_list_remove_roundtrip() {
1300 let tmp = TempDir::new().unwrap();
1301 let config = test_config(&tmp);
1302
1303 let job = add_job(&config, "*/10 * * * *", "echo roundtrip").unwrap();
1304 let listed = list_jobs(&config).unwrap();
1305 assert_eq!(listed.len(), 1);
1306 assert_eq!(listed[0].id, job.id);
1307
1308 remove_job(&config, &job.id).unwrap();
1309 assert!(list_jobs(&config).unwrap().is_empty());
1310 }
1311
1312 #[test]
1313 fn due_jobs_filters_by_timestamp_and_enabled() {
1314 let tmp = TempDir::new().unwrap();
1315 let config = test_config(&tmp);
1316
1317 let job = add_job(&config, "* * * * *", "echo due").unwrap();
1318
1319 let due_now = due_jobs(&config, Utc::now()).unwrap();
1320 assert!(due_now.is_empty(), "new job should not be due immediately");
1321
1322 let far_future = Utc::now() + ChronoDuration::days(365);
1323 let due_future = due_jobs(&config, far_future).unwrap();
1324 assert_eq!(due_future.len(), 1, "job should be due in far future");
1325
1326 let _ = update_job(
1327 &config,
1328 &job.id,
1329 CronJobPatch {
1330 enabled: Some(false),
1331 ..CronJobPatch::default()
1332 },
1333 )
1334 .unwrap();
1335 let due_after_disable = due_jobs(&config, far_future).unwrap();
1336 assert!(due_after_disable.is_empty());
1337 }
1338
1339 #[test]
1340 fn due_jobs_respects_scheduler_max_tasks_limit() {
1341 let tmp = TempDir::new().unwrap();
1342 let mut config = test_config(&tmp);
1343 config.scheduler.max_tasks = 2;
1344
1345 let _ = add_job(&config, "* * * * *", "echo due-1").unwrap();
1346 let _ = add_job(&config, "* * * * *", "echo due-2").unwrap();
1347 let _ = add_job(&config, "* * * * *", "echo due-3").unwrap();
1348
1349 let far_future = Utc::now() + ChronoDuration::days(365);
1350 let due = due_jobs(&config, far_future).unwrap();
1351 assert_eq!(due.len(), 2);
1352 }
1353
1354 #[test]
1355 fn all_overdue_jobs_ignores_max_tasks_limit() {
1356 let tmp = TempDir::new().unwrap();
1357 let mut config = test_config(&tmp);
1358 config.scheduler.max_tasks = 2;
1359
1360 let _ = add_job(&config, "* * * * *", "echo ov-1").unwrap();
1361 let _ = add_job(&config, "* * * * *", "echo ov-2").unwrap();
1362 let _ = add_job(&config, "* * * * *", "echo ov-3").unwrap();
1363
1364 let far_future = Utc::now() + ChronoDuration::days(365);
1365 let due = due_jobs(&config, far_future).unwrap();
1367 assert_eq!(due.len(), 2);
1368 let overdue = all_overdue_jobs(&config, far_future).unwrap();
1370 assert_eq!(overdue.len(), 3);
1371 }
1372
1373 #[test]
1374 fn all_overdue_jobs_excludes_disabled_jobs() {
1375 let tmp = TempDir::new().unwrap();
1376 let config = test_config(&tmp);
1377
1378 let job = add_job(&config, "* * * * *", "echo disabled").unwrap();
1379 let _ = update_job(
1380 &config,
1381 &job.id,
1382 CronJobPatch {
1383 enabled: Some(false),
1384 ..CronJobPatch::default()
1385 },
1386 )
1387 .unwrap();
1388
1389 let far_future = Utc::now() + ChronoDuration::days(365);
1390 let overdue = all_overdue_jobs(&config, far_future).unwrap();
1391 assert!(overdue.is_empty());
1392 }
1393
1394 #[test]
1395 fn add_agent_job_persists_allowed_tools() {
1396 let tmp = TempDir::new().unwrap();
1397 let config = test_config(&tmp);
1398
1399 let job = add_agent_job(
1400 &config,
1401 Some("agent".into()),
1402 Schedule::Every { every_ms: 60_000 },
1403 "do work",
1404 SessionTarget::Isolated,
1405 None,
1406 None,
1407 false,
1408 Some(vec!["file_read".into(), "web_search".into()]),
1409 )
1410 .unwrap();
1411
1412 assert_eq!(
1413 job.allowed_tools,
1414 Some(vec!["file_read".into(), "web_search".into()])
1415 );
1416
1417 let stored = get_job(&config, &job.id).unwrap();
1418 assert_eq!(stored.allowed_tools, job.allowed_tools);
1419 }
1420
1421 #[test]
1422 fn update_job_persists_allowed_tools_patch() {
1423 let tmp = TempDir::new().unwrap();
1424 let config = test_config(&tmp);
1425
1426 let job = add_agent_job(
1427 &config,
1428 Some("agent".into()),
1429 Schedule::Every { every_ms: 60_000 },
1430 "do work",
1431 SessionTarget::Isolated,
1432 None,
1433 None,
1434 false,
1435 None,
1436 )
1437 .unwrap();
1438
1439 let updated = update_job(
1440 &config,
1441 &job.id,
1442 CronJobPatch {
1443 allowed_tools: Some(vec!["shell".into()]),
1444 ..CronJobPatch::default()
1445 },
1446 )
1447 .unwrap();
1448
1449 assert_eq!(updated.allowed_tools, Some(vec!["shell".into()]));
1450 assert_eq!(
1451 get_job(&config, &job.id).unwrap().allowed_tools,
1452 Some(vec!["shell".into()])
1453 );
1454 }
1455
1456 #[test]
1457 fn reschedule_after_run_persists_last_status_and_last_run() {
1458 let tmp = TempDir::new().unwrap();
1459 let config = test_config(&tmp);
1460
1461 let job = add_job(&config, "*/15 * * * *", "echo run").unwrap();
1462 reschedule_after_run(&config, &job, false, "failed output").unwrap();
1463
1464 let listed = list_jobs(&config).unwrap();
1465 let stored = listed.iter().find(|j| j.id == job.id).unwrap();
1466 assert_eq!(stored.last_status.as_deref(), Some("error"));
1467 assert!(stored.last_run.is_some());
1468 assert_eq!(stored.last_output.as_deref(), Some("failed output"));
1469 }
1470
1471 #[test]
1472 fn job_type_from_sql_reads_valid_value() {
1473 let tmp = TempDir::new().unwrap();
1474 let config = test_config(&tmp);
1475 let now = Utc::now();
1476
1477 with_connection(&config, |conn| {
1478 conn.execute(
1479 "INSERT INTO cron_jobs (id, expression, command, schedule, job_type, created_at, next_run)
1480 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1481 params![
1482 "job-type-valid",
1483 "*/5 * * * *",
1484 "echo ok",
1485 Option::<String>::None,
1486 "agent",
1487 now.to_rfc3339(),
1488 (now + ChronoDuration::minutes(5)).to_rfc3339(),
1489 ],
1490 )?;
1491 Ok(())
1492 })
1493 .unwrap();
1494
1495 let job = get_job(&config, "job-type-valid").unwrap();
1496 assert_eq!(job.job_type, JobType::Agent);
1497 }
1498
1499 #[test]
1500 fn job_type_from_sql_rejects_invalid_value() {
1501 let tmp = TempDir::new().unwrap();
1502 let config = test_config(&tmp);
1503 let now = Utc::now();
1504
1505 with_connection(&config, |conn| {
1506 conn.execute(
1507 "INSERT INTO cron_jobs (id, expression, command, schedule, job_type, created_at, next_run)
1508 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1509 params![
1510 "job-type-invalid",
1511 "*/5 * * * *",
1512 "echo ok",
1513 Option::<String>::None,
1514 "unknown",
1515 now.to_rfc3339(),
1516 (now + ChronoDuration::minutes(5)).to_rfc3339(),
1517 ],
1518 )?;
1519 Ok(())
1520 })
1521 .unwrap();
1522
1523 assert!(get_job(&config, "job-type-invalid").is_err());
1524 }
1525
1526 #[test]
1527 fn migration_falls_back_to_legacy_expression() {
1528 let tmp = TempDir::new().unwrap();
1529 let config = test_config(&tmp);
1530
1531 with_connection(&config, |conn| {
1532 conn.execute(
1533 "INSERT INTO cron_jobs (id, expression, command, created_at, next_run)
1534 VALUES (?1, ?2, ?3, ?4, ?5)",
1535 params![
1536 "legacy-id",
1537 "*/5 * * * *",
1538 "echo legacy",
1539 Utc::now().to_rfc3339(),
1540 (Utc::now() + ChronoDuration::minutes(5)).to_rfc3339(),
1541 ],
1542 )?;
1543 conn.execute(
1544 "UPDATE cron_jobs SET schedule = NULL WHERE id = 'legacy-id'",
1545 [],
1546 )?;
1547 Ok(())
1548 })
1549 .unwrap();
1550
1551 let job = get_job(&config, "legacy-id").unwrap();
1552 assert!(matches!(job.schedule, Schedule::Cron { .. }));
1553 }
1554
1555 #[test]
1556 fn record_and_prune_runs() {
1557 let tmp = TempDir::new().unwrap();
1558 let mut config = test_config(&tmp);
1559 config.cron.max_run_history = 2;
1560 let job = add_job(&config, "*/5 * * * *", "echo ok").unwrap();
1561 let base = Utc::now();
1562
1563 for idx in 0..3 {
1564 let start = base + ChronoDuration::seconds(idx);
1565 let end = start + ChronoDuration::milliseconds(100);
1566 record_run(&config, &job.id, start, end, "ok", Some("done"), 100).unwrap();
1567 }
1568
1569 let runs = list_runs(&config, &job.id, 10).unwrap();
1570 assert_eq!(runs.len(), 2);
1571 }
1572
1573 #[test]
1574 fn remove_job_cascades_run_history() {
1575 let tmp = TempDir::new().unwrap();
1576 let config = test_config(&tmp);
1577 let job = add_job(&config, "*/5 * * * *", "echo ok").unwrap();
1578 let start = Utc::now();
1579 record_run(
1580 &config,
1581 &job.id,
1582 start,
1583 start + ChronoDuration::milliseconds(5),
1584 "ok",
1585 Some("ok"),
1586 5,
1587 )
1588 .unwrap();
1589
1590 remove_job(&config, &job.id).unwrap();
1591 let runs = list_runs(&config, &job.id, 10).unwrap();
1592 assert!(runs.is_empty());
1593 }
1594
1595 #[test]
1596 fn record_run_truncates_large_output() {
1597 let tmp = TempDir::new().unwrap();
1598 let config = test_config(&tmp);
1599 let job = add_job(&config, "*/5 * * * *", "echo trunc").unwrap();
1600 let output = "x".repeat(MAX_CRON_OUTPUT_BYTES + 512);
1601
1602 record_run(
1603 &config,
1604 &job.id,
1605 Utc::now(),
1606 Utc::now(),
1607 "ok",
1608 Some(&output),
1609 1,
1610 )
1611 .unwrap();
1612
1613 let runs = list_runs(&config, &job.id, 1).unwrap();
1614 let stored = runs[0].output.as_deref().unwrap_or_default();
1615 assert!(stored.ends_with(TRUNCATED_OUTPUT_MARKER));
1616 assert!(stored.len() <= MAX_CRON_OUTPUT_BYTES);
1617 }
1618
1619 #[test]
1620 fn reschedule_after_run_disables_at_schedule_job() {
1621 let tmp = TempDir::new().unwrap();
1622 let config = test_config(&tmp);
1623 let at = Utc::now() + ChronoDuration::minutes(10);
1624 let job = add_shell_job(&config, None, Schedule::At { at }, "echo once", None).unwrap();
1625
1626 reschedule_after_run(&config, &job, true, "done").unwrap();
1627
1628 let stored = get_job(&config, &job.id).unwrap();
1629 assert!(
1630 !stored.enabled,
1631 "At schedule job should be disabled after reschedule"
1632 );
1633 assert_eq!(stored.last_status.as_deref(), Some("ok"));
1634 }
1635
1636 #[test]
1637 fn reschedule_after_run_disables_at_schedule_job_on_failure() {
1638 let tmp = TempDir::new().unwrap();
1639 let config = test_config(&tmp);
1640 let at = Utc::now() + ChronoDuration::minutes(10);
1641 let job = add_shell_job(&config, None, Schedule::At { at }, "echo once", None).unwrap();
1642
1643 reschedule_after_run(&config, &job, false, "failed").unwrap();
1644
1645 let stored = get_job(&config, &job.id).unwrap();
1646 assert!(
1647 !stored.enabled,
1648 "At schedule job should be disabled after reschedule even on failure"
1649 );
1650 assert_eq!(stored.last_status.as_deref(), Some("error"));
1651 assert_eq!(stored.last_output.as_deref(), Some("failed"));
1652 }
1653
1654 #[test]
1655 fn reschedule_after_run_truncates_last_output() {
1656 let tmp = TempDir::new().unwrap();
1657 let config = test_config(&tmp);
1658 let job = add_job(&config, "*/5 * * * *", "echo trunc").unwrap();
1659 let output = "y".repeat(MAX_CRON_OUTPUT_BYTES + 1024);
1660
1661 reschedule_after_run(&config, &job, false, &output).unwrap();
1662
1663 let stored = get_job(&config, &job.id).unwrap();
1664 let last_output = stored.last_output.as_deref().unwrap_or_default();
1665 assert!(last_output.ends_with(TRUNCATED_OUTPUT_MARKER));
1666 assert!(last_output.len() <= MAX_CRON_OUTPUT_BYTES);
1667 }
1668
1669 fn make_shell_decl(id: &str, expr: &str, cmd: &str) -> crate::config::schema::CronJobDecl {
1672 crate::config::schema::CronJobDecl {
1673 id: id.to_string(),
1674 name: Some(format!("decl-{id}")),
1675 job_type: "shell".to_string(),
1676 schedule: crate::config::schema::CronScheduleDecl::Cron {
1677 expr: expr.to_string(),
1678 tz: None,
1679 },
1680 command: Some(cmd.to_string()),
1681 prompt: None,
1682 enabled: true,
1683 model: None,
1684 allowed_tools: None,
1685 session_target: None,
1686 delivery: None,
1687 }
1688 }
1689
1690 fn make_agent_decl(id: &str, expr: &str, prompt: &str) -> crate::config::schema::CronJobDecl {
1691 crate::config::schema::CronJobDecl {
1692 id: id.to_string(),
1693 name: Some(format!("decl-{id}")),
1694 job_type: "agent".to_string(),
1695 schedule: crate::config::schema::CronScheduleDecl::Cron {
1696 expr: expr.to_string(),
1697 tz: None,
1698 },
1699 command: None,
1700 prompt: Some(prompt.to_string()),
1701 enabled: true,
1702 model: None,
1703 allowed_tools: None,
1704 session_target: None,
1705 delivery: None,
1706 }
1707 }
1708
1709 #[test]
1710 fn sync_inserts_new_declarative_job() {
1711 let tmp = TempDir::new().unwrap();
1712 let config = test_config(&tmp);
1713
1714 let decls = vec![make_shell_decl("daily-backup", "0 2 * * *", "echo backup")];
1715 sync_declarative_jobs(&config, &decls).unwrap();
1716
1717 let job = get_job(&config, "daily-backup").unwrap();
1718 assert_eq!(job.command, "echo backup");
1719 assert_eq!(job.source, "declarative");
1720 assert_eq!(job.name.as_deref(), Some("decl-daily-backup"));
1721 }
1722
1723 #[test]
1724 fn sync_updates_existing_declarative_job() {
1725 let tmp = TempDir::new().unwrap();
1726 let config = test_config(&tmp);
1727
1728 let decls = vec![make_shell_decl("updatable", "0 2 * * *", "echo v1")];
1729 sync_declarative_jobs(&config, &decls).unwrap();
1730
1731 let job_v1 = get_job(&config, "updatable").unwrap();
1732 assert_eq!(job_v1.command, "echo v1");
1733
1734 let decls_v2 = vec![make_shell_decl("updatable", "0 3 * * *", "echo v2")];
1735 sync_declarative_jobs(&config, &decls_v2).unwrap();
1736
1737 let job_v2 = get_job(&config, "updatable").unwrap();
1738 assert_eq!(job_v2.command, "echo v2");
1739 assert_eq!(job_v2.expression, "0 3 * * *");
1740 assert_eq!(job_v2.source, "declarative");
1741 }
1742
1743 #[test]
1744 fn sync_does_not_delete_imperative_jobs() {
1745 let tmp = TempDir::new().unwrap();
1746 let config = test_config(&tmp);
1747
1748 let imperative = add_job(&config, "*/10 * * * *", "echo imperative").unwrap();
1750
1751 let decls = vec![make_shell_decl("my-decl", "0 2 * * *", "echo decl")];
1753 sync_declarative_jobs(&config, &decls).unwrap();
1754
1755 let still_there = get_job(&config, &imperative.id).unwrap();
1757 assert_eq!(still_there.command, "echo imperative");
1758 assert_eq!(still_there.source, "imperative");
1759
1760 let decl_job = get_job(&config, "my-decl").unwrap();
1762 assert_eq!(decl_job.command, "echo decl");
1763 }
1764
1765 #[test]
1766 fn sync_removes_stale_declarative_jobs() {
1767 let tmp = TempDir::new().unwrap();
1768 let config = test_config(&tmp);
1769
1770 let decls = vec![
1772 make_shell_decl("keeper", "0 2 * * *", "echo keep"),
1773 make_shell_decl("stale", "0 3 * * *", "echo stale"),
1774 ];
1775 sync_declarative_jobs(&config, &decls).unwrap();
1776
1777 let decls_v2 = vec![make_shell_decl("keeper", "0 2 * * *", "echo keep")];
1779 sync_declarative_jobs(&config, &decls_v2).unwrap();
1780
1781 assert!(get_job(&config, "stale").is_err());
1782 assert!(get_job(&config, "keeper").is_ok());
1783 }
1784
1785 #[test]
1786 fn sync_empty_removes_all_declarative_jobs() {
1787 let tmp = TempDir::new().unwrap();
1788 let config = test_config(&tmp);
1789
1790 let decls = vec![make_shell_decl("to-remove", "0 2 * * *", "echo bye")];
1791 sync_declarative_jobs(&config, &decls).unwrap();
1792 assert!(get_job(&config, "to-remove").is_ok());
1793
1794 sync_declarative_jobs(&config, &[]).unwrap();
1796 assert!(get_job(&config, "to-remove").is_err());
1797 }
1798
1799 #[test]
1800 fn sync_validates_shell_job_requires_command() {
1801 let tmp = TempDir::new().unwrap();
1802 let config = test_config(&tmp);
1803
1804 let mut decl = make_shell_decl("bad", "0 2 * * *", "echo ok");
1805 decl.command = None;
1806
1807 let result = sync_declarative_jobs(&config, &[decl]);
1808 assert!(result.is_err());
1809 assert!(result.unwrap_err().to_string().contains("command"));
1810 }
1811
1812 #[test]
1813 fn sync_validates_agent_job_requires_prompt() {
1814 let tmp = TempDir::new().unwrap();
1815 let config = test_config(&tmp);
1816
1817 let mut decl = make_agent_decl("bad-agent", "0 2 * * *", "do stuff");
1818 decl.prompt = None;
1819
1820 let result = sync_declarative_jobs(&config, &[decl]);
1821 assert!(result.is_err());
1822 assert!(result.unwrap_err().to_string().contains("prompt"));
1823 }
1824
1825 #[test]
1826 fn sync_agent_job_inserts_correctly() {
1827 let tmp = TempDir::new().unwrap();
1828 let config = test_config(&tmp);
1829
1830 let decls = vec![make_agent_decl(
1831 "agent-check",
1832 "*/15 * * * *",
1833 "check health",
1834 )];
1835 sync_declarative_jobs(&config, &decls).unwrap();
1836
1837 let job = get_job(&config, "agent-check").unwrap();
1838 assert_eq!(job.job_type, JobType::Agent);
1839 assert_eq!(job.prompt.as_deref(), Some("check health"));
1840 assert_eq!(job.source, "declarative");
1841 }
1842
1843 #[test]
1844 fn sync_every_schedule_works() {
1845 let tmp = TempDir::new().unwrap();
1846 let config = test_config(&tmp);
1847
1848 let decl = crate::config::schema::CronJobDecl {
1849 id: "interval-job".to_string(),
1850 name: None,
1851 job_type: "shell".to_string(),
1852 schedule: crate::config::schema::CronScheduleDecl::Every { every_ms: 60000 },
1853 command: Some("echo interval".to_string()),
1854 prompt: None,
1855 enabled: true,
1856 model: None,
1857 allowed_tools: None,
1858 session_target: None,
1859 delivery: None,
1860 };
1861
1862 sync_declarative_jobs(&config, &[decl]).unwrap();
1863
1864 let job = get_job(&config, "interval-job").unwrap();
1865 assert!(matches!(job.schedule, Schedule::Every { every_ms: 60000 }));
1866 assert_eq!(job.command, "echo interval");
1867 }
1868
1869 #[test]
1870 fn declarative_config_parses_from_toml() {
1871 let toml_str = r#"
1872enabled = true
1873
1874[[jobs]]
1875id = "daily-report"
1876name = "Daily Report"
1877job_type = "shell"
1878command = "echo report"
1879schedule = { kind = "cron", expr = "0 9 * * *" }
1880
1881[[jobs]]
1882id = "health-check"
1883job_type = "agent"
1884prompt = "Check server health"
1885schedule = { kind = "every", every_ms = 300000 }
1886 "#;
1887
1888 let parsed: crate::config::schema::CronConfig = toml::from_str(toml_str).unwrap();
1889 assert!(parsed.enabled);
1890 assert_eq!(parsed.jobs.len(), 2);
1891
1892 assert_eq!(parsed.jobs[0].id, "daily-report");
1893 assert_eq!(parsed.jobs[0].command.as_deref(), Some("echo report"));
1894 assert!(matches!(
1895 parsed.jobs[0].schedule,
1896 crate::config::schema::CronScheduleDecl::Cron { ref expr, .. } if expr == "0 9 * * *"
1897 ));
1898
1899 assert_eq!(parsed.jobs[1].id, "health-check");
1900 assert_eq!(parsed.jobs[1].job_type, "agent");
1901 assert_eq!(
1902 parsed.jobs[1].prompt.as_deref(),
1903 Some("Check server health")
1904 );
1905 assert!(matches!(
1906 parsed.jobs[1].schedule,
1907 crate::config::schema::CronScheduleDecl::Every { every_ms: 300_000 }
1908 ));
1909 }
1910}