Skip to main content

pylon_runtime/
job_store.rs

//! SQLite-backed persistent storage for jobs.
//!
//! Persists jobs to a SQLite database so they survive server restarts.
//! The in-memory queue remains the source of truth at runtime; the store
//! is written to on every state change and read from only at startup to
//! restore unfinished work.
7
8use rusqlite::Connection;
9use std::sync::Mutex;
10
11use crate::jobs::{Job, JobStatus, Priority};
12
13/// SQLite-backed persistent storage for jobs.
14pub struct JobStore {
15    conn: Mutex<Connection>,
16}
17
18impl JobStore {
19    /// Open or create the job store database at `path`.
20    pub fn open(path: &str) -> Result<Self, String> {
21        let conn = Connection::open(path).map_err(|e| format!("Failed to open job store: {e}"))?;
22        let store = Self {
23            conn: Mutex::new(conn),
24        };
25        store.init_schema()?;
26        Ok(store)
27    }
28
29    /// Create an in-memory store (useful for tests).
30    pub fn in_memory() -> Result<Self, String> {
31        let conn = Connection::open_in_memory()
32            .map_err(|e| format!("Failed to open in-memory store: {e}"))?;
33        let store = Self {
34            conn: Mutex::new(conn),
35        };
36        store.init_schema()?;
37        Ok(store)
38    }
39
40    fn init_schema(&self) -> Result<(), String> {
41        let conn = self.conn.lock().unwrap();
42        conn.execute_batch(
43            "
44            PRAGMA journal_mode=WAL;
45            CREATE TABLE IF NOT EXISTS jobs (
46                id TEXT PRIMARY KEY NOT NULL,
47                name TEXT NOT NULL,
48                payload TEXT NOT NULL,
49                priority INTEGER NOT NULL DEFAULT 1,
50                status TEXT NOT NULL DEFAULT 'pending',
51                max_retries INTEGER NOT NULL DEFAULT 3,
52                retry_count INTEGER NOT NULL DEFAULT 0,
53                queue TEXT NOT NULL DEFAULT 'default',
54                delay_secs INTEGER NOT NULL DEFAULT 0,
55                error TEXT,
56                created_at TEXT NOT NULL,
57                started_at TEXT,
58                completed_at TEXT
59            );
60            CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
61            CREATE INDEX IF NOT EXISTS idx_jobs_queue ON jobs(queue);
62        ",
63        )
64        .map_err(|e| format!("Schema init failed: {e}"))
65    }
66
67    /// Save a job (insert or update).
68    pub fn save(&self, job: &Job) -> Result<(), String> {
69        let conn = self.conn.lock().unwrap();
70        conn.execute(
71            "INSERT OR REPLACE INTO jobs \
72             (id, name, payload, priority, status, max_retries, retry_count, \
73              queue, delay_secs, error, created_at, started_at, completed_at) \
74             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
75            rusqlite::params![
76                job.id,
77                job.name,
78                job.payload.to_string(),
79                priority_to_int(&job.priority),
80                status_to_str(&job.status),
81                job.max_retries,
82                job.retry_count,
83                job.queue,
84                job.delay_secs,
85                job.error,
86                job.created_at,
87                job.started_at,
88                job.completed_at,
89            ],
90        )
91        .map_err(|e| format!("Save failed: {e}"))?;
92        Ok(())
93    }
94
95    /// Load a job by ID.
96    pub fn load(&self, id: &str) -> Result<Option<Job>, String> {
97        let conn = self.conn.lock().unwrap();
98        let mut stmt = conn
99            .prepare(
100                "SELECT id, name, payload, priority, status, max_retries, retry_count, \
101                 queue, delay_secs, error, created_at, started_at, completed_at \
102                 FROM jobs WHERE id = ?1",
103            )
104            .map_err(|e| format!("Prepare failed: {e}"))?;
105
106        let result = stmt
107            .query_row(rusqlite::params![id], |row| Ok(row_to_job(row)))
108            .ok();
109
110        Ok(result)
111    }
112
113    /// Load all pending/running/retrying jobs (for recovery after restart).
114    ///
115    /// Jobs that were `running` at the time of a crash are included so they
116    /// can be re-enqueued. The caller is responsible for resetting their
117    /// status to `Pending` before re-inserting into the in-memory queue.
118    pub fn load_pending(&self) -> Result<Vec<Job>, String> {
119        let conn = self.conn.lock().unwrap();
120        let mut stmt = conn
121            .prepare(
122                "SELECT id, name, payload, priority, status, max_retries, retry_count, \
123                 queue, delay_secs, error, created_at, started_at, completed_at \
124                 FROM jobs \
125                 WHERE status IN ('pending', 'running', 'retrying') \
126                 ORDER BY priority DESC, created_at ASC",
127            )
128            .map_err(|e| format!("Prepare failed: {e}"))?;
129
130        let rows = stmt
131            .query_map([], |row| Ok(row_to_job(row)))
132            .map_err(|e| format!("Query failed: {e}"))?;
133
134        let mut jobs = Vec::new();
135        for row in rows {
136            if let Ok(job) = row {
137                jobs.push(job);
138            }
139        }
140        Ok(jobs)
141    }
142
143    /// Load dead-letter jobs.
144    pub fn load_dead(&self) -> Result<Vec<Job>, String> {
145        let conn = self.conn.lock().unwrap();
146        let mut stmt = conn
147            .prepare(
148                "SELECT id, name, payload, priority, status, max_retries, retry_count, \
149                 queue, delay_secs, error, created_at, started_at, completed_at \
150                 FROM jobs \
151                 WHERE status = 'dead' \
152                 ORDER BY completed_at DESC",
153            )
154            .map_err(|e| format!("Prepare failed: {e}"))?;
155
156        let rows = stmt
157            .query_map([], |row| Ok(row_to_job(row)))
158            .map_err(|e| format!("Query failed: {e}"))?;
159
160        let mut jobs = Vec::new();
161        for row in rows {
162            if let Ok(job) = row {
163                jobs.push(job);
164            }
165        }
166        Ok(jobs)
167    }
168
169    /// Count jobs by status.
170    pub fn count_by_status(&self, status: &str) -> usize {
171        let conn = self.conn.lock().unwrap();
172        conn.query_row(
173            "SELECT COUNT(*) FROM jobs WHERE status = ?1",
174            rusqlite::params![status],
175            |row| row.get::<_, i64>(0),
176        )
177        .unwrap_or(0) as usize
178    }
179
180    /// Delete old completed/dead jobs older than `max_age_secs`.
181    ///
182    /// Returns the number of rows deleted.
183    pub fn cleanup_completed(&self, max_age_secs: u64) -> usize {
184        let conn = self.conn.lock().unwrap();
185        let cutoff = std::time::SystemTime::now()
186            .duration_since(std::time::UNIX_EPOCH)
187            .unwrap_or_default()
188            .as_secs()
189            .saturating_sub(max_age_secs);
190        let cutoff_str = format!("{cutoff}Z");
191
192        conn.execute(
193            "DELETE FROM jobs WHERE status IN ('completed', 'dead') AND completed_at < ?1",
194            rusqlite::params![cutoff_str],
195        )
196        .unwrap_or(0)
197    }
198}
199
// ---------------------------------------------------------------------------
// Row mapping helpers
// ---------------------------------------------------------------------------
203
204fn row_to_job(row: &rusqlite::Row<'_>) -> Job {
205    Job {
206        id: row.get(0).unwrap_or_default(),
207        name: row.get(1).unwrap_or_default(),
208        payload: serde_json::from_str(&row.get::<_, String>(2).unwrap_or_default())
209            .unwrap_or(serde_json::json!({})),
210        priority: int_to_priority(row.get(3).unwrap_or(1)),
211        status: str_to_status(&row.get::<_, String>(4).unwrap_or_default()),
212        max_retries: row.get(5).unwrap_or(3),
213        retry_count: row.get(6).unwrap_or(0),
214        queue: row.get(7).unwrap_or_default(),
215        delay_secs: row.get(8).unwrap_or(0),
216        error: row.get(9).ok(),
217        created_at: row.get(10).unwrap_or_default(),
218        started_at: row.get(11).ok(),
219        completed_at: row.get(12).ok(),
220    }
221}
222
223fn priority_to_int(p: &Priority) -> i32 {
224    match p {
225        Priority::Low => 0,
226        Priority::Normal => 1,
227        Priority::High => 2,
228        Priority::Critical => 3,
229    }
230}
231
232fn int_to_priority(n: i32) -> Priority {
233    match n {
234        0 => Priority::Low,
235        2 => Priority::High,
236        3 => Priority::Critical,
237        _ => Priority::Normal,
238    }
239}
240
241fn status_to_str(s: &JobStatus) -> &'static str {
242    match s {
243        JobStatus::Pending => "pending",
244        JobStatus::Running => "running",
245        JobStatus::Completed => "completed",
246        JobStatus::Failed => "failed",
247        JobStatus::Retrying => "retrying",
248        JobStatus::Dead => "dead",
249    }
250}
251
252fn str_to_status(s: &str) -> JobStatus {
253    match s {
254        "pending" => JobStatus::Pending,
255        "running" => JobStatus::Running,
256        "completed" => JobStatus::Completed,
257        "failed" => JobStatus::Failed,
258        "retrying" => JobStatus::Retrying,
259        "dead" => JobStatus::Dead,
260        _ => JobStatus::Pending,
261    }
262}
263
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
267
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a job with fixed defaults and the given `id` and `status`.
    fn make_job(id: &str, status: JobStatus) -> Job {
        Job {
            id: id.to_string(),
            name: "test_job".to_string(),
            payload: serde_json::json!({"key": "value"}),
            priority: Priority::Normal,
            status,
            max_retries: 3,
            retry_count: 0,
            queue: "default".to_string(),
            delay_secs: 0,
            error: None,
            created_at: "1000Z".to_string(),
            started_at: None,
            completed_at: None,
        }
    }

    #[test]
    fn in_memory_opens_without_error() {
        let store = JobStore::in_memory().unwrap();
        assert_eq!(store.count_by_status("pending"), 0);
    }

    #[test]
    fn save_and_load_roundtrip() {
        let store = JobStore::in_memory().unwrap();

        let mut job = make_job("job_1", JobStatus::Pending);
        job.priority = Priority::High;
        job.error = Some("oops".into());
        job.started_at = Some("2000Z".into());
        job.completed_at = Some("3000Z".into());
        job.delay_secs = 10;
        job.retry_count = 2;
        job.max_retries = 5;
        job.queue = "emails".to_string();

        store.save(&job).unwrap();

        let loaded = store.load("job_1").unwrap().unwrap();
        assert_eq!(loaded.id, "job_1");
        assert_eq!(loaded.name, "test_job");
        assert_eq!(loaded.payload, serde_json::json!({"key": "value"}));
        assert_eq!(loaded.priority, Priority::High);
        assert_eq!(loaded.status, JobStatus::Pending);
        assert_eq!(loaded.max_retries, 5);
        assert_eq!(loaded.retry_count, 2);
        assert_eq!(loaded.queue, "emails");
        assert_eq!(loaded.delay_secs, 10);
        assert_eq!(loaded.error, Some("oops".into()));
        assert_eq!(loaded.created_at, "1000Z");
        assert_eq!(loaded.started_at, Some("2000Z".into()));
        assert_eq!(loaded.completed_at, Some("3000Z".into()));
    }

    #[test]
    fn load_nonexistent_returns_none() {
        let store = JobStore::in_memory().unwrap();
        assert!(store.load("nonexistent").unwrap().is_none());
    }

    #[test]
    fn save_updates_existing_job() {
        let store = JobStore::in_memory().unwrap();

        let mut job = make_job("job_1", JobStatus::Pending);
        store.save(&job).unwrap();

        job.status = JobStatus::Running;
        job.started_at = Some("2000Z".into());
        store.save(&job).unwrap();

        let loaded = store.load("job_1").unwrap().unwrap();
        assert_eq!(loaded.status, JobStatus::Running);
        assert_eq!(loaded.started_at, Some("2000Z".into()));
    }

    #[test]
    fn load_pending_returns_actionable_jobs() {
        let store = JobStore::in_memory().unwrap();

        store.save(&make_job("j1", JobStatus::Pending)).unwrap();
        store.save(&make_job("j2", JobStatus::Running)).unwrap();
        store.save(&make_job("j3", JobStatus::Retrying)).unwrap();
        store.save(&make_job("j4", JobStatus::Completed)).unwrap();
        store.save(&make_job("j5", JobStatus::Dead)).unwrap();

        let pending = store.load_pending().unwrap();
        assert_eq!(pending.len(), 3);
        let ids: Vec<&str> = pending.iter().map(|j| j.id.as_str()).collect();
        assert!(ids.contains(&"j1"));
        assert!(ids.contains(&"j2"));
        assert!(ids.contains(&"j3"));
    }

    #[test]
    fn load_pending_excludes_failed_jobs() {
        let store = JobStore::in_memory().unwrap();

        store.save(&make_job("j_failed", JobStatus::Failed)).unwrap();
        store.save(&make_job("j_pending", JobStatus::Pending)).unwrap();

        let pending = store.load_pending().unwrap();
        let ids: Vec<&str> = pending.iter().map(|j| j.id.as_str()).collect();
        assert_eq!(ids, vec!["j_pending"]);
    }

    #[test]
    fn load_pending_orders_by_priority_then_created_at() {
        let store = JobStore::in_memory().unwrap();

        let mut low = make_job("j_low", JobStatus::Pending);
        low.priority = Priority::Low;
        low.created_at = "1000Z".into();

        let mut high = make_job("j_high", JobStatus::Pending);
        high.priority = Priority::High;
        high.created_at = "2000Z".into();

        let mut normal = make_job("j_normal", JobStatus::Pending);
        normal.priority = Priority::Normal;
        normal.created_at = "1500Z".into();

        store.save(&low).unwrap();
        store.save(&high).unwrap();
        store.save(&normal).unwrap();

        let pending = store.load_pending().unwrap();
        assert_eq!(pending[0].id, "j_high");
        assert_eq!(pending[1].id, "j_normal");
        assert_eq!(pending[2].id, "j_low");
    }

    #[test]
    fn load_dead_returns_dead_jobs() {
        let store = JobStore::in_memory().unwrap();

        store.save(&make_job("j1", JobStatus::Dead)).unwrap();
        store.save(&make_job("j2", JobStatus::Pending)).unwrap();

        let dead = store.load_dead().unwrap();
        assert_eq!(dead.len(), 1);
        assert_eq!(dead[0].id, "j1");
    }

    #[test]
    fn load_dead_orders_most_recent_first() {
        let store = JobStore::in_memory().unwrap();

        let mut older = make_job("j_older", JobStatus::Dead);
        older.completed_at = Some("1000Z".into());
        let mut newer = make_job("j_newer", JobStatus::Dead);
        newer.completed_at = Some("2000Z".into());

        store.save(&older).unwrap();
        store.save(&newer).unwrap();

        let dead = store.load_dead().unwrap();
        let ids: Vec<&str> = dead.iter().map(|j| j.id.as_str()).collect();
        assert_eq!(ids, vec!["j_newer", "j_older"]);
    }

    #[test]
    fn count_by_status_counts_correctly() {
        let store = JobStore::in_memory().unwrap();

        store.save(&make_job("j1", JobStatus::Pending)).unwrap();
        store.save(&make_job("j2", JobStatus::Pending)).unwrap();
        store.save(&make_job("j3", JobStatus::Running)).unwrap();
        store.save(&make_job("j4", JobStatus::Dead)).unwrap();

        assert_eq!(store.count_by_status("pending"), 2);
        assert_eq!(store.count_by_status("running"), 1);
        assert_eq!(store.count_by_status("dead"), 1);
        assert_eq!(store.count_by_status("completed"), 0);
    }

    #[test]
    fn count_by_status_unknown_status_is_zero() {
        let store = JobStore::in_memory().unwrap();
        store.save(&make_job("j1", JobStatus::Pending)).unwrap();
        assert_eq!(store.count_by_status("no_such_status"), 0);
    }

    #[test]
    fn cleanup_completed_removes_old_jobs() {
        let store = JobStore::in_memory().unwrap();

        // A completed job with a very old completed_at timestamp.
        let mut old = make_job("j_old", JobStatus::Completed);
        old.completed_at = Some("100Z".into());
        store.save(&old).unwrap();

        // A completed job with a recent timestamp (should not be cleaned).
        let mut recent = make_job("j_recent", JobStatus::Completed);
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        recent.completed_at = Some(format!("{now}Z"));
        store.save(&recent).unwrap();

        // A pending job (should never be cleaned regardless of age).
        store
            .save(&make_job("j_pending", JobStatus::Pending))
            .unwrap();

        // Cleanup anything completed more than 1 hour ago.
        let deleted = store.cleanup_completed(3600);
        assert_eq!(deleted, 1);

        // Old one gone, recent one remains, pending untouched.
        assert!(store.load("j_old").unwrap().is_none());
        assert!(store.load("j_recent").unwrap().is_some());
        assert!(store.load("j_pending").unwrap().is_some());
    }

    #[test]
    fn cleanup_completed_removes_old_dead_but_keeps_failed() {
        let store = JobStore::in_memory().unwrap();

        // Dead jobs are eligible for cleanup alongside completed ones.
        let mut dead = make_job("j_dead", JobStatus::Dead);
        dead.completed_at = Some("100Z".into());
        store.save(&dead).unwrap();

        // Failed jobs are not in the cleanup set, however old they are.
        let mut failed = make_job("j_failed", JobStatus::Failed);
        failed.completed_at = Some("100Z".into());
        store.save(&failed).unwrap();

        assert_eq!(store.cleanup_completed(3600), 1);
        assert!(store.load("j_dead").unwrap().is_none());
        assert!(store.load("j_failed").unwrap().is_some());
    }

    #[test]
    fn all_priorities_roundtrip() {
        let store = JobStore::in_memory().unwrap();
        for (i, prio) in [
            Priority::Low,
            Priority::Normal,
            Priority::High,
            Priority::Critical,
        ]
        .iter()
        .enumerate()
        {
            let mut job = make_job(&format!("j_{i}"), JobStatus::Pending);
            job.priority = *prio;
            store.save(&job).unwrap();
            let loaded = store.load(&format!("j_{i}")).unwrap().unwrap();
            assert_eq!(loaded.priority, *prio);
        }
    }

    #[test]
    fn all_statuses_roundtrip() {
        let store = JobStore::in_memory().unwrap();
        let statuses = [
            JobStatus::Pending,
            JobStatus::Running,
            JobStatus::Completed,
            JobStatus::Failed,
            JobStatus::Retrying,
            JobStatus::Dead,
        ];
        for (i, status) in statuses.iter().enumerate() {
            let job = make_job(&format!("j_{i}"), status.clone());
            store.save(&job).unwrap();
            let loaded = store.load(&format!("j_{i}")).unwrap().unwrap();
            assert_eq!(loaded.status, *status);
        }
    }
}