use crate::error::IoResult;
use crossbeam_queue::{ArrayQueue, SegQueue};
use scirs2_core::ndarray::Array1;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use tracing::debug;

/// A bounded, lock-free MPMC queue backed by `crossbeam_queue::ArrayQueue`.
///
/// Tracks how many items `try_push` has dropped because the queue was full.
pub struct LockFreeQueue<T> {
    queue: Arc<ArrayQueue<T>>,
    dropped_count: Arc<AtomicUsize>,
}

impl<T> LockFreeQueue<T> {
    /// Creates a queue with a fixed `capacity`.
    pub fn new(capacity: usize) -> Self {
        Self {
            queue: Arc::new(ArrayQueue::new(capacity)),
            dropped_count: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Pushes an item, returning it back as `Err` if the queue is full.
    pub fn push(&self, item: T) -> Result<(), T> {
        self.queue.push(item)
    }

    /// Pushes an item, dropping it (and counting the drop) if the queue is full.
    ///
    /// Returns `true` if the item was enqueued.
    pub fn try_push(&self, item: T) -> bool {
        match self.queue.push(item) {
            Ok(()) => true,
            Err(_) => {
                self.dropped_count.fetch_add(1, Ordering::Relaxed);
                false
            }
        }
    }

    /// Pops the oldest item, or `None` if the queue is empty.
    pub fn pop(&self) -> Option<T> {
        self.queue.pop()
    }

    /// Returns `true` if the queue holds no items.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Returns `true` if the queue is at capacity.
    pub fn is_full(&self) -> bool {
        self.queue.is_full()
    }

    /// Returns the number of items currently in the queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns the fixed capacity of the queue.
    pub fn capacity(&self) -> usize {
        self.queue.capacity()
    }

    /// Returns how many items `try_push` has dropped so far.
    pub fn dropped_count(&self) -> usize {
        self.dropped_count.load(Ordering::Relaxed)
    }

    /// Resets the dropped-item counter to zero.
    pub fn reset_dropped_count(&self) {
        self.dropped_count.store(0, Ordering::Relaxed);
    }

    /// Returns a new handle sharing the same underlying queue and counter.
    pub fn clone_handle(&self) -> Self {
        Self {
            queue: Arc::clone(&self.queue),
            dropped_count: Arc::clone(&self.dropped_count),
        }
    }
}

impl<T: Clone> LockFreeQueue<T> {
    /// Drains the queue, returning all items in FIFO order.
    pub fn pop_all(&self) -> Vec<T> {
        let mut items = Vec::with_capacity(self.len());
        while let Some(item) = self.pop() {
            items.push(item);
        }
        items
    }
}

/// An unbounded, lock-free MPMC queue backed by `crossbeam_queue::SegQueue`,
/// with an atomic counter tracking the approximate item count.
pub struct UnboundedQueue<T> {
    queue: Arc<SegQueue<T>>,
    item_count: Arc<AtomicUsize>,
}

impl<T> UnboundedQueue<T> {
    /// Creates an empty unbounded queue.
    pub fn new() -> Self {
        Self {
            queue: Arc::new(SegQueue::new()),
            item_count: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Pushes an item; never fails, since the queue grows as needed.
    pub fn push(&self, item: T) {
        self.queue.push(item);
        self.item_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Pops the oldest item, or `None` if the queue is empty.
    pub fn pop(&self) -> Option<T> {
        match self.queue.pop() {
            Some(item) => {
                self.item_count.fetch_sub(1, Ordering::Relaxed);
                Some(item)
            }
            None => None,
        }
    }

    /// Returns `true` if the queue holds no items.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Returns the approximate number of items in the queue.
    pub fn len(&self) -> usize {
        self.item_count.load(Ordering::Relaxed)
    }

    /// Returns a new handle sharing the same underlying queue and counter.
    pub fn clone_handle(&self) -> Self {
        Self {
            queue: Arc::clone(&self.queue),
            item_count: Arc::clone(&self.item_count),
        }
    }
}

impl<T> Default for UnboundedQueue<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T: Clone> UnboundedQueue<T> {
    /// Drains the queue, returning all items in FIFO order.
    pub fn pop_all(&self) -> Vec<T> {
        let mut items = Vec::new();
        while let Some(item) = self.pop() {
            items.push(item);
        }
        items
    }
}

/// A bounded sample queue for streaming `f32` signal data.
///
/// Wraps a `LockFreeQueue` and tracks underruns (batch reads that found fewer
/// than `batch_size` samples) and overruns (samples dropped on write because
/// the queue was full).
pub struct SignalQueue {
    queue: LockFreeQueue<f32>,
    batch_size: usize,
    underrun_count: Arc<AtomicUsize>,
    overrun_count: Arc<AtomicUsize>,
}

impl SignalQueue {
    /// Creates a queue holding up to `capacity` samples, read in batches of `batch_size`.
    pub fn new(capacity: usize, batch_size: usize) -> Self {
        Self {
            queue: LockFreeQueue::new(capacity),
            batch_size,
            underrun_count: Arc::new(AtomicUsize::new(0)),
            overrun_count: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Writes samples into the queue, counting any dropped on overrun.
    pub fn write_samples(&self, samples: &[f32]) -> IoResult<()> {
        let mut overruns = 0;
        for &sample in samples {
            if !self.queue.try_push(sample) {
                overruns += 1;
            }
        }

        if overruns > 0 {
            self.overrun_count.fetch_add(overruns, Ordering::Relaxed);
            debug!("Signal queue overrun: {} samples dropped", overruns);
        }

        Ok(())
    }

    /// Reads one batch of `batch_size` samples, zero-padding on underrun.
    pub fn read_batch(&self) -> IoResult<Array1<f32>> {
        let available = self.queue.len();

        if available < self.batch_size {
            self.underrun_count.fetch_add(1, Ordering::Relaxed);
        }

        let mut samples = Vec::with_capacity(self.batch_size);
        for _ in 0..self.batch_size {
            if let Some(sample) = self.queue.pop() {
                samples.push(sample);
            } else {
                break;
            }
        }

        // Pad with silence so callers always receive a full batch.
        while samples.len() < self.batch_size {
            samples.push(0.0);
        }

        Ok(Array1::from_vec(samples))
    }

    /// Drains all queued samples into a single array.
    pub fn read_all(&self) -> Array1<f32> {
        let samples = self.queue.pop_all();
        Array1::from_vec(samples)
    }

    /// Returns the number of samples currently queued.
    pub fn level(&self) -> usize {
        self.queue.len()
    }

    /// Returns the queue's sample capacity.
    pub fn capacity(&self) -> usize {
        self.queue.capacity()
    }

    /// Returns the current fill level as a fraction of capacity (0.0 to 1.0).
    pub fn fill_ratio(&self) -> f32 {
        self.level() as f32 / self.capacity() as f32
    }

    /// Returns how many batch reads found fewer samples than `batch_size`.
    pub fn underrun_count(&self) -> usize {
        self.underrun_count.load(Ordering::Relaxed)
    }

    /// Returns how many samples have been dropped on write.
    pub fn overrun_count(&self) -> usize {
        self.overrun_count.load(Ordering::Relaxed)
    }

    /// Resets the underrun, overrun, and dropped-item counters.
    pub fn reset_stats(&self) {
        self.underrun_count.store(0, Ordering::Relaxed);
        self.overrun_count.store(0, Ordering::Relaxed);
        self.queue.reset_dropped_count();
    }
}

/// A fixed-capacity ring buffer that overwrites the oldest data when full.
///
/// Read and write positions are kept in atomics; writing requires `&mut self`,
/// so there is a single writer at a time. The overwrite flag records when
/// unread data was lost to a wrap-around.
pub struct LockFreeRingBuffer<T> {
    buffer: Vec<Option<T>>,
    write_pos: Arc<AtomicUsize>,
    read_pos: Arc<AtomicUsize>,
    capacity: usize,
    overwrite_flag: Arc<AtomicBool>,
}

impl<T: Clone> LockFreeRingBuffer<T> {
    /// Creates a ring buffer holding up to `capacity` items.
    pub fn new(capacity: usize) -> Self {
        let mut buffer = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buffer.push(None);
        }

        Self {
            buffer,
            write_pos: Arc::new(AtomicUsize::new(0)),
            read_pos: Arc::new(AtomicUsize::new(0)),
            capacity,
            overwrite_flag: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Writes an item, overwriting the oldest entry if the buffer is full.
    pub fn write(&mut self, item: T) {
        let write_idx = self.write_pos.load(Ordering::Acquire);
        let read_idx = self.read_pos.load(Ordering::Acquire);

        self.buffer[write_idx] = Some(item);

        let next_write = (write_idx + 1) % self.capacity;
        self.write_pos.store(next_write, Ordering::Release);

        // If the writer caught up with the reader, advance the read position
        // past the overwritten slot and record that data was lost.
        if next_write == read_idx {
            let next_read = (read_idx + 1) % self.capacity;
            self.read_pos.store(next_read, Ordering::Release);
            self.overwrite_flag.store(true, Ordering::Relaxed);
        }
    }

    /// Reads the oldest item, or `None` if the buffer is empty.
    pub fn read(&self) -> Option<T> {
        let read_idx = self.read_pos.load(Ordering::Acquire);
        let write_idx = self.write_pos.load(Ordering::Acquire);

        if read_idx == write_idx {
            return None;
        }

        let item = self.buffer[read_idx].clone();

        let next_read = (read_idx + 1) % self.capacity;
        self.read_pos.store(next_read, Ordering::Release);

        item
    }

    /// Returns the number of items available to read.
    pub fn available(&self) -> usize {
        let write_idx = self.write_pos.load(Ordering::Acquire);
        let read_idx = self.read_pos.load(Ordering::Acquire);

        if write_idx >= read_idx {
            write_idx - read_idx
        } else {
            self.capacity - read_idx + write_idx
        }
    }

    /// Returns `true` if data was overwritten since the last call, clearing the flag.
    pub fn was_overwritten(&self) -> bool {
        self.overwrite_flag.swap(false, Ordering::Relaxed)
    }

    /// Returns the buffer's fixed capacity.
    pub fn capacity(&self) -> usize {
        self.capacity
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_lockfree_queue_basic() {
        let queue = LockFreeQueue::new(10);

        assert!(queue.is_empty());
        assert_eq!(queue.capacity(), 10);

        queue.push(1.0).unwrap();
        queue.push(2.0).unwrap();
        queue.push(3.0).unwrap();

        assert_eq!(queue.len(), 3);
        assert!(!queue.is_empty());

        assert_eq!(queue.pop(), Some(1.0));
        assert_eq!(queue.pop(), Some(2.0));
        assert_eq!(queue.pop(), Some(3.0));
        assert_eq!(queue.pop(), None);
    }

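    // Supplementary sketch test (not in the original suite): exercises
    // clone_handle by pushing from a spawned thread and observing the items
    // through the original handle. Uses only std::thread and the APIs above.
    #[test]
    fn test_lockfree_queue_clone_handle_across_threads() {
        let queue = LockFreeQueue::new(100);
        let producer = queue.clone_handle();

        let handle = std::thread::spawn(move || {
            for i in 0..50 {
                producer.push(i).unwrap();
            }
        });
        handle.join().unwrap();

        // Both handles share the same underlying queue and counters.
        assert_eq!(queue.len(), 50);
        assert_eq!(queue.pop(), Some(0));
    }
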
    #[test]
    fn test_lockfree_queue_full() {
        let queue = LockFreeQueue::new(3);

        assert!(queue.push(1.0).is_ok());
        assert!(queue.push(2.0).is_ok());
        assert!(queue.push(3.0).is_ok());
        assert!(queue.push(4.0).is_err());
        assert!(queue.is_full());
    }

    #[test]
    fn test_lockfree_queue_try_push() {
        let queue = LockFreeQueue::new(2);

        assert!(queue.try_push(1.0));
        assert!(queue.try_push(2.0));
        assert!(!queue.try_push(3.0));
        assert_eq!(queue.dropped_count(), 1);
    }

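    // Supplementary sketch test: the dropped-item counter should return to
    // zero after reset_dropped_count.
    #[test]
    fn test_lockfree_queue_reset_dropped_count() {
        let queue = LockFreeQueue::new(1);

        assert!(queue.try_push(1.0));
        assert!(!queue.try_push(2.0));
        assert_eq!(queue.dropped_count(), 1);

        queue.reset_dropped_count();
        assert_eq!(queue.dropped_count(), 0);
    }
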
    #[test]
    fn test_unbounded_queue() {
        let queue = UnboundedQueue::new();

        for i in 0..1000 {
            queue.push(i);
        }

        assert_eq!(queue.len(), 1000);

        for i in 0..1000 {
            assert_eq!(queue.pop(), Some(i));
        }

        assert!(queue.is_empty());
    }

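    // Supplementary sketch test: pop_all should drain the unbounded queue in
    // FIFO order and leave it empty.
    #[test]
    fn test_unbounded_queue_pop_all() {
        let queue = UnboundedQueue::new();
        for i in 0..5 {
            queue.push(i);
        }

        let items = queue.pop_all();
        assert_eq!(items, vec![0, 1, 2, 3, 4]);
        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);
    }
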
    #[test]
    fn test_signal_queue() {
        let queue = SignalQueue::new(100, 10);

        let samples = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        queue.write_samples(&samples).unwrap();

        assert_eq!(queue.level(), 5);

        let batch = queue.read_batch().unwrap();
        assert_eq!(batch.len(), 10);
        assert_eq!(batch[0], 1.0);
        assert_eq!(batch[4], 5.0);
        assert_eq!(batch[5], 0.0);
    }

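    // Supplementary sketch test covering fill_ratio, read_all, underrun
    // counting, and reset_stats, using only the SignalQueue API above.
    #[test]
    fn test_signal_queue_stats_and_drain() {
        let queue = SignalQueue::new(10, 5);

        queue.write_samples(&[1.0, 2.0, 3.0, 4.0, 5.0]).unwrap();
        // 5 of 10 slots are filled.
        assert!((queue.fill_ratio() - 0.5).abs() < f32::EPSILON);

        let all = queue.read_all();
        assert_eq!(all.len(), 5);
        assert_eq!(queue.level(), 0);

        // Reading an empty queue counts as an underrun and pads with zeros.
        let batch = queue.read_batch().unwrap();
        assert_eq!(batch.len(), 5);
        assert!(queue.underrun_count() > 0);

        queue.reset_stats();
        assert_eq!(queue.underrun_count(), 0);
    }
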
    #[test]
    fn test_signal_queue_overrun() {
        let queue = SignalQueue::new(10, 5);

        let samples = vec![1.0; 15];
        queue.write_samples(&samples).unwrap();

        assert!(queue.overrun_count() > 0);
    }

    #[test]
    fn test_lockfree_ring_buffer() {
        let mut buffer = LockFreeRingBuffer::new(5);

        buffer.write(1);
        buffer.write(2);
        buffer.write(3);

        assert_eq!(buffer.available(), 3);

        assert_eq!(buffer.read(), Some(1));
        assert_eq!(buffer.read(), Some(2));
        assert_eq!(buffer.read(), Some(3));
        assert_eq!(buffer.read(), None);
    }

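    // Supplementary sketch test: interleaved writes and reads past the
    // capacity exercise the modular index wrap-around and should preserve
    // FIFO order without triggering the overwrite path.
    #[test]
    fn test_lockfree_ring_buffer_wraparound() {
        let mut buffer = LockFreeRingBuffer::new(3);

        for i in 0..10 {
            buffer.write(i);
            assert_eq!(buffer.read(), Some(i));
        }

        assert_eq!(buffer.available(), 0);
        assert!(!buffer.was_overwritten());
    }
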
    #[test]
    fn test_lockfree_ring_buffer_overwrite() {
        let mut buffer = LockFreeRingBuffer::new(3);

        buffer.write(1);
        buffer.write(2);
        buffer.write(3);
        buffer.write(4);
        assert!(buffer.was_overwritten());

        let first = buffer.read();
        let second = buffer.read();

        assert!(first.is_some());
        assert!(second.is_some());
    }
}