//! Background job model for the `aurora_db::workers` module
//! (`aurora_db/workers/job.rs`).

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;

6/// Job status tracking the lifecycle of a background job
7///
8/// Jobs progress through states: Pending -> Running -> Completed/Failed -> DeadLetter
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
10pub enum JobStatus {
11    /// Job is queued and waiting to be processed
12    Pending,
13    /// Job is currently being executed by a worker
14    Running,
15    /// Job completed successfully
16    Completed,
17    /// Job failed but can be retried
18    Failed { error: String, retries: u32 },
19    /// Job permanently failed after max retries
20    DeadLetter { error: String },
21}
22
23impl std::fmt::Display for JobStatus {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        match self {
26            JobStatus::Pending => write!(f, "Pending"),
27            JobStatus::Running => write!(f, "Running"),
28            JobStatus::Completed => write!(f, "Completed"),
29            JobStatus::Failed { error, retries } => {
30                write!(f, "Failed({}, retries: {})", error, retries)
31            }
32            JobStatus::DeadLetter { error } => write!(f, "DeadLetter({})", error),
33        }
34    }
35}
36
37/// Job priority for execution order
38///
39/// Higher priority jobs are executed first. Critical > High > Normal > Low.
40///
41/// # Examples
42///
43/// ```
44/// use aurora_db::workers::{Job, JobPriority};
45///
46/// // Time-sensitive payment
47/// let payment = Job::new("process_payment")
48///     .with_priority(JobPriority::Critical);
49///
50/// // User-facing task
51/// let notification = Job::new("send_notification")
52///     .with_priority(JobPriority::High);
53///
54/// // Background cleanup
55/// let cleanup = Job::new("cleanup_temp_files")
56///     .with_priority(JobPriority::Low);
57/// ```
58#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
59pub enum JobPriority {
60    /// Low priority (value: 1) - background tasks, cleanup
61    Low = 1,
62    /// Normal priority (value: 2) - default for most jobs
63    Normal = 2,
64    /// High priority (value: 3) - user-facing operations
65    High = 3,
66    /// Critical priority (value: 4) - payments, security
67    Critical = 4,
68}
69
70impl std::fmt::Display for JobPriority {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        match self {
73            JobPriority::Low => write!(f, "Low"),
74            JobPriority::Normal => write!(f, "Normal"),
75            JobPriority::High => write!(f, "High"),
76            JobPriority::Critical => write!(f, "Critical"),
77        }
78    }
79}
80
81/// A durable background job
82///
83/// Represents a unit of work that can be queued, scheduled, retried, and tracked.
84/// Jobs are persisted to disk and survive restarts. Use the builder pattern to
85/// configure job properties.
86///
87/// # Examples
88///
89/// ```
90/// use aurora_db::workers::{Job, JobPriority};
91/// use serde_json::json;
92///
93/// // Simple job
94/// let job = Job::new("send_email")
95///     .add_field("to", json!("user@example.com"))
96///     .add_field("subject", json!("Welcome!"));
97///
98/// // Job with all options
99/// let job = Job::new("process_video")
100///     .add_field("video_id", json!("vid-123"))
101///     .add_field("resolution", json!("1080p"))
102///     .with_priority(JobPriority::High)
103///     .with_max_retries(5)
104///     .with_timeout(600) // 10 minutes
105///     .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
106/// ```
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct Job {
109    pub id: String,
110    pub job_type: String,
111    pub payload: HashMap<String, serde_json::Value>,
112    pub priority: JobPriority,
113    pub status: JobStatus,
114    pub max_retries: u32,
115    pub retry_count: u32,
116    pub created_at: DateTime<Utc>,
117    pub scheduled_at: Option<DateTime<Utc>>,
118    pub started_at: Option<DateTime<Utc>>,
119    pub completed_at: Option<DateTime<Utc>>,
120    pub timeout_seconds: Option<u64>,
121    /// Unix timestamp of last "I'm alive" signal from worker (for Reaper)
122    #[serde(default)]
123    pub last_heartbeat: u64,
124    /// Seconds before worker is considered dead (default: 30)
125    #[serde(default = "default_lease")]
126    pub lease_duration: u64,
127}
128
129impl std::fmt::Display for Job {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        write!(
132            f,
133            "{}",
134            serde_json::to_string_pretty(self).unwrap_or_default()
135        )
136    }
137}
138
/// Default lease duration, in seconds: 30.
///
/// Used by `Job::new` and as the serde default for `Job::lease_duration`, so
/// serialized jobs that lack that field deserialize with a 30-second lease.
fn default_lease() -> u64 {
    30
}

144impl Job {
145    /// Create a new job
146    ///
147    /// Creates a job with default settings:
148    /// - Priority: Normal
149    /// - Max retries: 3
150    /// - Timeout: 5 minutes
151    /// - Status: Pending
152    ///
153    /// # Arguments
154    /// * `job_type` - Type identifier for routing to the correct handler
155    ///
156    /// # Examples
157    ///
158
159    /// // Create different job types
160    /// let email = Job::new("send_email");
161    /// let image = Job::new("resize_image");
162    /// let report = Job::new("generate_report");
163    /// ```
164    pub fn new(job_type: impl Into<String>) -> Self {
165        Self {
166            id: Uuid::new_v4().to_string(),
167            job_type: job_type.into(),
168            payload: HashMap::new(),
169            priority: JobPriority::Normal,
170            status: JobStatus::Pending,
171            max_retries: 3,
172            retry_count: 0,
173            created_at: Utc::now(),
174            scheduled_at: None,
175            started_at: None,
176            completed_at: None,
177            timeout_seconds: Some(300), // 5 minutes default
178            last_heartbeat: 0,
179            lease_duration: default_lease(),
180        }
181    }
182
183    /// Set job payload from a HashMap
184    ///
185    /// Replaces the entire payload with the provided HashMap.
186    /// For adding individual fields, use `add_field()` instead.
187    ///
188    /// # Examples
189    ///
190
191    /// use std::collections::HashMap;
192    /// use serde_json::json;
193    ///
194    /// let mut payload = HashMap::new();
195    /// payload.insert("user_id".to_string(), json!("123"));
196    /// payload.insert("amount".to_string(), json!(99.99));
197    ///
198    /// let job = Job::new("process_payment")
199    ///     .with_payload(payload);
200    /// ```
201    pub fn with_payload(mut self, payload: HashMap<String, serde_json::Value>) -> Self {
202        self.payload = payload;
203        self
204    }
205
206    /// Add a single field to the job payload
207    ///
208    /// Use this for building the payload incrementally.
209    /// Can be chained multiple times.
210    ///
211    /// # Examples
212    ///
213
214    /// use serde_json::json;
215    ///
216    /// let job = Job::new("send_email")
217    ///     .add_field("to", json!("user@example.com"))
218    ///     .add_field("subject", json!("Welcome!"))
219    ///     .add_field("template", json!("welcome_email"))
220    ///     .add_field("vars", json!({"name": "Alice"}));
221    /// ```
222    pub fn add_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
223        self.payload.insert(key.into(), value);
224        self
225    }
226
227    /// Set job priority
228    ///
229    /// Higher priority jobs are executed before lower priority ones.
230    /// Default is `JobPriority::Normal`.
231    ///
232    /// # Examples
233    ///
234
235    /// // Critical - payments, security operations
236    /// let payment = Job::new("charge_card")
237    ///     .with_priority(JobPriority::Critical);
238    ///
239    /// // High - user-facing operations
240    /// let notification = Job::new("push_notification")
241    ///     .with_priority(JobPriority::High);
242    ///
243    /// // Low - background cleanup, analytics
244    /// let cleanup = Job::new("clean_old_logs")
245    ///     .with_priority(JobPriority::Low);
246    /// ```
247    pub fn with_priority(mut self, priority: JobPriority) -> Self {
248        self.priority = priority;
249        self
250    }
251
252    /// Set maximum retry attempts
253    ///
254    /// If a job fails, it will be retried up to this many times with
255    /// exponential backoff. Default is 3 retries.
256    ///
257    /// # Examples
258    ///
259
260    /// // Network operation - retry more
261    /// let api_call = Job::new("fetch_api_data")
262    ///     .with_max_retries(5);
263    ///
264    /// // Critical operation - don't retry
265    /// let one_time = Job::new("send_invoice")
266    ///     .with_max_retries(0);
267    ///
268    /// // Flaky operation - retry extensively
269    /// let external = Job::new("third_party_webhook")
270    ///     .with_max_retries(10);
271    /// ```
272    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
273        self.max_retries = max_retries;
274        self
275    }
276
277    /// Schedule job for later execution
278    ///
279    /// Job will not be executed until the specified time.
280    /// Useful for reminders, scheduled reports, delayed notifications.
281    ///
282    /// # Examples
283    ///
284
285    /// use chrono::{Utc, Duration};
286    ///
287    /// // Run in 1 hour
288    /// let reminder = Job::new("send_reminder")
289    ///     .add_field("message", json!("Meeting starts soon"))
290    ///     .scheduled_at(Utc::now() + Duration::hours(1));
291    ///
292    /// // Run at specific time
293    /// let report = Job::new("daily_report")
294    ///     .scheduled_at(Utc::now().date_naive().and_hms_opt(9, 0, 0).unwrap());
295    ///
296    /// // Delayed retry pattern
297    /// let retry = Job::new("retry_failed_upload")
298    ///     .scheduled_at(Utc::now() + Duration::minutes(30));
299    /// ```
300    pub fn scheduled_at(mut self, at: DateTime<Utc>) -> Self {
301        self.scheduled_at = Some(at);
302        self
303    }
304
305    /// Set job execution timeout
306    ///
307    /// Job will be terminated if it runs longer than this.
308    /// Default is 300 seconds (5 minutes).
309    ///
310    /// # Examples
311    ///
312
313    /// // Quick task
314    /// let quick = Job::new("send_sms")
315    ///     .with_timeout(30); // 30 seconds
316    ///
317    /// // Long-running task
318    /// let video = Job::new("transcode_video")
319    ///     .with_timeout(3600); // 1 hour
320    ///
321    /// // Very long task
322    /// let batch = Job::new("process_millions_of_records")
323    ///     .with_timeout(7200); // 2 hours
324    /// ```
325    pub fn with_timeout(mut self, seconds: u64) -> Self {
326        self.timeout_seconds = Some(seconds);
327        self
328    }
329
330    /// Check if job should be executed (based on schedule)
331    pub fn should_run(&self) -> bool {
332        match self.status {
333            JobStatus::Pending => {
334                if let Some(scheduled) = self.scheduled_at {
335                    Utc::now() >= scheduled
336                } else {
337                    true
338                }
339            }
340            _ => false,
341        }
342    }
343
344    /// Mark job as running
345    pub fn mark_running(&mut self) {
346        self.status = JobStatus::Running;
347        self.started_at = Some(Utc::now());
348        self.touch(); // Update heartbeat when starting
349    }
350
351    /// Update heartbeat timestamp to "now"
352    ///
353    /// Workers should call this periodically while processing a job.
354    /// The Reaper uses this to detect dead workers.
355    pub fn touch(&mut self) {
356        self.last_heartbeat = std::time::SystemTime::now()
357            .duration_since(std::time::UNIX_EPOCH)
358            .unwrap()
359            .as_secs();
360    }
361
362    /// Check if job's heartbeat has expired (worker presumed dead)
363    pub fn is_heartbeat_expired(&self) -> bool {
364        if !matches!(self.status, JobStatus::Running) {
365            return false;
366        }
367        let now = std::time::SystemTime::now()
368            .duration_since(std::time::UNIX_EPOCH)
369            .unwrap()
370            .as_secs();
371        now.saturating_sub(self.last_heartbeat) > self.lease_duration
372    }
373
374    /// Set custom lease duration
375    pub fn with_lease_duration(mut self, seconds: u64) -> Self {
376        self.lease_duration = seconds;
377        self
378    }
379
380    /// Mark job as completed
381    pub fn mark_completed(&mut self) {
382        self.status = JobStatus::Completed;
383        self.completed_at = Some(Utc::now());
384    }
385
386    /// Mark job as failed
387    pub fn mark_failed(&mut self, error: String) {
388        self.retry_count += 1;
389
390        if self.retry_count >= self.max_retries {
391            self.status = JobStatus::DeadLetter { error };
392        } else {
393            self.status = JobStatus::Failed {
394                error,
395                retries: self.retry_count,
396            };
397        }
398    }
399
400    /// Calculate next retry delay (exponential backoff)
401    pub fn next_retry_delay(&self) -> chrono::Duration {
402        let base_delay = 5; // 5 seconds
403        let delay_seconds = base_delay * 2_i64.pow(self.retry_count);
404        chrono::Duration::seconds(delay_seconds)
405    }
406}
407
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_job_creation() {
        let job = Job::new("email")
            .add_field("to", serde_json::json!("user@example.com"))
            .add_field("subject", serde_json::json!("Hello"))
            .with_priority(JobPriority::High)
            .with_max_retries(5);

        assert_eq!(job.job_type, "email");
        assert_eq!(job.priority, JobPriority::High);
        assert_eq!(job.max_retries, 5);
        assert_eq!(job.status, JobStatus::Pending);
    }

    #[test]
    fn test_job_should_run() {
        let job = Job::new("test");
        assert!(job.should_run());

        let future_job = Job::new("test").scheduled_at(Utc::now() + chrono::Duration::hours(1));
        assert!(!future_job.should_run());

        let past_job = Job::new("test").scheduled_at(Utc::now() - chrono::Duration::hours(1));
        assert!(past_job.should_run());
    }

    #[test]
    fn test_job_retry_logic() {
        let mut job = Job::new("test").with_max_retries(3);

        job.mark_failed("Error 1".to_string());
        assert!(matches!(job.status, JobStatus::Failed { .. }));
        assert_eq!(job.retry_count, 1);

        job.mark_failed("Error 2".to_string());
        assert_eq!(job.retry_count, 2);

        job.mark_failed("Error 3".to_string());
        assert!(matches!(job.status, JobStatus::DeadLetter { .. }));
        assert_eq!(job.retry_count, 3);
    }

    #[test]
    fn test_zero_max_retries_dead_letters_on_first_failure() {
        let mut job = Job::new("test").with_max_retries(0);
        job.mark_failed("boom".to_string());
        assert!(matches!(job.status, JobStatus::DeadLetter { .. }));
        assert!(!job.should_run());
    }

    #[test]
    fn test_exponential_backoff() {
        let mut job = Job::new("test");

        assert_eq!(job.next_retry_delay().num_seconds(), 5);

        job.retry_count = 1;
        assert_eq!(job.next_retry_delay().num_seconds(), 10);

        job.retry_count = 2;
        assert_eq!(job.next_retry_delay().num_seconds(), 20);

        job.retry_count = 3;
        assert_eq!(job.next_retry_delay().num_seconds(), 40);
    }

    #[test]
    fn test_heartbeat_expiry() {
        let mut job = Job::new("test");
        // Only running jobs can expire.
        assert!(!job.is_heartbeat_expired());

        job.mark_running();
        assert!(!job.is_heartbeat_expired());

        // A heartbeat far in the past exceeds the default 30 s lease.
        job.last_heartbeat = 0;
        assert!(job.is_heartbeat_expired());
    }

    #[test]
    fn test_display_formats() {
        let failed = JobStatus::Failed {
            error: "timeout".to_string(),
            retries: 2,
        };
        assert_eq!(failed.to_string(), "Failed(timeout, retries: 2)");
        assert_eq!(JobPriority::Critical.to_string(), "Critical");
    }
}