Skip to main content

pylon_runtime/
jobs.rs

//! Background job queue with priority scheduling, retries, and dead-letter support.
//!
//! Jobs are enqueued with a name, JSON payload, priority, and retry policy.
//! Workers pull from the queue and invoke registered handlers. A failed job is
//! re-enqueued immediately (no back-off delay is applied) until `max_retries`
//! is exhausted, at which point it moves to the dead-letter queue for manual
//! inspection.
7
8use std::collections::{HashMap, VecDeque};
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Condvar, Mutex};
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use serde::{Deserialize, Serialize};
14
15// ---------------------------------------------------------------------------
16// Types
17// ---------------------------------------------------------------------------
18
19/// Job priority levels.
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21#[serde(rename_all = "lowercase")]
22pub enum Priority {
23    Low = 0,
24    Normal = 1,
25    High = 2,
26    Critical = 3,
27}
28
29impl Priority {
30    /// Parse a priority from a string (case-insensitive).
31    pub fn from_str_loose(s: &str) -> Self {
32        match s.to_lowercase().as_str() {
33            "low" => Self::Low,
34            "high" => Self::High,
35            "critical" => Self::Critical,
36            _ => Self::Normal,
37        }
38    }
39}
40
41/// Job status.
42#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "lowercase")]
44pub enum JobStatus {
45    Pending,
46    Running,
47    Completed,
48    Failed,
49    Retrying,
50    Dead,
51}
52
53/// A job in the queue.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct Job {
56    pub id: String,
57    pub name: String,
58    pub payload: serde_json::Value,
59    pub priority: Priority,
60    pub status: JobStatus,
61    pub max_retries: u32,
62    pub retry_count: u32,
63    pub created_at: String,
64    pub started_at: Option<String>,
65    pub completed_at: Option<String>,
66    pub error: Option<String>,
67    /// Delay before first execution (in seconds from creation).
68    pub delay_secs: u64,
69    /// Queue name (for routing to specific workers).
70    pub queue: String,
71}
72
/// Outcome returned by a job handler.
///
/// `Failure` and `Retry` are currently handled identically by
/// `JobQueue::process_one`: both are routed to `JobQueue::fail`, which
/// re-enqueues the job while retries remain and dead-letters it otherwise.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobResult {
    /// The job finished successfully.
    Success,
    /// The job failed; the message is recorded as the job's error.
    Failure(String),
    /// The job asked to be retried; the message is recorded as the job's error.
    Retry(String),
}
79
80/// A handler function for a named job type.
81pub type JobHandler = Arc<dyn Fn(&Job) -> JobResult + Send + Sync>;
82
83// ---------------------------------------------------------------------------
84// Queue statistics
85// ---------------------------------------------------------------------------
86
87#[derive(Debug, Clone, Serialize)]
88pub struct QueueStats {
89    pub pending: usize,
90    pub running: usize,
91    pub completed: u64,
92    pub failed: u64,
93    pub dead: usize,
94    pub handlers: Vec<String>,
95}
96
97// ---------------------------------------------------------------------------
98// JobQueue
99// ---------------------------------------------------------------------------
100
101/// The job queue engine.
102///
103/// Thread-safe: all internal state is behind mutexes, and a `Condvar` wakes
104/// blocked workers when new jobs arrive.
105pub struct JobQueue {
106    /// Pending jobs, sorted by insertion with priority taken into account
107    /// during dequeue.
108    pending: Mutex<VecDeque<Job>>,
109    /// Running jobs (id -> job).
110    running: Mutex<HashMap<String, Job>>,
111    /// Completed/failed job history (bounded ring buffer).
112    history: Mutex<VecDeque<Job>>,
113    /// Registered handlers by job name.
114    handlers: Mutex<HashMap<String, JobHandler>>,
115    /// Signal for workers to wake up when jobs are available.
116    notify: Condvar,
117    /// Maximum history entries to retain.
118    max_history: usize,
119    /// Dead letter queue.
120    dead_letters: Mutex<VecDeque<Job>>,
121    /// Monotonic counters for stats.
122    completed_count: AtomicU64,
123    failed_count: AtomicU64,
124    /// Monotonic ID counter.
125    next_id: AtomicU64,
126    /// Optional persistent backing store. When set, every state transition is
127    /// mirrored to SQLite so jobs survive restart. Failures to persist are
128    /// logged but not surfaced — durability is best-effort, never blocking.
129    store: Mutex<Option<std::sync::Arc<crate::job_store::JobStore>>>,
130}
131
132impl JobQueue {
133    pub fn new(max_history: usize) -> Self {
134        Self {
135            pending: Mutex::new(VecDeque::new()),
136            running: Mutex::new(HashMap::new()),
137            history: Mutex::new(VecDeque::new()),
138            handlers: Mutex::new(HashMap::new()),
139            notify: Condvar::new(),
140            max_history,
141            dead_letters: Mutex::new(VecDeque::new()),
142            completed_count: AtomicU64::new(0),
143            failed_count: AtomicU64::new(0),
144            next_id: AtomicU64::new(1),
145            store: Mutex::new(None),
146        }
147    }
148
149    /// Attach a persistent store. After this, every enqueue, state change, and
150    /// terminal event is mirrored to the store. Call once at startup.
151    pub fn attach_store(&self, store: std::sync::Arc<crate::job_store::JobStore>) {
152        *self.store.lock().unwrap() = Some(store);
153    }
154
155    /// Best-effort persist. Never panics, never propagates errors — durability
156    /// is opportunistic. If the store is detached or write fails, logs and
157    /// continues.
158    fn persist(&self, job: &Job) {
159        if let Some(store) = self.store.lock().unwrap().as_ref() {
160            if let Err(e) = store.save(job) {
161                tracing::warn!("[jobs] failed to persist job {}: {e}", job.id);
162            }
163        }
164    }
165
166    /// Register a handler for a job type.
167    pub fn register(&self, job_name: &str, handler: JobHandler) {
168        self.handlers
169            .lock()
170            .unwrap()
171            .insert(job_name.to_string(), handler);
172    }
173
174    /// Enqueue a new job with default options. Returns the job ID.
175    pub fn enqueue(&self, name: &str, payload: serde_json::Value) -> String {
176        self.enqueue_with_options(name, payload, Priority::Normal, 0, 3, "default")
177    }
178
179    /// Enqueue with full options. Returns the job id, or an empty string if
180    /// the persistent store rejected the write. Prefer
181    /// [`try_enqueue_with_options`] in new code so persist failures don't
182    /// look like success to the caller.
183    pub fn enqueue_with_options(
184        &self,
185        name: &str,
186        payload: serde_json::Value,
187        priority: Priority,
188        delay_secs: u64,
189        max_retries: u32,
190        queue: &str,
191    ) -> String {
192        match self.try_enqueue_with_options(name, payload, priority, delay_secs, max_retries, queue)
193        {
194            Ok(id) => id,
195            Err(e) => {
196                tracing::warn!("[jobs] enqueue rejected: {e}");
197                String::new()
198            }
199        }
200    }
201
202    /// Result-returning variant of [`enqueue_with_options`]. Use this from
203    /// any path where a silent failure would propagate as an apparent
204    /// success (e.g. the TS scheduler hook returning `id: ""` to the user).
205    pub fn try_enqueue_with_options(
206        &self,
207        name: &str,
208        payload: serde_json::Value,
209        priority: Priority,
210        delay_secs: u64,
211        max_retries: u32,
212        queue: &str,
213    ) -> Result<String, String> {
214        let id = format!("job_{}", self.next_id.fetch_add(1, Ordering::Relaxed));
215        let now = now_iso();
216        let job = Job {
217            id: id.clone(),
218            name: name.to_string(),
219            payload,
220            priority,
221            status: JobStatus::Pending,
222            max_retries,
223            retry_count: 0,
224            created_at: now,
225            started_at: None,
226            completed_at: None,
227            error: None,
228            delay_secs,
229            queue: queue.to_string(),
230        };
231        self.try_enqueue_job(job)
232    }
233
234    fn try_enqueue_job(&self, job: Job) -> Result<String, String> {
235        // Write-ahead: persist BEFORE the in-memory queue accepts the job, so
236        // a crash between the two states can't lose an accepted job.
237        if let Some(store) = self.store.lock().unwrap().as_ref() {
238            if let Err(e) = store.save(&job) {
239                return Err(format!("persist failed for job {}: {e}", job.id));
240            }
241        }
242
243        let id = job.id.clone();
244        let priority = job.priority;
245        {
246            let mut pending = self.pending.lock().unwrap();
247            // Insert in priority order (higher priority closer to front).
248            let pos = pending
249                .iter()
250                .position(|j| (j.priority as u8) < (priority as u8))
251                .unwrap_or(pending.len());
252            pending.insert(pos, job);
253        }
254        self.notify.notify_one();
255        Ok(id)
256    }
257
258    /// Dequeue the highest-priority pending job whose `delay_secs` has
259    /// elapsed. Blocks up to `timeout` if nothing is ready.
260    pub fn dequeue(&self, timeout: Duration) -> Option<Job> {
261        let mut pending = self.pending.lock().unwrap();
262        let now = now_secs();
263        if !pending.iter().any(|j| is_ready(j, now)) {
264            let (guard, _) = self.notify.wait_timeout(pending, timeout).unwrap();
265            pending = guard;
266        }
267
268        let now = now_secs();
269        let pos = pending.iter().position(|j| is_ready(j, now));
270        if let Some(idx) = pos {
271            let mut job = pending.remove(idx).unwrap();
272            job.status = JobStatus::Running;
273            job.started_at = Some(now_iso());
274            self.running
275                .lock()
276                .unwrap()
277                .insert(job.id.clone(), job.clone());
278            self.persist(&job);
279            Some(job)
280        } else {
281            None
282        }
283    }
284
285    /// Dequeue from a specific queue. Blocks up to `timeout` if nothing
286    /// in the queue is ready (delay-respecting).
287    pub fn dequeue_from(&self, queue: &str, timeout: Duration) -> Option<Job> {
288        let mut pending = self.pending.lock().unwrap();
289        let now = now_secs();
290        if !pending.iter().any(|j| j.queue == queue && is_ready(j, now)) {
291            let (guard, _) = self.notify.wait_timeout(pending, timeout).unwrap();
292            pending = guard;
293        }
294
295        let now = now_secs();
296        let pos = pending
297            .iter()
298            .position(|j| j.queue == queue && is_ready(j, now));
299        if let Some(idx) = pos {
300            let mut job = pending.remove(idx).unwrap();
301            job.status = JobStatus::Running;
302            job.started_at = Some(now_iso());
303            self.running
304                .lock()
305                .unwrap()
306                .insert(job.id.clone(), job.clone());
307            self.persist(&job);
308            Some(job)
309        } else {
310            None
311        }
312    }
313
314    /// Mark a job as completed.
315    pub fn complete(&self, job_id: &str) {
316        let job = self.running.lock().unwrap().remove(job_id);
317        if let Some(mut job) = job {
318            job.status = JobStatus::Completed;
319            job.completed_at = Some(now_iso());
320            self.completed_count.fetch_add(1, Ordering::Relaxed);
321            self.persist(&job);
322            self.push_history(job);
323        }
324    }
325
326    /// Mark a job as failed. Retries if under max_retries.
327    pub fn fail(&self, job_id: &str, error: &str) {
328        let job = self.running.lock().unwrap().remove(job_id);
329        if let Some(mut job) = job {
330            job.error = Some(error.to_string());
331
332            if job.retry_count < job.max_retries {
333                // Re-enqueue for retry.
334                job.retry_count += 1;
335                job.status = JobStatus::Retrying;
336                job.started_at = None;
337                job.completed_at = None;
338
339                self.persist(&job);
340                let mut pending = self.pending.lock().unwrap();
341                let priority = job.priority as u8;
342                let pos = pending
343                    .iter()
344                    .position(|j| (j.priority as u8) < priority)
345                    .unwrap_or(pending.len());
346                pending.insert(pos, job);
347                drop(pending);
348                self.notify.notify_one();
349            } else {
350                // Exhausted retries -- move to dead letter queue.
351                job.status = JobStatus::Dead;
352                job.completed_at = Some(now_iso());
353                self.failed_count.fetch_add(1, Ordering::Relaxed);
354                self.persist(&job);
355                self.dead_letters.lock().unwrap().push_back(job);
356            }
357        }
358    }
359
360    /// Process the next available job using registered handlers.
361    /// Returns true if a job was processed.
362    pub fn process_one(&self) -> bool {
363        let job = match self.dequeue(Duration::from_millis(100)) {
364            Some(j) => j,
365            None => return false,
366        };
367
368        let handler = {
369            let handlers = self.handlers.lock().unwrap();
370            handlers.get(&job.name).cloned()
371        };
372
373        match handler {
374            Some(h) => match h(&job) {
375                JobResult::Success => self.complete(&job.id),
376                JobResult::Failure(e) => self.fail(&job.id, &e),
377                JobResult::Retry(reason) => self.fail(&job.id, &reason),
378            },
379            None => {
380                self.fail(
381                    &job.id,
382                    &format!("No handler registered for '{}'", job.name),
383                );
384            }
385        }
386
387        true
388    }
389
390    /// Get job by ID (searches pending, running, history, dead letters).
391    pub fn get_job(&self, id: &str) -> Option<Job> {
392        // Check running first (most common lookup).
393        if let Some(j) = self.running.lock().unwrap().get(id) {
394            return Some(j.clone());
395        }
396        // Check pending.
397        if let Some(j) = self.pending.lock().unwrap().iter().find(|j| j.id == id) {
398            return Some(j.clone());
399        }
400        // Check history.
401        if let Some(j) = self.history.lock().unwrap().iter().find(|j| j.id == id) {
402            return Some(j.clone());
403        }
404        // Check dead letters.
405        if let Some(j) = self
406            .dead_letters
407            .lock()
408            .unwrap()
409            .iter()
410            .find(|j| j.id == id)
411        {
412            return Some(j.clone());
413        }
414        None
415    }
416
417    /// Get queue statistics.
418    pub fn stats(&self) -> QueueStats {
419        let handler_names: Vec<String> = self.handlers.lock().unwrap().keys().cloned().collect();
420        QueueStats {
421            pending: self.pending.lock().unwrap().len(),
422            running: self.running.lock().unwrap().len(),
423            completed: self.completed_count.load(Ordering::Relaxed),
424            failed: self.failed_count.load(Ordering::Relaxed),
425            dead: self.dead_letters.lock().unwrap().len(),
426            handlers: handler_names,
427        }
428    }
429
430    /// Get pending job count.
431    pub fn pending_count(&self) -> usize {
432        self.pending.lock().unwrap().len()
433    }
434
435    /// Get running job count.
436    pub fn running_count(&self) -> usize {
437        self.running.lock().unwrap().len()
438    }
439
440    /// Get dead letter queue contents.
441    pub fn dead_letters(&self) -> Vec<Job> {
442        self.dead_letters.lock().unwrap().iter().cloned().collect()
443    }
444
445    /// Retry a dead letter by moving it back to pending.
446    pub fn retry_dead(&self, job_id: &str) -> bool {
447        let mut dead = self.dead_letters.lock().unwrap();
448        let pos = dead.iter().position(|j| j.id == job_id);
449        if let Some(idx) = pos {
450            let mut job = dead.remove(idx).unwrap();
451            job.status = JobStatus::Pending;
452            job.retry_count = 0;
453            job.error = None;
454            job.started_at = None;
455            job.completed_at = None;
456
457            let priority = job.priority as u8;
458            let mut pending = self.pending.lock().unwrap();
459            let insert_pos = pending
460                .iter()
461                .position(|j| (j.priority as u8) < priority)
462                .unwrap_or(pending.len());
463            pending.insert(insert_pos, job);
464            drop(pending);
465            drop(dead);
466            self.notify.notify_one();
467            true
468        } else {
469            false
470        }
471    }
472
473    /// Get recent job history.
474    pub fn recent_history(&self, limit: usize) -> Vec<Job> {
475        let history = self.history.lock().unwrap();
476        history.iter().rev().take(limit).cloned().collect()
477    }
478
479    /// List pending jobs with optional status/queue filters.
480    pub fn list_jobs(&self, status: Option<&str>, queue: Option<&str>, limit: usize) -> Vec<Job> {
481        let mut result = Vec::new();
482
483        // Gather from all collections.
484        let pending = self.pending.lock().unwrap();
485        let running = self.running.lock().unwrap();
486        let history = self.history.lock().unwrap();
487
488        let all_jobs = pending.iter().chain(running.values()).chain(history.iter());
489
490        for job in all_jobs {
491            if let Some(s) = status {
492                let job_status = match &job.status {
493                    JobStatus::Pending => "pending",
494                    JobStatus::Running => "running",
495                    JobStatus::Completed => "completed",
496                    JobStatus::Failed => "failed",
497                    JobStatus::Retrying => "retrying",
498                    JobStatus::Dead => "dead",
499                };
500                if job_status != s {
501                    continue;
502                }
503            }
504            if let Some(q) = queue {
505                if job.queue != q {
506                    continue;
507                }
508            }
509            result.push(job.clone());
510            if result.len() >= limit {
511                break;
512            }
513        }
514
515        result
516    }
517
518    // -----------------------------------------------------------------------
519    // Internal helpers
520    // -----------------------------------------------------------------------
521
522    fn push_history(&self, job: Job) {
523        let mut history = self.history.lock().unwrap();
524        history.push_back(job);
525        while history.len() > self.max_history {
526            history.pop_front();
527        }
528    }
529
530    // -----------------------------------------------------------------------
531    // Persistence
532    // -----------------------------------------------------------------------
533
534    /// Restore pending/running/retrying jobs from a persistent store.
535    ///
536    /// Jobs that were `Running` at the time of the crash are reset to
537    /// `Pending` so they will be re-processed. Returns the number of jobs
538    /// restored.
539    ///
540    /// Call this once at startup, before workers begin processing.
541    pub fn restore_from(&self, store: &crate::job_store::JobStore) -> usize {
542        let jobs = match store.load_pending() {
543            Ok(j) => j,
544            Err(_) => return 0,
545        };
546
547        let mut pending = self.pending.lock().unwrap();
548        let count = jobs.len();
549
550        for mut job in jobs {
551            // Jobs that were mid-flight when the server died should be
552            // treated as pending so they get picked up again.
553            if job.status == JobStatus::Running {
554                job.status = JobStatus::Pending;
555                job.started_at = None;
556            }
557            if job.status == JobStatus::Retrying {
558                job.status = JobStatus::Pending;
559            }
560
561            // Insert in priority order.
562            let priority = job.priority as u8;
563            let pos = pending
564                .iter()
565                .position(|j| (j.priority as u8) < priority)
566                .unwrap_or(pending.len());
567            pending.insert(pos, job);
568        }
569
570        // Ensure the ID counter doesn't collide with restored IDs.
571        // Parse the numeric suffix from "job_N" and set next_id above the max.
572        let max_id = pending
573            .iter()
574            .filter_map(|j| {
575                j.id.strip_prefix("job_")
576                    .and_then(|n| n.parse::<u64>().ok())
577            })
578            .max()
579            .unwrap_or(0);
580        let current = self.next_id.load(Ordering::Relaxed);
581        if max_id >= current {
582            self.next_id.store(max_id + 1, Ordering::Relaxed);
583        }
584
585        count
586    }
587}
588
589// ---------------------------------------------------------------------------
590// Worker
591// ---------------------------------------------------------------------------
592
593/// A worker that continuously processes jobs from the queue.
594pub struct Worker {
595    queue: Arc<JobQueue>,
596    #[allow(dead_code)]
597    name: String,
598    running: Arc<AtomicBool>,
599}
600
601impl Worker {
602    pub fn new(queue: Arc<JobQueue>, name: &str) -> Self {
603        Self {
604            queue,
605            name: name.to_string(),
606            running: Arc::new(AtomicBool::new(true)),
607        }
608    }
609
610    /// Start the worker in a background thread. Returns a handle to stop it.
611    pub fn start(self) -> WorkerHandle {
612        let running = Arc::clone(&self.running);
613        let handle = std::thread::spawn(move || {
614            while self.running.load(Ordering::Relaxed) {
615                self.queue.process_one();
616            }
617        });
618        WorkerHandle {
619            running,
620            handle: Some(handle),
621        }
622    }
623}
624
/// Control handle for the thread spawned by `Worker::start()`.
pub struct WorkerHandle {
    /// Run flag shared with the worker thread; clearing it stops the loop.
    running: Arc<AtomicBool>,
    /// Join handle of the worker thread. `dead_code` is allowed so the field
    /// does not warn when nothing reads it.
    #[allow(dead_code)]
    handle: Option<std::thread::JoinHandle<()>>,
}
631
632impl WorkerHandle {
633    /// Signal the worker to stop after its current iteration.
634    pub fn stop(&self) {
635        self.running.store(false, Ordering::Relaxed);
636    }
637}
638
639// ---------------------------------------------------------------------------
640// Helpers
641// ---------------------------------------------------------------------------
642
643fn now_iso() -> String {
644    format!("{}Z", now_secs())
645}
646
/// Whole seconds since the Unix epoch, or `0` if the clock reads earlier
/// than the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
653
654/// True if a job's `delay_secs` has elapsed since `created_at`. Jobs
655/// without a delay (`delay_secs == 0`) are always ready.
656fn is_ready(job: &Job, now: u64) -> bool {
657    if job.delay_secs == 0 {
658        return true;
659    }
660    let created = job
661        .created_at
662        .trim_end_matches('Z')
663        .parse::<u64>()
664        .unwrap_or(0);
665    now >= created.saturating_add(job.delay_secs)
666}
667
668// ---------------------------------------------------------------------------
669// Tests
670// ---------------------------------------------------------------------------
671
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn enqueue_and_dequeue() {
        let q = JobQueue::new(100);
        let id = q.enqueue("test_job", serde_json::json!({"x": 1}));
        assert!(id.starts_with("job_"));
        assert_eq!(q.pending_count(), 1);

        let job = q.dequeue(Duration::from_millis(10)).unwrap();
        assert_eq!(job.name, "test_job");
        assert_eq!(job.status, JobStatus::Running);
        assert_eq!(q.pending_count(), 0);
        assert_eq!(q.running_count(), 1);
    }

    #[test]
    fn dequeue_returns_none_on_empty() {
        let q = JobQueue::new(100);
        assert!(q.dequeue(Duration::from_millis(10)).is_none());
    }

    #[test]
    fn delayed_job_is_not_dequeued_early() {
        let q = JobQueue::new(100);
        q.enqueue_with_options(
            "later",
            serde_json::json!({}),
            Priority::Normal,
            3600,
            0,
            "default",
        );
        assert!(q.dequeue(Duration::from_millis(10)).is_none());
        assert_eq!(q.pending_count(), 1);
    }

    #[test]
    fn priority_ordering() {
        let q = JobQueue::new(100);
        for (name, priority) in [
            ("low", Priority::Low),
            ("high", Priority::High),
            ("normal", Priority::Normal),
            ("critical", Priority::Critical),
        ] {
            q.enqueue_with_options(name, serde_json::json!({}), priority, 0, 0, "default");
        }

        let order: Vec<String> = (0..4)
            .map(|_| q.dequeue(Duration::from_millis(10)).unwrap().name)
            .collect();
        assert_eq!(order, ["critical", "high", "normal", "low"]);
    }

    #[test]
    fn complete_moves_to_history() {
        let q = JobQueue::new(100);
        let id = q.enqueue("test", serde_json::json!({}));
        let _job = q.dequeue(Duration::from_millis(10)).unwrap();
        q.complete(&id);

        assert_eq!(q.running_count(), 0);
        let job = q.get_job(&id).unwrap();
        assert_eq!(job.status, JobStatus::Completed);
    }

    #[test]
    fn fail_retries_when_under_max() {
        let q = JobQueue::new(100);
        let id = q.enqueue_with_options(
            "test",
            serde_json::json!({}),
            Priority::Normal,
            0,
            2,
            "default",
        );

        // First attempt -- fail.
        let _job = q.dequeue(Duration::from_millis(10)).unwrap();
        q.fail(&id, "oops");

        // Should be back in pending with retry_count=1.
        let job = q.get_job(&id).unwrap();
        assert_eq!(job.retry_count, 1);
        assert_eq!(job.status, JobStatus::Retrying);
        assert_eq!(q.pending_count(), 1);
    }

    #[test]
    fn fail_moves_to_dead_after_max_retries() {
        let q = JobQueue::new(100);
        let id = q.enqueue_with_options(
            "test",
            serde_json::json!({}),
            Priority::Normal,
            0,
            1,
            "default",
        );

        // Attempt 1 -- fail.
        let _job = q.dequeue(Duration::from_millis(10)).unwrap();
        q.fail(&id, "fail 1");

        // Attempt 2 (retry_count=1 == max_retries=1) -- fail again.
        let _job = q.dequeue(Duration::from_millis(10)).unwrap();
        q.fail(&id, "fail 2");

        // Should be dead now.
        let dead = q.dead_letters();
        assert_eq!(dead.len(), 1);
        assert_eq!(dead[0].id, id);
        assert_eq!(dead[0].status, JobStatus::Dead);
    }

    #[test]
    fn retry_dead_letter() {
        let q = JobQueue::new(100);
        let id = q.enqueue_with_options(
            "test",
            serde_json::json!({}),
            Priority::Normal,
            0,
            0,
            "default",
        );

        let _job = q.dequeue(Duration::from_millis(10)).unwrap();
        q.fail(&id, "dead");
        assert_eq!(q.dead_letters().len(), 1);

        assert!(q.retry_dead(&id));
        assert_eq!(q.dead_letters().len(), 0);
        assert_eq!(q.pending_count(), 1);

        let job = q.get_job(&id).unwrap();
        assert_eq!(job.status, JobStatus::Pending);
        assert_eq!(job.retry_count, 0);
    }

    #[test]
    fn retry_dead_returns_false_for_unknown() {
        let q = JobQueue::new(100);
        assert!(!q.retry_dead("nonexistent"));
    }

    #[test]
    fn get_job_searches_all_collections() {
        let q = JobQueue::new(100);
        let id1 = q.enqueue("pending_job", serde_json::json!({}));
        assert!(q.get_job(&id1).is_some());

        let id2 = q.enqueue("running_job", serde_json::json!({}));
        let _job = q.dequeue(Duration::from_millis(10)).unwrap(); // dequeues id1 (earlier)
        let _job = q.dequeue(Duration::from_millis(10)).unwrap(); // dequeues id2
        assert!(q.get_job(&id2).is_some());

        q.complete(&id1);
        let found = q.get_job(&id1).unwrap();
        assert_eq!(found.status, JobStatus::Completed);
    }

    #[test]
    fn dequeue_from_specific_queue() {
        let q = JobQueue::new(100);
        q.enqueue_with_options("a", serde_json::json!({}), Priority::High, 0, 0, "alpha");
        q.enqueue_with_options("b", serde_json::json!({}), Priority::Critical, 0, 0, "beta");

        let job = q.dequeue_from("beta", Duration::from_millis(10)).unwrap();
        assert_eq!(job.name, "b");
        assert_eq!(job.queue, "beta");
    }

    #[test]
    fn process_one_with_handler() {
        let q = Arc::new(JobQueue::new(100));
        q.register("echo", Arc::new(|_job| JobResult::Success));
        q.enqueue("echo", serde_json::json!({"msg": "hello"}));
        assert!(q.process_one());

        let stats = q.stats();
        assert_eq!(stats.completed, 1);
        assert_eq!(stats.pending, 0);
    }

    #[test]
    fn process_one_without_handler_fails() {
        let q = Arc::new(JobQueue::new(100));
        q.enqueue_with_options(
            "unhandled",
            serde_json::json!({}),
            Priority::Normal,
            0,
            0,
            "default",
        );
        q.process_one();

        // Should be in dead letters since max_retries=0.
        assert_eq!(q.dead_letters().len(), 1);
    }

    #[test]
    fn stats_reports_handler_names() {
        let q = JobQueue::new(100);
        q.register("alpha", Arc::new(|_| JobResult::Success));
        q.register("beta", Arc::new(|_| JobResult::Success));

        let stats = q.stats();
        assert!(stats.handlers.contains(&"alpha".to_string()));
        assert!(stats.handlers.contains(&"beta".to_string()));
    }

    #[test]
    fn history_is_bounded() {
        let q = JobQueue::new(3);
        for i in 0..5 {
            let id = q.enqueue(&format!("job_{i}"), serde_json::json!({}));
            let _job = q.dequeue(Duration::from_millis(10)).unwrap();
            q.complete(&id);
        }
        let history = q.recent_history(10);
        assert_eq!(history.len(), 3);
    }

    #[test]
    fn list_jobs_with_filters() {
        let q = JobQueue::new(100);
        q.enqueue_with_options("a", serde_json::json!({}), Priority::Normal, 0, 0, "emails");
        q.enqueue_with_options("b", serde_json::json!({}), Priority::Normal, 0, 0, "default");
        q.enqueue_with_options("c", serde_json::json!({}), Priority::Normal, 0, 0, "emails");

        let email_jobs = q.list_jobs(None, Some("emails"), 50);
        assert_eq!(email_jobs.len(), 2);

        let pending_jobs = q.list_jobs(Some("pending"), None, 50);
        assert_eq!(pending_jobs.len(), 3);
    }

    #[test]
    fn worker_processes_jobs() {
        let q = Arc::new(JobQueue::new(100));
        q.register("add", Arc::new(|_job| JobResult::Success));
        q.enqueue("add", serde_json::json!({"a": 1, "b": 2}));

        let worker = Worker::new(Arc::clone(&q), "test-worker");
        let handle = worker.start();

        // Poll with a generous deadline instead of sleeping a fixed amount:
        // fast on idle machines, not flaky on loaded CI runners.
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while q.stats().completed == 0 && std::time::Instant::now() < deadline {
            std::thread::sleep(Duration::from_millis(10));
        }
        handle.stop();

        assert_eq!(q.stats().completed, 1);
    }

    #[test]
    fn priority_from_str_loose() {
        assert_eq!(Priority::from_str_loose("low"), Priority::Low);
        assert_eq!(Priority::from_str_loose("HIGH"), Priority::High);
        assert_eq!(Priority::from_str_loose("critical"), Priority::Critical);
        assert_eq!(Priority::from_str_loose("unknown"), Priority::Normal);
    }

    #[test]
    fn restore_from_store() {
        let store = crate::job_store::JobStore::in_memory().unwrap();

        // Save some jobs to the store with different statuses.
        let pending_job = Job {
            id: "job_100".into(),
            name: "email".into(),
            payload: serde_json::json!({"to": "alice"}),
            priority: Priority::High,
            status: JobStatus::Pending,
            max_retries: 3,
            retry_count: 0,
            queue: "default".into(),
            delay_secs: 0,
            error: None,
            created_at: "1000Z".into(),
            started_at: None,
            completed_at: None,
        };
        let running_job = Job {
            id: "job_200".into(),
            name: "process".into(),
            payload: serde_json::json!({}),
            priority: Priority::Normal,
            status: JobStatus::Running,
            max_retries: 2,
            retry_count: 1,
            queue: "default".into(),
            delay_secs: 0,
            error: None,
            created_at: "2000Z".into(),
            started_at: Some("2001Z".into()),
            completed_at: None,
        };

        store.save(&pending_job).unwrap();
        store.save(&running_job).unwrap();

        let q = JobQueue::new(100);
        let restored = q.restore_from(&store);
        assert_eq!(restored, 2);
        assert_eq!(q.pending_count(), 2);

        // Running job should have been reset to Pending.
        let job = q.get_job("job_200").unwrap();
        assert_eq!(job.status, JobStatus::Pending);
        assert!(job.started_at.is_none());

        // ID counter should be past restored IDs.
        let new_id = q.enqueue("new", serde_json::json!({}));
        let num: u64 = new_id.strip_prefix("job_").unwrap().parse().unwrap();
        assert!(num > 200);
    }
}