// elif_queue/backends/memory.rs

//! In-memory queue backend implementation for development and testing
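//!
//! A minimal end-to-end sketch (kept out of the doctest run with `ignore`, since it
//! assumes the crate is named `elif_queue` and re-exports the items imported below via
//! `crate::{...}`; `EmailJob` is a hypothetical job type). It mirrors the
//! `test_memory_backend_basic_operations` test at the bottom of this file:
//!
//! ```ignore
//! use elif_queue::{Job, JobEntry, JobResult, Priority, QueueBackend, QueueConfig};
//! use elif_queue::backends::memory::MemoryBackend;
//! use serde::{Deserialize, Serialize};
//!
//! #[derive(Debug, Clone, Serialize, Deserialize)]
//! struct EmailJob {
//!     to: String,
//! }
//!
//! #[async_trait::async_trait]
//! impl Job for EmailJob {
//!     async fn execute(&self) -> JobResult<()> {
//!         Ok(())
//!     }
//!
//!     fn job_type(&self) -> &'static str {
//!         "email"
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let backend = MemoryBackend::new(QueueConfig::default());
//!
//!     // Enqueue a job, pull it for processing, then report success.
//!     let entry = JobEntry::new(EmailJob { to: "user@example.com".into() }, Some(Priority::High), None).unwrap();
//!     backend.enqueue(entry).await.unwrap();
//!
//!     if let Some(job) = backend.dequeue().await.unwrap() {
//!         backend.complete(job.id(), Ok(())).await.unwrap();
//!     }
//!
//!     assert_eq!(backend.stats().await.unwrap().completed_jobs, 1);
//! }
//! ```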

use crate::{JobEntry, JobId, JobState, QueueBackend, QueueError, QueueResult, QueueStats, JobResult, QueueConfig};
use async_trait::async_trait;
use dashmap::DashMap;
use parking_lot::RwLock;
use std::collections::BinaryHeap;
use std::cmp::Ordering;
use std::sync::Arc;
use tokio::time::Instant;
use chrono::Utc;

/// Wrapper for JobEntry to implement Ord for priority queue
#[derive(Debug, Clone)]
struct PriorityJobEntry {
    entry: JobEntry,
    enqueue_time: Instant,
}

impl PartialEq for PriorityJobEntry {
    fn eq(&self, other: &Self) -> bool {
        self.entry.priority() == other.entry.priority() &&
        self.entry.run_at() == other.entry.run_at() &&
        self.enqueue_time == other.enqueue_time
    }
}

impl Eq for PriorityJobEntry {}

impl PartialOrd for PriorityJobEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

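// `BinaryHeap` is a max-heap, so `cmp` below is written so that the entry that should be
// dequeued first compares as the *greatest*: a higher `priority` wins outright, and the
// `run_at` / `enqueue_time` comparisons are reversed so that earlier times rank higher.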
impl Ord for PriorityJobEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Higher priority first, then earlier run_at, then earlier enqueue time
        match self.entry.priority().cmp(&other.entry.priority()) {
            Ordering::Equal => {
                match other.entry.run_at().cmp(&self.entry.run_at()) {
                    Ordering::Equal => other.enqueue_time.cmp(&self.enqueue_time),
                    other_ord => other_ord,
                }
            }
            priority_ord => priority_ord,
        }
    }
}

/// In-memory queue backend
pub struct MemoryBackend {
    config: QueueConfig,
    jobs: DashMap<JobId, JobEntry>,
    pending_queue: Arc<RwLock<BinaryHeap<PriorityJobEntry>>>,
    stats: Arc<RwLock<QueueStats>>,
}

impl MemoryBackend {
    /// Create a new memory backend
    pub fn new(config: QueueConfig) -> Self {
        Self {
            config,
            jobs: DashMap::new(),
            pending_queue: Arc::new(RwLock::new(BinaryHeap::new())),
            stats: Arc::new(RwLock::new(QueueStats::default())),
        }
    }

    /// Update statistics based on job state changes
    fn update_stats(&self, old_state: Option<JobState>, new_state: JobState) {
        let mut stats = self.stats.write();

        // Decrement old state count
        if let Some(old) = old_state {
            match old {
                JobState::Pending => stats.pending_jobs = stats.pending_jobs.saturating_sub(1),
                JobState::Processing => stats.processing_jobs = stats.processing_jobs.saturating_sub(1),
                JobState::Completed => stats.completed_jobs = stats.completed_jobs.saturating_sub(1),
                JobState::Failed => stats.failed_jobs = stats.failed_jobs.saturating_sub(1),
                JobState::Dead => stats.dead_jobs = stats.dead_jobs.saturating_sub(1),
            }
        } else {
            // New job
            stats.total_jobs += 1;
        }

        // Increment new state count
        match new_state {
            JobState::Pending => stats.pending_jobs += 1,
            JobState::Processing => stats.processing_jobs += 1,
            JobState::Completed => stats.completed_jobs += 1,
            JobState::Failed => stats.failed_jobs += 1,
            JobState::Dead => stats.dead_jobs += 1,
        }
    }

    /// Get the next ready job from the queue
    fn get_next_ready_job(&self) -> Option<JobEntry> {
        let mut queue = self.pending_queue.write();
        let now = Utc::now();

        // Look for a ready job at the top of the heap
        while let Some(priority_entry) = queue.peek() {
            // If the job has been removed from the main map, discard it from the pending queue.
            // This handles "ghost" jobs that might remain in the BinaryHeap after being removed
            // via `remove_job`.
            if !self.jobs.contains_key(&priority_entry.entry.id()) {
                queue.pop();
                continue;
            }

            if priority_entry.entry.is_ready() {
                let priority_entry = queue.pop().unwrap();
                return Some(priority_entry.entry);
            } else if priority_entry.entry.run_at() > now {
                // The top entry is not due yet. The heap orders by priority before run_at,
                // so we stop here rather than scanning past a delayed high-priority job.
                break;
            } else {
                // The entry is due but still reports not ready for some other reason;
                // drop it so it does not block the head of the queue.
                queue.pop();
            }
        }

        None
    }
}

#[async_trait]
impl QueueBackend for MemoryBackend {
    async fn enqueue(&self, job: JobEntry) -> QueueResult<JobId> {
        let job_id = job.id();

        // Check queue size limit
        if *self.config.get_max_queue_size() > 0 && self.jobs.len() >= *self.config.get_max_queue_size() {
            return Err(QueueError::Configuration(
                format!("Queue size limit exceeded: {}", *self.config.get_max_queue_size())
            ));
        }

        // Update stats
        self.update_stats(None, job.state().clone());

        // Add to the pending queue if the job is still pending
        if job.state() == &JobState::Pending {
            let priority_entry = PriorityJobEntry {
                entry: job.clone(),
                enqueue_time: Instant::now(),
            };
            self.pending_queue.write().push(priority_entry);
        }

        // Store the job
        self.jobs.insert(job_id, job);

        Ok(job_id)
    }

    async fn dequeue(&self) -> QueueResult<Option<JobEntry>> {
        if let Some(mut job) = self.get_next_ready_job() {
            let old_state = job.state().clone();
            job.mark_processing();

            // Update stats
            self.update_stats(Some(old_state), job.state().clone());

            // Update stored job
            self.jobs.insert(job.id(), job.clone());

            Ok(Some(job))
        } else {
            Ok(None)
        }
    }

    async fn complete(&self, job_id: JobId, result: JobResult<()>) -> QueueResult<()> {
        if let Some(mut job_entry) = self.jobs.get_mut(&job_id) {
            let old_state = job_entry.state().clone();

            match result {
                Ok(_) => {
                    job_entry.mark_completed();
                    self.update_stats(Some(old_state), job_entry.state().clone());
                }
                Err(error) => {
                    let error_message = error.to_string();
                    job_entry.mark_failed(error_message);
                    let new_state = job_entry.state().clone();

                    self.update_stats(Some(old_state), new_state.clone());

                    // Re-queue for retry if not dead
                    if new_state == JobState::Failed {
                        let priority_entry = PriorityJobEntry {
                            entry: job_entry.clone(),
                            enqueue_time: Instant::now(),
                        };
                        self.pending_queue.write().push(priority_entry);
                    }
                }
            }
            Ok(())
        } else {
            Err(QueueError::JobNotFound(job_id.to_string()))
        }
    }

    async fn get_job(&self, job_id: JobId) -> QueueResult<Option<JobEntry>> {
        Ok(self.jobs.get(&job_id).map(|entry| entry.clone()))
    }

    async fn get_jobs_by_state(&self, state: JobState, limit: Option<usize>) -> QueueResult<Vec<JobEntry>> {
        let mut jobs: Vec<JobEntry> = self.jobs
            .iter()
            .filter(|entry| entry.state() == &state)
            .map(|entry| entry.clone())
            .collect();

        // Sort by created_at for consistent ordering
        jobs.sort_by(|a, b| a.created_at.cmp(&b.created_at));

        if let Some(limit) = limit {
            jobs.truncate(limit);
        }

        Ok(jobs)
    }

    async fn remove_job(&self, job_id: JobId) -> QueueResult<bool> {
        if let Some((_, job)) = self.jobs.remove(&job_id) {
            // Update stats
            let mut stats = self.stats.write();
            match job.state() {
                JobState::Pending => stats.pending_jobs = stats.pending_jobs.saturating_sub(1),
                JobState::Processing => stats.processing_jobs = stats.processing_jobs.saturating_sub(1),
                JobState::Completed => stats.completed_jobs = stats.completed_jobs.saturating_sub(1),
                JobState::Failed => stats.failed_jobs = stats.failed_jobs.saturating_sub(1),
                JobState::Dead => stats.dead_jobs = stats.dead_jobs.saturating_sub(1),
            }
            stats.total_jobs = stats.total_jobs.saturating_sub(1);

            Ok(true)
        } else {
            Ok(false)
        }
    }

    async fn clear(&self) -> QueueResult<()> {
        self.jobs.clear();
        self.pending_queue.write().clear();
        *self.stats.write() = QueueStats::default();
        Ok(())
    }

    async fn stats(&self) -> QueueResult<QueueStats> {
        Ok(self.stats.read().clone())
    }

    /// Atomic requeue implementation for memory backend
    async fn requeue_job(&self, job_id: JobId, _job: JobEntry) -> QueueResult<bool> {
        // The DashMap entry stays locked through `get_mut` while we mutate it, so the
        // check-and-reset below is atomic with respect to other operations on this job.
        if let Some(mut existing_job) = self.jobs.get_mut(&job_id) {
            if existing_job.state() == &JobState::Dead {
                // Reset the job for retry
                existing_job.reset_for_retry();

                // Add back to pending queue if it's ready
                if existing_job.is_ready() {
                    let priority_entry = PriorityJobEntry {
                        entry: existing_job.clone(),
                        enqueue_time: Instant::now(),
                    };
                    self.pending_queue.write().push(priority_entry);
                }

                // Update stats - move from dead back to pending
                let mut stats = self.stats.write();
                stats.dead_jobs = stats.dead_jobs.saturating_sub(1);
                stats.pending_jobs = stats.pending_jobs.saturating_add(1);

                Ok(true)
            } else {
                Ok(false)
            }
        } else {
            Ok(false)
        }
    }

    /// Atomic clear jobs by state implementation for memory backend
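    ///
    /// A hypothetical usage sketch (`ignore`d because it is not self-contained; it assumes
    /// a `backend` handle like the ones built in the tests below):
    ///
    /// ```ignore
    /// // Drop every completed job and report how many were removed.
    /// let removed = backend.clear_jobs_by_state(JobState::Completed).await.unwrap();
    /// println!("cleared {} completed jobs", removed);
    /// ```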
    async fn clear_jobs_by_state(&self, state: JobState) -> QueueResult<u64> {
        let mut count = 0u64;
        let mut stats = self.stats.write();

        // Use retain to atomically filter and count removed jobs
        self.jobs.retain(|_, job| {
            if job.state() == &state {
                count += 1;

                // Update stats for removed job
                match state {
                    JobState::Pending => stats.pending_jobs = stats.pending_jobs.saturating_sub(1),
                    JobState::Processing => stats.processing_jobs = stats.processing_jobs.saturating_sub(1),
                    JobState::Completed => stats.completed_jobs = stats.completed_jobs.saturating_sub(1),
                    JobState::Failed => stats.failed_jobs = stats.failed_jobs.saturating_sub(1),
                    JobState::Dead => stats.dead_jobs = stats.dead_jobs.saturating_sub(1),
                }

                false // Remove this job
            } else {
                true // Keep this job
            }
        });

        // Also remove any matching jobs from the pending queue if we're clearing pending jobs
        if state == JobState::Pending {
            let mut pending_queue = self.pending_queue.write();
            pending_queue.retain(|priority_entry| {
                priority_entry.entry.state() != &JobState::Pending
            });
        }

        stats.total_jobs = stats.total_jobs.saturating_sub(count);
        Ok(count)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Job, Priority};
    use serde::{Deserialize, Serialize};
    use std::time::Duration;

    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestJob {
        id: u32,
        message: String,
    }

    #[async_trait]
    impl Job for TestJob {
        async fn execute(&self) -> JobResult<()> {
            Ok(())
        }

        fn job_type(&self) -> &'static str {
            "test"
        }
    }

    #[tokio::test]
    async fn test_memory_backend_basic_operations() {
        let backend = MemoryBackend::new(QueueConfig::default());

        let job = TestJob {
            id: 1,
            message: "test job".to_string(),
        };
        let entry = JobEntry::new(job, Some(Priority::Normal), None).unwrap();
        let job_id = entry.id();

        // Enqueue job
        backend.enqueue(entry).await.unwrap();

        // Check stats
        let stats = backend.stats().await.unwrap();
        assert_eq!(stats.pending_jobs, 1);
        assert_eq!(stats.total_jobs, 1);

        // Dequeue job
        let dequeued = backend.dequeue().await.unwrap().unwrap();
        assert_eq!(dequeued.id(), job_id);
        assert_eq!(dequeued.state(), &JobState::Processing);

        // Complete job
        backend.complete(job_id, Ok(())).await.unwrap();

        // Check final stats
        let stats = backend.stats().await.unwrap();
        assert_eq!(stats.completed_jobs, 1);
        assert_eq!(stats.processing_jobs, 0);
        assert_eq!(stats.pending_jobs, 0);
    }

    #[tokio::test]
    async fn test_priority_ordering() {
        let backend = MemoryBackend::new(QueueConfig::default());

        // Enqueue jobs with different priorities
        let low_job = TestJob { id: 1, message: "low".to_string() };
        let high_job = TestJob { id: 2, message: "high".to_string() };
        let normal_job = TestJob { id: 3, message: "normal".to_string() };

        let low_entry = JobEntry::new(low_job, Some(Priority::Low), None).unwrap();
        let high_entry = JobEntry::new(high_job, Some(Priority::High), None).unwrap();
        let normal_entry = JobEntry::new(normal_job, Some(Priority::Normal), None).unwrap();

        backend.enqueue(low_entry).await.unwrap();
        backend.enqueue(high_entry).await.unwrap();
        backend.enqueue(normal_entry).await.unwrap();

        // Dequeue should return high priority first
        let first = backend.dequeue().await.unwrap().unwrap();
        assert_eq!(first.priority(), Priority::High);

        let second = backend.dequeue().await.unwrap().unwrap();
        assert_eq!(second.priority(), Priority::Normal);

        let third = backend.dequeue().await.unwrap().unwrap();
        assert_eq!(third.priority(), Priority::Low);
    }

    #[tokio::test]
    async fn test_ghost_job_cleanup() {
        let config = crate::config::QueueConfigBuilder::testing().build().expect("Failed to build config");
        let backend = MemoryBackend::new(config);

        // Create and enqueue a job
        let job = TestJob { id: 1, message: "ghost test".to_string() };
        let entry = JobEntry::new(job, Some(Priority::Normal), None).unwrap();
        let job_id = backend.enqueue(entry).await.unwrap();

        // Verify job is in both jobs map and pending queue
        assert!(backend.jobs.contains_key(&job_id));
        assert_eq!(backend.pending_queue.read().len(), 1);

        // Remove job directly from jobs map (simulating a manual removal)
        backend.jobs.remove(&job_id);

        // The pending queue still contains the ghost entry
        assert_eq!(backend.pending_queue.read().len(), 1);

        // But dequeue should clean up the ghost and return None
        let result = backend.dequeue().await.unwrap();
        assert!(result.is_none());

        // The pending queue should now be empty (ghost cleaned up)
        assert_eq!(backend.pending_queue.read().len(), 0);
    }

    #[tokio::test]
    async fn test_multiple_ghost_jobs_cleanup() {
        let config = crate::config::QueueConfigBuilder::testing().build().expect("Failed to build config");
        let backend = MemoryBackend::new(config);

        // Create multiple jobs
        let mut job_ids = Vec::new();
        for i in 1..=5 {
            let job = TestJob { id: i, message: format!("ghost test {}", i) };
            let entry = JobEntry::new(job, Some(Priority::Normal), None).unwrap();
            let job_id = backend.enqueue(entry).await.unwrap();
            job_ids.push(job_id);
        }

        // Verify all jobs are queued
        assert_eq!(backend.pending_queue.read().len(), 5);
        assert_eq!(backend.jobs.len(), 5);

        // Remove first 3 jobs from jobs map (creating ghosts)
        for &job_id in &job_ids[0..3] {
            backend.jobs.remove(&job_id);
        }

        // Pending queue still has all 5 entries
        assert_eq!(backend.pending_queue.read().len(), 5);
        assert_eq!(backend.jobs.len(), 2);

        // First dequeue should skip the 3 ghost jobs and return the 4th job
        let result = backend.dequeue().await.unwrap().unwrap();
        assert_eq!(result.payload.get("id").unwrap().as_u64().unwrap(), 4);

        // The three ghost entries and the dequeued job have been removed, leaving one entry
        assert_eq!(backend.pending_queue.read().len(), 1);
    }

    #[tokio::test]
    async fn test_delayed_job() {
        let backend = MemoryBackend::new(QueueConfig::default());

        let job = TestJob {
            id: 1,
            message: "delayed job".to_string(),
        };
        let delay = Duration::from_millis(100);
        let entry = JobEntry::new(job, None, Some(delay)).unwrap();

        backend.enqueue(entry).await.unwrap();

        // Should not be available immediately
        let result = backend.dequeue().await.unwrap();
        assert!(result.is_none());

        // Wait for delay
        tokio::time::sleep(delay + Duration::from_millis(10)).await;

        // Should be available now
        let result = backend.dequeue().await.unwrap();
        assert!(result.is_some());
    }

    #[tokio::test]
    async fn test_job_failure_and_retry() {
        let backend = MemoryBackend::new(QueueConfig::default());

        let job = TestJob {
            id: 1,
            message: "failing job".to_string(),
        };
        let entry = JobEntry::new(job, None, None).unwrap();
        let job_id = entry.id();

        backend.enqueue(entry).await.unwrap();

        // Dequeue and fail the job
        let _job_entry = backend.dequeue().await.unwrap().unwrap();
        let error = Box::new(std::io::Error::new(std::io::ErrorKind::Other, "test error"));
        backend.complete(job_id, Err(error)).await.unwrap();

        // Job should be marked as failed and available for retry
        let stats = backend.stats().await.unwrap();
        assert_eq!(stats.failed_jobs, 1);
        assert_eq!(stats.processing_jobs, 0);

        // Should be able to get job by state
        let failed_jobs = backend.get_jobs_by_state(JobState::Failed, None).await.unwrap();
        assert_eq!(failed_jobs.len(), 1);
        assert_eq!(failed_jobs[0].attempts(), 1);
    }

    #[tokio::test]
    async fn test_queue_size_limit() {
        let config = crate::config::QueueConfigBuilder::new()
            .max_queue_size(2)
            .build().expect("Failed to build config");
        let backend = MemoryBackend::new(config);

        // Enqueue up to limit
        for i in 1..=2 {
            let job = TestJob { id: i, message: format!("job {}", i) };
            let entry = JobEntry::new(job, None, None).unwrap();
            backend.enqueue(entry).await.unwrap();
        }

        // Third job should fail
        let job = TestJob { id: 3, message: "overflow job".to_string() };
        let entry = JobEntry::new(job, None, None).unwrap();
        let result = backend.enqueue(entry).await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), QueueError::Configuration(_)));
    }
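
    // Illustrative addition (not part of the original suite): verifies that `clear()`
    // empties the stored jobs, the pending queue, and the statistics, using the same
    // `TestJob` helper as the tests above.
    #[tokio::test]
    async fn test_clear_resets_jobs_and_stats() {
        let backend = MemoryBackend::new(QueueConfig::default());

        // Enqueue a couple of jobs so there is something to clear
        for i in 1..=2 {
            let job = TestJob { id: i, message: format!("clear test {}", i) };
            let entry = JobEntry::new(job, None, None).unwrap();
            backend.enqueue(entry).await.unwrap();
        }
        assert_eq!(backend.stats().await.unwrap().pending_jobs, 2);

        // After clearing, stats are back to default and nothing can be dequeued
        backend.clear().await.unwrap();
        let stats = backend.stats().await.unwrap();
        assert_eq!(stats.total_jobs, 0);
        assert_eq!(stats.pending_jobs, 0);
        assert!(backend.dequeue().await.unwrap().is_none());
    }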
}