Skip to main content

ironclad_schedule/
lib.rs

1//! # ironclad-schedule
2//!
3//! Unified cron/heartbeat scheduler for the Ironclad agent runtime. Jobs are
4//! persisted in SQLite (`ironclad-db/cron.rs`) with lease-based mutual
5//! exclusion to prevent duplicate execution across restarts.
6//!
7//! ## Key Types
8//!
9//! - [`HeartbeatDaemon`] -- Periodic tick loop driving registered heartbeat tasks
10//! - [`DurableScheduler`] -- Cron expression and fixed-interval evaluation
11//! - [`HeartbeatTask`] / [`TaskResult`] -- Pluggable task trait and outcome type
12//!
13//! ## Modules
14//!
15//! - `heartbeat` -- Heartbeat daemon loop with wallet and DB context
16//! - `scheduler` -- Cron expression parsing (`evaluate_cron`) and interval checks
17//! - `tasks` -- `HeartbeatTask` trait and built-in task implementations
18//!
19//! ## Entry Points
20//!
21//! - [`run_heartbeat()`] -- Start the heartbeat daemon
22//! - [`run_cron_worker()`] -- Start the cron worker (lease, execute, record)
23
24pub mod heartbeat;
25pub mod scheduler;
26pub mod tasks;
27
28pub use heartbeat::run as run_heartbeat;
29pub use heartbeat::{HeartbeatDaemon, TickContext};
30pub use scheduler::DurableScheduler;
31pub use tasks::{HeartbeatTask, TaskResult};
32
33/// Cron worker loop: evaluates due jobs, acquires leases, executes, and records results.
34pub async fn run_cron_worker(db: ironclad_db::Database, instance_id: String) {
35    use std::time::Duration;
36
37    let mut interval = tokio::time::interval(Duration::from_secs(60));
38    tracing::info!("Cron worker started");
39
40    loop {
41        interval.tick().await;
42
43        let jobs = match ironclad_db::cron::list_jobs(&db) {
44            Ok(j) => j,
45            Err(e) => {
46                tracing::warn!(error = %e, "Failed to list cron jobs");
47                continue;
48            }
49        };
50
51        let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
52
53        for job in &jobs {
54            if !job.enabled {
55                continue;
56            }
57
58            let kind = normalize_schedule_kind(&job.schedule_kind);
59            let due = match kind {
60                "cron" => job
61                    .schedule_expr
62                    .as_deref()
63                    .map(|expr| {
64                        DurableScheduler::evaluate_cron(expr, job.last_run_at.as_deref(), &now)
65                    })
66                    .unwrap_or(false),
67                "every" => {
68                    let interval_ms = job
69                        .schedule_every_ms
70                        .or_else(|| {
71                            job.schedule_expr
72                                .as_deref()
73                                .and_then(parse_interval_expr_to_ms)
74                        })
75                        .unwrap_or(60_000);
76                    DurableScheduler::evaluate_interval(
77                        job.last_run_at.as_deref(),
78                        interval_ms,
79                        &now,
80                    )
81                }
82                _ => false,
83            };
84
85            if !due {
86                continue;
87            }
88
89            match ironclad_db::cron::acquire_lease(&db, &job.id, &instance_id) {
90                Ok(acquired) => {
91                    if !acquired {
92                        continue;
93                    }
94                }
95                Err(e) => {
96                    tracing::warn!(job_id = %job.id, error = %e, "failed to acquire cron lease");
97                    continue;
98                }
99            }
100
101            tracing::debug!(job = %job.name, "Executing cron job");
102            let start = std::time::Instant::now();
103
104            let (result_status, error_msg) = execute_cron_job(&db, job);
105            let duration = start.elapsed().as_millis() as i64;
106
107            if let Err(e) = ironclad_db::cron::record_run(
108                &db,
109                &job.id,
110                result_status,
111                Some(duration),
112                error_msg.as_deref(),
113                None,
114            ) {
115                tracing::warn!(job_id = %job.id, error = %e, "failed to record cron run");
116            }
117            // Persist next_run_at so the API can expose it
118            let now_str = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string();
119            let next = DurableScheduler::calculate_next_run(
120                kind,
121                job.schedule_expr.as_deref(),
122                job.schedule_every_ms,
123                &now_str,
124            );
125            if let Err(e) = ironclad_db::cron::update_next_run_at(&db, &job.id, next.as_deref()) {
126                tracing::warn!(job_id = %job.id, error = %e, "failed to update next_run_at");
127            }
128            if let Err(e) = ironclad_db::cron::release_lease(&db, &job.id, &instance_id) {
129                tracing::warn!(job_id = %job.id, error = %e, "failed to release cron lease");
130            }
131        }
132    }
133}
134
135/// Execute a cron job based on its payload. Returns (status, optional error message).
136fn execute_cron_job(
137    db: &ironclad_db::Database,
138    job: &ironclad_db::cron::CronJob,
139) -> (&'static str, Option<String>) {
140    let payload: serde_json::Value = match serde_json::from_str(&job.payload_json) {
141        Ok(v) => v,
142        Err(e) => {
143            tracing::warn!(job = %job.name, error = %e, "invalid job payload JSON");
144            return ("error", Some(format!("invalid payload: {e}")));
145        }
146    };
147
148    let action = payload
149        .get("action")
150        .and_then(|v| v.as_str())
151        .unwrap_or("unknown");
152
153    match action {
154        "log" => {
155            let message = payload
156                .get("message")
157                .and_then(|v| v.as_str())
158                .unwrap_or("cron heartbeat");
159            tracing::info!(job = %job.name, message, "cron job executed");
160            ("success", None)
161        }
162        "metric_snapshot" => {
163            let snapshot = serde_json::json!({
164                "job_id": job.id,
165                "job_name": job.name,
166                "schedule_kind": job.schedule_kind,
167                "timestamp": chrono::Utc::now().to_rfc3339(),
168            });
169            match ironclad_db::metrics::record_metric_snapshot(db, &snapshot.to_string()) {
170                Ok(_) => ("success", None),
171                Err(e) => ("error", Some(format!("metric_snapshot failed: {e}"))),
172            }
173        }
174        "expire_sessions" => {
175            let ttl_seconds = payload
176                .get("ttl_seconds")
177                .and_then(|v| v.as_u64())
178                .unwrap_or(86_400);
179            match ironclad_db::sessions::expire_stale_sessions(db, ttl_seconds) {
180                Ok(expired) => {
181                    tracing::info!(job = %job.name, expired, ttl_seconds, "expired stale sessions");
182                    ("success", None)
183                }
184                Err(e) => ("error", Some(format!("expire_sessions failed: {e}"))),
185            }
186        }
187        "record_transaction" => {
188            let tx_type = payload
189                .get("tx_type")
190                .and_then(|v| v.as_str())
191                .unwrap_or("cron");
192            let amount = payload
193                .get("amount")
194                .and_then(|v| v.as_f64())
195                .unwrap_or(0.0);
196            let currency = payload
197                .get("currency")
198                .and_then(|v| v.as_str())
199                .unwrap_or("USD");
200            let counterparty = payload.get("counterparty").and_then(|v| v.as_str());
201            let tx_hash = payload.get("tx_hash").and_then(|v| v.as_str());
202            match ironclad_db::metrics::record_transaction(
203                db,
204                tx_type,
205                amount,
206                currency,
207                counterparty,
208                tx_hash,
209            ) {
210                Ok(_) => ("success", None),
211                Err(e) => ("error", Some(format!("record_transaction failed: {e}"))),
212            }
213        }
214        "noop" => {
215            tracing::debug!(job = %job.name, "noop cron job");
216            ("success", None)
217        }
218        other => {
219            tracing::warn!(job = %job.name, action = other, "unknown cron action");
220            ("error", Some(format!("unknown action: {other}")))
221        }
222    }
223}
224
225fn normalize_schedule_kind(kind: &str) -> &str {
226    match kind {
227        "interval" => "every",
228        "every" | "cron" => kind,
229        _ => kind,
230    }
231}
232
233fn parse_interval_expr_to_ms(expr: &str) -> Option<i64> {
234    if expr.is_empty() {
235        return None;
236    }
237    let (unit_byte_offset, unit) = expr.char_indices().last()?;
238    let qty = expr[..unit_byte_offset].parse::<i64>().ok()?;
239    let ms = match unit {
240        's' | 'S' => qty.saturating_mul(1_000),
241        'm' | 'M' => qty.saturating_mul(60_000),
242        'h' | 'H' => qty.saturating_mul(3_600_000),
243        _ => return None,
244    };
245    if ms > 0 { Some(ms) } else { None }
246}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251    use ironclad_db::Database;
252
253    fn test_db() -> Database {
254        Database::new(":memory:").expect("in-memory db")
255    }
256
257    fn job_with_payload(
258        db: &Database,
259        name: &str,
260        payload_json: &str,
261    ) -> ironclad_db::cron::CronJob {
262        let job_id = ironclad_db::cron::create_job(
263            db,
264            name,
265            "test-agent",
266            "every",
267            Some("5m"),
268            payload_json,
269        )
270        .expect("create job");
271        ironclad_db::cron::get_job(db, &job_id)
272            .expect("get job")
273            .expect("job exists")
274    }
275
276    #[test]
277    fn normalize_schedule_kind_maps_interval_to_every() {
278        assert_eq!(normalize_schedule_kind("interval"), "every");
279        assert_eq!(normalize_schedule_kind("every"), "every");
280        assert_eq!(normalize_schedule_kind("cron"), "cron");
281        assert_eq!(normalize_schedule_kind("custom"), "custom");
282    }
283
284    #[test]
285    fn parse_interval_expr_to_ms_parses_supported_units() {
286        assert_eq!(parse_interval_expr_to_ms("5s"), Some(5_000));
287        assert_eq!(parse_interval_expr_to_ms("2m"), Some(120_000));
288        assert_eq!(parse_interval_expr_to_ms("3h"), Some(10_800_000));
289        assert_eq!(parse_interval_expr_to_ms("7S"), Some(7_000));
290        assert_eq!(parse_interval_expr_to_ms("1M"), Some(60_000));
291    }
292
293    #[test]
294    fn parse_interval_expr_to_ms_rejects_invalid_values() {
295        assert_eq!(parse_interval_expr_to_ms(""), None);
296        assert_eq!(parse_interval_expr_to_ms("10"), None);
297        assert_eq!(parse_interval_expr_to_ms("xs"), None);
298        assert_eq!(parse_interval_expr_to_ms("0s"), None);
299        assert_eq!(parse_interval_expr_to_ms("-5m"), None);
300    }
301
302    #[test]
303    fn execute_cron_job_rejects_invalid_payload_json() {
304        let db = test_db();
305        let job = job_with_payload(&db, "bad-json", "{not-json}");
306        let (status, error) = execute_cron_job(&db, &job);
307        assert_eq!(status, "error");
308        assert!(error.unwrap_or_default().contains("invalid payload"));
309    }
310
311    #[test]
312    fn execute_cron_job_handles_log_and_noop_actions() {
313        let db = test_db();
314        let log_job = job_with_payload(&db, "log-job", r#"{"action":"log","message":"hello"}"#);
315        let (status, error) = execute_cron_job(&db, &log_job);
316        assert_eq!(status, "success");
317        assert!(error.is_none());
318
319        let noop_job = job_with_payload(&db, "noop-job", r#"{"action":"noop"}"#);
320        let (status, error) = execute_cron_job(&db, &noop_job);
321        assert_eq!(status, "success");
322        assert!(error.is_none());
323    }
324
325    #[test]
326    fn execute_cron_job_records_metric_snapshot() {
327        let db = test_db();
328        let job = job_with_payload(&db, "metrics-job", r#"{"action":"metric_snapshot"}"#);
329        let (status, error) = execute_cron_job(&db, &job);
330        assert_eq!(status, "success");
331        assert!(error.is_none());
332
333        let count: i64 = db
334            .conn()
335            .query_row("SELECT COUNT(*) FROM metric_snapshots", [], |row| {
336                row.get(0)
337            })
338            .expect("count snapshots");
339        assert_eq!(count, 1);
340    }
341
342    #[test]
343    fn execute_cron_job_expires_stale_sessions() {
344        let db = test_db();
345        let session_id = ironclad_db::sessions::find_or_create(&db, "expire-agent", None)
346            .expect("create session");
347        db.conn()
348            .execute(
349                "UPDATE sessions SET updated_at = datetime('now', '-2 days') WHERE id = ?1",
350                [&session_id],
351            )
352            .expect("age session");
353
354        let job = job_with_payload(
355            &db,
356            "expire-job",
357            r#"{"action":"expire_sessions","ttl_seconds":60}"#,
358        );
359        let (status, error) = execute_cron_job(&db, &job);
360        assert_eq!(status, "success");
361        assert!(error.is_none());
362
363        let status: String = db
364            .conn()
365            .query_row(
366                "SELECT status FROM sessions WHERE id = ?1",
367                [&session_id],
368                |row| row.get(0),
369            )
370            .expect("session status");
371        assert_eq!(status, "expired");
372    }
373
374    #[test]
375    fn execute_cron_job_records_transaction() {
376        let db = test_db();
377        let job = job_with_payload(
378            &db,
379            "tx-job",
380            r#"{"action":"record_transaction","tx_type":"ops","amount":1.25,"currency":"USD","counterparty":"scheduler"}"#,
381        );
382        let (status, error) = execute_cron_job(&db, &job);
383        assert_eq!(status, "success");
384        assert!(error.is_none());
385
386        let txs = ironclad_db::metrics::query_transactions(&db, 24).expect("query txs");
387        assert_eq!(txs.len(), 1);
388        assert_eq!(txs[0].tx_type, "ops");
389        assert_eq!(txs[0].currency, "USD");
390    }
391
392    #[test]
393    fn execute_cron_job_rejects_unknown_action() {
394        let db = test_db();
395        let job = job_with_payload(&db, "unknown-job", r#"{"action":"mystery"}"#);
396        let (status, error) = execute_cron_job(&db, &job);
397        assert_eq!(status, "error");
398        assert!(error.unwrap_or_default().contains("unknown action"));
399    }
400
401    #[test]
402    fn execute_cron_job_rejects_legacy_agent_turn_kind() {
403        let db = test_db();
404        let job = job_with_payload(
405            &db,
406            "legacy-agent-turn",
407            r#"{"kind":"agentTurn","message":"Do work"}"#,
408        );
409        let (status, error) = execute_cron_job(&db, &job);
410        assert_eq!(status, "error");
411        assert_eq!(error.as_deref(), Some("unknown action: unknown"));
412    }
413
414    #[test]
415    fn execute_cron_job_rejects_legacy_metric_snapshot_kind() {
416        let db = test_db();
417        let job = job_with_payload(&db, "legacy-metrics", r#"{"kind":"metricSnapshot"}"#);
418        let (status, error) = execute_cron_job(&db, &job);
419        assert_eq!(status, "error");
420        assert_eq!(error.as_deref(), Some("unknown action: unknown"));
421    }
422
423    // ── BUG-093: normalize_schedule_kind boundary cases ────────────────
424    #[test]
425    fn normalize_schedule_kind_pass_through_unknown_kinds() {
426        assert_eq!(normalize_schedule_kind(""), "");
427        assert_eq!(normalize_schedule_kind("once"), "once");
428        assert_eq!(normalize_schedule_kind("at"), "at");
429        assert_eq!(normalize_schedule_kind("weekly"), "weekly");
430    }
431
432    // ── BUG-094: parse_interval_expr_to_ms edge cases ──────────────────
433    #[test]
434    fn parse_interval_expr_to_ms_uppercase_h() {
435        assert_eq!(parse_interval_expr_to_ms("1H"), Some(3_600_000));
436        assert_eq!(parse_interval_expr_to_ms("2H"), Some(7_200_000));
437    }
438
439    #[test]
440    fn parse_interval_expr_to_ms_single_char() {
441        // Single character like "s" has no numeric part
442        assert_eq!(parse_interval_expr_to_ms("s"), None);
443        assert_eq!(parse_interval_expr_to_ms("m"), None);
444        assert_eq!(parse_interval_expr_to_ms("h"), None);
445    }
446
447    #[test]
448    fn parse_interval_expr_to_ms_large_values() {
449        // 24h = 86_400_000
450        assert_eq!(parse_interval_expr_to_ms("24h"), Some(86_400_000));
451        // 1000s = 1_000_000
452        assert_eq!(parse_interval_expr_to_ms("1000s"), Some(1_000_000));
453    }
454
455    #[test]
456    fn parse_interval_expr_to_ms_unknown_unit() {
457        assert_eq!(parse_interval_expr_to_ms("5d"), None); // days not supported
458        assert_eq!(parse_interval_expr_to_ms("3w"), None); // weeks not supported
459    }
460
461    // ── execute_cron_job: log action with default message ──────────────
462    #[test]
463    fn execute_cron_job_log_action_with_default_message() {
464        let db = test_db();
465        // No "message" key in payload -> should use default "cron heartbeat"
466        let job = job_with_payload(&db, "log-default", r#"{"action":"log"}"#);
467        let (status, error) = execute_cron_job(&db, &job);
468        assert_eq!(status, "success");
469        assert!(error.is_none());
470    }
471
472    // ── execute_cron_job: expire_sessions with default TTL ─────────────
473    #[test]
474    fn execute_cron_job_expire_sessions_uses_default_ttl() {
475        let db = test_db();
476        // No "ttl_seconds" -> should use default 86_400
477        let job = job_with_payload(&db, "expire-default", r#"{"action":"expire_sessions"}"#);
478        let (status, error) = execute_cron_job(&db, &job);
479        assert_eq!(status, "success");
480        assert!(error.is_none());
481    }
482
483    // ── execute_cron_job: record_transaction with defaults ─────────────
484    #[test]
485    fn execute_cron_job_record_transaction_uses_defaults() {
486        let db = test_db();
487        // Minimal payload: no tx_type, amount, currency, counterparty, tx_hash
488        let job = job_with_payload(&db, "tx-minimal", r#"{"action":"record_transaction"}"#);
489        let (status, error) = execute_cron_job(&db, &job);
490        assert_eq!(status, "success");
491        assert!(error.is_none());
492
493        let txs = ironclad_db::metrics::query_transactions(&db, 24).expect("query txs");
494        assert_eq!(txs.len(), 1);
495        assert_eq!(txs[0].tx_type, "cron"); // default tx_type
496        assert_eq!(txs[0].currency, "USD"); // default currency
497    }
498
499    // ── execute_cron_job: record_transaction with tx_hash ──────────────
500    #[test]
501    fn execute_cron_job_record_transaction_with_tx_hash() {
502        let db = test_db();
503        let job = job_with_payload(
504            &db,
505            "tx-with-hash",
506            r#"{"action":"record_transaction","tx_type":"payment","amount":42.0,"currency":"ETH","counterparty":"alice","tx_hash":"0xabc"}"#,
507        );
508        let (status, error) = execute_cron_job(&db, &job);
509        assert_eq!(status, "success");
510        assert!(error.is_none());
511
512        let txs = ironclad_db::metrics::query_transactions(&db, 24).expect("query txs");
513        assert_eq!(txs.len(), 1);
514        assert_eq!(txs[0].tx_type, "payment");
515        assert_eq!(txs[0].currency, "ETH");
516    }
517
518    // ── execute_cron_job: action-dispatch success paths ───────────────────
519    #[test]
520    fn execute_cron_job_expire_sessions_action() {
521        let db = test_db();
522        let job = job_with_payload(
523            &db,
524            "expire-action",
525            r#"{"action":"expire_sessions","ttl_seconds":3600}"#,
526        );
527        let (status, error) = execute_cron_job(&db, &job);
528        assert_eq!(status, "success");
529        assert!(error.is_none());
530    }
531
532    #[test]
533    fn execute_cron_job_record_transaction_action() {
534        let db = test_db();
535        let job = job_with_payload(
536            &db,
537            "tx-action",
538            r#"{"action":"record_transaction","amount":5.0}"#,
539        );
540        let (status, error) = execute_cron_job(&db, &job);
541        assert_eq!(status, "success");
542        assert!(error.is_none());
543    }
544
545    #[test]
546    fn execute_cron_job_log_action() {
547        let db = test_db();
548        let job = job_with_payload(&db, "log-action", r#"{"action":"log","message":"test"}"#);
549        let (status, error) = execute_cron_job(&db, &job);
550        assert_eq!(status, "success");
551        assert!(error.is_none());
552    }
553
554    #[test]
555    fn execute_cron_job_noop_action() {
556        let db = test_db();
557        let job = job_with_payload(&db, "noop-action", r#"{"action":"noop"}"#);
558        let (status, error) = execute_cron_job(&db, &job);
559        assert_eq!(status, "success");
560        assert!(error.is_none());
561    }
562
563    // ── execute_cron_job: unknown action returns error ─────────────────────
564    #[test]
565    fn execute_cron_job_unknown_action() {
566        let db = test_db();
567        let job = job_with_payload(&db, "legacy-unknown", r#"{"kind":"foobar"}"#);
568        let (status, error) = execute_cron_job(&db, &job);
569        assert_eq!(status, "error");
570        assert!(error.unwrap_or_default().contains("unknown action"));
571    }
572
573    // ── execute_cron_job: no action or kind -> "unknown" ───────────────
574    #[test]
575    fn execute_cron_job_no_action_or_kind_is_unknown() {
576        let db = test_db();
577        let job = job_with_payload(&db, "empty-payload", r#"{"data":"value"}"#);
578        let (status, error) = execute_cron_job(&db, &job);
579        assert_eq!(status, "error");
580        assert!(error.unwrap_or_default().contains("unknown action"));
581    }
582
583    // ── execute_cron_job: empty object payload ─────────────────────────
584    #[test]
585    fn execute_cron_job_empty_object_payload() {
586        let db = test_db();
587        let job = job_with_payload(&db, "empty-obj", r#"{}"#);
588        let (status, error) = execute_cron_job(&db, &job);
589        assert_eq!(status, "error");
590        assert!(error.unwrap_or_default().contains("unknown action"));
591    }
592
593    // ── execute_cron_job: action takes precedence over kind ────────────
594    #[test]
595    fn execute_cron_job_action_takes_precedence_over_kind() {
596        let db = test_db();
597        // Both action and kind are present; action should win
598        let job = job_with_payload(&db, "precedence", r#"{"action":"noop","kind":"agentTurn"}"#);
599        let (status, error) = execute_cron_job(&db, &job);
600        assert_eq!(status, "success");
601        assert!(error.is_none());
602    }
603
604    // ── run_cron_worker async integration tests ─────────────────────────
605    // These tests exercise the async cron worker loop by spawning it with
606    // tokio time control and aborting after one iteration completes.
607
608    fn create_due_job(db: &Database, name: &str, payload: &str) -> String {
609        let job_id =
610            ironclad_db::cron::create_job(db, name, "test-agent", "every", Some("1s"), payload)
611                .expect("create job");
612        // Set schedule_every_ms to 1 so the job is immediately due
613        db.conn()
614            .execute(
615                "UPDATE cron_jobs SET schedule_every_ms = 1 WHERE id = ?1",
616                [&job_id],
617            )
618            .expect("update schedule_every_ms");
619        job_id
620    }
621
622    #[tokio::test(start_paused = true)]
623    async fn run_cron_worker_executes_due_log_job() {
624        let db = test_db();
625        let _job_id = create_due_job(&db, "worker-log", r#"{"action":"log","message":"test"}"#);
626
627        let db_clone = db.clone();
628        let handle = tokio::spawn(async move {
629            run_cron_worker(db_clone, "test-instance".into()).await;
630        });
631
632        // Advance past the 60s interval to trigger one tick
633        tokio::time::advance(std::time::Duration::from_secs(61)).await;
634        // Yield to let the worker process
635        tokio::task::yield_now().await;
636        tokio::time::advance(std::time::Duration::from_millis(10)).await;
637        tokio::task::yield_now().await;
638
639        handle.abort();
640        let _ = handle.await;
641
642        // Verify the job was executed by checking cron_runs table
643        let count: i64 = db
644            .conn()
645            .query_row("SELECT COUNT(*) FROM cron_runs", [], |row| row.get(0))
646            .expect("count runs");
647        assert!(count >= 1, "expected at least one cron run, got {count}");
648    }
649
650    #[tokio::test(start_paused = true)]
651    async fn run_cron_worker_executes_noop_job() {
652        let db = test_db();
653        let _job_id = create_due_job(&db, "worker-noop", r#"{"action":"noop"}"#);
654
655        let db_clone = db.clone();
656        let handle = tokio::spawn(async move {
657            run_cron_worker(db_clone, "noop-instance".into()).await;
658        });
659
660        tokio::time::advance(std::time::Duration::from_secs(61)).await;
661        tokio::task::yield_now().await;
662        tokio::time::advance(std::time::Duration::from_millis(10)).await;
663        tokio::task::yield_now().await;
664
665        handle.abort();
666        let _ = handle.await;
667
668        let count: i64 = db
669            .conn()
670            .query_row("SELECT COUNT(*) FROM cron_runs", [], |row| row.get(0))
671            .expect("count runs");
672        assert!(count >= 1, "expected at least one cron run");
673    }
674
675    #[tokio::test(start_paused = true)]
676    async fn run_cron_worker_executes_metric_snapshot_job() {
677        let db = test_db();
678        let _job_id = create_due_job(&db, "worker-metric", r#"{"action":"metric_snapshot"}"#);
679
680        let db_clone = db.clone();
681        let handle = tokio::spawn(async move {
682            run_cron_worker(db_clone, "metric-instance".into()).await;
683        });
684
685        tokio::time::advance(std::time::Duration::from_secs(61)).await;
686        tokio::task::yield_now().await;
687        tokio::time::advance(std::time::Duration::from_millis(10)).await;
688        tokio::task::yield_now().await;
689
690        handle.abort();
691        let _ = handle.await;
692
693        let snap_count: i64 = db
694            .conn()
695            .query_row("SELECT COUNT(*) FROM metric_snapshots", [], |row| {
696                row.get(0)
697            })
698            .expect("count snapshots");
699        assert!(snap_count >= 1, "expected at least one metric snapshot");
700    }
701
702    #[tokio::test(start_paused = true)]
703    async fn run_cron_worker_executes_expire_sessions_job() {
704        let db = test_db();
705        // Create a session that's old enough to expire
706        let session_id = ironclad_db::sessions::find_or_create(&db, "cron-expire-agent", None)
707            .expect("create session");
708        db.conn()
709            .execute(
710                "UPDATE sessions SET updated_at = datetime('now', '-2 days') WHERE id = ?1",
711                [&session_id],
712            )
713            .expect("age session");
714
715        let _job_id = create_due_job(
716            &db,
717            "worker-expire",
718            r#"{"action":"expire_sessions","ttl_seconds":60}"#,
719        );
720
721        let db_clone = db.clone();
722        let handle = tokio::spawn(async move {
723            run_cron_worker(db_clone, "expire-instance".into()).await;
724        });
725
726        tokio::time::advance(std::time::Duration::from_secs(61)).await;
727        tokio::task::yield_now().await;
728        tokio::time::advance(std::time::Duration::from_millis(10)).await;
729        tokio::task::yield_now().await;
730
731        handle.abort();
732        let _ = handle.await;
733
734        let status: String = db
735            .conn()
736            .query_row(
737                "SELECT status FROM sessions WHERE id = ?1",
738                [&session_id],
739                |row| row.get(0),
740            )
741            .expect("session status");
742        assert_eq!(status, "expired");
743    }
744
745    #[tokio::test(start_paused = true)]
746    async fn run_cron_worker_executes_record_transaction_job() {
747        let db = test_db();
748        let _job_id = create_due_job(
749            &db,
750            "worker-tx",
751            r#"{"action":"record_transaction","tx_type":"cron_test","amount":99.0,"currency":"USDC"}"#,
752        );
753
754        let db_clone = db.clone();
755        let handle = tokio::spawn(async move {
756            run_cron_worker(db_clone, "tx-instance".into()).await;
757        });
758
759        tokio::time::advance(std::time::Duration::from_secs(61)).await;
760        tokio::task::yield_now().await;
761        tokio::time::advance(std::time::Duration::from_millis(10)).await;
762        tokio::task::yield_now().await;
763
764        handle.abort();
765        let _ = handle.await;
766
767        let txs = ironclad_db::metrics::query_transactions(&db, 24).expect("query txs");
768        assert!(!txs.is_empty(), "expected at least one transaction");
769    }
770
771    #[tokio::test(start_paused = true)]
772    async fn run_cron_worker_skips_disabled_job() {
773        let db = test_db();
774        let job_id = create_due_job(
775            &db,
776            "worker-disabled",
777            r#"{"action":"log","message":"skip"}"#,
778        );
779        // Disable the job
780        db.conn()
781            .execute("UPDATE cron_jobs SET enabled = 0 WHERE id = ?1", [&job_id])
782            .expect("disable job");
783
784        let db_clone = db.clone();
785        let handle = tokio::spawn(async move {
786            run_cron_worker(db_clone, "disabled-instance".into()).await;
787        });
788
789        tokio::time::advance(std::time::Duration::from_secs(61)).await;
790        tokio::task::yield_now().await;
791        tokio::time::advance(std::time::Duration::from_millis(10)).await;
792        tokio::task::yield_now().await;
793
794        handle.abort();
795        let _ = handle.await;
796
797        // Disabled job should not produce any cron runs
798        let count: i64 = db
799            .conn()
800            .query_row("SELECT COUNT(*) FROM cron_runs", [], |row| row.get(0))
801            .expect("count runs");
802        assert_eq!(count, 0, "disabled job should not be executed");
803    }
804
805    #[tokio::test(start_paused = true)]
806    async fn run_cron_worker_handles_unknown_action_job() {
807        let db = test_db();
808        let _job_id = create_due_job(&db, "worker-unknown", r#"{"action":"nonexistent"}"#);
809
810        let db_clone = db.clone();
811        let handle = tokio::spawn(async move {
812            run_cron_worker(db_clone, "unknown-instance".into()).await;
813        });
814
815        tokio::time::advance(std::time::Duration::from_secs(61)).await;
816        tokio::task::yield_now().await;
817        tokio::time::advance(std::time::Duration::from_millis(10)).await;
818        tokio::task::yield_now().await;
819
820        handle.abort();
821        let _ = handle.await;
822
823        // Should have recorded an error run
824        let error_count: i64 = db
825            .conn()
826            .query_row(
827                "SELECT COUNT(*) FROM cron_runs WHERE status = 'error'",
828                [],
829                |row| row.get(0),
830            )
831            .expect("count error runs");
832        assert!(error_count >= 1, "expected at least one error run");
833    }
834
835    #[tokio::test(start_paused = true)]
836    async fn run_cron_worker_handles_invalid_json_job() {
837        let db = test_db();
838        let _job_id = create_due_job(&db, "worker-badjson", "{not-valid-json}");
839
840        let db_clone = db.clone();
841        let handle = tokio::spawn(async move {
842            run_cron_worker(db_clone, "badjson-instance".into()).await;
843        });
844
845        tokio::time::advance(std::time::Duration::from_secs(61)).await;
846        tokio::task::yield_now().await;
847        tokio::time::advance(std::time::Duration::from_millis(10)).await;
848        tokio::task::yield_now().await;
849
850        handle.abort();
851        let _ = handle.await;
852
853        let error_count: i64 = db
854            .conn()
855            .query_row(
856                "SELECT COUNT(*) FROM cron_runs WHERE status = 'error'",
857                [],
858                |row| row.get(0),
859            )
860            .expect("count error runs");
861        assert!(error_count >= 1, "expected error run for invalid JSON");
862    }
863
864    #[tokio::test(start_paused = true)]
865    async fn run_cron_worker_handles_legacy_agent_turn_job() {
866        let db = test_db();
867        let _job_id = create_due_job(
868            &db,
869            "worker-legacy-turn",
870            r#"{"kind":"agentTurn","message":"hello"}"#,
871        );
872
873        let db_clone = db.clone();
874        let handle = tokio::spawn(async move {
875            run_cron_worker(db_clone, "legacy-instance".into()).await;
876        });
877
878        tokio::time::advance(std::time::Duration::from_secs(61)).await;
879        tokio::task::yield_now().await;
880        tokio::time::advance(std::time::Duration::from_millis(10)).await;
881        tokio::task::yield_now().await;
882
883        handle.abort();
884        let _ = handle.await;
885
886        // Legacy "kind" payloads are no longer mapped — they resolve to "unknown" action
887        // and produce an error run.
888        let error_count: i64 = db
889            .conn()
890            .query_row(
891                "SELECT COUNT(*) FROM cron_runs WHERE status = 'error'",
892                [],
893                |row| row.get(0),
894            )
895            .expect("count error runs");
896        assert!(
897            error_count >= 1,
898            "expected error run for unmapped legacy agent turn kind"
899        );
900    }
901
902    #[tokio::test(start_paused = true)]
903    async fn run_cron_worker_with_no_jobs_does_not_crash() {
904        let db = test_db();
905
906        let db_clone = db.clone();
907        let handle = tokio::spawn(async move {
908            run_cron_worker(db_clone, "empty-instance".into()).await;
909        });
910
911        // Advance past a tick with no jobs
912        tokio::time::advance(std::time::Duration::from_secs(61)).await;
913        tokio::task::yield_now().await;
914        tokio::time::advance(std::time::Duration::from_millis(10)).await;
915        tokio::task::yield_now().await;
916
917        handle.abort();
918        let _ = handle.await;
919
920        // No crash, no runs
921        let count: i64 = db
922            .conn()
923            .query_row("SELECT COUNT(*) FROM cron_runs", [], |row| row.get(0))
924            .expect("count runs");
925        assert_eq!(count, 0);
926    }
927
928    #[tokio::test(start_paused = true)]
929    async fn run_cron_worker_cron_schedule_kind_job() {
930        let db = test_db();
931        // Create a job with "cron" kind that uses a cron expression matching every minute
932        let job_id = ironclad_db::cron::create_job(
933            &db,
934            "worker-cron-kind",
935            "test-agent",
936            "cron",
937            Some("* * * * *"),
938            r#"{"action":"log","message":"cron tick"}"#,
939        )
940        .expect("create cron job");
941
942        // The cron expression "* * * * *" matches every minute.
943        // We don't set last_run_at, so it should be evaluated as due.
944
945        let db_clone = db.clone();
946        let handle = tokio::spawn(async move {
947            run_cron_worker(db_clone, "cron-kind-instance".into()).await;
948        });
949
950        tokio::time::advance(std::time::Duration::from_secs(61)).await;
951        tokio::task::yield_now().await;
952        tokio::time::advance(std::time::Duration::from_millis(100)).await;
953        tokio::task::yield_now().await;
954
955        handle.abort();
956        let _ = handle.await;
957
958        // The cron evaluation depends on real wall-clock matching chrono::Utc::now().
959        // In paused-time tests, Utc::now() still returns the real time, so the cron
960        // expression "* * * * *" should match at most times.
961        // We check that the job was at least attempted (run recorded).
962        let count: i64 = db
963            .conn()
964            .query_row(
965                &format!("SELECT COUNT(*) FROM cron_runs WHERE job_id = '{}'", job_id),
966                [],
967                |row| row.get(0),
968            )
969            .expect("count cron runs");
970        // Cron jobs depend on wall time matching. If it happens to match, count >= 1.
971        // We don't assert strictly because wall clock vs cron expression may not align
972        // in CI, but the code path is still exercised.
973        let _ = count;
974    }
975
976    #[tokio::test(start_paused = true)]
977    async fn run_cron_worker_unknown_schedule_kind_not_due() {
978        let db = test_db();
979        // Create a job with an unknown schedule kind -- should not be treated as due
980        let job_id = ironclad_db::cron::create_job(
981            &db,
982            "worker-unknown-kind",
983            "test-agent",
984            "weekly",
985            Some("*"),
986            r#"{"action":"log","message":"weekly"}"#,
987        )
988        .expect("create job");
989
990        let db_clone = db.clone();
991        let handle = tokio::spawn(async move {
992            run_cron_worker(db_clone, "unknown-kind-instance".into()).await;
993        });
994
995        tokio::time::advance(std::time::Duration::from_secs(61)).await;
996        tokio::task::yield_now().await;
997        tokio::time::advance(std::time::Duration::from_millis(10)).await;
998        tokio::task::yield_now().await;
999
1000        handle.abort();
1001        let _ = handle.await;
1002
1003        // Unknown schedule_kind -> due = false -> no runs recorded for that job
1004        let count: i64 = db
1005            .conn()
1006            .query_row(
1007                &format!("SELECT COUNT(*) FROM cron_runs WHERE job_id = '{}'", job_id),
1008                [],
1009                |row| row.get(0),
1010            )
1011            .expect("count runs");
1012        assert_eq!(count, 0, "unknown schedule kind should not be executed");
1013    }
1014
1015    #[tokio::test(start_paused = true)]
1016    async fn run_cron_worker_interval_kind_job() {
1017        let db = test_db();
1018        // Create a job with "interval" schedule_kind (gets normalized to "every")
1019        let job_id = ironclad_db::cron::create_job(
1020            &db,
1021            "worker-interval-kind",
1022            "test-agent",
1023            "interval",
1024            Some("1s"),
1025            r#"{"action":"noop"}"#,
1026        )
1027        .expect("create job");
1028        // Set schedule_every_ms to 1 so it's immediately due
1029        db.conn()
1030            .execute(
1031                "UPDATE cron_jobs SET schedule_every_ms = 1 WHERE id = ?1",
1032                [&job_id],
1033            )
1034            .expect("update ms");
1035
1036        let db_clone = db.clone();
1037        let handle = tokio::spawn(async move {
1038            run_cron_worker(db_clone, "interval-instance".into()).await;
1039        });
1040
1041        tokio::time::advance(std::time::Duration::from_secs(61)).await;
1042        tokio::task::yield_now().await;
1043        tokio::time::advance(std::time::Duration::from_millis(10)).await;
1044        tokio::task::yield_now().await;
1045
1046        handle.abort();
1047        let _ = handle.await;
1048
1049        let count: i64 = db
1050            .conn()
1051            .query_row(
1052                &format!("SELECT COUNT(*) FROM cron_runs WHERE job_id = '{}'", job_id),
1053                [],
1054                |row| row.get(0),
1055            )
1056            .expect("count runs");
1057        assert!(count >= 1, "interval job should have been executed");
1058    }
1059
1060    #[tokio::test(start_paused = true)]
1061    async fn run_cron_worker_every_kind_with_expr_fallback() {
1062        let db = test_db();
1063        // Create a job with "every" kind and schedule_expr but no schedule_every_ms
1064        // This tests the or_else fallback to parse_interval_expr_to_ms
1065        let job_id = ironclad_db::cron::create_job(
1066            &db,
1067            "worker-every-expr",
1068            "test-agent",
1069            "every",
1070            Some("1s"),
1071            r#"{"action":"noop"}"#,
1072        )
1073        .expect("create job");
1074        // Ensure schedule_every_ms is NULL so the expr fallback is used
1075        db.conn()
1076            .execute(
1077                "UPDATE cron_jobs SET schedule_every_ms = NULL WHERE id = ?1",
1078                [&job_id],
1079            )
1080            .expect("clear ms");
1081
1082        let db_clone = db.clone();
1083        let handle = tokio::spawn(async move {
1084            run_cron_worker(db_clone, "every-expr-instance".into()).await;
1085        });
1086
1087        tokio::time::advance(std::time::Duration::from_secs(61)).await;
1088        tokio::task::yield_now().await;
1089        tokio::time::advance(std::time::Duration::from_millis(10)).await;
1090        tokio::task::yield_now().await;
1091
1092        handle.abort();
1093        let _ = handle.await;
1094
1095        // The expr "1s" = 1000ms, and with no last_run the job should be immediately due
1096        let count: i64 = db
1097            .conn()
1098            .query_row(
1099                &format!("SELECT COUNT(*) FROM cron_runs WHERE job_id = '{}'", job_id),
1100                [],
1101                |row| row.get(0),
1102            )
1103            .expect("count runs");
1104        assert!(
1105            count >= 1,
1106            "every-kind with expr fallback should have been executed"
1107        );
1108    }
1109
1110    #[tokio::test(start_paused = true)]
1111    async fn run_cron_worker_multiple_jobs_in_single_tick() {
1112        let db = test_db();
1113        let _id1 = create_due_job(&db, "multi-1", r#"{"action":"log","message":"first"}"#);
1114        let _id2 = create_due_job(&db, "multi-2", r#"{"action":"noop"}"#);
1115        let _id3 = create_due_job(&db, "multi-3", r#"{"action":"log","message":"third"}"#);
1116
1117        let db_clone = db.clone();
1118        let handle = tokio::spawn(async move {
1119            run_cron_worker(db_clone, "multi-instance".into()).await;
1120        });
1121
1122        tokio::time::advance(std::time::Duration::from_secs(61)).await;
1123        tokio::task::yield_now().await;
1124        tokio::time::advance(std::time::Duration::from_millis(10)).await;
1125        tokio::task::yield_now().await;
1126
1127        handle.abort();
1128        let _ = handle.await;
1129
1130        let count: i64 = db
1131            .conn()
1132            .query_row("SELECT COUNT(*) FROM cron_runs", [], |row| row.get(0))
1133            .expect("count runs");
1134        assert!(count >= 3, "expected at least 3 cron runs, got {count}");
1135    }
1136}