// ceres_core/job_queue.rs
//! Job queue trait for abstracting job persistence.
//!
//! This module provides the [`JobQueue`] trait that abstracts job queue operations,
//! enabling different storage backends (PostgreSQL, in-memory for tests) and
//! facilitating dependency injection in the worker service.
6
7use std::future::Future;
8
9use chrono::{DateTime, Utc};
10use uuid::Uuid;
11
12use crate::SyncStats;
13use crate::error::AppError;
14use crate::job::{CreateJobRequest, HarvestJob, JobStatus};
15
16/// Trait for job queue persistence operations.
17///
18/// This abstraction enables different storage backends (PostgreSQL, in-memory for tests)
19/// and facilitates dependency injection in the worker service.
20///
21/// # Implementation Notes
22///
23/// Implementations should ensure:
24/// - Atomic job claiming with `SELECT FOR UPDATE SKIP LOCKED` semantics
25/// - Proper handling of retry scheduling
26/// - Safe concurrent access from multiple workers
27pub trait JobQueue: Send + Sync + Clone {
28 /// Create a new job in the queue.
29 ///
30 /// Returns the created job with generated ID and timestamps.
31 fn create_job(
32 &self,
33 request: CreateJobRequest,
34 ) -> impl Future<Output = Result<HarvestJob, AppError>> + Send;
35
36 /// Claim the next available pending job for processing.
37 ///
38 /// Uses `SELECT FOR UPDATE SKIP LOCKED` semantics for safe concurrent claiming.
39 /// Jobs are claimed in order of:
40 /// 1. Non-retry jobs first (next_retry_at IS NULL)
41 /// 2. Then retry-ready jobs (next_retry_at <= NOW)
42 /// 3. Oldest first within each category
43 ///
44 /// Returns `None` if no jobs are available.
45 fn claim_job(
46 &self,
47 worker_id: &str,
48 ) -> impl Future<Output = Result<Option<HarvestJob>, AppError>> + Send;
49
50 /// Mark a job as completed with final statistics.
51 fn complete_job(
52 &self,
53 job_id: Uuid,
54 stats: SyncStats,
55 ) -> impl Future<Output = Result<(), AppError>> + Send;
56
57 /// Mark a job as failed with error message.
58 ///
59 /// If `next_retry_at` is provided, the job is reset to pending for retry.
60 /// Otherwise, the job is marked as permanently failed.
61 fn fail_job(
62 &self,
63 job_id: Uuid,
64 error: &str,
65 next_retry_at: Option<DateTime<Utc>>,
66 ) -> impl Future<Output = Result<(), AppError>> + Send;
67
68 /// Mark a job as cancelled.
69 ///
70 /// Optionally saves partial sync statistics.
71 fn cancel_job(
72 &self,
73 job_id: Uuid,
74 stats: Option<SyncStats>,
75 ) -> impl Future<Output = Result<(), AppError>> + Send;
76
77 /// Get a job by ID.
78 fn get_job(
79 &self,
80 job_id: Uuid,
81 ) -> impl Future<Output = Result<Option<HarvestJob>, AppError>> + Send;
82
83 /// List jobs with optional status filter.
84 ///
85 /// Results are ordered by creation time (newest first).
86 fn list_jobs(
87 &self,
88 status: Option<JobStatus>,
89 limit: usize,
90 ) -> impl Future<Output = Result<Vec<HarvestJob>, AppError>> + Send;
91
92 /// Release a job back to pending state.
93 ///
94 /// Used when a worker needs to give up a job (e.g., during shutdown).
95 /// Only affects jobs in 'running' status.
96 fn release_job(&self, job_id: Uuid) -> impl Future<Output = Result<(), AppError>> + Send;
97
98 /// Release all jobs claimed by a specific worker.
99 ///
100 /// Used for graceful shutdown to return all claimed jobs to the queue.
101 /// Returns the number of jobs released.
102 fn release_worker_jobs(
103 &self,
104 worker_id: &str,
105 ) -> impl Future<Output = Result<u64, AppError>> + Send;
106
107 /// Get count of jobs by status.
108 fn count_by_status(
109 &self,
110 status: JobStatus,
111 ) -> impl Future<Output = Result<i64, AppError>> + Send;
112}