Skip to main content

reddb_server/storage/queue/
deque.rs

1//! Core Queue / Deque implementation
2//!
3//! Uses a BTreeMap keyed by monotonically increasing sequence numbers
4//! for FIFO ordering. For priority mode, keys are (Reverse(priority), sequence)
5//! to ensure highest-priority messages are dequeued first.
6
7use std::collections::BTreeMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9
10use crate::storage::schema::Value;
11
12/// Which end of the queue to push/pop from
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14pub enum QueueSide {
15    Left,  // front (head)
16    Right, // back (tail) — default for push
17}
18
19/// A message in the queue
20#[derive(Debug, Clone)]
21pub struct QueueMessage {
22    /// Monotonic sequence number
23    pub seq: u64,
24    /// Message payload
25    pub payload: Value,
26    /// Optional priority (higher = more urgent)
27    pub priority: Option<i32>,
28    /// Enqueue timestamp (nanoseconds)
29    pub enqueued_at_ns: u64,
30    /// Delivery attempts
31    pub attempts: u32,
32}
33
34/// Core queue data structure supporting FIFO, LIFO, and priority modes.
35pub struct QueueStore {
36    /// Messages keyed by sequence number
37    messages: BTreeMap<u64, QueueMessage>,
38    /// Next sequence number
39    next_seq: AtomicU64,
40    /// Maximum queue size (0 = unlimited)
41    max_size: usize,
42    /// Whether this is a priority queue
43    priority_mode: bool,
44    /// Priority index: (Reverse(priority), seq) for priority ordering
45    priority_index: Option<BTreeMap<(std::cmp::Reverse<i32>, u64), u64>>,
46}
47
48impl QueueStore {
49    /// Create a new FIFO queue
50    pub fn new(max_size: usize) -> Self {
51        Self {
52            messages: BTreeMap::new(),
53            next_seq: AtomicU64::new(1),
54            max_size,
55            priority_mode: false,
56            priority_index: None,
57        }
58    }
59
60    /// Create a priority queue
61    pub fn new_priority(max_size: usize) -> Self {
62        Self {
63            messages: BTreeMap::new(),
64            next_seq: AtomicU64::new(1),
65            max_size,
66            priority_mode: true,
67            priority_index: Some(BTreeMap::new()),
68        }
69    }
70
71    /// Push a message to the back (RPUSH). Returns the sequence number.
72    pub fn push_back(&mut self, payload: Value, priority: Option<i32>) -> Result<u64, QueueError> {
73        if self.max_size > 0 && self.messages.len() >= self.max_size {
74            return Err(QueueError::Full);
75        }
76        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
77        let now_ns = std::time::SystemTime::now()
78            .duration_since(std::time::UNIX_EPOCH)
79            .unwrap_or_default()
80            .as_nanos() as u64;
81        let msg = QueueMessage {
82            seq,
83            payload,
84            priority,
85            enqueued_at_ns: now_ns,
86            attempts: 0,
87        };
88        if let Some(ref mut idx) = self.priority_index {
89            idx.insert((std::cmp::Reverse(priority.unwrap_or(0)), seq), seq);
90        }
91        self.messages.insert(seq, msg);
92        Ok(seq)
93    }
94
95    /// Push a message to the front (LPUSH). Returns the sequence number.
96    pub fn push_front(&mut self, payload: Value, priority: Option<i32>) -> Result<u64, QueueError> {
97        // LPUSH uses seq=0 trick — we assign a sequence below the current minimum
98        if self.max_size > 0 && self.messages.len() >= self.max_size {
99            return Err(QueueError::Full);
100        }
101        let seq = self
102            .messages
103            .keys()
104            .next()
105            .copied()
106            .unwrap_or(1)
107            .saturating_sub(1);
108        let now_ns = std::time::SystemTime::now()
109            .duration_since(std::time::UNIX_EPOCH)
110            .unwrap_or_default()
111            .as_nanos() as u64;
112        let msg = QueueMessage {
113            seq,
114            payload,
115            priority,
116            enqueued_at_ns: now_ns,
117            attempts: 0,
118        };
119        if let Some(ref mut idx) = self.priority_index {
120            idx.insert((std::cmp::Reverse(priority.unwrap_or(0)), seq), seq);
121        }
122        self.messages.insert(seq, msg);
123        Ok(seq)
124    }
125
126    /// Pop from the front (LPOP) — FIFO dequeue. For priority queues, pops highest priority.
127    pub fn pop_front(&mut self) -> Option<QueueMessage> {
128        if self.priority_mode {
129            if let Some(ref mut idx) = self.priority_index {
130                let key = idx.keys().next().copied()?;
131                let seq = idx.remove(&key)?;
132                return self.messages.remove(&seq);
133            }
134        }
135        let seq = *self.messages.keys().next()?;
136        let msg = self.messages.remove(&seq)?;
137        if let Some(ref mut idx) = self.priority_index {
138            idx.remove(&(std::cmp::Reverse(msg.priority.unwrap_or(0)), seq));
139        }
140        Some(msg)
141    }
142
143    /// Pop from the back (RPOP) — LIFO dequeue
144    pub fn pop_back(&mut self) -> Option<QueueMessage> {
145        let seq = *self.messages.keys().next_back()?;
146        let msg = self.messages.remove(&seq)?;
147        if let Some(ref mut idx) = self.priority_index {
148            idx.remove(&(std::cmp::Reverse(msg.priority.unwrap_or(0)), seq));
149        }
150        Some(msg)
151    }
152
153    /// Peek at the front message without removing it
154    pub fn peek_front(&self, count: usize) -> Vec<&QueueMessage> {
155        if self.priority_mode {
156            if let Some(ref idx) = self.priority_index {
157                return idx
158                    .values()
159                    .take(count)
160                    .filter_map(|seq| self.messages.get(seq))
161                    .collect();
162            }
163        }
164        self.messages.values().take(count).collect()
165    }
166
167    /// Number of messages in the queue
168    pub fn len(&self) -> usize {
169        self.messages.len()
170    }
171
172    /// Whether the queue is empty
173    pub fn is_empty(&self) -> bool {
174        self.messages.is_empty()
175    }
176
177    /// Whether the queue is full
178    pub fn is_full(&self) -> bool {
179        self.max_size > 0 && self.messages.len() >= self.max_size
180    }
181
182    /// Remove all messages
183    pub fn purge(&mut self) -> usize {
184        let count = self.messages.len();
185        self.messages.clear();
186        if let Some(ref mut idx) = self.priority_index {
187            idx.clear();
188        }
189        count
190    }
191
192    /// Get a message by sequence number (for ack/nack)
193    pub fn get(&self, seq: u64) -> Option<&QueueMessage> {
194        self.messages.get(&seq)
195    }
196
197    /// Remove a specific message by sequence (for ack)
198    pub fn remove(&mut self, seq: u64) -> Option<QueueMessage> {
199        let msg = self.messages.remove(&seq)?;
200        if let Some(ref mut idx) = self.priority_index {
201            idx.remove(&(std::cmp::Reverse(msg.priority.unwrap_or(0)), seq));
202        }
203        Some(msg)
204    }
205
206    /// Increment attempt count for a message
207    pub fn increment_attempts(&mut self, seq: u64) -> Option<u32> {
208        if let Some(msg) = self.messages.get_mut(&seq) {
209            msg.attempts += 1;
210            Some(msg.attempts)
211        } else {
212            None
213        }
214    }
215
216    /// Whether this is a priority queue
217    pub fn is_priority(&self) -> bool {
218        self.priority_mode
219    }
220
221    /// Approximate memory usage
222    pub fn memory_bytes(&self) -> usize {
223        let mut size = std::mem::size_of::<Self>();
224        size += self.messages.len() * (std::mem::size_of::<QueueMessage>() + 48);
225        if let Some(ref idx) = self.priority_index {
226            size += idx.len() * 32;
227        }
228        size
229    }
230}
231
232/// Queue errors
233#[derive(Debug, Clone, PartialEq, Eq)]
234pub enum QueueError {
235    /// Queue is at maximum capacity
236    Full,
237    /// Message not found
238    NotFound(u64),
239}
240
241impl std::fmt::Display for QueueError {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        match self {
244            Self::Full => write!(f, "queue is full"),
245            Self::NotFound(seq) => write!(f, "message {} not found", seq),
246        }
247    }
248}
249
250impl std::error::Error for QueueError {}
251
252#[cfg(test)]
253mod tests {
254    use super::*;
255
256    #[test]
257    fn test_queue_fifo() {
258        let mut q = QueueStore::new(0);
259        q.push_back(Value::text("first"), None).unwrap();
260        q.push_back(Value::text("second"), None).unwrap();
261        q.push_back(Value::text("third"), None).unwrap();
262
263        assert_eq!(q.len(), 3);
264        let msg = q.pop_front().unwrap();
265        assert_eq!(msg.payload, Value::text("first"));
266        let msg = q.pop_front().unwrap();
267        assert_eq!(msg.payload, Value::text("second"));
268    }
269
270    #[test]
271    fn test_queue_lifo() {
272        let mut q = QueueStore::new(0);
273        q.push_back(Value::text("first"), None).unwrap();
274        q.push_back(Value::text("second"), None).unwrap();
275
276        let msg = q.pop_back().unwrap();
277        assert_eq!(msg.payload, Value::text("second"));
278    }
279
280    #[test]
281    fn test_queue_lpush() {
282        let mut q = QueueStore::new(0);
283        q.push_back(Value::text("middle"), None).unwrap();
284        q.push_front(Value::text("front"), None).unwrap();
285
286        let msg = q.pop_front().unwrap();
287        assert_eq!(msg.payload, Value::text("front"));
288    }
289
290    #[test]
291    fn test_queue_max_size() {
292        let mut q = QueueStore::new(2);
293        assert!(q.push_back(Value::Integer(1), None).is_ok());
294        assert!(q.push_back(Value::Integer(2), None).is_ok());
295        assert_eq!(q.push_back(Value::Integer(3), None), Err(QueueError::Full));
296        assert!(q.is_full());
297    }
298
299    #[test]
300    fn test_queue_priority() {
301        let mut q = QueueStore::new_priority(0);
302        q.push_back(Value::text("low"), Some(1)).unwrap();
303        q.push_back(Value::text("high"), Some(10)).unwrap();
304        q.push_back(Value::text("medium"), Some(5)).unwrap();
305
306        // Highest priority should come first
307        let msg = q.pop_front().unwrap();
308        assert_eq!(msg.payload, Value::text("high"));
309        let msg = q.pop_front().unwrap();
310        assert_eq!(msg.payload, Value::text("medium"));
311        let msg = q.pop_front().unwrap();
312        assert_eq!(msg.payload, Value::text("low"));
313    }
314
315    #[test]
316    fn test_queue_peek() {
317        let mut q = QueueStore::new(0);
318        q.push_back(Value::text("a"), None).unwrap();
319        q.push_back(Value::text("b"), None).unwrap();
320        q.push_back(Value::text("c"), None).unwrap();
321
322        let peeked = q.peek_front(2);
323        assert_eq!(peeked.len(), 2);
324        assert_eq!(q.len(), 3); // not removed
325    }
326
327    #[test]
328    fn test_queue_purge() {
329        let mut q = QueueStore::new(0);
330        q.push_back(Value::Integer(1), None).unwrap();
331        q.push_back(Value::Integer(2), None).unwrap();
332
333        let purged = q.purge();
334        assert_eq!(purged, 2);
335        assert!(q.is_empty());
336    }
337
338    #[test]
339    fn test_queue_remove_by_seq() {
340        let mut q = QueueStore::new(0);
341        let seq1 = q.push_back(Value::Integer(1), None).unwrap();
342        let seq2 = q.push_back(Value::Integer(2), None).unwrap();
343
344        let removed = q.remove(seq1).unwrap();
345        assert_eq!(removed.payload, Value::Integer(1));
346        assert_eq!(q.len(), 1);
347        assert!(q.get(seq2).is_some());
348    }
349
350    #[test]
351    fn test_queue_attempts() {
352        let mut q = QueueStore::new(0);
353        let seq = q.push_back(Value::text("msg"), None).unwrap();
354
355        assert_eq!(q.get(seq).unwrap().attempts, 0);
356        q.increment_attempts(seq);
357        assert_eq!(q.get(seq).unwrap().attempts, 1);
358        q.increment_attempts(seq);
359        assert_eq!(q.get(seq).unwrap().attempts, 2);
360    }
361}