1use crate::error::IoResult;
4use scirs2_core::ndarray::Array1;
5use serde::{Deserialize, Serialize};
6use std::time::Duration;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct StreamConfig {
11 pub sample_rate: f32,
13 pub channels: usize,
15 pub buffer_size: usize,
17 pub timeout: Option<Duration>,
19}
20
21impl Default for StreamConfig {
22 fn default() -> Self {
23 Self {
24 sample_rate: 44100.0,
25 channels: 1,
26 buffer_size: 1024,
27 timeout: Some(Duration::from_secs(5)),
28 }
29 }
30}
31
32impl StreamConfig {
33 pub fn new() -> Self {
35 Self::default()
36 }
37
38 pub fn sample_rate(mut self, rate: f32) -> Self {
40 self.sample_rate = rate;
41 self
42 }
43
44 pub fn channels(mut self, n: usize) -> Self {
46 self.channels = n;
47 self
48 }
49
50 pub fn buffer_size(mut self, size: usize) -> Self {
52 self.buffer_size = size;
53 self
54 }
55
56 pub fn timeout(mut self, timeout: Duration) -> Self {
58 self.timeout = Some(timeout);
59 self
60 }
61}
62
/// Synchronous, pull-based source of `f32` sample buffers.
pub trait SignalStream {
    /// Reads the next buffer of samples from the stream.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation reports through `IoResult`.
    fn read(&mut self) -> IoResult<Array1<f32>>;

    /// Reports whether the stream is still active.
    fn is_active(&self) -> bool;

    /// Returns the stream's configuration.
    fn config(&self) -> &StreamConfig;

    /// Closes the stream.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation reports through `IoResult`.
    fn close(&mut self) -> IoResult<()>;
}
80
/// Asynchronous counterpart of a pull-based sample stream.
///
/// Implementors must be `Send`; `read` and `close` are `async` and are made
/// usable in a trait through the `async_trait` attribute.
#[async_trait::async_trait]
pub trait AsyncSignalStream: Send {
    /// Awaits and returns the next buffer of samples.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation reports through `IoResult`.
    async fn read(&mut self) -> IoResult<Array1<f32>>;

    /// Reports whether the stream is still active.
    fn is_active(&self) -> bool;

    /// Returns the stream's configuration.
    fn config(&self) -> &StreamConfig;

    /// Closes the stream.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation reports through `IoResult`.
    async fn close(&mut self) -> IoResult<()>;
}
99
100#[derive(Debug)]
102pub struct MemoryStream {
103 config: StreamConfig,
104 data: Vec<f32>,
105 position: usize,
106 active: bool,
107}
108
109impl MemoryStream {
110 pub fn new(data: Vec<f32>, config: StreamConfig) -> Self {
112 Self {
113 config,
114 data,
115 position: 0,
116 active: true,
117 }
118 }
119
120 pub fn from_array(data: Array1<f32>, config: StreamConfig) -> Self {
122 Self::new(data.to_vec(), config)
123 }
124}
125
126impl SignalStream for MemoryStream {
127 fn read(&mut self) -> IoResult<Array1<f32>> {
128 if !self.active || self.position >= self.data.len() {
129 self.active = false;
130 return Ok(Array1::zeros(self.config.buffer_size));
131 }
132
133 let end = (self.position + self.config.buffer_size).min(self.data.len());
134 let mut buffer = vec![0.0; self.config.buffer_size];
135
136 for (i, val) in self.data[self.position..end].iter().enumerate() {
137 buffer[i] = *val;
138 }
139
140 self.position = end;
141 Ok(Array1::from_vec(buffer))
142 }
143
144 fn is_active(&self) -> bool {
145 self.active && self.position < self.data.len()
146 }
147
148 fn config(&self) -> &StreamConfig {
149 &self.config
150 }
151
152 fn close(&mut self) -> IoResult<()> {
153 self.active = false;
154 Ok(())
155 }
156}
157
/// Fixed-capacity FIFO ring buffer that overwrites its oldest element once
/// it is full.
///
/// Storage is allocated up front with `T::default()` in every slot; slots
/// that are not currently part of the queue always hold a default value.
#[derive(Debug)]
pub struct RingBuffer<T> {
    /// Backing storage; always exactly `capacity` slots long.
    data: Vec<T>,
    /// Number of slots (at least 1).
    capacity: usize,
    /// Index of the oldest stored element.
    read_pos: usize,
    /// Index the next pushed element is written to.
    write_pos: usize,
    /// Number of elements currently stored.
    len: usize,
}

impl<T: Clone + Default> RingBuffer<T> {
    /// Creates an empty buffer with room for `capacity` elements.
    ///
    /// A requested capacity of `0` is raised to `1` so the index arithmetic
    /// never divides by zero.
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.max(1);
        Self {
            data: vec![T::default(); capacity],
            capacity,
            read_pos: 0,
            write_pos: 0,
            len: 0,
        }
    }

    /// Appends `value`, overwriting the oldest element when the buffer is full.
    pub fn push(&mut self, value: T) {
        self.data[self.write_pos] = value;
        self.write_pos = (self.write_pos + 1) % self.capacity;

        if self.len < self.capacity {
            self.len += 1;
        } else {
            // Full: the slot just written held the oldest element, so the
            // read cursor advances past it.
            self.read_pos = (self.read_pos + 1) % self.capacity;
        }
    }

    /// Pushes a clone of every element of `values`, in order.
    pub fn push_slice(&mut self, values: &[T]) {
        for value in values {
            self.push(value.clone());
        }
    }

    /// Removes and returns the oldest element, or `None` when empty.
    ///
    /// The value is moved out of storage (its slot is reset to
    /// `T::default()`), so no clone is made and the buffer does not keep the
    /// popped value alive.
    pub fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }

        let value = std::mem::take(&mut self.data[self.read_pos]);
        self.read_pos = (self.read_pos + 1) % self.capacity;
        self.len -= 1;
        Some(value)
    }

    /// Returns a reference to the oldest element without removing it.
    pub fn peek(&self) -> Option<&T> {
        if self.len == 0 {
            None
        } else {
            Some(&self.data[self.read_pos])
        }
    }

    /// Returns a reference to the most recently pushed element.
    pub fn peek_back(&self) -> Option<&T> {
        if self.len == 0 {
            return None;
        }
        // The newest element sits one slot before the write cursor, wrapping
        // around the end of the storage.
        let newest = (self.write_pos + self.capacity - 1) % self.capacity;
        Some(&self.data[newest])
    }

    /// Returns the element `index` positions after the oldest one, or `None`
    /// when `index >= len()`.
    pub fn get(&self, index: usize) -> Option<&T> {
        if index >= self.len {
            return None;
        }
        let slot = (self.read_pos + index) % self.capacity;
        Some(&self.data[slot])
    }

    /// Number of elements currently stored.
    pub fn len(&self) -> usize {
        self.len
    }

    /// `true` when no elements are stored.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// `true` when the next `push` will overwrite the oldest element.
    pub fn is_full(&self) -> bool {
        self.len == self.capacity
    }

    /// Removes every element.
    ///
    /// All slots are reset to `T::default()` so previously stored values are
    /// dropped immediately instead of lingering in the backing storage.
    pub fn clear(&mut self) {
        self.data.fill_with(T::default);
        self.read_pos = 0;
        self.write_pos = 0;
        self.len = 0;
    }

    /// Maximum number of elements the buffer can hold.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Number of elements that can be pushed before overwriting begins.
    pub fn available(&self) -> usize {
        self.capacity - self.len
    }

    /// Pops elements, oldest first, into the front of `buffer`.
    ///
    /// At most `buffer.len()` elements are transferred; the number actually
    /// written is returned. Slots of `buffer` beyond that count are untouched.
    pub fn read_into(&mut self, buffer: &mut [T]) -> usize {
        let count = buffer.len().min(self.len);
        for slot in &mut buffer[..count] {
            *slot = std::mem::take(&mut self.data[self.read_pos]);
            self.read_pos = (self.read_pos + 1) % self.capacity;
        }
        self.len -= count;
        count
    }

    /// Clones the stored elements into a `Vec`, oldest first.
    pub fn to_vec(&self) -> Vec<T> {
        (0..self.len)
            .map(|offset| self.data[(self.read_pos + offset) % self.capacity].clone())
            .collect()
    }
}
304
305pub struct RingBufferIter<'a, T> {
307 buffer: &'a RingBuffer<T>,
308 index: usize,
309}
310
311impl<T: Clone + Default> RingBuffer<T> {
312 pub fn iter(&self) -> RingBufferIter<'_, T> {
314 RingBufferIter {
315 buffer: self,
316 index: 0,
317 }
318 }
319}
320
321impl<'a, T: Clone + Default> Iterator for RingBufferIter<'a, T> {
322 type Item = &'a T;
323
324 fn next(&mut self) -> Option<Self::Item> {
325 if self.index >= self.buffer.len() {
326 None
327 } else {
328 let result = self.buffer.get(self.index);
329 self.index += 1;
330 result
331 }
332 }
333
334 fn size_hint(&self) -> (usize, Option<usize>) {
335 let remaining = self.buffer.len() - self.index;
336 (remaining, Some(remaining))
337 }
338}
339
340impl<'a, T: Clone + Default> ExactSizeIterator for RingBufferIter<'a, T> {}
341
342#[derive(Debug)]
344pub struct SignalRingBuffer {
345 buffer: RingBuffer<f32>,
346}
347
348impl SignalRingBuffer {
349 pub fn new(capacity: usize) -> Self {
351 Self {
352 buffer: RingBuffer::new(capacity),
353 }
354 }
355
356 pub fn push(&mut self, sample: f32) {
358 self.buffer.push(sample);
359 }
360
361 pub fn push_slice(&mut self, samples: &[f32]) {
363 self.buffer.push_slice(samples);
364 }
365
366 pub fn pop(&mut self) -> Option<f32> {
368 self.buffer.pop()
369 }
370
371 pub fn mean(&self) -> f32 {
373 if self.buffer.is_empty() {
374 return 0.0;
375 }
376 let sum: f32 = self.buffer.iter().sum();
377 sum / self.buffer.len() as f32
378 }
379
380 pub fn variance(&self) -> f32 {
382 if self.buffer.len() < 2 {
383 return 0.0;
384 }
385 let mean = self.mean();
386 let sum_sq: f32 = self.buffer.iter().map(|x| (x - mean).powi(2)).sum();
387 sum_sq / (self.buffer.len() - 1) as f32
388 }
389
390 pub fn std(&self) -> f32 {
392 self.variance().sqrt()
393 }
394
395 pub fn min(&self) -> Option<f32> {
397 self.buffer.iter().cloned().reduce(f32::min)
398 }
399
400 pub fn max(&self) -> Option<f32> {
402 self.buffer.iter().cloned().reduce(f32::max)
403 }
404
405 pub fn rms(&self) -> f32 {
407 if self.buffer.is_empty() {
408 return 0.0;
409 }
410 let sum_sq: f32 = self.buffer.iter().map(|x| x * x).sum();
411 (sum_sq / self.buffer.len() as f32).sqrt()
412 }
413
414 pub fn peak_to_peak(&self) -> f32 {
416 match (self.min(), self.max()) {
417 (Some(min), Some(max)) => max - min,
418 _ => 0.0,
419 }
420 }
421
422 pub fn zero_crossing_rate(&self) -> f32 {
424 if self.buffer.len() < 2 {
425 return 0.0;
426 }
427 let mut crossings = 0usize;
428 let mut prev = *self.buffer.peek().unwrap_or(&0.0);
429 for sample in self.buffer.iter().skip(1) {
430 if (prev >= 0.0 && *sample < 0.0) || (prev < 0.0 && *sample >= 0.0) {
431 crossings += 1;
432 }
433 prev = *sample;
434 }
435 crossings as f32 / (self.buffer.len() - 1) as f32
436 }
437
438 pub fn len(&self) -> usize {
440 self.buffer.len()
441 }
442
443 pub fn is_empty(&self) -> bool {
445 self.buffer.is_empty()
446 }
447
448 pub fn is_full(&self) -> bool {
450 self.buffer.is_full()
451 }
452
453 pub fn clear(&mut self) {
455 self.buffer.clear();
456 }
457
458 pub fn capacity(&self) -> usize {
460 self.buffer.capacity()
461 }
462
463 pub fn to_array(&self) -> Array1<f32> {
465 Array1::from_vec(self.buffer.to_vec())
466 }
467
468 pub fn iter(&self) -> RingBufferIter<'_, f32> {
470 self.buffer.iter()
471 }
472}
473
474#[derive(Debug)]
480pub struct AsyncMemoryStream {
481 config: StreamConfig,
482 data: Vec<f32>,
483 position: usize,
484 active: bool,
485}
486
487impl AsyncMemoryStream {
488 pub fn new(data: Vec<f32>, config: StreamConfig) -> Self {
490 Self {
491 config,
492 data,
493 position: 0,
494 active: true,
495 }
496 }
497
498 pub fn from_array(data: Array1<f32>, config: StreamConfig) -> Self {
500 Self::new(data.to_vec(), config)
501 }
502}
503
504#[async_trait::async_trait]
505impl AsyncSignalStream for AsyncMemoryStream {
506 async fn read(&mut self) -> IoResult<Array1<f32>> {
507 tokio::time::sleep(tokio::time::Duration::from_micros(10)).await;
509
510 if !self.active || self.position >= self.data.len() {
511 self.active = false;
512 return Ok(Array1::zeros(self.config.buffer_size));
513 }
514
515 let end = (self.position + self.config.buffer_size).min(self.data.len());
516 let mut buffer = vec![0.0; self.config.buffer_size];
517
518 for (i, val) in self.data[self.position..end].iter().enumerate() {
519 buffer[i] = *val;
520 }
521
522 self.position = end;
523 Ok(Array1::from_vec(buffer))
524 }
525
526 fn is_active(&self) -> bool {
527 self.active && self.position < self.data.len()
528 }
529
530 fn config(&self) -> &StreamConfig {
531 &self.config
532 }
533
534 async fn close(&mut self) -> IoResult<()> {
535 self.active = false;
536 Ok(())
537 }
538}
539
540pub struct ChannelStream {
544 config: StreamConfig,
545 receiver: tokio::sync::mpsc::Receiver<Vec<f32>>,
546 active: bool,
547}
548
549impl ChannelStream {
550 pub fn new(config: StreamConfig, receiver: tokio::sync::mpsc::Receiver<Vec<f32>>) -> Self {
552 Self {
553 config,
554 receiver,
555 active: true,
556 }
557 }
558}
559
560#[async_trait::async_trait]
561impl AsyncSignalStream for ChannelStream {
562 async fn read(&mut self) -> IoResult<Array1<f32>> {
563 if !self.active {
564 return Ok(Array1::zeros(self.config.buffer_size));
565 }
566
567 match self.receiver.recv().await {
568 Some(data) => {
569 let mut buffer = vec![0.0; self.config.buffer_size];
570 let copy_len = data.len().min(self.config.buffer_size);
571 buffer[..copy_len].copy_from_slice(&data[..copy_len]);
572 Ok(Array1::from_vec(buffer))
573 }
574 None => {
575 self.active = false;
576 Ok(Array1::zeros(self.config.buffer_size))
577 }
578 }
579 }
580
581 fn is_active(&self) -> bool {
582 self.active
583 }
584
585 fn config(&self) -> &StreamConfig {
586 &self.config
587 }
588
589 async fn close(&mut self) -> IoResult<()> {
590 self.active = false;
591 self.receiver.close();
592 Ok(())
593 }
594}
595
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` when `actual` is within 0.01 of `expected`.
    fn approx(actual: f32, expected: f32) -> bool {
        (actual - expected).abs() < 0.01
    }

    #[test]
    fn test_memory_stream() {
        let mut stream = MemoryStream::new(
            vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
            StreamConfig::new().buffer_size(4),
        );
        assert!(stream.is_active());

        let first = stream.read().unwrap();
        assert_eq!(first[0], 1.0);
        assert_eq!(first[3], 4.0);

        let second = stream.read().unwrap();
        assert_eq!(second[0], 5.0);

        // Both chunks have been consumed, so nothing is left to read.
        assert!(!stream.is_active());
    }

    #[test]
    fn test_ring_buffer_basic() {
        let mut ring = RingBuffer::<i32>::new(4);
        assert!(ring.is_empty());
        assert_eq!(ring.capacity(), 4);

        for value in 1..=3 {
            ring.push(value);
        }
        assert_eq!(ring.len(), 3);
        assert!(!ring.is_full());

        assert_eq!(ring.pop(), Some(1));
        assert_eq!(ring.pop(), Some(2));
        assert_eq!(ring.len(), 1);
    }

    #[test]
    fn test_ring_buffer_overwrite() {
        let mut ring = RingBuffer::<i32>::new(3);
        for value in 1..=3 {
            ring.push(value);
        }
        assert!(ring.is_full());

        // Pushing into a full buffer evicts the oldest element (1).
        ring.push(4);
        assert_eq!(ring.len(), 3);
        for expected in 2..=4 {
            assert_eq!(ring.pop(), Some(expected));
        }
    }

    #[test]
    fn test_ring_buffer_peek() {
        let mut ring = RingBuffer::<i32>::new(4);
        for &value in &[10, 20, 30] {
            ring.push(value);
        }

        assert_eq!(ring.peek(), Some(&10));
        assert_eq!(ring.peek_back(), Some(&30));
        assert_eq!(ring.get(1), Some(&20));
    }

    #[test]
    fn test_ring_buffer_iter() {
        let mut ring = RingBuffer::<i32>::new(4);
        for value in 1..=3 {
            ring.push(value);
        }

        let collected: Vec<i32> = ring.iter().cloned().collect();
        assert_eq!(collected, vec![1, 2, 3]);
    }

    #[test]
    fn test_signal_ring_buffer_stats() {
        let mut signal = SignalRingBuffer::new(5);
        signal.push_slice(&[1.0, 2.0, 3.0, 4.0, 5.0]);

        assert!(approx(signal.mean(), 3.0));
        assert!(signal.min() == Some(1.0));
        assert!(signal.max() == Some(5.0));
        assert!(approx(signal.peak_to_peak(), 4.0));
    }

    #[test]
    fn test_signal_ring_buffer_rms() {
        let mut signal = SignalRingBuffer::new(4);
        signal.push_slice(&[1.0, 1.0, 1.0, 1.0]);
        assert!(approx(signal.rms(), 1.0));

        signal.clear();
        signal.push_slice(&[3.0, 4.0]);
        // sqrt((9 + 16) / 2) is roughly 3.536.
        assert!(approx(signal.rms(), 3.536));
    }

    #[test]
    fn test_signal_ring_buffer_zero_crossing() {
        let mut signal = SignalRingBuffer::new(10);
        signal.push_slice(&[1.0, 0.5, -0.5, -1.0, -0.5, 0.5, 1.0]);

        // Two sign changes across six adjacent pairs.
        let rate = signal.zero_crossing_rate();
        assert!(approx(rate, 0.333));
    }
}