Skip to main content

oxidite_queue/
queue.rs

1use async_trait::async_trait;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use tokio::sync::Mutex;
5use crate::job::{JobWrapper, JobStatus};
6use crate::stats::StatsTracker;
7use crate::Result;
8
9/// Queue backend trait
10#[async_trait]
11pub trait QueueBackend: Send + Sync {
12    async fn enqueue(&self, job: JobWrapper) -> Result<()>;
13    async fn dequeue(&self) -> Result<Option<JobWrapper>>;
14    async fn complete(&self, job_id: &str) -> Result<()>;
15    async fn fail(&self, job_id: &str, error: String) -> Result<()>;
16    async fn retry(&self, job: JobWrapper) -> Result<()>;
17    async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()>;
18    async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>>;
19    async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()>;
20    async fn clear(&self) -> Result<()>;
21}
22
23/// In-memory queue backend
24pub struct MemoryBackend {
25    queue: Arc<Mutex<VecDeque<JobWrapper>>>,
26    dead_letter: Arc<Mutex<Vec<JobWrapper>>>,
27}
28
29impl MemoryBackend {
30    pub fn new() -> Self {
31        Self {
32            queue: Arc::new(Mutex::new(VecDeque::new())),
33            dead_letter: Arc::new(Mutex::new(Vec::new())),
34        }
35    }
36}
37
38impl Default for MemoryBackend {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44#[async_trait]
45impl QueueBackend for MemoryBackend {
46    async fn enqueue(&self, mut job: JobWrapper) -> Result<()> {
47        job.status = JobStatus::Pending;
48        let mut queue = self.queue.lock().await;
49        
50        // Insert based on priority (higher priority first)
51        let pos = queue.iter().position(|j| j.priority < job.priority)
52            .unwrap_or(queue.len());
53        queue.insert(pos, job);
54        
55        Ok(())
56    }
57
58    async fn dequeue(&self) -> Result<Option<JobWrapper>> {
59        let mut queue = self.queue.lock().await;
60        
61        // Find first job that can be run now
62        let now = chrono::Utc::now().timestamp();
63        let pos = queue.iter().position(|j| {
64            j.status == JobStatus::Pending &&
65            j.scheduled_at.map(|t| t <= now).unwrap_or(true)
66        });
67
68        if let Some(pos) = pos {
69            let mut job = queue.remove(pos).unwrap();
70            job.status = JobStatus::Running;
71            job.attempts += 1;
72            Ok(Some(job))
73        } else {
74            Ok(None)
75        }
76    }
77
78    async fn complete(&self, _job_id: &str) -> Result<()> {
79        // In memory backend doesn't need to track completed jobs
80        Ok(())
81    }
82
83    async fn fail(&self, _job_id: &str, _error: String) -> Result<()> {
84        // In memory backend doesn't need to track failed jobs
85        Ok(())
86    }
87
88    async fn retry(&self, job: JobWrapper) -> Result<()> {
89        self.enqueue(job).await
90    }
91
92    async fn move_to_dead_letter(&self, mut job: JobWrapper) -> Result<()> {
93        job.status = JobStatus::DeadLetter;
94        let mut dlq = self.dead_letter.lock().await;
95        dlq.push(job);
96        Ok(())
97    }
98
99    async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
100        let dlq = self.dead_letter.lock().await;
101        Ok(dlq.clone())
102    }
103
104    async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
105        let mut dlq = self.dead_letter.lock().await;
106        if let Some(pos) = dlq.iter().position(|j| j.id == job_id) {
107            let mut job = dlq.remove(pos);
108            job.status = JobStatus::Pending;
109            job.attempts = 0;
110            job.error = None;
111            drop(dlq); // Release lock before enqueue
112            self.enqueue(job).await?;
113        }
114        Ok(())
115    }
116
117    async fn clear(&self) -> Result<()> {
118        let mut queue = self.queue.lock().await;
119        queue.clear();
120        Ok(())
121    }
122}
123
124/// Queue for managing jobs
125pub struct Queue {
126    backend: Arc<dyn QueueBackend>,
127    stats: StatsTracker,
128}
129
130impl Queue {
131    pub fn new(backend: Arc<dyn QueueBackend>) -> Self {
132        Self {
133            backend,
134            stats: StatsTracker::new(),
135        }
136    }
137
138    pub fn memory() -> Self {
139        Self::new(Arc::new(MemoryBackend::new()))
140    }
141
142    pub async fn enqueue(&self, job: JobWrapper) -> Result<String> {
143        let job_id = job.id.clone();
144        self.backend.enqueue(job).await?;
145        self.stats.increment_enqueued().await;
146        Ok(job_id)
147    }
148
149    pub async fn dequeue(&self) -> Result<Option<JobWrapper>> {
150        let job = self.backend.dequeue().await?;
151        if job.is_some() {
152            self.stats.mark_running().await;
153        }
154        Ok(job)
155    }
156
157    pub async fn complete(&self, job_id: &str) -> Result<()> {
158        self.backend.complete(job_id).await?;
159        self.stats.increment_processed().await;
160        Ok(())
161    }
162
163    pub async fn fail(&self, job_id: &str, error: String) -> Result<()> {
164        self.backend.fail(job_id, error).await?;
165        self.stats.increment_failed().await;
166        Ok(())
167    }
168
169    pub async fn retry(&self, job: JobWrapper) -> Result<()> {
170        self.backend.retry(job).await?;
171        self.stats.increment_retried().await;
172        Ok(())
173    }
174
175    pub async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()> {
176        self.backend.move_to_dead_letter(job).await?;
177        self.stats.increment_dead_letter().await;
178        Ok(())
179    }
180
181    pub async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
182        self.backend.list_dead_letter().await
183    }
184
185    pub async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
186        self.backend.retry_from_dead_letter(job_id).await
187    }
188
189    pub async fn clear(&self) -> Result<()> {
190        self.backend.clear().await?;
191        self.stats.reset().await;
192        Ok(())
193    }
194
195    pub async fn get_stats(&self) -> crate::stats::QueueStats {
196        self.stats.get_stats().await
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use crate::job::Job;
    use serde::{Deserialize, Serialize};

    /// Minimal serializable job whose `perform` always succeeds.
    #[derive(Serialize, Deserialize)]
    struct TestJob {
        value: i32,
    }

    #[async_trait::async_trait]
    impl Job for TestJob {
        async fn perform(&self) -> crate::Result<()> {
            Ok(())
        }
    }

    /// A job enqueued on the in-memory queue can be dequeued again.
    #[tokio::test]
    async fn test_enqueue_dequeue() {
        let queue = Queue::memory();
        let job = JobWrapper::new(&TestJob { value: 42 }).unwrap();

        queue.enqueue(job).await.unwrap();
        let dequeued = queue.dequeue().await.unwrap();

        assert!(dequeued.is_some());
    }
}