elif_queue/
lib.rs

1//! # elif-queue
2//! 
3//! Background job queue system for the elif.rs framework.
4//! 
5//! ## Features
6//! 
7//! - **Multi-backend support**: Memory and Redis queue backends
8//! - **Job persistence**: Reliable job storage and recovery
9//! - **Priority queuing**: Support for job priorities and delays
10//! - **Async-first**: Built for modern async Rust applications
11//! - **Type-safe**: Generic job definitions with serialization support
12//! - **Retry logic**: Built-in failure handling and retry mechanisms
13//! 
14//! ## Quick Start
15//! 
16//! ```rust
17//! use elif_queue::{Queue, MemoryBackend, QueueConfig, Job, JobResult};
18//! use std::time::Duration;
19//! use serde::{Serialize, Deserialize};
20//! use async_trait::async_trait;
21//! 
22//! #[derive(Debug, Clone, Serialize, Deserialize)]
23//! struct EmailJob {
24//!     to: String,
25//!     subject: String,
26//!     body: String,
27//! }
28//! 
29//! #[async_trait]
30//! impl Job for EmailJob {
31//!     async fn execute(&self) -> JobResult<()> {
32//!         // Send email logic here
33//!         println!("Sending email to: {}", self.to);
34//!         Ok(())
35//!     }
36//! 
37//!     fn job_type(&self) -> &'static str {
38//!         "email"
39//!     }
40//! }
41//! 
42//! # tokio_test::block_on(async {
43//! // Create a memory-based queue
44//! let queue = Queue::new(MemoryBackend::new(QueueConfig::default()));
45//! 
46//! // Enqueue a job
47//! let job = EmailJob {
48//!     to: "user@example.com".to_string(),
49//!     subject: "Hello".to_string(),
50//!     body: "Hello, World!".to_string(),
51//! };
52//! 
53//! queue.enqueue(job, None).await.unwrap();
54//! 
55//! // Process jobs
56//! if let Some(job_entry) = queue.dequeue().await.unwrap() {
57//!     let result = job_entry.execute::<EmailJob>().await;
58//!     queue.complete(job_entry.id(), result).await.unwrap();
59//! }
60//! # });
61//! ```
62
63use std::time::Duration;
64use serde::{Serialize, Deserialize, de::DeserializeOwned};
65use async_trait::async_trait;
66use thiserror::Error;
67use uuid::Uuid;
68use chrono::{DateTime, Utc};
69
70pub mod backends;
71pub mod config;
72pub mod worker;
73pub mod scheduler;
74
75pub use backends::*;
76pub use config::*;
77pub use worker::*;
78pub use scheduler::*;
79
80/// Job execution errors
81#[derive(Error, Debug)]
82pub enum QueueError {
83    #[error("Serialization error: {0}")]
84    Serialization(#[from] serde_json::Error),
85    
86    #[error("Backend error: {0}")]
87    Backend(String),
88    
89    #[error("Job not found: {0}")]
90    JobNotFound(String),
91    
92    #[error("Queue configuration error: {0}")]
93    Configuration(String),
94    
95    #[error("Network error: {0}")]
96    Network(String),
97    
98    #[error("Timeout error")]
99    Timeout,
100    
101    #[error("Job execution failed: {0}")]
102    Execution(String),
103}
104
105/// Result type for queue operations
106pub type QueueResult<T> = Result<T, QueueError>;
107
108/// Result type for job execution
109pub type JobResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
110
111/// Job unique identifier
112pub type JobId = Uuid;
113
114/// Job priority levels
115#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
116pub enum Priority {
117    Low = 0,
118    Normal = 1,
119    High = 2,
120    Critical = 3,
121}
122
123impl Default for Priority {
124    fn default() -> Self {
125        Priority::Normal
126    }
127}
128
129/// Job execution state
130#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
131pub enum JobState {
132    /// Job is waiting to be processed
133    Pending,
134    /// Job is currently being processed
135    Processing,
136    /// Job completed successfully
137    Completed,
138    /// Job failed and will be retried
139    Failed,
140    /// Job failed permanently after max retries
141    Dead,
142}
143
144/// Core trait that all jobs must implement
145#[async_trait]
146pub trait Job: Send + Sync + Serialize + DeserializeOwned {
147    /// Execute the job
148    async fn execute(&self) -> JobResult<()>;
149    
150    /// Get the job type identifier
151    fn job_type(&self) -> &'static str;
152    
153    /// Get maximum number of retry attempts (default: 3)
154    fn max_retries(&self) -> u32 {
155        3
156    }
157    
158    /// Get retry delay (default: exponential backoff starting at 1 second)
159    fn retry_delay(&self, attempt: u32) -> Duration {
160        Duration::from_secs(1 << attempt.min(6)) // Cap at 64 seconds
161    }
162    
163    /// Get job timeout (default: 5 minutes)
164    fn timeout(&self) -> Duration {
165        Duration::from_secs(300)
166    }
167}
168
169/// Job entry containing job data and metadata
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct JobEntry {
172    /// Unique job identifier
173    id: JobId,
174    /// Job type for routing and processing
175    job_type: String,
176    /// Serialized job payload
177    payload: serde_json::Value,
178    /// Job priority
179    priority: Priority,
180    /// Current job state
181    state: JobState,
182    /// Number of retry attempts
183    attempts: u32,
184    /// Maximum retry attempts
185    max_retries: u32,
186    /// When the job was created
187    created_at: DateTime<Utc>,
188    /// When the job should be processed (for delayed jobs)
189    run_at: DateTime<Utc>,
190    /// When the job was last processed
191    processed_at: Option<DateTime<Utc>>,
192    /// Last error message (if any)
193    last_error: Option<String>,
194}
195
196impl JobEntry {
197    /// Create a new job entry
198    pub fn new<T: Job>(job: T, priority: Option<Priority>, delay: Option<Duration>) -> QueueResult<Self> {
199        let now = Utc::now();
200        let run_at = match delay {
201            Some(d) => now + chrono::Duration::from_std(d)
202                .map_err(|e| QueueError::Configuration(format!("Invalid delay duration: {}", e)))?,
203            None => now,
204        };
205        
206        let job_type = job.job_type().to_string();
207        let max_retries = job.max_retries();
208        let payload = serde_json::to_value(job)?;
209        
210        Ok(JobEntry {
211            id: Uuid::new_v4(),
212            job_type,
213            payload,
214            priority: priority.unwrap_or_default(),
215            state: JobState::Pending,
216            attempts: 0,
217            max_retries,
218            created_at: now,
219            run_at,
220            processed_at: None,
221            last_error: None,
222        })
223    }
224    
225    /// Get job ID
226    pub fn id(&self) -> JobId {
227        self.id
228    }
229    
230    /// Get job type
231    pub fn job_type(&self) -> &str {
232        &self.job_type
233    }
234    
235    /// Get job priority
236    pub fn priority(&self) -> Priority {
237        self.priority
238    }
239    
240    /// Get job state
241    pub fn state(&self) -> &JobState {
242        &self.state
243    }
244    
245    /// Get number of attempts
246    pub fn attempts(&self) -> u32 {
247        self.attempts
248    }
249    
250    /// Get when job should be processed
251    pub fn run_at(&self) -> DateTime<Utc> {
252        self.run_at
253    }
254    
255    /// Check if job is ready to be processed
256    pub fn is_ready(&self) -> bool {
257        matches!(self.state, JobState::Pending | JobState::Failed) && self.run_at <= Utc::now()
258    }
259    
260    /// Deserialize and execute the job
261    pub async fn execute<T: Job>(&self) -> JobResult<()> {
262        let job: T = serde_json::from_value(self.payload.clone())?;
263        job.execute().await
264    }
265    
266    /// Mark job as processing
267    pub(crate) fn mark_processing(&mut self) {
268        self.state = JobState::Processing;
269        self.processed_at = Some(Utc::now());
270    }
271    
272    /// Mark job as completed
273    pub(crate) fn mark_completed(&mut self) {
274        self.state = JobState::Completed;
275    }
276    
277    /// Mark job as failed
278    pub(crate) fn mark_failed(&mut self, error: String) {
279        self.attempts += 1;
280        self.last_error = Some(error);
281        
282        if self.attempts >= self.max_retries {
283            self.state = JobState::Dead;
284        } else {
285            self.state = JobState::Failed;
286            // Set next retry time with exponential backoff
287            let delay = Duration::from_secs(1 << self.attempts.min(6));
288            // Set next retry time with exponential backoff
289            // If the delay is too large for chrono::Duration, cap it at max value
290            let chrono_delay = chrono::Duration::from_std(delay)
291                .unwrap_or(chrono::Duration::MAX);
292            self.run_at = Utc::now() + chrono_delay;
293        }
294    }
295    
296    /// Reset job for retry (used for dead letter queue reprocessing)
297    pub(crate) fn reset_for_retry(&mut self) {
298        self.attempts = 0;
299        self.state = JobState::Pending;
300        self.run_at = Utc::now();
301        self.last_error = None;
302        self.processed_at = None;
303    }
304    
305    /// Create a new job entry with explicit job type (for scheduled jobs)
306    pub(crate) fn new_with_job_type(
307        job_type: String,
308        payload: serde_json::Value,
309        priority: Option<Priority>,
310        delay: Option<Duration>,
311        max_retries: u32,
312    ) -> QueueResult<Self> {
313        let now = Utc::now();
314        let run_at = delay.map(|d| now + chrono::Duration::from_std(d).unwrap()).unwrap_or(now);
315        
316        Ok(JobEntry {
317            id: Uuid::new_v4(),
318            job_type,
319            payload,
320            priority: priority.unwrap_or_default(),
321            state: JobState::Pending,
322            attempts: 0,
323            max_retries,
324            created_at: now,
325            run_at,
326            processed_at: None,
327            last_error: None,
328        })
329    }
330}
331
332/// Core queue backend trait that all queue implementations must implement
333#[async_trait]
334pub trait QueueBackend: Send + Sync {
335    /// Enqueue a job
336    async fn enqueue(&self, job: JobEntry) -> QueueResult<JobId>;
337    
338    /// Dequeue the next available job
339    async fn dequeue(&self) -> QueueResult<Option<JobEntry>>;
340    
341    /// Mark a job as completed
342    async fn complete(&self, job_id: JobId, result: JobResult<()>) -> QueueResult<()>;
343    
344    /// Get job by ID
345    async fn get_job(&self, job_id: JobId) -> QueueResult<Option<JobEntry>>;
346    
347    /// Get jobs by state
348    async fn get_jobs_by_state(&self, state: JobState, limit: Option<usize>) -> QueueResult<Vec<JobEntry>>;
349    
350    /// Remove a job from the queue
351    async fn remove_job(&self, job_id: JobId) -> QueueResult<bool>;
352    
353    /// Clear all jobs from the queue
354    async fn clear(&self) -> QueueResult<()>;
355    
356    /// Get queue statistics
357    async fn stats(&self) -> QueueResult<QueueStats>;
358    
359    /// Atomically requeue a job (remove old and enqueue new)
360    /// Default implementation is non-atomic for backward compatibility
361    async fn requeue_job(&self, job_id: JobId, mut job: JobEntry) -> QueueResult<bool> {
362        // Non-atomic fallback implementation
363        if self.remove_job(job_id).await? {
364            job.reset_for_retry();
365            self.enqueue(job).await?;
366            Ok(true)
367        } else {
368            Ok(false)
369        }
370    }
371    
372    /// Atomically clear all jobs in a specific state
373    /// Default implementation is non-atomic for backward compatibility
374    async fn clear_jobs_by_state(&self, state: JobState) -> QueueResult<u64> {
375        // Non-atomic fallback implementation
376        let jobs = self.get_jobs_by_state(state, None).await?;
377        let count = jobs.len() as u64;
378        
379        for job in jobs {
380            self.remove_job(job.id()).await?;
381        }
382        
383        Ok(count)
384    }
385}
386
387/// Queue statistics
388#[derive(Debug, Clone, Default, Serialize, Deserialize)]
389pub struct QueueStats {
390    pub pending_jobs: u64,
391    pub processing_jobs: u64,
392    pub completed_jobs: u64,
393    pub failed_jobs: u64,
394    pub dead_jobs: u64,
395    pub total_jobs: u64,
396}
397
398/// High-level queue interface
399pub struct Queue<B: QueueBackend> {
400    backend: B,
401}
402
403impl<B: QueueBackend> Queue<B> {
404    /// Create a new queue instance with the given backend
405    pub fn new(backend: B) -> Self {
406        Self { backend }
407    }
408    
409    /// Enqueue a job
410    pub async fn enqueue<T: Job>(&self, job: T, priority: Option<Priority>) -> QueueResult<JobId> {
411        let entry = JobEntry::new(job, priority, None)?;
412        self.backend.enqueue(entry).await
413    }
414    
415    /// Enqueue a delayed job
416    pub async fn enqueue_delayed<T: Job>(&self, job: T, delay: Duration, priority: Option<Priority>) -> QueueResult<JobId> {
417        let entry = JobEntry::new(job, priority, Some(delay))?;
418        self.backend.enqueue(entry).await
419    }
420    
421    /// Dequeue the next available job
422    pub async fn dequeue(&self) -> QueueResult<Option<JobEntry>> {
423        self.backend.dequeue().await
424    }
425    
426    /// Mark a job as completed
427    pub async fn complete(&self, job_id: JobId, result: JobResult<()>) -> QueueResult<()> {
428        self.backend.complete(job_id, result).await
429    }
430    
431    /// Get job by ID
432    pub async fn get_job(&self, job_id: JobId) -> QueueResult<Option<JobEntry>> {
433        self.backend.get_job(job_id).await
434    }
435    
436    /// Get jobs by state
437    pub async fn get_jobs_by_state(&self, state: JobState, limit: Option<usize>) -> QueueResult<Vec<JobEntry>> {
438        self.backend.get_jobs_by_state(state, limit).await
439    }
440    
441    /// Remove a job from the queue
442    pub async fn remove_job(&self, job_id: JobId) -> QueueResult<bool> {
443        self.backend.remove_job(job_id).await
444    }
445    
446    /// Clear all jobs from the queue
447    pub async fn clear(&self) -> QueueResult<()> {
448        self.backend.clear().await
449    }
450    
451    /// Get queue statistics
452    pub async fn stats(&self) -> QueueResult<QueueStats> {
453        self.backend.stats().await
454    }
455}
456
457#[cfg(test)]
458mod tests {
459    use super::*;
460    use crate::backends::MemoryBackend;
461    
462    #[derive(Debug, Clone, Serialize, Deserialize)]
463    struct TestJob {
464        message: String,
465    }
466    
467    #[async_trait]
468    impl Job for TestJob {
469        async fn execute(&self) -> JobResult<()> {
470            println!("Executing job: {}", self.message);
471            Ok(())
472        }
473        
474        fn job_type(&self) -> &'static str {
475            "test"
476        }
477    }
478    
479    #[tokio::test]
480    async fn test_job_entry_creation() {
481        let job = TestJob {
482            message: "Hello, World!".to_string(),
483        };
484        
485        let entry = JobEntry::new(job, Some(Priority::High), None).unwrap();
486        assert_eq!(entry.job_type(), "test");
487        assert_eq!(entry.priority(), Priority::High);
488        assert_eq!(entry.state(), &JobState::Pending);
489        assert_eq!(entry.attempts(), 0);
490        assert!(entry.is_ready());
491    }
492    
493    #[tokio::test]
494    async fn test_delayed_job() {
495        let job = TestJob {
496            message: "Delayed job".to_string(),
497        };
498        
499        let delay = Duration::from_secs(60);
500        let entry = JobEntry::new(job, None, Some(delay)).unwrap();
501        
502        // Delayed job should not be ready immediately
503        assert!(!entry.is_ready());
504        assert!(entry.run_at() > Utc::now());
505    }
506    
507    #[tokio::test]
508    async fn test_duration_conversion_error_handling() {
509        use std::time::Duration as StdDuration;
510        
511        let job = TestJob {
512            message: "test".to_string(),
513        };
514        
515        // Test with maximum possible duration that would overflow chrono::Duration
516        let max_delay = StdDuration::MAX;
517        let result = JobEntry::new(job.clone(), None, Some(max_delay));
518        
519        // This should fail with a Configuration error instead of panicking
520        assert!(result.is_err());
521        if let Err(QueueError::Configuration(msg)) = result {
522            assert!(msg.contains("Invalid delay duration"));
523        } else {
524            panic!("Expected Configuration error for invalid delay duration");
525        }
526        
527        // Test that mark_failed doesn't panic with very large retry attempts
528        let entry = JobEntry::new(job, None, None).unwrap();
529        let mut job_entry = entry;
530        
531        // Set attempts to a very high value to test exponential backoff overflow
532        job_entry.attempts = 100; // This would cause 2^100 seconds delay, way beyond chrono limits
533        
534        // This should not panic, but gracefully handle the overflow
535        job_entry.mark_failed("test error".to_string());
536        
537        // Job should still be properly handled
538        assert_eq!(job_entry.state, JobState::Dead); // Should exceed max_retries and be dead
539    }
540    
541    #[tokio::test]
542    async fn test_queue_basic_operations() {
543        let backend = MemoryBackend::new(QueueConfig::default());
544        let queue = Queue::new(backend);
545        
546        let job = TestJob {
547            message: "Test job".to_string(),
548        };
549        
550        // Enqueue job
551        let job_id = queue.enqueue(job, Some(Priority::Normal)).await.unwrap();
552        
553        // Dequeue job
554        let job_entry = queue.dequeue().await.unwrap().unwrap();
555        assert_eq!(job_entry.id(), job_id);
556        assert_eq!(job_entry.job_type(), "test");
557        
558        // Complete job
559        let result = job_entry.execute::<TestJob>().await;
560        queue.complete(job_id, result).await.unwrap();
561        
562        // Verify stats
563        let stats = queue.stats().await.unwrap();
564        assert_eq!(stats.total_jobs, 1);
565        assert_eq!(stats.completed_jobs, 1);
566    }
567}