oxidite_queue/
job.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::time::Duration;
4use crate::Result;
5
6/// Job status
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
8pub enum JobStatus {
9    Pending,
10    Running,
11    Completed,
12    Failed,
13    Retrying,
14    DeadLetter,  // Permanently failed after max retries
15}
16
17/// Job result
18pub type JobResult = Result<()>;
19
20/// Trait for background jobs
21#[async_trait]
22pub trait Job: Send + Sync + Serialize + for<'de> Deserialize<'de> {
23    /// Perform the job
24    async fn perform(&self) -> JobResult;
25    
26    /// Maximum number of retries
27    fn max_retries(&self) -> u32 {
28        3
29    }
30    
31    /// Backoff duration for retries
32    fn backoff(&self, attempt: u32) -> Duration {
33        Duration::from_secs(60 * 2_u64.pow(attempt))
34    }
35    
36    /// Job priority (higher = more important)
37    fn priority(&self) -> i32 {
38        0
39    }
40    
41    /// Job name for identification
42    fn name(&self) -> &'static str {
43        std::any::type_name::<Self>()
44    }
45}
46
47/// Job wrapper for storage
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct JobWrapper {
50    pub id: String,
51    pub name: String,
52    pub payload: serde_json::Value,
53    pub status: JobStatus,
54    pub attempts: u32,
55    pub max_retries: u32,
56    pub created_at: i64,
57    pub scheduled_at: Option<i64>,
58    pub priority: i32,
59    pub cron_schedule: Option<String>,  // Cron expression for recurring jobs
60    pub last_run_at: Option<i64>,       // Last execution timestamp
61    pub error: Option<String>,          // Last error message
62}
63
64impl JobWrapper {
65    pub fn new<J: Job>(job: &J) -> Result<Self> {
66        let payload = serde_json::to_value(job)?;
67        let now = chrono::Utc::now().timestamp();
68        
69        Ok(Self {
70            id: uuid::Uuid::new_v4().to_string(),
71            name: job.name().to_string(),
72            payload,
73            status: JobStatus::Pending,
74            attempts: 0,
75            max_retries: job.max_retries(),
76            created_at: now,
77            scheduled_at: None,
78            priority: job.priority(),
79            cron_schedule: None,
80            last_run_at: None,
81            error: None,
82        })
83    }
84
85    pub fn with_delay(mut self, delay: Duration) -> Self {
86        let scheduled_time = chrono::Utc::now().timestamp() + delay.as_secs() as i64;
87        self.scheduled_at = Some(scheduled_time);
88        self
89    }
90
91    /// Schedule job with cron expression (e.g., "0 0 * * * *" for hourly)
92    pub fn with_cron(mut self, cron_expr: String) -> Self {
93        self.scheduled_at = Self::next_cron_run(&cron_expr);
94        self.cron_schedule = Some(cron_expr);
95        self
96    }
97
98    /// Calculate next run time from cron expression
99    fn next_cron_run(cron_expr: &str) -> Option<i64> {
100        use cron::Schedule;
101        use std::str::FromStr;
102        
103        if let Ok(schedule) = Schedule::from_str(cron_expr) {
104            if let Some(next) = schedule.upcoming(chrono::Utc).next() {
105                return Some(next.timestamp());
106            }
107        }
108        None
109    }
110
111    /// Reschedule recurring job for next run
112    pub fn reschedule(&mut self) {
113        if let Some(cron_expr) = &self.cron_schedule {
114            self.scheduled_at = Self::next_cron_run(cron_expr);
115            self.status = JobStatus::Pending;
116            self.attempts = 0;
117            self.last_run_at = Some(chrono::Utc::now().timestamp());
118        }
119    }
120
121    /// Check if job should be retried
122    pub fn should_retry(&self) -> bool {
123        self.status == JobStatus::Failed && self.attempts < self.max_retries
124    }
125
126    /// Calculate backoff duration for retry
127    pub fn calculate_backoff(&self) -> Duration {
128        // Exponential backoff: 60s * 2^attempts
129        Duration::from_secs(60 * 2_u64.pow(self.attempts))
130    }
131}