use zeph_db::DbPool;
#[allow(unused_imports)]
use zeph_db::sql;
use crate::error::SchedulerError;
/// Summary row for a scheduled job, as returned by [`JobStore::list_jobs`].
#[derive(Debug, Clone)]
pub struct ScheduledTaskRecord {
    /// Unique job name (conflict key of the `scheduled_jobs` table).
    pub name: String,
    /// Task kind identifier (e.g. `"health_check"`, `"memory_cleanup"`).
    pub kind: String,
    /// Scheduling mode string; `"periodic"` or `"oneshot"` in this codebase.
    pub task_mode: String,
    /// Next scheduled run (`next_run`, falling back to `run_at` for oneshot
    /// jobs); empty string when neither column is set.
    pub next_run: String,
}
/// Full detail row for a scheduled job, as returned by
/// [`JobStore::list_jobs_full`].
#[derive(Debug, Clone)]
pub struct ScheduledTaskInfo {
    /// Unique job name (conflict key of the `scheduled_jobs` table).
    pub name: String,
    /// Task kind identifier (e.g. `"health_check"`, `"custom"`).
    pub kind: String,
    /// Scheduling mode string; `"periodic"` or `"oneshot"` in this codebase.
    pub task_mode: String,
    /// Cron expression for periodic jobs; empty for oneshot jobs.
    pub cron_expr: String,
    /// Next scheduled run (`next_run`, falling back to `run_at`); empty
    /// string when neither column is set.
    pub next_run: String,
    /// Free-form task payload (e.g. a prompt string); empty when unused.
    pub task_data: String,
}
/// Persistence layer for scheduled jobs, backed by a `zeph_db` connection
/// pool. Handles both SQLite-style and Postgres-style unique-violation
/// errors (see `insert_job`).
#[derive(Debug)]
pub struct JobStore {
    // Shared connection pool; exposed read-only via `JobStore::pool`.
    pool: DbPool,
}
impl JobStore {
    /// Wraps an already-connected database pool.
    #[must_use]
    pub fn new(pool: DbPool) -> Self {
        Self { pool }
    }

    /// Opens a connection pool for the database at `path` and wraps it.
    ///
    /// # Errors
    /// Returns a [`SchedulerError`] when the connection cannot be established.
    pub async fn open(path: &str) -> Result<Self, SchedulerError> {
        let config = zeph_db::DbConfig {
            url: path.to_string(),
            max_connections: 5,
            pool_size: 5,
        };
        let pool = config.connect().await?;
        Ok(Self::new(pool))
    }

    /// Applies the schema migrations (creates the `scheduled_jobs` table).
    ///
    /// # Errors
    /// Returns a [`SchedulerError`] when a migration fails.
    pub async fn init(&self) -> Result<(), SchedulerError> {
        zeph_db::run_migrations(&self.pool).await?;
        Ok(())
    }

    /// Inserts or updates a periodic job: `task_mode = "periodic"`, no
    /// `run_at`, empty `task_data`.
    ///
    /// # Errors
    /// Returns a [`SchedulerError`] on database failure.
    pub async fn upsert_job(
        &self,
        name: &str,
        cron_expr: &str,
        kind: &str,
    ) -> Result<(), SchedulerError> {
        self.upsert_job_with_mode(name, cron_expr, kind, "periodic", None, "")
            .await
    }

    /// Inserts a job, or — when `name` already exists — overwrites its
    /// cron expression, kind, mode, `run_at`, and payload in place.
    ///
    /// # Errors
    /// Returns a [`SchedulerError`] on database failure.
    pub async fn upsert_job_with_mode(
        &self,
        name: &str,
        cron_expr: &str,
        kind: &str,
        task_mode: &str,
        run_at: Option<&str>,
        task_data: &str,
    ) -> Result<(), SchedulerError> {
        let statement = sql!(
            "INSERT INTO scheduled_jobs (name, cron_expr, kind, task_mode, run_at, task_data) \
             VALUES (?, ?, ?, ?, ?, ?) \
             ON CONFLICT(name) DO UPDATE SET \
             cron_expr = excluded.cron_expr, \
             kind = excluded.kind, \
             task_mode = excluded.task_mode, \
             run_at = excluded.run_at, \
             task_data = excluded.task_data"
        );
        zeph_db::query(statement)
            .bind(name)
            .bind(cron_expr)
            .bind(kind)
            .bind(task_mode)
            .bind(run_at)
            .bind(task_data)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    /// Inserts a brand-new job, refusing to touch an existing one.
    ///
    /// # Errors
    /// Returns [`SchedulerError::DuplicateJob`] when `name` collides with an
    /// existing row, or [`SchedulerError::Database`] for any other failure.
    pub async fn insert_job(
        &self,
        name: &str,
        cron_expr: &str,
        kind: &str,
        task_mode: &str,
        run_at: Option<&str>,
        task_data: &str,
    ) -> Result<(), SchedulerError> {
        let outcome = zeph_db::query(sql!(
            "INSERT INTO scheduled_jobs (name, cron_expr, kind, task_mode, run_at, task_data) \
             VALUES (?, ?, ?, ?, ?, ?)"
        ))
        .bind(name)
        .bind(cron_expr)
        .bind(kind)
        .bind(task_mode)
        .bind(run_at)
        .bind(task_data)
        .execute(&self.pool)
        .await;
        let Err(err) = outcome else {
            return Ok(());
        };
        // Unique-key violations surface differently per backend: SQLite puts
        // "UNIQUE constraint failed" in the message, Postgres reports
        // SQLSTATE 23505.
        let duplicate = matches!(
            &err,
            zeph_db::SqlxError::Database(db_err)
                if db_err.message().contains("UNIQUE constraint failed")
                    || db_err.code().as_deref() == Some("23505")
        );
        if duplicate {
            Err(SchedulerError::DuplicateJob(name.to_string()))
        } else {
            Err(SchedulerError::Database(err))
        }
    }

    /// Records a completed run: stores `timestamp` as `last_run`, `next_run`
    /// as the upcoming run, and sets the status to `completed`.
    ///
    /// # Errors
    /// Returns a [`SchedulerError`] on database failure.
    pub async fn record_run(
        &self,
        name: &str,
        timestamp: &str,
        next_run: &str,
    ) -> Result<(), SchedulerError> {
        let statement = sql!(
            "UPDATE scheduled_jobs SET last_run = ?, next_run = ?, status = 'completed' WHERE name = ?"
        );
        zeph_db::query(statement)
            .bind(timestamp)
            .bind(next_run)
            .bind(name)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    /// Marks a job finished for good: status `done`, `last_run` = now.
    /// Done jobs are excluded from both listing methods.
    ///
    /// # Errors
    /// Returns a [`SchedulerError`] on database failure.
    pub async fn mark_done(&self, name: &str) -> Result<(), SchedulerError> {
        let statement = sql!(
            "UPDATE scheduled_jobs SET status = 'done', last_run = CURRENT_TIMESTAMP WHERE name = ?"
        );
        zeph_db::query(statement)
            .bind(name)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    /// Deletes the named job; `Ok(true)` when a row was actually removed.
    ///
    /// # Errors
    /// Returns a [`SchedulerError`] on database failure.
    pub async fn delete_job(&self, name: &str) -> Result<bool, SchedulerError> {
        let outcome = zeph_db::query(sql!("DELETE FROM scheduled_jobs WHERE name = ?"))
            .bind(name)
            .execute(&self.pool)
            .await?;
        Ok(outcome.rows_affected() > 0)
    }

    /// Reports whether a job with this name exists (regardless of status).
    ///
    /// # Errors
    /// Returns a [`SchedulerError`] on database failure.
    pub async fn job_exists(&self, name: &str) -> Result<bool, SchedulerError> {
        let hit: Option<(i64,)> =
            zeph_db::query_as(sql!("SELECT 1 FROM scheduled_jobs WHERE name = ?"))
                .bind(name)
                .fetch_optional(&self.pool)
                .await?;
        Ok(hit.is_some())
    }

    /// Overwrites the `next_run` timestamp for the named job.
    ///
    /// # Errors
    /// Returns a [`SchedulerError`] on database failure.
    pub async fn set_next_run(&self, name: &str, next_run: &str) -> Result<(), SchedulerError> {
        zeph_db::query(sql!(
            "UPDATE scheduled_jobs SET next_run = ? WHERE name = ?"
        ))
        .bind(next_run)
        .bind(name)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Fetches the stored `next_run` for the named job. `None` when the job
    /// is missing or its `next_run` column is NULL.
    ///
    /// # Errors
    /// Returns a [`SchedulerError`] on database failure.
    pub async fn get_next_run(&self, name: &str) -> Result<Option<String>, SchedulerError> {
        let row: Option<(Option<String>,)> =
            zeph_db::query_as(sql!("SELECT next_run FROM scheduled_jobs WHERE name = ?"))
                .bind(name)
                .fetch_optional(&self.pool)
                .await?;
        match row {
            Some((next,)) => Ok(next),
            None => Ok(None),
        }
    }

    /// Lists every not-done job as a summary record, sorted by name.
    /// `next_run` falls back to `run_at` so oneshot jobs show a time too.
    ///
    /// # Errors
    /// Returns a [`SchedulerError`] on database failure.
    pub async fn list_jobs(&self) -> Result<Vec<ScheduledTaskRecord>, SchedulerError> {
        let rows: Vec<(String, String, String, Option<String>)> = zeph_db::query_as(sql!(
            "SELECT name, kind, task_mode, COALESCE(next_run, run_at) FROM scheduled_jobs WHERE status != 'done' ORDER BY name"
        ))
        .fetch_all(&self.pool)
        .await?;
        let mut records = Vec::with_capacity(rows.len());
        for (name, kind, task_mode, next_run) in rows {
            records.push(ScheduledTaskRecord {
                name,
                kind,
                task_mode,
                next_run: next_run.unwrap_or_default(),
            });
        }
        Ok(records)
    }

    /// Lists every not-done job with full details, sorted by name.
    /// `next_run` falls back to `run_at`, as in [`Self::list_jobs`].
    ///
    /// # Errors
    /// Returns a [`SchedulerError`] on database failure.
    pub async fn list_jobs_full(&self) -> Result<Vec<ScheduledTaskInfo>, SchedulerError> {
        let rows: Vec<(String, String, String, String, Option<String>, String)> =
            zeph_db::query_as(sql!(
                "SELECT name, kind, task_mode, cron_expr, COALESCE(next_run, run_at), task_data \
                 FROM scheduled_jobs WHERE status != 'done' ORDER BY name"
            ))
            .fetch_all(&self.pool)
            .await?;
        let mut infos = Vec::with_capacity(rows.len());
        for (name, kind, task_mode, cron_expr, next_run, task_data) in rows {
            infos.push(ScheduledTaskInfo {
                name,
                kind,
                task_mode,
                cron_expr,
                next_run: next_run.unwrap_or_default(),
                task_data,
            });
        }
        Ok(infos)
    }

    /// Borrows the underlying connection pool.
    #[must_use]
    pub fn pool(&self) -> &DbPool {
        &self.pool
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh in-memory pool for a single test.
    async fn test_pool() -> DbPool {
        zeph_db::DbConfig {
            url: ":memory:".to_string(),
            max_connections: 5,
            pool_size: 5,
        }
        .connect()
        .await
        .unwrap()
    }

    /// In-memory store with migrations already applied.
    async fn ready_store() -> JobStore {
        let store = JobStore::new(test_pool().await);
        store.init().await.unwrap();
        store
    }

    #[tokio::test]
    async fn init_creates_table() {
        let store = JobStore::new(test_pool().await);
        assert!(store.init().await.is_ok());
    }

    #[tokio::test]
    async fn upsert_and_query() {
        let store = ready_store().await;
        store
            .upsert_job("test_job", "0 * * * * *", "health_check")
            .await
            .unwrap();
        // Fresh jobs have no next_run until a run is recorded.
        assert!(store.get_next_run("test_job").await.unwrap().is_none());
        store
            .record_run("test_job", "2026-01-01T00:00:00Z", "2026-01-01T00:01:00Z")
            .await
            .unwrap();
        assert_eq!(
            store.get_next_run("test_job").await.unwrap().as_deref(),
            Some("2026-01-01T00:01:00Z")
        );
    }

    #[tokio::test]
    async fn upsert_updates_existing() {
        let store = ready_store().await;
        store
            .upsert_job("job1", "0 * * * * *", "health_check")
            .await
            .unwrap();
        // Same name again: the row must be updated, not duplicated.
        store
            .upsert_job("job1", "0 0 * * * *", "memory_cleanup")
            .await
            .unwrap();
        let row: (String,) =
            zeph_db::query_as(sql!("SELECT kind FROM scheduled_jobs WHERE name = 'job1'"))
                .fetch_one(store.pool())
                .await
                .unwrap();
        assert_eq!(row.0, "memory_cleanup");
    }

    #[tokio::test]
    async fn next_run_nonexistent_job() {
        let store = ready_store().await;
        assert!(store.get_next_run("no_such_job").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn job_exists_returns_true_for_existing() {
        let store = ready_store().await;
        store
            .upsert_job("exists_job", "0 * * * * *", "health_check")
            .await
            .unwrap();
        assert!(store.job_exists("exists_job").await.unwrap());
        assert!(!store.job_exists("missing").await.unwrap());
    }

    #[tokio::test]
    async fn delete_job_removes_row() {
        let store = ready_store().await;
        store
            .upsert_job("del_job", "0 * * * * *", "health_check")
            .await
            .unwrap();
        assert!(store.delete_job("del_job").await.unwrap());
        assert!(!store.job_exists("del_job").await.unwrap());
        // A second delete finds nothing to remove.
        assert!(!store.delete_job("del_job").await.unwrap());
    }

    #[tokio::test]
    async fn mark_done_sets_status() {
        let store = ready_store().await;
        store
            .upsert_job_with_mode(
                "os_job",
                "",
                "health_check",
                "oneshot",
                Some("2026-01-01T01:00:00Z"),
                "",
            )
            .await
            .unwrap();
        store.mark_done("os_job").await.unwrap();
        let row: (String,) = zeph_db::query_as(sql!(
            "SELECT status FROM scheduled_jobs WHERE name = 'os_job'"
        ))
        .fetch_one(store.pool())
        .await
        .unwrap();
        assert_eq!(row.0, "done");
    }

    #[tokio::test]
    async fn list_jobs_excludes_done_jobs() {
        let store = ready_store().await;
        store
            .upsert_job_with_mode(
                "done_job",
                "",
                "health_check",
                "oneshot",
                Some("2026-01-01T01:00:00Z"),
                "",
            )
            .await
            .unwrap();
        store.mark_done("done_job").await.unwrap();
        let jobs = store.list_jobs().await.unwrap();
        assert!(
            jobs.iter().all(|j| j.name != "done_job"),
            "list_jobs must not return done jobs"
        );
    }

    #[tokio::test]
    async fn list_jobs_uses_run_at_for_oneshot_when_next_run_is_null() {
        let store = ready_store().await;
        store
            .upsert_job_with_mode(
                "oneshot_job",
                "",
                "custom",
                "oneshot",
                Some("2026-06-01T10:00:00Z"),
                "",
            )
            .await
            .unwrap();
        let jobs = store.list_jobs().await.unwrap();
        let job = jobs.iter().find(|j| j.name == "oneshot_job").unwrap();
        assert_eq!(
            job.next_run, "2026-06-01T10:00:00Z",
            "run_at must be shown as next_run for oneshot jobs"
        );
    }

    #[tokio::test]
    async fn list_jobs_full_returns_correct_fields() {
        let store = ready_store().await;
        store
            .upsert_job("periodic_job", "0 0 3 * * *", "memory_cleanup")
            .await
            .unwrap();
        store
            .upsert_job_with_mode(
                "oneshot_job",
                "",
                "custom",
                "oneshot",
                Some("2030-01-01T10:00:00Z"),
                "",
            )
            .await
            .unwrap();
        let jobs = store.list_jobs_full().await.unwrap();
        assert_eq!(jobs.len(), 2);
        let periodic = jobs.iter().find(|j| j.name == "periodic_job").unwrap();
        assert_eq!(periodic.kind, "memory_cleanup");
        assert_eq!(periodic.task_mode, "periodic");
        assert_eq!(periodic.cron_expr, "0 0 3 * * *");
        let oneshot = jobs.iter().find(|j| j.name == "oneshot_job").unwrap();
        assert_eq!(oneshot.kind, "custom");
        assert_eq!(oneshot.task_mode, "oneshot");
        assert!(oneshot.cron_expr.is_empty());
        assert_eq!(oneshot.next_run, "2030-01-01T10:00:00Z");
    }

    #[tokio::test]
    async fn list_jobs_full_excludes_done_jobs() {
        let store = ready_store().await;
        store
            .upsert_job_with_mode(
                "done_job",
                "",
                "custom",
                "oneshot",
                Some("2026-01-01T01:00:00Z"),
                "",
            )
            .await
            .unwrap();
        store.mark_done("done_job").await.unwrap();
        let jobs = store.list_jobs_full().await.unwrap();
        assert!(jobs.iter().all(|j| j.name != "done_job"));
    }

    #[tokio::test]
    async fn duplicate_name_detected() {
        let store = ready_store().await;
        store
            .upsert_job("dup", "0 * * * * *", "health_check")
            .await
            .unwrap();
        assert!(store.job_exists("dup").await.unwrap());
    }

    #[tokio::test]
    async fn insert_job_success() {
        let store = ready_store().await;
        store
            .insert_job(
                "new_job",
                "0 * * * * *",
                "custom",
                "periodic",
                None,
                "run daily report",
            )
            .await
            .unwrap();
        assert!(store.job_exists("new_job").await.unwrap());
    }

    #[tokio::test]
    async fn insert_job_duplicate_returns_error() {
        let store = ready_store().await;
        store
            .insert_job(
                "dup_job",
                "0 * * * * *",
                "custom",
                "periodic",
                None,
                "first",
            )
            .await
            .unwrap();
        let result = store
            .insert_job(
                "dup_job",
                "0 0 * * * *",
                "custom",
                "periodic",
                None,
                "second",
            )
            .await;
        assert!(
            matches!(result, Err(SchedulerError::DuplicateJob(ref n)) if n == "dup_job"),
            "expected DuplicateJob, got {result:?}"
        );
    }

    #[tokio::test]
    async fn list_jobs_full_includes_task_data() {
        let store = ready_store().await;
        store
            .insert_job(
                "task_job",
                "0 * * * * *",
                "custom",
                "periodic",
                None,
                "my prompt",
            )
            .await
            .unwrap();
        let jobs = store.list_jobs_full().await.unwrap();
        let job = jobs.iter().find(|j| j.name == "task_job").unwrap();
        assert_eq!(job.task_data, "my prompt");
    }
}