kizzasi_io/
stream.rs

1//! Signal stream abstractions
2
3use crate::error::IoResult;
4use scirs2_core::ndarray::Array1;
5use serde::{Deserialize, Serialize};
6use std::time::Duration;
7
8/// Configuration for signal streams
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct StreamConfig {
11    /// Sample rate in Hz
12    pub sample_rate: f32,
13    /// Number of channels
14    pub channels: usize,
15    /// Buffer size
16    pub buffer_size: usize,
17    /// Read timeout
18    pub timeout: Option<Duration>,
19}
20
21impl Default for StreamConfig {
22    fn default() -> Self {
23        Self {
24            sample_rate: 44100.0,
25            channels: 1,
26            buffer_size: 1024,
27            timeout: Some(Duration::from_secs(5)),
28        }
29    }
30}
31
32impl StreamConfig {
33    /// Create a new stream configuration
34    pub fn new() -> Self {
35        Self::default()
36    }
37
38    /// Set the sample rate
39    pub fn sample_rate(mut self, rate: f32) -> Self {
40        self.sample_rate = rate;
41        self
42    }
43
44    /// Set the number of channels
45    pub fn channels(mut self, n: usize) -> Self {
46        self.channels = n;
47        self
48    }
49
50    /// Set the buffer size
51    pub fn buffer_size(mut self, size: usize) -> Self {
52        self.buffer_size = size;
53        self
54    }
55
56    /// Set the timeout
57    pub fn timeout(mut self, timeout: Duration) -> Self {
58        self.timeout = Some(timeout);
59        self
60    }
61}
62
63/// Trait for signal streams
64///
65/// Note: Not all stream implementations are Send (e.g., AudioInput with cpal::Stream).
66/// Use `SendableSignalStream` when Send is required.
67pub trait SignalStream {
68    /// Read the next signal buffer
69    fn read(&mut self) -> IoResult<Array1<f32>>;
70
71    /// Check if stream is still active
72    fn is_active(&self) -> bool;
73
74    /// Get stream configuration
75    fn config(&self) -> &StreamConfig;
76
77    /// Close the stream
78    fn close(&mut self) -> IoResult<()>;
79}
80
81/// Async trait for signal streams
82///
83/// Provides async I/O for signal streams, enabling non-blocking reads
84/// and better integration with async runtimes like Tokio.
85#[async_trait::async_trait]
86pub trait AsyncSignalStream: Send {
87    /// Read the next signal buffer asynchronously
88    async fn read(&mut self) -> IoResult<Array1<f32>>;
89
90    /// Check if stream is still active
91    fn is_active(&self) -> bool;
92
93    /// Get stream configuration
94    fn config(&self) -> &StreamConfig;
95
96    /// Close the stream asynchronously
97    async fn close(&mut self) -> IoResult<()>;
98}
99
100/// In-memory signal stream for testing
101#[derive(Debug)]
102pub struct MemoryStream {
103    config: StreamConfig,
104    data: Vec<f32>,
105    position: usize,
106    active: bool,
107}
108
109impl MemoryStream {
110    /// Create a new memory stream from data
111    pub fn new(data: Vec<f32>, config: StreamConfig) -> Self {
112        Self {
113            config,
114            data,
115            position: 0,
116            active: true,
117        }
118    }
119
120    /// Create from an array
121    pub fn from_array(data: Array1<f32>, config: StreamConfig) -> Self {
122        Self::new(data.to_vec(), config)
123    }
124}
125
126impl SignalStream for MemoryStream {
127    fn read(&mut self) -> IoResult<Array1<f32>> {
128        if !self.active || self.position >= self.data.len() {
129            self.active = false;
130            return Ok(Array1::zeros(self.config.buffer_size));
131        }
132
133        let end = (self.position + self.config.buffer_size).min(self.data.len());
134        let mut buffer = vec![0.0; self.config.buffer_size];
135
136        for (i, val) in self.data[self.position..end].iter().enumerate() {
137            buffer[i] = *val;
138        }
139
140        self.position = end;
141        Ok(Array1::from_vec(buffer))
142    }
143
144    fn is_active(&self) -> bool {
145        self.active && self.position < self.data.len()
146    }
147
148    fn config(&self) -> &StreamConfig {
149        &self.config
150    }
151
152    fn close(&mut self) -> IoResult<()> {
153        self.active = false;
154        Ok(())
155    }
156}
157
/// Fixed-capacity circular buffer for real-time signal processing.
///
/// Storage is allocated once in [`RingBuffer::new`]; afterwards `push` and `pop`
/// are O(1) and do not allocate. When the buffer is full, `push` overwrites the
/// oldest element.
///
/// Every mutating operation takes `&mut self` and the type performs no internal
/// synchronization, so it is a single-owner structure: to share it between a
/// producer and a consumer, wrap it in a lock or pass samples through a channel.
#[derive(Debug)]
pub struct RingBuffer<T> {
    /// Backing storage; always exactly `capacity` slots long.
    data: Vec<T>,
    /// Number of slots (at least 1).
    capacity: usize,
    /// Slot holding the oldest element.
    read_pos: usize,
    /// Slot the next `push` writes to.
    write_pos: usize,
    /// Number of live elements.
    len: usize,
}

impl<T: Clone + Default> RingBuffer<T> {
    /// Creates an empty buffer with `capacity` slots, each filled with `T::default()`.
    ///
    /// A requested capacity of zero is raised to one.
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.max(1);
        Self {
            data: vec![T::default(); capacity],
            capacity,
            read_pos: 0,
            write_pos: 0,
            len: 0,
        }
    }

    /// Appends `value`; if the buffer is full the oldest element is overwritten.
    pub fn push(&mut self, value: T) {
        self.data[self.write_pos] = value;
        self.write_pos = (self.write_pos + 1) % self.capacity;

        if self.len < self.capacity {
            self.len += 1;
        } else {
            // The write just replaced the oldest element, so the next-oldest
            // becomes the new read position.
            self.read_pos = (self.read_pos + 1) % self.capacity;
        }
    }

    /// Appends every element of `values` in order, with the same overwrite rule as `push`.
    pub fn push_slice(&mut self, values: &[T]) {
        for val in values {
            self.push(val.clone());
        }
    }

    /// Removes and returns the oldest element, or `None` if the buffer is empty.
    ///
    /// The element is moved out of its slot (which is reset to `T::default()`),
    /// so the buffer does not keep the popped value or a clone of it alive.
    pub fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }

        let value = std::mem::take(&mut self.data[self.read_pos]);
        self.read_pos = (self.read_pos + 1) % self.capacity;
        self.len -= 1;
        Some(value)
    }

    /// Borrows the oldest element without removing it.
    pub fn peek(&self) -> Option<&T> {
        if self.len == 0 {
            None
        } else {
            Some(&self.data[self.read_pos])
        }
    }

    /// Borrows the most recently pushed element.
    pub fn peek_back(&self) -> Option<&T> {
        if self.len == 0 {
            return None;
        }
        // `write_pos` points one past the newest element, wrapping at slot 0.
        let idx = self.write_pos.checked_sub(1).unwrap_or(self.capacity - 1);
        Some(&self.data[idx])
    }

    /// Borrows the element `index` positions after the oldest (0 = oldest).
    ///
    /// Returns `None` when `index` is not smaller than [`len`](Self::len).
    pub fn get(&self, index: usize) -> Option<&T> {
        if index >= self.len {
            return None;
        }
        let idx = (self.read_pos + index) % self.capacity;
        Some(&self.data[idx])
    }

    /// Current number of elements.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Whether the buffer holds no elements.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Whether the next `push` would overwrite the oldest element.
    pub fn is_full(&self) -> bool {
        self.len == self.capacity
    }

    /// Discards all elements in O(1) by resetting the positions.
    ///
    /// Slot contents are left in place until later pushes overwrite them.
    pub fn clear(&mut self) {
        self.read_pos = 0;
        self.write_pos = 0;
        self.len = 0;
    }

    /// Total number of slots.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Number of elements that can be pushed before overwriting starts.
    pub fn available(&self) -> usize {
        self.capacity - self.len
    }

    /// Pops elements into the front of `buffer`, oldest first.
    ///
    /// Returns how many slots were filled: the smaller of `buffer.len()` and the
    /// number of buffered elements. Remaining slots of `buffer` are untouched.
    pub fn read_into(&mut self, buffer: &mut [T]) -> usize {
        let count = buffer.len().min(self.len);
        for (i, slot) in buffer.iter_mut().enumerate().take(count) {
            if let Some(val) = self.pop() {
                *slot = val;
            } else {
                return i;
            }
        }
        count
    }

    /// Clones the contents, oldest first, into a new `Vec` without modifying the buffer.
    pub fn to_vec(&self) -> Vec<T> {
        self.iter().cloned().collect()
    }

    /// Iterates over the elements by reference, oldest to newest.
    pub fn iter(&self) -> RingBufferIter<'_, T> {
        RingBufferIter {
            buffer: self,
            index: 0,
        }
    }
}

/// Borrowing iterator over a [`RingBuffer`], yielding elements oldest to newest.
#[derive(Debug)]
pub struct RingBufferIter<'a, T> {
    buffer: &'a RingBuffer<T>,
    /// Logical index (0 = oldest) of the next element to yield.
    index: usize,
}

impl<'a, T: Clone + Default> Iterator for RingBufferIter<'a, T> {
    type Item = &'a T;

    fn next(&mut self) -> Option<Self::Item> {
        // `get` returns `None` once `index` reaches the buffer length.
        let item = self.buffer.get(self.index)?;
        self.index += 1;
        Some(item)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // `index` never exceeds the length: it only advances after a successful `get`.
        let remaining = self.buffer.len() - self.index;
        (remaining, Some(remaining))
    }
}

impl<'a, T: Clone + Default> ExactSizeIterator for RingBufferIter<'a, T> {}
341
342/// Float ring buffer with signal processing operations
343#[derive(Debug)]
344pub struct SignalRingBuffer {
345    buffer: RingBuffer<f32>,
346}
347
348impl SignalRingBuffer {
349    /// Create a new signal ring buffer
350    pub fn new(capacity: usize) -> Self {
351        Self {
352            buffer: RingBuffer::new(capacity),
353        }
354    }
355
356    /// Push a sample
357    pub fn push(&mut self, sample: f32) {
358        self.buffer.push(sample);
359    }
360
361    /// Push multiple samples
362    pub fn push_slice(&mut self, samples: &[f32]) {
363        self.buffer.push_slice(samples);
364    }
365
366    /// Pop oldest sample
367    pub fn pop(&mut self) -> Option<f32> {
368        self.buffer.pop()
369    }
370
371    /// Compute mean of buffered samples
372    pub fn mean(&self) -> f32 {
373        if self.buffer.is_empty() {
374            return 0.0;
375        }
376        let sum: f32 = self.buffer.iter().sum();
377        sum / self.buffer.len() as f32
378    }
379
380    /// Compute variance
381    pub fn variance(&self) -> f32 {
382        if self.buffer.len() < 2 {
383            return 0.0;
384        }
385        let mean = self.mean();
386        let sum_sq: f32 = self.buffer.iter().map(|x| (x - mean).powi(2)).sum();
387        sum_sq / (self.buffer.len() - 1) as f32
388    }
389
390    /// Compute standard deviation
391    pub fn std(&self) -> f32 {
392        self.variance().sqrt()
393    }
394
395    /// Get min value
396    pub fn min(&self) -> Option<f32> {
397        self.buffer.iter().cloned().reduce(f32::min)
398    }
399
400    /// Get max value
401    pub fn max(&self) -> Option<f32> {
402        self.buffer.iter().cloned().reduce(f32::max)
403    }
404
405    /// Compute RMS (root mean square)
406    pub fn rms(&self) -> f32 {
407        if self.buffer.is_empty() {
408            return 0.0;
409        }
410        let sum_sq: f32 = self.buffer.iter().map(|x| x * x).sum();
411        (sum_sq / self.buffer.len() as f32).sqrt()
412    }
413
414    /// Get peak-to-peak amplitude
415    pub fn peak_to_peak(&self) -> f32 {
416        match (self.min(), self.max()) {
417            (Some(min), Some(max)) => max - min,
418            _ => 0.0,
419        }
420    }
421
422    /// Compute zero-crossing rate
423    pub fn zero_crossing_rate(&self) -> f32 {
424        if self.buffer.len() < 2 {
425            return 0.0;
426        }
427        let mut crossings = 0usize;
428        let mut prev = *self.buffer.peek().unwrap_or(&0.0);
429        for sample in self.buffer.iter().skip(1) {
430            if (prev >= 0.0 && *sample < 0.0) || (prev < 0.0 && *sample >= 0.0) {
431                crossings += 1;
432            }
433            prev = *sample;
434        }
435        crossings as f32 / (self.buffer.len() - 1) as f32
436    }
437
438    /// Get current length
439    pub fn len(&self) -> usize {
440        self.buffer.len()
441    }
442
443    /// Is empty?
444    pub fn is_empty(&self) -> bool {
445        self.buffer.is_empty()
446    }
447
448    /// Is full?
449    pub fn is_full(&self) -> bool {
450        self.buffer.is_full()
451    }
452
453    /// Clear buffer
454    pub fn clear(&mut self) {
455        self.buffer.clear();
456    }
457
458    /// Get capacity
459    pub fn capacity(&self) -> usize {
460        self.buffer.capacity()
461    }
462
463    /// Convert to array
464    pub fn to_array(&self) -> Array1<f32> {
465        Array1::from_vec(self.buffer.to_vec())
466    }
467
468    /// Iterate over samples
469    pub fn iter(&self) -> RingBufferIter<'_, f32> {
470        self.buffer.iter()
471    }
472}
473
474// ============================================================================
475// Async Stream Implementations
476// ============================================================================
477
478/// Async in-memory signal stream for testing
479#[derive(Debug)]
480pub struct AsyncMemoryStream {
481    config: StreamConfig,
482    data: Vec<f32>,
483    position: usize,
484    active: bool,
485}
486
487impl AsyncMemoryStream {
488    /// Create a new async memory stream from data
489    pub fn new(data: Vec<f32>, config: StreamConfig) -> Self {
490        Self {
491            config,
492            data,
493            position: 0,
494            active: true,
495        }
496    }
497
498    /// Create from an array
499    pub fn from_array(data: Array1<f32>, config: StreamConfig) -> Self {
500        Self::new(data.to_vec(), config)
501    }
502}
503
504#[async_trait::async_trait]
505impl AsyncSignalStream for AsyncMemoryStream {
506    async fn read(&mut self) -> IoResult<Array1<f32>> {
507        // Simulate async I/O delay
508        tokio::time::sleep(tokio::time::Duration::from_micros(10)).await;
509
510        if !self.active || self.position >= self.data.len() {
511            self.active = false;
512            return Ok(Array1::zeros(self.config.buffer_size));
513        }
514
515        let end = (self.position + self.config.buffer_size).min(self.data.len());
516        let mut buffer = vec![0.0; self.config.buffer_size];
517
518        for (i, val) in self.data[self.position..end].iter().enumerate() {
519            buffer[i] = *val;
520        }
521
522        self.position = end;
523        Ok(Array1::from_vec(buffer))
524    }
525
526    fn is_active(&self) -> bool {
527        self.active && self.position < self.data.len()
528    }
529
530    fn config(&self) -> &StreamConfig {
531        &self.config
532    }
533
534    async fn close(&mut self) -> IoResult<()> {
535        self.active = false;
536        Ok(())
537    }
538}
539
540/// Async channel-based stream adapter
541///
542/// Wraps a tokio channel receiver to provide async stream interface
543pub struct ChannelStream {
544    config: StreamConfig,
545    receiver: tokio::sync::mpsc::Receiver<Vec<f32>>,
546    active: bool,
547}
548
549impl ChannelStream {
550    /// Create a new channel stream
551    pub fn new(config: StreamConfig, receiver: tokio::sync::mpsc::Receiver<Vec<f32>>) -> Self {
552        Self {
553            config,
554            receiver,
555            active: true,
556        }
557    }
558}
559
560#[async_trait::async_trait]
561impl AsyncSignalStream for ChannelStream {
562    async fn read(&mut self) -> IoResult<Array1<f32>> {
563        if !self.active {
564            return Ok(Array1::zeros(self.config.buffer_size));
565        }
566
567        match self.receiver.recv().await {
568            Some(data) => {
569                let mut buffer = vec![0.0; self.config.buffer_size];
570                let copy_len = data.len().min(self.config.buffer_size);
571                buffer[..copy_len].copy_from_slice(&data[..copy_len]);
572                Ok(Array1::from_vec(buffer))
573            }
574            None => {
575                self.active = false;
576                Ok(Array1::zeros(self.config.buffer_size))
577            }
578        }
579    }
580
581    fn is_active(&self) -> bool {
582        self.active
583    }
584
585    fn config(&self) -> &StreamConfig {
586        &self.config
587    }
588
589    async fn close(&mut self) -> IoResult<()> {
590        self.active = false;
591        self.receiver.close();
592        Ok(())
593    }
594}
595
#[cfg(test)]
mod tests {
    use super::*;

    /// Two full reads drain an eight-sample stream with four-element buffers.
    #[test]
    fn test_memory_stream() {
        let config = StreamConfig::new().buffer_size(4);
        let samples = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0];
        let mut stream = MemoryStream::new(samples, config);

        assert!(stream.is_active());

        let first = stream.read().unwrap();
        assert_eq!(first[0], 1.0);
        assert_eq!(first[3], 4.0);

        let second = stream.read().unwrap();
        assert_eq!(second[0], 5.0);

        assert!(!stream.is_active());
    }

    /// Push and pop keep FIFO order and track the length.
    #[test]
    fn test_ring_buffer_basic() {
        let mut ring: RingBuffer<i32> = RingBuffer::new(4);
        assert!(ring.is_empty());
        assert_eq!(ring.capacity(), 4);

        for value in 1..=3 {
            ring.push(value);
        }
        assert_eq!(ring.len(), 3);
        assert!(!ring.is_full());

        assert_eq!(ring.pop(), Some(1));
        assert_eq!(ring.pop(), Some(2));
        assert_eq!(ring.len(), 1);
    }

    /// Pushing into a full buffer drops the oldest element.
    #[test]
    fn test_ring_buffer_overwrite() {
        let mut ring: RingBuffer<i32> = RingBuffer::new(3);
        for value in 1..=3 {
            ring.push(value);
        }
        assert!(ring.is_full());

        // The fourth push replaces the oldest element (1).
        ring.push(4);
        assert_eq!(ring.len(), 3);
        assert_eq!(ring.pop(), Some(2));
        assert_eq!(ring.pop(), Some(3));
        assert_eq!(ring.pop(), Some(4));
    }

    /// `peek`, `peek_back` and `get` address the oldest, newest and indexed element.
    #[test]
    fn test_ring_buffer_peek() {
        let mut ring: RingBuffer<i32> = RingBuffer::new(4);
        ring.push(10);
        ring.push(20);
        ring.push(30);

        assert_eq!(ring.peek(), Some(&10));
        assert_eq!(ring.peek_back(), Some(&30));
        assert_eq!(ring.get(1), Some(&20));
    }

    /// Iteration visits elements oldest to newest.
    #[test]
    fn test_ring_buffer_iter() {
        let mut ring: RingBuffer<i32> = RingBuffer::new(4);
        for value in 1..=3 {
            ring.push(value);
        }

        let seen: Vec<_> = ring.iter().cloned().collect();
        assert_eq!(seen, vec![1, 2, 3]);
    }

    /// Mean, extrema and peak-to-peak over 1..=5.
    #[test]
    fn test_signal_ring_buffer_stats() {
        let mut signal = SignalRingBuffer::new(5);
        signal.push_slice(&[1.0, 2.0, 3.0, 4.0, 5.0]);

        assert!((signal.mean() - 3.0).abs() < 0.01);
        assert!(signal.min() == Some(1.0));
        assert!(signal.max() == Some(5.0));
        assert!((signal.peak_to_peak() - 4.0).abs() < 0.01);
    }

    /// RMS of a constant signal and of a two-sample signal.
    #[test]
    fn test_signal_ring_buffer_rms() {
        let mut signal = SignalRingBuffer::new(4);
        signal.push_slice(&[1.0, 1.0, 1.0, 1.0]);
        assert!((signal.rms() - 1.0).abs() < 0.01);

        signal.clear();
        // sqrt((9 + 16) / 2) = sqrt(12.5) ≈ 3.54
        signal.push_slice(&[3.0, 4.0]);
        assert!((signal.rms() - 3.536).abs() < 0.01);
    }

    /// A positive-negative-positive sweep crosses zero twice.
    #[test]
    fn test_signal_ring_buffer_zero_crossing() {
        let mut signal = SignalRingBuffer::new(10);
        signal.push_slice(&[1.0, 0.5, -0.5, -1.0, -0.5, 0.5, 1.0]);

        // 2 crossings over 6 transitions ≈ 0.333
        let rate = signal.zero_crossing_rate();
        assert!((rate - 0.333).abs() < 0.01);
    }
}