Skip to main content

ceres_core/
job.rs

1//! Job queue types for persistent harvest job management.
2//!
3//! This module provides domain types for the harvest job queue,
4//! enabling recoverable, distributed job processing with exponential backoff retry.
5//!
6//! # Architecture
7//!
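//! Jobs flow through these states:
//! ```text
//! pending → running ──→ completed
//!              │
//!              ├──→ pending  (on error with retries remaining; retried at next_retry_at)
//!              └──→ failed   (on error with retries exhausted)
//! ```
//!
//! `completed`, `failed` and `cancelled` (set by a user or the system) are terminal states.
//!
//! # Retry Strategy
//!
//! Uses an increasing (exponential-style) backoff schedule:
//! - Attempt 1: 1 minute
//! - Attempt 2: 5 minutes
//! - Attempt 3: 30 minutes
//! - Attempt 4+: 60 minutes
//!
//! Each delay is capped by `RetryConfig::max_delay` (1 hour by default). Once a job has
//! used up its maximum number of retries, it is permanently failed.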
24
25use chrono::{DateTime, TimeDelta, Utc};
26use serde::{Deserialize, Serialize};
27use uuid::Uuid;
28
29use crate::SyncStats;
30
31// =============================================================================
32// Job Status
33// =============================================================================
34
35/// Status of a harvest job in the queue.
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
37#[serde(rename_all = "lowercase")]
38pub enum JobStatus {
39    /// Job is waiting to be processed.
40    Pending,
41    /// Job is currently being processed by a worker.
42    Running,
43    /// Job completed successfully.
44    Completed,
45    /// Job failed after exhausting all retries.
46    Failed,
47    /// Job was cancelled by user or system.
48    Cancelled,
49}
50
51impl JobStatus {
52    /// Returns the string representation for database storage.
53    pub fn as_str(&self) -> &'static str {
54        match self {
55            JobStatus::Pending => "pending",
56            JobStatus::Running => "running",
57            JobStatus::Completed => "completed",
58            JobStatus::Failed => "failed",
59            JobStatus::Cancelled => "cancelled",
60        }
61    }
62
63    /// Returns true if the job is in a terminal state.
64    pub fn is_terminal(&self) -> bool {
65        matches!(
66            self,
67            JobStatus::Completed | JobStatus::Failed | JobStatus::Cancelled
68        )
69    }
70}
71
/// Error returned when a string is not a recognised [`JobStatus`] name.
///
/// Carries the rejected input so that it appears in the error message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseJobStatusError(String);

impl std::fmt::Display for ParseJobStatusError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let ParseJobStatusError(rejected) = self;
        f.write_str("invalid job status: ")?;
        f.write_str(rejected)
    }
}

impl std::error::Error for ParseJobStatusError {}
83
84impl std::str::FromStr for JobStatus {
85    type Err = ParseJobStatusError;
86
87    fn from_str(s: &str) -> Result<Self, Self::Err> {
88        match s {
89            "pending" => Ok(JobStatus::Pending),
90            "running" => Ok(JobStatus::Running),
91            "completed" => Ok(JobStatus::Completed),
92            "failed" => Ok(JobStatus::Failed),
93            "cancelled" => Ok(JobStatus::Cancelled),
94            _ => Err(ParseJobStatusError(s.to_string())),
95        }
96    }
97}
98
99impl std::fmt::Display for JobStatus {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        write!(f, "{}", self.as_str())
102    }
103}
104
105// =============================================================================
106// Retry Configuration
107// =============================================================================
108
109/// Configuration for job retry behavior with exponential backoff.
110#[derive(Debug, Clone)]
111pub struct RetryConfig {
112    /// Maximum number of retry attempts.
113    pub max_retries: u32,
114    /// Maximum delay cap.
115    pub max_delay: TimeDelta,
116}
117
118impl Default for RetryConfig {
119    fn default() -> Self {
120        Self {
121            max_retries: 3,
122            max_delay: TimeDelta::minutes(60), // 1 hour cap
123        }
124    }
125}
126
127impl RetryConfig {
128    /// Calculate delay for a given retry attempt using exponential backoff.
129    ///
130    /// - Attempt 1: 1 minute
131    /// - Attempt 2: 5 minutes
132    /// - Attempt 3: 30 minutes
133    /// - Attempt 4+: 60 minutes (capped)
134    pub fn delay_for_attempt(&self, attempt: u32) -> TimeDelta {
135        if attempt == 0 {
136            return TimeDelta::zero();
137        }
138
139        // Exponential multipliers: 1, 5, 30 (minutes)
140        let minutes = match attempt {
141            1 => 1,
142            2 => 5,
143            3 => 30,
144            _ => 60, // Cap at 60 minutes for any additional retries
145        };
146
147        let delay = TimeDelta::minutes(minutes);
148        std::cmp::min(delay, self.max_delay)
149    }
150}
151
152// =============================================================================
153// Harvest Job
154// =============================================================================
155
156/// A harvest job in the queue.
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct HarvestJob {
159    /// Unique job identifier.
160    pub id: Uuid,
161
162    /// Target portal URL.
163    pub portal_url: String,
164
165    /// Optional friendly portal name.
166    pub portal_name: Option<String>,
167
168    /// Current job status.
169    pub status: JobStatus,
170
171    /// When the job was created.
172    pub created_at: DateTime<Utc>,
173
174    /// When the job was last updated.
175    pub updated_at: DateTime<Utc>,
176
177    /// When the job started processing.
178    pub started_at: Option<DateTime<Utc>>,
179
180    /// When the job completed (success or failure).
181    pub completed_at: Option<DateTime<Utc>>,
182
183    /// Number of retry attempts made.
184    pub retry_count: u32,
185
186    /// Maximum retries allowed.
187    pub max_retries: u32,
188
189    /// When to attempt the next retry.
190    pub next_retry_at: Option<DateTime<Utc>>,
191
192    /// Error message if failed.
193    pub error_message: Option<String>,
194
195    /// Final sync statistics (if completed).
196    pub sync_stats: Option<SyncStats>,
197
198    /// ID of the worker processing this job.
199    pub worker_id: Option<String>,
200
201    /// Whether to force full sync (bypass incremental).
202    pub force_full_sync: bool,
203
204    /// Optional URL template for dataset landing pages.
205    pub url_template: Option<String>,
206
207    /// Preferred language for multilingual portals.
208    pub language: Option<String>,
209}
210
211impl HarvestJob {
212    /// Check if the job can be retried.
213    pub fn can_retry(&self) -> bool {
214        self.retry_count < self.max_retries
215    }
216
217    /// Calculate the next retry time based on current retry count.
218    pub fn calculate_next_retry(&self, config: &RetryConfig) -> DateTime<Utc> {
219        let delay = config.delay_for_attempt(self.retry_count + 1);
220        Utc::now() + delay
221    }
222}
223
224// =============================================================================
225// Job Creation Request
226// =============================================================================
227
/// Request to create a new harvest job.
///
/// Start with [`CreateJobRequest::new`] and chain the `with_*` builder methods.
/// Each builder consumes the request and returns the updated copy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateJobRequest {
    /// Target portal URL.
    pub portal_url: String,
    /// Optional friendly portal name.
    pub portal_name: Option<String>,
    /// Whether to force full sync.
    pub force_full_sync: bool,
    /// Maximum retries (uses default if None).
    pub max_retries: Option<u32>,
    /// Optional URL template for dataset landing pages.
    pub url_template: Option<String>,
    /// Preferred language for multilingual portals.
    pub language: Option<String>,
}

// Every builder takes `self` by value, so an ignored return value silently discards
// the setting (`req.with_full_sync();` is a no-op). `#[must_use]` turns that into a warning.
impl CreateJobRequest {
    /// Create a new job request for the given portal URL.
    ///
    /// All optional settings start as `None`, and `force_full_sync` starts as `false`.
    #[must_use]
    pub fn new(portal_url: impl Into<String>) -> Self {
        Self {
            portal_url: portal_url.into(),
            portal_name: None,
            force_full_sync: false,
            max_retries: None,
            url_template: None,
            language: None,
        }
    }

    /// Set the portal name.
    #[must_use]
    pub fn with_name(mut self, name: impl Into<String>) -> Self {
        self.portal_name = Some(name.into());
        self
    }

    /// Force full sync (bypass incremental).
    #[must_use]
    pub fn with_full_sync(mut self) -> Self {
        self.force_full_sync = true;
        self
    }

    /// Set maximum retries.
    #[must_use]
    pub fn with_max_retries(mut self, max: u32) -> Self {
        self.max_retries = Some(max);
        self
    }

    /// Set URL template for dataset landing pages.
    #[must_use]
    pub fn with_url_template(mut self, template: impl Into<String>) -> Self {
        self.url_template = Some(template.into());
        self
    }

    /// Set preferred language for multilingual portals.
    #[must_use]
    pub fn with_language(mut self, language: impl Into<String>) -> Self {
        self.language = Some(language.into());
        self
    }
}
289
290// =============================================================================
291// Worker Configuration
292// =============================================================================
293
294/// Worker configuration.
295#[derive(Debug, Clone)]
296pub struct WorkerConfig {
297    /// Unique worker identifier.
298    pub worker_id: String,
299    /// How often to poll for new jobs.
300    pub poll_interval: std::time::Duration,
301    /// Retry configuration.
302    pub retry_config: RetryConfig,
303}
304
305impl Default for WorkerConfig {
306    fn default() -> Self {
307        Self {
308            worker_id: format!("worker-{}", Uuid::new_v4()),
309            poll_interval: std::time::Duration::from_secs(5),
310            retry_config: RetryConfig::default(),
311        }
312    }
313}
314
315impl WorkerConfig {
316    /// Set the worker ID.
317    pub fn with_worker_id(mut self, id: impl Into<String>) -> Self {
318        self.worker_id = id.into();
319        self
320    }
321
322    /// Set the poll interval.
323    pub fn with_poll_interval(mut self, interval: std::time::Duration) -> Self {
324        self.poll_interval = interval;
325        self
326    }
327
328    /// Set the retry configuration.
329    pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
330        self.retry_config = config;
331        self
332    }
333}
334
335// =============================================================================
336// Tests
337// =============================================================================
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `JobStatus` variant, for exhaustive round-trip checks.
    const ALL_STATUSES: [JobStatus; 5] = [
        JobStatus::Pending,
        JobStatus::Running,
        JobStatus::Completed,
        JobStatus::Failed,
        JobStatus::Cancelled,
    ];

    /// A running job with no retries used yet and three allowed.
    fn sample_job() -> HarvestJob {
        HarvestJob {
            id: Uuid::new_v4(),
            portal_url: "https://example.com".to_string(),
            portal_name: None,
            status: JobStatus::Running,
            created_at: Utc::now(),
            updated_at: Utc::now(),
            started_at: Some(Utc::now()),
            completed_at: None,
            retry_count: 0,
            max_retries: 3,
            next_retry_at: None,
            error_message: None,
            sync_stats: None,
            worker_id: Some("worker-1".to_string()),
            force_full_sync: false,
            url_template: None,
            language: None,
        }
    }

    #[test]
    fn test_job_status_as_str() {
        assert_eq!(JobStatus::Pending.as_str(), "pending");
        assert_eq!(JobStatus::Running.as_str(), "running");
        assert_eq!(JobStatus::Completed.as_str(), "completed");
        assert_eq!(JobStatus::Failed.as_str(), "failed");
        assert_eq!(JobStatus::Cancelled.as_str(), "cancelled");
    }

    #[test]
    fn test_job_status_from_str() {
        assert_eq!("pending".parse::<JobStatus>(), Ok(JobStatus::Pending));
        assert_eq!("running".parse::<JobStatus>(), Ok(JobStatus::Running));
        assert_eq!("completed".parse::<JobStatus>(), Ok(JobStatus::Completed));
        assert_eq!("failed".parse::<JobStatus>(), Ok(JobStatus::Failed));
        assert_eq!("cancelled".parse::<JobStatus>(), Ok(JobStatus::Cancelled));
        assert!("unknown".parse::<JobStatus>().is_err());
    }

    #[test]
    fn test_job_status_round_trips_through_str() {
        for status in ALL_STATUSES {
            assert_eq!(status.as_str().parse::<JobStatus>(), Ok(status));
        }
    }

    #[test]
    fn test_job_status_display_matches_as_str() {
        for status in ALL_STATUSES {
            assert_eq!(status.to_string(), status.as_str());
        }
    }

    #[test]
    fn test_job_status_parse_is_case_sensitive() {
        let err = "Pending".parse::<JobStatus>().unwrap_err();
        assert_eq!(err.to_string(), "invalid job status: Pending");
    }

    #[test]
    fn test_job_status_is_terminal() {
        assert!(!JobStatus::Pending.is_terminal());
        assert!(!JobStatus::Running.is_terminal());
        assert!(JobStatus::Completed.is_terminal());
        assert!(JobStatus::Failed.is_terminal());
        assert!(JobStatus::Cancelled.is_terminal());
    }

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.max_delay, TimeDelta::minutes(60));
    }

    #[test]
    fn test_retry_delay_exponential() {
        let config = RetryConfig::default();

        assert_eq!(config.delay_for_attempt(0), TimeDelta::zero());
        assert_eq!(config.delay_for_attempt(1), TimeDelta::minutes(1));
        assert_eq!(config.delay_for_attempt(2), TimeDelta::minutes(5));
        assert_eq!(config.delay_for_attempt(3), TimeDelta::minutes(30));
        assert_eq!(config.delay_for_attempt(4), TimeDelta::minutes(60)); // capped
        assert_eq!(config.delay_for_attempt(10), TimeDelta::minutes(60)); // still capped
    }

    #[test]
    fn test_retry_delay_respects_max_delay() {
        let config = RetryConfig {
            max_retries: 3,
            max_delay: TimeDelta::minutes(10),
        };

        assert_eq!(config.delay_for_attempt(1), TimeDelta::minutes(1));
        assert_eq!(config.delay_for_attempt(2), TimeDelta::minutes(5));
        assert_eq!(config.delay_for_attempt(3), TimeDelta::minutes(10));
        assert_eq!(config.delay_for_attempt(4), TimeDelta::minutes(10));
    }

    #[test]
    fn test_create_job_request_builder() {
        let request = CreateJobRequest::new("https://example.com")
            .with_name("Example Portal")
            .with_full_sync()
            .with_max_retries(5);

        assert_eq!(request.portal_url, "https://example.com");
        assert_eq!(request.portal_name, Some("Example Portal".to_string()));
        assert!(request.force_full_sync);
        assert_eq!(request.max_retries, Some(5));
    }

    #[test]
    fn test_create_job_request_defaults_and_optional_setters() {
        let bare = CreateJobRequest::new("https://example.com");
        assert_eq!(bare.portal_name, None);
        assert!(!bare.force_full_sync);
        assert_eq!(bare.max_retries, None);
        assert_eq!(bare.url_template, None);
        assert_eq!(bare.language, None);

        let request = bare
            .with_url_template("https://example.com/dataset/{id}")
            .with_language("en");
        assert_eq!(
            request.url_template.as_deref(),
            Some("https://example.com/dataset/{id}")
        );
        assert_eq!(request.language.as_deref(), Some("en"));
    }

    #[test]
    fn test_worker_config_default() {
        let config = WorkerConfig::default();

        assert!(config.worker_id.starts_with("worker-"));
        assert_eq!(config.poll_interval, std::time::Duration::from_secs(5));
        assert_eq!(config.retry_config.max_retries, 3);
    }

    #[test]
    fn test_worker_config_builder() {
        let config = WorkerConfig::default()
            .with_worker_id("my-worker")
            .with_poll_interval(std::time::Duration::from_secs(10))
            .with_retry_config(RetryConfig {
                max_retries: 7,
                max_delay: TimeDelta::minutes(15),
            });

        assert_eq!(config.worker_id, "my-worker");
        assert_eq!(config.poll_interval, std::time::Duration::from_secs(10));
        assert_eq!(config.retry_config.max_retries, 7);
        assert_eq!(config.retry_config.max_delay, TimeDelta::minutes(15));
    }

    #[test]
    fn test_harvest_job_can_retry() {
        let mut job = sample_job();

        assert!(job.can_retry());

        job.retry_count = 2;
        assert!(job.can_retry());

        job.retry_count = 3;
        assert!(!job.can_retry());
    }

    #[test]
    fn test_calculate_next_retry_uses_following_attempt() {
        let mut job = sample_job();
        job.retry_count = 1; // the next attempt is #2, i.e. a 5 minute delay
        let config = RetryConfig::default();

        let before = Utc::now();
        let next = job.calculate_next_retry(&config);
        let after = Utc::now();

        assert!(next >= before + TimeDelta::minutes(5));
        assert!(next <= after + TimeDelta::minutes(5));
    }
}
452}