Skip to main content

aurora_db/workers/
job.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use uuid::Uuid;
5
6/// Job status tracking the lifecycle of a background job
7///
8/// Jobs progress through states: Pending -> Running -> Completed/Failed -> DeadLetter
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
10pub enum JobStatus {
11    /// Job is queued and waiting to be processed
12    Pending,
13    /// Job is currently being executed by a worker
14    Running,
15    /// Job completed successfully
16    Completed,
17    /// Job failed but can be retried
18    Failed { error: String, retries: u32 },
19    /// Job permanently failed after max retries
20    DeadLetter { error: String },
21}
22
23impl std::fmt::Display for JobStatus {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        match self {
26            JobStatus::Pending => write!(f, "Pending"),
27            JobStatus::Running => write!(f, "Running"),
28            JobStatus::Completed => write!(f, "Completed"),
29            JobStatus::Failed { error, retries } => write!(f, "Failed({}, retries: {})", error, retries),
30            JobStatus::DeadLetter { error } => write!(f, "DeadLetter({})", error),
31        }
32    }
33}
34
35/// Job priority for execution order
36///
37/// Higher priority jobs are executed first. Critical > High > Normal > Low.
38///
39/// # Examples
40///
41/// ```
42/// use aurora_db::workers::{Job, JobPriority};
43///
44/// // Time-sensitive payment
45/// let payment = Job::new("process_payment")
46///     .with_priority(JobPriority::Critical);
47///
48/// // User-facing task
49/// let notification = Job::new("send_notification")
50///     .with_priority(JobPriority::High);
51///
52/// // Background cleanup
53/// let cleanup = Job::new("cleanup_temp_files")
54///     .with_priority(JobPriority::Low);
55/// ```
56#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
57pub enum JobPriority {
58    /// Low priority (value: 1) - background tasks, cleanup
59    Low = 1,
60    /// Normal priority (value: 2) - default for most jobs
61    Normal = 2,
62    /// High priority (value: 3) - user-facing operations
63    High = 3,
64    /// Critical priority (value: 4) - payments, security
65    Critical = 4,
66}
67
68impl std::fmt::Display for JobPriority {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        match self {
71            JobPriority::Low => write!(f, "Low"),
72            JobPriority::Normal => write!(f, "Normal"),
73            JobPriority::High => write!(f, "High"),
74            JobPriority::Critical => write!(f, "Critical"),
75        }
76    }
77}
78
/// A durable background job
///
/// Represents a unit of work that can be queued, scheduled, retried, and tracked.
/// The type is serde-serializable so it can be persisted and survive restarts
/// (the persistence itself happens outside this type). Use the builder pattern to
/// configure job properties.
///
/// # Examples
///
/// ```
/// use aurora_db::workers::{Job, JobPriority};
/// use serde_json::json;
///
/// // Simple job
/// let job = Job::new("send_email")
///     .add_field("to", json!("user@example.com"))
///     .add_field("subject", json!("Welcome!"));
///
/// // Job with all options
/// let job = Job::new("process_video")
///     .add_field("video_id", json!("vid-123"))
///     .add_field("resolution", json!("1080p"))
///     .with_priority(JobPriority::High)
///     .with_max_retries(5)
///     .with_timeout(600) // 10 minutes
///     .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
/// ```
// NOTE: serde serializes fields in declaration order, and `Display` renders the
// serialized JSON, so reordering fields changes both outputs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job {
    /// Unique identifier; `Job::new` assigns a random v4 UUID string.
    pub id: String,
    /// Type identifier used to route the job to the correct handler.
    pub job_type: String,
    /// Named JSON values handed to the handler.
    pub payload: HashMap<String, serde_json::Value>,
    /// Execution priority (defaults to `JobPriority::Normal`).
    pub priority: JobPriority,
    /// Current lifecycle state.
    pub status: JobStatus,
    /// Failure budget: once `retry_count` reaches this value, `mark_failed`
    /// moves the job to `JobStatus::DeadLetter`.
    pub max_retries: u32,
    /// Number of failures recorded by `mark_failed`.
    pub retry_count: u32,
    /// When the job was constructed.
    pub created_at: DateTime<Utc>,
    /// Earliest time the job may run; `None` means it is eligible immediately.
    pub scheduled_at: Option<DateTime<Utc>>,
    /// Set by `mark_running`.
    pub started_at: Option<DateTime<Utc>>,
    /// Set by `mark_completed`.
    pub completed_at: Option<DateTime<Utc>>,
    /// Execution time limit in seconds (`Job::new` defaults this to 300).
    pub timeout_seconds: Option<u64>,
    /// Unix timestamp (seconds) of last "I'm alive" signal from worker (for Reaper).
    /// Updated by `touch`; 0 means no heartbeat has been recorded, and it also
    /// defaults to 0 when absent from serialized data.
    #[serde(default)]
    pub last_heartbeat: u64,
    /// Seconds without a heartbeat before a running job's worker is considered
    /// dead (default: 30, also used when the field is absent from serialized data).
    #[serde(default = "default_lease")]
    pub lease_duration: u64,
}
126
127impl std::fmt::Display for Job {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        write!(f, "{}", serde_json::to_string_pretty(self).unwrap_or_default())
130    }
131}
132
/// Lease length, in seconds, given to jobs that don't specify one.
///
/// Used by `Job::new` and as the serde fallback when `lease_duration` is
/// missing from the serialized data.
fn default_lease() -> u64 {
    const DEFAULT_LEASE_SECS: u64 = 30;
    DEFAULT_LEASE_SECS
}
137
138impl Job {
139    /// Create a new job
140    ///
141    /// Creates a job with default settings:
142    /// - Priority: Normal
143    /// - Max retries: 3
144    /// - Timeout: 5 minutes
145    /// - Status: Pending
146    ///
147    /// # Arguments
148    /// * `job_type` - Type identifier for routing to the correct handler
149    ///
150    /// # Examples
151    ///
152   
153    /// // Create different job types
154    /// let email = Job::new("send_email");
155    /// let image = Job::new("resize_image");
156    /// let report = Job::new("generate_report");
157    /// ```
158    pub fn new(job_type: impl Into<String>) -> Self {
159        Self {
160            id: Uuid::new_v4().to_string(),
161            job_type: job_type.into(),
162            payload: HashMap::new(),
163            priority: JobPriority::Normal,
164            status: JobStatus::Pending,
165            max_retries: 3,
166            retry_count: 0,
167            created_at: Utc::now(),
168            scheduled_at: None,
169            started_at: None,
170            completed_at: None,
171            timeout_seconds: Some(300), // 5 minutes default
172            last_heartbeat: 0,
173            lease_duration: default_lease(),
174        }
175    }
176
177    /// Set job payload from a HashMap
178    ///
179    /// Replaces the entire payload with the provided HashMap.
180    /// For adding individual fields, use `add_field()` instead.
181    ///
182    /// # Examples
183    ///
184   
185    /// use std::collections::HashMap;
186    /// use serde_json::json;
187    ///
188    /// let mut payload = HashMap::new();
189    /// payload.insert("user_id".to_string(), json!("123"));
190    /// payload.insert("amount".to_string(), json!(99.99));
191    ///
192    /// let job = Job::new("process_payment")
193    ///     .with_payload(payload);
194    /// ```
195    pub fn with_payload(mut self, payload: HashMap<String, serde_json::Value>) -> Self {
196        self.payload = payload;
197        self
198    }
199
200    /// Add a single field to the job payload
201    ///
202    /// Use this for building the payload incrementally.
203    /// Can be chained multiple times.
204    ///
205    /// # Examples
206    ///
207   
208    /// use serde_json::json;
209    ///
210    /// let job = Job::new("send_email")
211    ///     .add_field("to", json!("user@example.com"))
212    ///     .add_field("subject", json!("Welcome!"))
213    ///     .add_field("template", json!("welcome_email"))
214    ///     .add_field("vars", json!({"name": "Alice"}));
215    /// ```
216    pub fn add_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
217        self.payload.insert(key.into(), value);
218        self
219    }
220
221    /// Set job priority
222    ///
223    /// Higher priority jobs are executed before lower priority ones.
224    /// Default is `JobPriority::Normal`.
225    ///
226    /// # Examples
227    ///
228   
229    /// // Critical - payments, security operations
230    /// let payment = Job::new("charge_card")
231    ///     .with_priority(JobPriority::Critical);
232    ///
233    /// // High - user-facing operations
234    /// let notification = Job::new("push_notification")
235    ///     .with_priority(JobPriority::High);
236    ///
237    /// // Low - background cleanup, analytics
238    /// let cleanup = Job::new("clean_old_logs")
239    ///     .with_priority(JobPriority::Low);
240    /// ```
241    pub fn with_priority(mut self, priority: JobPriority) -> Self {
242        self.priority = priority;
243        self
244    }
245
246    /// Set maximum retry attempts
247    ///
248    /// If a job fails, it will be retried up to this many times with
249    /// exponential backoff. Default is 3 retries.
250    ///
251    /// # Examples
252    ///
253   
254    /// // Network operation - retry more
255    /// let api_call = Job::new("fetch_api_data")
256    ///     .with_max_retries(5);
257    ///
258    /// // Critical operation - don't retry
259    /// let one_time = Job::new("send_invoice")
260    ///     .with_max_retries(0);
261    ///
262    /// // Flaky operation - retry extensively
263    /// let external = Job::new("third_party_webhook")
264    ///     .with_max_retries(10);
265    /// ```
266    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
267        self.max_retries = max_retries;
268        self
269    }
270
271    /// Schedule job for later execution
272    ///
273    /// Job will not be executed until the specified time.
274    /// Useful for reminders, scheduled reports, delayed notifications.
275    ///
276    /// # Examples
277    ///
278   
279    /// use chrono::{Utc, Duration};
280    ///
281    /// // Run in 1 hour
282    /// let reminder = Job::new("send_reminder")
283    ///     .add_field("message", json!("Meeting starts soon"))
284    ///     .scheduled_at(Utc::now() + Duration::hours(1));
285    ///
286    /// // Run at specific time
287    /// let report = Job::new("daily_report")
288    ///     .scheduled_at(Utc::now().date_naive().and_hms_opt(9, 0, 0).unwrap());
289    ///
290    /// // Delayed retry pattern
291    /// let retry = Job::new("retry_failed_upload")
292    ///     .scheduled_at(Utc::now() + Duration::minutes(30));
293    /// ```
294    pub fn scheduled_at(mut self, at: DateTime<Utc>) -> Self {
295        self.scheduled_at = Some(at);
296        self
297    }
298
299    /// Set job execution timeout
300    ///
301    /// Job will be terminated if it runs longer than this.
302    /// Default is 300 seconds (5 minutes).
303    ///
304    /// # Examples
305    ///
306   
307    /// // Quick task
308    /// let quick = Job::new("send_sms")
309    ///     .with_timeout(30); // 30 seconds
310    ///
311    /// // Long-running task
312    /// let video = Job::new("transcode_video")
313    ///     .with_timeout(3600); // 1 hour
314    ///
315    /// // Very long task
316    /// let batch = Job::new("process_millions_of_records")
317    ///     .with_timeout(7200); // 2 hours
318    /// ```
319    pub fn with_timeout(mut self, seconds: u64) -> Self {
320        self.timeout_seconds = Some(seconds);
321        self
322    }
323
324    /// Check if job should be executed (based on schedule)
325    pub fn should_run(&self) -> bool {
326        match self.status {
327            JobStatus::Pending => {
328                if let Some(scheduled) = self.scheduled_at {
329                    Utc::now() >= scheduled
330                } else {
331                    true
332                }
333            }
334            _ => false,
335        }
336    }
337
338    /// Mark job as running
339    pub fn mark_running(&mut self) {
340        self.status = JobStatus::Running;
341        self.started_at = Some(Utc::now());
342        self.touch(); // Update heartbeat when starting
343    }
344
345    /// Update heartbeat timestamp to "now"
346    ///
347    /// Workers should call this periodically while processing a job.
348    /// The Reaper uses this to detect dead workers.
349    pub fn touch(&mut self) {
350        self.last_heartbeat = std::time::SystemTime::now()
351            .duration_since(std::time::UNIX_EPOCH)
352            .unwrap()
353            .as_secs();
354    }
355
356    /// Check if job's heartbeat has expired (worker presumed dead)
357    pub fn is_heartbeat_expired(&self) -> bool {
358        if !matches!(self.status, JobStatus::Running) {
359            return false;
360        }
361        let now = std::time::SystemTime::now()
362            .duration_since(std::time::UNIX_EPOCH)
363            .unwrap()
364            .as_secs();
365        now.saturating_sub(self.last_heartbeat) > self.lease_duration
366    }
367
368    /// Set custom lease duration
369    pub fn with_lease_duration(mut self, seconds: u64) -> Self {
370        self.lease_duration = seconds;
371        self
372    }
373
374    /// Mark job as completed
375    pub fn mark_completed(&mut self) {
376        self.status = JobStatus::Completed;
377        self.completed_at = Some(Utc::now());
378    }
379
380    /// Mark job as failed
381    pub fn mark_failed(&mut self, error: String) {
382        self.retry_count += 1;
383
384        if self.retry_count >= self.max_retries {
385            self.status = JobStatus::DeadLetter { error };
386        } else {
387            self.status = JobStatus::Failed {
388                error,
389                retries: self.retry_count,
390            };
391        }
392    }
393
394    /// Calculate next retry delay (exponential backoff)
395    pub fn next_retry_delay(&self) -> chrono::Duration {
396        let base_delay = 5; // 5 seconds
397        let delay_seconds = base_delay * 2_i64.pow(self.retry_count);
398        chrono::Duration::seconds(delay_seconds)
399    }
400}
401
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_job_creation() {
        let job = Job::new("email")
            .add_field("to", serde_json::json!("user@example.com"))
            .add_field("subject", serde_json::json!("Hello"))
            .with_priority(JobPriority::High)
            .with_max_retries(5);

        assert_eq!(job.job_type, "email");
        assert_eq!(job.priority, JobPriority::High);
        assert_eq!(job.max_retries, 5);
        assert_eq!(job.status, JobStatus::Pending);
        assert_eq!(job.payload.len(), 2);
    }

    #[test]
    fn test_job_defaults() {
        let job = Job::new("defaults");

        assert_eq!(job.priority, JobPriority::Normal);
        assert_eq!(job.max_retries, 3);
        assert_eq!(job.retry_count, 0);
        assert_eq!(job.timeout_seconds, Some(300));
        assert_eq!(job.lease_duration, 30);
        assert_eq!(job.last_heartbeat, 0);
        assert!(job.payload.is_empty());
        assert!(job.scheduled_at.is_none());
        assert!(job.started_at.is_none());
        assert!(job.completed_at.is_none());
    }

    #[test]
    fn test_job_should_run() {
        let job = Job::new("test");
        assert!(job.should_run());

        let future_job = Job::new("test").scheduled_at(Utc::now() + chrono::Duration::hours(1));
        assert!(!future_job.should_run());

        let past_job = Job::new("test").scheduled_at(Utc::now() - chrono::Duration::hours(1));
        assert!(past_job.should_run());
    }

    #[test]
    fn test_mark_running_and_completed() {
        let mut job = Job::new("test");

        job.mark_running();
        assert_eq!(job.status, JobStatus::Running);
        assert!(job.started_at.is_some());
        assert!(!job.should_run());

        job.mark_completed();
        assert_eq!(job.status, JobStatus::Completed);
        assert!(job.completed_at.is_some());
        assert!(!job.should_run());
    }

    #[test]
    fn test_job_retry_logic() {
        let mut job = Job::new("test").with_max_retries(3);

        job.mark_failed("Error 1".to_string());
        assert_eq!(
            job.status,
            JobStatus::Failed { error: "Error 1".to_string(), retries: 1 }
        );
        assert_eq!(job.retry_count, 1);

        job.mark_failed("Error 2".to_string());
        assert_eq!(
            job.status,
            JobStatus::Failed { error: "Error 2".to_string(), retries: 2 }
        );
        assert_eq!(job.retry_count, 2);

        job.mark_failed("Error 3".to_string());
        assert_eq!(job.status, JobStatus::DeadLetter { error: "Error 3".to_string() });
        assert_eq!(job.retry_count, 3);
    }

    #[test]
    fn test_small_retry_budgets_dead_letter_on_first_failure() {
        for budget in [0, 1] {
            let mut job = Job::new("test").with_max_retries(budget);
            job.mark_failed("boom".to_string());
            assert!(matches!(job.status, JobStatus::DeadLetter { .. }), "budget {budget}");
            assert_eq!(job.retry_count, 1);
        }
    }

    #[test]
    fn test_exponential_backoff() {
        let mut job = Job::new("test");

        assert_eq!(job.next_retry_delay().num_seconds(), 5);

        job.retry_count = 1;
        assert_eq!(job.next_retry_delay().num_seconds(), 10);

        job.retry_count = 2;
        assert_eq!(job.next_retry_delay().num_seconds(), 20);

        job.retry_count = 3;
        assert_eq!(job.next_retry_delay().num_seconds(), 40);
    }

    #[test]
    fn test_heartbeat_expiry() {
        let mut job = Job::new("test");
        // Jobs that are not running never count as expired, even without a heartbeat.
        assert_eq!(job.last_heartbeat, 0);
        assert!(!job.is_heartbeat_expired());

        job.mark_running();
        assert!(job.last_heartbeat > 0);
        assert!(!job.is_heartbeat_expired());

        // Simulate a worker that went silent long ago.
        job.last_heartbeat = 0;
        assert!(job.is_heartbeat_expired());

        job.touch();
        assert!(!job.is_heartbeat_expired());
    }

    #[test]
    fn test_priority_ordering() {
        assert!(JobPriority::Critical > JobPriority::High);
        assert!(JobPriority::High > JobPriority::Normal);
        assert!(JobPriority::Normal > JobPriority::Low);
    }

    #[test]
    fn test_display_formats() {
        assert_eq!(JobPriority::Critical.to_string(), "Critical");
        assert_eq!(JobStatus::Pending.to_string(), "Pending");
        assert_eq!(
            JobStatus::Failed { error: "boom".to_string(), retries: 2 }.to_string(),
            "Failed(boom, retries: 2)"
        );
        assert_eq!(
            JobStatus::DeadLetter { error: "gone".to_string() }.to_string(),
            "DeadLetter(gone)"
        );

        let rendered = Job::new("report").to_string();
        assert!(rendered.contains("\"job_type\": \"report\""));
    }

    #[test]
    fn test_deserialize_fills_heartbeat_defaults() {
        let job = Job::new("legacy").with_lease_duration(120);

        let mut value = serde_json::to_value(&job).expect("Job serializes to JSON");
        let fields = value.as_object_mut().expect("Job serializes as a JSON object");
        fields.remove("lease_duration");
        fields.remove("last_heartbeat");

        let restored: Job = serde_json::from_value(value).expect("missing fields use defaults");
        assert_eq!(restored.id, job.id);
        assert_eq!(restored.lease_duration, 30);
        assert_eq!(restored.last_heartbeat, 0);
    }
}