use std::sync::atomic::{AtomicU64, Ordering};

use crate::error::{Result, RingKernelError};
use crate::message::MessageEnvelope;

/// Snapshot of a queue's activity counters.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Total messages successfully enqueued.
    pub enqueued: u64,
    /// Total messages successfully dequeued.
    pub dequeued: u64,
    /// Messages rejected because the queue was full.
    pub dropped: u64,
    /// Number of messages currently in the queue.
    pub depth: u64,
    /// Highest depth observed since creation or the last `reset_stats`.
    pub max_depth: u64,
}

/// Common interface shared by the queue flavors in this module.
pub trait MessageQueue: Send + Sync {
    /// Maximum number of messages the queue can hold.
    fn capacity(&self) -> usize;

    /// Number of messages currently in the queue.
    fn len(&self) -> usize;

    /// Returns `true` if the queue holds no messages.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the queue cannot accept another message.
    fn is_full(&self) -> bool {
        self.len() >= self.capacity()
    }

    /// Attempts to enqueue without blocking; returns `QueueFull` at capacity.
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()>;

    /// Attempts to dequeue without blocking; returns `QueueEmpty` when drained.
    fn try_dequeue(&self) -> Result<MessageEnvelope>;

    /// Returns a snapshot of the activity counters.
    fn stats(&self) -> QueueStats;

    /// Resets all counters to zero.
    fn reset_stats(&self);
}

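/// Single-producer, single-consumer ring buffer with mutex-guarded slots.
///
/// Capacity is rounded up to the next power of two so index math reduces to a
/// bit mask. `head` counts enqueues and `tail` counts dequeues; both grow
/// monotonically, so the occupied depth is always `head - tail` (wrapping).
///
/// A minimal usage sketch (marked `ignore` because `MessageEnvelope`
/// construction depends on the surrounding crate; the tests below show a
/// concrete constructor):
///
/// ```ignore
/// let queue = SpscQueue::new(16); // capacity rounds up to a power of two
/// queue.try_enqueue(envelope)?;   // fails with QueueFull at capacity
/// let msg = queue.try_dequeue()?; // fails with QueueEmpty when drained
/// ```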
pub struct SpscQueue {
    /// Ring of slots; each slot's mutex guards the envelope move in and out.
    buffer: Vec<parking_lot::Mutex<Option<MessageEnvelope>>>,
    /// Capacity after rounding up to a power of two.
    capacity: usize,
    /// `capacity - 1`; folds the monotonically growing indices into the ring.
    mask: usize,
    /// Total enqueues so far; written only by the producer.
    head: AtomicU64,
    /// Total dequeues so far; written only by the consumer.
    tail: AtomicU64,
    /// Atomic counters backing `stats()`.
    stats: QueueStatsInner,
}

/// Interior-mutability counterpart of `QueueStats`.
struct QueueStatsInner {
    enqueued: AtomicU64,
    dequeued: AtomicU64,
    dropped: AtomicU64,
    max_depth: AtomicU64,
}

impl SpscQueue {
    /// Creates a queue holding at least `capacity` messages, rounded up to
    /// the next power of two so indexing can use a bit mask.
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.next_power_of_two();
        let mask = capacity - 1;

        let mut buffer = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buffer.push(parking_lot::Mutex::new(None));
        }

        Self {
            buffer,
            capacity,
            mask,
            head: AtomicU64::new(0),
            tail: AtomicU64::new(0),
            stats: QueueStatsInner {
                enqueued: AtomicU64::new(0),
                dequeued: AtomicU64::new(0),
                dropped: AtomicU64::new(0),
                max_depth: AtomicU64::new(0),
            },
        }
    }

    /// Number of messages currently queued: enqueues minus dequeues.
    fn depth(&self) -> u64 {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        head.wrapping_sub(tail)
    }

    /// Raises the recorded high-water mark to the current depth via a
    /// standard compare-exchange loop; concurrent updaters simply retry.
    fn update_max_depth(&self) {
        let depth = self.depth();
        let mut max = self.stats.max_depth.load(Ordering::Relaxed);
        while depth > max {
            match self.stats.max_depth.compare_exchange_weak(
                max,
                depth,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(current) => max = current,
            }
        }
    }
}

impl MessageQueue for SpscQueue {
    fn capacity(&self) -> usize {
        self.capacity
    }

    fn len(&self) -> usize {
        self.depth() as usize
    }

    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);

        // Reject (and count) the message when the ring is full.
        if head.wrapping_sub(tail) >= self.capacity as u64 {
            self.stats.dropped.fetch_add(1, Ordering::Relaxed);
            return Err(RingKernelError::QueueFull {
                capacity: self.capacity,
            });
        }

        // Write the envelope into its slot before publishing the new head.
        let index = (head as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        *slot = Some(envelope);
        drop(slot);

        // Release store: the consumer's acquire load sees the filled slot.
        self.head.store(head.wrapping_add(1), Ordering::Release);

        self.stats.enqueued.fetch_add(1, Ordering::Relaxed);
        self.update_max_depth();

        Ok(())
    }

    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let tail = self.tail.load(Ordering::Acquire);
        let head = self.head.load(Ordering::Acquire);

        if head == tail {
            return Err(RingKernelError::QueueEmpty);
        }

        // Take the envelope out of its slot before publishing the new tail.
        let index = (tail as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        let envelope = slot.take().ok_or(RingKernelError::QueueEmpty)?;
        drop(slot);

        // Release store: the producer's acquire load sees the freed slot.
        self.tail.store(tail.wrapping_add(1), Ordering::Release);

        self.stats.dequeued.fetch_add(1, Ordering::Relaxed);

        Ok(envelope)
    }

    fn stats(&self) -> QueueStats {
        QueueStats {
            enqueued: self.stats.enqueued.load(Ordering::Relaxed),
            dequeued: self.stats.dequeued.load(Ordering::Relaxed),
            dropped: self.stats.dropped.load(Ordering::Relaxed),
            depth: self.depth(),
            max_depth: self.stats.max_depth.load(Ordering::Relaxed),
        }
    }

    fn reset_stats(&self) {
        self.stats.enqueued.store(0, Ordering::Relaxed);
        self.stats.dequeued.store(0, Ordering::Relaxed);
        self.stats.dropped.store(0, Ordering::Relaxed);
        self.stats.max_depth.store(0, Ordering::Relaxed);
    }
}

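/// Multi-producer, single-consumer queue.
///
/// Serializes producers behind a single mutex so the inner `SpscQueue`'s
/// single-producer invariant still holds; the consumer side never takes the
/// lock. A minimal sketch of concurrent producers (mirrors
/// `test_mpsc_concurrent` below; `make_envelope` stands in for whatever
/// constructs a `MessageEnvelope` in your code):
///
/// ```ignore
/// let queue = std::sync::Arc::new(MpscQueue::new(1024));
/// let q = std::sync::Arc::clone(&queue);
/// std::thread::spawn(move || q.try_enqueue(make_envelope()).unwrap());
/// ```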
pub struct MpscQueue {
    /// Underlying single-producer ring.
    inner: SpscQueue,
    /// Serializes producers so only one thread enqueues at a time.
    producer_lock: parking_lot::Mutex<()>,
}

impl MpscQueue {
    /// Creates a queue holding at least `capacity` messages.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: SpscQueue::new(capacity),
            producer_lock: parking_lot::Mutex::new(()),
        }
    }
}

impl MessageQueue for MpscQueue {
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        // Hold the producer lock across the whole enqueue so the inner
        // queue only ever sees one producer at a time.
        let _guard = self.producer_lock.lock();
        self.inner.try_enqueue(envelope)
    }

    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        self.inner.try_dequeue()
    }

    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }

    fn reset_stats(&self) {
        self.inner.reset_stats()
    }
}

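/// Blocking wrapper that adds timed enqueue/dequeue on top of `MpscQueue`.
///
/// The condvars pair with a standalone mutex rather than the queue state, so
/// a notification can race with a full/empty check; the timed wait bounds how
/// long such a missed wakeup can delay a retry. A minimal sketch of a timed
/// enqueue (`envelope` stands in for any `MessageEnvelope` value):
///
/// ```ignore
/// let queue = BoundedQueue::new(2);
/// queue.enqueue_timeout(envelope, std::time::Duration::from_millis(10))?;
/// ```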
pub struct BoundedQueue {
    /// Underlying non-blocking queue.
    inner: MpscQueue,
    /// Signaled after a dequeue frees a slot.
    not_full: parking_lot::Condvar,
    /// Signaled after an enqueue adds a message.
    not_empty: parking_lot::Condvar,
    /// Mutex the condvars wait on; it guards no queue state.
    mutex: parking_lot::Mutex<()>,
}

impl BoundedQueue {
    /// Creates a queue holding at least `capacity` messages.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: MpscQueue::new(capacity),
            not_full: parking_lot::Condvar::new(),
            not_empty: parking_lot::Condvar::new(),
            mutex: parking_lot::Mutex::new(()),
        }
    }

    /// Enqueues, blocking up to `timeout` for a slot to free up.
    pub fn enqueue_timeout(
        &self,
        envelope: MessageEnvelope,
        timeout: std::time::Duration,
    ) -> Result<()> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            // Clone per attempt: `try_enqueue` consumes the envelope even
            // when it fails, so the original is kept for the retry.
            match self.inner.try_enqueue(envelope.clone()) {
                Ok(()) => {
                    self.not_empty.notify_one();
                    return Ok(());
                }
                Err(RingKernelError::QueueFull { .. }) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    let mut guard = self.mutex.lock();
                    let _ = self.not_full.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Dequeues, blocking up to `timeout` for a message to arrive.
    pub fn dequeue_timeout(&self, timeout: std::time::Duration) -> Result<MessageEnvelope> {
        let deadline = std::time::Instant::now() + timeout;

        loop {
            match self.inner.try_dequeue() {
                Ok(envelope) => {
                    self.not_full.notify_one();
                    return Ok(envelope);
                }
                Err(RingKernelError::QueueEmpty) => {
                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(RingKernelError::Timeout(timeout));
                    }
                    let mut guard = self.mutex.lock();
                    let _ = self.not_empty.wait_for(&mut guard, remaining);
                }
                Err(e) => return Err(e),
            }
        }
    }
}

impl MessageQueue for BoundedQueue {
    fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let result = self.inner.try_enqueue(envelope);
        if result.is_ok() {
            self.not_empty.notify_one();
        }
        result
    }

    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let result = self.inner.try_dequeue();
        if result.is_ok() {
            self.not_full.notify_one();
        }
        result
    }

    fn stats(&self) -> QueueStats {
        self.inner.stats()
    }

    fn reset_stats(&self) {
        self.inner.reset_stats()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;

    /// Builds a small envelope with an 8-byte payload for the tests.
    fn make_envelope() -> MessageEnvelope {
        MessageEnvelope {
            header: MessageHeader::new(1, 0, 1, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
        }
    }

    #[test]
    fn test_spsc_basic() {
        let queue = SpscQueue::new(16);

        assert!(queue.is_empty());
        assert!(!queue.is_full());

        let env = make_envelope();
        queue.try_enqueue(env).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let _ = queue.try_dequeue().unwrap();
        assert!(queue.is_empty());
    }

    #[test]
    fn test_spsc_full() {
        let queue = SpscQueue::new(4);

        for _ in 0..4 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        assert!(queue.is_full());
        assert!(matches!(
            queue.try_enqueue(make_envelope()),
            Err(RingKernelError::QueueFull { .. })
        ));
    }
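
    // A sketch of index wrap-around: head and tail keep counting past the
    // capacity while `mask` folds them back into the ring, so cycling more
    // messages than the buffer holds must work without corruption.
    #[test]
    fn test_spsc_wraparound() {
        let queue = SpscQueue::new(4);

        for _ in 0..10 {
            queue.try_enqueue(make_envelope()).unwrap();
            let _ = queue.try_dequeue().unwrap();
        }

        assert!(queue.is_empty());
        let stats = queue.stats();
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 10);
    }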

    #[test]
    fn test_spsc_stats() {
        let queue = SpscQueue::new(16);

        for _ in 0..10 {
            queue.try_enqueue(make_envelope()).unwrap();
        }

        for _ in 0..5 {
            let _ = queue.try_dequeue().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.depth, 5);
    }
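
    // A sketch of the counter-reset path: `depth` is derived from head/tail,
    // so it survives `reset_stats` while the counters return to zero.
    #[test]
    fn test_reset_stats() {
        let queue = SpscQueue::new(8);
        queue.try_enqueue(make_envelope()).unwrap();
        queue.reset_stats();

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 0);
        assert_eq!(stats.max_depth, 0);
        assert_eq!(stats.depth, 1); // the message itself is still queued
    }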

    #[test]
    fn test_mpsc_concurrent() {
        use std::sync::Arc;
        use std::thread;

        let queue = Arc::new(MpscQueue::new(1024));
        let mut handles = vec![];

        for _ in 0..4 {
            let q = Arc::clone(&queue);
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    q.try_enqueue(make_envelope()).unwrap();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 400);
    }

    #[test]
    fn test_bounded_timeout() {
        let queue = BoundedQueue::new(2);

        queue.try_enqueue(make_envelope()).unwrap();
        queue.try_enqueue(make_envelope()).unwrap();

        let result = queue.enqueue_timeout(make_envelope(), std::time::Duration::from_millis(10));
        assert!(matches!(result, Err(RingKernelError::Timeout(_))));
    }
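
    // A sketch of the blocking-consumer path, assuming `BoundedQueue` can be
    // shared via `Arc` exactly as `MpscQueue` is in `test_mpsc_concurrent`.
    #[test]
    fn test_bounded_dequeue_timeout() {
        use std::sync::Arc;
        use std::thread;
        use std::time::Duration;

        let queue = Arc::new(BoundedQueue::new(4));

        // An empty queue should time out rather than block forever.
        let result = queue.dequeue_timeout(Duration::from_millis(10));
        assert!(matches!(result, Err(RingKernelError::Timeout(_))));

        // A producer that enqueues shortly after the consumer starts waiting
        // should unblock it well before the generous deadline.
        let producer = Arc::clone(&queue);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(5));
            producer.try_enqueue(make_envelope()).unwrap();
        });

        let envelope = queue.dequeue_timeout(Duration::from_millis(500)).unwrap();
        assert_eq!(envelope.payload.len(), 8);
        handle.join().unwrap();
    }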
}