Skip to main content

construct/cron/
store.rs

1use crate::config::Config;
2use crate::cron::{
3    CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType, Schedule, SessionTarget,
4    next_run_for_schedule, schedule_cron_expression, validate_delivery_config, validate_schedule,
5};
6use anyhow::{Context, Result};
7use chrono::{DateTime, Utc};
8use rusqlite::types::{FromSqlResult, ValueRef};
9use rusqlite::{Connection, params};
10use uuid::Uuid;
11
/// Upper bound, in bytes, on job output persisted by this module (16 KiB).
const MAX_CRON_OUTPUT_BYTES: usize = 16 * 1024;
/// Suffix appended to stored output that had to be cut to fit the byte limit.
const TRUNCATED_OUTPUT_MARKER: &str = "\n...[truncated]";
14
15impl rusqlite::types::FromSql for JobType {
16    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
17        let text = value.as_str()?;
18        JobType::try_from(text).map_err(|e| rusqlite::types::FromSqlError::Other(e.into()))
19    }
20}
21
22pub fn add_job(config: &Config, expression: &str, command: &str) -> Result<CronJob> {
23    let schedule = Schedule::Cron {
24        expr: expression.to_string(),
25        tz: None,
26    };
27    add_shell_job(config, None, schedule, command, None)
28}
29
30pub fn add_shell_job(
31    config: &Config,
32    name: Option<String>,
33    schedule: Schedule,
34    command: &str,
35    delivery: Option<DeliveryConfig>,
36) -> Result<CronJob> {
37    let now = Utc::now();
38    validate_schedule(&schedule, now)?;
39    validate_delivery_config(delivery.as_ref())?;
40    let next_run = next_run_for_schedule(&schedule, now)?;
41    let id = Uuid::new_v4().to_string();
42    let expression = schedule_cron_expression(&schedule).unwrap_or_default();
43    let schedule_json = serde_json::to_string(&schedule)?;
44    let delivery = delivery.unwrap_or_default();
45
46    let delete_after_run = matches!(schedule, Schedule::At { .. });
47
48    with_connection(config, |conn| {
49        conn.execute(
50            "INSERT INTO cron_jobs (
51                id, expression, command, schedule, job_type, prompt, name, session_target, model,
52                enabled, delivery, delete_after_run, created_at, next_run
53             ) VALUES (?1, ?2, ?3, ?4, 'shell', NULL, ?5, 'isolated', NULL, 1, ?6, ?7, ?8, ?9)",
54            params![
55                id,
56                expression,
57                command,
58                schedule_json,
59                name,
60                serde_json::to_string(&delivery)?,
61                if delete_after_run { 1 } else { 0 },
62                now.to_rfc3339(),
63                next_run.to_rfc3339(),
64            ],
65        )
66        .context("Failed to insert cron shell job")?;
67        Ok(())
68    })?;
69
70    get_job(config, &id)
71}
72
73#[allow(clippy::too_many_arguments)]
74pub fn add_agent_job(
75    config: &Config,
76    name: Option<String>,
77    schedule: Schedule,
78    prompt: &str,
79    session_target: SessionTarget,
80    model: Option<String>,
81    delivery: Option<DeliveryConfig>,
82    delete_after_run: bool,
83    allowed_tools: Option<Vec<String>>,
84) -> Result<CronJob> {
85    let now = Utc::now();
86    validate_schedule(&schedule, now)?;
87    validate_delivery_config(delivery.as_ref())?;
88    let next_run = next_run_for_schedule(&schedule, now)?;
89    let id = Uuid::new_v4().to_string();
90    let expression = schedule_cron_expression(&schedule).unwrap_or_default();
91    let schedule_json = serde_json::to_string(&schedule)?;
92    let delivery = delivery.unwrap_or_default();
93
94    with_connection(config, |conn| {
95        conn.execute(
96            "INSERT INTO cron_jobs (
97                id, expression, command, schedule, job_type, prompt, name, session_target, model,
98                enabled, delivery, delete_after_run, allowed_tools, created_at, next_run
99             ) VALUES (?1, ?2, '', ?3, 'agent', ?4, ?5, ?6, ?7, 1, ?8, ?9, ?10, ?11, ?12)",
100            params![
101                id,
102                expression,
103                schedule_json,
104                prompt,
105                name,
106                session_target.as_str(),
107                model,
108                serde_json::to_string(&delivery)?,
109                if delete_after_run { 1 } else { 0 },
110                encode_allowed_tools(allowed_tools.as_ref())?,
111                now.to_rfc3339(),
112                next_run.to_rfc3339(),
113            ],
114        )
115        .context("Failed to insert cron agent job")?;
116        Ok(())
117    })?;
118
119    get_job(config, &id)
120}
121
122pub fn list_jobs(config: &Config) -> Result<Vec<CronJob>> {
123    with_connection(config, |conn| {
124        let mut stmt = conn.prepare(
125            "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
126                    enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
127                    allowed_tools, source
128             FROM cron_jobs ORDER BY next_run ASC",
129        )?;
130
131        let rows = stmt.query_map([], map_cron_job_row)?;
132
133        let mut jobs = Vec::new();
134        for row in rows {
135            jobs.push(row?);
136        }
137        Ok(jobs)
138    })
139}
140
141pub fn get_job(config: &Config, job_id: &str) -> Result<CronJob> {
142    with_connection(config, |conn| {
143        let mut stmt = conn.prepare(
144            "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
145                    enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
146                    allowed_tools, source
147             FROM cron_jobs WHERE id = ?1",
148        )?;
149
150        let mut rows = stmt.query(params![job_id])?;
151        if let Some(row) = rows.next()? {
152            map_cron_job_row(row).map_err(Into::into)
153        } else {
154            anyhow::bail!("Cron job '{job_id}' not found")
155        }
156    })
157}
158
159pub fn remove_job(config: &Config, id: &str) -> Result<()> {
160    let changed = with_connection(config, |conn| {
161        conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![id])
162            .context("Failed to delete cron job")
163    })?;
164
165    if changed == 0 {
166        anyhow::bail!("Cron job '{id}' not found");
167    }
168
169    println!("✅ Removed cron job {id}");
170    Ok(())
171}
172
173pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
174    let lim = i64::try_from(config.scheduler.max_tasks.max(1))
175        .context("Scheduler max_tasks overflows i64")?;
176    with_connection(config, |conn| {
177        let mut stmt = conn.prepare(
178            "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
179                    enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
180                    allowed_tools, source
181             FROM cron_jobs
182             WHERE enabled = 1 AND next_run <= ?1
183             ORDER BY next_run ASC
184             LIMIT ?2",
185        )?;
186
187        let rows = stmt.query_map(params![now.to_rfc3339(), lim], map_cron_job_row)?;
188
189        let mut jobs = Vec::new();
190        for row in rows {
191            match row {
192                Ok(job) => jobs.push(job),
193                Err(e) => tracing::warn!("Skipping cron job with unparseable row data: {e}"),
194            }
195        }
196        Ok(jobs)
197    })
198}
199
200/// Return **all** enabled overdue jobs without the `max_tasks` limit.
201///
202/// Used by the scheduler startup catch-up to ensure every missed job is
203/// executed at least once after a period of downtime (late boot, daemon
204/// restart, etc.).
205pub fn all_overdue_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
206    with_connection(config, |conn| {
207        let mut stmt = conn.prepare(
208            "SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
209                    enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
210                    allowed_tools, source
211             FROM cron_jobs
212             WHERE enabled = 1 AND next_run <= ?1
213             ORDER BY next_run ASC",
214        )?;
215
216        let rows = stmt.query_map(params![now.to_rfc3339()], map_cron_job_row)?;
217
218        let mut jobs = Vec::new();
219        for row in rows {
220            match row {
221                Ok(job) => jobs.push(job),
222                Err(e) => tracing::warn!("Skipping cron job with unparseable row data: {e}"),
223            }
224        }
225        Ok(jobs)
226    })
227}
228
229pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<CronJob> {
230    let mut job = get_job(config, job_id)?;
231    let mut schedule_changed = false;
232
233    if let Some(schedule) = patch.schedule {
234        validate_schedule(&schedule, Utc::now())?;
235        job.schedule = schedule;
236        job.expression = schedule_cron_expression(&job.schedule).unwrap_or_default();
237        schedule_changed = true;
238    }
239    if let Some(command) = patch.command {
240        job.command = command;
241    }
242    if let Some(prompt) = patch.prompt {
243        job.prompt = Some(prompt);
244    }
245    if let Some(name) = patch.name {
246        job.name = Some(name);
247    }
248    if let Some(enabled) = patch.enabled {
249        job.enabled = enabled;
250    }
251    if let Some(delivery) = patch.delivery {
252        job.delivery = delivery;
253    }
254    if let Some(model) = patch.model {
255        job.model = Some(model);
256    }
257    if let Some(target) = patch.session_target {
258        job.session_target = target;
259    }
260    if let Some(delete_after_run) = patch.delete_after_run {
261        job.delete_after_run = delete_after_run;
262    }
263    if let Some(allowed_tools) = patch.allowed_tools {
264        // Empty list means "clear the allowlist" (all tools available),
265        // not "allow zero tools".
266        if allowed_tools.is_empty() {
267            job.allowed_tools = None;
268        } else {
269            job.allowed_tools = Some(allowed_tools);
270        }
271    }
272
273    if schedule_changed {
274        job.next_run = next_run_for_schedule(&job.schedule, Utc::now())?;
275    }
276
277    with_connection(config, |conn| {
278        conn.execute(
279            "UPDATE cron_jobs
280             SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4, prompt = ?5, name = ?6,
281                 session_target = ?7, model = ?8, enabled = ?9, delivery = ?10, delete_after_run = ?11,
282                 allowed_tools = ?12, next_run = ?13
283             WHERE id = ?14",
284            params![
285                job.expression,
286                job.command,
287                serde_json::to_string(&job.schedule)?,
288                <JobType as Into<&str>>::into(job.job_type).to_string(),
289                job.prompt,
290                job.name,
291                job.session_target.as_str(),
292                job.model,
293                if job.enabled { 1 } else { 0 },
294                serde_json::to_string(&job.delivery)?,
295                if job.delete_after_run { 1 } else { 0 },
296                encode_allowed_tools(job.allowed_tools.as_ref())?,
297                job.next_run.to_rfc3339(),
298                job.id,
299            ],
300        )
301        .context("Failed to update cron job")?;
302        Ok(())
303    })?;
304
305    get_job(config, job_id)
306}
307
308pub fn record_last_run(
309    config: &Config,
310    job_id: &str,
311    finished_at: DateTime<Utc>,
312    success: bool,
313    output: &str,
314) -> Result<()> {
315    let status = if success { "ok" } else { "error" };
316    let bounded_output = truncate_cron_output(output);
317    with_connection(config, |conn| {
318        conn.execute(
319            "UPDATE cron_jobs
320             SET last_run = ?1, last_status = ?2, last_output = ?3
321             WHERE id = ?4",
322            params![finished_at.to_rfc3339(), status, bounded_output, job_id],
323        )
324        .context("Failed to update cron last run fields")?;
325        Ok(())
326    })
327}
328
329pub fn reschedule_after_run(
330    config: &Config,
331    job: &CronJob,
332    success: bool,
333    output: &str,
334) -> Result<()> {
335    let now = Utc::now();
336    let status = if success { "ok" } else { "error" };
337    let bounded_output = truncate_cron_output(output);
338
339    // One-shot `At` schedules have no future occurrence — record the run
340    // result and disable the job so it won't be picked up again.
341    if matches!(job.schedule, Schedule::At { .. }) {
342        with_connection(config, |conn| {
343            conn.execute(
344                "UPDATE cron_jobs
345                 SET enabled = 0, last_run = ?1, last_status = ?2, last_output = ?3
346                 WHERE id = ?4",
347                params![now.to_rfc3339(), status, bounded_output, job.id],
348            )
349            .context("Failed to disable completed one-shot cron job")?;
350            Ok(())
351        })
352    } else {
353        let next_run = next_run_for_schedule(&job.schedule, now)?;
354        with_connection(config, |conn| {
355            conn.execute(
356                "UPDATE cron_jobs
357                 SET next_run = ?1, last_run = ?2, last_status = ?3, last_output = ?4
358                 WHERE id = ?5",
359                params![
360                    next_run.to_rfc3339(),
361                    now.to_rfc3339(),
362                    status,
363                    bounded_output,
364                    job.id
365                ],
366            )
367            .context("Failed to update cron job run state")?;
368            Ok(())
369        })
370    }
371}
372
373pub fn record_run(
374    config: &Config,
375    job_id: &str,
376    started_at: DateTime<Utc>,
377    finished_at: DateTime<Utc>,
378    status: &str,
379    output: Option<&str>,
380    duration_ms: i64,
381) -> Result<()> {
382    let bounded_output = output.map(truncate_cron_output);
383    with_connection(config, |conn| {
384        // Wrap INSERT + pruning DELETE in an explicit transaction so that
385        // if the DELETE fails, the INSERT is rolled back and the run table
386        // cannot grow unboundedly.
387        let tx = conn.unchecked_transaction()?;
388
389        tx.execute(
390            "INSERT INTO cron_runs (job_id, started_at, finished_at, status, output, duration_ms)
391             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
392            params![
393                job_id,
394                started_at.to_rfc3339(),
395                finished_at.to_rfc3339(),
396                status,
397                bounded_output.as_deref(),
398                duration_ms,
399            ],
400        )
401        .context("Failed to insert cron run")?;
402
403        let keep = i64::from(config.cron.max_run_history.max(1));
404        tx.execute(
405            "DELETE FROM cron_runs
406             WHERE job_id = ?1
407               AND id NOT IN (
408                 SELECT id FROM cron_runs
409                 WHERE job_id = ?1
410                 ORDER BY started_at DESC, id DESC
411                 LIMIT ?2
412               )",
413            params![job_id, keep],
414        )
415        .context("Failed to prune cron run history")?;
416
417        tx.commit()
418            .context("Failed to commit cron run transaction")?;
419        Ok(())
420    })
421}
422
423fn truncate_cron_output(output: &str) -> String {
424    if output.len() <= MAX_CRON_OUTPUT_BYTES {
425        return output.to_string();
426    }
427
428    if MAX_CRON_OUTPUT_BYTES <= TRUNCATED_OUTPUT_MARKER.len() {
429        return TRUNCATED_OUTPUT_MARKER.to_string();
430    }
431
432    let mut cutoff = MAX_CRON_OUTPUT_BYTES - TRUNCATED_OUTPUT_MARKER.len();
433    while cutoff > 0 && !output.is_char_boundary(cutoff) {
434        cutoff -= 1;
435    }
436
437    let mut truncated = output[..cutoff].to_string();
438    truncated.push_str(TRUNCATED_OUTPUT_MARKER);
439    truncated
440}
441
442pub fn list_runs(config: &Config, job_id: &str, limit: usize) -> Result<Vec<CronRun>> {
443    with_connection(config, |conn| {
444        let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?;
445        let mut stmt = conn.prepare(
446            "SELECT id, job_id, started_at, finished_at, status, output, duration_ms
447             FROM cron_runs
448             WHERE job_id = ?1
449             ORDER BY started_at DESC, id DESC
450             LIMIT ?2",
451        )?;
452
453        let rows = stmt.query_map(params![job_id, lim], |row| {
454            Ok(CronRun {
455                id: row.get(0)?,
456                job_id: row.get(1)?,
457                started_at: parse_rfc3339(&row.get::<_, String>(2)?)
458                    .map_err(sql_conversion_error)?,
459                finished_at: parse_rfc3339(&row.get::<_, String>(3)?)
460                    .map_err(sql_conversion_error)?,
461                status: row.get(4)?,
462                output: row.get(5)?,
463                duration_ms: row.get(6)?,
464            })
465        })?;
466
467        let mut runs = Vec::new();
468        for row in rows {
469            runs.push(row?);
470        }
471        Ok(runs)
472    })
473}
474
475fn parse_rfc3339(raw: &str) -> Result<DateTime<Utc>> {
476    let parsed = DateTime::parse_from_rfc3339(raw)
477        .with_context(|| format!("Invalid RFC3339 timestamp in cron DB: {raw}"))?;
478    Ok(parsed.with_timezone(&Utc))
479}
480
481fn sql_conversion_error(err: anyhow::Error) -> rusqlite::Error {
482    rusqlite::Error::ToSqlConversionFailure(err.into())
483}
484
485fn map_cron_job_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CronJob> {
486    let expression: String = row.get(1)?;
487    let schedule_raw: Option<String> = row.get(3)?;
488    let schedule =
489        decode_schedule(schedule_raw.as_deref(), &expression).map_err(sql_conversion_error)?;
490
491    let delivery_raw: Option<String> = row.get(10)?;
492    let delivery = decode_delivery(delivery_raw.as_deref()).map_err(sql_conversion_error)?;
493
494    let next_run_raw: String = row.get(13)?;
495    let last_run_raw: Option<String> = row.get(14)?;
496    let created_at_raw: String = row.get(12)?;
497    let allowed_tools_raw: Option<String> = row.get(17)?;
498    let source: Option<String> = row.get(18)?;
499
500    Ok(CronJob {
501        id: row.get(0)?,
502        expression,
503        schedule,
504        command: row.get(2)?,
505        job_type: row.get(4)?,
506        prompt: row.get(5)?,
507        name: row.get(6)?,
508        session_target: SessionTarget::parse(&row.get::<_, String>(7)?),
509        model: row.get(8)?,
510        enabled: row.get::<_, i64>(9)? != 0,
511        delivery,
512        delete_after_run: row.get::<_, i64>(11)? != 0,
513        source: source.unwrap_or_else(|| "imperative".to_string()),
514        created_at: parse_rfc3339(&created_at_raw).map_err(sql_conversion_error)?,
515        next_run: parse_rfc3339(&next_run_raw).map_err(sql_conversion_error)?,
516        last_run: match last_run_raw {
517            Some(raw) => Some(parse_rfc3339(&raw).map_err(sql_conversion_error)?),
518            None => None,
519        },
520        last_status: row.get(15)?,
521        last_output: row.get(16)?,
522        allowed_tools: decode_allowed_tools(allowed_tools_raw.as_deref())
523            .map_err(sql_conversion_error)?,
524    })
525}
526
527fn decode_schedule(schedule_raw: Option<&str>, expression: &str) -> Result<Schedule> {
528    if let Some(raw) = schedule_raw {
529        let trimmed = raw.trim();
530        if !trimmed.is_empty() {
531            return serde_json::from_str(trimmed)
532                .with_context(|| format!("Failed to parse cron schedule JSON: {trimmed}"));
533        }
534    }
535
536    if expression.trim().is_empty() {
537        anyhow::bail!("Missing schedule and legacy expression for cron job")
538    }
539
540    Ok(Schedule::Cron {
541        expr: expression.to_string(),
542        tz: None,
543    })
544}
545
546fn decode_delivery(delivery_raw: Option<&str>) -> Result<DeliveryConfig> {
547    if let Some(raw) = delivery_raw {
548        let trimmed = raw.trim();
549        if !trimmed.is_empty() {
550            return serde_json::from_str(trimmed)
551                .with_context(|| format!("Failed to parse cron delivery JSON: {trimmed}"));
552        }
553    }
554    Ok(DeliveryConfig::default())
555}
556
557fn encode_allowed_tools(allowed_tools: Option<&Vec<String>>) -> Result<Option<String>> {
558    allowed_tools
559        .map(serde_json::to_string)
560        .transpose()
561        .context("Failed to serialize cron allowed_tools")
562}
563
564fn decode_allowed_tools(raw: Option<&str>) -> Result<Option<Vec<String>>> {
565    if let Some(raw) = raw {
566        let trimmed = raw.trim();
567        if !trimmed.is_empty() {
568            return serde_json::from_str(trimmed)
569                .map(Some)
570                .with_context(|| format!("Failed to parse cron allowed_tools JSON: {trimmed}"));
571        }
572    }
573    Ok(None)
574}
575
576/// Synchronize declarative cron job definitions from config into the database.
577///
578/// For each declarative job (identified by `id`):
579/// - If the job exists in DB: update it to match the config definition.
580/// - If the job does not exist: insert it.
581///
582/// Jobs created imperatively (via CLI/API) are never modified or deleted.
583/// Declarative jobs that are no longer present in config are removed.
584pub fn sync_declarative_jobs(
585    config: &Config,
586    decls: &[crate::config::schema::CronJobDecl],
587) -> Result<()> {
588    use crate::config::schema::CronScheduleDecl;
589
590    if decls.is_empty() {
591        // If no declarative jobs are defined, clean up any previously
592        // synced declarative jobs that are no longer in config.
593        with_connection(config, |conn| {
594            let deleted = conn
595                .execute("DELETE FROM cron_jobs WHERE source = 'declarative'", [])
596                .context("Failed to remove stale declarative cron jobs")?;
597            if deleted > 0 {
598                tracing::info!(
599                    count = deleted,
600                    "Removed declarative cron jobs no longer in config"
601                );
602            }
603            Ok(())
604        })?;
605        return Ok(());
606    }
607
608    // Validate declarations before touching the DB.
609    for decl in decls {
610        validate_decl(decl)?;
611    }
612
613    let now = Utc::now();
614
615    with_connection(config, |conn| {
616        // Collect IDs of all declarative jobs currently defined in config.
617        let config_ids: std::collections::HashSet<&str> =
618            decls.iter().map(|d| d.id.as_str()).collect();
619
620        // Remove declarative jobs no longer in config.
621        {
622            let mut stmt = conn.prepare("SELECT id FROM cron_jobs WHERE source = 'declarative'")?;
623            let db_ids: Vec<String> = stmt
624                .query_map([], |row| row.get(0))?
625                .filter_map(|r| r.ok())
626                .collect();
627
628            for db_id in &db_ids {
629                if !config_ids.contains(db_id.as_str()) {
630                    conn.execute("DELETE FROM cron_jobs WHERE id = ?1", params![db_id])
631                        .with_context(|| {
632                            format!("Failed to remove stale declarative cron job '{db_id}'")
633                        })?;
634                    tracing::info!(
635                        job_id = %db_id,
636                        "Removed declarative cron job no longer in config"
637                    );
638                }
639            }
640        }
641
642        for decl in decls {
643            let schedule = convert_schedule_decl(&decl.schedule)?;
644            let expression = schedule_cron_expression(&schedule).unwrap_or_default();
645            let schedule_json = serde_json::to_string(&schedule)?;
646            let job_type = &decl.job_type;
647            let session_target = decl.session_target.as_deref().unwrap_or("isolated");
648            let delivery = match &decl.delivery {
649                Some(d) => convert_delivery_decl(d),
650                None => DeliveryConfig::default(),
651            };
652            let delivery_json = serde_json::to_string(&delivery)?;
653            let allowed_tools_json = encode_allowed_tools(decl.allowed_tools.as_ref())?;
654            let command = decl.command.as_deref().unwrap_or("");
655            let delete_after_run = matches!(decl.schedule, CronScheduleDecl::At { .. });
656
657            // Check if job already exists.
658            let exists: bool = conn
659                .prepare("SELECT COUNT(*) FROM cron_jobs WHERE id = ?1")?
660                .query_row(params![decl.id], |row| row.get::<_, i64>(0))
661                .map(|c| c > 0)
662                .unwrap_or(false);
663
664            if exists {
665                // Update existing declarative job — preserve runtime state
666                // (next_run, last_run, last_status, last_output, created_at).
667                // Only update the schedule's next_run if the schedule itself changed.
668                let current_schedule_raw: Option<String> = conn
669                    .prepare("SELECT schedule FROM cron_jobs WHERE id = ?1")?
670                    .query_row(params![decl.id], |row| row.get(0))
671                    .ok();
672
673                let schedule_changed = current_schedule_raw.as_deref() != Some(&schedule_json);
674
675                if schedule_changed {
676                    let next_run = next_run_for_schedule(&schedule, now)?;
677                    conn.execute(
678                        "UPDATE cron_jobs
679                         SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4,
680                             prompt = ?5, name = ?6, session_target = ?7, model = ?8,
681                             enabled = ?9, delivery = ?10, delete_after_run = ?11,
682                             allowed_tools = ?12, source = 'declarative', next_run = ?13
683                         WHERE id = ?14",
684                        params![
685                            expression,
686                            command,
687                            schedule_json,
688                            job_type,
689                            decl.prompt,
690                            decl.name,
691                            session_target,
692                            decl.model,
693                            if decl.enabled { 1 } else { 0 },
694                            delivery_json,
695                            if delete_after_run { 1 } else { 0 },
696                            allowed_tools_json,
697                            next_run.to_rfc3339(),
698                            decl.id,
699                        ],
700                    )
701                    .with_context(|| {
702                        format!("Failed to update declarative cron job '{}'", decl.id)
703                    })?;
704                } else {
705                    conn.execute(
706                        "UPDATE cron_jobs
707                         SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4,
708                             prompt = ?5, name = ?6, session_target = ?7, model = ?8,
709                             enabled = ?9, delivery = ?10, delete_after_run = ?11,
710                             allowed_tools = ?12, source = 'declarative'
711                         WHERE id = ?13",
712                        params![
713                            expression,
714                            command,
715                            schedule_json,
716                            job_type,
717                            decl.prompt,
718                            decl.name,
719                            session_target,
720                            decl.model,
721                            if decl.enabled { 1 } else { 0 },
722                            delivery_json,
723                            if delete_after_run { 1 } else { 0 },
724                            allowed_tools_json,
725                            decl.id,
726                        ],
727                    )
728                    .with_context(|| {
729                        format!("Failed to update declarative cron job '{}'", decl.id)
730                    })?;
731                }
732
733                tracing::debug!(job_id = %decl.id, "Updated declarative cron job");
734            } else {
735                // Insert new declarative job.
736                let next_run = next_run_for_schedule(&schedule, now)?;
737                conn.execute(
738                    "INSERT INTO cron_jobs (
739                        id, expression, command, schedule, job_type, prompt, name,
740                        session_target, model, enabled, delivery, delete_after_run,
741                        allowed_tools, source, created_at, next_run
742                     ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, 'declarative', ?14, ?15)",
743                    params![
744                        decl.id,
745                        expression,
746                        command,
747                        schedule_json,
748                        job_type,
749                        decl.prompt,
750                        decl.name,
751                        session_target,
752                        decl.model,
753                        if decl.enabled { 1 } else { 0 },
754                        delivery_json,
755                        if delete_after_run { 1 } else { 0 },
756                        allowed_tools_json,
757                        now.to_rfc3339(),
758                        next_run.to_rfc3339(),
759                    ],
760                )
761                .with_context(|| {
762                    format!(
763                        "Failed to insert declarative cron job '{}'",
764                        decl.id
765                    )
766                })?;
767
768                tracing::info!(job_id = %decl.id, "Inserted declarative cron job from config");
769            }
770        }
771
772        Ok(())
773    })
774}
775
/// Sanitize a string into an identifier-style slug.
///
/// Every character that is not alphanumeric (in the Unicode sense, so
/// non-ASCII letters and digits are kept) becomes `_`; the result is then
/// lowercased. Runs of separators are not collapsed.
fn slug(s: &str) -> String {
    let mut sanitized = String::with_capacity(s.len());
    for ch in s.chars() {
        sanitized.push(if ch.is_alphanumeric() { ch } else { '_' });
    }
    sanitized.to_lowercase()
}
783
784/// Reconcile cron jobs sourced from workflow YAML triggers.
785///
786/// Each entry is `(workflow_name, cron_expression, optional_tz)`.
787/// Jobs use `source = 'workflow'` and deterministic IDs so they survive
788/// across restarts without duplication.
789pub fn sync_workflow_cron_jobs(
790    config: &Config,
791    workflows: &[(String, String, Option<String>)], // (name, cron_expr, tz)
792) -> Result<()> {
793    if workflows.is_empty() {
794        // If no workflow triggers are defined, clean up any stale workflow jobs.
795        with_connection(config, |conn| {
796            let deleted = conn
797                .execute("DELETE FROM cron_jobs WHERE source = 'workflow'", [])
798                .context("Failed to remove stale workflow cron jobs")?;
799            if deleted > 0 {
800                tracing::info!(
801                    count = deleted,
802                    "Removed workflow cron jobs no longer present"
803                );
804            }
805            Ok(())
806        })?;
807        return Ok(());
808    }
809
810    let now = Utc::now();
811
812    with_connection(config, |conn| {
813        // Build the set of expected job IDs.
814        let expected_ids: Vec<String> = workflows
815            .iter()
816            .enumerate()
817            .map(|(idx, (name, _expr, _tz))| format!("__wf_cron_{}_{}", slug(name), idx))
818            .collect();
819
820        // Remove stale workflow jobs no longer in the set.
821        {
822            let mut stmt = conn.prepare("SELECT id FROM cron_jobs WHERE source = 'workflow'")?;
823            let db_ids: Vec<String> = stmt
824                .query_map([], |row| row.get(0))?
825                .filter_map(|r| r.ok())
826                .collect();
827
828            for old_id in &db_ids {
829                if !expected_ids.contains(old_id) {
830                    conn.execute(
831                        "DELETE FROM cron_jobs WHERE id = ?1 AND source = 'workflow'",
832                        params![old_id],
833                    )
834                    .with_context(|| {
835                        format!("Failed to remove stale workflow cron job '{old_id}'")
836                    })?;
837                    tracing::info!(
838                        job_id = %old_id,
839                        "Removed stale workflow cron job"
840                    );
841                }
842            }
843        }
844
845        // Upsert each workflow cron trigger.
846        for (idx, (name, expr, tz)) in workflows.iter().enumerate() {
847            let job_id = format!("__wf_cron_{}_{}", slug(name), idx);
848
849            // Parse the schedule.
850            let schedule = Schedule::Cron {
851                expr: expr.clone(),
852                tz: tz.clone(),
853            };
854            let schedule_json = serde_json::to_string(&schedule)?;
855
856            // Calculate next run.
857            let next_run = match next_run_for_schedule(&schedule, now) {
858                Ok(t) => t,
859                Err(e) => {
860                    tracing::warn!(
861                        workflow = %name,
862                        expr = %expr,
863                        "Invalid cron expr for workflow trigger, skipping: {e}"
864                    );
865                    continue;
866                }
867            };
868
869            let exists: bool = conn
870                .prepare("SELECT COUNT(*) FROM cron_jobs WHERE id = ?1")?
871                .query_row(params![job_id], |row| row.get::<_, i64>(0))
872                .map(|c| c > 0)
873                .unwrap_or(false);
874
875            if exists {
876                // Update schedule if changed, preserve runtime state.
877                let current_schedule_raw: Option<String> = conn
878                    .prepare("SELECT schedule FROM cron_jobs WHERE id = ?1")?
879                    .query_row(params![job_id], |row| row.get(0))
880                    .ok();
881
882                let schedule_changed = current_schedule_raw.as_deref() != Some(&schedule_json);
883
884                if schedule_changed {
885                    conn.execute(
886                        "UPDATE cron_jobs
887                         SET expression = ?1, schedule = ?2, next_run = ?3,
888                             command = ?4, name = ?5, source = 'workflow'
889                         WHERE id = ?6 AND source = 'workflow'",
890                        params![
891                            expr,
892                            schedule_json,
893                            next_run.to_rfc3339(),
894                            name,
895                            format!("Workflow: {name}"),
896                            job_id,
897                        ],
898                    )
899                    .with_context(|| format!("Failed to update workflow cron job '{job_id}'"))?;
900                } else {
901                    conn.execute(
902                        "UPDATE cron_jobs
903                         SET command = ?1, name = ?2, source = 'workflow'
904                         WHERE id = ?3 AND source = 'workflow'",
905                        params![name, format!("Workflow: {name}"), job_id],
906                    )
907                    .with_context(|| format!("Failed to update workflow cron job '{job_id}'"))?;
908                }
909
910                tracing::debug!(
911                    job_id = %job_id,
912                    expr = %expr,
913                    "Updated workflow cron job"
914                );
915            } else {
916                // Insert new workflow cron job.
917                conn.execute(
918                    "INSERT INTO cron_jobs (
919                        id, expression, command, schedule, job_type, name,
920                        enabled, source, created_at, next_run,
921                        session_target, delete_after_run
922                     ) VALUES (?1, ?2, ?3, ?4, 'workflow', ?5, 1, 'workflow', ?6, ?7, 'isolated', 0)",
923                    params![
924                        job_id,
925                        expr,
926                        name,
927                        schedule_json,
928                        format!("Workflow: {name}"),
929                        now.to_rfc3339(),
930                        next_run.to_rfc3339(),
931                    ],
932                )
933                .with_context(|| {
934                    format!("Failed to insert workflow cron job '{job_id}' for {name}")
935                })?;
936
937                tracing::info!(
938                    job_id = %job_id,
939                    workflow = %name,
940                    expr = %expr,
941                    "Inserted workflow cron job"
942                );
943            }
944        }
945
946        Ok(())
947    })
948}
949
950/// Remove all cron jobs sourced from a single workflow.
951///
952/// Deletes jobs whose `source = 'workflow'` and whose `id` starts with the
953/// deterministic prefix `__wf_cron_{slug}_`.
954pub fn remove_workflow_cron_jobs(config: &Config, workflow_name: &str) -> Result<()> {
955    let prefix = format!("__wf_cron_{}_%", slug(workflow_name));
956    with_connection(config, |conn| {
957        let deleted = conn
958            .execute(
959                "DELETE FROM cron_jobs WHERE source = 'workflow' AND id LIKE ?1",
960                params![prefix],
961            )
962            .with_context(|| {
963                format!("Failed to remove workflow cron jobs for '{workflow_name}'")
964            })?;
965        if deleted > 0 {
966            tracing::info!(
967                workflow = %workflow_name,
968                count = deleted,
969                "Removed cron jobs for workflow"
970            );
971        }
972        Ok(())
973    })
974}
975
976/// Validate a declarative cron job definition.
977fn validate_decl(decl: &crate::config::schema::CronJobDecl) -> Result<()> {
978    if decl.id.trim().is_empty() {
979        anyhow::bail!("Declarative cron job has empty id");
980    }
981
982    match decl.job_type.to_lowercase().as_str() {
983        "shell" => {
984            if decl
985                .command
986                .as_deref()
987                .map_or(true, |c| c.trim().is_empty())
988            {
989                anyhow::bail!(
990                    "Declarative cron job '{}': shell job requires a non-empty 'command'",
991                    decl.id
992                );
993            }
994        }
995        "agent" => {
996            if decl.prompt.as_deref().map_or(true, |p| p.trim().is_empty()) {
997                anyhow::bail!(
998                    "Declarative cron job '{}': agent job requires a non-empty 'prompt'",
999                    decl.id
1000                );
1001            }
1002        }
1003        "workflow" => {
1004            // Workflow jobs use the command field as the workflow name;
1005            // validation of the actual workflow definition happens at trigger time.
1006            if decl
1007                .command
1008                .as_deref()
1009                .map_or(true, |c| c.trim().is_empty())
1010            {
1011                anyhow::bail!(
1012                    "Declarative cron job '{}': workflow job requires a non-empty 'command' (workflow name)",
1013                    decl.id
1014                );
1015            }
1016        }
1017        other => {
1018            anyhow::bail!(
1019                "Declarative cron job '{}': invalid job_type '{}', expected 'shell', 'agent', or 'workflow'",
1020                decl.id,
1021                other
1022            );
1023        }
1024    }
1025
1026    Ok(())
1027}
1028
1029/// Convert a `CronScheduleDecl` to the runtime `Schedule` type.
1030fn convert_schedule_decl(decl: &crate::config::schema::CronScheduleDecl) -> Result<Schedule> {
1031    use crate::config::schema::CronScheduleDecl;
1032    match decl {
1033        CronScheduleDecl::Cron { expr, tz } => Ok(Schedule::Cron {
1034            expr: expr.clone(),
1035            tz: tz.clone(),
1036        }),
1037        CronScheduleDecl::Every { every_ms } => Ok(Schedule::Every {
1038            every_ms: *every_ms,
1039        }),
1040        CronScheduleDecl::At { at } => {
1041            let parsed = DateTime::parse_from_rfc3339(at)
1042                .with_context(|| {
1043                    format!("Invalid RFC3339 timestamp in declarative cron 'at': {at}")
1044                })?
1045                .with_timezone(&Utc);
1046            Ok(Schedule::At { at: parsed })
1047        }
1048    }
1049}
1050
1051/// Convert a `DeliveryConfigDecl` to the runtime `DeliveryConfig`.
1052fn convert_delivery_decl(decl: &crate::config::schema::DeliveryConfigDecl) -> DeliveryConfig {
1053    DeliveryConfig {
1054        mode: decl.mode.clone(),
1055        channel: decl.channel.clone(),
1056        to: decl.to.clone(),
1057        best_effort: decl.best_effort,
1058    }
1059}
1060
1061fn add_column_if_missing(conn: &Connection, name: &str, sql_type: &str) -> Result<()> {
1062    let mut stmt = conn.prepare("PRAGMA table_info(cron_jobs)")?;
1063    let mut rows = stmt.query([])?;
1064    while let Some(row) = rows.next()? {
1065        let col_name: String = row.get(1)?;
1066        if col_name == name {
1067            return Ok(());
1068        }
1069    }
1070    // Drop the statement/rows before executing ALTER to release any locks
1071    drop(rows);
1072    drop(stmt);
1073
1074    // Tolerate "duplicate column name" errors to handle the race where
1075    // another process adds the column between our PRAGMA check and ALTER.
1076    match conn.execute(
1077        &format!("ALTER TABLE cron_jobs ADD COLUMN {name} {sql_type}"),
1078        [],
1079    ) {
1080        Ok(_) => Ok(()),
1081        Err(rusqlite::Error::SqliteFailure(err, Some(ref msg)))
1082            if msg.contains("duplicate column name") =>
1083        {
1084            tracing::debug!("Column cron_jobs.{name} already exists (concurrent migration): {err}");
1085            Ok(())
1086        }
1087        Err(e) => Err(e).with_context(|| format!("Failed to add cron_jobs.{name}")),
1088    }
1089}
1090
1091fn with_connection<T>(config: &Config, f: impl FnOnce(&Connection) -> Result<T>) -> Result<T> {
1092    let db_path = config.workspace_dir.join("cron").join("jobs.db");
1093    if let Some(parent) = db_path.parent() {
1094        std::fs::create_dir_all(parent)
1095            .with_context(|| format!("Failed to create cron directory: {}", parent.display()))?;
1096    }
1097
1098    let conn = Connection::open(&db_path)
1099        .with_context(|| format!("Failed to open cron DB: {}", db_path.display()))?;
1100
1101    conn.execute_batch(
1102        "PRAGMA foreign_keys = ON;
1103         CREATE TABLE IF NOT EXISTS cron_jobs (
1104            id               TEXT PRIMARY KEY,
1105            expression       TEXT NOT NULL,
1106            command          TEXT NOT NULL,
1107            schedule         TEXT,
1108            job_type         TEXT NOT NULL DEFAULT 'shell',
1109            prompt           TEXT,
1110            name             TEXT,
1111            session_target   TEXT NOT NULL DEFAULT 'isolated',
1112            model            TEXT,
1113            enabled          INTEGER NOT NULL DEFAULT 1,
1114            delivery         TEXT,
1115            delete_after_run INTEGER NOT NULL DEFAULT 0,
1116            allowed_tools    TEXT,
1117            created_at       TEXT NOT NULL,
1118            next_run         TEXT NOT NULL,
1119            last_run         TEXT,
1120            last_status      TEXT,
1121            last_output      TEXT
1122        );
1123        CREATE INDEX IF NOT EXISTS idx_cron_jobs_next_run ON cron_jobs(next_run);
1124
1125        CREATE TABLE IF NOT EXISTS cron_runs (
1126            id          INTEGER PRIMARY KEY AUTOINCREMENT,
1127            job_id      TEXT NOT NULL,
1128            started_at  TEXT NOT NULL,
1129            finished_at TEXT NOT NULL,
1130            status      TEXT NOT NULL,
1131            output      TEXT,
1132            duration_ms INTEGER,
1133            FOREIGN KEY (job_id) REFERENCES cron_jobs(id) ON DELETE CASCADE
1134        );
1135        CREATE INDEX IF NOT EXISTS idx_cron_runs_job_id ON cron_runs(job_id);
1136        CREATE INDEX IF NOT EXISTS idx_cron_runs_started_at ON cron_runs(started_at);
1137        CREATE INDEX IF NOT EXISTS idx_cron_runs_job_started ON cron_runs(job_id, started_at);",
1138    )
1139    .context("Failed to initialize cron schema")?;
1140
1141    add_column_if_missing(&conn, "schedule", "TEXT")?;
1142    add_column_if_missing(&conn, "job_type", "TEXT NOT NULL DEFAULT 'shell'")?;
1143    add_column_if_missing(&conn, "prompt", "TEXT")?;
1144    add_column_if_missing(&conn, "name", "TEXT")?;
1145    add_column_if_missing(&conn, "session_target", "TEXT NOT NULL DEFAULT 'isolated'")?;
1146    add_column_if_missing(&conn, "model", "TEXT")?;
1147    add_column_if_missing(&conn, "enabled", "INTEGER NOT NULL DEFAULT 1")?;
1148    add_column_if_missing(&conn, "delivery", "TEXT")?;
1149    add_column_if_missing(&conn, "delete_after_run", "INTEGER NOT NULL DEFAULT 0")?;
1150    add_column_if_missing(&conn, "allowed_tools", "TEXT")?;
1151    add_column_if_missing(&conn, "source", "TEXT DEFAULT 'imperative'")?;
1152
1153    f(&conn)
1154}
1155
1156#[cfg(test)]
1157mod tests {
1158    use super::*;
1159    use crate::config::Config;
1160    use chrono::Duration as ChronoDuration;
1161    use tempfile::TempDir;
1162
1163    fn test_config(tmp: &TempDir) -> Config {
1164        let config = Config {
1165            workspace_dir: tmp.path().join("workspace"),
1166            config_path: tmp.path().join("config.toml"),
1167            ..Config::default()
1168        };
1169        std::fs::create_dir_all(&config.workspace_dir).unwrap();
1170        config
1171    }
1172
/// A five-field cron expression is accepted and stored verbatim alongside the
/// command, with a `Schedule::Cron` schedule.
#[test]
fn add_job_accepts_five_field_expression() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);
    let expression = "*/5 * * * *";
    let command = "echo ok";

    let created = add_job(&config, expression, command).unwrap();

    assert_eq!(created.expression, expression);
    assert_eq!(created.command, command);
    assert!(matches!(created.schedule, Schedule::Cron { .. }));
}
1183
/// Shell jobs with an `At` schedule are flagged `delete_after_run`; jobs with
/// an `Every` schedule are not.
#[test]
fn add_shell_job_marks_at_schedule_for_auto_delete() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);

    let in_ten_minutes = Schedule::At {
        at: Utc::now() + ChronoDuration::minutes(10),
    };
    let one_shot = add_shell_job(&config, None, in_ten_minutes, "echo once", None).unwrap();
    assert!(one_shot.delete_after_run);

    let every_minute = Schedule::Every { every_ms: 60_000 };
    let recurring = add_shell_job(&config, None, every_minute, "echo recurring", None).unwrap();
    assert!(!recurring.delete_after_run);
}
1211
/// The delivery settings passed to `add_shell_job` appear on the returned job
/// and on the job read back with `get_job`.
#[test]
fn add_shell_job_persists_delivery() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);

    let schedule = Schedule::Cron {
        expr: "*/5 * * * *".into(),
        tz: None,
    };
    let delivery = DeliveryConfig {
        mode: "announce".into(),
        channel: Some("discord".into()),
        to: Some("1234567890".into()),
        best_effort: true,
    };

    let job = add_shell_job(
        &config,
        Some("deliver-shell".into()),
        schedule,
        "echo delivered",
        Some(delivery),
    )
    .unwrap();

    // Returned job reflects the requested delivery.
    assert_eq!(job.delivery.mode, "announce");
    assert_eq!(job.delivery.channel.as_deref(), Some("discord"));
    assert_eq!(job.delivery.to.as_deref(), Some("1234567890"));

    // And so does the row read back from the store.
    let stored = get_job(&config, &job.id).unwrap();
    assert_eq!(stored.delivery.mode, "announce");
    assert_eq!(stored.delivery.channel.as_deref(), Some("discord"));
    assert_eq!(stored.delivery.to.as_deref(), Some("1234567890"));
}
1243
/// An `announce` delivery without a `to` target is rejected when adding an
/// agent job.
#[test]
fn add_agent_job_rejects_invalid_announce_delivery() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);

    let schedule = Schedule::Cron {
        expr: "*/5 * * * *".into(),
        tz: None,
    };
    // `to` is deliberately left out.
    let delivery_without_target = DeliveryConfig {
        mode: "announce".into(),
        channel: Some("discord".into()),
        to: None,
        best_effort: true,
    };

    let outcome = add_agent_job(
        &config,
        Some("deliver-agent".into()),
        schedule,
        "summarize logs",
        SessionTarget::Isolated,
        None,
        Some(delivery_without_target),
        false,
        None,
    );
    let err = outcome.unwrap_err();

    assert!(err.to_string().contains("delivery.to is required"));
}
1272
/// An unknown delivery mode (here the misspelling `annouce`) is rejected when
/// adding a shell job.
#[test]
fn add_shell_job_rejects_invalid_delivery_mode() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);

    let schedule = Schedule::Cron {
        expr: "*/5 * * * *".into(),
        tz: None,
    };
    // The mode is intentionally misspelled.
    let misspelled_delivery = DeliveryConfig {
        mode: "annouce".into(),
        channel: Some("discord".into()),
        to: Some("1234567890".into()),
        best_effort: true,
    };

    let outcome = add_shell_job(
        &config,
        Some("deliver-shell".into()),
        schedule,
        "echo delivered",
        Some(misspelled_delivery),
    );
    let err = outcome.unwrap_err();

    assert!(err.to_string().contains("unsupported delivery mode"));
}
1297
/// A job that was added shows up in `list_jobs` and disappears again after
/// `remove_job`.
#[test]
fn add_list_remove_roundtrip() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);

    let created = add_job(&config, "*/10 * * * *", "echo roundtrip").unwrap();

    let after_add = list_jobs(&config).unwrap();
    assert_eq!(after_add.len(), 1);
    assert_eq!(after_add[0].id, created.id);

    remove_job(&config, &created.id).unwrap();

    let after_remove = list_jobs(&config).unwrap();
    assert!(after_remove.is_empty());
}
1311
/// `due_jobs` returns a job only once the reference time has reached its next
/// run, and never while the job is disabled.
#[test]
fn due_jobs_filters_by_timestamp_and_enabled() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);

    let job = add_job(&config, "* * * * *", "echo due").unwrap();

    // Freshly added: not due yet.
    let due_now = due_jobs(&config, Utc::now()).unwrap();
    assert!(due_now.is_empty(), "new job should not be due immediately");

    // A year from now it must be due.
    let far_future = Utc::now() + ChronoDuration::days(365);
    let due_future = due_jobs(&config, far_future).unwrap();
    assert_eq!(due_future.len(), 1, "job should be due in far future");

    // Disabling the job hides it even at the far-future reference time.
    let disable = CronJobPatch {
        enabled: Some(false),
        ..CronJobPatch::default()
    };
    let _ = update_job(&config, &job.id, disable).unwrap();

    let due_after_disable = due_jobs(&config, far_future).unwrap();
    assert!(due_after_disable.is_empty());
}
1338
/// With `scheduler.max_tasks = 2` and three due jobs, `due_jobs` returns only
/// two of them.
#[test]
fn due_jobs_respects_scheduler_max_tasks_limit() {
    let tmp = TempDir::new().unwrap();
    let mut config = test_config(&tmp);
    config.scheduler.max_tasks = 2;

    for command in ["echo due-1", "echo due-2", "echo due-3"] {
        let _ = add_job(&config, "* * * * *", command).unwrap();
    }

    let far_future = Utc::now() + ChronoDuration::days(365);
    let due = due_jobs(&config, far_future).unwrap();
    assert_eq!(due.len(), 2);
}
1353
/// `all_overdue_jobs` returns every overdue job even when
/// `scheduler.max_tasks` caps what `due_jobs` returns.
#[test]
fn all_overdue_jobs_ignores_max_tasks_limit() {
    let tmp = TempDir::new().unwrap();
    let mut config = test_config(&tmp);
    config.scheduler.max_tasks = 2;

    for command in ["echo ov-1", "echo ov-2", "echo ov-3"] {
        let _ = add_job(&config, "* * * * *", command).unwrap();
    }

    let far_future = Utc::now() + ChronoDuration::days(365);

    // The capped query stops at the configured limit.
    let capped = due_jobs(&config, far_future).unwrap();
    assert_eq!(capped.len(), 2);

    // The uncapped query returns all three jobs.
    let uncapped = all_overdue_jobs(&config, far_future).unwrap();
    assert_eq!(uncapped.len(), 3);
}
1372
/// A disabled job is not reported by `all_overdue_jobs`, even far past its
/// next run time.
#[test]
fn all_overdue_jobs_excludes_disabled_jobs() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);

    let job = add_job(&config, "* * * * *", "echo disabled").unwrap();
    let disable = CronJobPatch {
        enabled: Some(false),
        ..CronJobPatch::default()
    };
    let _ = update_job(&config, &job.id, disable).unwrap();

    let far_future = Utc::now() + ChronoDuration::days(365);
    let overdue = all_overdue_jobs(&config, far_future).unwrap();
    assert!(overdue.is_empty());
}
1393
/// The `allowed_tools` list given to `add_agent_job` is present on the
/// returned job and on the job read back with `get_job`.
#[test]
fn add_agent_job_persists_allowed_tools() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);

    let every_minute = Schedule::Every { every_ms: 60_000 };
    let created = add_agent_job(
        &config,
        Some("agent".into()),
        every_minute,
        "do work",
        SessionTarget::Isolated,
        None,
        None,
        false,
        Some(vec!["file_read".into(), "web_search".into()]),
    )
    .unwrap();

    assert_eq!(
        created.allowed_tools,
        Some(vec!["file_read".into(), "web_search".into()])
    );

    let reloaded = get_job(&config, &created.id).unwrap();
    assert_eq!(reloaded.allowed_tools, created.allowed_tools);
}
1420
/// Patching `allowed_tools` through `update_job` changes both the returned
/// job and the stored job.
#[test]
fn update_job_persists_allowed_tools_patch() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);

    // Start from an agent job without any tool restriction.
    let every_minute = Schedule::Every { every_ms: 60_000 };
    let created = add_agent_job(
        &config,
        Some("agent".into()),
        every_minute,
        "do work",
        SessionTarget::Isolated,
        None,
        None,
        false,
        None,
    )
    .unwrap();

    let patch = CronJobPatch {
        allowed_tools: Some(vec!["shell".into()]),
        ..CronJobPatch::default()
    };
    let updated = update_job(&config, &created.id, patch).unwrap();

    assert_eq!(updated.allowed_tools, Some(vec!["shell".into()]));
    assert_eq!(
        get_job(&config, &created.id).unwrap().allowed_tools,
        Some(vec!["shell".into()])
    );
}
1455
/// After a failed run, `reschedule_after_run` stores status `error`, a
/// `last_run` timestamp and the run output on the job.
#[test]
fn reschedule_after_run_persists_last_status_and_last_run() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);

    let job = add_job(&config, "*/15 * * * *", "echo run").unwrap();
    reschedule_after_run(&config, &job, false, "failed output").unwrap();

    let all_jobs = list_jobs(&config).unwrap();
    let stored = all_jobs
        .iter()
        .find(|candidate| candidate.id == job.id)
        .unwrap();
    assert_eq!(stored.last_status.as_deref(), Some("error"));
    assert!(stored.last_run.is_some());
    assert_eq!(stored.last_output.as_deref(), Some("failed output"));
}
1470
/// A row whose `job_type` column holds `agent` is read back as
/// `JobType::Agent`.
#[test]
fn job_type_from_sql_reads_valid_value() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);
    let now = Utc::now();

    // Insert the row directly so the stored `job_type` text is fully controlled.
    let inserted = with_connection(&config, |conn| {
        let no_schedule = Option::<String>::None;
        conn.execute(
            "INSERT INTO cron_jobs (id, expression, command, schedule, job_type, created_at, next_run)
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
            params![
                "job-type-valid",
                "*/5 * * * *",
                "echo ok",
                no_schedule,
                "agent",
                now.to_rfc3339(),
                (now + ChronoDuration::minutes(5)).to_rfc3339(),
            ],
        )?;
        Ok(())
    });
    inserted.unwrap();

    let job = get_job(&config, "job-type-valid").unwrap();
    assert_eq!(job.job_type, JobType::Agent);
}
1498
/// A row whose `job_type` column holds an unrecognized value makes `get_job`
/// fail.
#[test]
fn job_type_from_sql_rejects_invalid_value() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);
    let now = Utc::now();

    // Insert the row directly so an invalid `job_type` can be stored.
    let inserted = with_connection(&config, |conn| {
        let no_schedule = Option::<String>::None;
        conn.execute(
            "INSERT INTO cron_jobs (id, expression, command, schedule, job_type, created_at, next_run)
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
            params![
                "job-type-invalid",
                "*/5 * * * *",
                "echo ok",
                no_schedule,
                "unknown",
                now.to_rfc3339(),
                (now + ChronoDuration::minutes(5)).to_rfc3339(),
            ],
        )?;
        Ok(())
    });
    inserted.unwrap();

    let lookup = get_job(&config, "job-type-invalid");
    assert!(lookup.is_err());
}
1525
/// A row with a NULL `schedule` column (only the legacy `expression` set) is
/// still loaded as a `Schedule::Cron` job.
#[test]
fn migration_falls_back_to_legacy_expression() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);

    let seeded = with_connection(&config, |conn| {
        // Insert without a schedule value...
        conn.execute(
            "INSERT INTO cron_jobs (id, expression, command, created_at, next_run)
                 VALUES (?1, ?2, ?3, ?4, ?5)",
            params![
                "legacy-id",
                "*/5 * * * *",
                "echo legacy",
                Utc::now().to_rfc3339(),
                (Utc::now() + ChronoDuration::minutes(5)).to_rfc3339(),
            ],
        )?;
        // ...and force the column to NULL explicitly.
        conn.execute(
            "UPDATE cron_jobs SET schedule = NULL WHERE id = 'legacy-id'",
            [],
        )?;
        Ok(())
    });
    seeded.unwrap();

    let legacy_job = get_job(&config, "legacy-id").unwrap();
    assert!(matches!(legacy_job.schedule, Schedule::Cron { .. }));
}
1554
/// With `cron.max_run_history = 2`, recording three runs leaves only two in
/// the history.
#[test]
fn record_and_prune_runs() {
    let tmp = TempDir::new().unwrap();
    let mut config = test_config(&tmp);
    config.cron.max_run_history = 2;
    let job = add_job(&config, "*/5 * * * *", "echo ok").unwrap();
    let base = Utc::now();

    // Three runs, one second apart, each lasting 100 ms.
    for offset_secs in 0..3 {
        let started = base + ChronoDuration::seconds(offset_secs);
        let finished = started + ChronoDuration::milliseconds(100);
        record_run(&config, &job.id, started, finished, "ok", Some("done"), 100).unwrap();
    }

    let history = list_runs(&config, &job.id, 10).unwrap();
    assert_eq!(history.len(), 2);
}
1572
/// Removing a job leaves no run history behind for it.
#[test]
fn remove_job_cascades_run_history() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);
    let job = add_job(&config, "*/5 * * * *", "echo ok").unwrap();

    let started = Utc::now();
    let finished = started + ChronoDuration::milliseconds(5);
    record_run(&config, &job.id, started, finished, "ok", Some("ok"), 5).unwrap();

    remove_job(&config, &job.id).unwrap();

    let remaining = list_runs(&config, &job.id, 10).unwrap();
    assert!(remaining.is_empty());
}
1594
/// Run output larger than `MAX_CRON_OUTPUT_BYTES` is stored truncated, ending
/// with the truncation marker and not exceeding the byte limit.
#[test]
fn record_run_truncates_large_output() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);
    let job = add_job(&config, "*/5 * * * *", "echo trunc").unwrap();
    let oversized = "x".repeat(MAX_CRON_OUTPUT_BYTES + 512);

    let started = Utc::now();
    let finished = Utc::now();
    record_run(
        &config,
        &job.id,
        started,
        finished,
        "ok",
        Some(&oversized),
        1,
    )
    .unwrap();

    let history = list_runs(&config, &job.id, 1).unwrap();
    let stored = history[0].output.as_deref().unwrap_or_default();
    assert!(stored.ends_with(TRUNCATED_OUTPUT_MARKER));
    assert!(stored.len() <= MAX_CRON_OUTPUT_BYTES);
}
1618
/// After a successful run, a one-shot `At` job is disabled and records status
/// `ok`.
#[test]
fn reschedule_after_run_disables_at_schedule_job() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);
    let schedule = Schedule::At {
        at: Utc::now() + ChronoDuration::minutes(10),
    };
    let job = add_shell_job(&config, None, schedule, "echo once", None).unwrap();

    reschedule_after_run(&config, &job, true, "done").unwrap();

    let reloaded = get_job(&config, &job.id).unwrap();
    assert!(
        !reloaded.enabled,
        "At schedule job should be disabled after reschedule"
    );
    assert_eq!(reloaded.last_status.as_deref(), Some("ok"));
}
1635
/// After a failed run, a one-shot `At` job is still disabled and records
/// status `error` together with the failure output.
#[test]
fn reschedule_after_run_disables_at_schedule_job_on_failure() {
    let tmp = TempDir::new().unwrap();
    let config = test_config(&tmp);
    let schedule = Schedule::At {
        at: Utc::now() + ChronoDuration::minutes(10),
    };
    let job = add_shell_job(&config, None, schedule, "echo once", None).unwrap();

    reschedule_after_run(&config, &job, false, "failed").unwrap();

    let reloaded = get_job(&config, &job.id).unwrap();
    assert!(
        !reloaded.enabled,
        "At schedule job should be disabled after reschedule even on failure"
    );
    assert_eq!(reloaded.last_status.as_deref(), Some("error"));
    assert_eq!(reloaded.last_output.as_deref(), Some("failed"));
}
1653
1654    #[test]
1655    fn reschedule_after_run_truncates_last_output() {
1656        let tmp = TempDir::new().unwrap();
1657        let config = test_config(&tmp);
1658        let job = add_job(&config, "*/5 * * * *", "echo trunc").unwrap();
1659        let output = "y".repeat(MAX_CRON_OUTPUT_BYTES + 1024);
1660
1661        reschedule_after_run(&config, &job, false, &output).unwrap();
1662
1663        let stored = get_job(&config, &job.id).unwrap();
1664        let last_output = stored.last_output.as_deref().unwrap_or_default();
1665        assert!(last_output.ends_with(TRUNCATED_OUTPUT_MARKER));
1666        assert!(last_output.len() <= MAX_CRON_OUTPUT_BYTES);
1667    }
1668
1669    // ── Declarative cron job sync tests ──────────────────────────
1670
1671    fn make_shell_decl(id: &str, expr: &str, cmd: &str) -> crate::config::schema::CronJobDecl {
1672        crate::config::schema::CronJobDecl {
1673            id: id.to_string(),
1674            name: Some(format!("decl-{id}")),
1675            job_type: "shell".to_string(),
1676            schedule: crate::config::schema::CronScheduleDecl::Cron {
1677                expr: expr.to_string(),
1678                tz: None,
1679            },
1680            command: Some(cmd.to_string()),
1681            prompt: None,
1682            enabled: true,
1683            model: None,
1684            allowed_tools: None,
1685            session_target: None,
1686            delivery: None,
1687        }
1688    }
1689
1690    fn make_agent_decl(id: &str, expr: &str, prompt: &str) -> crate::config::schema::CronJobDecl {
1691        crate::config::schema::CronJobDecl {
1692            id: id.to_string(),
1693            name: Some(format!("decl-{id}")),
1694            job_type: "agent".to_string(),
1695            schedule: crate::config::schema::CronScheduleDecl::Cron {
1696                expr: expr.to_string(),
1697                tz: None,
1698            },
1699            command: None,
1700            prompt: Some(prompt.to_string()),
1701            enabled: true,
1702            model: None,
1703            allowed_tools: None,
1704            session_target: None,
1705            delivery: None,
1706        }
1707    }
1708
1709    #[test]
1710    fn sync_inserts_new_declarative_job() {
1711        let tmp = TempDir::new().unwrap();
1712        let config = test_config(&tmp);
1713
1714        let decls = vec![make_shell_decl("daily-backup", "0 2 * * *", "echo backup")];
1715        sync_declarative_jobs(&config, &decls).unwrap();
1716
1717        let job = get_job(&config, "daily-backup").unwrap();
1718        assert_eq!(job.command, "echo backup");
1719        assert_eq!(job.source, "declarative");
1720        assert_eq!(job.name.as_deref(), Some("decl-daily-backup"));
1721    }
1722
1723    #[test]
1724    fn sync_updates_existing_declarative_job() {
1725        let tmp = TempDir::new().unwrap();
1726        let config = test_config(&tmp);
1727
1728        let decls = vec![make_shell_decl("updatable", "0 2 * * *", "echo v1")];
1729        sync_declarative_jobs(&config, &decls).unwrap();
1730
1731        let job_v1 = get_job(&config, "updatable").unwrap();
1732        assert_eq!(job_v1.command, "echo v1");
1733
1734        let decls_v2 = vec![make_shell_decl("updatable", "0 3 * * *", "echo v2")];
1735        sync_declarative_jobs(&config, &decls_v2).unwrap();
1736
1737        let job_v2 = get_job(&config, "updatable").unwrap();
1738        assert_eq!(job_v2.command, "echo v2");
1739        assert_eq!(job_v2.expression, "0 3 * * *");
1740        assert_eq!(job_v2.source, "declarative");
1741    }
1742
1743    #[test]
1744    fn sync_does_not_delete_imperative_jobs() {
1745        let tmp = TempDir::new().unwrap();
1746        let config = test_config(&tmp);
1747
1748        // Create an imperative job via the normal API.
1749        let imperative = add_job(&config, "*/10 * * * *", "echo imperative").unwrap();
1750
1751        // Sync declarative jobs (none of which match the imperative job).
1752        let decls = vec![make_shell_decl("my-decl", "0 2 * * *", "echo decl")];
1753        sync_declarative_jobs(&config, &decls).unwrap();
1754
1755        // Imperative job should still exist.
1756        let still_there = get_job(&config, &imperative.id).unwrap();
1757        assert_eq!(still_there.command, "echo imperative");
1758        assert_eq!(still_there.source, "imperative");
1759
1760        // Declarative job should also exist.
1761        let decl_job = get_job(&config, "my-decl").unwrap();
1762        assert_eq!(decl_job.command, "echo decl");
1763    }
1764
1765    #[test]
1766    fn sync_removes_stale_declarative_jobs() {
1767        let tmp = TempDir::new().unwrap();
1768        let config = test_config(&tmp);
1769
1770        // Insert two declarative jobs.
1771        let decls = vec![
1772            make_shell_decl("keeper", "0 2 * * *", "echo keep"),
1773            make_shell_decl("stale", "0 3 * * *", "echo stale"),
1774        ];
1775        sync_declarative_jobs(&config, &decls).unwrap();
1776
1777        // Now sync with only "keeper" — "stale" should be removed.
1778        let decls_v2 = vec![make_shell_decl("keeper", "0 2 * * *", "echo keep")];
1779        sync_declarative_jobs(&config, &decls_v2).unwrap();
1780
1781        assert!(get_job(&config, "stale").is_err());
1782        assert!(get_job(&config, "keeper").is_ok());
1783    }
1784
1785    #[test]
1786    fn sync_empty_removes_all_declarative_jobs() {
1787        let tmp = TempDir::new().unwrap();
1788        let config = test_config(&tmp);
1789
1790        let decls = vec![make_shell_decl("to-remove", "0 2 * * *", "echo bye")];
1791        sync_declarative_jobs(&config, &decls).unwrap();
1792        assert!(get_job(&config, "to-remove").is_ok());
1793
1794        // Sync with empty list.
1795        sync_declarative_jobs(&config, &[]).unwrap();
1796        assert!(get_job(&config, "to-remove").is_err());
1797    }
1798
1799    #[test]
1800    fn sync_validates_shell_job_requires_command() {
1801        let tmp = TempDir::new().unwrap();
1802        let config = test_config(&tmp);
1803
1804        let mut decl = make_shell_decl("bad", "0 2 * * *", "echo ok");
1805        decl.command = None;
1806
1807        let result = sync_declarative_jobs(&config, &[decl]);
1808        assert!(result.is_err());
1809        assert!(result.unwrap_err().to_string().contains("command"));
1810    }
1811
1812    #[test]
1813    fn sync_validates_agent_job_requires_prompt() {
1814        let tmp = TempDir::new().unwrap();
1815        let config = test_config(&tmp);
1816
1817        let mut decl = make_agent_decl("bad-agent", "0 2 * * *", "do stuff");
1818        decl.prompt = None;
1819
1820        let result = sync_declarative_jobs(&config, &[decl]);
1821        assert!(result.is_err());
1822        assert!(result.unwrap_err().to_string().contains("prompt"));
1823    }
1824
1825    #[test]
1826    fn sync_agent_job_inserts_correctly() {
1827        let tmp = TempDir::new().unwrap();
1828        let config = test_config(&tmp);
1829
1830        let decls = vec![make_agent_decl(
1831            "agent-check",
1832            "*/15 * * * *",
1833            "check health",
1834        )];
1835        sync_declarative_jobs(&config, &decls).unwrap();
1836
1837        let job = get_job(&config, "agent-check").unwrap();
1838        assert_eq!(job.job_type, JobType::Agent);
1839        assert_eq!(job.prompt.as_deref(), Some("check health"));
1840        assert_eq!(job.source, "declarative");
1841    }
1842
1843    #[test]
1844    fn sync_every_schedule_works() {
1845        let tmp = TempDir::new().unwrap();
1846        let config = test_config(&tmp);
1847
1848        let decl = crate::config::schema::CronJobDecl {
1849            id: "interval-job".to_string(),
1850            name: None,
1851            job_type: "shell".to_string(),
1852            schedule: crate::config::schema::CronScheduleDecl::Every { every_ms: 60000 },
1853            command: Some("echo interval".to_string()),
1854            prompt: None,
1855            enabled: true,
1856            model: None,
1857            allowed_tools: None,
1858            session_target: None,
1859            delivery: None,
1860        };
1861
1862        sync_declarative_jobs(&config, &[decl]).unwrap();
1863
1864        let job = get_job(&config, "interval-job").unwrap();
1865        assert!(matches!(job.schedule, Schedule::Every { every_ms: 60000 }));
1866        assert_eq!(job.command, "echo interval");
1867    }
1868
1869    #[test]
1870    fn declarative_config_parses_from_toml() {
1871        let toml_str = r#"
1872enabled = true
1873
1874[[jobs]]
1875id = "daily-report"
1876name = "Daily Report"
1877job_type = "shell"
1878command = "echo report"
1879schedule = { kind = "cron", expr = "0 9 * * *" }
1880
1881[[jobs]]
1882id = "health-check"
1883job_type = "agent"
1884prompt = "Check server health"
1885schedule = { kind = "every", every_ms = 300000 }
1886        "#;
1887
1888        let parsed: crate::config::schema::CronConfig = toml::from_str(toml_str).unwrap();
1889        assert!(parsed.enabled);
1890        assert_eq!(parsed.jobs.len(), 2);
1891
1892        assert_eq!(parsed.jobs[0].id, "daily-report");
1893        assert_eq!(parsed.jobs[0].command.as_deref(), Some("echo report"));
1894        assert!(matches!(
1895            parsed.jobs[0].schedule,
1896            crate::config::schema::CronScheduleDecl::Cron { ref expr, .. } if expr == "0 9 * * *"
1897        ));
1898
1899        assert_eq!(parsed.jobs[1].id, "health-check");
1900        assert_eq!(parsed.jobs[1].job_type, "agent");
1901        assert_eq!(
1902            parsed.jobs[1].prompt.as_deref(),
1903            Some("Check server health")
1904        );
1905        assert!(matches!(
1906            parsed.jobs[1].schedule,
1907            crate::config::schema::CronScheduleDecl::Every { every_ms: 300_000 }
1908        ));
1909    }
1910}