vex_queue/
memory.rs

1//! In-memory queue implementation with priority scheduling
2
3use async_trait::async_trait;
4use chrono::{DateTime, Duration, Utc};
5use std::cmp::Ordering;
6use std::collections::{BinaryHeap, HashMap};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use uuid::Uuid;
10
11use crate::backend::{QueueBackend, QueueError};
12use crate::job::{JobEntry, JobStatus};
13
14/// Priority entry for the heap - orders by run_at time (earliest first)
15#[derive(Debug, Clone, Eq, PartialEq)]
16struct PriorityEntry {
17    run_at: DateTime<Utc>,
18    id: Uuid,
19}
20
21impl Ord for PriorityEntry {
22    fn cmp(&self, other: &Self) -> Ordering {
23        // Reverse order: earlier run_at = higher priority
24        other.run_at.cmp(&self.run_at)
25    }
26}
27
28impl PartialOrd for PriorityEntry {
29    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
30        Some(self.cmp(other))
31    }
32}
33
34#[derive(Debug, Default)]
35pub struct MemoryQueue {
36    jobs: Arc<RwLock<HashMap<Uuid, JobEntry>>>,
37    queue: Arc<RwLock<BinaryHeap<PriorityEntry>>>,
38}
39
40impl MemoryQueue {
41    pub fn new() -> Self {
42        Self::default()
43    }
44}
45
46#[async_trait]
47impl QueueBackend for MemoryQueue {
48    async fn enqueue(
49        &self,
50        job_type: &str,
51        payload: serde_json::Value,
52        delay_secs: Option<u64>,
53    ) -> Result<Uuid, QueueError> {
54        let id = Uuid::new_v4();
55        let now = Utc::now();
56        let run_at = now + Duration::seconds(delay_secs.unwrap_or(0) as i64);
57
58        let entry = JobEntry {
59            id,
60            job_type: job_type.to_string(),
61            payload,
62            status: JobStatus::Pending,
63            created_at: now,
64            run_at,
65            attempts: 0,
66            last_error: None,
67        };
68
69        let mut jobs = self.jobs.write().await;
70        jobs.insert(id, entry);
71
72        let mut queue = self.queue.write().await;
73        queue.push(PriorityEntry { run_at, id });
74
75        Ok(id)
76    }
77
78    async fn dequeue(&self) -> Result<Option<JobEntry>, QueueError> {
79        let mut queue = self.queue.write().await;
80        let now = Utc::now();
81
82        // Peek at the earliest job
83        if let Some(entry) = queue.peek() {
84            if entry.run_at <= now {
85                // Job is ready - pop it
86                let entry = queue.pop().unwrap();
87                let mut jobs = self.jobs.write().await;
88
89                if let Some(job) = jobs.get_mut(&entry.id) {
90                    // Only process if still pending
91                    if job.status == JobStatus::Pending {
92                        job.status = JobStatus::Running;
93                        return Ok(Some(job.clone()));
94                    }
95                }
96            }
97        }
98
99        Ok(None)
100    }
101
102    async fn update_status(
103        &self,
104        id: Uuid,
105        status: JobStatus,
106        error: Option<String>,
107        delay_secs: Option<u64>,
108    ) -> Result<(), QueueError> {
109        let mut jobs = self.jobs.write().await;
110
111        if let Some(job) = jobs.get_mut(&id) {
112            job.status = status;
113            job.last_error = error;
114            job.attempts += if matches!(status, JobStatus::Failed(_)) {
115                1
116            } else {
117                0
118            };
119
120            if let JobStatus::Failed(retry_count) = status {
121                // Use provided delay or fall back to exponential backoff
122                let backoff_secs = delay_secs.unwrap_or_else(|| {
123                    2_u64.pow(retry_count.min(6)) // Default: exponential, cap at ~1 min
124                });
125                let run_at = Utc::now() + Duration::seconds(backoff_secs as i64);
126                job.run_at = run_at;
127                job.status = JobStatus::Pending; // Reset to pending for retry
128
129                tracing::debug!(
130                    job_id = %id,
131                    retry_count = retry_count,
132                    delay_secs = backoff_secs,
133                    "Re-queuing job with backoff"
134                );
135
136                let mut queue = self.queue.write().await;
137                queue.push(PriorityEntry { run_at, id });
138            }
139        }
140
141        Ok(())
142    }
143
144    async fn get_status(&self, id: Uuid) -> Result<JobStatus, QueueError> {
145        let jobs = self.jobs.read().await;
146        jobs.get(&id).map(|j| j.status).ok_or(QueueError::NotFound)
147    }
148}
149
150#[cfg(test)]
151mod tests {
152    use super::*;
153    use serde_json::json;
154
155    #[tokio::test]
156    async fn test_enqueue_dequeue() {
157        let queue = MemoryQueue::new();
158        let payload = json!({ "foo": "bar" });
159
160        // Enqueue
161        let id = queue
162            .enqueue("test_job", payload.clone(), None)
163            .await
164            .unwrap();
165
166        let status = queue.get_status(id).await.unwrap();
167        assert_eq!(status, JobStatus::Pending);
168
169        // Dequeue
170        let job = queue.dequeue().await.unwrap().expect("Should have job");
171        assert_eq!(job.id, id);
172        assert_eq!(job.job_type, "test_job");
173        assert_eq!(job.status, JobStatus::Running);
174
175        // Dequeue empty
176        let empty = queue.dequeue().await.unwrap();
177        assert!(empty.is_none());
178    }
179
180    #[tokio::test]
181    async fn test_delayed_job() {
182        let queue = MemoryQueue::new();
183        let payload = json!({});
184
185        // Enqueue with delay
186        let id = queue.enqueue("delayed", payload, Some(1)).await.unwrap();
187
188        // Should be none immediately
189        let job = queue.dequeue().await.unwrap();
190        assert!(job.is_none());
191
192        // Wait
193        tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
194
195        // Should be available (Note: MemoryQueue checks run_at > now)
196        let job = queue
197            .dequeue()
198            .await
199            .unwrap()
200            .expect("Should have delayed job");
201        assert_eq!(job.id, id);
202    }
203}