Skip to main content

ceres_core/
job.rs

1//! Job queue types for persistent harvest job management.
2//!
3//! This module provides domain types for the harvest job queue,
4//! enabling recoverable, distributed job processing with exponential backoff retry.
5//!
6//! # Architecture
7//!
8//! Jobs flow through these states:
9//! ```text
10//! pending → running → completed
11//!              ↓
12//!           failed (if retries exhausted)
13//!              ↓
14//!           pending (if retries available, with next_retry_at)
15//! ```
16//!
17//! # Retry Strategy
18//!
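//! Uses a fixed, escalating backoff schedule; each delay is additionally capped by
//! `RetryConfig::max_delay` (60 minutes by default):
//! - Attempt 1: 1 minute
//! - Attempt 2: 5 minutes
//! - Attempt 3: 30 minutes
//! - Attempt 4 and later: 60 minutes
//! - After max retries: permanently failed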
24
25use chrono::{DateTime, TimeDelta, Utc};
26use serde::{Deserialize, Serialize};
27use uuid::Uuid;
28
29use crate::SyncStats;
30use crate::config::PortalType;
31
32// =============================================================================
33// Job Status
34// =============================================================================
35
36/// Status of a harvest job in the queue.
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38#[serde(rename_all = "lowercase")]
39pub enum JobStatus {
40    /// Job is waiting to be processed.
41    Pending,
42    /// Job is currently being processed by a worker.
43    Running,
44    /// Job completed successfully.
45    Completed,
46    /// Job failed after exhausting all retries.
47    Failed,
48    /// Job was cancelled by user or system.
49    Cancelled,
50}
51
52impl JobStatus {
53    /// Returns the string representation for database storage.
54    pub fn as_str(&self) -> &'static str {
55        match self {
56            JobStatus::Pending => "pending",
57            JobStatus::Running => "running",
58            JobStatus::Completed => "completed",
59            JobStatus::Failed => "failed",
60            JobStatus::Cancelled => "cancelled",
61        }
62    }
63
64    /// Returns true if the job is in a terminal state.
65    pub fn is_terminal(&self) -> bool {
66        matches!(
67            self,
68            JobStatus::Completed | JobStatus::Failed | JobStatus::Cancelled
69        )
70    }
71}
72
/// Error produced when a string does not name a known [`JobStatus`].
///
/// Wraps the rejected input so it can be echoed in the error message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseJobStatusError(String);

impl std::fmt::Display for ParseJobStatusError {
    /// Writes `invalid job status: ` followed by the rejected input.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(rejected) = self;
        f.write_str("invalid job status: ")?;
        f.write_str(rejected)
    }
}

impl std::error::Error for ParseJobStatusError {}
84
85impl std::str::FromStr for JobStatus {
86    type Err = ParseJobStatusError;
87
88    fn from_str(s: &str) -> Result<Self, Self::Err> {
89        match s {
90            "pending" => Ok(JobStatus::Pending),
91            "running" => Ok(JobStatus::Running),
92            "completed" => Ok(JobStatus::Completed),
93            "failed" => Ok(JobStatus::Failed),
94            "cancelled" => Ok(JobStatus::Cancelled),
95            _ => Err(ParseJobStatusError(s.to_string())),
96        }
97    }
98}
99
100impl std::fmt::Display for JobStatus {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        write!(f, "{}", self.as_str())
103    }
104}
105
106// =============================================================================
107// Retry Configuration
108// =============================================================================
109
110/// Configuration for job retry behavior with exponential backoff.
111#[derive(Debug, Clone)]
112pub struct RetryConfig {
113    /// Maximum number of retry attempts.
114    pub max_retries: u32,
115    /// Maximum delay cap.
116    pub max_delay: TimeDelta,
117}
118
119impl Default for RetryConfig {
120    fn default() -> Self {
121        Self {
122            max_retries: 3,
123            max_delay: TimeDelta::minutes(60), // 1 hour cap
124        }
125    }
126}
127
128impl RetryConfig {
129    /// Calculate delay for a given retry attempt using exponential backoff.
130    ///
131    /// - Attempt 1: 1 minute
132    /// - Attempt 2: 5 minutes
133    /// - Attempt 3: 30 minutes
134    /// - Attempt 4+: 60 minutes (capped)
135    pub fn delay_for_attempt(&self, attempt: u32) -> TimeDelta {
136        if attempt == 0 {
137            return TimeDelta::zero();
138        }
139
140        // Exponential multipliers: 1, 5, 30 (minutes)
141        let minutes = match attempt {
142            1 => 1,
143            2 => 5,
144            3 => 30,
145            _ => 60, // Cap at 60 minutes for any additional retries
146        };
147
148        let delay = TimeDelta::minutes(minutes);
149        std::cmp::min(delay, self.max_delay)
150    }
151}
152
153// =============================================================================
154// Harvest Job
155// =============================================================================
156
157/// A harvest job in the queue.
158#[derive(Debug, Clone, Serialize, Deserialize)]
159pub struct HarvestJob {
160    /// Unique job identifier.
161    pub id: Uuid,
162
163    /// Target portal URL.
164    pub portal_url: String,
165
166    /// Optional friendly portal name.
167    pub portal_name: Option<String>,
168
169    /// Type of portal (ckan, dcat, etc.).
170    pub portal_type: PortalType,
171
172    /// Current job status.
173    pub status: JobStatus,
174
175    /// When the job was created.
176    pub created_at: DateTime<Utc>,
177
178    /// When the job was last updated.
179    pub updated_at: DateTime<Utc>,
180
181    /// When the job started processing.
182    pub started_at: Option<DateTime<Utc>>,
183
184    /// When the job completed (success or failure).
185    pub completed_at: Option<DateTime<Utc>>,
186
187    /// Number of retry attempts made.
188    pub retry_count: u32,
189
190    /// Maximum retries allowed.
191    pub max_retries: u32,
192
193    /// When to attempt the next retry.
194    pub next_retry_at: Option<DateTime<Utc>>,
195
196    /// Error message if failed.
197    pub error_message: Option<String>,
198
199    /// Final sync statistics (if completed).
200    pub sync_stats: Option<SyncStats>,
201
202    /// ID of the worker processing this job.
203    pub worker_id: Option<String>,
204
205    /// Whether to force full sync (bypass incremental).
206    pub force_full_sync: bool,
207
208    /// Optional URL template for dataset landing pages.
209    pub url_template: Option<String>,
210
211    /// Preferred language for multilingual portals.
212    pub language: Option<String>,
213
214    /// Optional DCAT profile (e.g., `"sparql"` for SPARQL endpoints).
215    pub profile: Option<String>,
216
217    /// Optional SPARQL endpoint override for DCAT harvests.
218    pub sparql_endpoint: Option<String>,
219}
220
221impl HarvestJob {
222    /// Check if the job can be retried.
223    pub fn can_retry(&self) -> bool {
224        self.retry_count < self.max_retries
225    }
226
227    /// Calculate the next retry time based on current retry count.
228    pub fn calculate_next_retry(&self, config: &RetryConfig) -> DateTime<Utc> {
229        let delay = config.delay_for_attempt(self.retry_count + 1);
230        Utc::now() + delay
231    }
232}
233
234// =============================================================================
235// Job Creation Request
236// =============================================================================
237
238/// Request to create a new harvest job.
239#[derive(Debug, Clone)]
240pub struct CreateJobRequest {
241    /// Target portal URL.
242    pub portal_url: String,
243    /// Optional friendly portal name.
244    pub portal_name: Option<String>,
245    /// Whether to force full sync.
246    pub force_full_sync: bool,
247    /// Maximum retries (uses default if None).
248    pub max_retries: Option<u32>,
249    /// Optional URL template for dataset landing pages.
250    pub url_template: Option<String>,
251
252    /// Preferred language for multilingual portals.
253    pub language: Option<String>,
254
255    /// Type of portal (ckan, dcat, etc.).
256    pub portal_type: PortalType,
257
258    /// Optional DCAT profile (e.g., `"sparql"` for SPARQL endpoints).
259    pub profile: Option<String>,
260
261    /// Optional SPARQL endpoint override for DCAT harvests.
262    pub sparql_endpoint: Option<String>,
263}
264
265impl CreateJobRequest {
266    /// Create a new job request for the given portal URL.
267    pub fn new(portal_url: impl Into<String>) -> Self {
268        Self {
269            portal_url: portal_url.into(),
270            portal_name: None,
271            force_full_sync: false,
272            max_retries: None,
273            url_template: None,
274            language: None,
275            portal_type: PortalType::default(),
276            profile: None,
277            sparql_endpoint: None,
278        }
279    }
280
281    /// Set the portal name.
282    pub fn with_name(mut self, name: impl Into<String>) -> Self {
283        self.portal_name = Some(name.into());
284        self
285    }
286
287    /// Force full sync (bypass incremental).
288    pub fn with_full_sync(mut self) -> Self {
289        self.force_full_sync = true;
290        self
291    }
292
293    /// Set maximum retries.
294    pub fn with_max_retries(mut self, max: u32) -> Self {
295        self.max_retries = Some(max);
296        self
297    }
298
299    /// Set URL template for dataset landing pages.
300    pub fn with_url_template(mut self, template: impl Into<String>) -> Self {
301        self.url_template = Some(template.into());
302        self
303    }
304
305    /// Set preferred language for multilingual portals.
306    pub fn with_language(mut self, language: impl Into<String>) -> Self {
307        self.language = Some(language.into());
308        self
309    }
310
311    /// Set the portal type.
312    pub fn with_portal_type(mut self, portal_type: PortalType) -> Self {
313        self.portal_type = portal_type;
314        self
315    }
316
317    /// Set the DCAT profile (e.g., `"sparql"`).
318    pub fn with_profile(mut self, profile: impl Into<String>) -> Self {
319        self.profile = Some(profile.into());
320        self
321    }
322
323    /// Set the SPARQL endpoint override for DCAT harvests.
324    pub fn with_sparql_endpoint(mut self, sparql_endpoint: impl Into<String>) -> Self {
325        self.sparql_endpoint = Some(sparql_endpoint.into());
326        self
327    }
328}
329
330// =============================================================================
331// Worker Configuration
332// =============================================================================
333
334/// Worker configuration.
335#[derive(Debug, Clone)]
336pub struct WorkerConfig {
337    /// Unique worker identifier.
338    pub worker_id: String,
339    /// How often to poll for new jobs.
340    pub poll_interval: std::time::Duration,
341    /// Retry configuration.
342    pub retry_config: RetryConfig,
343}
344
345impl Default for WorkerConfig {
346    fn default() -> Self {
347        Self {
348            worker_id: format!("worker-{}", Uuid::new_v4()),
349            poll_interval: std::time::Duration::from_secs(5),
350            retry_config: RetryConfig::default(),
351        }
352    }
353}
354
355impl WorkerConfig {
356    /// Set the worker ID.
357    pub fn with_worker_id(mut self, id: impl Into<String>) -> Self {
358        self.worker_id = id.into();
359        self
360    }
361
362    /// Set the poll interval.
363    pub fn with_poll_interval(mut self, interval: std::time::Duration) -> Self {
364        self.poll_interval = interval;
365        self
366    }
367
368    /// Set the retry configuration.
369    pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
370        self.retry_config = config;
371        self
372    }
373}
374
375// =============================================================================
376// Tests
377// =============================================================================
378
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Running` job with the given retry counters, for the retry tests.
    fn sample_job(retry_count: u32, max_retries: u32) -> HarvestJob {
        let now = Utc::now();
        HarvestJob {
            id: Uuid::new_v4(),
            portal_url: "https://example.com".to_string(),
            portal_name: None,
            portal_type: crate::config::PortalType::default(),
            status: JobStatus::Running,
            created_at: now,
            updated_at: now,
            started_at: Some(now),
            completed_at: None,
            retry_count,
            max_retries,
            next_retry_at: None,
            error_message: None,
            sync_stats: None,
            worker_id: Some("worker-1".to_string()),
            force_full_sync: false,
            url_template: None,
            language: None,
            profile: None,
            sparql_endpoint: None,
        }
    }

    #[test]
    fn test_job_status_as_str() {
        assert_eq!(JobStatus::Pending.as_str(), "pending");
        assert_eq!(JobStatus::Running.as_str(), "running");
        assert_eq!(JobStatus::Completed.as_str(), "completed");
        assert_eq!(JobStatus::Failed.as_str(), "failed");
        assert_eq!(JobStatus::Cancelled.as_str(), "cancelled");
    }

    #[test]
    fn test_job_status_from_str() {
        assert_eq!("pending".parse::<JobStatus>(), Ok(JobStatus::Pending));
        assert_eq!("running".parse::<JobStatus>(), Ok(JobStatus::Running));
        assert_eq!("completed".parse::<JobStatus>(), Ok(JobStatus::Completed));
        assert_eq!("failed".parse::<JobStatus>(), Ok(JobStatus::Failed));
        assert_eq!("cancelled".parse::<JobStatus>(), Ok(JobStatus::Cancelled));
        assert!("unknown".parse::<JobStatus>().is_err());
    }

    /// Every status must survive an `as_str` -> `parse` round trip, and its
    /// `Display` output must match `as_str`.
    #[test]
    fn test_job_status_round_trip_and_display() {
        let all = [
            JobStatus::Pending,
            JobStatus::Running,
            JobStatus::Completed,
            JobStatus::Failed,
            JobStatus::Cancelled,
        ];
        for status in all {
            assert_eq!(status.as_str().parse::<JobStatus>(), Ok(status));
            assert_eq!(status.to_string(), status.as_str());
        }
    }

    #[test]
    fn test_parse_job_status_error_display() {
        let err = "unknown".parse::<JobStatus>().unwrap_err();
        assert_eq!(err.to_string(), "invalid job status: unknown");
    }

    #[test]
    fn test_job_status_is_terminal() {
        assert!(!JobStatus::Pending.is_terminal());
        assert!(!JobStatus::Running.is_terminal());
        assert!(JobStatus::Completed.is_terminal());
        assert!(JobStatus::Failed.is_terminal());
        assert!(JobStatus::Cancelled.is_terminal());
    }

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.max_delay, TimeDelta::minutes(60));
    }

    #[test]
    fn test_retry_delay_exponential() {
        let config = RetryConfig::default();

        assert_eq!(config.delay_for_attempt(0), TimeDelta::zero());
        assert_eq!(config.delay_for_attempt(1), TimeDelta::minutes(1));
        assert_eq!(config.delay_for_attempt(2), TimeDelta::minutes(5));
        assert_eq!(config.delay_for_attempt(3), TimeDelta::minutes(30));
        assert_eq!(config.delay_for_attempt(4), TimeDelta::minutes(60)); // capped
        assert_eq!(config.delay_for_attempt(10), TimeDelta::minutes(60)); // still capped
    }

    /// A `max_delay` below the table values must cap them; smaller entries
    /// are left untouched.
    #[test]
    fn test_retry_delay_respects_max_delay() {
        let config = RetryConfig {
            max_retries: 3,
            max_delay: TimeDelta::minutes(10),
        };

        assert_eq!(config.delay_for_attempt(1), TimeDelta::minutes(1));
        assert_eq!(config.delay_for_attempt(2), TimeDelta::minutes(5));
        assert_eq!(config.delay_for_attempt(3), TimeDelta::minutes(10));
        assert_eq!(config.delay_for_attempt(4), TimeDelta::minutes(10));
    }

    #[test]
    fn test_create_job_request_builder() {
        let request = CreateJobRequest::new("https://example.com")
            .with_name("Example Portal")
            .with_full_sync()
            .with_max_retries(5);

        assert_eq!(request.portal_url, "https://example.com");
        assert_eq!(request.portal_name, Some("Example Portal".to_string()));
        assert!(request.force_full_sync);
        assert_eq!(request.max_retries, Some(5));
    }

    #[test]
    fn test_create_job_request_defaults() {
        let request = CreateJobRequest::new("https://example.com");

        assert_eq!(request.portal_url, "https://example.com");
        assert_eq!(request.portal_name, None);
        assert!(!request.force_full_sync);
        assert_eq!(request.max_retries, None);
        assert_eq!(request.url_template, None);
        assert_eq!(request.language, None);
        assert_eq!(request.profile, None);
        assert_eq!(request.sparql_endpoint, None);
    }

    #[test]
    fn test_create_job_request_optional_setters() {
        let request = CreateJobRequest::new("https://example.com")
            .with_url_template("https://example.com/dataset/{id}")
            .with_language("en")
            .with_profile("sparql")
            .with_sparql_endpoint("https://example.com/sparql");

        assert_eq!(
            request.url_template.as_deref(),
            Some("https://example.com/dataset/{id}")
        );
        assert_eq!(request.language.as_deref(), Some("en"));
        assert_eq!(request.profile.as_deref(), Some("sparql"));
        assert_eq!(
            request.sparql_endpoint.as_deref(),
            Some("https://example.com/sparql")
        );
    }

    #[test]
    fn test_worker_config_default() {
        let config = WorkerConfig::default();

        assert!(config.worker_id.starts_with("worker-"));
        assert_eq!(config.poll_interval, std::time::Duration::from_secs(5));
        assert_eq!(config.retry_config.max_retries, 3);
    }

    #[test]
    fn test_worker_config_builder() {
        let config = WorkerConfig::default()
            .with_worker_id("my-worker")
            .with_poll_interval(std::time::Duration::from_secs(10));

        assert_eq!(config.worker_id, "my-worker");
        assert_eq!(config.poll_interval, std::time::Duration::from_secs(10));
    }

    #[test]
    fn test_worker_config_with_retry_config() {
        let config = WorkerConfig::default().with_retry_config(RetryConfig {
            max_retries: 7,
            max_delay: TimeDelta::minutes(15),
        });

        assert_eq!(config.retry_config.max_retries, 7);
        assert_eq!(config.retry_config.max_delay, TimeDelta::minutes(15));
    }

    #[test]
    fn test_harvest_job_can_retry() {
        let mut job = sample_job(0, 3);

        assert!(job.can_retry());

        job.retry_count = 2;
        assert!(job.can_retry());

        job.retry_count = 3;
        assert!(!job.can_retry());
    }

    /// The next retry time must be "now" plus the delay for attempt
    /// `retry_count + 1`. The call is bracketed by two clock reads so the
    /// assertion does not depend on the exact instant it ran.
    #[test]
    fn test_harvest_job_calculate_next_retry() {
        let config = RetryConfig::default();

        let first = sample_job(0, 3);
        let before = Utc::now();
        let next = first.calculate_next_retry(&config);
        let after = Utc::now();
        assert!(next >= before + TimeDelta::minutes(1));
        assert!(next <= after + TimeDelta::minutes(1));

        let third = sample_job(2, 3);
        let before = Utc::now();
        let next = third.calculate_next_retry(&config);
        let after = Utc::now();
        assert!(next >= before + TimeDelta::minutes(30));
        assert!(next <= after + TimeDelta::minutes(30));
    }
}