Skip to main content

ceres_core/
job_queue.rs

//! Job queue trait for abstracting job persistence.
//!
//! This module provides the [`JobQueue`] trait, which abstracts job queue operations
//! so that different storage backends (PostgreSQL, or an in-memory queue for tests)
//! can be swapped in, and so the worker service can receive its queue via dependency injection.
6
7use std::future::Future;
8
9use chrono::{DateTime, Utc};
10use uuid::Uuid;
11
12use crate::SyncStats;
13use crate::error::AppError;
14use crate::job::{CreateJobRequest, HarvestJob, JobStatus};
15
16/// Trait for job queue persistence operations.
17///
18/// This abstraction enables different storage backends (PostgreSQL, in-memory for tests)
19/// and facilitates dependency injection in the worker service.
20///
21/// # Implementation Notes
22///
23/// Implementations should ensure:
24/// - Atomic job claiming with `SELECT FOR UPDATE SKIP LOCKED` semantics
25/// - Proper handling of retry scheduling
26/// - Safe concurrent access from multiple workers
27pub trait JobQueue: Send + Sync + Clone {
28    /// Create a new job in the queue.
29    ///
30    /// Returns the created job with generated ID and timestamps.
31    fn create_job(
32        &self,
33        request: CreateJobRequest,
34    ) -> impl Future<Output = Result<HarvestJob, AppError>> + Send;
35
36    /// Claim the next available pending job for processing.
37    ///
38    /// Uses `SELECT FOR UPDATE SKIP LOCKED` semantics for safe concurrent claiming.
39    /// Jobs are claimed in order of:
40    /// 1. Non-retry jobs first (next_retry_at IS NULL)
41    /// 2. Then retry-ready jobs (next_retry_at <= NOW)
42    /// 3. Oldest first within each category
43    ///
44    /// Returns `None` if no jobs are available.
45    fn claim_job(
46        &self,
47        worker_id: &str,
48    ) -> impl Future<Output = Result<Option<HarvestJob>, AppError>> + Send;
49
50    /// Mark a job as completed with final statistics.
51    fn complete_job(
52        &self,
53        job_id: Uuid,
54        stats: SyncStats,
55    ) -> impl Future<Output = Result<(), AppError>> + Send;
56
57    /// Mark a job as failed with error message.
58    ///
59    /// If `next_retry_at` is provided, the job is reset to pending for retry.
60    /// Otherwise, the job is marked as permanently failed.
61    fn fail_job(
62        &self,
63        job_id: Uuid,
64        error: &str,
65        next_retry_at: Option<DateTime<Utc>>,
66    ) -> impl Future<Output = Result<(), AppError>> + Send;
67
68    /// Mark a job as cancelled.
69    ///
70    /// Optionally saves partial sync statistics.
71    fn cancel_job(
72        &self,
73        job_id: Uuid,
74        stats: Option<SyncStats>,
75    ) -> impl Future<Output = Result<(), AppError>> + Send;
76
77    /// Get a job by ID.
78    fn get_job(
79        &self,
80        job_id: Uuid,
81    ) -> impl Future<Output = Result<Option<HarvestJob>, AppError>> + Send;
82
83    /// List jobs with optional status filter.
84    ///
85    /// Results are ordered by creation time (newest first).
86    fn list_jobs(
87        &self,
88        status: Option<JobStatus>,
89        limit: usize,
90    ) -> impl Future<Output = Result<Vec<HarvestJob>, AppError>> + Send;
91
92    /// Release a job back to pending state.
93    ///
94    /// Used when a worker needs to give up a job (e.g., during shutdown).
95    /// Only affects jobs in 'running' status.
96    fn release_job(&self, job_id: Uuid) -> impl Future<Output = Result<(), AppError>> + Send;
97
98    /// Release all jobs claimed by a specific worker.
99    ///
100    /// Used for graceful shutdown to return all claimed jobs to the queue.
101    /// Returns the number of jobs released.
102    fn release_worker_jobs(
103        &self,
104        worker_id: &str,
105    ) -> impl Future<Output = Result<u64, AppError>> + Send;
106
107    /// Get count of jobs by status.
108    fn count_by_status(
109        &self,
110        status: JobStatus,
111    ) -> impl Future<Output = Result<i64, AppError>> + Send;
112}