ringkernel_core/
queue.rs

//! Message queue implementations for host/GPU communication.
//!
//! This module provides the core message queue abstraction used for
//! communication between host and GPU kernels. The queues use a ring
//! buffer design with atomic head/tail indices; individual slots are
//! guarded by lightweight per-slot locks.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use crate::error::{Result, RingKernelError};
10use crate::message::MessageEnvelope;
11
/// Snapshot of a message queue's activity counters.
///
/// Produced by [`MessageQueue::stats`]; all counters are cumulative
/// since creation or the last [`MessageQueue::reset_stats`] call.
#[derive(Clone, Debug, Default)]
pub struct QueueStats {
    /// Total messages successfully enqueued.
    pub enqueued: u64,
    /// Total messages successfully dequeued.
    pub dequeued: u64,
    /// Messages rejected because the queue was full.
    pub dropped: u64,
    /// Number of messages currently in the queue.
    pub depth: u64,
    /// Highest queue depth observed so far.
    pub max_depth: u64,
}
26
/// Trait for message queue implementations.
///
/// Message queues provide FIFO communication between producers
/// (host or other kernels) and consumers (GPU kernels). How many
/// concurrent producers/consumers are supported is up to each
/// implementation (see the individual queue types).
pub trait MessageQueue: Send + Sync {
    /// Get the queue capacity (maximum number of buffered messages).
    fn capacity(&self) -> usize;

    /// Get current queue size (messages enqueued but not yet dequeued).
    fn len(&self) -> usize;

    /// Check if queue is empty.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Check if queue is full.
    fn is_full(&self) -> bool {
        self.len() >= self.capacity()
    }

    /// Try to enqueue a message envelope.
    ///
    /// Non-blocking: returns an error (e.g. queue full) rather than
    /// waiting for space.
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()>;

    /// Try to dequeue a message envelope.
    ///
    /// Non-blocking: returns an error (e.g. queue empty) rather than
    /// waiting for data.
    fn try_dequeue(&self) -> Result<MessageEnvelope>;

    /// Get queue statistics.
    fn stats(&self) -> QueueStats;

    /// Reset queue statistics.
    fn reset_stats(&self);
}
60
/// Single-producer single-consumer ring buffer.
///
/// This implementation is optimized for the common case of one
/// producer (host) and one consumer (GPU kernel). Head/tail index
/// management is lock-free (atomics only), while each slot is guarded
/// by its own short-lived mutex.
pub struct SpscQueue {
    /// Ring buffer storage; one lock-guarded slot per entry.
    buffer: Vec<parking_lot::Mutex<Option<MessageEnvelope>>>,
    /// Capacity, always a power of 2 (enforced by `new`).
    capacity: usize,
    /// Bitmask (`capacity - 1`) for cheap index wrapping.
    mask: usize,
    /// Head counter: total messages ever enqueued (producer writes here).
    head: AtomicU64,
    /// Tail counter: total messages ever dequeued (consumer reads here).
    tail: AtomicU64,
    /// Atomic counters backing [`QueueStats`].
    stats: QueueStatsInner,
}
79
/// Internal statistics with atomics.
///
/// Mirrors [`QueueStats`] but stores atomic counters so they can be
/// updated concurrently without locking; `depth` is derived from the
/// head/tail indices rather than stored here.
struct QueueStatsInner {
    /// Total messages successfully enqueued.
    enqueued: AtomicU64,
    /// Total messages successfully dequeued.
    dequeued: AtomicU64,
    /// Messages rejected because the queue was full.
    dropped: AtomicU64,
    /// High-water mark of observed queue depth.
    max_depth: AtomicU64,
}
87
88impl SpscQueue {
89    /// Create a new queue with the given capacity.
90    ///
91    /// Capacity will be rounded up to the next power of 2.
92    pub fn new(capacity: usize) -> Self {
93        let capacity = capacity.next_power_of_two();
94        let mask = capacity - 1;
95
96        let mut buffer = Vec::with_capacity(capacity);
97        for _ in 0..capacity {
98            buffer.push(parking_lot::Mutex::new(None));
99        }
100
101        Self {
102            buffer,
103            capacity,
104            mask,
105            head: AtomicU64::new(0),
106            tail: AtomicU64::new(0),
107            stats: QueueStatsInner {
108                enqueued: AtomicU64::new(0),
109                dequeued: AtomicU64::new(0),
110                dropped: AtomicU64::new(0),
111                max_depth: AtomicU64::new(0),
112            },
113        }
114    }
115
116    /// Get current depth.
117    fn depth(&self) -> u64 {
118        let head = self.head.load(Ordering::Acquire);
119        let tail = self.tail.load(Ordering::Acquire);
120        head.wrapping_sub(tail)
121    }
122
123    /// Update max depth statistic.
124    fn update_max_depth(&self) {
125        let depth = self.depth();
126        let mut max = self.stats.max_depth.load(Ordering::Relaxed);
127        while depth > max {
128            match self.stats.max_depth.compare_exchange_weak(
129                max,
130                depth,
131                Ordering::Relaxed,
132                Ordering::Relaxed,
133            ) {
134                Ok(_) => break,
135                Err(current) => max = current,
136            }
137        }
138    }
139}
140
impl MessageQueue for SpscQueue {
    fn capacity(&self) -> usize {
        self.capacity
    }

    fn len(&self) -> usize {
        self.depth() as usize
    }

    /// Producer-side enqueue.
    ///
    /// NOTE(review): assumes a single producer — two concurrent callers
    /// could load the same `head` and write the same slot; confirm
    /// callers uphold the SPSC contract (MpscQueue serializes them).
    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);

        // Check if full: head/tail are monotonically increasing counts,
        // so their wrapping difference is the live depth.
        if head.wrapping_sub(tail) >= self.capacity as u64 {
            self.stats.dropped.fetch_add(1, Ordering::Relaxed);
            return Err(RingKernelError::QueueFull {
                capacity: self.capacity,
            });
        }

        // Get slot; masking works because capacity is a power of 2.
        let index = (head as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        *slot = Some(envelope);
        drop(slot);

        // Advance head with Release ordering so the slot write above is
        // visible to a consumer that Acquire-loads the new head.
        self.head.store(head.wrapping_add(1), Ordering::Release);

        // Update stats
        self.stats.enqueued.fetch_add(1, Ordering::Relaxed);
        self.update_max_depth();

        Ok(())
    }

    /// Consumer-side dequeue; assumes a single consumer.
    fn try_dequeue(&self) -> Result<MessageEnvelope> {
        let tail = self.tail.load(Ordering::Acquire);
        let head = self.head.load(Ordering::Acquire);

        // Check if empty
        if head == tail {
            return Err(RingKernelError::QueueEmpty);
        }

        // Get slot; `take()` leaves None so the slot is reusable.
        // An unexpectedly empty slot is reported as QueueEmpty without
        // advancing tail (defensive; should not happen under SPSC use).
        let index = (tail as usize) & self.mask;
        let mut slot = self.buffer[index].lock();
        let envelope = slot.take().ok_or(RingKernelError::QueueEmpty)?;
        drop(slot);

        // Advance tail; Release pairs with the producer's Acquire load.
        self.tail.store(tail.wrapping_add(1), Ordering::Release);

        // Update stats
        self.stats.dequeued.fetch_add(1, Ordering::Relaxed);

        Ok(envelope)
    }

    /// Snapshot the counters. Each field is read independently, so the
    /// values may be mutually inconsistent under concurrent activity.
    fn stats(&self) -> QueueStats {
        QueueStats {
            enqueued: self.stats.enqueued.load(Ordering::Relaxed),
            dequeued: self.stats.dequeued.load(Ordering::Relaxed),
            dropped: self.stats.dropped.load(Ordering::Relaxed),
            depth: self.depth(),
            max_depth: self.stats.max_depth.load(Ordering::Relaxed),
        }
    }

    /// Zero all counters (head/tail, and thus depth, are untouched).
    fn reset_stats(&self) {
        self.stats.enqueued.store(0, Ordering::Relaxed);
        self.stats.dequeued.store(0, Ordering::Relaxed);
        self.stats.dropped.store(0, Ordering::Relaxed);
        self.stats.max_depth.store(0, Ordering::Relaxed);
    }
}
219
/// Multi-producer single-consumer queue.
///
/// This variant allows multiple producers (e.g., multiple host threads
/// or kernel-to-kernel messaging) to enqueue messages concurrently.
/// Producers are serialized with a mutex, so the enqueue path is not
/// lock-free; the dequeue path delegates directly to the inner queue.
pub struct MpscQueue {
    /// Inner SPSC queue; the producer lock restores its
    /// single-producer invariant.
    inner: SpscQueue,
    /// Lock serializing concurrent producers.
    producer_lock: parking_lot::Mutex<()>,
}
230
231impl MpscQueue {
232    /// Create a new MPSC queue.
233    pub fn new(capacity: usize) -> Self {
234        Self {
235            inner: SpscQueue::new(capacity),
236            producer_lock: parking_lot::Mutex::new(()),
237        }
238    }
239}
240
241impl MessageQueue for MpscQueue {
242    fn capacity(&self) -> usize {
243        self.inner.capacity()
244    }
245
246    fn len(&self) -> usize {
247        self.inner.len()
248    }
249
250    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
251        let _guard = self.producer_lock.lock();
252        self.inner.try_enqueue(envelope)
253    }
254
255    fn try_dequeue(&self) -> Result<MessageEnvelope> {
256        self.inner.try_dequeue()
257    }
258
259    fn stats(&self) -> QueueStats {
260        self.inner.stats()
261    }
262
263    fn reset_stats(&self) {
264        self.inner.reset_stats()
265    }
266}
267
/// Bounded queue with blocking operations.
///
/// Wraps an [`MpscQueue`] and adds timeout-based blocking enqueue and
/// dequeue methods that wait on condition variables instead of
/// immediately returning full/empty errors.
pub struct BoundedQueue {
    /// Inner MPSC queue.
    inner: MpscQueue,
    /// Condvar producers wait on for free space.
    not_full: parking_lot::Condvar,
    /// Condvar consumers wait on for available data.
    not_empty: parking_lot::Condvar,
    /// Mutex the condvars wait on; guards no data itself, it only
    /// coordinates wakeups.
    mutex: parking_lot::Mutex<()>,
}
279
280impl BoundedQueue {
281    /// Create a new bounded queue.
282    pub fn new(capacity: usize) -> Self {
283        Self {
284            inner: MpscQueue::new(capacity),
285            not_full: parking_lot::Condvar::new(),
286            not_empty: parking_lot::Condvar::new(),
287            mutex: parking_lot::Mutex::new(()),
288        }
289    }
290
291    /// Blocking enqueue with timeout.
292    pub fn enqueue_timeout(
293        &self,
294        envelope: MessageEnvelope,
295        timeout: std::time::Duration,
296    ) -> Result<()> {
297        let deadline = std::time::Instant::now() + timeout;
298
299        loop {
300            match self.inner.try_enqueue(envelope.clone()) {
301                Ok(()) => {
302                    self.not_empty.notify_one();
303                    return Ok(());
304                }
305                Err(RingKernelError::QueueFull { .. }) => {
306                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
307                    if remaining.is_zero() {
308                        return Err(RingKernelError::Timeout(timeout));
309                    }
310                    let mut guard = self.mutex.lock();
311                    let _ = self.not_full.wait_for(&mut guard, remaining);
312                }
313                Err(e) => return Err(e),
314            }
315        }
316    }
317
318    /// Blocking dequeue with timeout.
319    pub fn dequeue_timeout(&self, timeout: std::time::Duration) -> Result<MessageEnvelope> {
320        let deadline = std::time::Instant::now() + timeout;
321
322        loop {
323            match self.inner.try_dequeue() {
324                Ok(envelope) => {
325                    self.not_full.notify_one();
326                    return Ok(envelope);
327                }
328                Err(RingKernelError::QueueEmpty) => {
329                    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
330                    if remaining.is_zero() {
331                        return Err(RingKernelError::Timeout(timeout));
332                    }
333                    let mut guard = self.mutex.lock();
334                    let _ = self.not_empty.wait_for(&mut guard, remaining);
335                }
336                Err(e) => return Err(e),
337            }
338        }
339    }
340}
341
342impl MessageQueue for BoundedQueue {
343    fn capacity(&self) -> usize {
344        self.inner.capacity()
345    }
346
347    fn len(&self) -> usize {
348        self.inner.len()
349    }
350
351    fn try_enqueue(&self, envelope: MessageEnvelope) -> Result<()> {
352        let result = self.inner.try_enqueue(envelope);
353        if result.is_ok() {
354            self.not_empty.notify_one();
355        }
356        result
357    }
358
359    fn try_dequeue(&self) -> Result<MessageEnvelope> {
360        let result = self.inner.try_dequeue();
361        if result.is_ok() {
362            self.not_full.notify_one();
363        }
364        result
365    }
366
367    fn stats(&self) -> QueueStats {
368        self.inner.stats()
369    }
370
371    fn reset_stats(&self) {
372        self.inner.reset_stats()
373    }
374}
375
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::message::MessageHeader;

    /// Build a minimal envelope with an 8-byte payload.
    fn make_envelope() -> MessageEnvelope {
        MessageEnvelope {
            header: MessageHeader::new(1, 0, 1, 8, HlcTimestamp::now(1)),
            payload: vec![1, 2, 3, 4, 5, 6, 7, 8],
        }
    }

    #[test]
    fn test_spsc_basic() {
        let queue = SpscQueue::new(16);
        assert!(queue.is_empty());
        assert!(!queue.is_full());

        queue.try_enqueue(make_envelope()).unwrap();
        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        queue.try_dequeue().unwrap();
        assert!(queue.is_empty());
    }

    #[test]
    fn test_spsc_full() {
        let queue = SpscQueue::new(4);
        (0..4).for_each(|_| queue.try_enqueue(make_envelope()).unwrap());

        assert!(queue.is_full());
        let overflow = queue.try_enqueue(make_envelope());
        assert!(matches!(overflow, Err(RingKernelError::QueueFull { .. })));
    }

    #[test]
    fn test_spsc_stats() {
        let queue = SpscQueue::new(16);
        (0..10).for_each(|_| queue.try_enqueue(make_envelope()).unwrap());
        (0..5).for_each(|_| {
            queue.try_dequeue().unwrap();
        });

        let stats = queue.stats();
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.depth, 5);
    }

    #[test]
    fn test_mpsc_concurrent() {
        use std::sync::Arc;
        use std::thread;

        let queue = Arc::new(MpscQueue::new(1024));

        // Four producers, 100 messages each.
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let q = Arc::clone(&queue);
                thread::spawn(move || {
                    for _ in 0..100 {
                        q.try_enqueue(make_envelope()).unwrap();
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(queue.stats().enqueued, 400);
    }

    #[test]
    fn test_bounded_timeout() {
        let queue = BoundedQueue::new(2);

        // Fill to capacity.
        queue.try_enqueue(make_envelope()).unwrap();
        queue.try_enqueue(make_envelope()).unwrap();

        // A further blocking enqueue must time out.
        let result = queue.enqueue_timeout(make_envelope(), std::time::Duration::from_millis(10));
        assert!(matches!(result, Err(RingKernelError::Timeout(_))));
    }
}