Skip to main content

zeph_scheduler/
store.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use zeph_db::DbPool;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use crate::error::SchedulerError;
9
/// A scheduled task row returned by [`JobStore::list_jobs`].
///
/// Replaces the previous `(String, String, String, String)` tuple to eliminate
/// positional destructuring bugs. Fields map 1-to-1 to the SQL columns in the
/// same order as the query: `name`, `kind`, `task_mode`, and the coalesced
/// `next_run`.
///
/// The type is plain data (four owned strings), so it derives `PartialEq` and
/// `Eq` in addition to `Debug` and `Clone`; records can be compared directly
/// with `==` / `assert_eq!`.
///
/// # Examples
///
/// ```rust,no_run
/// use zeph_scheduler::JobStore;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let store = JobStore::open("sqlite:scheduler.db").await?;
/// store.init().await?;
///
/// for job in store.list_jobs().await? {
///     println!("{}: {} ({}) → {}", job.name, job.kind, job.task_mode, job.next_run);
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduledTaskRecord {
    /// Unique task name; rows of the `scheduled_jobs` table are keyed by it.
    pub name: String,
    /// Serialised [`crate::TaskKind`] string (e.g. `"health_check"`).
    pub kind: String,
    /// Execution mode: `"periodic"` or `"oneshot"`.
    pub task_mode: String,
    /// Next scheduled run time as an ISO 8601 / RFC 3339 string.
    ///
    /// Falls back to `run_at` for one-shot jobs that have not yet computed a
    /// `next_run`. Empty string when neither field is set.
    pub next_run: String,
}
46
/// Full details for a scheduled task, returned by [`JobStore::list_jobs_full`].
///
/// Intended for display in the TUI or CLI task list. All string fields are UTF-8
/// and come directly from the `scheduled_jobs` `SQLite` table.
///
/// The type is plain data (six owned strings), so it derives `PartialEq` and
/// `Eq` in addition to `Debug` and `Clone`; values can be compared directly
/// with `==` / `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduledTaskInfo {
    /// Unique task name; rows of the `scheduled_jobs` table are keyed by it.
    pub name: String,
    /// Serialised [`crate::TaskKind`] string (e.g. `"health_check"`).
    pub kind: String,
    /// Execution mode: `"periodic"` or `"oneshot"`.
    pub task_mode: String,
    /// Cron expression for periodic tasks, empty string for one-shot tasks.
    pub cron_expr: String,
    /// Next scheduled run time as an ISO 8601 / RFC 3339 string, or empty if unknown.
    ///
    /// Like [`ScheduledTaskRecord::next_run`], this is `COALESCE(next_run, run_at)`
    /// with a missing value mapped to the empty string.
    pub next_run: String,
    /// Stored task prompt for custom tasks; empty for config-driven built-in tasks.
    pub task_data: String,
}
66
67/// Persistent storage layer for scheduled jobs.
68///
69/// All job definitions and run history are stored in a `SQLite` database managed by
70/// `zeph-db` migrations. The `scheduled_jobs` table schema is defined in migration
71/// `051_scheduler_jobs.sql`.
72///
73/// # Examples
74///
75/// ```rust,no_run
76/// use zeph_scheduler::JobStore;
77///
78/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
79/// // Open from a file path.
80/// let store = JobStore::open("sqlite:scheduler.db").await?;
81/// store.init().await?;
82///
83/// // Query job list.
84/// let jobs = store.list_jobs().await?;
85/// for job in &jobs {
86///     println!("{}: {} ({}) → {}", job.name, job.kind, job.task_mode, job.next_run);
87/// }
88/// # Ok(())
89/// # }
90/// ```
91#[derive(Debug)]
92pub struct JobStore {
93    pool: DbPool,
94}
95
96impl JobStore {
97    /// Create a `JobStore` from an existing [`zeph_db::DbPool`].
98    ///
99    /// You must call [`JobStore::init`] before any other operation to ensure the
100    /// schema migrations have been applied.
101    #[must_use]
102    pub fn new(pool: DbPool) -> Self {
103        Self { pool }
104    }
105
106    /// Open (or create) a `JobStore` from a `SQLite` file path.
107    ///
108    /// # Errors
109    ///
110    /// Returns [`SchedulerError::Db`] if the connection cannot be established.
111    pub async fn open(path: &str) -> Result<Self, SchedulerError> {
112        let pool = zeph_db::DbConfig {
113            url: path.to_string(),
114            max_connections: 5,
115            pool_size: 5,
116        }
117        .connect()
118        .await?;
119        Ok(Self { pool })
120    }
121
122    /// Run all pending migrations on the underlying pool.
123    ///
124    /// Replaces the former inline `CREATE TABLE IF NOT EXISTS` DDL. The
125    /// `scheduled_jobs` schema is now managed by migration
126    /// `051_scheduler_jobs.sql` in `zeph-db`.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if any migration fails.
131    pub async fn init(&self) -> Result<(), SchedulerError> {
132        zeph_db::run_migrations(&self.pool).await?;
133        Ok(())
134    }
135
136    /// Upsert a job definition.
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the SQL statement fails.
141    pub async fn upsert_job(
142        &self,
143        name: &str,
144        cron_expr: &str,
145        kind: &str,
146    ) -> Result<(), SchedulerError> {
147        self.upsert_job_with_mode(name, cron_expr, kind, "periodic", None, "")
148            .await
149    }
150
151    /// Upsert a job definition with explicit `task_mode`, optional `run_at`, and `task_data`.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the SQL statement fails.
156    pub async fn upsert_job_with_mode(
157        &self,
158        name: &str,
159        cron_expr: &str,
160        kind: &str,
161        task_mode: &str,
162        run_at: Option<&str>,
163        task_data: &str,
164    ) -> Result<(), SchedulerError> {
165        zeph_db::query(sql!(
166            "INSERT INTO scheduled_jobs (name, cron_expr, kind, task_mode, run_at, task_data)
167             VALUES (?, ?, ?, ?, ?, ?)
168             ON CONFLICT(name) DO UPDATE SET
169               cron_expr = excluded.cron_expr,
170               kind = excluded.kind,
171               task_mode = excluded.task_mode,
172               run_at = excluded.run_at,
173               task_data = excluded.task_data"
174        ))
175        .bind(name)
176        .bind(cron_expr)
177        .bind(kind)
178        .bind(task_mode)
179        .bind(run_at)
180        .bind(task_data)
181        .execute(&self.pool)
182        .await?;
183        Ok(())
184    }
185
186    /// Insert a new job. Returns [`SchedulerError::DuplicateJob`] if a job with this name exists.
187    ///
188    /// # Errors
189    ///
190    /// Returns [`SchedulerError::DuplicateJob`] on unique constraint violation,
191    /// or [`SchedulerError::Database`] on other SQL errors.
192    pub async fn insert_job(
193        &self,
194        name: &str,
195        cron_expr: &str,
196        kind: &str,
197        task_mode: &str,
198        run_at: Option<&str>,
199        task_data: &str,
200    ) -> Result<(), SchedulerError> {
201        let result = zeph_db::query(sql!(
202            "INSERT INTO scheduled_jobs (name, cron_expr, kind, task_mode, run_at, task_data)
203             VALUES (?, ?, ?, ?, ?, ?)"
204        ))
205        .bind(name)
206        .bind(cron_expr)
207        .bind(kind)
208        .bind(task_mode)
209        .bind(run_at)
210        .bind(task_data)
211        .execute(&self.pool)
212        .await;
213        match result {
214            Ok(_) => Ok(()),
215            Err(zeph_db::SqlxError::Database(db_err))
216                if db_err.message().contains("UNIQUE constraint failed")
217                    || db_err.code().as_deref() == Some("23505") =>
218            {
219                Err(SchedulerError::DuplicateJob(name.to_string()))
220            }
221            Err(e) => Err(SchedulerError::Database(e)),
222        }
223    }
224
225    /// Record a job execution and persist the next scheduled run time.
226    ///
227    /// # Errors
228    ///
229    /// Returns an error if the SQL statement fails.
230    pub async fn record_run(
231        &self,
232        name: &str,
233        timestamp: &str,
234        next_run: &str,
235    ) -> Result<(), SchedulerError> {
236        zeph_db::query(
237            sql!("UPDATE scheduled_jobs SET last_run = ?, next_run = ?, status = 'completed' WHERE name = ?"),
238        )
239        .bind(timestamp)
240        .bind(next_run)
241        .bind(name)
242        .execute(&self.pool)
243        .await?;
244        Ok(())
245    }
246
247    /// Mark a one-shot job as done.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if the SQL statement fails.
252    pub async fn mark_done(&self, name: &str) -> Result<(), SchedulerError> {
253        zeph_db::query(sql!(
254            "UPDATE scheduled_jobs SET status = 'done', last_run = CURRENT_TIMESTAMP WHERE name = ?"
255        ))
256        .bind(name)
257        .execute(&self.pool)
258        .await?;
259        Ok(())
260    }
261
262    /// Delete a job by name.
263    ///
264    /// # Errors
265    ///
266    /// Returns an error if the SQL statement fails.
267    pub async fn delete_job(&self, name: &str) -> Result<bool, SchedulerError> {
268        let result = zeph_db::query(sql!("DELETE FROM scheduled_jobs WHERE name = ?"))
269            .bind(name)
270            .execute(&self.pool)
271            .await?;
272        Ok(result.rows_affected() > 0)
273    }
274
275    /// Check if a job with the given name exists.
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if the SQL query fails.
280    pub async fn job_exists(&self, name: &str) -> Result<bool, SchedulerError> {
281        let row: Option<(i64,)> =
282            zeph_db::query_as(sql!("SELECT 1 FROM scheduled_jobs WHERE name = ?"))
283                .bind(name)
284                .fetch_optional(&self.pool)
285                .await?;
286        Ok(row.is_some())
287    }
288
289    /// Persist the next scheduled run time for a job (used during init).
290    ///
291    /// # Errors
292    ///
293    /// Returns an error if the SQL statement fails.
294    pub async fn set_next_run(&self, name: &str, next_run: &str) -> Result<(), SchedulerError> {
295        zeph_db::query(sql!(
296            "UPDATE scheduled_jobs SET next_run = ? WHERE name = ?"
297        ))
298        .bind(next_run)
299        .bind(name)
300        .execute(&self.pool)
301        .await?;
302        Ok(())
303    }
304
305    /// Get the persisted next run timestamp for a job.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if the SQL query fails.
310    pub async fn get_next_run(&self, name: &str) -> Result<Option<String>, SchedulerError> {
311        let row: Option<(Option<String>,)> =
312            zeph_db::query_as(sql!("SELECT next_run FROM scheduled_jobs WHERE name = ?"))
313                .bind(name)
314                .fetch_optional(&self.pool)
315                .await?;
316        Ok(row.and_then(|r| r.0))
317    }
318
319    /// List all active (non-done) jobs.
320    ///
321    /// Returns a [`ScheduledTaskRecord`] per active job, ordered by name.
322    /// One-shot jobs without a computed `next_run` fall back to their `run_at` value;
323    /// if neither is set the field is an empty string.
324    ///
325    /// # Errors
326    ///
327    /// Returns an error if the SQL query fails.
328    pub async fn list_jobs(&self) -> Result<Vec<ScheduledTaskRecord>, SchedulerError> {
329        let rows: Vec<(String, String, String, Option<String>)> = zeph_db::query_as(
330            sql!("SELECT name, kind, task_mode, COALESCE(next_run, run_at) FROM scheduled_jobs WHERE status != 'done' ORDER BY name"),
331        )
332        .fetch_all(&self.pool)
333        .await?;
334        Ok(rows
335            .into_iter()
336            .map(|(name, kind, task_mode, next_run)| ScheduledTaskRecord {
337                name,
338                kind,
339                task_mode,
340                next_run: next_run.unwrap_or_default(),
341            })
342            .collect())
343    }
344
345    /// List all active (non-done) jobs with full details for display.
346    ///
347    /// # Errors
348    ///
349    /// Returns an error if the SQL query fails.
350    pub async fn list_jobs_full(&self) -> Result<Vec<ScheduledTaskInfo>, SchedulerError> {
351        let rows: Vec<(String, String, String, String, Option<String>, String)> =
352            zeph_db::query_as(sql!(
353                "SELECT name, kind, task_mode, cron_expr, COALESCE(next_run, run_at), task_data \
354                 FROM scheduled_jobs WHERE status != 'done' ORDER BY name"
355            ))
356            .fetch_all(&self.pool)
357            .await?;
358        Ok(rows
359            .into_iter()
360            .map(
361                |(name, kind, task_mode, cron_expr, next_run, task_data)| ScheduledTaskInfo {
362                    name,
363                    kind,
364                    task_mode,
365                    cron_expr,
366                    next_run: next_run.unwrap_or_default(),
367                    task_data,
368                },
369            )
370            .collect())
371    }
372
373    /// Returns a reference to the underlying connection pool.
374    ///
375    /// Primarily used in tests that need to execute raw SQL against the same database.
376    #[must_use]
377    pub fn pool(&self) -> &DbPool {
378        &self.pool
379    }
380}
381
#[cfg(test)]
mod tests {
    use super::*;

    /// Connect a pool to a fresh `:memory:` database.
    async fn test_pool() -> DbPool {
        let config = zeph_db::DbConfig {
            url: ":memory:".to_string(),
            max_connections: 5,
            pool_size: 5,
        };
        config.connect().await.unwrap()
    }

    /// Build a store on a fresh in-memory database with migrations applied.
    async fn ready_store() -> JobStore {
        let store = JobStore::new(test_pool().await);
        store.init().await.unwrap();
        store
    }

    /// Upsert a one-shot job with an empty cron expression and empty task data.
    async fn add_oneshot(store: &JobStore, name: &str, kind: &str, run_at: &str) {
        store
            .upsert_job_with_mode(name, "", kind, "oneshot", Some(run_at), "")
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn init_creates_table() {
        let store = JobStore::new(test_pool().await);
        let outcome = store.init().await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn upsert_and_query() {
        let store = ready_store().await;

        store
            .upsert_job("test_job", "0 * * * * *", "health_check")
            .await
            .unwrap();
        // A freshly upserted job has no next_run until a run is recorded.
        let before = store.get_next_run("test_job").await.unwrap();
        assert!(before.is_none());

        store
            .record_run("test_job", "2026-01-01T00:00:00Z", "2026-01-01T00:01:00Z")
            .await
            .unwrap();
        let after = store.get_next_run("test_job").await.unwrap();
        assert_eq!(after.as_deref(), Some("2026-01-01T00:01:00Z"));
    }

    #[tokio::test]
    async fn upsert_updates_existing() {
        let store = ready_store().await;

        for (cron_expr, kind) in [
            ("0 * * * * *", "health_check"),
            ("0 0 * * * *", "memory_cleanup"),
        ] {
            store.upsert_job("job1", cron_expr, kind).await.unwrap();
        }

        let (kind,): (String,) =
            zeph_db::query_as(sql!("SELECT kind FROM scheduled_jobs WHERE name = 'job1'"))
                .fetch_one(store.pool())
                .await
                .unwrap();
        assert_eq!(kind, "memory_cleanup");
    }

    #[tokio::test]
    async fn next_run_nonexistent_job() {
        let store = ready_store().await;
        let next_run = store.get_next_run("no_such_job").await.unwrap();
        assert!(next_run.is_none());
    }

    #[tokio::test]
    async fn job_exists_returns_true_for_existing() {
        let store = ready_store().await;
        store
            .upsert_job("exists_job", "0 * * * * *", "health_check")
            .await
            .unwrap();

        let present = store.job_exists("exists_job").await.unwrap();
        let absent = store.job_exists("missing").await.unwrap();
        assert!(present);
        assert!(!absent);
    }

    #[tokio::test]
    async fn delete_job_removes_row() {
        let store = ready_store().await;
        store
            .upsert_job("del_job", "0 * * * * *", "health_check")
            .await
            .unwrap();

        // First delete removes the row, second one finds nothing to remove.
        assert!(store.delete_job("del_job").await.unwrap());
        assert!(!store.job_exists("del_job").await.unwrap());
        assert!(!store.delete_job("del_job").await.unwrap());
    }

    #[tokio::test]
    async fn mark_done_sets_status() {
        let store = ready_store().await;
        add_oneshot(&store, "os_job", "health_check", "2026-01-01T01:00:00Z").await;

        store.mark_done("os_job").await.unwrap();

        let (status,): (String,) = zeph_db::query_as(sql!(
            "SELECT status FROM scheduled_jobs WHERE name = 'os_job'"
        ))
        .fetch_one(store.pool())
        .await
        .unwrap();
        assert_eq!(status, "done");
    }

    #[tokio::test]
    async fn list_jobs_excludes_done_jobs() {
        let store = ready_store().await;
        add_oneshot(&store, "done_job", "health_check", "2026-01-01T01:00:00Z").await;
        store.mark_done("done_job").await.unwrap();

        let jobs = store.list_jobs().await.unwrap();
        assert!(
            !jobs.iter().any(|j| j.name == "done_job"),
            "list_jobs must not return done jobs"
        );
    }

    #[tokio::test]
    async fn list_jobs_uses_run_at_for_oneshot_when_next_run_is_null() {
        let store = ready_store().await;
        add_oneshot(&store, "oneshot_job", "custom", "2026-06-01T10:00:00Z").await;

        let jobs = store.list_jobs().await.unwrap();
        let job = jobs.iter().find(|j| j.name == "oneshot_job").unwrap();
        assert_eq!(
            job.next_run, "2026-06-01T10:00:00Z",
            "run_at must be shown as next_run for oneshot jobs"
        );
    }

    #[tokio::test]
    async fn list_jobs_full_returns_correct_fields() {
        let store = ready_store().await;
        store
            .upsert_job("periodic_job", "0 0 3 * * *", "memory_cleanup")
            .await
            .unwrap();
        add_oneshot(&store, "oneshot_job", "custom", "2030-01-01T10:00:00Z").await;

        let jobs = store.list_jobs_full().await.unwrap();
        assert_eq!(jobs.len(), 2);

        let by_name = |wanted: &str| jobs.iter().find(|j| j.name == wanted).unwrap();

        let periodic = by_name("periodic_job");
        assert_eq!(periodic.kind, "memory_cleanup");
        assert_eq!(periodic.task_mode, "periodic");
        assert_eq!(periodic.cron_expr, "0 0 3 * * *");

        let oneshot = by_name("oneshot_job");
        assert_eq!(oneshot.kind, "custom");
        assert_eq!(oneshot.task_mode, "oneshot");
        assert!(oneshot.cron_expr.is_empty());
        assert_eq!(oneshot.next_run, "2030-01-01T10:00:00Z");
    }

    #[tokio::test]
    async fn list_jobs_full_excludes_done_jobs() {
        let store = ready_store().await;
        add_oneshot(&store, "done_job", "custom", "2026-01-01T01:00:00Z").await;
        store.mark_done("done_job").await.unwrap();

        let jobs = store.list_jobs_full().await.unwrap();
        assert!(!jobs.iter().any(|j| j.name == "done_job"));
    }

    // Only checks that an upserted name is visible through `job_exists`.
    #[tokio::test]
    async fn duplicate_name_detected() {
        let store = ready_store().await;
        store
            .upsert_job("dup", "0 * * * * *", "health_check")
            .await
            .unwrap();
        let exists = store.job_exists("dup").await.unwrap();
        assert!(exists);
    }

    #[tokio::test]
    async fn insert_job_success() {
        let store = ready_store().await;
        store
            .insert_job(
                "new_job",
                "0 * * * * *",
                "custom",
                "periodic",
                None,
                "run daily report",
            )
            .await
            .unwrap();
        let exists = store.job_exists("new_job").await.unwrap();
        assert!(exists);
    }

    #[tokio::test]
    async fn insert_job_duplicate_returns_error() {
        let store = ready_store().await;
        store
            .insert_job("dup_job", "0 * * * * *", "custom", "periodic", None, "first")
            .await
            .unwrap();

        // Same name again: must be rejected rather than overwritten.
        let result = store
            .insert_job("dup_job", "0 0 * * * *", "custom", "periodic", None, "second")
            .await;
        assert!(
            matches!(result, Err(SchedulerError::DuplicateJob(ref n)) if n == "dup_job"),
            "expected DuplicateJob, got {result:?}"
        );
    }

    #[tokio::test]
    async fn list_jobs_full_includes_task_data() {
        let store = ready_store().await;
        store
            .insert_job(
                "task_job",
                "0 * * * * *",
                "custom",
                "periodic",
                None,
                "my prompt",
            )
            .await
            .unwrap();

        let jobs = store.list_jobs_full().await.unwrap();
        let job = jobs.iter().find(|j| j.name == "task_job").unwrap();
        assert_eq!(job.task_data, "my prompt");
    }
}