aurora_db/workers/job.rs
1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use uuid::Uuid;
5
/// Job status tracking the lifecycle of a background job
///
/// Transitions performed by the methods in this file:
/// - `Job::new` creates a job as `Pending`
/// - `Job::mark_running` moves it to `Running`
/// - `Job::mark_completed` moves it to `Completed`
/// - `Job::mark_failed` moves it to `Failed` while `retry_count` is still
///   below `max_retries`, and to `DeadLetter` otherwise
///
/// How a `Failed` job is put back in the queue is not decided here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum JobStatus {
    /// Job is queued and waiting to be processed
    Pending,
    /// Job is currently being executed by a worker
    Running,
    /// Job completed successfully
    Completed,
    /// Job failed but can be retried
    ///
    /// `error` is the message passed to `Job::mark_failed`; `retries` is the
    /// job's `retry_count` right after that failure was recorded.
    Failed { error: String, retries: u32 },
    /// Job permanently failed after max retries
    ///
    /// `error` is the message of the failure that exhausted the retries.
    DeadLetter { error: String },
}
22
23impl std::fmt::Display for JobStatus {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 match self {
26 JobStatus::Pending => write!(f, "Pending"),
27 JobStatus::Running => write!(f, "Running"),
28 JobStatus::Completed => write!(f, "Completed"),
29 JobStatus::Failed { error, retries } => {
30 write!(f, "Failed({}, retries: {})", error, retries)
31 }
32 JobStatus::DeadLetter { error } => write!(f, "DeadLetter({})", error),
33 }
34 }
35}
36
/// Job priority for execution order
///
/// Higher priority jobs are executed first. Critical > High > Normal > Low.
/// The derived `Ord`/`PartialOrd` follow the variant order below, so
/// `JobPriority::Low < JobPriority::Critical`.
///
/// # Examples
///
/// ```
/// use aurora_db::workers::{Job, JobPriority};
///
/// // Time-sensitive payment
/// let payment = Job::new("process_payment")
///     .with_priority(JobPriority::Critical);
///
/// // User-facing task
/// let notification = Job::new("send_notification")
///     .with_priority(JobPriority::High);
///
/// // Background cleanup
/// let cleanup = Job::new("cleanup_temp_files")
///     .with_priority(JobPriority::Low);
/// ```
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum JobPriority {
    /// Low priority (value: 1) - background tasks, cleanup
    Low = 1,
    /// Normal priority (value: 2) - default for most jobs
    Normal = 2,
    /// High priority (value: 3) - user-facing operations
    High = 3,
    /// Critical priority (value: 4) - payments, security
    Critical = 4,
}
69
70impl std::fmt::Display for JobPriority {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 match self {
73 JobPriority::Low => write!(f, "Low"),
74 JobPriority::Normal => write!(f, "Normal"),
75 JobPriority::High => write!(f, "High"),
76 JobPriority::Critical => write!(f, "Critical"),
77 }
78 }
79}
80
/// A durable background job
///
/// Represents a unit of work that can be queued, scheduled, retried, and tracked.
/// Jobs are persisted to disk and survive restarts. Use the builder pattern to
/// configure job properties.
///
/// # Examples
///
/// ```
/// use aurora_db::workers::{Job, JobPriority};
/// use serde_json::json;
///
/// // Simple job
/// let job = Job::new("send_email")
///     .add_field("to", json!("user@example.com"))
///     .add_field("subject", json!("Welcome!"));
///
/// // Job with all options
/// let job = Job::new("process_video")
///     .add_field("video_id", json!("vid-123"))
///     .add_field("resolution", json!("1080p"))
///     .with_priority(JobPriority::High)
///     .with_max_retries(5)
///     .with_timeout(600) // 10 minutes
///     .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job {
    /// Unique identifier; `Job::new` fills it with a random UUID (v4) string
    pub id: String,
    /// Type identifier for routing to the correct handler
    pub job_type: String,
    /// Arbitrary JSON values keyed by field name
    pub payload: HashMap<String, serde_json::Value>,
    /// Execution priority (`JobPriority::Normal` unless overridden)
    pub priority: JobPriority,
    /// Current lifecycle state
    pub status: JobStatus,
    /// Failure count at which `mark_failed` moves the job to `DeadLetter`
    pub max_retries: u32,
    /// Number of failures recorded so far by `mark_failed`
    pub retry_count: u32,
    /// Creation time, set by `Job::new`
    pub created_at: DateTime<Utc>,
    /// Earliest time at which `should_run` reports the job as runnable;
    /// `None` means runnable immediately
    pub scheduled_at: Option<DateTime<Utc>>,
    /// Set by `mark_running`
    pub started_at: Option<DateTime<Utc>>,
    /// Set by `mark_completed`
    pub completed_at: Option<DateTime<Utc>>,
    /// Execution timeout in seconds (`Job::new` uses 300)
    pub timeout_seconds: Option<u64>,
    /// Unix timestamp of last "I'm alive" signal from worker (for Reaper)
    ///
    /// Missing in serialized data => 0.
    #[serde(default)]
    pub last_heartbeat: u64,
    /// Seconds before worker is considered dead (default: 30)
    ///
    /// Missing in serialized data => value of `default_lease()`.
    #[serde(default = "default_lease")]
    pub lease_duration: u64,
}
128
129impl std::fmt::Display for Job {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 write!(
132 f,
133 "{}",
134 serde_json::to_string_pretty(self).unwrap_or_default()
135 )
136 }
137}
138
/// Lease duration, in seconds, used when a job does not specify one.
///
/// Referenced both by `Job::new` and by the serde default of
/// `Job::lease_duration`.
fn default_lease() -> u64 {
    const DEFAULT_LEASE_SECS: u64 = 30;
    DEFAULT_LEASE_SECS
}
143
144impl Job {
145 /// Create a new job
146 ///
147 /// Creates a job with default settings:
148 /// - Priority: Normal
149 /// - Max retries: 3
150 /// - Timeout: 5 minutes
151 /// - Status: Pending
152 ///
153 /// # Arguments
154 /// * `job_type` - Type identifier for routing to the correct handler
155 ///
156 /// # Examples
157 ///
158
159 /// // Create different job types
160 /// let email = Job::new("send_email");
161 /// let image = Job::new("resize_image");
162 /// let report = Job::new("generate_report");
163 /// ```
164 pub fn new(job_type: impl Into<String>) -> Self {
165 Self {
166 id: Uuid::new_v4().to_string(),
167 job_type: job_type.into(),
168 payload: HashMap::new(),
169 priority: JobPriority::Normal,
170 status: JobStatus::Pending,
171 max_retries: 3,
172 retry_count: 0,
173 created_at: Utc::now(),
174 scheduled_at: None,
175 started_at: None,
176 completed_at: None,
177 timeout_seconds: Some(300), // 5 minutes default
178 last_heartbeat: 0,
179 lease_duration: default_lease(),
180 }
181 }
182
183 /// Set job payload from a HashMap
184 ///
185 /// Replaces the entire payload with the provided HashMap.
186 /// For adding individual fields, use `add_field()` instead.
187 ///
188 /// # Examples
189 ///
190
191 /// use std::collections::HashMap;
192 /// use serde_json::json;
193 ///
194 /// let mut payload = HashMap::new();
195 /// payload.insert("user_id".to_string(), json!("123"));
196 /// payload.insert("amount".to_string(), json!(99.99));
197 ///
198 /// let job = Job::new("process_payment")
199 /// .with_payload(payload);
200 /// ```
201 pub fn with_payload(mut self, payload: HashMap<String, serde_json::Value>) -> Self {
202 self.payload = payload;
203 self
204 }
205
206 /// Add a single field to the job payload
207 ///
208 /// Use this for building the payload incrementally.
209 /// Can be chained multiple times.
210 ///
211 /// # Examples
212 ///
213
214 /// use serde_json::json;
215 ///
216 /// let job = Job::new("send_email")
217 /// .add_field("to", json!("user@example.com"))
218 /// .add_field("subject", json!("Welcome!"))
219 /// .add_field("template", json!("welcome_email"))
220 /// .add_field("vars", json!({"name": "Alice"}));
221 /// ```
222 pub fn add_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
223 self.payload.insert(key.into(), value);
224 self
225 }
226
227 /// Set job priority
228 ///
229 /// Higher priority jobs are executed before lower priority ones.
230 /// Default is `JobPriority::Normal`.
231 ///
232 /// # Examples
233 ///
234
235 /// // Critical - payments, security operations
236 /// let payment = Job::new("charge_card")
237 /// .with_priority(JobPriority::Critical);
238 ///
239 /// // High - user-facing operations
240 /// let notification = Job::new("push_notification")
241 /// .with_priority(JobPriority::High);
242 ///
243 /// // Low - background cleanup, analytics
244 /// let cleanup = Job::new("clean_old_logs")
245 /// .with_priority(JobPriority::Low);
246 /// ```
247 pub fn with_priority(mut self, priority: JobPriority) -> Self {
248 self.priority = priority;
249 self
250 }
251
252 /// Set maximum retry attempts
253 ///
254 /// If a job fails, it will be retried up to this many times with
255 /// exponential backoff. Default is 3 retries.
256 ///
257 /// # Examples
258 ///
259
260 /// // Network operation - retry more
261 /// let api_call = Job::new("fetch_api_data")
262 /// .with_max_retries(5);
263 ///
264 /// // Critical operation - don't retry
265 /// let one_time = Job::new("send_invoice")
266 /// .with_max_retries(0);
267 ///
268 /// // Flaky operation - retry extensively
269 /// let external = Job::new("third_party_webhook")
270 /// .with_max_retries(10);
271 /// ```
272 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
273 self.max_retries = max_retries;
274 self
275 }
276
277 /// Schedule job for later execution
278 ///
279 /// Job will not be executed until the specified time.
280 /// Useful for reminders, scheduled reports, delayed notifications.
281 ///
282 /// # Examples
283 ///
284
285 /// use chrono::{Utc, Duration};
286 ///
287 /// // Run in 1 hour
288 /// let reminder = Job::new("send_reminder")
289 /// .add_field("message", json!("Meeting starts soon"))
290 /// .scheduled_at(Utc::now() + Duration::hours(1));
291 ///
292 /// // Run at specific time
293 /// let report = Job::new("daily_report")
294 /// .scheduled_at(Utc::now().date_naive().and_hms_opt(9, 0, 0).unwrap());
295 ///
296 /// // Delayed retry pattern
297 /// let retry = Job::new("retry_failed_upload")
298 /// .scheduled_at(Utc::now() + Duration::minutes(30));
299 /// ```
300 pub fn scheduled_at(mut self, at: DateTime<Utc>) -> Self {
301 self.scheduled_at = Some(at);
302 self
303 }
304
305 /// Set job execution timeout
306 ///
307 /// Job will be terminated if it runs longer than this.
308 /// Default is 300 seconds (5 minutes).
309 ///
310 /// # Examples
311 ///
312
313 /// // Quick task
314 /// let quick = Job::new("send_sms")
315 /// .with_timeout(30); // 30 seconds
316 ///
317 /// // Long-running task
318 /// let video = Job::new("transcode_video")
319 /// .with_timeout(3600); // 1 hour
320 ///
321 /// // Very long task
322 /// let batch = Job::new("process_millions_of_records")
323 /// .with_timeout(7200); // 2 hours
324 /// ```
325 pub fn with_timeout(mut self, seconds: u64) -> Self {
326 self.timeout_seconds = Some(seconds);
327 self
328 }
329
330 /// Check if job should be executed (based on schedule)
331 pub fn should_run(&self) -> bool {
332 match self.status {
333 JobStatus::Pending => {
334 if let Some(scheduled) = self.scheduled_at {
335 Utc::now() >= scheduled
336 } else {
337 true
338 }
339 }
340 _ => false,
341 }
342 }
343
344 /// Mark job as running
345 pub fn mark_running(&mut self) {
346 self.status = JobStatus::Running;
347 self.started_at = Some(Utc::now());
348 self.touch(); // Update heartbeat when starting
349 }
350
351 /// Update heartbeat timestamp to "now"
352 ///
353 /// Workers should call this periodically while processing a job.
354 /// The Reaper uses this to detect dead workers.
355 pub fn touch(&mut self) {
356 self.last_heartbeat = std::time::SystemTime::now()
357 .duration_since(std::time::UNIX_EPOCH)
358 .unwrap()
359 .as_secs();
360 }
361
362 /// Check if job's heartbeat has expired (worker presumed dead)
363 pub fn is_heartbeat_expired(&self) -> bool {
364 if !matches!(self.status, JobStatus::Running) {
365 return false;
366 }
367 let now = std::time::SystemTime::now()
368 .duration_since(std::time::UNIX_EPOCH)
369 .unwrap()
370 .as_secs();
371 now.saturating_sub(self.last_heartbeat) > self.lease_duration
372 }
373
374 /// Set custom lease duration
375 pub fn with_lease_duration(mut self, seconds: u64) -> Self {
376 self.lease_duration = seconds;
377 self
378 }
379
380 /// Mark job as completed
381 pub fn mark_completed(&mut self) {
382 self.status = JobStatus::Completed;
383 self.completed_at = Some(Utc::now());
384 }
385
386 /// Mark job as failed
387 pub fn mark_failed(&mut self, error: String) {
388 self.retry_count += 1;
389
390 if self.retry_count >= self.max_retries {
391 self.status = JobStatus::DeadLetter { error };
392 } else {
393 self.status = JobStatus::Failed {
394 error,
395 retries: self.retry_count,
396 };
397 }
398 }
399
400 /// Calculate next retry delay (exponential backoff)
401 pub fn next_retry_delay(&self) -> chrono::Duration {
402 let base_delay = 5; // 5 seconds
403 let delay_seconds = base_delay * 2_i64.pow(self.retry_count);
404 chrono::Duration::seconds(delay_seconds)
405 }
406}
407
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder calls set the matching fields and leave the job pending.
    #[test]
    fn test_job_creation() {
        let recipient = serde_json::json!("user@example.com");
        let subject = serde_json::json!("Hello");

        let job = Job::new("email")
            .add_field("to", recipient)
            .add_field("subject", subject)
            .with_priority(JobPriority::High)
            .with_max_retries(5);

        assert_eq!(job.job_type, "email");
        assert_eq!(job.priority, JobPriority::High);
        assert_eq!(job.max_retries, 5);
        assert_eq!(job.status, JobStatus::Pending);
    }

    /// Unscheduled and past-due jobs are runnable; future ones are not.
    #[test]
    fn test_job_should_run() {
        let one_hour = chrono::Duration::hours(1);

        let unscheduled = Job::new("test");
        assert!(unscheduled.should_run());

        let future_job = Job::new("test").scheduled_at(Utc::now() + one_hour);
        assert!(!future_job.should_run());

        let past_job = Job::new("test").scheduled_at(Utc::now() - one_hour);
        assert!(past_job.should_run());
    }

    /// Failures count up and the job is dead-lettered at the retry limit.
    #[test]
    fn test_job_retry_logic() {
        let mut job = Job::new("test").with_max_retries(3);

        job.mark_failed(String::from("Error 1"));
        assert!(matches!(job.status, JobStatus::Failed { .. }));
        assert_eq!(job.retry_count, 1);

        job.mark_failed(String::from("Error 2"));
        assert_eq!(job.retry_count, 2);

        job.mark_failed(String::from("Error 3"));
        assert!(matches!(job.status, JobStatus::DeadLetter { .. }));
        assert_eq!(job.retry_count, 3);
    }

    /// The delay doubles with every recorded retry, starting at 5 seconds.
    #[test]
    fn test_exponential_backoff() {
        let mut job = Job::new("test");

        for &(retries, expected_secs) in &[(0_u32, 5_i64), (1, 10), (2, 20), (3, 40)] {
            job.retry_count = retries;
            assert_eq!(job.next_retry_delay().num_seconds(), expected_secs);
        }
    }
}