reddb_server/storage/queue/
deque.rs1use std::collections::BTreeMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9
10use crate::storage::schema::Value;
11
/// Names one of the two ends of a queue.
///
/// NOTE(review): nothing in this file consumes this type; presumably `Left`
/// maps to the front (`push_front` / `pop_front`) and `Right` to the back —
/// confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueSide {
    /// The left end of the queue.
    Left,
    /// The right end of the queue.
    Right,
}
18
19#[derive(Debug, Clone)]
21pub struct QueueMessage {
22 pub seq: u64,
24 pub payload: Value,
26 pub priority: Option<i32>,
28 pub enqueued_at_ns: u64,
30 pub attempts: u32,
32}
33
34pub struct QueueStore {
36 messages: BTreeMap<u64, QueueMessage>,
38 next_seq: AtomicU64,
40 max_size: usize,
42 priority_mode: bool,
44 priority_index: Option<BTreeMap<(std::cmp::Reverse<i32>, u64), u64>>,
46}
47
48impl QueueStore {
49 pub fn new(max_size: usize) -> Self {
51 Self {
52 messages: BTreeMap::new(),
53 next_seq: AtomicU64::new(1),
54 max_size,
55 priority_mode: false,
56 priority_index: None,
57 }
58 }
59
60 pub fn new_priority(max_size: usize) -> Self {
62 Self {
63 messages: BTreeMap::new(),
64 next_seq: AtomicU64::new(1),
65 max_size,
66 priority_mode: true,
67 priority_index: Some(BTreeMap::new()),
68 }
69 }
70
71 pub fn push_back(&mut self, payload: Value, priority: Option<i32>) -> Result<u64, QueueError> {
73 if self.max_size > 0 && self.messages.len() >= self.max_size {
74 return Err(QueueError::Full);
75 }
76 let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
77 let now_ns = std::time::SystemTime::now()
78 .duration_since(std::time::UNIX_EPOCH)
79 .unwrap_or_default()
80 .as_nanos() as u64;
81 let msg = QueueMessage {
82 seq,
83 payload,
84 priority,
85 enqueued_at_ns: now_ns,
86 attempts: 0,
87 };
88 if let Some(ref mut idx) = self.priority_index {
89 idx.insert((std::cmp::Reverse(priority.unwrap_or(0)), seq), seq);
90 }
91 self.messages.insert(seq, msg);
92 Ok(seq)
93 }
94
95 pub fn push_front(&mut self, payload: Value, priority: Option<i32>) -> Result<u64, QueueError> {
97 if self.max_size > 0 && self.messages.len() >= self.max_size {
99 return Err(QueueError::Full);
100 }
101 let seq = self
102 .messages
103 .keys()
104 .next()
105 .copied()
106 .unwrap_or(1)
107 .saturating_sub(1);
108 let now_ns = std::time::SystemTime::now()
109 .duration_since(std::time::UNIX_EPOCH)
110 .unwrap_or_default()
111 .as_nanos() as u64;
112 let msg = QueueMessage {
113 seq,
114 payload,
115 priority,
116 enqueued_at_ns: now_ns,
117 attempts: 0,
118 };
119 if let Some(ref mut idx) = self.priority_index {
120 idx.insert((std::cmp::Reverse(priority.unwrap_or(0)), seq), seq);
121 }
122 self.messages.insert(seq, msg);
123 Ok(seq)
124 }
125
126 pub fn pop_front(&mut self) -> Option<QueueMessage> {
128 if self.priority_mode {
129 if let Some(ref mut idx) = self.priority_index {
130 let key = idx.keys().next().copied()?;
131 let seq = idx.remove(&key)?;
132 return self.messages.remove(&seq);
133 }
134 }
135 let seq = *self.messages.keys().next()?;
136 let msg = self.messages.remove(&seq)?;
137 if let Some(ref mut idx) = self.priority_index {
138 idx.remove(&(std::cmp::Reverse(msg.priority.unwrap_or(0)), seq));
139 }
140 Some(msg)
141 }
142
143 pub fn pop_back(&mut self) -> Option<QueueMessage> {
145 let seq = *self.messages.keys().next_back()?;
146 let msg = self.messages.remove(&seq)?;
147 if let Some(ref mut idx) = self.priority_index {
148 idx.remove(&(std::cmp::Reverse(msg.priority.unwrap_or(0)), seq));
149 }
150 Some(msg)
151 }
152
153 pub fn peek_front(&self, count: usize) -> Vec<&QueueMessage> {
155 if self.priority_mode {
156 if let Some(ref idx) = self.priority_index {
157 return idx
158 .values()
159 .take(count)
160 .filter_map(|seq| self.messages.get(seq))
161 .collect();
162 }
163 }
164 self.messages.values().take(count).collect()
165 }
166
167 pub fn len(&self) -> usize {
169 self.messages.len()
170 }
171
172 pub fn is_empty(&self) -> bool {
174 self.messages.is_empty()
175 }
176
177 pub fn is_full(&self) -> bool {
179 self.max_size > 0 && self.messages.len() >= self.max_size
180 }
181
182 pub fn purge(&mut self) -> usize {
184 let count = self.messages.len();
185 self.messages.clear();
186 if let Some(ref mut idx) = self.priority_index {
187 idx.clear();
188 }
189 count
190 }
191
192 pub fn get(&self, seq: u64) -> Option<&QueueMessage> {
194 self.messages.get(&seq)
195 }
196
197 pub fn remove(&mut self, seq: u64) -> Option<QueueMessage> {
199 let msg = self.messages.remove(&seq)?;
200 if let Some(ref mut idx) = self.priority_index {
201 idx.remove(&(std::cmp::Reverse(msg.priority.unwrap_or(0)), seq));
202 }
203 Some(msg)
204 }
205
206 pub fn increment_attempts(&mut self, seq: u64) -> Option<u32> {
208 if let Some(msg) = self.messages.get_mut(&seq) {
209 msg.attempts += 1;
210 Some(msg.attempts)
211 } else {
212 None
213 }
214 }
215
216 pub fn is_priority(&self) -> bool {
218 self.priority_mode
219 }
220
221 pub fn memory_bytes(&self) -> usize {
223 let mut size = std::mem::size_of::<Self>();
224 size += self.messages.len() * (std::mem::size_of::<QueueMessage>() + 48);
225 if let Some(ref idx) = self.priority_index {
226 size += idx.len() * 32;
227 }
228 size
229 }
230}
231
/// Errors reported by queue operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueueError {
    /// The queue cannot accept another message.
    Full,
    /// No message is stored under the carried sequence number.
    NotFound(u64),
}
240
241impl std::fmt::Display for QueueError {
242 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243 match self {
244 Self::Full => write!(f, "queue is full"),
245 Self::NotFound(seq) => write!(f, "message {} not found", seq),
246 }
247 }
248}
249
250impl std::error::Error for QueueError {}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// Back-pushed messages leave the front in insertion order.
    #[test]
    fn test_queue_fifo() {
        let mut queue = QueueStore::new(0);
        for label in ["first", "second", "third"] {
            queue.push_back(Value::text(label), None).unwrap();
        }
        assert_eq!(queue.len(), 3);

        for expected in ["first", "second"] {
            let popped = queue.pop_front().unwrap();
            assert_eq!(popped.payload, Value::text(expected));
        }
    }

    /// `pop_back` returns the most recently back-pushed message.
    #[test]
    fn test_queue_lifo() {
        let mut queue = QueueStore::new(0);
        queue.push_back(Value::text("first"), None).unwrap();
        queue.push_back(Value::text("second"), None).unwrap();

        let newest = queue.pop_back().unwrap();
        assert_eq!(newest.payload, Value::text("second"));
    }

    /// A front-pushed message overtakes one that was back-pushed earlier.
    #[test]
    fn test_queue_lpush() {
        let mut queue = QueueStore::new(0);
        queue.push_back(Value::text("middle"), None).unwrap();
        queue.push_front(Value::text("front"), None).unwrap();

        let head = queue.pop_front().unwrap();
        assert_eq!(head.payload, Value::text("front"));
    }

    /// A bounded queue rejects pushes once it holds `max_size` messages.
    #[test]
    fn test_queue_max_size() {
        let mut queue = QueueStore::new(2);
        let first = queue.push_back(Value::Integer(1), None);
        assert!(first.is_ok());
        let second = queue.push_back(Value::Integer(2), None);
        assert!(second.is_ok());

        let overflow = queue.push_back(Value::Integer(3), None);
        assert_eq!(overflow, Err(QueueError::Full));
        assert!(queue.is_full());
    }

    /// A priority queue pops the highest priority first, whatever the push order.
    #[test]
    fn test_queue_priority() {
        let mut queue = QueueStore::new_priority(0);
        queue.push_back(Value::text("low"), Some(1)).unwrap();
        queue.push_back(Value::text("high"), Some(10)).unwrap();
        queue.push_back(Value::text("medium"), Some(5)).unwrap();

        for expected in ["high", "medium", "low"] {
            let popped = queue.pop_front().unwrap();
            assert_eq!(popped.payload, Value::text(expected));
        }
    }

    /// Peeking returns the requested number of messages and removes none.
    #[test]
    fn test_queue_peek() {
        let mut queue = QueueStore::new(0);
        for label in ["a", "b", "c"] {
            queue.push_back(Value::text(label), None).unwrap();
        }

        let peeked = queue.peek_front(2);
        assert_eq!(peeked.len(), 2);
        assert_eq!(queue.len(), 3);
    }

    /// `purge` reports how many messages it dropped and leaves the queue empty.
    #[test]
    fn test_queue_purge() {
        let mut queue = QueueStore::new(0);
        queue.push_back(Value::Integer(1), None).unwrap();
        queue.push_back(Value::Integer(2), None).unwrap();

        let dropped = queue.purge();
        assert_eq!(dropped, 2);
        assert!(queue.is_empty());
    }

    /// Removing by sequence number takes out that message only.
    #[test]
    fn test_queue_remove_by_seq() {
        let mut queue = QueueStore::new(0);
        let first_seq = queue.push_back(Value::Integer(1), None).unwrap();
        let second_seq = queue.push_back(Value::Integer(2), None).unwrap();

        let taken = queue.remove(first_seq).unwrap();
        assert_eq!(taken.payload, Value::Integer(1));
        assert_eq!(queue.len(), 1);
        assert!(queue.get(second_seq).is_some());
    }

    /// The attempt counter starts at zero and grows by one per increment.
    #[test]
    fn test_queue_attempts() {
        let mut queue = QueueStore::new(0);
        let seq = queue.push_back(Value::text("msg"), None).unwrap();
        assert_eq!(queue.get(seq).unwrap().attempts, 0);

        for expected in [1, 2] {
            queue.increment_attempts(seq);
            assert_eq!(queue.get(seq).unwrap().attempts, expected);
        }
    }
}