1use zeph_db::DbPool;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use crate::error::SchedulerError;
9
10#[derive(Debug, Clone)]
33pub struct ScheduledTaskRecord {
34 pub name: String,
36 pub kind: String,
38 pub task_mode: String,
40 pub next_run: String,
45}
46
47#[derive(Debug, Clone)]
52pub struct ScheduledTaskInfo {
53 pub name: String,
55 pub kind: String,
57 pub task_mode: String,
59 pub cron_expr: String,
61 pub next_run: String,
63 pub task_data: String,
65}
66
67#[derive(Debug)]
92pub struct JobStore {
93 pool: DbPool,
94}
95
96impl JobStore {
97 #[must_use]
102 pub fn new(pool: DbPool) -> Self {
103 Self { pool }
104 }
105
106 pub async fn open(path: &str) -> Result<Self, SchedulerError> {
112 let pool = zeph_db::DbConfig {
113 url: path.to_string(),
114 max_connections: 5,
115 pool_size: 5,
116 }
117 .connect()
118 .await?;
119 Ok(Self { pool })
120 }
121
122 pub async fn init(&self) -> Result<(), SchedulerError> {
132 zeph_db::run_migrations(&self.pool).await?;
133 Ok(())
134 }
135
136 pub async fn upsert_job(
142 &self,
143 name: &str,
144 cron_expr: &str,
145 kind: &str,
146 ) -> Result<(), SchedulerError> {
147 self.upsert_job_with_mode(name, cron_expr, kind, "periodic", None, "")
148 .await
149 }
150
151 pub async fn upsert_job_with_mode(
157 &self,
158 name: &str,
159 cron_expr: &str,
160 kind: &str,
161 task_mode: &str,
162 run_at: Option<&str>,
163 task_data: &str,
164 ) -> Result<(), SchedulerError> {
165 zeph_db::query(sql!(
166 "INSERT INTO scheduled_jobs (name, cron_expr, kind, task_mode, run_at, task_data)
167 VALUES (?, ?, ?, ?, ?, ?)
168 ON CONFLICT(name) DO UPDATE SET
169 cron_expr = excluded.cron_expr,
170 kind = excluded.kind,
171 task_mode = excluded.task_mode,
172 run_at = excluded.run_at,
173 task_data = excluded.task_data"
174 ))
175 .bind(name)
176 .bind(cron_expr)
177 .bind(kind)
178 .bind(task_mode)
179 .bind(run_at)
180 .bind(task_data)
181 .execute(&self.pool)
182 .await?;
183 Ok(())
184 }
185
186 pub async fn insert_job(
193 &self,
194 name: &str,
195 cron_expr: &str,
196 kind: &str,
197 task_mode: &str,
198 run_at: Option<&str>,
199 task_data: &str,
200 ) -> Result<(), SchedulerError> {
201 let result = zeph_db::query(sql!(
202 "INSERT INTO scheduled_jobs (name, cron_expr, kind, task_mode, run_at, task_data)
203 VALUES (?, ?, ?, ?, ?, ?)"
204 ))
205 .bind(name)
206 .bind(cron_expr)
207 .bind(kind)
208 .bind(task_mode)
209 .bind(run_at)
210 .bind(task_data)
211 .execute(&self.pool)
212 .await;
213 match result {
214 Ok(_) => Ok(()),
215 Err(zeph_db::SqlxError::Database(db_err))
216 if db_err.message().contains("UNIQUE constraint failed")
217 || db_err.code().as_deref() == Some("23505") =>
218 {
219 Err(SchedulerError::DuplicateJob(name.to_string()))
220 }
221 Err(e) => Err(SchedulerError::Database(e)),
222 }
223 }
224
225 pub async fn record_run(
231 &self,
232 name: &str,
233 timestamp: &str,
234 next_run: &str,
235 ) -> Result<(), SchedulerError> {
236 zeph_db::query(
237 sql!("UPDATE scheduled_jobs SET last_run = ?, next_run = ?, status = 'completed' WHERE name = ?"),
238 )
239 .bind(timestamp)
240 .bind(next_run)
241 .bind(name)
242 .execute(&self.pool)
243 .await?;
244 Ok(())
245 }
246
247 pub async fn mark_done(&self, name: &str) -> Result<(), SchedulerError> {
253 zeph_db::query(sql!(
254 "UPDATE scheduled_jobs SET status = 'done', last_run = CURRENT_TIMESTAMP WHERE name = ?"
255 ))
256 .bind(name)
257 .execute(&self.pool)
258 .await?;
259 Ok(())
260 }
261
262 pub async fn delete_job(&self, name: &str) -> Result<bool, SchedulerError> {
268 let result = zeph_db::query(sql!("DELETE FROM scheduled_jobs WHERE name = ?"))
269 .bind(name)
270 .execute(&self.pool)
271 .await?;
272 Ok(result.rows_affected() > 0)
273 }
274
275 pub async fn job_exists(&self, name: &str) -> Result<bool, SchedulerError> {
281 let row: Option<(i64,)> =
282 zeph_db::query_as(sql!("SELECT 1 FROM scheduled_jobs WHERE name = ?"))
283 .bind(name)
284 .fetch_optional(&self.pool)
285 .await?;
286 Ok(row.is_some())
287 }
288
289 pub async fn set_next_run(&self, name: &str, next_run: &str) -> Result<(), SchedulerError> {
295 zeph_db::query(sql!(
296 "UPDATE scheduled_jobs SET next_run = ? WHERE name = ?"
297 ))
298 .bind(next_run)
299 .bind(name)
300 .execute(&self.pool)
301 .await?;
302 Ok(())
303 }
304
305 pub async fn get_next_run(&self, name: &str) -> Result<Option<String>, SchedulerError> {
311 let row: Option<(Option<String>,)> =
312 zeph_db::query_as(sql!("SELECT next_run FROM scheduled_jobs WHERE name = ?"))
313 .bind(name)
314 .fetch_optional(&self.pool)
315 .await?;
316 Ok(row.and_then(|r| r.0))
317 }
318
319 pub async fn list_jobs(&self) -> Result<Vec<ScheduledTaskRecord>, SchedulerError> {
329 let rows: Vec<(String, String, String, Option<String>)> = zeph_db::query_as(
330 sql!("SELECT name, kind, task_mode, COALESCE(next_run, run_at) FROM scheduled_jobs WHERE status != 'done' ORDER BY name"),
331 )
332 .fetch_all(&self.pool)
333 .await?;
334 Ok(rows
335 .into_iter()
336 .map(|(name, kind, task_mode, next_run)| ScheduledTaskRecord {
337 name,
338 kind,
339 task_mode,
340 next_run: next_run.unwrap_or_default(),
341 })
342 .collect())
343 }
344
345 pub async fn list_jobs_full(&self) -> Result<Vec<ScheduledTaskInfo>, SchedulerError> {
351 let rows: Vec<(String, String, String, String, Option<String>, String)> =
352 zeph_db::query_as(sql!(
353 "SELECT name, kind, task_mode, cron_expr, COALESCE(next_run, run_at), task_data \
354 FROM scheduled_jobs WHERE status != 'done' ORDER BY name"
355 ))
356 .fetch_all(&self.pool)
357 .await?;
358 Ok(rows
359 .into_iter()
360 .map(
361 |(name, kind, task_mode, cron_expr, next_run, task_data)| ScheduledTaskInfo {
362 name,
363 kind,
364 task_mode,
365 cron_expr,
366 next_run: next_run.unwrap_or_default(),
367 task_data,
368 },
369 )
370 .collect())
371 }
372
373 #[must_use]
377 pub fn pool(&self) -> &DbPool {
378 &self.pool
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use super::*;
385
386 async fn test_pool() -> DbPool {
387 zeph_db::DbConfig {
388 url: ":memory:".to_string(),
389 max_connections: 5,
390 pool_size: 5,
391 }
392 .connect()
393 .await
394 .unwrap()
395 }
396
397 #[tokio::test]
398 async fn init_creates_table() {
399 let pool = test_pool().await;
400 let store = JobStore::new(pool);
401 assert!(store.init().await.is_ok());
402 }
403
404 #[tokio::test]
405 async fn upsert_and_query() {
406 let pool = test_pool().await;
407 let store = JobStore::new(pool);
408 store.init().await.unwrap();
409
410 store
411 .upsert_job("test_job", "0 * * * * *", "health_check")
412 .await
413 .unwrap();
414 assert!(store.get_next_run("test_job").await.unwrap().is_none());
415
416 store
417 .record_run("test_job", "2026-01-01T00:00:00Z", "2026-01-01T00:01:00Z")
418 .await
419 .unwrap();
420 assert_eq!(
421 store.get_next_run("test_job").await.unwrap().as_deref(),
422 Some("2026-01-01T00:01:00Z")
423 );
424 }
425
426 #[tokio::test]
427 async fn upsert_updates_existing() {
428 let pool = test_pool().await;
429 let store = JobStore::new(pool);
430 store.init().await.unwrap();
431
432 store
433 .upsert_job("job1", "0 * * * * *", "health_check")
434 .await
435 .unwrap();
436 store
437 .upsert_job("job1", "0 0 * * * *", "memory_cleanup")
438 .await
439 .unwrap();
440
441 let row: (String,) =
442 zeph_db::query_as(sql!("SELECT kind FROM scheduled_jobs WHERE name = 'job1'"))
443 .fetch_one(store.pool())
444 .await
445 .unwrap();
446 assert_eq!(row.0, "memory_cleanup");
447 }
448
449 #[tokio::test]
450 async fn next_run_nonexistent_job() {
451 let pool = test_pool().await;
452 let store = JobStore::new(pool);
453 store.init().await.unwrap();
454 assert!(store.get_next_run("no_such_job").await.unwrap().is_none());
455 }
456
457 #[tokio::test]
458 async fn job_exists_returns_true_for_existing() {
459 let pool = test_pool().await;
460 let store = JobStore::new(pool);
461 store.init().await.unwrap();
462 store
463 .upsert_job("exists_job", "0 * * * * *", "health_check")
464 .await
465 .unwrap();
466 assert!(store.job_exists("exists_job").await.unwrap());
467 assert!(!store.job_exists("missing").await.unwrap());
468 }
469
470 #[tokio::test]
471 async fn delete_job_removes_row() {
472 let pool = test_pool().await;
473 let store = JobStore::new(pool);
474 store.init().await.unwrap();
475 store
476 .upsert_job("del_job", "0 * * * * *", "health_check")
477 .await
478 .unwrap();
479 assert!(store.delete_job("del_job").await.unwrap());
480 assert!(!store.job_exists("del_job").await.unwrap());
481 assert!(!store.delete_job("del_job").await.unwrap());
482 }
483
484 #[tokio::test]
485 async fn mark_done_sets_status() {
486 let pool = test_pool().await;
487 let store = JobStore::new(pool);
488 store.init().await.unwrap();
489 store
490 .upsert_job_with_mode(
491 "os_job",
492 "",
493 "health_check",
494 "oneshot",
495 Some("2026-01-01T01:00:00Z"),
496 "",
497 )
498 .await
499 .unwrap();
500 store.mark_done("os_job").await.unwrap();
501 let row: (String,) = zeph_db::query_as(sql!(
502 "SELECT status FROM scheduled_jobs WHERE name = 'os_job'"
503 ))
504 .fetch_one(store.pool())
505 .await
506 .unwrap();
507 assert_eq!(row.0, "done");
508 }
509
510 #[tokio::test]
511 async fn list_jobs_excludes_done_jobs() {
512 let pool = test_pool().await;
513 let store = JobStore::new(pool);
514 store.init().await.unwrap();
515 store
516 .upsert_job_with_mode(
517 "done_job",
518 "",
519 "health_check",
520 "oneshot",
521 Some("2026-01-01T01:00:00Z"),
522 "",
523 )
524 .await
525 .unwrap();
526 store.mark_done("done_job").await.unwrap();
527 let jobs = store.list_jobs().await.unwrap();
528 assert!(
529 jobs.iter().all(|j| j.name != "done_job"),
530 "list_jobs must not return done jobs"
531 );
532 }
533
534 #[tokio::test]
535 async fn list_jobs_uses_run_at_for_oneshot_when_next_run_is_null() {
536 let pool = test_pool().await;
537 let store = JobStore::new(pool);
538 store.init().await.unwrap();
539 store
540 .upsert_job_with_mode(
541 "oneshot_job",
542 "",
543 "custom",
544 "oneshot",
545 Some("2026-06-01T10:00:00Z"),
546 "",
547 )
548 .await
549 .unwrap();
550 let jobs = store.list_jobs().await.unwrap();
551 let job = jobs.iter().find(|j| j.name == "oneshot_job").unwrap();
552 assert_eq!(
553 job.next_run, "2026-06-01T10:00:00Z",
554 "run_at must be shown as next_run for oneshot jobs"
555 );
556 }
557
558 #[tokio::test]
559 async fn list_jobs_full_returns_correct_fields() {
560 let pool = test_pool().await;
561 let store = JobStore::new(pool);
562 store.init().await.unwrap();
563 store
564 .upsert_job("periodic_job", "0 0 3 * * *", "memory_cleanup")
565 .await
566 .unwrap();
567 store
568 .upsert_job_with_mode(
569 "oneshot_job",
570 "",
571 "custom",
572 "oneshot",
573 Some("2030-01-01T10:00:00Z"),
574 "",
575 )
576 .await
577 .unwrap();
578
579 let jobs = store.list_jobs_full().await.unwrap();
580 assert_eq!(jobs.len(), 2);
581
582 let periodic = jobs.iter().find(|j| j.name == "periodic_job").unwrap();
583 assert_eq!(periodic.kind, "memory_cleanup");
584 assert_eq!(periodic.task_mode, "periodic");
585 assert_eq!(periodic.cron_expr, "0 0 3 * * *");
586
587 let oneshot = jobs.iter().find(|j| j.name == "oneshot_job").unwrap();
588 assert_eq!(oneshot.kind, "custom");
589 assert_eq!(oneshot.task_mode, "oneshot");
590 assert!(oneshot.cron_expr.is_empty());
591 assert_eq!(oneshot.next_run, "2030-01-01T10:00:00Z");
592 }
593
594 #[tokio::test]
595 async fn list_jobs_full_excludes_done_jobs() {
596 let pool = test_pool().await;
597 let store = JobStore::new(pool);
598 store.init().await.unwrap();
599 store
600 .upsert_job_with_mode(
601 "done_job",
602 "",
603 "custom",
604 "oneshot",
605 Some("2026-01-01T01:00:00Z"),
606 "",
607 )
608 .await
609 .unwrap();
610 store.mark_done("done_job").await.unwrap();
611 let jobs = store.list_jobs_full().await.unwrap();
612 assert!(jobs.iter().all(|j| j.name != "done_job"));
613 }
614
615 #[tokio::test]
616 async fn duplicate_name_detected() {
617 let pool = test_pool().await;
618 let store = JobStore::new(pool);
619 store.init().await.unwrap();
620 store
621 .upsert_job("dup", "0 * * * * *", "health_check")
622 .await
623 .unwrap();
624 assert!(store.job_exists("dup").await.unwrap());
625 }
626
627 #[tokio::test]
628 async fn insert_job_success() {
629 let pool = test_pool().await;
630 let store = JobStore::new(pool);
631 store.init().await.unwrap();
632 store
633 .insert_job(
634 "new_job",
635 "0 * * * * *",
636 "custom",
637 "periodic",
638 None,
639 "run daily report",
640 )
641 .await
642 .unwrap();
643 assert!(store.job_exists("new_job").await.unwrap());
644 }
645
646 #[tokio::test]
647 async fn insert_job_duplicate_returns_error() {
648 let pool = test_pool().await;
649 let store = JobStore::new(pool);
650 store.init().await.unwrap();
651 store
652 .insert_job(
653 "dup_job",
654 "0 * * * * *",
655 "custom",
656 "periodic",
657 None,
658 "first",
659 )
660 .await
661 .unwrap();
662 let result = store
663 .insert_job(
664 "dup_job",
665 "0 0 * * * *",
666 "custom",
667 "periodic",
668 None,
669 "second",
670 )
671 .await;
672 assert!(
673 matches!(result, Err(SchedulerError::DuplicateJob(ref n)) if n == "dup_job"),
674 "expected DuplicateJob, got {result:?}"
675 );
676 }
677
678 #[tokio::test]
679 async fn list_jobs_full_includes_task_data() {
680 let pool = test_pool().await;
681 let store = JobStore::new(pool);
682 store.init().await.unwrap();
683 store
684 .insert_job(
685 "task_job",
686 "0 * * * * *",
687 "custom",
688 "periodic",
689 None,
690 "my prompt",
691 )
692 .await
693 .unwrap();
694 let jobs = store.list_jobs_full().await.unwrap();
695 let job = jobs.iter().find(|j| j.name == "task_job").unwrap();
696 assert_eq!(job.task_data, "my prompt");
697 }
698}