elif_queue/
lib.rs

1//! # elif-queue
2//! 
3//! Background job queue system for the elif.rs framework.
4//! 
5//! ## Features
6//! 
7//! - **Multi-backend support**: Memory and Redis queue backends
8//! - **Job persistence**: Reliable job storage and recovery
9//! - **Priority queuing**: Support for job priorities and delays
10//! - **Async-first**: Built for modern async Rust applications
11//! - **Type-safe**: Generic job definitions with serialization support
12//! - **Retry logic**: Built-in failure handling and retry mechanisms
13//! 
14//! ## Quick Start
15//! 
16//! ```rust
17//! use elif_queue::{Queue, MemoryBackend, QueueConfig, Job, JobResult};
18//! use std::time::Duration;
19//! use serde::{Serialize, Deserialize};
20//! use async_trait::async_trait;
21//! 
22//! #[derive(Debug, Clone, Serialize, Deserialize)]
23//! struct EmailJob {
24//!     to: String,
25//!     subject: String,
26//!     body: String,
27//! }
28//! 
29//! #[async_trait]
30//! impl Job for EmailJob {
31//!     async fn execute(&self) -> JobResult<()> {
32//!         // Send email logic here
33//!         println!("Sending email to: {}", self.to);
34//!         Ok(())
35//!     }
36//! 
37//!     fn job_type(&self) -> &'static str {
38//!         "email"
39//!     }
40//! }
41//! 
42//! # tokio_test::block_on(async {
43//! // Create a memory-based queue
44//! let queue = Queue::new(MemoryBackend::new(QueueConfig::default()));
45//! 
46//! // Enqueue a job
47//! let job = EmailJob {
48//!     to: "user@example.com".to_string(),
49//!     subject: "Hello".to_string(),
50//!     body: "Hello, World!".to_string(),
51//! };
52//! 
53//! queue.enqueue(job, None).await.unwrap();
54//! 
55//! // Process jobs
56//! if let Some(job_entry) = queue.dequeue().await.unwrap() {
57//!     let result = job_entry.execute::<EmailJob>().await;
58//!     queue.complete(job_entry.id(), result).await.unwrap();
59//! }
60//! # });
61//! ```
62
63use std::time::Duration;
64use serde::{Serialize, Deserialize, de::DeserializeOwned};
65use async_trait::async_trait;
66use thiserror::Error;
67use uuid::Uuid;
68use chrono::{DateTime, Utc};
69
70pub mod backends;
71pub mod config;
72pub mod worker;
73pub mod scheduler;
74
75pub use backends::*;
76pub use config::*;
77pub use worker::*;
78pub use scheduler::*;
79
80/// Job execution errors
81#[derive(Error, Debug)]
82pub enum QueueError {
83    #[error("Serialization error: {0}")]
84    Serialization(#[from] serde_json::Error),
85    
86    #[error("Backend error: {0}")]
87    Backend(String),
88    
89    #[error("Job not found: {0}")]
90    JobNotFound(String),
91    
92    #[error("Queue configuration error: {0}")]
93    Configuration(String),
94    
95    #[error("Network error: {0}")]
96    Network(String),
97    
98    #[error("Timeout error")]
99    Timeout,
100    
101    #[error("Job execution failed: {0}")]
102    Execution(String),
103}
104
105/// Result type for queue operations
106pub type QueueResult<T> = Result<T, QueueError>;
107
108/// Result type for job execution
109pub type JobResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
110
111/// Job unique identifier
112pub type JobId = Uuid;
113
114/// Job priority levels
115#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
116pub enum Priority {
117    Low = 0,
118    Normal = 1,
119    High = 2,
120    Critical = 3,
121}
122
123impl Default for Priority {
124    fn default() -> Self {
125        Priority::Normal
126    }
127}
128
129/// Job execution state
130#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
131pub enum JobState {
132    /// Job is waiting to be processed
133    Pending,
134    /// Job is currently being processed
135    Processing,
136    /// Job completed successfully
137    Completed,
138    /// Job failed and will be retried
139    Failed,
140    /// Job failed permanently after max retries
141    Dead,
142}
143
144/// Core trait that all jobs must implement
145#[async_trait]
146pub trait Job: Send + Sync + Serialize + DeserializeOwned {
147    /// Execute the job
148    async fn execute(&self) -> JobResult<()>;
149    
150    /// Get the job type identifier
151    fn job_type(&self) -> &'static str;
152    
153    /// Get maximum number of retry attempts (default: 3)
154    fn max_retries(&self) -> u32 {
155        3
156    }
157    
158    /// Get retry delay (default: exponential backoff starting at 1 second)
159    fn retry_delay(&self, attempt: u32) -> Duration {
160        Duration::from_secs(1 << attempt.min(6)) // Cap at 64 seconds
161    }
162    
163    /// Get job timeout (default: 5 minutes)
164    fn timeout(&self) -> Duration {
165        Duration::from_secs(300)
166    }
167}
168
169/// Job entry containing job data and metadata
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct JobEntry {
172    /// Unique job identifier
173    id: JobId,
174    /// Job type for routing and processing
175    job_type: String,
176    /// Serialized job payload
177    payload: serde_json::Value,
178    /// Job priority
179    priority: Priority,
180    /// Current job state
181    state: JobState,
182    /// Number of retry attempts
183    attempts: u32,
184    /// Maximum retry attempts
185    max_retries: u32,
186    /// When the job was created
187    created_at: DateTime<Utc>,
188    /// When the job should be processed (for delayed jobs)
189    run_at: DateTime<Utc>,
190    /// When the job was last processed
191    processed_at: Option<DateTime<Utc>>,
192    /// Last error message (if any)
193    last_error: Option<String>,
194}
195
196impl JobEntry {
197    /// Create a new job entry
198    pub fn new<T: Job>(job: T, priority: Option<Priority>, delay: Option<Duration>) -> QueueResult<Self> {
199        let now = Utc::now();
200        let run_at = match delay {
201            Some(d) => now + chrono::Duration::from_std(d)
202                .map_err(|e| QueueError::Configuration(format!("Invalid delay duration: {}", e)))?,
203            None => now,
204        };
205        
206        let job_type = job.job_type().to_string();
207        let max_retries = job.max_retries();
208        let payload = serde_json::to_value(job)?;
209        
210        Ok(JobEntry {
211            id: Uuid::new_v4(),
212            job_type,
213            payload,
214            priority: priority.unwrap_or_default(),
215            state: JobState::Pending,
216            attempts: 0,
217            max_retries,
218            created_at: now,
219            run_at,
220            processed_at: None,
221            last_error: None,
222        })
223    }
224    
225    /// Get job ID
226    pub fn id(&self) -> JobId {
227        self.id
228    }
229    
230    /// Get job type
231    pub fn job_type(&self) -> &str {
232        &self.job_type
233    }
234    
235    /// Get job priority
236    pub fn priority(&self) -> Priority {
237        self.priority
238    }
239    
240    /// Get job state
241    pub fn state(&self) -> &JobState {
242        &self.state
243    }
244    
245    /// Get number of attempts
246    pub fn attempts(&self) -> u32 {
247        self.attempts
248    }
249    
250    /// Get when job should be processed
251    pub fn run_at(&self) -> DateTime<Utc> {
252        self.run_at
253    }
254    
255    /// Get job payload
256    pub fn payload(&self) -> &serde_json::Value {
257        &self.payload
258    }
259    
260    /// Check if job is ready to be processed
261    pub fn is_ready(&self) -> bool {
262        matches!(self.state, JobState::Pending | JobState::Failed) && self.run_at <= Utc::now()
263    }
264    
265    /// Deserialize and execute the job
266    pub async fn execute<T: Job>(&self) -> JobResult<()> {
267        let job: T = serde_json::from_value(self.payload.clone())?;
268        job.execute().await
269    }
270    
271    /// Mark job as processing
272    pub(crate) fn mark_processing(&mut self) {
273        self.state = JobState::Processing;
274        self.processed_at = Some(Utc::now());
275    }
276    
277    /// Mark job as completed
278    pub(crate) fn mark_completed(&mut self) {
279        self.state = JobState::Completed;
280    }
281    
282    /// Mark job as failed
283    pub(crate) fn mark_failed(&mut self, error: String) {
284        self.attempts += 1;
285        self.last_error = Some(error);
286        
287        if self.attempts >= self.max_retries {
288            self.state = JobState::Dead;
289        } else {
290            self.state = JobState::Failed;
291            // Set next retry time with exponential backoff
292            let delay = Duration::from_secs(1 << self.attempts.min(6));
293            // Set next retry time with exponential backoff
294            // If the delay is too large for chrono::Duration, cap it at max value
295            let chrono_delay = chrono::Duration::from_std(delay)
296                .unwrap_or(chrono::Duration::MAX);
297            self.run_at = Utc::now() + chrono_delay;
298        }
299    }
300    
301    /// Reset job for retry (used for dead letter queue reprocessing)
302    pub(crate) fn reset_for_retry(&mut self) {
303        self.attempts = 0;
304        self.state = JobState::Pending;
305        self.run_at = Utc::now();
306        self.last_error = None;
307        self.processed_at = None;
308    }
309    
310    /// Create a new job entry with explicit job type (for scheduled jobs)
311    pub(crate) fn new_with_job_type(
312        job_type: String,
313        payload: serde_json::Value,
314        priority: Option<Priority>,
315        delay: Option<Duration>,
316        max_retries: u32,
317    ) -> QueueResult<Self> {
318        let now = Utc::now();
319        let run_at = delay.map(|d| now + chrono::Duration::from_std(d).unwrap()).unwrap_or(now);
320        
321        Ok(JobEntry {
322            id: Uuid::new_v4(),
323            job_type,
324            payload,
325            priority: priority.unwrap_or_default(),
326            state: JobState::Pending,
327            attempts: 0,
328            max_retries,
329            created_at: now,
330            run_at,
331            processed_at: None,
332            last_error: None,
333        })
334    }
335}
336
337/// Core queue backend trait that all queue implementations must implement
338#[async_trait]
339pub trait QueueBackend: Send + Sync {
340    /// Enqueue a job
341    async fn enqueue(&self, job: JobEntry) -> QueueResult<JobId>;
342    
343    /// Dequeue the next available job
344    async fn dequeue(&self) -> QueueResult<Option<JobEntry>>;
345    
346    /// Mark a job as completed
347    async fn complete(&self, job_id: JobId, result: JobResult<()>) -> QueueResult<()>;
348    
349    /// Get job by ID
350    async fn get_job(&self, job_id: JobId) -> QueueResult<Option<JobEntry>>;
351    
352    /// Get jobs by state
353    async fn get_jobs_by_state(&self, state: JobState, limit: Option<usize>) -> QueueResult<Vec<JobEntry>>;
354    
355    /// Remove a job from the queue
356    async fn remove_job(&self, job_id: JobId) -> QueueResult<bool>;
357    
358    /// Clear all jobs from the queue
359    async fn clear(&self) -> QueueResult<()>;
360    
361    /// Get queue statistics
362    async fn stats(&self) -> QueueResult<QueueStats>;
363    
364    /// Atomically requeue a job (remove old and enqueue new)
365    /// Default implementation is non-atomic for backward compatibility
366    async fn requeue_job(&self, job_id: JobId, mut job: JobEntry) -> QueueResult<bool> {
367        // Non-atomic fallback implementation
368        if self.remove_job(job_id).await? {
369            job.reset_for_retry();
370            self.enqueue(job).await?;
371            Ok(true)
372        } else {
373            Ok(false)
374        }
375    }
376    
377    /// Atomically clear all jobs in a specific state
378    /// Default implementation is non-atomic for backward compatibility
379    async fn clear_jobs_by_state(&self, state: JobState) -> QueueResult<u64> {
380        // Non-atomic fallback implementation
381        let jobs = self.get_jobs_by_state(state, None).await?;
382        let count = jobs.len() as u64;
383        
384        for job in jobs {
385            self.remove_job(job.id()).await?;
386        }
387        
388        Ok(count)
389    }
390}
391
392/// Queue statistics
393#[derive(Debug, Clone, Default, Serialize, Deserialize)]
394pub struct QueueStats {
395    pub pending_jobs: u64,
396    pub processing_jobs: u64,
397    pub completed_jobs: u64,
398    pub failed_jobs: u64,
399    pub dead_jobs: u64,
400    pub total_jobs: u64,
401}
402
403/// High-level queue interface
404pub struct Queue<B: QueueBackend> {
405    backend: B,
406}
407
408impl<B: QueueBackend> Queue<B> {
409    /// Create a new queue instance with the given backend
410    pub fn new(backend: B) -> Self {
411        Self { backend }
412    }
413    
414    /// Enqueue a job
415    pub async fn enqueue<T: Job>(&self, job: T, priority: Option<Priority>) -> QueueResult<JobId> {
416        let entry = JobEntry::new(job, priority, None)?;
417        self.backend.enqueue(entry).await
418    }
419    
420    /// Enqueue a delayed job
421    pub async fn enqueue_delayed<T: Job>(&self, job: T, delay: Duration, priority: Option<Priority>) -> QueueResult<JobId> {
422        let entry = JobEntry::new(job, priority, Some(delay))?;
423        self.backend.enqueue(entry).await
424    }
425    
426    /// Dequeue the next available job
427    pub async fn dequeue(&self) -> QueueResult<Option<JobEntry>> {
428        self.backend.dequeue().await
429    }
430    
431    /// Mark a job as completed
432    pub async fn complete(&self, job_id: JobId, result: JobResult<()>) -> QueueResult<()> {
433        self.backend.complete(job_id, result).await
434    }
435    
436    /// Get job by ID
437    pub async fn get_job(&self, job_id: JobId) -> QueueResult<Option<JobEntry>> {
438        self.backend.get_job(job_id).await
439    }
440    
441    /// Get jobs by state
442    pub async fn get_jobs_by_state(&self, state: JobState, limit: Option<usize>) -> QueueResult<Vec<JobEntry>> {
443        self.backend.get_jobs_by_state(state, limit).await
444    }
445    
446    /// Remove a job from the queue
447    pub async fn remove_job(&self, job_id: JobId) -> QueueResult<bool> {
448        self.backend.remove_job(job_id).await
449    }
450    
451    /// Clear all jobs from the queue
452    pub async fn clear(&self) -> QueueResult<()> {
453        self.backend.clear().await
454    }
455    
456    /// Get queue statistics
457    pub async fn stats(&self) -> QueueResult<QueueStats> {
458        self.backend.stats().await
459    }
460}
461
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backends::MemoryBackend;

    /// Minimal job for the tests below; executing it only prints its message.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestJob {
        message: String,
    }

    #[async_trait]
    impl Job for TestJob {
        async fn execute(&self) -> JobResult<()> {
            println!("Executing job: {}", self.message);
            Ok(())
        }

        fn job_type(&self) -> &'static str {
            "test"
        }
    }

    /// Build a `TestJob` carrying `message`.
    fn make_job(message: &str) -> TestJob {
        TestJob {
            message: message.to_string(),
        }
    }

    /// A freshly created entry carries the job's metadata and is runnable at once.
    #[tokio::test]
    async fn test_job_entry_creation() {
        let entry = JobEntry::new(make_job("Hello, World!"), Some(Priority::High), None).unwrap();

        assert_eq!(entry.job_type(), "test");
        assert_eq!(entry.priority(), Priority::High);
        assert_eq!(entry.state(), &JobState::Pending);
        assert_eq!(entry.attempts(), 0);
        assert!(entry.is_ready());
    }

    /// An entry created with a delay is scheduled in the future and not yet ready.
    #[tokio::test]
    async fn test_delayed_job() {
        let one_minute = Duration::from_secs(60);
        let entry = JobEntry::new(make_job("Delayed job"), None, Some(one_minute)).unwrap();

        assert!(!entry.is_ready());
        assert!(entry.run_at() > Utc::now());
    }

    /// Unrepresentable delays are reported as errors, and `mark_failed`
    /// copes with an attempt counter far beyond the retry limit.
    #[tokio::test]
    async fn test_duration_conversion_error_handling() {
        use std::time::Duration as StdDuration;

        let job = make_job("test");

        // The largest std duration does not fit into a chrono::Duration.
        let outcome = JobEntry::new(job.clone(), None, Some(StdDuration::MAX));

        // Expect a Configuration error rather than a panic.
        assert!(outcome.is_err());
        match outcome {
            Err(QueueError::Configuration(msg)) => {
                assert!(msg.contains("Invalid delay duration"));
            }
            _ => panic!("Expected Configuration error for invalid delay duration"),
        }

        // Push the attempt counter far past the retry limit...
        let mut job_entry = JobEntry::new(job, None, None).unwrap();
        job_entry.attempts = 100;

        // ...then record one more failure; this must not panic.
        job_entry.mark_failed("test error".to_string());

        // With the retry limit exceeded, the job ends up dead.
        assert_eq!(job_entry.state, JobState::Dead);
    }

    /// Enqueue, dequeue, execute and complete one job on the memory backend.
    #[tokio::test]
    async fn test_queue_basic_operations() {
        let queue = Queue::new(MemoryBackend::new(QueueConfig::default()));

        // Enqueue job
        let job_id = queue
            .enqueue(make_job("Test job"), Some(Priority::Normal))
            .await
            .unwrap();

        // Dequeue job
        let job_entry = queue.dequeue().await.unwrap().unwrap();
        assert_eq!(job_entry.id(), job_id);
        assert_eq!(job_entry.job_type(), "test");

        // Execute it and report the outcome
        let result = job_entry.execute::<TestJob>().await;
        queue.complete(job_id, result).await.unwrap();

        // Verify stats
        let stats = queue.stats().await.unwrap();
        assert_eq!(stats.total_jobs, 1);
        assert_eq!(stats.completed_jobs, 1);
    }
}