1use rusqlite::Connection;
9use std::sync::Mutex;
10
11use crate::jobs::{Job, JobStatus, Priority};
12
13pub struct JobStore {
15 conn: Mutex<Connection>,
16}
17
18impl JobStore {
19 pub fn open(path: &str) -> Result<Self, String> {
21 let conn = Connection::open(path).map_err(|e| format!("Failed to open job store: {e}"))?;
22 let store = Self {
23 conn: Mutex::new(conn),
24 };
25 store.init_schema()?;
26 Ok(store)
27 }
28
29 pub fn in_memory() -> Result<Self, String> {
31 let conn = Connection::open_in_memory()
32 .map_err(|e| format!("Failed to open in-memory store: {e}"))?;
33 let store = Self {
34 conn: Mutex::new(conn),
35 };
36 store.init_schema()?;
37 Ok(store)
38 }
39
40 fn init_schema(&self) -> Result<(), String> {
41 let conn = self.conn.lock().unwrap();
42 conn.execute_batch(
43 "
44 PRAGMA journal_mode=WAL;
45 CREATE TABLE IF NOT EXISTS jobs (
46 id TEXT PRIMARY KEY NOT NULL,
47 name TEXT NOT NULL,
48 payload TEXT NOT NULL,
49 priority INTEGER NOT NULL DEFAULT 1,
50 status TEXT NOT NULL DEFAULT 'pending',
51 max_retries INTEGER NOT NULL DEFAULT 3,
52 retry_count INTEGER NOT NULL DEFAULT 0,
53 queue TEXT NOT NULL DEFAULT 'default',
54 delay_secs INTEGER NOT NULL DEFAULT 0,
55 error TEXT,
56 created_at TEXT NOT NULL,
57 started_at TEXT,
58 completed_at TEXT
59 );
60 CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
61 CREATE INDEX IF NOT EXISTS idx_jobs_queue ON jobs(queue);
62 ",
63 )
64 .map_err(|e| format!("Schema init failed: {e}"))
65 }
66
67 pub fn save(&self, job: &Job) -> Result<(), String> {
69 let conn = self.conn.lock().unwrap();
70 conn.execute(
71 "INSERT OR REPLACE INTO jobs \
72 (id, name, payload, priority, status, max_retries, retry_count, \
73 queue, delay_secs, error, created_at, started_at, completed_at) \
74 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
75 rusqlite::params![
76 job.id,
77 job.name,
78 job.payload.to_string(),
79 priority_to_int(&job.priority),
80 status_to_str(&job.status),
81 job.max_retries,
82 job.retry_count,
83 job.queue,
84 job.delay_secs,
85 job.error,
86 job.created_at,
87 job.started_at,
88 job.completed_at,
89 ],
90 )
91 .map_err(|e| format!("Save failed: {e}"))?;
92 Ok(())
93 }
94
95 pub fn load(&self, id: &str) -> Result<Option<Job>, String> {
97 let conn = self.conn.lock().unwrap();
98 let mut stmt = conn
99 .prepare(
100 "SELECT id, name, payload, priority, status, max_retries, retry_count, \
101 queue, delay_secs, error, created_at, started_at, completed_at \
102 FROM jobs WHERE id = ?1",
103 )
104 .map_err(|e| format!("Prepare failed: {e}"))?;
105
106 let result = stmt
107 .query_row(rusqlite::params![id], |row| Ok(row_to_job(row)))
108 .ok();
109
110 Ok(result)
111 }
112
113 pub fn load_pending(&self) -> Result<Vec<Job>, String> {
119 let conn = self.conn.lock().unwrap();
120 let mut stmt = conn
121 .prepare(
122 "SELECT id, name, payload, priority, status, max_retries, retry_count, \
123 queue, delay_secs, error, created_at, started_at, completed_at \
124 FROM jobs \
125 WHERE status IN ('pending', 'running', 'retrying') \
126 ORDER BY priority DESC, created_at ASC",
127 )
128 .map_err(|e| format!("Prepare failed: {e}"))?;
129
130 let rows = stmt
131 .query_map([], |row| Ok(row_to_job(row)))
132 .map_err(|e| format!("Query failed: {e}"))?;
133
134 let mut jobs = Vec::new();
135 for row in rows {
136 if let Ok(job) = row {
137 jobs.push(job);
138 }
139 }
140 Ok(jobs)
141 }
142
143 pub fn load_dead(&self) -> Result<Vec<Job>, String> {
145 let conn = self.conn.lock().unwrap();
146 let mut stmt = conn
147 .prepare(
148 "SELECT id, name, payload, priority, status, max_retries, retry_count, \
149 queue, delay_secs, error, created_at, started_at, completed_at \
150 FROM jobs \
151 WHERE status = 'dead' \
152 ORDER BY completed_at DESC",
153 )
154 .map_err(|e| format!("Prepare failed: {e}"))?;
155
156 let rows = stmt
157 .query_map([], |row| Ok(row_to_job(row)))
158 .map_err(|e| format!("Query failed: {e}"))?;
159
160 let mut jobs = Vec::new();
161 for row in rows {
162 if let Ok(job) = row {
163 jobs.push(job);
164 }
165 }
166 Ok(jobs)
167 }
168
169 pub fn count_by_status(&self, status: &str) -> usize {
171 let conn = self.conn.lock().unwrap();
172 conn.query_row(
173 "SELECT COUNT(*) FROM jobs WHERE status = ?1",
174 rusqlite::params![status],
175 |row| row.get::<_, i64>(0),
176 )
177 .unwrap_or(0) as usize
178 }
179
180 pub fn cleanup_completed(&self, max_age_secs: u64) -> usize {
184 let conn = self.conn.lock().unwrap();
185 let cutoff = std::time::SystemTime::now()
186 .duration_since(std::time::UNIX_EPOCH)
187 .unwrap_or_default()
188 .as_secs()
189 .saturating_sub(max_age_secs);
190 let cutoff_str = format!("{cutoff}Z");
191
192 conn.execute(
193 "DELETE FROM jobs WHERE status IN ('completed', 'dead') AND completed_at < ?1",
194 rusqlite::params![cutoff_str],
195 )
196 .unwrap_or(0)
197 }
198}
199
200fn row_to_job(row: &rusqlite::Row<'_>) -> Job {
205 Job {
206 id: row.get(0).unwrap_or_default(),
207 name: row.get(1).unwrap_or_default(),
208 payload: serde_json::from_str(&row.get::<_, String>(2).unwrap_or_default())
209 .unwrap_or(serde_json::json!({})),
210 priority: int_to_priority(row.get(3).unwrap_or(1)),
211 status: str_to_status(&row.get::<_, String>(4).unwrap_or_default()),
212 max_retries: row.get(5).unwrap_or(3),
213 retry_count: row.get(6).unwrap_or(0),
214 queue: row.get(7).unwrap_or_default(),
215 delay_secs: row.get(8).unwrap_or(0),
216 error: row.get(9).ok(),
217 created_at: row.get(10).unwrap_or_default(),
218 started_at: row.get(11).ok(),
219 completed_at: row.get(12).ok(),
220 }
221}
222
223fn priority_to_int(p: &Priority) -> i32 {
224 match p {
225 Priority::Low => 0,
226 Priority::Normal => 1,
227 Priority::High => 2,
228 Priority::Critical => 3,
229 }
230}
231
232fn int_to_priority(n: i32) -> Priority {
233 match n {
234 0 => Priority::Low,
235 2 => Priority::High,
236 3 => Priority::Critical,
237 _ => Priority::Normal,
238 }
239}
240
241fn status_to_str(s: &JobStatus) -> &'static str {
242 match s {
243 JobStatus::Pending => "pending",
244 JobStatus::Running => "running",
245 JobStatus::Completed => "completed",
246 JobStatus::Failed => "failed",
247 JobStatus::Retrying => "retrying",
248 JobStatus::Dead => "dead",
249 }
250}
251
252fn str_to_status(s: &str) -> JobStatus {
253 match s {
254 "pending" => JobStatus::Pending,
255 "running" => JobStatus::Running,
256 "completed" => JobStatus::Completed,
257 "failed" => JobStatus::Failed,
258 "retrying" => JobStatus::Retrying,
259 "dead" => JobStatus::Dead,
260 _ => JobStatus::Pending,
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh in-memory store for a single test.
    fn fresh_store() -> JobStore {
        JobStore::in_memory().unwrap()
    }

    /// Builds a job with fixed defaults; only `id` and `status` vary.
    fn make_job(id: &str, status: JobStatus) -> Job {
        Job {
            id: id.to_owned(),
            name: String::from("test_job"),
            payload: serde_json::json!({"key": "value"}),
            priority: Priority::Normal,
            status,
            max_retries: 3,
            retry_count: 0,
            queue: String::from("default"),
            delay_secs: 0,
            error: None,
            created_at: String::from("1000Z"),
            started_at: None,
            completed_at: None,
        }
    }

    #[test]
    fn in_memory_opens_without_error() {
        let pending = fresh_store().count_by_status("pending");
        assert_eq!(pending, 0);
    }

    #[test]
    fn save_and_load_roundtrip() {
        let store = fresh_store();

        // Give every field a non-default value so a swapped column shows up.
        let job = {
            let mut j = make_job("job_1", JobStatus::Pending);
            j.priority = Priority::High;
            j.max_retries = 5;
            j.retry_count = 2;
            j.queue = String::from("emails");
            j.delay_secs = 10;
            j.error = Some("oops".into());
            j.started_at = Some("2000Z".into());
            j.completed_at = Some("3000Z".into());
            j
        };
        store.save(&job).unwrap();

        let fetched = store.load("job_1").unwrap().unwrap();
        assert_eq!(fetched.id, "job_1");
        assert_eq!(fetched.name, "test_job");
        assert_eq!(fetched.payload, serde_json::json!({"key": "value"}));
        assert_eq!(fetched.priority, Priority::High);
        assert_eq!(fetched.status, JobStatus::Pending);
        assert_eq!(fetched.max_retries, 5);
        assert_eq!(fetched.retry_count, 2);
        assert_eq!(fetched.queue, "emails");
        assert_eq!(fetched.delay_secs, 10);
        assert_eq!(fetched.error, Some("oops".into()));
        assert_eq!(fetched.created_at, "1000Z");
        assert_eq!(fetched.started_at, Some("2000Z".into()));
        assert_eq!(fetched.completed_at, Some("3000Z".into()));
    }

    #[test]
    fn load_nonexistent_returns_none() {
        let missing = fresh_store().load("nonexistent").unwrap();
        assert!(missing.is_none());
    }

    #[test]
    fn save_updates_existing_job() {
        let store = fresh_store();

        let first = make_job("job_1", JobStatus::Pending);
        store.save(&first).unwrap();

        // Saving again under the same id must overwrite the stored row.
        let second = Job {
            status: JobStatus::Running,
            started_at: Some("2000Z".into()),
            ..first
        };
        store.save(&second).unwrap();

        let fetched = store.load("job_1").unwrap().unwrap();
        assert_eq!(fetched.status, JobStatus::Running);
        assert_eq!(fetched.started_at, Some("2000Z".into()));
    }

    #[test]
    fn load_pending_returns_actionable_jobs() {
        let store = fresh_store();

        let fixtures = vec![
            ("j1", JobStatus::Pending),
            ("j2", JobStatus::Running),
            ("j3", JobStatus::Retrying),
            ("j4", JobStatus::Completed),
            ("j5", JobStatus::Dead),
        ];
        for (id, status) in fixtures {
            store.save(&make_job(id, status)).unwrap();
        }

        let actionable = store.load_pending().unwrap();
        assert_eq!(actionable.len(), 3);
        for expected in ["j1", "j2", "j3"].iter() {
            assert!(actionable.iter().any(|job| job.id == *expected));
        }
    }

    #[test]
    fn load_pending_orders_by_priority_then_created_at() {
        let store = fresh_store();

        let build = |id: &str, priority: Priority, created_at: &str| Job {
            priority,
            created_at: created_at.to_owned(),
            ..make_job(id, JobStatus::Pending)
        };
        let low = build("j_low", Priority::Low, "1000Z");
        let high = build("j_high", Priority::High, "2000Z");
        let normal = build("j_normal", Priority::Normal, "1500Z");

        for job in [&low, &high, &normal].iter() {
            store.save(job).unwrap();
        }

        let pending = store.load_pending().unwrap();
        for (position, expected_id) in ["j_high", "j_normal", "j_low"].iter().enumerate() {
            assert_eq!(pending[position].id, *expected_id);
        }
    }

    #[test]
    fn load_dead_returns_dead_jobs() {
        let store = fresh_store();

        store.save(&make_job("j1", JobStatus::Dead)).unwrap();
        store.save(&make_job("j2", JobStatus::Pending)).unwrap();

        let graveyard = store.load_dead().unwrap();
        assert_eq!(graveyard.len(), 1);
        assert_eq!(graveyard[0].id, "j1");
    }

    #[test]
    fn count_by_status_counts_correctly() {
        let store = fresh_store();

        let fixtures = vec![
            ("j1", JobStatus::Pending),
            ("j2", JobStatus::Pending),
            ("j3", JobStatus::Running),
            ("j4", JobStatus::Dead),
        ];
        for (id, status) in fixtures {
            store.save(&make_job(id, status)).unwrap();
        }

        let expectations = [("pending", 2), ("running", 1), ("dead", 1), ("completed", 0)];
        for (status, expected) in expectations.iter() {
            assert_eq!(store.count_by_status(status), *expected);
        }
    }

    #[test]
    fn cleanup_completed_removes_old_jobs() {
        let store = fresh_store();

        // Finished long before any realistic cutoff.
        let stale = Job {
            completed_at: Some("100Z".into()),
            ..make_job("j_old", JobStatus::Completed)
        };
        store.save(&stale).unwrap();

        // Finished "now", so it is well inside the retention window.
        let now_secs = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let fresh = Job {
            completed_at: Some(format!("{now_secs}Z")),
            ..make_job("j_recent", JobStatus::Completed)
        };
        store.save(&fresh).unwrap();

        // Unfinished jobs must never be touched.
        store
            .save(&make_job("j_pending", JobStatus::Pending))
            .unwrap();

        let removed = store.cleanup_completed(3600);
        assert_eq!(removed, 1);

        assert!(store.load("j_old").unwrap().is_none());
        assert!(store.load("j_recent").unwrap().is_some());
        assert!(store.load("j_pending").unwrap().is_some());
    }

    #[test]
    fn all_priorities_roundtrip() {
        let store = fresh_store();
        let priorities = [
            Priority::Low,
            Priority::Normal,
            Priority::High,
            Priority::Critical,
        ];

        for (index, priority) in priorities.iter().copied().enumerate() {
            let id = format!("j_{index}");
            let mut job = make_job(&id, JobStatus::Pending);
            job.priority = priority;
            store.save(&job).unwrap();

            let fetched = store.load(&id).unwrap().unwrap();
            assert_eq!(fetched.priority, priority);
        }
    }

    #[test]
    fn all_statuses_roundtrip() {
        let store = fresh_store();
        let statuses = [
            JobStatus::Pending,
            JobStatus::Running,
            JobStatus::Completed,
            JobStatus::Failed,
            JobStatus::Retrying,
            JobStatus::Dead,
        ];

        for (index, status) in statuses.iter().cloned().enumerate() {
            let id = format!("j_{index}");
            store.save(&make_job(&id, status.clone())).unwrap();

            let fetched = store.load(&id).unwrap().unwrap();
            assert_eq!(fetched.status, status);
        }
    }
}