Skip to main content

aurora_db/workers/
job.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use uuid::Uuid;
5
6/// Job status tracking the lifecycle of a background job
7///
8/// Jobs progress through states: Pending → Running → Completed/Failed → DeadLetter
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
10pub enum JobStatus {
11    /// Job is queued and waiting to be processed
12    Pending,
13    /// Job is currently being executed by a worker
14    Running,
15    /// Job completed successfully
16    Completed,
17    /// Job failed but can be retried
18    Failed { error: String, retries: u32 },
19    /// Job permanently failed after max retries
20    DeadLetter { error: String },
21}
22
23/// Job priority for execution order
24///
25/// Higher priority jobs are executed first. Critical > High > Normal > Low.
26///
27/// # Examples
28///
29/// ```
30/// use aurora_db::workers::{Job, JobPriority};
31///
32/// // Time-sensitive payment
33/// let payment = Job::new("process_payment")
34///     .with_priority(JobPriority::Critical);
35///
36/// // User-facing task
37/// let notification = Job::new("send_notification")
38///     .with_priority(JobPriority::High);
39///
40/// // Background cleanup
41/// let cleanup = Job::new("cleanup_temp_files")
42///     .with_priority(JobPriority::Low);
43/// ```
44#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
45pub enum JobPriority {
46    /// Low priority (value: 1) - background tasks, cleanup
47    Low = 1,
48    /// Normal priority (value: 2) - default for most jobs
49    Normal = 2,
50    /// High priority (value: 3) - user-facing operations
51    High = 3,
52    /// Critical priority (value: 4) - payments, security
53    Critical = 4,
54}
55
56/// A durable background job
57///
58/// Represents a unit of work that can be queued, scheduled, retried, and tracked.
59/// Jobs are persisted to disk and survive restarts. Use the builder pattern to
60/// configure job properties.
61///
62/// # Examples
63///
64/// ```
65/// use aurora_db::workers::{Job, JobPriority};
66/// use serde_json::json;
67///
68/// // Simple job
69/// let job = Job::new("send_email")
70///     .add_field("to", json!("user@example.com"))
71///     .add_field("subject", json!("Welcome!"));
72///
73/// // Job with all options
74/// let job = Job::new("process_video")
75///     .add_field("video_id", json!("vid-123"))
76///     .add_field("resolution", json!("1080p"))
77///     .with_priority(JobPriority::High)
78///     .with_max_retries(5)
79///     .with_timeout(600) // 10 minutes
80///     .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
81/// ```
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct Job {
84    pub id: String,
85    pub job_type: String,
86    pub payload: HashMap<String, serde_json::Value>,
87    pub priority: JobPriority,
88    pub status: JobStatus,
89    pub max_retries: u32,
90    pub retry_count: u32,
91    pub created_at: DateTime<Utc>,
92    pub scheduled_at: Option<DateTime<Utc>>,
93    pub started_at: Option<DateTime<Utc>>,
94    pub completed_at: Option<DateTime<Utc>>,
95    pub timeout_seconds: Option<u64>,
96    /// Unix timestamp of last "I'm alive" signal from worker (for Reaper)
97    #[serde(default)]
98    pub last_heartbeat: u64,
99    /// Seconds before worker is considered dead (default: 30)
100    #[serde(default = "default_lease")]
101    pub lease_duration: u64,
102}
103
/// Lease length, in seconds, used when none is specified.
const DEFAULT_LEASE_SECS: u64 = 30;

/// Default lease duration (30 seconds).
///
/// Kept as a function because serde's `default = "default_lease"` attribute
/// refers to it by name.
fn default_lease() -> u64 {
    DEFAULT_LEASE_SECS
}
108
109impl Job {
110    /// Create a new job
111    ///
112    /// Creates a job with default settings:
113    /// - Priority: Normal
114    /// - Max retries: 3
115    /// - Timeout: 5 minutes
116    /// - Status: Pending
117    ///
118    /// # Arguments
119    /// * `job_type` - Type identifier for routing to the correct handler
120    ///
121    /// # Examples
122    ///
123   
124    /// // Create different job types
125    /// let email = Job::new("send_email");
126    /// let image = Job::new("resize_image");
127    /// let report = Job::new("generate_report");
128    /// ```
129    pub fn new(job_type: impl Into<String>) -> Self {
130        Self {
131            id: Uuid::new_v4().to_string(),
132            job_type: job_type.into(),
133            payload: HashMap::new(),
134            priority: JobPriority::Normal,
135            status: JobStatus::Pending,
136            max_retries: 3,
137            retry_count: 0,
138            created_at: Utc::now(),
139            scheduled_at: None,
140            started_at: None,
141            completed_at: None,
142            timeout_seconds: Some(300), // 5 minutes default
143            last_heartbeat: 0,
144            lease_duration: default_lease(),
145        }
146    }
147
148    /// Set job payload from a HashMap
149    ///
150    /// Replaces the entire payload with the provided HashMap.
151    /// For adding individual fields, use `add_field()` instead.
152    ///
153    /// # Examples
154    ///
155   
156    /// use std::collections::HashMap;
157    /// use serde_json::json;
158    ///
159    /// let mut payload = HashMap::new();
160    /// payload.insert("user_id".to_string(), json!("123"));
161    /// payload.insert("amount".to_string(), json!(99.99));
162    ///
163    /// let job = Job::new("process_payment")
164    ///     .with_payload(payload);
165    /// ```
166    pub fn with_payload(mut self, payload: HashMap<String, serde_json::Value>) -> Self {
167        self.payload = payload;
168        self
169    }
170
171    /// Add a single field to the job payload
172    ///
173    /// Use this for building the payload incrementally.
174    /// Can be chained multiple times.
175    ///
176    /// # Examples
177    ///
178   
179    /// use serde_json::json;
180    ///
181    /// let job = Job::new("send_email")
182    ///     .add_field("to", json!("user@example.com"))
183    ///     .add_field("subject", json!("Welcome!"))
184    ///     .add_field("template", json!("welcome_email"))
185    ///     .add_field("vars", json!({"name": "Alice"}));
186    /// ```
187    pub fn add_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
188        self.payload.insert(key.into(), value);
189        self
190    }
191
192    /// Set job priority
193    ///
194    /// Higher priority jobs are executed before lower priority ones.
195    /// Default is `JobPriority::Normal`.
196    ///
197    /// # Examples
198    ///
199   
200    /// // Critical - payments, security operations
201    /// let payment = Job::new("charge_card")
202    ///     .with_priority(JobPriority::Critical);
203    ///
204    /// // High - user-facing operations
205    /// let notification = Job::new("push_notification")
206    ///     .with_priority(JobPriority::High);
207    ///
208    /// // Low - background cleanup, analytics
209    /// let cleanup = Job::new("clean_old_logs")
210    ///     .with_priority(JobPriority::Low);
211    /// ```
212    pub fn with_priority(mut self, priority: JobPriority) -> Self {
213        self.priority = priority;
214        self
215    }
216
217    /// Set maximum retry attempts
218    ///
219    /// If a job fails, it will be retried up to this many times with
220    /// exponential backoff. Default is 3 retries.
221    ///
222    /// # Examples
223    ///
224   
225    /// // Network operation - retry more
226    /// let api_call = Job::new("fetch_api_data")
227    ///     .with_max_retries(5);
228    ///
229    /// // Critical operation - don't retry
230    /// let one_time = Job::new("send_invoice")
231    ///     .with_max_retries(0);
232    ///
233    /// // Flaky operation - retry extensively
234    /// let external = Job::new("third_party_webhook")
235    ///     .with_max_retries(10);
236    /// ```
237    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
238        self.max_retries = max_retries;
239        self
240    }
241
242    /// Schedule job for later execution
243    ///
244    /// Job will not be executed until the specified time.
245    /// Useful for reminders, scheduled reports, delayed notifications.
246    ///
247    /// # Examples
248    ///
249   
250    /// use chrono::{Utc, Duration};
251    ///
252    /// // Run in 1 hour
253    /// let reminder = Job::new("send_reminder")
254    ///     .add_field("message", json!("Meeting starts soon"))
255    ///     .scheduled_at(Utc::now() + Duration::hours(1));
256    ///
257    /// // Run at specific time
258    /// let report = Job::new("daily_report")
259    ///     .scheduled_at(Utc::now().date_naive().and_hms_opt(9, 0, 0).unwrap());
260    ///
261    /// // Delayed retry pattern
262    /// let retry = Job::new("retry_failed_upload")
263    ///     .scheduled_at(Utc::now() + Duration::minutes(30));
264    /// ```
265    pub fn scheduled_at(mut self, at: DateTime<Utc>) -> Self {
266        self.scheduled_at = Some(at);
267        self
268    }
269
270    /// Set job execution timeout
271    ///
272    /// Job will be terminated if it runs longer than this.
273    /// Default is 300 seconds (5 minutes).
274    ///
275    /// # Examples
276    ///
277   
278    /// // Quick task
279    /// let quick = Job::new("send_sms")
280    ///     .with_timeout(30); // 30 seconds
281    ///
282    /// // Long-running task
283    /// let video = Job::new("transcode_video")
284    ///     .with_timeout(3600); // 1 hour
285    ///
286    /// // Very long task
287    /// let batch = Job::new("process_millions_of_records")
288    ///     .with_timeout(7200); // 2 hours
289    /// ```
290    pub fn with_timeout(mut self, seconds: u64) -> Self {
291        self.timeout_seconds = Some(seconds);
292        self
293    }
294
295    /// Check if job should be executed (based on schedule)
296    pub fn should_run(&self) -> bool {
297        match self.status {
298            JobStatus::Pending => {
299                if let Some(scheduled) = self.scheduled_at {
300                    Utc::now() >= scheduled
301                } else {
302                    true
303                }
304            }
305            _ => false,
306        }
307    }
308
309    /// Mark job as running
310    pub fn mark_running(&mut self) {
311        self.status = JobStatus::Running;
312        self.started_at = Some(Utc::now());
313        self.touch(); // Update heartbeat when starting
314    }
315
316    /// Update heartbeat timestamp to "now"
317    ///
318    /// Workers should call this periodically while processing a job.
319    /// The Reaper uses this to detect dead workers.
320    pub fn touch(&mut self) {
321        self.last_heartbeat = std::time::SystemTime::now()
322            .duration_since(std::time::UNIX_EPOCH)
323            .unwrap()
324            .as_secs();
325    }
326
327    /// Check if job's heartbeat has expired (worker presumed dead)
328    pub fn is_heartbeat_expired(&self) -> bool {
329        if !matches!(self.status, JobStatus::Running) {
330            return false;
331        }
332        let now = std::time::SystemTime::now()
333            .duration_since(std::time::UNIX_EPOCH)
334            .unwrap()
335            .as_secs();
336        now.saturating_sub(self.last_heartbeat) > self.lease_duration
337    }
338
339    /// Set custom lease duration
340    pub fn with_lease_duration(mut self, seconds: u64) -> Self {
341        self.lease_duration = seconds;
342        self
343    }
344
345    /// Mark job as completed
346    pub fn mark_completed(&mut self) {
347        self.status = JobStatus::Completed;
348        self.completed_at = Some(Utc::now());
349    }
350
351    /// Mark job as failed
352    pub fn mark_failed(&mut self, error: String) {
353        self.retry_count += 1;
354
355        if self.retry_count >= self.max_retries {
356            self.status = JobStatus::DeadLetter { error };
357        } else {
358            self.status = JobStatus::Failed {
359                error,
360                retries: self.retry_count,
361            };
362        }
363    }
364
365    /// Calculate next retry delay (exponential backoff)
366    pub fn next_retry_delay(&self) -> chrono::Duration {
367        let base_delay = 5; // 5 seconds
368        let delay_seconds = base_delay * 2_i64.pow(self.retry_count);
369        chrono::Duration::seconds(delay_seconds)
370    }
371}
372
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builder methods set the matching fields and leave the job pending.
    #[test]
    fn test_job_creation() {
        let built = Job::new("email")
            .add_field("to", json!("user@example.com"))
            .add_field("subject", json!("Hello"))
            .with_priority(JobPriority::High)
            .with_max_retries(5);

        assert_eq!(built.job_type, "email");
        assert_eq!(built.priority, JobPriority::High);
        assert_eq!(built.max_retries, 5);
        assert_eq!(built.status, JobStatus::Pending);
    }

    /// A pending job runs immediately unless it is scheduled in the future.
    #[test]
    fn test_job_should_run() {
        let one_hour = chrono::Duration::hours(1);

        let unscheduled = Job::new("test");
        assert!(unscheduled.should_run());

        let not_yet_due = Job::new("test").scheduled_at(Utc::now() + one_hour);
        assert!(!not_yet_due.should_run());

        let overdue = Job::new("test").scheduled_at(Utc::now() - one_hour);
        assert!(overdue.should_run());
    }

    /// Failures count up and the job is dead-lettered at `max_retries`.
    #[test]
    fn test_job_retry_logic() {
        let mut job = Job::new("test").with_max_retries(3);

        job.mark_failed(String::from("Error 1"));
        assert!(matches!(job.status, JobStatus::Failed { .. }));
        assert_eq!(job.retry_count, 1);

        job.mark_failed(String::from("Error 2"));
        assert_eq!(job.retry_count, 2);

        job.mark_failed(String::from("Error 3"));
        assert!(matches!(job.status, JobStatus::DeadLetter { .. }));
        assert_eq!(job.retry_count, 3);
    }

    /// The retry delay doubles with every recorded failure, starting at 5 s.
    #[test]
    fn test_exponential_backoff() {
        let mut job = Job::new("test");

        for (failures, expected_secs) in [(0_u32, 5_i64), (1, 10), (2, 20), (3, 40)] {
            job.retry_count = failures;
            assert_eq!(job.next_retry_delay().num_seconds(), expected_secs);
        }
    }
}