aurora_db/workers/job.rs
1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use uuid::Uuid;
5
/// Job status tracking the lifecycle of a background job.
///
/// Typical flow: `Pending` -> `Running` -> `Completed`. On error a running job
/// becomes `Failed` (still retryable) or, once its retry budget is exhausted,
/// `DeadLetter` (permanent). The transitions into `Running`, `Completed`,
/// `Failed` and `DeadLetter` are performed by [`Job::mark_running`],
/// [`Job::mark_completed`] and [`Job::mark_failed`]; moving a `Failed` job back
/// to `Pending` for another attempt is handled outside this type.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum JobStatus {
    /// Job is queued and waiting to be processed
    Pending,
    /// Job is currently being executed by a worker
    Running,
    /// Job completed successfully
    Completed,
    /// Job failed but can be retried.
    ///
    /// `error` is the failure message; `retries` is the job's `retry_count`
    /// after this failure was recorded.
    Failed { error: String, retries: u32 },
    /// Job permanently failed after its retry budget was exhausted.
    ///
    /// `error` is the message from the final failed attempt.
    DeadLetter { error: String },
}
22
23impl std::fmt::Display for JobStatus {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 match self {
26 JobStatus::Pending => write!(f, "Pending"),
27 JobStatus::Running => write!(f, "Running"),
28 JobStatus::Completed => write!(f, "Completed"),
29 JobStatus::Failed { error, retries } => write!(f, "Failed({}, retries: {})", error, retries),
30 JobStatus::DeadLetter { error } => write!(f, "DeadLetter({})", error),
31 }
32 }
33}
34
/// Job priority for execution order
///
/// Higher priority jobs are executed first. Critical > High > Normal > Low.
///
/// Comparisons come from the derived `PartialOrd`/`Ord`. The variants are
/// declared in ascending order and carry ascending explicit discriminants
/// (1 through 4), so `<`/`>` comparisons and `as` integer casts always agree.
///
/// # Examples
///
/// ```
/// use aurora_db::workers::{Job, JobPriority};
///
/// // Time-sensitive payment
/// let payment = Job::new("process_payment")
///     .with_priority(JobPriority::Critical);
///
/// // User-facing task
/// let notification = Job::new("send_notification")
///     .with_priority(JobPriority::High);
///
/// // Background cleanup
/// let cleanup = Job::new("cleanup_temp_files")
///     .with_priority(JobPriority::Low);
///
/// assert!(JobPriority::Critical > JobPriority::Low);
/// ```
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum JobPriority {
    /// Low priority (value: 1) - background tasks, cleanup
    Low = 1,
    /// Normal priority (value: 2) - default for most jobs
    Normal = 2,
    /// High priority (value: 3) - user-facing operations
    High = 3,
    /// Critical priority (value: 4) - payments, security
    Critical = 4,
}
67
68impl std::fmt::Display for JobPriority {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 match self {
71 JobPriority::Low => write!(f, "Low"),
72 JobPriority::Normal => write!(f, "Normal"),
73 JobPriority::High => write!(f, "High"),
74 JobPriority::Critical => write!(f, "Critical"),
75 }
76 }
77}
78
/// A durable background job
///
/// Represents a unit of work that can be queued, scheduled, retried, and tracked.
/// Jobs are persisted to disk and survive restarts. Use the builder pattern to
/// configure job properties.
///
/// # Examples
///
/// ```
/// use aurora_db::workers::{Job, JobPriority};
/// use serde_json::json;
///
/// // Simple job
/// let job = Job::new("send_email")
///     .add_field("to", json!("user@example.com"))
///     .add_field("subject", json!("Welcome!"));
///
/// // Job with all options
/// let job = Job::new("process_video")
///     .add_field("video_id", json!("vid-123"))
///     .add_field("resolution", json!("1080p"))
///     .with_priority(JobPriority::High)
///     .with_max_retries(5)
///     .with_timeout(600) // 10 minutes
///     .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job {
    /// Unique identifier; `Job::new` assigns a random UUID v4 string.
    pub id: String,
    /// Type identifier used to route the job to the correct handler.
    pub job_type: String,
    /// Handler arguments as JSON values keyed by field name.
    pub payload: HashMap<String, serde_json::Value>,
    /// Execution priority (`Job::new` default: `Normal`).
    pub priority: JobPriority,
    /// Current lifecycle state.
    pub status: JobStatus,
    /// Failure count at which `Job::mark_failed` dead-letters the job instead
    /// of marking it `Failed` (`Job::new` default: 3).
    pub max_retries: u32,
    /// Number of failures recorded so far by `Job::mark_failed`.
    pub retry_count: u32,
    /// Creation time, set by `Job::new`.
    pub created_at: DateTime<Utc>,
    /// Earliest time the job may run; `None` means it is runnable immediately.
    pub scheduled_at: Option<DateTime<Utc>>,
    /// Time of the most recent `Job::mark_running` call.
    pub started_at: Option<DateTime<Utc>>,
    /// Time of the `Job::mark_completed` call.
    pub completed_at: Option<DateTime<Utc>>,
    /// Maximum execution time in seconds (`Job::new` default: 300). Stored
    /// only; enforcement is not done by `Job` itself.
    pub timeout_seconds: Option<u64>,
    /// Unix timestamp (seconds) of last "I'm alive" signal from worker (for Reaper).
    /// Deserializes to 0 when absent from stored data.
    #[serde(default)]
    pub last_heartbeat: u64,
    /// Seconds without a heartbeat before the worker is considered dead
    /// (default: 30, also used when absent from stored data).
    #[serde(default = "default_lease")]
    pub lease_duration: u64,
}
126
127impl std::fmt::Display for Job {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 write!(f, "{}", serde_json::to_string_pretty(self).unwrap_or_default())
130 }
131}
132
/// Default worker lease, in seconds.
///
/// Used by `Job::new` and as the serde fallback when a stored job has no
/// `lease_duration` field.
fn default_lease() -> u64 {
    const DEFAULT_LEASE_SECS: u64 = 30;
    DEFAULT_LEASE_SECS
}
137
138impl Job {
139 /// Create a new job
140 ///
141 /// Creates a job with default settings:
142 /// - Priority: Normal
143 /// - Max retries: 3
144 /// - Timeout: 5 minutes
145 /// - Status: Pending
146 ///
147 /// # Arguments
148 /// * `job_type` - Type identifier for routing to the correct handler
149 ///
150 /// # Examples
151 ///
152
153 /// // Create different job types
154 /// let email = Job::new("send_email");
155 /// let image = Job::new("resize_image");
156 /// let report = Job::new("generate_report");
157 /// ```
158 pub fn new(job_type: impl Into<String>) -> Self {
159 Self {
160 id: Uuid::new_v4().to_string(),
161 job_type: job_type.into(),
162 payload: HashMap::new(),
163 priority: JobPriority::Normal,
164 status: JobStatus::Pending,
165 max_retries: 3,
166 retry_count: 0,
167 created_at: Utc::now(),
168 scheduled_at: None,
169 started_at: None,
170 completed_at: None,
171 timeout_seconds: Some(300), // 5 minutes default
172 last_heartbeat: 0,
173 lease_duration: default_lease(),
174 }
175 }
176
177 /// Set job payload from a HashMap
178 ///
179 /// Replaces the entire payload with the provided HashMap.
180 /// For adding individual fields, use `add_field()` instead.
181 ///
182 /// # Examples
183 ///
184
185 /// use std::collections::HashMap;
186 /// use serde_json::json;
187 ///
188 /// let mut payload = HashMap::new();
189 /// payload.insert("user_id".to_string(), json!("123"));
190 /// payload.insert("amount".to_string(), json!(99.99));
191 ///
192 /// let job = Job::new("process_payment")
193 /// .with_payload(payload);
194 /// ```
195 pub fn with_payload(mut self, payload: HashMap<String, serde_json::Value>) -> Self {
196 self.payload = payload;
197 self
198 }
199
200 /// Add a single field to the job payload
201 ///
202 /// Use this for building the payload incrementally.
203 /// Can be chained multiple times.
204 ///
205 /// # Examples
206 ///
207
208 /// use serde_json::json;
209 ///
210 /// let job = Job::new("send_email")
211 /// .add_field("to", json!("user@example.com"))
212 /// .add_field("subject", json!("Welcome!"))
213 /// .add_field("template", json!("welcome_email"))
214 /// .add_field("vars", json!({"name": "Alice"}));
215 /// ```
216 pub fn add_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
217 self.payload.insert(key.into(), value);
218 self
219 }
220
221 /// Set job priority
222 ///
223 /// Higher priority jobs are executed before lower priority ones.
224 /// Default is `JobPriority::Normal`.
225 ///
226 /// # Examples
227 ///
228
229 /// // Critical - payments, security operations
230 /// let payment = Job::new("charge_card")
231 /// .with_priority(JobPriority::Critical);
232 ///
233 /// // High - user-facing operations
234 /// let notification = Job::new("push_notification")
235 /// .with_priority(JobPriority::High);
236 ///
237 /// // Low - background cleanup, analytics
238 /// let cleanup = Job::new("clean_old_logs")
239 /// .with_priority(JobPriority::Low);
240 /// ```
241 pub fn with_priority(mut self, priority: JobPriority) -> Self {
242 self.priority = priority;
243 self
244 }
245
246 /// Set maximum retry attempts
247 ///
248 /// If a job fails, it will be retried up to this many times with
249 /// exponential backoff. Default is 3 retries.
250 ///
251 /// # Examples
252 ///
253
254 /// // Network operation - retry more
255 /// let api_call = Job::new("fetch_api_data")
256 /// .with_max_retries(5);
257 ///
258 /// // Critical operation - don't retry
259 /// let one_time = Job::new("send_invoice")
260 /// .with_max_retries(0);
261 ///
262 /// // Flaky operation - retry extensively
263 /// let external = Job::new("third_party_webhook")
264 /// .with_max_retries(10);
265 /// ```
266 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
267 self.max_retries = max_retries;
268 self
269 }
270
271 /// Schedule job for later execution
272 ///
273 /// Job will not be executed until the specified time.
274 /// Useful for reminders, scheduled reports, delayed notifications.
275 ///
276 /// # Examples
277 ///
278
279 /// use chrono::{Utc, Duration};
280 ///
281 /// // Run in 1 hour
282 /// let reminder = Job::new("send_reminder")
283 /// .add_field("message", json!("Meeting starts soon"))
284 /// .scheduled_at(Utc::now() + Duration::hours(1));
285 ///
286 /// // Run at specific time
287 /// let report = Job::new("daily_report")
288 /// .scheduled_at(Utc::now().date_naive().and_hms_opt(9, 0, 0).unwrap());
289 ///
290 /// // Delayed retry pattern
291 /// let retry = Job::new("retry_failed_upload")
292 /// .scheduled_at(Utc::now() + Duration::minutes(30));
293 /// ```
294 pub fn scheduled_at(mut self, at: DateTime<Utc>) -> Self {
295 self.scheduled_at = Some(at);
296 self
297 }
298
299 /// Set job execution timeout
300 ///
301 /// Job will be terminated if it runs longer than this.
302 /// Default is 300 seconds (5 minutes).
303 ///
304 /// # Examples
305 ///
306
307 /// // Quick task
308 /// let quick = Job::new("send_sms")
309 /// .with_timeout(30); // 30 seconds
310 ///
311 /// // Long-running task
312 /// let video = Job::new("transcode_video")
313 /// .with_timeout(3600); // 1 hour
314 ///
315 /// // Very long task
316 /// let batch = Job::new("process_millions_of_records")
317 /// .with_timeout(7200); // 2 hours
318 /// ```
319 pub fn with_timeout(mut self, seconds: u64) -> Self {
320 self.timeout_seconds = Some(seconds);
321 self
322 }
323
324 /// Check if job should be executed (based on schedule)
325 pub fn should_run(&self) -> bool {
326 match self.status {
327 JobStatus::Pending => {
328 if let Some(scheduled) = self.scheduled_at {
329 Utc::now() >= scheduled
330 } else {
331 true
332 }
333 }
334 _ => false,
335 }
336 }
337
338 /// Mark job as running
339 pub fn mark_running(&mut self) {
340 self.status = JobStatus::Running;
341 self.started_at = Some(Utc::now());
342 self.touch(); // Update heartbeat when starting
343 }
344
345 /// Update heartbeat timestamp to "now"
346 ///
347 /// Workers should call this periodically while processing a job.
348 /// The Reaper uses this to detect dead workers.
349 pub fn touch(&mut self) {
350 self.last_heartbeat = std::time::SystemTime::now()
351 .duration_since(std::time::UNIX_EPOCH)
352 .unwrap()
353 .as_secs();
354 }
355
356 /// Check if job's heartbeat has expired (worker presumed dead)
357 pub fn is_heartbeat_expired(&self) -> bool {
358 if !matches!(self.status, JobStatus::Running) {
359 return false;
360 }
361 let now = std::time::SystemTime::now()
362 .duration_since(std::time::UNIX_EPOCH)
363 .unwrap()
364 .as_secs();
365 now.saturating_sub(self.last_heartbeat) > self.lease_duration
366 }
367
368 /// Set custom lease duration
369 pub fn with_lease_duration(mut self, seconds: u64) -> Self {
370 self.lease_duration = seconds;
371 self
372 }
373
374 /// Mark job as completed
375 pub fn mark_completed(&mut self) {
376 self.status = JobStatus::Completed;
377 self.completed_at = Some(Utc::now());
378 }
379
380 /// Mark job as failed
381 pub fn mark_failed(&mut self, error: String) {
382 self.retry_count += 1;
383
384 if self.retry_count >= self.max_retries {
385 self.status = JobStatus::DeadLetter { error };
386 } else {
387 self.status = JobStatus::Failed {
388 error,
389 retries: self.retry_count,
390 };
391 }
392 }
393
394 /// Calculate next retry delay (exponential backoff)
395 pub fn next_retry_delay(&self) -> chrono::Duration {
396 let base_delay = 5; // 5 seconds
397 let delay_seconds = base_delay * 2_i64.pow(self.retry_count);
398 chrono::Duration::seconds(delay_seconds)
399 }
400}
401
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_job_creation() {
        let job = Job::new("email")
            .add_field("to", json!("user@example.com"))
            .add_field("subject", json!("Hello"))
            .with_priority(JobPriority::High)
            .with_max_retries(5);

        assert_eq!(job.job_type, "email");
        assert_eq!(job.priority, JobPriority::High);
        assert_eq!(job.max_retries, 5);
        assert_eq!(job.status, JobStatus::Pending);
        assert_eq!(job.payload.get("to"), Some(&json!("user@example.com")));
        assert_eq!(job.payload.len(), 2);
        // Defaults documented on `Job::new`.
        assert_eq!(job.retry_count, 0);
        assert_eq!(job.timeout_seconds, Some(300));
        assert_eq!(job.lease_duration, 30);
        assert_eq!(job.last_heartbeat, 0);
    }

    #[test]
    fn test_job_should_run() {
        let job = Job::new("test");
        assert!(job.should_run());

        let future_job = Job::new("test").scheduled_at(Utc::now() + chrono::Duration::hours(1));
        assert!(!future_job.should_run());

        let past_job = Job::new("test").scheduled_at(Utc::now() - chrono::Duration::hours(1));
        assert!(past_job.should_run());

        // Only pending jobs are eligible.
        let mut running = Job::new("test");
        running.mark_running();
        assert!(!running.should_run());
    }

    #[test]
    fn test_job_retry_logic() {
        let mut job = Job::new("test").with_max_retries(3);

        job.mark_failed("Error 1".to_string());
        assert_eq!(
            job.status,
            JobStatus::Failed {
                error: "Error 1".to_string(),
                retries: 1
            }
        );
        assert_eq!(job.retry_count, 1);

        job.mark_failed("Error 2".to_string());
        assert_eq!(job.retry_count, 2);

        job.mark_failed("Error 3".to_string());
        assert_eq!(
            job.status,
            JobStatus::DeadLetter {
                error: "Error 3".to_string()
            }
        );
        assert_eq!(job.retry_count, 3);
    }

    #[test]
    fn test_zero_max_retries_dead_letters_on_first_failure() {
        let mut job = Job::new("test").with_max_retries(0);
        job.mark_failed("boom".to_string());
        assert_eq!(
            job.status,
            JobStatus::DeadLetter {
                error: "boom".to_string()
            }
        );
        assert_eq!(job.retry_count, 1);
    }

    #[test]
    fn test_exponential_backoff() {
        let mut job = Job::new("test");

        assert_eq!(job.next_retry_delay().num_seconds(), 5);

        job.retry_count = 1;
        assert_eq!(job.next_retry_delay().num_seconds(), 10);

        job.retry_count = 2;
        assert_eq!(job.next_retry_delay().num_seconds(), 20);

        job.retry_count = 3;
        assert_eq!(job.next_retry_delay().num_seconds(), 40);
    }

    #[test]
    fn test_heartbeat_expiry() {
        let mut job = Job::new("test").with_lease_duration(30);
        // A job that is not running can never have an expired heartbeat.
        assert!(!job.is_heartbeat_expired());

        job.mark_running();
        assert!(job.started_at.is_some());
        assert!(job.last_heartbeat > 0);
        assert!(!job.is_heartbeat_expired());

        // A heartbeat far older than the lease means the worker is presumed dead.
        job.last_heartbeat = 0;
        assert!(job.is_heartbeat_expired());

        job.touch();
        assert!(!job.is_heartbeat_expired());
    }

    #[test]
    fn test_mark_completed() {
        let mut job = Job::new("test");
        job.mark_running();
        job.mark_completed();

        assert_eq!(job.status, JobStatus::Completed);
        assert!(job.completed_at.is_some());
        assert!(!job.should_run());
        assert!(!job.is_heartbeat_expired());
    }

    #[test]
    fn test_priority_ordering() {
        assert!(JobPriority::Critical > JobPriority::High);
        assert!(JobPriority::High > JobPriority::Normal);
        assert!(JobPriority::Normal > JobPriority::Low);
        assert_eq!(JobPriority::Critical as i32, 4);
        assert_eq!(JobPriority::Low as i32, 1);
    }

    #[test]
    fn test_display_formats() {
        assert_eq!(JobPriority::Critical.to_string(), "Critical");
        assert_eq!(JobStatus::Pending.to_string(), "Pending");

        let failed = JobStatus::Failed {
            error: "boom".to_string(),
            retries: 2,
        };
        assert_eq!(failed.to_string(), "Failed(boom, retries: 2)");

        let dead = JobStatus::DeadLetter {
            error: "boom".to_string(),
        };
        assert_eq!(dead.to_string(), "DeadLetter(boom)");
    }
}