aurora_db/workers/job.rs
1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use uuid::Uuid;
5
6/// Job status tracking the lifecycle of a background job
7///
8/// Jobs progress through states: Pending → Running → Completed/Failed → DeadLetter
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
10pub enum JobStatus {
11 /// Job is queued and waiting to be processed
12 Pending,
13 /// Job is currently being executed by a worker
14 Running,
15 /// Job completed successfully
16 Completed,
17 /// Job failed but can be retried
18 Failed { error: String, retries: u32 },
19 /// Job permanently failed after max retries
20 DeadLetter { error: String },
21}
22
23/// Job priority for execution order
24///
25/// Higher priority jobs are executed first. Critical > High > Normal > Low.
26///
27/// # Examples
28///
29/// ```
30/// use aurora_db::workers::{Job, JobPriority};
31///
32/// // Time-sensitive payment
33/// let payment = Job::new("process_payment")
34/// .with_priority(JobPriority::Critical);
35///
36/// // User-facing task
37/// let notification = Job::new("send_notification")
38/// .with_priority(JobPriority::High);
39///
40/// // Background cleanup
41/// let cleanup = Job::new("cleanup_temp_files")
42/// .with_priority(JobPriority::Low);
43/// ```
44#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
45pub enum JobPriority {
46 /// Low priority (value: 1) - background tasks, cleanup
47 Low = 1,
48 /// Normal priority (value: 2) - default for most jobs
49 Normal = 2,
50 /// High priority (value: 3) - user-facing operations
51 High = 3,
52 /// Critical priority (value: 4) - payments, security
53 Critical = 4,
54}
55
56/// A durable background job
57///
58/// Represents a unit of work that can be queued, scheduled, retried, and tracked.
59/// Jobs are persisted to disk and survive restarts. Use the builder pattern to
60/// configure job properties.
61///
62/// # Examples
63///
64/// ```
65/// use aurora_db::workers::{Job, JobPriority};
66/// use serde_json::json;
67///
68/// // Simple job
69/// let job = Job::new("send_email")
70/// .add_field("to", json!("user@example.com"))
71/// .add_field("subject", json!("Welcome!"));
72///
73/// // Job with all options
74/// let job = Job::new("process_video")
75/// .add_field("video_id", json!("vid-123"))
76/// .add_field("resolution", json!("1080p"))
77/// .with_priority(JobPriority::High)
78/// .with_max_retries(5)
79/// .with_timeout(600) // 10 minutes
80/// .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
81/// ```
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct Job {
84 pub id: String,
85 pub job_type: String,
86 pub payload: HashMap<String, serde_json::Value>,
87 pub priority: JobPriority,
88 pub status: JobStatus,
89 pub max_retries: u32,
90 pub retry_count: u32,
91 pub created_at: DateTime<Utc>,
92 pub scheduled_at: Option<DateTime<Utc>>,
93 pub started_at: Option<DateTime<Utc>>,
94 pub completed_at: Option<DateTime<Utc>>,
95 pub timeout_seconds: Option<u64>,
96 /// Unix timestamp of last "I'm alive" signal from worker (for Reaper)
97 #[serde(default)]
98 pub last_heartbeat: u64,
99 /// Seconds before worker is considered dead (default: 30)
100 #[serde(default = "default_lease")]
101 pub lease_duration: u64,
102}
103
/// Fallback lease duration, in seconds, used when a job is created or when
/// the field is missing during deserialization.
fn default_lease() -> u64 {
    const DEFAULT_LEASE_SECS: u64 = 30;
    DEFAULT_LEASE_SECS
}
108
109impl Job {
110 /// Create a new job
111 ///
112 /// Creates a job with default settings:
113 /// - Priority: Normal
114 /// - Max retries: 3
115 /// - Timeout: 5 minutes
116 /// - Status: Pending
117 ///
118 /// # Arguments
119 /// * `job_type` - Type identifier for routing to the correct handler
120 ///
121 /// # Examples
122 ///
123
124 /// // Create different job types
125 /// let email = Job::new("send_email");
126 /// let image = Job::new("resize_image");
127 /// let report = Job::new("generate_report");
128 /// ```
129 pub fn new(job_type: impl Into<String>) -> Self {
130 Self {
131 id: Uuid::new_v4().to_string(),
132 job_type: job_type.into(),
133 payload: HashMap::new(),
134 priority: JobPriority::Normal,
135 status: JobStatus::Pending,
136 max_retries: 3,
137 retry_count: 0,
138 created_at: Utc::now(),
139 scheduled_at: None,
140 started_at: None,
141 completed_at: None,
142 timeout_seconds: Some(300), // 5 minutes default
143 last_heartbeat: 0,
144 lease_duration: default_lease(),
145 }
146 }
147
148 /// Set job payload from a HashMap
149 ///
150 /// Replaces the entire payload with the provided HashMap.
151 /// For adding individual fields, use `add_field()` instead.
152 ///
153 /// # Examples
154 ///
155
156 /// use std::collections::HashMap;
157 /// use serde_json::json;
158 ///
159 /// let mut payload = HashMap::new();
160 /// payload.insert("user_id".to_string(), json!("123"));
161 /// payload.insert("amount".to_string(), json!(99.99));
162 ///
163 /// let job = Job::new("process_payment")
164 /// .with_payload(payload);
165 /// ```
166 pub fn with_payload(mut self, payload: HashMap<String, serde_json::Value>) -> Self {
167 self.payload = payload;
168 self
169 }
170
171 /// Add a single field to the job payload
172 ///
173 /// Use this for building the payload incrementally.
174 /// Can be chained multiple times.
175 ///
176 /// # Examples
177 ///
178
179 /// use serde_json::json;
180 ///
181 /// let job = Job::new("send_email")
182 /// .add_field("to", json!("user@example.com"))
183 /// .add_field("subject", json!("Welcome!"))
184 /// .add_field("template", json!("welcome_email"))
185 /// .add_field("vars", json!({"name": "Alice"}));
186 /// ```
187 pub fn add_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
188 self.payload.insert(key.into(), value);
189 self
190 }
191
192 /// Set job priority
193 ///
194 /// Higher priority jobs are executed before lower priority ones.
195 /// Default is `JobPriority::Normal`.
196 ///
197 /// # Examples
198 ///
199
200 /// // Critical - payments, security operations
201 /// let payment = Job::new("charge_card")
202 /// .with_priority(JobPriority::Critical);
203 ///
204 /// // High - user-facing operations
205 /// let notification = Job::new("push_notification")
206 /// .with_priority(JobPriority::High);
207 ///
208 /// // Low - background cleanup, analytics
209 /// let cleanup = Job::new("clean_old_logs")
210 /// .with_priority(JobPriority::Low);
211 /// ```
212 pub fn with_priority(mut self, priority: JobPriority) -> Self {
213 self.priority = priority;
214 self
215 }
216
217 /// Set maximum retry attempts
218 ///
219 /// If a job fails, it will be retried up to this many times with
220 /// exponential backoff. Default is 3 retries.
221 ///
222 /// # Examples
223 ///
224
225 /// // Network operation - retry more
226 /// let api_call = Job::new("fetch_api_data")
227 /// .with_max_retries(5);
228 ///
229 /// // Critical operation - don't retry
230 /// let one_time = Job::new("send_invoice")
231 /// .with_max_retries(0);
232 ///
233 /// // Flaky operation - retry extensively
234 /// let external = Job::new("third_party_webhook")
235 /// .with_max_retries(10);
236 /// ```
237 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
238 self.max_retries = max_retries;
239 self
240 }
241
242 /// Schedule job for later execution
243 ///
244 /// Job will not be executed until the specified time.
245 /// Useful for reminders, scheduled reports, delayed notifications.
246 ///
247 /// # Examples
248 ///
249
250 /// use chrono::{Utc, Duration};
251 ///
252 /// // Run in 1 hour
253 /// let reminder = Job::new("send_reminder")
254 /// .add_field("message", json!("Meeting starts soon"))
255 /// .scheduled_at(Utc::now() + Duration::hours(1));
256 ///
257 /// // Run at specific time
258 /// let report = Job::new("daily_report")
259 /// .scheduled_at(Utc::now().date_naive().and_hms_opt(9, 0, 0).unwrap());
260 ///
261 /// // Delayed retry pattern
262 /// let retry = Job::new("retry_failed_upload")
263 /// .scheduled_at(Utc::now() + Duration::minutes(30));
264 /// ```
265 pub fn scheduled_at(mut self, at: DateTime<Utc>) -> Self {
266 self.scheduled_at = Some(at);
267 self
268 }
269
270 /// Set job execution timeout
271 ///
272 /// Job will be terminated if it runs longer than this.
273 /// Default is 300 seconds (5 minutes).
274 ///
275 /// # Examples
276 ///
277
278 /// // Quick task
279 /// let quick = Job::new("send_sms")
280 /// .with_timeout(30); // 30 seconds
281 ///
282 /// // Long-running task
283 /// let video = Job::new("transcode_video")
284 /// .with_timeout(3600); // 1 hour
285 ///
286 /// // Very long task
287 /// let batch = Job::new("process_millions_of_records")
288 /// .with_timeout(7200); // 2 hours
289 /// ```
290 pub fn with_timeout(mut self, seconds: u64) -> Self {
291 self.timeout_seconds = Some(seconds);
292 self
293 }
294
295 /// Check if job should be executed (based on schedule)
296 pub fn should_run(&self) -> bool {
297 match self.status {
298 JobStatus::Pending => {
299 if let Some(scheduled) = self.scheduled_at {
300 Utc::now() >= scheduled
301 } else {
302 true
303 }
304 }
305 _ => false,
306 }
307 }
308
309 /// Mark job as running
310 pub fn mark_running(&mut self) {
311 self.status = JobStatus::Running;
312 self.started_at = Some(Utc::now());
313 self.touch(); // Update heartbeat when starting
314 }
315
316 /// Update heartbeat timestamp to "now"
317 ///
318 /// Workers should call this periodically while processing a job.
319 /// The Reaper uses this to detect dead workers.
320 pub fn touch(&mut self) {
321 self.last_heartbeat = std::time::SystemTime::now()
322 .duration_since(std::time::UNIX_EPOCH)
323 .unwrap()
324 .as_secs();
325 }
326
327 /// Check if job's heartbeat has expired (worker presumed dead)
328 pub fn is_heartbeat_expired(&self) -> bool {
329 if !matches!(self.status, JobStatus::Running) {
330 return false;
331 }
332 let now = std::time::SystemTime::now()
333 .duration_since(std::time::UNIX_EPOCH)
334 .unwrap()
335 .as_secs();
336 now.saturating_sub(self.last_heartbeat) > self.lease_duration
337 }
338
339 /// Set custom lease duration
340 pub fn with_lease_duration(mut self, seconds: u64) -> Self {
341 self.lease_duration = seconds;
342 self
343 }
344
345 /// Mark job as completed
346 pub fn mark_completed(&mut self) {
347 self.status = JobStatus::Completed;
348 self.completed_at = Some(Utc::now());
349 }
350
351 /// Mark job as failed
352 pub fn mark_failed(&mut self, error: String) {
353 self.retry_count += 1;
354
355 if self.retry_count >= self.max_retries {
356 self.status = JobStatus::DeadLetter { error };
357 } else {
358 self.status = JobStatus::Failed {
359 error,
360 retries: self.retry_count,
361 };
362 }
363 }
364
365 /// Calculate next retry delay (exponential backoff)
366 pub fn next_retry_delay(&self) -> chrono::Duration {
367 let base_delay = 5; // 5 seconds
368 let delay_seconds = base_delay * 2_i64.pow(self.retry_count);
369 chrono::Duration::seconds(delay_seconds)
370 }
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder calls must be reflected in the resulting job's fields.
    #[test]
    fn test_job_creation() {
        let built = Job::new("email")
            .add_field("to", serde_json::json!("user@example.com"))
            .add_field("subject", serde_json::json!("Hello"))
            .with_priority(JobPriority::High)
            .with_max_retries(5);

        assert_eq!(built.job_type, "email");
        assert_eq!(built.priority, JobPriority::High);
        assert_eq!(built.max_retries, 5);
        assert_eq!(built.status, JobStatus::Pending);
    }

    /// Unscheduled and past-due jobs are runnable; future ones are not.
    #[test]
    fn test_job_should_run() {
        let one_hour = chrono::Duration::hours(1);

        let unscheduled = Job::new("test");
        assert!(unscheduled.should_run());

        let upcoming = Job::new("test").scheduled_at(Utc::now() + one_hour);
        assert!(!upcoming.should_run());

        let overdue = Job::new("test").scheduled_at(Utc::now() - one_hour);
        assert!(overdue.should_run());
    }

    /// A job stays retryable until its failure count reaches `max_retries`.
    #[test]
    fn test_job_retry_logic() {
        let mut job = Job::new("test").with_max_retries(3);

        job.mark_failed(String::from("Error 1"));
        assert!(matches!(job.status, JobStatus::Failed { .. }));
        assert_eq!(job.retry_count, 1);

        job.mark_failed(String::from("Error 2"));
        assert_eq!(job.retry_count, 2);

        // The third failure hits the limit and dead-letters the job.
        job.mark_failed(String::from("Error 3"));
        assert!(matches!(job.status, JobStatus::DeadLetter { .. }));
        assert_eq!(job.retry_count, 3);
    }

    /// The delay doubles with every recorded retry, starting at 5 seconds.
    #[test]
    fn test_exponential_backoff() {
        let mut job = Job::new("test");

        for (retries, expected_secs) in [(0_u32, 5_i64), (1, 10), (2, 20), (3, 40)] {
            job.retry_count = retries;
            assert_eq!(job.next_retry_delay().num_seconds(), expected_secs);
        }
    }
}