// ceres_core/job.rs

//! Job queue types for persistent harvest job management.
//!
//! This module provides domain types for the harvest job queue,
//! enabling recoverable, distributed job processing with backoff-based retries.
//!
//! # Architecture
//!
//! Jobs flow through these states:
//! ```text
//! pending → running → completed
//!              │
//!              ├─→ pending   (on error, retries remaining; next_retry_at is set)
//!              └─→ failed    (on error, retries exhausted)
//! ```
//!
//! A job may also be `cancelled` by a user or the system. `completed`,
//! `failed` and `cancelled` are terminal states.
//!
//! # Retry Strategy
//!
//! Retries use an escalating backoff schedule; every delay is capped by
//! [`RetryConfig::max_delay`]:
//! - Attempt 1: 1 minute
//! - Attempt 2: 5 minutes
//! - Attempt 3: 30 minutes
//! - Attempt 4 and later: 60 minutes
//! - Once `max_retries` is exhausted, the job is permanently failed.
24
25use chrono::{DateTime, TimeDelta, Utc};
26use serde::{Deserialize, Serialize};
27use uuid::Uuid;
28
29use crate::SyncStats;
30
31// =============================================================================
32// Job Status
33// =============================================================================
34
35/// Status of a harvest job in the queue.
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
37#[serde(rename_all = "lowercase")]
38pub enum JobStatus {
39    /// Job is waiting to be processed.
40    Pending,
41    /// Job is currently being processed by a worker.
42    Running,
43    /// Job completed successfully.
44    Completed,
45    /// Job failed after exhausting all retries.
46    Failed,
47    /// Job was cancelled by user or system.
48    Cancelled,
49}
50
51impl JobStatus {
52    /// Returns the string representation for database storage.
53    pub fn as_str(&self) -> &'static str {
54        match self {
55            JobStatus::Pending => "pending",
56            JobStatus::Running => "running",
57            JobStatus::Completed => "completed",
58            JobStatus::Failed => "failed",
59            JobStatus::Cancelled => "cancelled",
60        }
61    }
62
63    /// Returns true if the job is in a terminal state.
64    pub fn is_terminal(&self) -> bool {
65        matches!(
66            self,
67            JobStatus::Completed | JobStatus::Failed | JobStatus::Cancelled
68        )
69    }
70}
71
/// Error returned when a string is not one of the known [`JobStatus`] names.
///
/// Carries the rejected input so it can be echoed back in the message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseJobStatusError(String);

impl std::fmt::Display for ParseJobStatusError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(rejected) = self;
        f.write_str("invalid job status: ")?;
        f.write_str(rejected)
    }
}

impl std::error::Error for ParseJobStatusError {}

84impl std::str::FromStr for JobStatus {
85    type Err = ParseJobStatusError;
86
87    fn from_str(s: &str) -> Result<Self, Self::Err> {
88        match s {
89            "pending" => Ok(JobStatus::Pending),
90            "running" => Ok(JobStatus::Running),
91            "completed" => Ok(JobStatus::Completed),
92            "failed" => Ok(JobStatus::Failed),
93            "cancelled" => Ok(JobStatus::Cancelled),
94            _ => Err(ParseJobStatusError(s.to_string())),
95        }
96    }
97}
98
99impl std::fmt::Display for JobStatus {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        write!(f, "{}", self.as_str())
102    }
103}
104
105// =============================================================================
106// Retry Configuration
107// =============================================================================
108
109/// Configuration for job retry behavior with exponential backoff.
110#[derive(Debug, Clone)]
111pub struct RetryConfig {
112    /// Maximum number of retry attempts.
113    pub max_retries: u32,
114    /// Maximum delay cap.
115    pub max_delay: TimeDelta,
116}
117
118impl Default for RetryConfig {
119    fn default() -> Self {
120        Self {
121            max_retries: 3,
122            max_delay: TimeDelta::minutes(60), // 1 hour cap
123        }
124    }
125}
126
127impl RetryConfig {
128    /// Calculate delay for a given retry attempt using exponential backoff.
129    ///
130    /// - Attempt 1: 1 minute
131    /// - Attempt 2: 5 minutes
132    /// - Attempt 3: 30 minutes
133    /// - Attempt 4+: 60 minutes (capped)
134    pub fn delay_for_attempt(&self, attempt: u32) -> TimeDelta {
135        if attempt == 0 {
136            return TimeDelta::zero();
137        }
138
139        // Exponential multipliers: 1, 5, 30 (minutes)
140        let minutes = match attempt {
141            1 => 1,
142            2 => 5,
143            3 => 30,
144            _ => 60, // Cap at 60 minutes for any additional retries
145        };
146
147        let delay = TimeDelta::minutes(minutes);
148        std::cmp::min(delay, self.max_delay)
149    }
150}
151
152// =============================================================================
153// Harvest Job
154// =============================================================================
155
156/// A harvest job in the queue.
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct HarvestJob {
159    /// Unique job identifier.
160    pub id: Uuid,
161
162    /// Target portal URL.
163    pub portal_url: String,
164
165    /// Optional friendly portal name.
166    pub portal_name: Option<String>,
167
168    /// Current job status.
169    pub status: JobStatus,
170
171    /// When the job was created.
172    pub created_at: DateTime<Utc>,
173
174    /// When the job was last updated.
175    pub updated_at: DateTime<Utc>,
176
177    /// When the job started processing.
178    pub started_at: Option<DateTime<Utc>>,
179
180    /// When the job completed (success or failure).
181    pub completed_at: Option<DateTime<Utc>>,
182
183    /// Number of retry attempts made.
184    pub retry_count: u32,
185
186    /// Maximum retries allowed.
187    pub max_retries: u32,
188
189    /// When to attempt the next retry.
190    pub next_retry_at: Option<DateTime<Utc>>,
191
192    /// Error message if failed.
193    pub error_message: Option<String>,
194
195    /// Final sync statistics (if completed).
196    pub sync_stats: Option<SyncStats>,
197
198    /// ID of the worker processing this job.
199    pub worker_id: Option<String>,
200
201    /// Whether to force full sync (bypass incremental).
202    pub force_full_sync: bool,
203}
204
205impl HarvestJob {
206    /// Check if the job can be retried.
207    pub fn can_retry(&self) -> bool {
208        self.retry_count < self.max_retries
209    }
210
211    /// Calculate the next retry time based on current retry count.
212    pub fn calculate_next_retry(&self, config: &RetryConfig) -> DateTime<Utc> {
213        let delay = config.delay_for_attempt(self.retry_count + 1);
214        Utc::now() + delay
215    }
216}
217
// =============================================================================
// Job Creation Request
// =============================================================================

/// Request to create a new harvest job.
///
/// Start with [`CreateJobRequest::new`] and refine it with the `with_*`
/// builder methods. Each builder method consumes and returns the request,
/// so its result must be used.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateJobRequest {
    /// Target portal URL.
    pub portal_url: String,
    /// Optional friendly portal name.
    pub portal_name: Option<String>,
    /// Whether to force a full sync instead of an incremental one.
    pub force_full_sync: bool,
    /// Maximum retries (uses default if None).
    pub max_retries: Option<u32>,
}

impl CreateJobRequest {
    /// Create a request for `portal_url` with no name, incremental sync and
    /// `max_retries` left unset (`None`).
    #[must_use]
    pub fn new(portal_url: impl Into<String>) -> Self {
        Self {
            portal_url: portal_url.into(),
            portal_name: None,
            force_full_sync: false,
            max_retries: None,
        }
    }

    /// Set the friendly portal name.
    #[must_use]
    pub fn with_name(mut self, name: impl Into<String>) -> Self {
        self.portal_name = Some(name.into());
        self
    }

    /// Force full sync (bypass incremental).
    #[must_use]
    pub fn with_full_sync(mut self) -> Self {
        self.force_full_sync = true;
        self
    }

    /// Set an explicit maximum retry count.
    #[must_use]
    pub fn with_max_retries(mut self, max: u32) -> Self {
        self.max_retries = Some(max);
        self
    }
}

265// =============================================================================
266// Worker Configuration
267// =============================================================================
268
269/// Worker configuration.
270#[derive(Debug, Clone)]
271pub struct WorkerConfig {
272    /// Unique worker identifier.
273    pub worker_id: String,
274    /// How often to poll for new jobs.
275    pub poll_interval: std::time::Duration,
276    /// Retry configuration.
277    pub retry_config: RetryConfig,
278}
279
280impl Default for WorkerConfig {
281    fn default() -> Self {
282        Self {
283            worker_id: format!("worker-{}", Uuid::new_v4()),
284            poll_interval: std::time::Duration::from_secs(5),
285            retry_config: RetryConfig::default(),
286        }
287    }
288}
289
290impl WorkerConfig {
291    /// Set the worker ID.
292    pub fn with_worker_id(mut self, id: impl Into<String>) -> Self {
293        self.worker_id = id.into();
294        self
295    }
296
297    /// Set the poll interval.
298    pub fn with_poll_interval(mut self, interval: std::time::Duration) -> Self {
299        self.poll_interval = interval;
300        self
301    }
302
303    /// Set the retry configuration.
304    pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
305        self.retry_config = config;
306        self
307    }
308}
309
// =============================================================================
// Tests
// =============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    /// Every `JobStatus` variant, for table-driven tests.
    const ALL_STATUSES: [JobStatus; 5] = [
        JobStatus::Pending,
        JobStatus::Running,
        JobStatus::Completed,
        JobStatus::Failed,
        JobStatus::Cancelled,
    ];

    /// Builds a running job with the given retry bookkeeping.
    fn sample_job(retry_count: u32, max_retries: u32) -> HarvestJob {
        let now = Utc::now();
        HarvestJob {
            id: Uuid::new_v4(),
            portal_url: "https://example.com".to_string(),
            portal_name: None,
            status: JobStatus::Running,
            created_at: now,
            updated_at: now,
            started_at: Some(now),
            completed_at: None,
            retry_count,
            max_retries,
            next_retry_at: None,
            error_message: None,
            sync_stats: None,
            worker_id: Some("worker-1".to_string()),
            force_full_sync: false,
        }
    }

    #[test]
    fn test_job_status_as_str() {
        assert_eq!(JobStatus::Pending.as_str(), "pending");
        assert_eq!(JobStatus::Running.as_str(), "running");
        assert_eq!(JobStatus::Completed.as_str(), "completed");
        assert_eq!(JobStatus::Failed.as_str(), "failed");
        assert_eq!(JobStatus::Cancelled.as_str(), "cancelled");
    }

    #[test]
    fn test_job_status_from_str() {
        assert_eq!("pending".parse::<JobStatus>(), Ok(JobStatus::Pending));
        assert_eq!("running".parse::<JobStatus>(), Ok(JobStatus::Running));
        assert_eq!("completed".parse::<JobStatus>(), Ok(JobStatus::Completed));
        assert_eq!("failed".parse::<JobStatus>(), Ok(JobStatus::Failed));
        assert_eq!("cancelled".parse::<JobStatus>(), Ok(JobStatus::Cancelled));
        assert!("unknown".parse::<JobStatus>().is_err());
    }

    #[test]
    fn test_job_status_round_trips_through_str() {
        for &status in &ALL_STATUSES {
            assert_eq!(status.as_str().parse::<JobStatus>(), Ok(status));
            assert_eq!(status.to_string(), status.as_str());
        }
    }

    #[test]
    fn test_job_status_parse_error_message() {
        // Parsing is case-sensitive, so the capitalized name is rejected.
        let err = "Pending".parse::<JobStatus>().unwrap_err();
        assert_eq!(err.to_string(), "invalid job status: Pending");
    }

    #[test]
    fn test_job_status_is_terminal() {
        assert!(!JobStatus::Pending.is_terminal());
        assert!(!JobStatus::Running.is_terminal());
        assert!(JobStatus::Completed.is_terminal());
        assert!(JobStatus::Failed.is_terminal());
        assert!(JobStatus::Cancelled.is_terminal());
    }

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.max_delay, TimeDelta::minutes(60));
    }

    #[test]
    fn test_retry_delay_exponential() {
        let config = RetryConfig::default();

        assert_eq!(config.delay_for_attempt(0), TimeDelta::zero());
        assert_eq!(config.delay_for_attempt(1), TimeDelta::minutes(1));
        assert_eq!(config.delay_for_attempt(2), TimeDelta::minutes(5));
        assert_eq!(config.delay_for_attempt(3), TimeDelta::minutes(30));
        assert_eq!(config.delay_for_attempt(4), TimeDelta::minutes(60)); // capped
        assert_eq!(config.delay_for_attempt(10), TimeDelta::minutes(60)); // still capped
    }

    #[test]
    fn test_retry_delay_respects_smaller_max_delay() {
        let config = RetryConfig {
            max_retries: 3,
            max_delay: TimeDelta::minutes(10),
        };

        assert_eq!(config.delay_for_attempt(1), TimeDelta::minutes(1));
        assert_eq!(config.delay_for_attempt(2), TimeDelta::minutes(5));
        assert_eq!(config.delay_for_attempt(3), TimeDelta::minutes(10));
        assert_eq!(config.delay_for_attempt(4), TimeDelta::minutes(10));
    }

    #[test]
    fn test_create_job_request_builder() {
        let request = CreateJobRequest::new("https://example.com")
            .with_name("Example Portal")
            .with_full_sync()
            .with_max_retries(5);

        assert_eq!(request.portal_url, "https://example.com");
        assert_eq!(request.portal_name, Some("Example Portal".to_string()));
        assert!(request.force_full_sync);
        assert_eq!(request.max_retries, Some(5));
    }

    #[test]
    fn test_worker_config_default() {
        let config = WorkerConfig::default();

        assert!(config.worker_id.starts_with("worker-"));
        assert_eq!(config.poll_interval, std::time::Duration::from_secs(5));
        assert_eq!(config.retry_config.max_retries, 3);
    }

    #[test]
    fn test_worker_config_builder() {
        let config = WorkerConfig::default()
            .with_worker_id("my-worker")
            .with_poll_interval(std::time::Duration::from_secs(10));

        assert_eq!(config.worker_id, "my-worker");
        assert_eq!(config.poll_interval, std::time::Duration::from_secs(10));
    }

    #[test]
    fn test_worker_config_with_retry_config() {
        let config = WorkerConfig::default().with_retry_config(RetryConfig {
            max_retries: 7,
            max_delay: TimeDelta::minutes(15),
        });

        assert_eq!(config.retry_config.max_retries, 7);
        assert_eq!(config.retry_config.max_delay, TimeDelta::minutes(15));
    }

    #[test]
    fn test_harvest_job_can_retry() {
        let mut job = sample_job(0, 3);
        assert!(job.can_retry());

        job.retry_count = 2;
        assert!(job.can_retry());

        job.retry_count = 3;
        assert!(!job.can_retry());

        // A zero retry budget never allows a retry.
        assert!(!sample_job(0, 0).can_retry());
    }

    #[test]
    fn test_harvest_job_calculate_next_retry() {
        let config = RetryConfig::default();
        // retry_count == 1, so the next attempt is #2 with a 5-minute delay.
        let job = sample_job(1, 3);
        let delay = TimeDelta::minutes(5);

        let before = Utc::now();
        let next = job.calculate_next_retry(&config);
        let after = Utc::now();

        assert!(next >= before + delay);
        assert!(next <= after + delay);
    }
}