armature_queue/
job.rs

1//! Job definition and state management.
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use uuid::Uuid;
7
8/// Job unique identifier.
9pub type JobId = Uuid;
10
11/// Job data payload.
12pub type JobData = serde_json::Value;
13
14/// Job priority levels.
15#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
16pub enum JobPriority {
17    /// Lowest priority
18    Low = 0,
19    /// Normal priority (default)
20    #[default]
21    Normal = 1,
22    /// High priority
23    High = 2,
24    /// Critical priority
25    Critical = 3,
26}
27
28/// Job state.
29#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub enum JobState {
31    /// Job is waiting to be processed
32    Pending,
33    /// Job is currently being processed
34    Processing,
35    /// Job completed successfully
36    Completed,
37    /// Job failed and will be retried
38    Failed,
39    /// Job failed permanently (max retries exceeded)
40    Dead,
41}
42
43/// Job status information.
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct JobStatus {
46    /// Current state
47    pub state: JobState,
48
49    /// Progress percentage (0-100)
50    pub progress: u8,
51
52    /// Status message
53    pub message: Option<String>,
54
55    /// Error message (if failed)
56    pub error: Option<String>,
57
58    /// Last updated timestamp
59    pub updated_at: DateTime<Utc>,
60}
61
62impl JobStatus {
63    /// Create a new pending status.
64    pub fn pending() -> Self {
65        Self {
66            state: JobState::Pending,
67            progress: 0,
68            message: None,
69            error: None,
70            updated_at: Utc::now(),
71        }
72    }
73
74    /// Create a processing status.
75    pub fn processing() -> Self {
76        Self {
77            state: JobState::Processing,
78            progress: 0,
79            message: None,
80            error: None,
81            updated_at: Utc::now(),
82        }
83    }
84
85    /// Create a completed status.
86    pub fn completed() -> Self {
87        Self {
88            state: JobState::Completed,
89            progress: 100,
90            message: None,
91            error: None,
92            updated_at: Utc::now(),
93        }
94    }
95
96    /// Create a failed status.
97    pub fn failed(error: String) -> Self {
98        Self {
99            state: JobState::Failed,
100            progress: 0,
101            message: None,
102            error: Some(error),
103            updated_at: Utc::now(),
104        }
105    }
106
107    /// Create a dead status.
108    pub fn dead(error: String) -> Self {
109        Self {
110            state: JobState::Dead,
111            progress: 0,
112            message: None,
113            error: Some(error),
114            updated_at: Utc::now(),
115        }
116    }
117
118    /// Update progress.
119    pub fn with_progress(mut self, progress: u8) -> Self {
120        self.progress = progress.min(100);
121        self.updated_at = Utc::now();
122        self
123    }
124
125    /// Update message.
126    pub fn with_message(mut self, message: impl Into<String>) -> Self {
127        self.message = Some(message.into());
128        self.updated_at = Utc::now();
129        self
130    }
131}
132
133/// A job to be processed.
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct Job {
136    /// Unique job identifier
137    pub id: JobId,
138
139    /// Job type/name
140    pub job_type: String,
141
142    /// Job payload data
143    pub data: JobData,
144
145    /// Job priority
146    pub priority: JobPriority,
147
148    /// Job status
149    pub status: JobStatus,
150
151    /// Number of attempts
152    pub attempts: u32,
153
154    /// Maximum number of retry attempts
155    pub max_attempts: u32,
156
157    /// Queue name
158    pub queue: String,
159
160    /// When the job was created
161    pub created_at: DateTime<Utc>,
162
163    /// When the job should be processed (for delayed jobs)
164    pub scheduled_at: Option<DateTime<Utc>>,
165
166    /// When the job was started
167    pub started_at: Option<DateTime<Utc>>,
168
169    /// When the job completed/failed
170    pub completed_at: Option<DateTime<Utc>>,
171
172    /// Job metadata
173    pub metadata: HashMap<String, String>,
174}
175
176impl Job {
177    /// Create a new job.
178    pub fn new(queue: impl Into<String>, job_type: impl Into<String>, data: JobData) -> Self {
179        Self {
180            id: Uuid::new_v4(),
181            job_type: job_type.into(),
182            data,
183            priority: JobPriority::default(),
184            status: JobStatus::pending(),
185            attempts: 0,
186            max_attempts: 3,
187            queue: queue.into(),
188            created_at: Utc::now(),
189            scheduled_at: None,
190            started_at: None,
191            completed_at: None,
192            metadata: HashMap::new(),
193        }
194    }
195
196    /// Set job priority.
197    pub fn with_priority(mut self, priority: JobPriority) -> Self {
198        self.priority = priority;
199        self
200    }
201
202    /// Set max retry attempts.
203    pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
204        self.max_attempts = max_attempts;
205        self
206    }
207
208    /// Schedule the job for later.
209    pub fn schedule_at(mut self, time: DateTime<Utc>) -> Self {
210        self.scheduled_at = Some(time);
211        self
212    }
213
214    /// Schedule the job after a delay.
215    pub fn schedule_after(mut self, duration: chrono::Duration) -> Self {
216        self.scheduled_at = Some(Utc::now() + duration);
217        self
218    }
219
220    /// Add metadata.
221    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
222        self.metadata.insert(key.into(), value.into());
223        self
224    }
225
226    /// Check if the job is ready to be processed.
227    pub fn is_ready(&self) -> bool {
228        if let Some(scheduled_at) = self.scheduled_at {
229            Utc::now() >= scheduled_at
230        } else {
231            true
232        }
233    }
234
235    /// Check if the job can be retried.
236    pub fn can_retry(&self) -> bool {
237        self.attempts < self.max_attempts
238    }
239
240    /// Mark job as processing.
241    pub fn start_processing(&mut self) {
242        self.status = JobStatus::processing();
243        self.started_at = Some(Utc::now());
244        self.attempts += 1;
245    }
246
247    /// Mark job as completed.
248    pub fn complete(&mut self) {
249        self.status = JobStatus::completed();
250        self.completed_at = Some(Utc::now());
251    }
252
253    /// Mark job as failed.
254    pub fn fail(&mut self, error: String) {
255        if self.can_retry() {
256            self.status = JobStatus::failed(error);
257        } else {
258            self.status = JobStatus::dead(error);
259            self.completed_at = Some(Utc::now());
260        }
261    }
262
263    /// Update job progress.
264    pub fn update_progress(&mut self, progress: u8, message: Option<String>) {
265        self.status.progress = progress.min(100);
266        self.status.message = message;
267        self.status.updated_at = Utc::now();
268    }
269
270    /// Calculate backoff delay for retry.
271    pub fn backoff_delay(&self) -> chrono::Duration {
272        // Exponential backoff: 2^attempts seconds
273        let seconds = 2_i64.pow(self.attempts.saturating_sub(1));
274        chrono::Duration::seconds(seconds.min(3600)) // Max 1 hour
275    }
276}
277
278#[cfg(test)]
279mod tests {
280    use super::*;
281
282    #[test]
283    fn test_job_creation() {
284        let job = Job::new(
285            "default",
286            "send_email",
287            serde_json::json!({"to": "test@example.com"}),
288        );
289
290        assert_eq!(job.queue, "default");
291        assert_eq!(job.job_type, "send_email");
292        assert_eq!(job.attempts, 0);
293        assert_eq!(job.priority, JobPriority::Normal);
294    }
295
296    #[test]
297    fn test_job_builder() {
298        let job = Job::new("default", "task", serde_json::json!({}))
299            .with_priority(JobPriority::High)
300            .with_max_attempts(5)
301            .with_metadata("user_id", "123");
302
303        assert_eq!(job.priority, JobPriority::High);
304        assert_eq!(job.max_attempts, 5);
305        assert_eq!(job.metadata.get("user_id"), Some(&"123".to_string()));
306    }
307
308    #[test]
309    fn test_job_ready() {
310        let mut job = Job::new("default", "task", serde_json::json!({}));
311        assert!(job.is_ready());
312
313        job = job.schedule_at(Utc::now() + chrono::Duration::hours(1));
314        assert!(!job.is_ready());
315    }
316
317    #[test]
318    fn test_job_retry_logic() {
319        let mut job = Job::new("default", "task", serde_json::json!({}));
320        job.max_attempts = 3;
321
322        assert!(job.can_retry());
323
324        job.start_processing();
325        job.fail("Error 1".to_string());
326        assert!(job.can_retry());
327        assert_eq!(job.status.state, JobState::Failed);
328
329        job.start_processing();
330        job.fail("Error 2".to_string());
331        assert!(job.can_retry());
332
333        job.start_processing();
334        job.fail("Error 3".to_string());
335        assert!(!job.can_retry());
336        assert_eq!(job.status.state, JobState::Dead);
337    }
338
339    #[test]
340    fn test_backoff_delay() {
341        let mut job = Job::new("default", "task", serde_json::json!({}));
342
343        job.attempts = 1;
344        assert_eq!(job.backoff_delay(), chrono::Duration::seconds(1));
345
346        job.attempts = 2;
347        assert_eq!(job.backoff_delay(), chrono::Duration::seconds(2));
348
349        job.attempts = 3;
350        assert_eq!(job.backoff_delay(), chrono::Duration::seconds(4));
351
352        job.attempts = 10;
353        assert_eq!(job.backoff_delay(), chrono::Duration::seconds(512));
354    }
355
356    #[test]
357    fn test_job_priority_levels() {
358        let low =
359            Job::new("default", "task", serde_json::json!({})).with_priority(JobPriority::Low);
360        let normal =
361            Job::new("default", "task", serde_json::json!({})).with_priority(JobPriority::Normal);
362        let high =
363            Job::new("default", "task", serde_json::json!({})).with_priority(JobPriority::High);
364        let critical =
365            Job::new("default", "task", serde_json::json!({})).with_priority(JobPriority::Critical);
366
367        assert_eq!(low.priority, JobPriority::Low);
368        assert_eq!(normal.priority, JobPriority::Normal);
369        assert_eq!(high.priority, JobPriority::High);
370        assert_eq!(critical.priority, JobPriority::Critical);
371    }
372
373    #[test]
374    fn test_job_metadata() {
375        let job = Job::new("default", "task", serde_json::json!({}))
376            .with_metadata("key1", "value1")
377            .with_metadata("key2", "value2");
378
379        assert_eq!(job.metadata.len(), 2);
380        assert_eq!(job.metadata.get("key1"), Some(&"value1".to_string()));
381        assert_eq!(job.metadata.get("key2"), Some(&"value2".to_string()));
382    }
383
384    #[test]
385    fn test_job_schedule_at() {
386        let future = Utc::now() + chrono::Duration::hours(2);
387        let job = Job::new("default", "task", serde_json::json!({})).schedule_at(future);
388
389        assert!(!job.is_ready());
390        assert!(job.scheduled_at.is_some());
391    }
392
393    #[test]
394    fn test_job_scheduled_at_in_future() {
395        let future = Utc::now() + chrono::Duration::minutes(30);
396        let job = Job::new("default", "task", serde_json::json!({})).schedule_at(future);
397
398        assert!(!job.is_ready());
399        assert!(job.scheduled_at.is_some());
400    }
401
402    #[test]
403    fn test_job_status_transitions() {
404        let mut job = Job::new("default", "task", serde_json::json!({}));
405
406        assert_eq!(job.status.state, JobState::Pending);
407
408        job.start_processing();
409        assert_eq!(job.status.state, JobState::Processing);
410
411        job.complete();
412        assert_eq!(job.status.state, JobState::Completed);
413    }
414
415    #[test]
416    fn test_job_failure_tracking() {
417        let mut job = Job::new("default", "task", serde_json::json!({}));
418
419        job.start_processing();
420        job.fail("First error".to_string());
421
422        assert_eq!(job.status.state, JobState::Failed);
423        assert_eq!(job.status.error, Some("First error".to_string()));
424        assert_eq!(job.attempts, 1);
425    }
426
427    #[test]
428    fn test_job_max_attempts() {
429        let job = Job::new("default", "task", serde_json::json!({})).with_max_attempts(10);
430
431        assert_eq!(job.max_attempts, 10);
432    }
433
434    #[test]
435    fn test_job_default_max_attempts() {
436        let job = Job::new("default", "task", serde_json::json!({}));
437        assert_eq!(job.max_attempts, 3);
438    }
439
440    #[test]
441    fn test_job_can_retry_with_zero_max_attempts() {
442        let mut job = Job::new("default", "task", serde_json::json!({})).with_max_attempts(0);
443
444        job.start_processing();
445        job.fail("Error".to_string());
446
447        assert!(!job.can_retry());
448    }
449
450    #[test]
451    fn test_job_id_uniqueness() {
452        let job1 = Job::new("default", "task", serde_json::json!({}));
453        let job2 = Job::new("default", "task", serde_json::json!({}));
454
455        assert_ne!(job1.id, job2.id);
456    }
457
458    #[test]
459    fn test_job_timestamps() {
460        let before = Utc::now();
461        let job = Job::new("default", "task", serde_json::json!({}));
462        let after = Utc::now();
463
464        assert!(job.created_at >= before);
465        assert!(job.created_at <= after);
466    }
467
468    #[test]
469    fn test_job_complete_sets_state() {
470        let mut job = Job::new("default", "task", serde_json::json!({}));
471
472        job.start_processing();
473        job.complete();
474
475        assert_eq!(job.status.state, JobState::Completed);
476    }
477
478    #[test]
479    fn test_job_ready_with_past_schedule() {
480        let past = Utc::now() - chrono::Duration::hours(1);
481        let job = Job::new("default", "task", serde_json::json!({})).schedule_at(past);
482
483        assert!(job.is_ready());
484    }
485
486    #[test]
487    fn test_job_serialization_data() {
488        let data = serde_json::json!({
489            "email": "test@example.com",
490            "subject": "Test",
491            "count": 42
492        });
493
494        let job = Job::new("default", "send_email", data.clone());
495        assert_eq!(job.data, data);
496    }
497
498    #[test]
499    fn test_job_priority_ordering() {
500        assert!(JobPriority::Low < JobPriority::Normal);
501        assert!(JobPriority::Normal < JobPriority::High);
502        assert!(JobPriority::High < JobPriority::Critical);
503    }
504
505    #[test]
506    fn test_backoff_delay_exponential_growth() {
507        let mut job = Job::new("default", "task", serde_json::json!({}));
508
509        let delays: Vec<i64> = (1..=5)
510            .map(|attempt| {
511                job.attempts = attempt;
512                job.backoff_delay().num_seconds()
513            })
514            .collect();
515
516        // Verify exponential growth
517        assert!(delays[0] < delays[1]);
518        assert!(delays[1] < delays[2]);
519        assert!(delays[2] < delays[3]);
520        assert!(delays[3] < delays[4]);
521    }
522
523    #[test]
524    fn test_job_state_dead_after_max_retries() {
525        let mut job = Job::new("default", "task", serde_json::json!({})).with_max_attempts(2);
526
527        job.start_processing();
528        job.fail("Error 1".to_string());
529        assert_eq!(job.status.state, JobState::Failed);
530
531        job.start_processing();
532        job.fail("Error 2".to_string());
533        assert_eq!(job.status.state, JobState::Dead);
534    }
535
536    #[test]
537    fn test_job_metadata_overwrite() {
538        let job = Job::new("default", "task", serde_json::json!({}))
539            .with_metadata("key", "value1")
540            .with_metadata("key", "value2");
541
542        assert_eq!(job.metadata.get("key"), Some(&"value2".to_string()));
543    }
544}