1pub mod heartbeat;
25pub mod scheduler;
26pub mod tasks;
27
28pub use heartbeat::run as run_heartbeat;
29pub use heartbeat::{HeartbeatDaemon, TickContext};
30pub use scheduler::DurableScheduler;
31pub use tasks::{HeartbeatTask, TaskResult};
32
33pub async fn run_cron_worker(db: ironclad_db::Database, instance_id: String) {
35 use std::time::Duration;
36
37 let mut interval = tokio::time::interval(Duration::from_secs(60));
38 tracing::info!("Cron worker started");
39
40 loop {
41 interval.tick().await;
42
43 let jobs = match ironclad_db::cron::list_jobs(&db) {
44 Ok(j) => j,
45 Err(e) => {
46 tracing::warn!(error = %e, "Failed to list cron jobs");
47 continue;
48 }
49 };
50
51 let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
52
53 for job in &jobs {
54 if !job.enabled {
55 continue;
56 }
57
58 let kind = normalize_schedule_kind(&job.schedule_kind);
59 let due = match kind {
60 "cron" => job
61 .schedule_expr
62 .as_deref()
63 .map(|expr| {
64 DurableScheduler::evaluate_cron(expr, job.last_run_at.as_deref(), &now)
65 })
66 .unwrap_or(false),
67 "every" => {
68 let interval_ms = job
69 .schedule_every_ms
70 .or_else(|| {
71 job.schedule_expr
72 .as_deref()
73 .and_then(parse_interval_expr_to_ms)
74 })
75 .unwrap_or(60_000);
76 DurableScheduler::evaluate_interval(
77 job.last_run_at.as_deref(),
78 interval_ms,
79 &now,
80 )
81 }
82 _ => false,
83 };
84
85 if !due {
86 continue;
87 }
88
89 match ironclad_db::cron::acquire_lease(&db, &job.id, &instance_id) {
90 Ok(acquired) => {
91 if !acquired {
92 continue;
93 }
94 }
95 Err(e) => {
96 tracing::warn!(job_id = %job.id, error = %e, "failed to acquire cron lease");
97 continue;
98 }
99 }
100
101 tracing::debug!(job = %job.name, "Executing cron job");
102 let start = std::time::Instant::now();
103
104 let (result_status, error_msg) = execute_cron_job(&db, job);
105 let duration = start.elapsed().as_millis() as i64;
106
107 if let Err(e) = ironclad_db::cron::record_run(
108 &db,
109 &job.id,
110 result_status,
111 Some(duration),
112 error_msg.as_deref(),
113 None,
114 ) {
115 tracing::warn!(job_id = %job.id, error = %e, "failed to record cron run");
116 }
117 let now_str = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string();
119 let next = DurableScheduler::calculate_next_run(
120 kind,
121 job.schedule_expr.as_deref(),
122 job.schedule_every_ms,
123 &now_str,
124 );
125 if let Err(e) = ironclad_db::cron::update_next_run_at(&db, &job.id, next.as_deref()) {
126 tracing::warn!(job_id = %job.id, error = %e, "failed to update next_run_at");
127 }
128 if let Err(e) = ironclad_db::cron::release_lease(&db, &job.id, &instance_id) {
129 tracing::warn!(job_id = %job.id, error = %e, "failed to release cron lease");
130 }
131 }
132 }
133}
134
135fn execute_cron_job(
137 db: &ironclad_db::Database,
138 job: &ironclad_db::cron::CronJob,
139) -> (&'static str, Option<String>) {
140 let payload: serde_json::Value = match serde_json::from_str(&job.payload_json) {
141 Ok(v) => v,
142 Err(e) => {
143 tracing::warn!(job = %job.name, error = %e, "invalid job payload JSON");
144 return ("error", Some(format!("invalid payload: {e}")));
145 }
146 };
147
148 let action = payload
149 .get("action")
150 .and_then(|v| v.as_str())
151 .unwrap_or("unknown");
152
153 match action {
154 "log" => {
155 let message = payload
156 .get("message")
157 .and_then(|v| v.as_str())
158 .unwrap_or("cron heartbeat");
159 tracing::info!(job = %job.name, message, "cron job executed");
160 ("success", None)
161 }
162 "metric_snapshot" => {
163 let snapshot = serde_json::json!({
164 "job_id": job.id,
165 "job_name": job.name,
166 "schedule_kind": job.schedule_kind,
167 "timestamp": chrono::Utc::now().to_rfc3339(),
168 });
169 match ironclad_db::metrics::record_metric_snapshot(db, &snapshot.to_string()) {
170 Ok(_) => ("success", None),
171 Err(e) => ("error", Some(format!("metric_snapshot failed: {e}"))),
172 }
173 }
174 "expire_sessions" => {
175 let ttl_seconds = payload
176 .get("ttl_seconds")
177 .and_then(|v| v.as_u64())
178 .unwrap_or(86_400);
179 match ironclad_db::sessions::expire_stale_sessions(db, ttl_seconds) {
180 Ok(expired) => {
181 tracing::info!(job = %job.name, expired, ttl_seconds, "expired stale sessions");
182 ("success", None)
183 }
184 Err(e) => ("error", Some(format!("expire_sessions failed: {e}"))),
185 }
186 }
187 "record_transaction" => {
188 let tx_type = payload
189 .get("tx_type")
190 .and_then(|v| v.as_str())
191 .unwrap_or("cron");
192 let amount = payload
193 .get("amount")
194 .and_then(|v| v.as_f64())
195 .unwrap_or(0.0);
196 let currency = payload
197 .get("currency")
198 .and_then(|v| v.as_str())
199 .unwrap_or("USD");
200 let counterparty = payload.get("counterparty").and_then(|v| v.as_str());
201 let tx_hash = payload.get("tx_hash").and_then(|v| v.as_str());
202 match ironclad_db::metrics::record_transaction(
203 db,
204 tx_type,
205 amount,
206 currency,
207 counterparty,
208 tx_hash,
209 ) {
210 Ok(_) => ("success", None),
211 Err(e) => ("error", Some(format!("record_transaction failed: {e}"))),
212 }
213 }
214 "noop" => {
215 tracing::debug!(job = %job.name, "noop cron job");
216 ("success", None)
217 }
218 other => {
219 tracing::warn!(job = %job.name, action = other, "unknown cron action");
220 ("error", Some(format!("unknown action: {other}")))
221 }
222 }
223}
224
/// Maps the schedule-kind alias `"interval"` onto `"every"`.
///
/// Every other value, including kinds the worker does not recognise, is returned unchanged.
fn normalize_schedule_kind(kind: &str) -> &str {
    if kind == "interval" {
        "every"
    } else {
        kind
    }
}
232
/// Parses an interval expression such as `"5s"`, `"2m"` or `"3h"` into milliseconds.
///
/// The expression is an integer quantity followed by a single unit character
/// (`s`, `m` or `h`, in either case). Returns `None` for an empty string, a missing or
/// non-numeric quantity, an unsupported unit, or a result that is not strictly positive.
/// Very large quantities saturate at `i64::MAX` rather than overflowing.
fn parse_interval_expr_to_ms(expr: &str) -> Option<i64> {
    // Peel the unit off the end; what is left in the iterator is the quantity text.
    let mut remaining = expr.chars();
    let unit = remaining.next_back()?;
    let quantity: i64 = remaining.as_str().parse().ok()?;

    let ms_per_unit: i64 = match unit.to_ascii_lowercase() {
        's' => 1_000,
        'm' => 60_000,
        'h' => 3_600_000,
        _ => return None,
    };

    Some(quantity.saturating_mul(ms_per_unit)).filter(|ms| *ms > 0)
}
247
#[cfg(test)]
mod tests {
    use super::*;
    use ironclad_db::Database;
    use std::time::Duration;

    // ── Shared fixtures ──────────────────────────────────────────────

    /// Opens a fresh in-memory database for a single test.
    fn test_db() -> Database {
        Database::new(":memory:").expect("in-memory db")
    }

    /// Creates an `every`/`5m` job carrying `payload_json` and reads it back.
    fn job_with_payload(
        db: &Database,
        name: &str,
        payload_json: &str,
    ) -> ironclad_db::cron::CronJob {
        let job_id = ironclad_db::cron::create_job(
            db,
            name,
            "test-agent",
            "every",
            Some("5m"),
            payload_json,
        )
        .expect("create job");
        ironclad_db::cron::get_job(db, &job_id)
            .expect("get job")
            .expect("job exists")
    }

    /// Creates an `every` job with a 1 ms interval so the worker treats it as due.
    fn create_due_job(db: &Database, name: &str, payload: &str) -> String {
        let job_id =
            ironclad_db::cron::create_job(db, name, "test-agent", "every", Some("1s"), payload)
                .expect("create job");
        db.conn()
            .execute(
                "UPDATE cron_jobs SET schedule_every_ms = 1 WHERE id = ?1",
                [&job_id],
            )
            .expect("update schedule_every_ms");
        job_id
    }

    /// Creates a job from `payload` and asserts that executing it succeeds without an error.
    #[track_caller]
    fn assert_payload_succeeds(db: &Database, name: &str, payload: &str) {
        let job = job_with_payload(db, name, payload);
        let (status, error) = execute_cron_job(db, &job);
        assert_eq!(status, "success");
        assert!(error.is_none());
    }

    /// Creates a job from `payload` and asserts that executing it fails with a message
    /// containing `needle`.
    #[track_caller]
    fn assert_payload_fails_with(db: &Database, name: &str, payload: &str, needle: &str) {
        let job = job_with_payload(db, name, payload);
        let (status, error) = execute_cron_job(db, &job);
        assert_eq!(status, "error");
        assert!(error.unwrap_or_default().contains(needle));
    }

    /// Runs a parameterless `SELECT COUNT(*) …` query and returns the count.
    fn count_rows(db: &Database, sql: &str) -> i64 {
        db.conn()
            .query_row(sql, [], |row| row.get(0))
            .expect("count rows")
    }

    /// Counts the recorded runs of one job, binding the id instead of formatting it into SQL.
    fn count_runs_for_job(db: &Database, job_id: &str) -> i64 {
        db.conn()
            .query_row(
                "SELECT COUNT(*) FROM cron_runs WHERE job_id = ?1",
                [job_id],
                |row| row.get(0),
            )
            .expect("count runs for job")
    }

    /// Spawns the worker, advances the paused clock past one 60 s poll period, gives the
    /// worker a chance to run, advances by `settle`, then aborts the worker task.
    async fn drive_worker(db: &Database, instance_id: &str, settle: Duration) {
        let worker_db = db.clone();
        let instance = instance_id.to_string();
        let handle = tokio::spawn(async move {
            run_cron_worker(worker_db, instance).await;
        });

        tokio::time::advance(Duration::from_secs(61)).await;
        tokio::task::yield_now().await;
        tokio::time::advance(settle).await;
        tokio::task::yield_now().await;

        handle.abort();
        let _ = handle.await;
    }

    /// [`drive_worker`] with the 10 ms settle time used by most worker tests.
    async fn run_worker_once(db: &Database, instance_id: &str) {
        drive_worker(db, instance_id, Duration::from_millis(10)).await;
    }

    // ── normalize_schedule_kind / parse_interval_expr_to_ms ─────────

    #[test]
    fn normalize_schedule_kind_maps_interval_to_every() {
        assert_eq!(normalize_schedule_kind("interval"), "every");
        assert_eq!(normalize_schedule_kind("every"), "every");
        assert_eq!(normalize_schedule_kind("cron"), "cron");
        assert_eq!(normalize_schedule_kind("custom"), "custom");
    }

    #[test]
    fn parse_interval_expr_to_ms_parses_supported_units() {
        assert_eq!(parse_interval_expr_to_ms("5s"), Some(5_000));
        assert_eq!(parse_interval_expr_to_ms("2m"), Some(120_000));
        assert_eq!(parse_interval_expr_to_ms("3h"), Some(10_800_000));
        assert_eq!(parse_interval_expr_to_ms("7S"), Some(7_000));
        assert_eq!(parse_interval_expr_to_ms("1M"), Some(60_000));
    }

    #[test]
    fn parse_interval_expr_to_ms_rejects_invalid_values() {
        assert_eq!(parse_interval_expr_to_ms(""), None);
        assert_eq!(parse_interval_expr_to_ms("10"), None);
        assert_eq!(parse_interval_expr_to_ms("xs"), None);
        assert_eq!(parse_interval_expr_to_ms("0s"), None);
        assert_eq!(parse_interval_expr_to_ms("-5m"), None);
    }

    // ── execute_cron_job ─────────────────────────────────────────────

    #[test]
    fn execute_cron_job_rejects_invalid_payload_json() {
        let db = test_db();
        assert_payload_fails_with(&db, "bad-json", "{not-json}", "invalid payload");
    }

    #[test]
    fn execute_cron_job_handles_log_and_noop_actions() {
        let db = test_db();
        assert_payload_succeeds(&db, "log-job", r#"{"action":"log","message":"hello"}"#);
        assert_payload_succeeds(&db, "noop-job", r#"{"action":"noop"}"#);
    }

    #[test]
    fn execute_cron_job_records_metric_snapshot() {
        let db = test_db();
        assert_payload_succeeds(&db, "metrics-job", r#"{"action":"metric_snapshot"}"#);

        let count = count_rows(&db, "SELECT COUNT(*) FROM metric_snapshots");
        assert_eq!(count, 1);
    }

    #[test]
    fn execute_cron_job_expires_stale_sessions() {
        let db = test_db();
        let session_id = ironclad_db::sessions::find_or_create(&db, "expire-agent", None)
            .expect("create session");
        // Age the session well past the 60 s TTL used below.
        db.conn()
            .execute(
                "UPDATE sessions SET updated_at = datetime('now', '-2 days') WHERE id = ?1",
                [&session_id],
            )
            .expect("age session");

        assert_payload_succeeds(
            &db,
            "expire-job",
            r#"{"action":"expire_sessions","ttl_seconds":60}"#,
        );

        let session_status: String = db
            .conn()
            .query_row(
                "SELECT status FROM sessions WHERE id = ?1",
                [&session_id],
                |row| row.get(0),
            )
            .expect("session status");
        assert_eq!(session_status, "expired");
    }

    #[test]
    fn execute_cron_job_records_transaction() {
        let db = test_db();
        assert_payload_succeeds(
            &db,
            "tx-job",
            r#"{"action":"record_transaction","tx_type":"ops","amount":1.25,"currency":"USD","counterparty":"scheduler"}"#,
        );

        let txs = ironclad_db::metrics::query_transactions(&db, 24).expect("query txs");
        assert_eq!(txs.len(), 1);
        assert_eq!(txs[0].tx_type, "ops");
        assert_eq!(txs[0].currency, "USD");
    }

    #[test]
    fn execute_cron_job_rejects_unknown_action() {
        let db = test_db();
        assert_payload_fails_with(
            &db,
            "unknown-job",
            r#"{"action":"mystery"}"#,
            "unknown action",
        );
    }

    #[test]
    fn execute_cron_job_rejects_legacy_agent_turn_kind() {
        let db = test_db();
        let job = job_with_payload(
            &db,
            "legacy-agent-turn",
            r#"{"kind":"agentTurn","message":"Do work"}"#,
        );
        let (status, error) = execute_cron_job(&db, &job);
        assert_eq!(status, "error");
        assert_eq!(error.as_deref(), Some("unknown action: unknown"));
    }

    #[test]
    fn execute_cron_job_rejects_legacy_metric_snapshot_kind() {
        let db = test_db();
        let job = job_with_payload(&db, "legacy-metrics", r#"{"kind":"metricSnapshot"}"#);
        let (status, error) = execute_cron_job(&db, &job);
        assert_eq!(status, "error");
        assert_eq!(error.as_deref(), Some("unknown action: unknown"));
    }

    #[test]
    fn normalize_schedule_kind_pass_through_unknown_kinds() {
        assert_eq!(normalize_schedule_kind(""), "");
        assert_eq!(normalize_schedule_kind("once"), "once");
        assert_eq!(normalize_schedule_kind("at"), "at");
        assert_eq!(normalize_schedule_kind("weekly"), "weekly");
    }

    #[test]
    fn parse_interval_expr_to_ms_uppercase_h() {
        assert_eq!(parse_interval_expr_to_ms("1H"), Some(3_600_000));
        assert_eq!(parse_interval_expr_to_ms("2H"), Some(7_200_000));
    }

    #[test]
    fn parse_interval_expr_to_ms_single_char() {
        // A unit with no quantity in front of it is not a valid interval.
        assert_eq!(parse_interval_expr_to_ms("s"), None);
        assert_eq!(parse_interval_expr_to_ms("m"), None);
        assert_eq!(parse_interval_expr_to_ms("h"), None);
    }

    #[test]
    fn parse_interval_expr_to_ms_large_values() {
        assert_eq!(parse_interval_expr_to_ms("24h"), Some(86_400_000));
        assert_eq!(parse_interval_expr_to_ms("1000s"), Some(1_000_000));
    }

    #[test]
    fn parse_interval_expr_to_ms_unknown_unit() {
        assert_eq!(parse_interval_expr_to_ms("5d"), None);
        assert_eq!(parse_interval_expr_to_ms("3w"), None);
    }

    #[test]
    fn execute_cron_job_log_action_with_default_message() {
        let db = test_db();
        assert_payload_succeeds(&db, "log-default", r#"{"action":"log"}"#);
    }

    #[test]
    fn execute_cron_job_expire_sessions_uses_default_ttl() {
        let db = test_db();
        assert_payload_succeeds(&db, "expire-default", r#"{"action":"expire_sessions"}"#);
    }

    #[test]
    fn execute_cron_job_record_transaction_uses_defaults() {
        let db = test_db();
        assert_payload_succeeds(&db, "tx-minimal", r#"{"action":"record_transaction"}"#);

        let txs = ironclad_db::metrics::query_transactions(&db, 24).expect("query txs");
        assert_eq!(txs.len(), 1);
        assert_eq!(txs[0].tx_type, "cron");
        assert_eq!(txs[0].currency, "USD");
    }

    #[test]
    fn execute_cron_job_record_transaction_with_tx_hash() {
        let db = test_db();
        assert_payload_succeeds(
            &db,
            "tx-with-hash",
            r#"{"action":"record_transaction","tx_type":"payment","amount":42.0,"currency":"ETH","counterparty":"alice","tx_hash":"0xabc"}"#,
        );

        let txs = ironclad_db::metrics::query_transactions(&db, 24).expect("query txs");
        assert_eq!(txs.len(), 1);
        assert_eq!(txs[0].tx_type, "payment");
        assert_eq!(txs[0].currency, "ETH");
    }

    #[test]
    fn execute_cron_job_expire_sessions_action() {
        let db = test_db();
        assert_payload_succeeds(
            &db,
            "expire-action",
            r#"{"action":"expire_sessions","ttl_seconds":3600}"#,
        );
    }

    #[test]
    fn execute_cron_job_record_transaction_action() {
        let db = test_db();
        assert_payload_succeeds(
            &db,
            "tx-action",
            r#"{"action":"record_transaction","amount":5.0}"#,
        );
    }

    #[test]
    fn execute_cron_job_log_action() {
        let db = test_db();
        assert_payload_succeeds(&db, "log-action", r#"{"action":"log","message":"test"}"#);
    }

    #[test]
    fn execute_cron_job_noop_action() {
        let db = test_db();
        assert_payload_succeeds(&db, "noop-action", r#"{"action":"noop"}"#);
    }

    #[test]
    fn execute_cron_job_unknown_action() {
        let db = test_db();
        assert_payload_fails_with(
            &db,
            "legacy-unknown",
            r#"{"kind":"foobar"}"#,
            "unknown action",
        );
    }

    #[test]
    fn execute_cron_job_no_action_or_kind_is_unknown() {
        let db = test_db();
        assert_payload_fails_with(
            &db,
            "empty-payload",
            r#"{"data":"value"}"#,
            "unknown action",
        );
    }

    #[test]
    fn execute_cron_job_empty_object_payload() {
        let db = test_db();
        assert_payload_fails_with(&db, "empty-obj", r#"{}"#, "unknown action");
    }

    #[test]
    fn execute_cron_job_action_takes_precedence_over_kind() {
        let db = test_db();
        assert_payload_succeeds(&db, "precedence", r#"{"action":"noop","kind":"agentTurn"}"#);
    }

    // ── run_cron_worker ──────────────────────────────────────────────

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_executes_due_log_job() {
        let db = test_db();
        create_due_job(&db, "worker-log", r#"{"action":"log","message":"test"}"#);

        run_worker_once(&db, "test-instance").await;

        let count = count_rows(&db, "SELECT COUNT(*) FROM cron_runs");
        assert!(count >= 1, "expected at least one cron run, got {count}");
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_executes_noop_job() {
        let db = test_db();
        create_due_job(&db, "worker-noop", r#"{"action":"noop"}"#);

        run_worker_once(&db, "noop-instance").await;

        let count = count_rows(&db, "SELECT COUNT(*) FROM cron_runs");
        assert!(count >= 1, "expected at least one cron run");
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_executes_metric_snapshot_job() {
        let db = test_db();
        create_due_job(&db, "worker-metric", r#"{"action":"metric_snapshot"}"#);

        run_worker_once(&db, "metric-instance").await;

        let snap_count = count_rows(&db, "SELECT COUNT(*) FROM metric_snapshots");
        assert!(snap_count >= 1, "expected at least one metric snapshot");
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_executes_expire_sessions_job() {
        let db = test_db();
        let session_id = ironclad_db::sessions::find_or_create(&db, "cron-expire-agent", None)
            .expect("create session");
        // Age the session well past the 60 s TTL configured in the job payload.
        db.conn()
            .execute(
                "UPDATE sessions SET updated_at = datetime('now', '-2 days') WHERE id = ?1",
                [&session_id],
            )
            .expect("age session");

        create_due_job(
            &db,
            "worker-expire",
            r#"{"action":"expire_sessions","ttl_seconds":60}"#,
        );

        run_worker_once(&db, "expire-instance").await;

        let session_status: String = db
            .conn()
            .query_row(
                "SELECT status FROM sessions WHERE id = ?1",
                [&session_id],
                |row| row.get(0),
            )
            .expect("session status");
        assert_eq!(session_status, "expired");
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_executes_record_transaction_job() {
        let db = test_db();
        create_due_job(
            &db,
            "worker-tx",
            r#"{"action":"record_transaction","tx_type":"cron_test","amount":99.0,"currency":"USDC"}"#,
        );

        run_worker_once(&db, "tx-instance").await;

        let txs = ironclad_db::metrics::query_transactions(&db, 24).expect("query txs");
        assert!(!txs.is_empty(), "expected at least one transaction");
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_skips_disabled_job() {
        let db = test_db();
        let job_id = create_due_job(
            &db,
            "worker-disabled",
            r#"{"action":"log","message":"skip"}"#,
        );
        db.conn()
            .execute("UPDATE cron_jobs SET enabled = 0 WHERE id = ?1", [&job_id])
            .expect("disable job");

        run_worker_once(&db, "disabled-instance").await;

        let count = count_rows(&db, "SELECT COUNT(*) FROM cron_runs");
        assert_eq!(count, 0, "disabled job should not be executed");
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_handles_unknown_action_job() {
        let db = test_db();
        create_due_job(&db, "worker-unknown", r#"{"action":"nonexistent"}"#);

        run_worker_once(&db, "unknown-instance").await;

        let error_count =
            count_rows(&db, "SELECT COUNT(*) FROM cron_runs WHERE status = 'error'");
        assert!(error_count >= 1, "expected at least one error run");
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_handles_invalid_json_job() {
        let db = test_db();
        create_due_job(&db, "worker-badjson", "{not-valid-json}");

        run_worker_once(&db, "badjson-instance").await;

        let error_count =
            count_rows(&db, "SELECT COUNT(*) FROM cron_runs WHERE status = 'error'");
        assert!(error_count >= 1, "expected error run for invalid JSON");
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_handles_legacy_agent_turn_job() {
        let db = test_db();
        create_due_job(
            &db,
            "worker-legacy-turn",
            r#"{"kind":"agentTurn","message":"hello"}"#,
        );

        run_worker_once(&db, "legacy-instance").await;

        let error_count =
            count_rows(&db, "SELECT COUNT(*) FROM cron_runs WHERE status = 'error'");
        assert!(
            error_count >= 1,
            "expected error run for unmapped legacy agent turn kind"
        );
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_with_no_jobs_does_not_crash() {
        let db = test_db();

        run_worker_once(&db, "empty-instance").await;

        let count = count_rows(&db, "SELECT COUNT(*) FROM cron_runs");
        assert_eq!(count, 0);
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_cron_schedule_kind_job() {
        let db = test_db();
        let job_id = ironclad_db::cron::create_job(
            &db,
            "worker-cron-kind",
            "test-agent",
            "cron",
            Some("* * * * *"),
            r#"{"action":"log","message":"cron tick"}"#,
        )
        .expect("create cron job");

        drive_worker(&db, "cron-kind-instance", Duration::from_millis(100)).await;

        // The run count is deliberately not asserted: this test only checks that the
        // `cron` branch runs without panicking and that the runs table stays queryable.
        // NOTE(review): presumably whether the job is due depends on how
        // `DurableScheduler::evaluate_cron` treats the wall-clock time - confirm.
        let _ = count_runs_for_job(&db, &job_id);
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_unknown_schedule_kind_not_due() {
        let db = test_db();
        let job_id = ironclad_db::cron::create_job(
            &db,
            "worker-unknown-kind",
            "test-agent",
            "weekly",
            Some("*"),
            r#"{"action":"log","message":"weekly"}"#,
        )
        .expect("create job");

        run_worker_once(&db, "unknown-kind-instance").await;

        let count = count_runs_for_job(&db, &job_id);
        assert_eq!(count, 0, "unknown schedule kind should not be executed");
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_interval_kind_job() {
        let db = test_db();
        let job_id = ironclad_db::cron::create_job(
            &db,
            "worker-interval-kind",
            "test-agent",
            "interval",
            Some("1s"),
            r#"{"action":"noop"}"#,
        )
        .expect("create job");
        db.conn()
            .execute(
                "UPDATE cron_jobs SET schedule_every_ms = 1 WHERE id = ?1",
                [&job_id],
            )
            .expect("update ms");

        run_worker_once(&db, "interval-instance").await;

        let count = count_runs_for_job(&db, &job_id);
        assert!(count >= 1, "interval job should have been executed");
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_every_kind_with_expr_fallback() {
        let db = test_db();
        let job_id = ironclad_db::cron::create_job(
            &db,
            "worker-every-expr",
            "test-agent",
            "every",
            Some("1s"),
            r#"{"action":"noop"}"#,
        )
        .expect("create job");
        // Clear the millisecond column so the worker has to parse the "1s" expression.
        db.conn()
            .execute(
                "UPDATE cron_jobs SET schedule_every_ms = NULL WHERE id = ?1",
                [&job_id],
            )
            .expect("clear ms");

        run_worker_once(&db, "every-expr-instance").await;

        let count = count_runs_for_job(&db, &job_id);
        assert!(
            count >= 1,
            "every-kind with expr fallback should have been executed"
        );
    }

    #[tokio::test(start_paused = true)]
    async fn run_cron_worker_multiple_jobs_in_single_tick() {
        let db = test_db();
        create_due_job(&db, "multi-1", r#"{"action":"log","message":"first"}"#);
        create_due_job(&db, "multi-2", r#"{"action":"noop"}"#);
        create_due_job(&db, "multi-3", r#"{"action":"log","message":"third"}"#);

        run_worker_once(&db, "multi-instance").await;

        let count = count_rows(&db, "SELECT COUNT(*) FROM cron_runs");
        assert!(count >= 3, "expected at least 3 cron runs, got {count}");
    }
}