aurora_db/workers/
job.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use uuid::Uuid;
5
6/// Job status
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
8pub enum JobStatus {
9    Pending,
10    Running,
11    Completed,
12    Failed { error: String, retries: u32 },
13    DeadLetter { error: String },
14}
15
16/// Job priority
17#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
18pub enum JobPriority {
19    Low = 1,
20    Normal = 2,
21    High = 3,
22    Critical = 4,
23}
24
25/// A durable background job
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct Job {
28    pub id: String,
29    pub job_type: String,
30    pub payload: HashMap<String, serde_json::Value>,
31    pub priority: JobPriority,
32    pub status: JobStatus,
33    pub max_retries: u32,
34    pub retry_count: u32,
35    pub created_at: DateTime<Utc>,
36    pub scheduled_at: Option<DateTime<Utc>>,
37    pub started_at: Option<DateTime<Utc>>,
38    pub completed_at: Option<DateTime<Utc>>,
39    pub timeout_seconds: Option<u64>,
40}
41
42impl Job {
43    /// Create a new job
44    pub fn new(job_type: impl Into<String>) -> Self {
45        Self {
46            id: Uuid::new_v4().to_string(),
47            job_type: job_type.into(),
48            payload: HashMap::new(),
49            priority: JobPriority::Normal,
50            status: JobStatus::Pending,
51            max_retries: 3,
52            retry_count: 0,
53            created_at: Utc::now(),
54            scheduled_at: None,
55            started_at: None,
56            completed_at: None,
57            timeout_seconds: Some(300), // 5 minutes default
58        }
59    }
60
61    /// Set job payload
62    pub fn with_payload(mut self, payload: HashMap<String, serde_json::Value>) -> Self {
63        self.payload = payload;
64        self
65    }
66
67    /// Add a single payload field
68    pub fn add_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
69        self.payload.insert(key.into(), value);
70        self
71    }
72
73    /// Set job priority
74    pub fn with_priority(mut self, priority: JobPriority) -> Self {
75        self.priority = priority;
76        self
77    }
78
79    /// Set max retries
80    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
81        self.max_retries = max_retries;
82        self
83    }
84
85    /// Schedule job for later execution
86    pub fn scheduled_at(mut self, at: DateTime<Utc>) -> Self {
87        self.scheduled_at = Some(at);
88        self
89    }
90
91    /// Set job timeout
92    pub fn with_timeout(mut self, seconds: u64) -> Self {
93        self.timeout_seconds = Some(seconds);
94        self
95    }
96
97    /// Check if job should be executed (based on schedule)
98    pub fn should_run(&self) -> bool {
99        match self.status {
100            JobStatus::Pending => {
101                if let Some(scheduled) = self.scheduled_at {
102                    Utc::now() >= scheduled
103                } else {
104                    true
105                }
106            }
107            _ => false,
108        }
109    }
110
111    /// Mark job as running
112    pub fn mark_running(&mut self) {
113        self.status = JobStatus::Running;
114        self.started_at = Some(Utc::now());
115    }
116
117    /// Mark job as completed
118    pub fn mark_completed(&mut self) {
119        self.status = JobStatus::Completed;
120        self.completed_at = Some(Utc::now());
121    }
122
123    /// Mark job as failed
124    pub fn mark_failed(&mut self, error: String) {
125        self.retry_count += 1;
126
127        if self.retry_count >= self.max_retries {
128            self.status = JobStatus::DeadLetter { error };
129        } else {
130            self.status = JobStatus::Failed {
131                error,
132                retries: self.retry_count,
133            };
134        }
135    }
136
137    /// Calculate next retry delay (exponential backoff)
138    pub fn next_retry_delay(&self) -> chrono::Duration {
139        let base_delay = 5; // 5 seconds
140        let delay_seconds = base_delay * 2_i64.pow(self.retry_count);
141        chrono::Duration::seconds(delay_seconds)
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder calls must be reflected in the resulting job's fields.
    #[test]
    fn test_job_creation() {
        let email_job = Job::new("email")
            .add_field("to", serde_json::json!("user@example.com"))
            .add_field("subject", serde_json::json!("Hello"))
            .with_priority(JobPriority::High)
            .with_max_retries(5);

        assert_eq!(email_job.job_type, "email");
        assert_eq!(email_job.priority, JobPriority::High);
        assert_eq!(email_job.max_retries, 5);
        assert_eq!(email_job.status, JobStatus::Pending);
    }

    /// A pending job runs immediately unless it is scheduled in the future.
    #[test]
    fn test_job_should_run() {
        let unscheduled = Job::new("test");
        assert!(unscheduled.should_run());

        let an_hour_from_now = Utc::now() + chrono::Duration::hours(1);
        let not_yet_due = Job::new("test").scheduled_at(an_hour_from_now);
        assert!(!not_yet_due.should_run());

        let an_hour_ago = Utc::now() - chrono::Duration::hours(1);
        let overdue = Job::new("test").scheduled_at(an_hour_ago);
        assert!(overdue.should_run());
    }

    /// Failures accumulate until `max_retries` is reached, then dead-letter.
    #[test]
    fn test_job_retry_logic() {
        let mut flaky = Job::new("test").with_max_retries(3);

        flaky.mark_failed(String::from("Error 1"));
        assert!(matches!(flaky.status, JobStatus::Failed { .. }));
        assert_eq!(flaky.retry_count, 1);

        flaky.mark_failed(String::from("Error 2"));
        assert_eq!(flaky.retry_count, 2);

        flaky.mark_failed(String::from("Error 3"));
        assert!(matches!(flaky.status, JobStatus::DeadLetter { .. }));
        assert_eq!(flaky.retry_count, 3);
    }

    /// The retry delay starts at 5 seconds and doubles with every retry.
    #[test]
    fn test_exponential_backoff() {
        let mut job = Job::new("test");

        // (retry_count, expected delay in seconds)
        let expectations: [(u32, i64); 4] = [(0, 5), (1, 10), (2, 20), (3, 40)];
        for &(retries, expected_secs) in expectations.iter() {
            job.retry_count = retries;
            assert_eq!(job.next_retry_delay().num_seconds(), expected_secs);
        }
    }
}