kizzasi_io/lockfree.rs

//! Lock-free data structures for high-performance streaming
//!
//! Provides lock-free queues and buffers optimized for real-time audio/video streaming
//! with minimal latency and no blocking operations.
//!
//! ## Features
//! - Lock-free SPSC (Single Producer Single Consumer) queue
//! - Lock-free ring buffer with overwrite semantics
//! - Non-blocking push and pop operations
//! - Cache-friendly memory layout
//!
//! ## Example
//! ```rust
//! use kizzasi_io::LockFreeQueue;
//!
//! let queue = LockFreeQueue::<f32>::new(1024);
//!
//! // Producer thread
//! for i in 0..100 {
//!     queue.push(i as f32).unwrap();
//! }
//!
//! // Consumer thread
//! while let Some(value) = queue.pop() {
//!     println!("Received: {}", value);
//! }
//! ```

use crate::error::IoResult;
use crossbeam_queue::{ArrayQueue, SegQueue};
use scirs2_core::ndarray::Array1;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use tracing::debug;

/// Lock-free bounded queue for SPSC (Single Producer Single Consumer) streaming
///
/// Backed by crossbeam's `ArrayQueue`; push and pop are lock-free and never block,
/// making the queue suitable for low-latency real-time paths.
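///
/// A minimal cross-thread sketch: the producer thread pushes through its own handle
/// (obtained via `clone_handle`), and the main thread drains the shared queue. The
/// capacity is assumed large enough here that no pushes fail.
///
/// ```rust
/// use kizzasi_io::LockFreeQueue;
/// use std::thread;
///
/// let queue = LockFreeQueue::<f32>::new(1024);
/// let producer = queue.clone_handle();
///
/// let handle = thread::spawn(move || {
///     for i in 0..64 {
///         // Capacity exceeds the number of items, so push never fails here.
///         producer.push(i as f32).unwrap();
///     }
/// });
/// handle.join().unwrap();
///
/// // Drain everything the producer wrote.
/// assert_eq!(queue.pop_all().len(), 64);
/// ```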
pub struct LockFreeQueue<T> {
    queue: Arc<ArrayQueue<T>>,
    dropped_count: Arc<AtomicUsize>,
}

impl<T> LockFreeQueue<T> {
    /// Create a new lock-free queue with given capacity
    pub fn new(capacity: usize) -> Self {
        Self {
            queue: Arc::new(ArrayQueue::new(capacity)),
            dropped_count: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Push an item to the queue (non-blocking)
    ///
    /// Returns `Err(item)` if the queue is full, handing the item back to the caller
    pub fn push(&self, item: T) -> Result<(), T> {
        self.queue.push(item)
    }

    /// Try to push an item, dropping it if the queue is full
    ///
    /// Returns `true` if the item was pushed, `false` if dropped
    pub fn try_push(&self, item: T) -> bool {
        match self.queue.push(item) {
            Ok(()) => true,
            Err(_) => {
                self.dropped_count.fetch_add(1, Ordering::Relaxed);
                false
            }
        }
    }

    /// Pop an item from the queue (non-blocking)
    ///
    /// Returns `None` if the queue is empty
    pub fn pop(&self) -> Option<T> {
        self.queue.pop()
    }

    /// Check if queue is empty
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Check if queue is full
    pub fn is_full(&self) -> bool {
        self.queue.is_full()
    }

    /// Get current queue length
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Get queue capacity
    pub fn capacity(&self) -> usize {
        self.queue.capacity()
    }

    /// Get number of dropped items (when queue was full)
    pub fn dropped_count(&self) -> usize {
        self.dropped_count.load(Ordering::Relaxed)
    }

    /// Reset dropped count
    pub fn reset_dropped_count(&self) {
        self.dropped_count.store(0, Ordering::Relaxed);
    }

    /// Clone the queue handle (shares the same underlying queue)
    pub fn clone_handle(&self) -> Self {
        Self {
            queue: Arc::clone(&self.queue),
            dropped_count: Arc::clone(&self.dropped_count),
        }
    }
}

impl<T: Clone> LockFreeQueue<T> {
    /// Pop all available items as a vector
    pub fn pop_all(&self) -> Vec<T> {
        let mut items = Vec::with_capacity(self.len());
        while let Some(item) = self.pop() {
            items.push(item);
        }
        items
    }
}

/// Lock-free unbounded MPMC (Multi Producer Multi Consumer) queue
///
/// Backed by crossbeam's `SegQueue`, a segmented linked-list queue with unbounded
/// capacity. Slower than the bounded queue, but `push` never fails.
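///
/// A minimal single-threaded sketch (assuming `UnboundedQueue` is re-exported at the
/// crate root like `LockFreeQueue`):
///
/// ```rust
/// use kizzasi_io::UnboundedQueue;
///
/// let queue = UnboundedQueue::new();
/// for i in 0..4 {
///     // Push never fails: the queue grows as needed.
///     queue.push(i);
/// }
/// assert_eq!(queue.pop(), Some(0)); // FIFO order
/// assert_eq!(queue.len(), 3);
/// ```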
pub struct UnboundedQueue<T> {
    queue: Arc<SegQueue<T>>,
    item_count: Arc<AtomicUsize>,
}

impl<T> UnboundedQueue<T> {
    /// Create a new unbounded queue
    pub fn new() -> Self {
        Self {
            queue: Arc::new(SegQueue::new()),
            item_count: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Push an item to the queue (never fails)
    pub fn push(&self, item: T) {
        self.queue.push(item);
        self.item_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Pop an item from the queue
    pub fn pop(&self) -> Option<T> {
        match self.queue.pop() {
            Some(item) => {
                self.item_count.fetch_sub(1, Ordering::Relaxed);
                Some(item)
            }
            None => None,
        }
    }

    /// Check if queue is empty
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Get approximate queue length
    pub fn len(&self) -> usize {
        self.item_count.load(Ordering::Relaxed)
    }

    /// Clone the queue handle
    pub fn clone_handle(&self) -> Self {
        Self {
            queue: Arc::clone(&self.queue),
            item_count: Arc::clone(&self.item_count),
        }
    }
}

impl<T> Default for UnboundedQueue<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T: Clone> UnboundedQueue<T> {
    /// Pop all available items as a vector
    pub fn pop_all(&self) -> Vec<T> {
        let mut items = Vec::new();
        while let Some(item) = self.pop() {
            items.push(item);
        }
        items
    }
}

/// Lock-free signal queue for audio/sensor data
///
/// Specialized queue for streaming f32 samples with batching support.
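///
/// A minimal sketch of the write/read cycle (assuming `SignalQueue` is re-exported at
/// the crate root like `LockFreeQueue`):
///
/// ```rust
/// use kizzasi_io::SignalQueue;
///
/// // Holds up to 256 samples, read in batches of 4.
/// let queue = SignalQueue::new(256, 4);
/// queue.write_samples(&[0.1, 0.2, 0.3]).unwrap();
///
/// // Only 3 samples are available, so the batch is zero-padded
/// // and an underrun is recorded.
/// let batch = queue.read_batch().unwrap();
/// assert_eq!(batch.len(), 4);
/// assert_eq!(batch[3], 0.0);
/// assert_eq!(queue.underrun_count(), 1);
/// ```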
pub struct SignalQueue {
    queue: LockFreeQueue<f32>,
    batch_size: usize,
    underrun_count: Arc<AtomicUsize>,
    overrun_count: Arc<AtomicUsize>,
}

impl SignalQueue {
    /// Create a new signal queue
    pub fn new(capacity: usize, batch_size: usize) -> Self {
        Self {
            queue: LockFreeQueue::new(capacity),
            batch_size,
            underrun_count: Arc::new(AtomicUsize::new(0)),
            overrun_count: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Write samples to the queue
    ///
    /// Samples that do not fit are dropped and counted as overruns.
    pub fn write_samples(&self, samples: &[f32]) -> IoResult<()> {
        let mut overruns = 0;
        for &sample in samples {
            if !self.queue.try_push(sample) {
                overruns += 1;
            }
        }

        if overruns > 0 {
            self.overrun_count.fetch_add(overruns, Ordering::Relaxed);
            debug!("Signal queue overrun: {} samples dropped", overruns);
        }

        Ok(())
    }

    /// Read a batch of `batch_size` samples
    ///
    /// If fewer samples are available, the batch is zero-padded and an underrun is recorded.
    pub fn read_batch(&self) -> IoResult<Array1<f32>> {
        let available = self.queue.len();

        // Mark underrun if we don't have a full batch
        if available < self.batch_size {
            self.underrun_count.fetch_add(1, Ordering::Relaxed);
        }

        // Read whatever is available, up to batch_size
        let mut samples = Vec::with_capacity(self.batch_size);
        for _ in 0..self.batch_size {
            if let Some(sample) = self.queue.pop() {
                samples.push(sample);
            } else {
                break;
            }
        }

        // Pad with zeros if needed
        while samples.len() < self.batch_size {
            samples.push(0.0);
        }

        Ok(Array1::from_vec(samples))
    }

    /// Read all available samples
    pub fn read_all(&self) -> Array1<f32> {
        let samples = self.queue.pop_all();
        Array1::from_vec(samples)
    }

    /// Get current queue level
    pub fn level(&self) -> usize {
        self.queue.len()
    }

    /// Get queue capacity
    pub fn capacity(&self) -> usize {
        self.queue.capacity()
    }

    /// Get fill ratio (0.0 to 1.0)
    pub fn fill_ratio(&self) -> f32 {
        self.level() as f32 / self.capacity() as f32
    }

    /// Get underrun count
    pub fn underrun_count(&self) -> usize {
        self.underrun_count.load(Ordering::Relaxed)
    }

    /// Get overrun count
    pub fn overrun_count(&self) -> usize {
        self.overrun_count.load(Ordering::Relaxed)
    }

    /// Reset statistics
    pub fn reset_stats(&self) {
        self.underrun_count.store(0, Ordering::Relaxed);
        self.overrun_count.store(0, Ordering::Relaxed);
        self.queue.reset_dropped_count();
    }
}

/// Lock-free ring buffer with overwrite semantics
///
/// Automatically overwrites the oldest data when full, ensuring continuous streaming.
/// One slot is kept as a separator between the read and write positions, so at most
/// `capacity - 1` items are readable at a time; writes require `&mut self`.
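///
/// A minimal sketch of the overwrite behavior (assuming `LockFreeRingBuffer` is
/// re-exported at the crate root like `LockFreeQueue`):
///
/// ```rust
/// use kizzasi_io::LockFreeRingBuffer;
///
/// let mut buffer = LockFreeRingBuffer::new(4); // up to 3 readable items
/// buffer.write(1);
/// buffer.write(2);
/// buffer.write(3);
/// buffer.write(4); // wraps around: the oldest item (1) is skipped
///
/// assert!(buffer.was_overwritten());
/// assert_eq!(buffer.read(), Some(2)); // reading resumes at the oldest surviving item
/// ```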
pub struct LockFreeRingBuffer<T> {
    buffer: Vec<Option<T>>,
    write_pos: Arc<AtomicUsize>,
    read_pos: Arc<AtomicUsize>,
    capacity: usize,
    overwrite_flag: Arc<AtomicBool>,
}

impl<T: Clone> LockFreeRingBuffer<T> {
    /// Create a new lock-free ring buffer
    pub fn new(capacity: usize) -> Self {
        let mut buffer = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buffer.push(None);
        }

        Self {
            buffer,
            write_pos: Arc::new(AtomicUsize::new(0)),
            read_pos: Arc::new(AtomicUsize::new(0)),
            capacity,
            overwrite_flag: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Write an item (overwrites oldest if full)
    pub fn write(&mut self, item: T) {
        let write_idx = self.write_pos.load(Ordering::Acquire);
        let read_idx = self.read_pos.load(Ordering::Acquire);

        // Store the item
        self.buffer[write_idx] = Some(item);

        // Advance write position
        let next_write = (write_idx + 1) % self.capacity;
        self.write_pos.store(next_write, Ordering::Release);

        // Check if we're overwriting
        if next_write == read_idx {
            // Advance read position to maintain buffer size
            let next_read = (read_idx + 1) % self.capacity;
            self.read_pos.store(next_read, Ordering::Release);
            self.overwrite_flag.store(true, Ordering::Relaxed);
        }
    }

    /// Read an item
    pub fn read(&self) -> Option<T> {
        let read_idx = self.read_pos.load(Ordering::Acquire);
        let write_idx = self.write_pos.load(Ordering::Acquire);

        if read_idx == write_idx {
            return None; // Buffer is empty
        }

        let item = self.buffer[read_idx].clone();

        // Advance read position
        let next_read = (read_idx + 1) % self.capacity;
        self.read_pos.store(next_read, Ordering::Release);

        item
    }

    /// Get number of available items
    pub fn available(&self) -> usize {
        let write_idx = self.write_pos.load(Ordering::Acquire);
        let read_idx = self.read_pos.load(Ordering::Acquire);

        if write_idx >= read_idx {
            write_idx - read_idx
        } else {
            self.capacity - read_idx + write_idx
        }
    }

    /// Check whether an overwrite occurred since the last call (reading the flag clears it)
    pub fn was_overwritten(&self) -> bool {
        self.overwrite_flag.swap(false, Ordering::Relaxed)
    }

    /// Get capacity
    pub fn capacity(&self) -> usize {
        self.capacity
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_lockfree_queue_basic() {
        let queue = LockFreeQueue::new(10);

        assert!(queue.is_empty());
        assert_eq!(queue.capacity(), 10);

        queue.push(1.0).unwrap();
        queue.push(2.0).unwrap();
        queue.push(3.0).unwrap();

        assert_eq!(queue.len(), 3);
        assert!(!queue.is_empty());

        assert_eq!(queue.pop(), Some(1.0));
        assert_eq!(queue.pop(), Some(2.0));
        assert_eq!(queue.pop(), Some(3.0));
        assert_eq!(queue.pop(), None);
    }

    #[test]
    fn test_lockfree_queue_full() {
        let queue = LockFreeQueue::new(3);

        assert!(queue.push(1.0).is_ok());
        assert!(queue.push(2.0).is_ok());
        assert!(queue.push(3.0).is_ok());
        assert!(queue.push(4.0).is_err()); // Queue full

        assert!(queue.is_full());
    }

    #[test]
    fn test_lockfree_queue_try_push() {
        let queue = LockFreeQueue::new(2);

        assert!(queue.try_push(1.0));
        assert!(queue.try_push(2.0));
        assert!(!queue.try_push(3.0)); // Dropped

        assert_eq!(queue.dropped_count(), 1);
    }

    #[test]
    fn test_unbounded_queue() {
        let queue = UnboundedQueue::new();

        for i in 0..1000 {
            queue.push(i);
        }

        assert_eq!(queue.len(), 1000);

        for i in 0..1000 {
            assert_eq!(queue.pop(), Some(i));
        }

        assert!(queue.is_empty());
    }

    #[test]
    fn test_signal_queue() {
        let queue = SignalQueue::new(100, 10);

        let samples = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        queue.write_samples(&samples).unwrap();

        assert_eq!(queue.level(), 5);

        let batch = queue.read_batch().unwrap();
        assert_eq!(batch.len(), 10); // Batch size

        // First 5 should be the samples, rest zeros
        assert_eq!(batch[0], 1.0);
        assert_eq!(batch[4], 5.0);
        assert_eq!(batch[5], 0.0);
    }

    #[test]
    fn test_signal_queue_overrun() {
        let queue = SignalQueue::new(10, 5);

        let samples = vec![1.0; 15]; // More than capacity
        queue.write_samples(&samples).unwrap();

        assert!(queue.overrun_count() > 0);
    }

    #[test]
    fn test_lockfree_ring_buffer() {
        let mut buffer = LockFreeRingBuffer::new(5);

        buffer.write(1);
        buffer.write(2);
        buffer.write(3);

        assert_eq!(buffer.available(), 3);

        assert_eq!(buffer.read(), Some(1));
        assert_eq!(buffer.read(), Some(2));
        assert_eq!(buffer.read(), Some(3));
        assert_eq!(buffer.read(), None);
    }

    #[test]
    fn test_lockfree_ring_buffer_overwrite() {
        let mut buffer = LockFreeRingBuffer::new(3);

        buffer.write(1);
        buffer.write(2);
        buffer.write(3);
        buffer.write(4); // Overwrites, advances read pointer

        assert!(buffer.was_overwritten());

        // After overwrite, buffer should contain last 2 items written
        // because when write catches up to read, read advances
        let first = buffer.read();
        let second = buffer.read();

        // Should be able to read 2 items (exact values depend on implementation)
        assert!(first.is_some());
        assert!(second.is_some());
    }
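
    // A minimal cross-thread sanity check (sketch): the producer pushes from a spawned
    // thread through a cloned handle, and the test thread drains after joining. The
    // capacity is chosen larger than the item count so no pushes are dropped.
    #[test]
    fn test_lockfree_queue_cross_thread() {
        let queue = LockFreeQueue::new(256);
        let producer = queue.clone_handle();

        let handle = std::thread::spawn(move || {
            for i in 0..100 {
                // Capacity exceeds the number of items, so push never fails here.
                producer.push(i as f32).unwrap();
            }
        });
        handle.join().unwrap();

        let received = queue.pop_all();
        assert_eq!(received.len(), 100);
        assert_eq!(received[0], 0.0);
        assert_eq!(received[99], 99.0);
    }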
}