//! In-memory queue implementation with priority scheduling

use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;

use crate::backend::{QueueBackend, QueueError};
use crate::job::{JobEntry, JobStatus};
/// Priority entry for the heap - orders by run_at time (earliest first).
///
/// Stored in the scheduling `BinaryHeap`; the `Ord` impl below reverses the
/// comparison so the max-heap behaves as a min-heap on `run_at`.
#[derive(Debug, Clone, Eq, PartialEq)]
struct PriorityEntry {
    // When the job identified by `id` becomes eligible to run.
    run_at: DateTime<Utc>,
    // Id of the corresponding `JobEntry` in `MemoryQueue::jobs`.
    id: Uuid,
}

21impl Ord for PriorityEntry {
22    fn cmp(&self, other: &Self) -> Ordering {
23        // Reverse order: earlier run_at = higher priority
24        other.run_at.cmp(&self.run_at)
25    }
26}
27
28impl PartialOrd for PriorityEntry {
29    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
30        Some(self.cmp(other))
31    }
32}
33
/// In-memory job queue backend with time-based priority scheduling.
///
/// Job records live in `jobs`, keyed by job id; `queue` holds scheduling
/// entries ordered so the earliest `run_at` pops first. Each map is behind
/// its own `RwLock` and wrapped in `Arc` for shared ownership.
#[derive(Debug, Default)]
pub struct MemoryQueue {
    // Full job records, keyed by job id.
    jobs: Arc<RwLock<HashMap<Uuid, JobEntry>>>,
    // Scheduling entries; min-heap on `run_at` via `PriorityEntry`'s `Ord`.
    queue: Arc<RwLock<BinaryHeap<PriorityEntry>>>,
}

40impl MemoryQueue {
41    pub fn new() -> Self {
42        Self::default()
43    }
44}
45
46#[async_trait]
47impl QueueBackend for MemoryQueue {
48    async fn enqueue(
49        &self,
50        tenant_id: &str,
51        job_type: &str,
52        payload: serde_json::Value,
53        delay_secs: Option<u64>,
54    ) -> Result<Uuid, QueueError> {
55        let id = Uuid::new_v4();
56        let now = Utc::now();
57        let run_at = now + Duration::seconds(delay_secs.unwrap_or(0) as i64);
58
59        let entry = JobEntry {
60            id,
61            tenant_id: tenant_id.to_string(),
62            job_type: job_type.to_string(),
63            payload,
64            status: JobStatus::Pending,
65            created_at: now,
66            run_at,
67            attempts: 0,
68            last_error: None,
69            result: None,
70        };
71
72        let mut jobs = self.jobs.write().await;
73        jobs.insert(id, entry);
74
75        let mut queue = self.queue.write().await;
76        queue.push(PriorityEntry { run_at, id });
77
78        Ok(id)
79    }
80
81    async fn dequeue(&self) -> Result<Option<JobEntry>, QueueError> {
82        let mut queue = self.queue.write().await;
83        let now = Utc::now();
84
85        // Peek at the earliest job
86        if let Some(entry) = queue.peek() {
87            if entry.run_at <= now {
88                // Job is ready - pop it
89                let entry = queue.pop().unwrap();
90                let mut jobs = self.jobs.write().await;
91
92                if let Some(job) = jobs.get_mut(&entry.id) {
93                    // Only process if still pending
94                    if job.status == JobStatus::Pending {
95                        job.status = JobStatus::Running;
96                        return Ok(Some(job.clone()));
97                    }
98                }
99            }
100        }
101
102        Ok(None)
103    }
104
105    async fn update_status(
106        &self,
107        id: Uuid,
108        status: JobStatus,
109        error: Option<String>,
110        delay_secs: Option<u64>,
111    ) -> Result<(), QueueError> {
112        let mut jobs = self.jobs.write().await;
113
114        if let Some(job) = jobs.get_mut(&id) {
115            job.status = status;
116            job.last_error = error;
117            job.attempts += if matches!(status, JobStatus::Failed(_)) {
118                1
119            } else {
120                0
121            };
122
123            if let JobStatus::Failed(retry_count) = status {
124                // Use provided delay or fall back to exponential backoff
125                let backoff_secs = delay_secs.unwrap_or_else(|| {
126                    2_u64.pow(retry_count.min(6)) // Default: exponential, cap at ~1 min
127                });
128                let run_at = Utc::now() + Duration::seconds(backoff_secs as i64);
129                job.run_at = run_at;
130                job.status = JobStatus::Pending; // Reset to pending for retry
131
132                tracing::debug!(
133                    job_id = %id,
134                    retry_count = retry_count,
135                    delay_secs = backoff_secs,
136                    "Re-queuing job with backoff"
137                );
138
139                let mut queue = self.queue.write().await;
140                queue.push(PriorityEntry { run_at, id });
141            }
142        }
143
144        Ok(())
145    }
146
147    async fn get_status(&self, tenant_id: &str, id: Uuid) -> Result<JobStatus, QueueError> {
148        let jobs = self.jobs.read().await;
149        let job = jobs.get(&id).ok_or(QueueError::NotFound)?;
150
151        if job.tenant_id != tenant_id {
152            return Err(QueueError::NotFound);
153        }
154
155        Ok(job.status)
156    }
157
158    async fn get_job(&self, tenant_id: &str, id: Uuid) -> Result<JobEntry, QueueError> {
159        let jobs = self.jobs.read().await;
160        let job = jobs.get(&id).ok_or(QueueError::NotFound)?;
161
162        if job.tenant_id != tenant_id {
163            return Err(QueueError::NotFound);
164        }
165
166        Ok(job.clone())
167    }
168
169    async fn set_result(&self, id: Uuid, result: serde_json::Value) -> Result<(), QueueError> {
170        let mut jobs = self.jobs.write().await;
171        if let Some(job) = jobs.get_mut(&id) {
172            job.result = Some(result);
173            Ok(())
174        } else {
175            Err(QueueError::NotFound)
176        }
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Round-trip: enqueue -> Pending -> dequeue flips to Running -> empty.
    #[tokio::test]
    async fn test_enqueue_dequeue() {
        let q = MemoryQueue::new();
        let body = json!({ "foo": "bar" });

        // Push a job and confirm it starts out Pending.
        let job_id = q
            .enqueue("test-tenant", "test_job", body.clone(), None)
            .await
            .unwrap();
        assert_eq!(
            q.get_status("test-tenant", job_id).await.unwrap(),
            JobStatus::Pending
        );

        // Pulling the job marks it Running and returns its snapshot.
        let picked = q.dequeue().await.unwrap().expect("Should have job");
        assert_eq!(picked.id, job_id);
        assert_eq!(picked.job_type, "test_job");
        assert_eq!(picked.status, JobStatus::Running);

        // Nothing else is queued.
        assert!(q.dequeue().await.unwrap().is_none());
    }

    /// A delayed job is invisible until its run_at passes.
    /// (Real sleep is required: dequeue uses wall-clock `Utc::now()`,
    /// so tokio's paused time would not advance it.)
    #[tokio::test]
    async fn test_delayed_job() {
        let q = MemoryQueue::new();

        // Schedule one second into the future.
        let job_id = q
            .enqueue("test-tenant", "delayed", json!({}), Some(1))
            .await
            .unwrap();

        // Not due yet.
        assert!(q.dequeue().await.unwrap().is_none());

        // After the delay elapses it becomes dequeueable.
        tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
        let picked = q
            .dequeue()
            .await
            .unwrap()
            .expect("Should have delayed job");
        assert_eq!(picked.id, job_id);
    }
}