aurora_db/workers/
job.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use uuid::Uuid;
5
6/// Job status tracking the lifecycle of a background job
7///
8/// Jobs progress through states: Pending → Running → Completed/Failed → DeadLetter
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
10pub enum JobStatus {
11    /// Job is queued and waiting to be processed
12    Pending,
13    /// Job is currently being executed by a worker
14    Running,
15    /// Job completed successfully
16    Completed,
17    /// Job failed but can be retried
18    Failed { error: String, retries: u32 },
19    /// Job permanently failed after max retries
20    DeadLetter { error: String },
21}
22
23/// Job priority for execution order
24///
25/// Higher priority jobs are executed first. Critical > High > Normal > Low.
26///
27/// # Examples
28///
29/// ```
30/// use aurora_db::workers::{Job, JobPriority};
31///
32/// // Time-sensitive payment
33/// let payment = Job::new("process_payment")
34///     .with_priority(JobPriority::Critical);
35///
36/// // User-facing task
37/// let notification = Job::new("send_notification")
38///     .with_priority(JobPriority::High);
39///
40/// // Background cleanup
41/// let cleanup = Job::new("cleanup_temp_files")
42///     .with_priority(JobPriority::Low);
43/// ```
44#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
45pub enum JobPriority {
46    /// Low priority (value: 1) - background tasks, cleanup
47    Low = 1,
48    /// Normal priority (value: 2) - default for most jobs
49    Normal = 2,
50    /// High priority (value: 3) - user-facing operations
51    High = 3,
52    /// Critical priority (value: 4) - payments, security
53    Critical = 4,
54}
55
56/// A durable background job
57///
58/// Represents a unit of work that can be queued, scheduled, retried, and tracked.
59/// Jobs are persisted to disk and survive restarts. Use the builder pattern to
60/// configure job properties.
61///
62/// # Examples
63///
64/// ```
65/// use aurora_db::workers::{Job, JobPriority};
66/// use serde_json::json;
67///
68/// // Simple job
69/// let job = Job::new("send_email")
70///     .add_field("to", json!("user@example.com"))
71///     .add_field("subject", json!("Welcome!"));
72///
73/// // Job with all options
74/// let job = Job::new("process_video")
75///     .add_field("video_id", json!("vid-123"))
76///     .add_field("resolution", json!("1080p"))
77///     .with_priority(JobPriority::High)
78///     .with_max_retries(5)
79///     .with_timeout(600) // 10 minutes
80///     .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
81/// ```
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct Job {
84    pub id: String,
85    pub job_type: String,
86    pub payload: HashMap<String, serde_json::Value>,
87    pub priority: JobPriority,
88    pub status: JobStatus,
89    pub max_retries: u32,
90    pub retry_count: u32,
91    pub created_at: DateTime<Utc>,
92    pub scheduled_at: Option<DateTime<Utc>>,
93    pub started_at: Option<DateTime<Utc>>,
94    pub completed_at: Option<DateTime<Utc>>,
95    pub timeout_seconds: Option<u64>,
96}
97
98impl Job {
99    /// Create a new job
100    ///
101    /// Creates a job with default settings:
102    /// - Priority: Normal
103    /// - Max retries: 3
104    /// - Timeout: 5 minutes
105    /// - Status: Pending
106    ///
107    /// # Arguments
108    /// * `job_type` - Type identifier for routing to the correct handler
109    ///
110    /// # Examples
111    ///
112    /// ```
113    /// // Create different job types
114    /// let email = Job::new("send_email");
115    /// let image = Job::new("resize_image");
116    /// let report = Job::new("generate_report");
117    /// ```
118    pub fn new(job_type: impl Into<String>) -> Self {
119        Self {
120            id: Uuid::new_v4().to_string(),
121            job_type: job_type.into(),
122            payload: HashMap::new(),
123            priority: JobPriority::Normal,
124            status: JobStatus::Pending,
125            max_retries: 3,
126            retry_count: 0,
127            created_at: Utc::now(),
128            scheduled_at: None,
129            started_at: None,
130            completed_at: None,
131            timeout_seconds: Some(300), // 5 minutes default
132        }
133    }
134
135    /// Set job payload from a HashMap
136    ///
137    /// Replaces the entire payload with the provided HashMap.
138    /// For adding individual fields, use `add_field()` instead.
139    ///
140    /// # Examples
141    ///
142    /// ```
143    /// use std::collections::HashMap;
144    /// use serde_json::json;
145    ///
146    /// let mut payload = HashMap::new();
147    /// payload.insert("user_id".to_string(), json!("123"));
148    /// payload.insert("amount".to_string(), json!(99.99));
149    ///
150    /// let job = Job::new("process_payment")
151    ///     .with_payload(payload);
152    /// ```
153    pub fn with_payload(mut self, payload: HashMap<String, serde_json::Value>) -> Self {
154        self.payload = payload;
155        self
156    }
157
158    /// Add a single field to the job payload
159    ///
160    /// Use this for building the payload incrementally.
161    /// Can be chained multiple times.
162    ///
163    /// # Examples
164    ///
165    /// ```
166    /// use serde_json::json;
167    ///
168    /// let job = Job::new("send_email")
169    ///     .add_field("to", json!("user@example.com"))
170    ///     .add_field("subject", json!("Welcome!"))
171    ///     .add_field("template", json!("welcome_email"))
172    ///     .add_field("vars", json!({"name": "Alice"}));
173    /// ```
174    pub fn add_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
175        self.payload.insert(key.into(), value);
176        self
177    }
178
179    /// Set job priority
180    ///
181    /// Higher priority jobs are executed before lower priority ones.
182    /// Default is `JobPriority::Normal`.
183    ///
184    /// # Examples
185    ///
186    /// ```
187    /// // Critical - payments, security operations
188    /// let payment = Job::new("charge_card")
189    ///     .with_priority(JobPriority::Critical);
190    ///
191    /// // High - user-facing operations
192    /// let notification = Job::new("push_notification")
193    ///     .with_priority(JobPriority::High);
194    ///
195    /// // Low - background cleanup, analytics
196    /// let cleanup = Job::new("clean_old_logs")
197    ///     .with_priority(JobPriority::Low);
198    /// ```
199    pub fn with_priority(mut self, priority: JobPriority) -> Self {
200        self.priority = priority;
201        self
202    }
203
204    /// Set maximum retry attempts
205    ///
206    /// If a job fails, it will be retried up to this many times with
207    /// exponential backoff. Default is 3 retries.
208    ///
209    /// # Examples
210    ///
211    /// ```
212    /// // Network operation - retry more
213    /// let api_call = Job::new("fetch_api_data")
214    ///     .with_max_retries(5);
215    ///
216    /// // Critical operation - don't retry
217    /// let one_time = Job::new("send_invoice")
218    ///     .with_max_retries(0);
219    ///
220    /// // Flaky operation - retry extensively
221    /// let external = Job::new("third_party_webhook")
222    ///     .with_max_retries(10);
223    /// ```
224    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
225        self.max_retries = max_retries;
226        self
227    }
228
229    /// Schedule job for later execution
230    ///
231    /// Job will not be executed until the specified time.
232    /// Useful for reminders, scheduled reports, delayed notifications.
233    ///
234    /// # Examples
235    ///
236    /// ```
237    /// use chrono::{Utc, Duration};
238    ///
239    /// // Run in 1 hour
240    /// let reminder = Job::new("send_reminder")
241    ///     .add_field("message", json!("Meeting starts soon"))
242    ///     .scheduled_at(Utc::now() + Duration::hours(1));
243    ///
244    /// // Run at specific time
245    /// let report = Job::new("daily_report")
246    ///     .scheduled_at(Utc::now().date_naive().and_hms_opt(9, 0, 0).unwrap());
247    ///
248    /// // Delayed retry pattern
249    /// let retry = Job::new("retry_failed_upload")
250    ///     .scheduled_at(Utc::now() + Duration::minutes(30));
251    /// ```
252    pub fn scheduled_at(mut self, at: DateTime<Utc>) -> Self {
253        self.scheduled_at = Some(at);
254        self
255    }
256
257    /// Set job execution timeout
258    ///
259    /// Job will be terminated if it runs longer than this.
260    /// Default is 300 seconds (5 minutes).
261    ///
262    /// # Examples
263    ///
264    /// ```
265    /// // Quick task
266    /// let quick = Job::new("send_sms")
267    ///     .with_timeout(30); // 30 seconds
268    ///
269    /// // Long-running task
270    /// let video = Job::new("transcode_video")
271    ///     .with_timeout(3600); // 1 hour
272    ///
273    /// // Very long task
274    /// let batch = Job::new("process_millions_of_records")
275    ///     .with_timeout(7200); // 2 hours
276    /// ```
277    pub fn with_timeout(mut self, seconds: u64) -> Self {
278        self.timeout_seconds = Some(seconds);
279        self
280    }
281
282    /// Check if job should be executed (based on schedule)
283    pub fn should_run(&self) -> bool {
284        match self.status {
285            JobStatus::Pending => {
286                if let Some(scheduled) = self.scheduled_at {
287                    Utc::now() >= scheduled
288                } else {
289                    true
290                }
291            }
292            _ => false,
293        }
294    }
295
296    /// Mark job as running
297    pub fn mark_running(&mut self) {
298        self.status = JobStatus::Running;
299        self.started_at = Some(Utc::now());
300    }
301
302    /// Mark job as completed
303    pub fn mark_completed(&mut self) {
304        self.status = JobStatus::Completed;
305        self.completed_at = Some(Utc::now());
306    }
307
308    /// Mark job as failed
309    pub fn mark_failed(&mut self, error: String) {
310        self.retry_count += 1;
311
312        if self.retry_count >= self.max_retries {
313            self.status = JobStatus::DeadLetter { error };
314        } else {
315            self.status = JobStatus::Failed {
316                error,
317                retries: self.retry_count,
318            };
319        }
320    }
321
322    /// Calculate next retry delay (exponential backoff)
323    pub fn next_retry_delay(&self) -> chrono::Duration {
324        let base_delay = 5; // 5 seconds
325        let delay_seconds = base_delay * 2_i64.pow(self.retry_count);
326        chrono::Duration::seconds(delay_seconds)
327    }
328}
329
330#[cfg(test)]
331mod tests {
332    use super::*;
333
334    #[test]
335    fn test_job_creation() {
336        let job = Job::new("email")
337            .add_field("to", serde_json::json!("user@example.com"))
338            .add_field("subject", serde_json::json!("Hello"))
339            .with_priority(JobPriority::High)
340            .with_max_retries(5);
341
342        assert_eq!(job.job_type, "email");
343        assert_eq!(job.priority, JobPriority::High);
344        assert_eq!(job.max_retries, 5);
345        assert_eq!(job.status, JobStatus::Pending);
346    }
347
348    #[test]
349    fn test_job_should_run() {
350        let job = Job::new("test");
351        assert!(job.should_run());
352
353        let future_job = Job::new("test").scheduled_at(Utc::now() + chrono::Duration::hours(1));
354        assert!(!future_job.should_run());
355
356        let past_job = Job::new("test").scheduled_at(Utc::now() - chrono::Duration::hours(1));
357        assert!(past_job.should_run());
358    }
359
360    #[test]
361    fn test_job_retry_logic() {
362        let mut job = Job::new("test").with_max_retries(3);
363
364        job.mark_failed("Error 1".to_string());
365        assert!(matches!(job.status, JobStatus::Failed { .. }));
366        assert_eq!(job.retry_count, 1);
367
368        job.mark_failed("Error 2".to_string());
369        assert_eq!(job.retry_count, 2);
370
371        job.mark_failed("Error 3".to_string());
372        assert!(matches!(job.status, JobStatus::DeadLetter { .. }));
373        assert_eq!(job.retry_count, 3);
374    }
375
376    #[test]
377    fn test_exponential_backoff() {
378        let mut job = Job::new("test");
379
380        assert_eq!(job.next_retry_delay().num_seconds(), 5);
381
382        job.retry_count = 1;
383        assert_eq!(job.next_retry_delay().num_seconds(), 10);
384
385        job.retry_count = 2;
386        assert_eq!(job.next_retry_delay().num_seconds(), 20);
387
388        job.retry_count = 3;
389        assert_eq!(job.next_retry_delay().num_seconds(), 40);
390    }
391}