aurora_db/workers/job.rs
1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use uuid::Uuid;
5
6/// Job status tracking the lifecycle of a background job
7///
8/// Jobs progress through states: Pending → Running → Completed/Failed → DeadLetter
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
10pub enum JobStatus {
11 /// Job is queued and waiting to be processed
12 Pending,
13 /// Job is currently being executed by a worker
14 Running,
15 /// Job completed successfully
16 Completed,
17 /// Job failed but can be retried
18 Failed { error: String, retries: u32 },
19 /// Job permanently failed after max retries
20 DeadLetter { error: String },
21}
22
23/// Job priority for execution order
24///
25/// Higher priority jobs are executed first. Critical > High > Normal > Low.
26///
27/// # Examples
28///
29/// ```
30/// use aurora_db::workers::{Job, JobPriority};
31///
32/// // Time-sensitive payment
33/// let payment = Job::new("process_payment")
34/// .with_priority(JobPriority::Critical);
35///
36/// // User-facing task
37/// let notification = Job::new("send_notification")
38/// .with_priority(JobPriority::High);
39///
40/// // Background cleanup
41/// let cleanup = Job::new("cleanup_temp_files")
42/// .with_priority(JobPriority::Low);
43/// ```
44#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
45pub enum JobPriority {
46 /// Low priority (value: 1) - background tasks, cleanup
47 Low = 1,
48 /// Normal priority (value: 2) - default for most jobs
49 Normal = 2,
50 /// High priority (value: 3) - user-facing operations
51 High = 3,
52 /// Critical priority (value: 4) - payments, security
53 Critical = 4,
54}
55
56/// A durable background job
57///
58/// Represents a unit of work that can be queued, scheduled, retried, and tracked.
59/// Jobs are persisted to disk and survive restarts. Use the builder pattern to
60/// configure job properties.
61///
62/// # Examples
63///
64/// ```
65/// use aurora_db::workers::{Job, JobPriority};
66/// use serde_json::json;
67///
68/// // Simple job
69/// let job = Job::new("send_email")
70/// .add_field("to", json!("user@example.com"))
71/// .add_field("subject", json!("Welcome!"));
72///
73/// // Job with all options
74/// let job = Job::new("process_video")
75/// .add_field("video_id", json!("vid-123"))
76/// .add_field("resolution", json!("1080p"))
77/// .with_priority(JobPriority::High)
78/// .with_max_retries(5)
79/// .with_timeout(600) // 10 minutes
80/// .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
81/// ```
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct Job {
84 pub id: String,
85 pub job_type: String,
86 pub payload: HashMap<String, serde_json::Value>,
87 pub priority: JobPriority,
88 pub status: JobStatus,
89 pub max_retries: u32,
90 pub retry_count: u32,
91 pub created_at: DateTime<Utc>,
92 pub scheduled_at: Option<DateTime<Utc>>,
93 pub started_at: Option<DateTime<Utc>>,
94 pub completed_at: Option<DateTime<Utc>>,
95 pub timeout_seconds: Option<u64>,
96}
97
98impl Job {
99 /// Create a new job
100 ///
101 /// Creates a job with default settings:
102 /// - Priority: Normal
103 /// - Max retries: 3
104 /// - Timeout: 5 minutes
105 /// - Status: Pending
106 ///
107 /// # Arguments
108 /// * `job_type` - Type identifier for routing to the correct handler
109 ///
110 /// # Examples
111 ///
112 /// ```
113 /// // Create different job types
114 /// let email = Job::new("send_email");
115 /// let image = Job::new("resize_image");
116 /// let report = Job::new("generate_report");
117 /// ```
118 pub fn new(job_type: impl Into<String>) -> Self {
119 Self {
120 id: Uuid::new_v4().to_string(),
121 job_type: job_type.into(),
122 payload: HashMap::new(),
123 priority: JobPriority::Normal,
124 status: JobStatus::Pending,
125 max_retries: 3,
126 retry_count: 0,
127 created_at: Utc::now(),
128 scheduled_at: None,
129 started_at: None,
130 completed_at: None,
131 timeout_seconds: Some(300), // 5 minutes default
132 }
133 }
134
135 /// Set job payload from a HashMap
136 ///
137 /// Replaces the entire payload with the provided HashMap.
138 /// For adding individual fields, use `add_field()` instead.
139 ///
140 /// # Examples
141 ///
142 /// ```
143 /// use std::collections::HashMap;
144 /// use serde_json::json;
145 ///
146 /// let mut payload = HashMap::new();
147 /// payload.insert("user_id".to_string(), json!("123"));
148 /// payload.insert("amount".to_string(), json!(99.99));
149 ///
150 /// let job = Job::new("process_payment")
151 /// .with_payload(payload);
152 /// ```
153 pub fn with_payload(mut self, payload: HashMap<String, serde_json::Value>) -> Self {
154 self.payload = payload;
155 self
156 }
157
158 /// Add a single field to the job payload
159 ///
160 /// Use this for building the payload incrementally.
161 /// Can be chained multiple times.
162 ///
163 /// # Examples
164 ///
165 /// ```
166 /// use serde_json::json;
167 ///
168 /// let job = Job::new("send_email")
169 /// .add_field("to", json!("user@example.com"))
170 /// .add_field("subject", json!("Welcome!"))
171 /// .add_field("template", json!("welcome_email"))
172 /// .add_field("vars", json!({"name": "Alice"}));
173 /// ```
174 pub fn add_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
175 self.payload.insert(key.into(), value);
176 self
177 }
178
179 /// Set job priority
180 ///
181 /// Higher priority jobs are executed before lower priority ones.
182 /// Default is `JobPriority::Normal`.
183 ///
184 /// # Examples
185 ///
186 /// ```
187 /// // Critical - payments, security operations
188 /// let payment = Job::new("charge_card")
189 /// .with_priority(JobPriority::Critical);
190 ///
191 /// // High - user-facing operations
192 /// let notification = Job::new("push_notification")
193 /// .with_priority(JobPriority::High);
194 ///
195 /// // Low - background cleanup, analytics
196 /// let cleanup = Job::new("clean_old_logs")
197 /// .with_priority(JobPriority::Low);
198 /// ```
199 pub fn with_priority(mut self, priority: JobPriority) -> Self {
200 self.priority = priority;
201 self
202 }
203
204 /// Set maximum retry attempts
205 ///
206 /// If a job fails, it will be retried up to this many times with
207 /// exponential backoff. Default is 3 retries.
208 ///
209 /// # Examples
210 ///
211 /// ```
212 /// // Network operation - retry more
213 /// let api_call = Job::new("fetch_api_data")
214 /// .with_max_retries(5);
215 ///
216 /// // Critical operation - don't retry
217 /// let one_time = Job::new("send_invoice")
218 /// .with_max_retries(0);
219 ///
220 /// // Flaky operation - retry extensively
221 /// let external = Job::new("third_party_webhook")
222 /// .with_max_retries(10);
223 /// ```
224 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
225 self.max_retries = max_retries;
226 self
227 }
228
229 /// Schedule job for later execution
230 ///
231 /// Job will not be executed until the specified time.
232 /// Useful for reminders, scheduled reports, delayed notifications.
233 ///
234 /// # Examples
235 ///
236 /// ```
237 /// use chrono::{Utc, Duration};
238 ///
239 /// // Run in 1 hour
240 /// let reminder = Job::new("send_reminder")
241 /// .add_field("message", json!("Meeting starts soon"))
242 /// .scheduled_at(Utc::now() + Duration::hours(1));
243 ///
244 /// // Run at specific time
245 /// let report = Job::new("daily_report")
246 /// .scheduled_at(Utc::now().date_naive().and_hms_opt(9, 0, 0).unwrap());
247 ///
248 /// // Delayed retry pattern
249 /// let retry = Job::new("retry_failed_upload")
250 /// .scheduled_at(Utc::now() + Duration::minutes(30));
251 /// ```
252 pub fn scheduled_at(mut self, at: DateTime<Utc>) -> Self {
253 self.scheduled_at = Some(at);
254 self
255 }
256
257 /// Set job execution timeout
258 ///
259 /// Job will be terminated if it runs longer than this.
260 /// Default is 300 seconds (5 minutes).
261 ///
262 /// # Examples
263 ///
264 /// ```
265 /// // Quick task
266 /// let quick = Job::new("send_sms")
267 /// .with_timeout(30); // 30 seconds
268 ///
269 /// // Long-running task
270 /// let video = Job::new("transcode_video")
271 /// .with_timeout(3600); // 1 hour
272 ///
273 /// // Very long task
274 /// let batch = Job::new("process_millions_of_records")
275 /// .with_timeout(7200); // 2 hours
276 /// ```
277 pub fn with_timeout(mut self, seconds: u64) -> Self {
278 self.timeout_seconds = Some(seconds);
279 self
280 }
281
282 /// Check if job should be executed (based on schedule)
283 pub fn should_run(&self) -> bool {
284 match self.status {
285 JobStatus::Pending => {
286 if let Some(scheduled) = self.scheduled_at {
287 Utc::now() >= scheduled
288 } else {
289 true
290 }
291 }
292 _ => false,
293 }
294 }
295
296 /// Mark job as running
297 pub fn mark_running(&mut self) {
298 self.status = JobStatus::Running;
299 self.started_at = Some(Utc::now());
300 }
301
302 /// Mark job as completed
303 pub fn mark_completed(&mut self) {
304 self.status = JobStatus::Completed;
305 self.completed_at = Some(Utc::now());
306 }
307
308 /// Mark job as failed
309 pub fn mark_failed(&mut self, error: String) {
310 self.retry_count += 1;
311
312 if self.retry_count >= self.max_retries {
313 self.status = JobStatus::DeadLetter { error };
314 } else {
315 self.status = JobStatus::Failed {
316 error,
317 retries: self.retry_count,
318 };
319 }
320 }
321
322 /// Calculate next retry delay (exponential backoff)
323 pub fn next_retry_delay(&self) -> chrono::Duration {
324 let base_delay = 5; // 5 seconds
325 let delay_seconds = base_delay * 2_i64.pow(self.retry_count);
326 chrono::Duration::seconds(delay_seconds)
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333
334 #[test]
335 fn test_job_creation() {
336 let job = Job::new("email")
337 .add_field("to", serde_json::json!("user@example.com"))
338 .add_field("subject", serde_json::json!("Hello"))
339 .with_priority(JobPriority::High)
340 .with_max_retries(5);
341
342 assert_eq!(job.job_type, "email");
343 assert_eq!(job.priority, JobPriority::High);
344 assert_eq!(job.max_retries, 5);
345 assert_eq!(job.status, JobStatus::Pending);
346 }
347
348 #[test]
349 fn test_job_should_run() {
350 let job = Job::new("test");
351 assert!(job.should_run());
352
353 let future_job = Job::new("test").scheduled_at(Utc::now() + chrono::Duration::hours(1));
354 assert!(!future_job.should_run());
355
356 let past_job = Job::new("test").scheduled_at(Utc::now() - chrono::Duration::hours(1));
357 assert!(past_job.should_run());
358 }
359
360 #[test]
361 fn test_job_retry_logic() {
362 let mut job = Job::new("test").with_max_retries(3);
363
364 job.mark_failed("Error 1".to_string());
365 assert!(matches!(job.status, JobStatus::Failed { .. }));
366 assert_eq!(job.retry_count, 1);
367
368 job.mark_failed("Error 2".to_string());
369 assert_eq!(job.retry_count, 2);
370
371 job.mark_failed("Error 3".to_string());
372 assert!(matches!(job.status, JobStatus::DeadLetter { .. }));
373 assert_eq!(job.retry_count, 3);
374 }
375
376 #[test]
377 fn test_exponential_backoff() {
378 let mut job = Job::new("test");
379
380 assert_eq!(job.next_retry_delay().num_seconds(), 5);
381
382 job.retry_count = 1;
383 assert_eq!(job.next_retry_delay().num_seconds(), 10);
384
385 job.retry_count = 2;
386 assert_eq!(job.next_retry_delay().num_seconds(), 20);
387
388 job.retry_count = 3;
389 assert_eq!(job.next_retry_delay().num_seconds(), 40);
390 }
391}