1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use uuid::Uuid;
7
8pub type JobId = Uuid;
10
11pub type JobData = serde_json::Value;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
16pub enum JobPriority {
17 Low = 0,
19 #[default]
21 Normal = 1,
22 High = 2,
24 Critical = 3,
26}
27
28#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub enum JobState {
31 Pending,
33 Processing,
35 Completed,
37 Failed,
39 Dead,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct JobStatus {
46 pub state: JobState,
48
49 pub progress: u8,
51
52 pub message: Option<String>,
54
55 pub error: Option<String>,
57
58 pub updated_at: DateTime<Utc>,
60}
61
62impl JobStatus {
63 pub fn pending() -> Self {
65 Self {
66 state: JobState::Pending,
67 progress: 0,
68 message: None,
69 error: None,
70 updated_at: Utc::now(),
71 }
72 }
73
74 pub fn processing() -> Self {
76 Self {
77 state: JobState::Processing,
78 progress: 0,
79 message: None,
80 error: None,
81 updated_at: Utc::now(),
82 }
83 }
84
85 pub fn completed() -> Self {
87 Self {
88 state: JobState::Completed,
89 progress: 100,
90 message: None,
91 error: None,
92 updated_at: Utc::now(),
93 }
94 }
95
96 pub fn failed(error: String) -> Self {
98 Self {
99 state: JobState::Failed,
100 progress: 0,
101 message: None,
102 error: Some(error),
103 updated_at: Utc::now(),
104 }
105 }
106
107 pub fn dead(error: String) -> Self {
109 Self {
110 state: JobState::Dead,
111 progress: 0,
112 message: None,
113 error: Some(error),
114 updated_at: Utc::now(),
115 }
116 }
117
118 pub fn with_progress(mut self, progress: u8) -> Self {
120 self.progress = progress.min(100);
121 self.updated_at = Utc::now();
122 self
123 }
124
125 pub fn with_message(mut self, message: impl Into<String>) -> Self {
127 self.message = Some(message.into());
128 self.updated_at = Utc::now();
129 self
130 }
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct Job {
136 pub id: JobId,
138
139 pub job_type: String,
141
142 pub data: JobData,
144
145 pub priority: JobPriority,
147
148 pub status: JobStatus,
150
151 pub attempts: u32,
153
154 pub max_attempts: u32,
156
157 pub queue: String,
159
160 pub created_at: DateTime<Utc>,
162
163 pub scheduled_at: Option<DateTime<Utc>>,
165
166 pub started_at: Option<DateTime<Utc>>,
168
169 pub completed_at: Option<DateTime<Utc>>,
171
172 pub metadata: HashMap<String, String>,
174}
175
176impl Job {
177 pub fn new(queue: impl Into<String>, job_type: impl Into<String>, data: JobData) -> Self {
179 Self {
180 id: Uuid::new_v4(),
181 job_type: job_type.into(),
182 data,
183 priority: JobPriority::default(),
184 status: JobStatus::pending(),
185 attempts: 0,
186 max_attempts: 3,
187 queue: queue.into(),
188 created_at: Utc::now(),
189 scheduled_at: None,
190 started_at: None,
191 completed_at: None,
192 metadata: HashMap::new(),
193 }
194 }
195
196 pub fn with_priority(mut self, priority: JobPriority) -> Self {
198 self.priority = priority;
199 self
200 }
201
202 pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
204 self.max_attempts = max_attempts;
205 self
206 }
207
208 pub fn schedule_at(mut self, time: DateTime<Utc>) -> Self {
210 self.scheduled_at = Some(time);
211 self
212 }
213
214 pub fn schedule_after(mut self, duration: chrono::Duration) -> Self {
216 self.scheduled_at = Some(Utc::now() + duration);
217 self
218 }
219
220 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
222 self.metadata.insert(key.into(), value.into());
223 self
224 }
225
226 pub fn is_ready(&self) -> bool {
228 if let Some(scheduled_at) = self.scheduled_at {
229 Utc::now() >= scheduled_at
230 } else {
231 true
232 }
233 }
234
235 pub fn can_retry(&self) -> bool {
237 self.attempts < self.max_attempts
238 }
239
240 pub fn start_processing(&mut self) {
242 self.status = JobStatus::processing();
243 self.started_at = Some(Utc::now());
244 self.attempts += 1;
245 }
246
247 pub fn complete(&mut self) {
249 self.status = JobStatus::completed();
250 self.completed_at = Some(Utc::now());
251 }
252
253 pub fn fail(&mut self, error: String) {
255 if self.can_retry() {
256 self.status = JobStatus::failed(error);
257 } else {
258 self.status = JobStatus::dead(error);
259 self.completed_at = Some(Utc::now());
260 }
261 }
262
263 pub fn update_progress(&mut self, progress: u8, message: Option<String>) {
265 self.status.progress = progress.min(100);
266 self.status.message = message;
267 self.status.updated_at = Utc::now();
268 }
269
270 pub fn backoff_delay(&self) -> chrono::Duration {
272 let seconds = 2_i64.pow(self.attempts.saturating_sub(1));
274 chrono::Duration::seconds(seconds.min(3600)) }
276}
277
278#[cfg(test)]
279mod tests {
280 use super::*;
281
282 #[test]
283 fn test_job_creation() {
284 let job = Job::new(
285 "default",
286 "send_email",
287 serde_json::json!({"to": "test@example.com"}),
288 );
289
290 assert_eq!(job.queue, "default");
291 assert_eq!(job.job_type, "send_email");
292 assert_eq!(job.attempts, 0);
293 assert_eq!(job.priority, JobPriority::Normal);
294 }
295
296 #[test]
297 fn test_job_builder() {
298 let job = Job::new("default", "task", serde_json::json!({}))
299 .with_priority(JobPriority::High)
300 .with_max_attempts(5)
301 .with_metadata("user_id", "123");
302
303 assert_eq!(job.priority, JobPriority::High);
304 assert_eq!(job.max_attempts, 5);
305 assert_eq!(job.metadata.get("user_id"), Some(&"123".to_string()));
306 }
307
308 #[test]
309 fn test_job_ready() {
310 let mut job = Job::new("default", "task", serde_json::json!({}));
311 assert!(job.is_ready());
312
313 job = job.schedule_at(Utc::now() + chrono::Duration::hours(1));
314 assert!(!job.is_ready());
315 }
316
317 #[test]
318 fn test_job_retry_logic() {
319 let mut job = Job::new("default", "task", serde_json::json!({}));
320 job.max_attempts = 3;
321
322 assert!(job.can_retry());
323
324 job.start_processing();
325 job.fail("Error 1".to_string());
326 assert!(job.can_retry());
327 assert_eq!(job.status.state, JobState::Failed);
328
329 job.start_processing();
330 job.fail("Error 2".to_string());
331 assert!(job.can_retry());
332
333 job.start_processing();
334 job.fail("Error 3".to_string());
335 assert!(!job.can_retry());
336 assert_eq!(job.status.state, JobState::Dead);
337 }
338
339 #[test]
340 fn test_backoff_delay() {
341 let mut job = Job::new("default", "task", serde_json::json!({}));
342
343 job.attempts = 1;
344 assert_eq!(job.backoff_delay(), chrono::Duration::seconds(1));
345
346 job.attempts = 2;
347 assert_eq!(job.backoff_delay(), chrono::Duration::seconds(2));
348
349 job.attempts = 3;
350 assert_eq!(job.backoff_delay(), chrono::Duration::seconds(4));
351
352 job.attempts = 10;
353 assert_eq!(job.backoff_delay(), chrono::Duration::seconds(512));
354 }
355
356 #[test]
357 fn test_job_priority_levels() {
358 let low =
359 Job::new("default", "task", serde_json::json!({})).with_priority(JobPriority::Low);
360 let normal =
361 Job::new("default", "task", serde_json::json!({})).with_priority(JobPriority::Normal);
362 let high =
363 Job::new("default", "task", serde_json::json!({})).with_priority(JobPriority::High);
364 let critical =
365 Job::new("default", "task", serde_json::json!({})).with_priority(JobPriority::Critical);
366
367 assert_eq!(low.priority, JobPriority::Low);
368 assert_eq!(normal.priority, JobPriority::Normal);
369 assert_eq!(high.priority, JobPriority::High);
370 assert_eq!(critical.priority, JobPriority::Critical);
371 }
372
373 #[test]
374 fn test_job_metadata() {
375 let job = Job::new("default", "task", serde_json::json!({}))
376 .with_metadata("key1", "value1")
377 .with_metadata("key2", "value2");
378
379 assert_eq!(job.metadata.len(), 2);
380 assert_eq!(job.metadata.get("key1"), Some(&"value1".to_string()));
381 assert_eq!(job.metadata.get("key2"), Some(&"value2".to_string()));
382 }
383
384 #[test]
385 fn test_job_schedule_at() {
386 let future = Utc::now() + chrono::Duration::hours(2);
387 let job = Job::new("default", "task", serde_json::json!({})).schedule_at(future);
388
389 assert!(!job.is_ready());
390 assert!(job.scheduled_at.is_some());
391 }
392
393 #[test]
394 fn test_job_scheduled_at_in_future() {
395 let future = Utc::now() + chrono::Duration::minutes(30);
396 let job = Job::new("default", "task", serde_json::json!({})).schedule_at(future);
397
398 assert!(!job.is_ready());
399 assert!(job.scheduled_at.is_some());
400 }
401
402 #[test]
403 fn test_job_status_transitions() {
404 let mut job = Job::new("default", "task", serde_json::json!({}));
405
406 assert_eq!(job.status.state, JobState::Pending);
407
408 job.start_processing();
409 assert_eq!(job.status.state, JobState::Processing);
410
411 job.complete();
412 assert_eq!(job.status.state, JobState::Completed);
413 }
414
415 #[test]
416 fn test_job_failure_tracking() {
417 let mut job = Job::new("default", "task", serde_json::json!({}));
418
419 job.start_processing();
420 job.fail("First error".to_string());
421
422 assert_eq!(job.status.state, JobState::Failed);
423 assert_eq!(job.status.error, Some("First error".to_string()));
424 assert_eq!(job.attempts, 1);
425 }
426
427 #[test]
428 fn test_job_max_attempts() {
429 let job = Job::new("default", "task", serde_json::json!({})).with_max_attempts(10);
430
431 assert_eq!(job.max_attempts, 10);
432 }
433
434 #[test]
435 fn test_job_default_max_attempts() {
436 let job = Job::new("default", "task", serde_json::json!({}));
437 assert_eq!(job.max_attempts, 3);
438 }
439
440 #[test]
441 fn test_job_can_retry_with_zero_max_attempts() {
442 let mut job = Job::new("default", "task", serde_json::json!({})).with_max_attempts(0);
443
444 job.start_processing();
445 job.fail("Error".to_string());
446
447 assert!(!job.can_retry());
448 }
449
450 #[test]
451 fn test_job_id_uniqueness() {
452 let job1 = Job::new("default", "task", serde_json::json!({}));
453 let job2 = Job::new("default", "task", serde_json::json!({}));
454
455 assert_ne!(job1.id, job2.id);
456 }
457
458 #[test]
459 fn test_job_timestamps() {
460 let before = Utc::now();
461 let job = Job::new("default", "task", serde_json::json!({}));
462 let after = Utc::now();
463
464 assert!(job.created_at >= before);
465 assert!(job.created_at <= after);
466 }
467
468 #[test]
469 fn test_job_complete_sets_state() {
470 let mut job = Job::new("default", "task", serde_json::json!({}));
471
472 job.start_processing();
473 job.complete();
474
475 assert_eq!(job.status.state, JobState::Completed);
476 }
477
478 #[test]
479 fn test_job_ready_with_past_schedule() {
480 let past = Utc::now() - chrono::Duration::hours(1);
481 let job = Job::new("default", "task", serde_json::json!({})).schedule_at(past);
482
483 assert!(job.is_ready());
484 }
485
486 #[test]
487 fn test_job_serialization_data() {
488 let data = serde_json::json!({
489 "email": "test@example.com",
490 "subject": "Test",
491 "count": 42
492 });
493
494 let job = Job::new("default", "send_email", data.clone());
495 assert_eq!(job.data, data);
496 }
497
498 #[test]
499 fn test_job_priority_ordering() {
500 assert!(JobPriority::Low < JobPriority::Normal);
501 assert!(JobPriority::Normal < JobPriority::High);
502 assert!(JobPriority::High < JobPriority::Critical);
503 }
504
505 #[test]
506 fn test_backoff_delay_exponential_growth() {
507 let mut job = Job::new("default", "task", serde_json::json!({}));
508
509 let delays: Vec<i64> = (1..=5)
510 .map(|attempt| {
511 job.attempts = attempt;
512 job.backoff_delay().num_seconds()
513 })
514 .collect();
515
516 assert!(delays[0] < delays[1]);
518 assert!(delays[1] < delays[2]);
519 assert!(delays[2] < delays[3]);
520 assert!(delays[3] < delays[4]);
521 }
522
523 #[test]
524 fn test_job_state_dead_after_max_retries() {
525 let mut job = Job::new("default", "task", serde_json::json!({})).with_max_attempts(2);
526
527 job.start_processing();
528 job.fail("Error 1".to_string());
529 assert_eq!(job.status.state, JobState::Failed);
530
531 job.start_processing();
532 job.fail("Error 2".to_string());
533 assert_eq!(job.status.state, JobState::Dead);
534 }
535
536 #[test]
537 fn test_job_metadata_overwrite() {
538 let job = Job::new("default", "task", serde_json::json!({}))
539 .with_metadata("key", "value1")
540 .with_metadata("key", "value2");
541
542 assert_eq!(job.metadata.get("key"), Some(&"value2".to_string()));
543 }
544}