1use crate::{
4 realtime::{ProcessingMode, RealtimeConfig, RealtimeConverter},
5 types::{ConversionTarget, ConversionType},
6 Error, Result,
7};
8use async_stream::stream;
9use fastrand;
10use futures::{Stream, StreamExt};
11use std::collections::VecDeque;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use std::time::{Duration, Instant};
16use tokio::sync::{mpsc, Mutex, RwLock};
17use tokio_stream::wrappers::ReceiverStream;
18use tracing::{debug, error, info, warn};
19
/// Streaming audio converter that wraps a [`RealtimeConverter`] and feeds it
/// from asynchronous chunk streams.
#[derive(Debug)]
pub struct StreamingConverter {
    /// Shared real-time engine that performs the per-chunk conversion.
    realtime_converter: Arc<Mutex<RealtimeConverter>>,
    /// Streaming configuration this converter was built from.
    config: StreamConfig,
    /// Sample accumulation buffer.
    /// NOTE(review): only ever cleared in `stop()` within this module —
    /// confirm whether it is written elsewhere.
    accumulation_buffer: Arc<Mutex<VecDeque<f32>>>,
    /// Shared statistics, updated once per processed chunk.
    stats: Arc<RwLock<StreamingStats>>,
    /// Optional conversion target applied when processing begins.
    conversion_target: Option<ConversionTarget>,
    /// Current lifecycle state of the stream.
    state: StreamState,
}
36
/// Lifecycle state of a [`StreamingConverter`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// Created but not yet started.
    Idle,
    /// Actively processing chunks.
    Processing,
    /// Temporarily paused.
    Paused,
    /// Error state (never assigned within this module).
    Error,
    /// Stopped; `start()` accepts this state again.
    Stopped,
}
51
impl StreamingConverter {
    /// Builds a streaming converter and its backing real-time engine.
    ///
    /// # Errors
    /// Propagates any failure from [`RealtimeConverter::new`].
    pub fn new(config: StreamConfig) -> Result<Self> {
        // Derive the real-time engine settings from the stream-level config.
        let realtime_config = RealtimeConfig {
            buffer_size: config.chunk_size,
            sample_rate: config.sample_rate,
            target_latency_ms: config.target_latency_ms,
            // Fixed 25% overlap between consecutive processing windows.
            overlap_factor: 0.25,
            adaptive_buffering: config.adaptive_buffering,
            // Cap worker threads at 4 regardless of configured stream count.
            max_threads: config.max_concurrent_streams.min(4),
            enable_lookahead: true,
            lookahead_size: config.chunk_size / 4,
        };

        let realtime_converter = Arc::new(Mutex::new(RealtimeConverter::new(realtime_config)?));
        let accumulation_buffer =
            Arc::new(Mutex::new(VecDeque::with_capacity(config.buffer_capacity)));
        let stats = Arc::new(RwLock::new(StreamingStats::default()));

        Ok(Self {
            realtime_converter,
            config,
            accumulation_buffer,
            stats,
            conversion_target: None,
            state: StreamState::Idle,
        })
    }

    /// Sets the conversion target to apply when processing starts.
    pub fn set_conversion_target(&mut self, target: ConversionTarget) {
        self.conversion_target = Some(target);
    }

    /// Forwards the processing mode to the underlying real-time converter.
    pub async fn set_processing_mode(&self, mode: ProcessingMode) {
        let mut converter = self.realtime_converter.lock().await;
        converter.set_processing_mode(mode);
    }

    /// Returns the current lifecycle state.
    pub fn state(&self) -> StreamState {
        self.state
    }

    /// Transitions the converter into the `Processing` state.
    ///
    /// # Errors
    /// Fails if a stream is already active (state is neither `Idle` nor
    /// `Stopped`).
    pub async fn start(&mut self) -> Result<()> {
        if self.state != StreamState::Idle && self.state != StreamState::Stopped {
            return Err(Error::Streaming {
                message: "Stream is already active".to_string(),
                stream_info: None,
                context: None,
                recovery_suggestions: Box::new(vec![
                    "Stop the current stream before starting a new one".to_string(),
                    "Check stream state before calling start()".to_string(),
                ]),
            });
        }

        self.state = StreamState::Processing;
        info!("Started streaming converter");
        Ok(())
    }

    /// Pauses an active stream.
    ///
    /// # Errors
    /// Fails unless the converter is currently `Processing`.
    pub async fn pause(&mut self) -> Result<()> {
        if self.state != StreamState::Processing {
            return Err(Error::Streaming {
                message: "Stream is not processing".to_string(),
                stream_info: None,
                context: None,
                recovery_suggestions: Box::new(vec![
                    "Start the stream before trying to pause".to_string(),
                    "Check stream state before calling pause()".to_string(),
                ]),
            });
        }

        self.state = StreamState::Paused;
        info!("Paused streaming converter");
        Ok(())
    }

    /// Stops the stream (from any state) and clears the accumulation buffer.
    pub async fn stop(&mut self) -> Result<()> {
        self.state = StreamState::Stopped;

        let mut buffer = self.accumulation_buffer.lock().await;
        buffer.clear();

        info!("Stopped streaming converter");
        Ok(())
    }

    /// Consumes an input chunk stream and returns a stream of converted
    /// chunks, processing on a spawned background task.
    ///
    /// Empty input chunks are skipped. Per-chunk errors are forwarded to the
    /// output stream rather than aborting processing.
    ///
    /// NOTE(review): errors are sent downstream but `stats.total_errors` is
    /// never incremented here — confirm whether errors are counted elsewhere.
    /// Also uses the `Error::streaming` helper while other paths build
    /// `Error::Streaming` directly.
    ///
    /// # Errors
    /// Fails immediately if the converter is not in the `Processing` state.
    pub async fn process_stream<S>(
        &mut self,
        mut stream: S,
    ) -> Result<impl Stream<Item = Result<Vec<f32>>>>
    where
        S: Stream<Item = Vec<f32>> + Unpin + Send + 'static,
    {
        if self.state != StreamState::Processing {
            return Err(Error::streaming(
                "Stream is not in processing state".to_string(),
            ));
        }

        let realtime_converter = self.realtime_converter.clone();
        let config = self.config.clone();
        let stats = self.stats.clone();
        let conversion_target = self.conversion_target.clone();

        // Bounded channel provides backpressure to the worker task.
        let (tx, rx) = mpsc::channel(config.channel_buffer_size);

        tokio::spawn(async move {
            // Apply the conversion target once, before any chunks arrive.
            let mut converter = realtime_converter.lock().await;
            if let Some(target) = conversion_target {
                converter.set_conversion_target(target);
            }
            drop(converter);

            let mut chunk_count = 0u64;
            let start_time = Instant::now();

            while let Some(chunk) = stream.next().await {
                let chunk_start = Instant::now();

                if chunk.is_empty() {
                    continue;
                }

                // tokio::sync::Mutex is held across the `process_chunk`
                // await, serializing work on the shared converter.
                let mut converter = realtime_converter.lock().await;
                match converter.process_chunk(&chunk).await {
                    Ok(processed_chunk) => {
                        if !processed_chunk.is_empty()
                            && tx.send(Ok(processed_chunk)).await.is_err()
                        {
                            warn!("Receiver dropped, stopping processing");
                            break;
                        }
                    }
                    Err(e) => {
                        error!("Error processing chunk: {}", e);
                        if tx.send(Err(e)).await.is_err() {
                            break;
                        }
                    }
                }
                drop(converter);

                chunk_count += 1;
                let chunk_duration = chunk_start.elapsed();
                let mut stats_guard = stats.write().await;
                stats_guard.update_chunk_stats(chunk.len(), chunk_duration);

                // Flag chunks that exceeded the latency budget.
                if chunk_duration > Duration::from_millis(config.target_latency_ms as u64) {
                    warn!(
                        "Processing slower than real-time: {:.2}ms",
                        chunk_duration.as_millis()
                    );
                }
            }

            let total_duration = start_time.elapsed();
            info!(
                "Processed {} chunks in {:.2}s",
                chunk_count,
                total_duration.as_secs_f32()
            );
        });

        Ok(ReceiverStream::new(rx))
    }

    /// Like [`Self::process_stream`], but rate-limits the input so chunks are
    /// consumed no faster than real-time playback.
    pub async fn process_stream_with_backpressure<S>(
        &mut self,
        stream: S,
    ) -> Result<impl Stream<Item = Result<Vec<f32>>>>
    where
        S: Stream<Item = Vec<f32>> + Unpin + Send + 'static,
    {
        let throttled_stream = self.throttle_stream(stream).await?;
        self.process_stream(throttled_stream).await
    }

    /// Wraps `stream` so successive chunks are yielded at most once per
    /// chunk duration (`chunk_size / sample_rate`, truncated to whole ms).
    async fn throttle_stream<S>(
        &self,
        stream: S,
    ) -> Result<std::pin::Pin<Box<dyn Stream<Item = Vec<f32>> + Send>>>
    where
        S: Stream<Item = Vec<f32>> + Unpin + Send + 'static,
    {
        let config = self.config.clone();
        // Real-time duration of one chunk, in whole milliseconds.
        let throttle_interval = Duration::from_millis(
            (config.chunk_size as f64 / config.sample_rate as f64 * 1000.0) as u64,
        );

        Ok(Box::pin(stream! {
            tokio::pin!(stream);

            let mut last_yield = Instant::now();

            while let Some(chunk) = stream.next().await {
                let now = Instant::now();
                let time_since_last = now - last_yield;

                // Sleep off the remainder of the interval before yielding.
                if time_since_last < throttle_interval {
                    let sleep_time = throttle_interval - time_since_last;
                    tokio::time::sleep(sleep_time).await;
                }

                yield chunk;
                last_yield = Instant::now();
            }
        }))
    }

    /// Returns a snapshot of the current streaming statistics.
    pub async fn get_stats(&self) -> StreamingStats {
        self.stats.read().await.clone()
    }

    /// Resets all streaming statistics to their defaults.
    pub async fn reset_stats(&self) {
        let mut stats = self.stats.write().await;
        *stats = StreamingStats::default();
    }

    /// Health check: average chunk latency must be within 150% of the
    /// configured latency target.
    pub async fn is_healthy(&self) -> bool {
        let stats = self.stats.read().await;
        let avg_latency = stats.average_chunk_latency_ms();
        avg_latency <= self.config.target_latency_ms * 1.5
    }

    /// Merges several input streams into one output stream, processing each
    /// on its own task against the shared converter. Output chunks from
    /// different streams interleave in completion order.
    ///
    /// # Errors
    /// Fails if `streams` is empty or exceeds
    /// `config.max_concurrent_streams`.
    pub async fn multiplex_streams<S>(
        &mut self,
        streams: Vec<S>,
    ) -> Result<impl Stream<Item = Result<Vec<f32>>>>
    where
        S: Stream<Item = Vec<f32>> + Unpin + Send + 'static,
    {
        if streams.is_empty() {
            return Err(Error::Streaming {
                message: "No input streams provided".to_string(),
                stream_info: None,
                context: None,
                recovery_suggestions: Box::new(vec![
                    "Provide at least one input stream".to_string(),
                    "Check stream configuration".to_string(),
                ]),
            });
        }

        if streams.len() > self.config.max_concurrent_streams {
            return Err(Error::Streaming {
                message: format!(
                    "Too many streams: {} > {}",
                    streams.len(),
                    self.config.max_concurrent_streams
                ),
                stream_info: None,
                context: None,
                recovery_suggestions: Box::new(vec![
                    "Reduce the number of concurrent streams".to_string(),
                    "Increase max_concurrent_streams configuration".to_string(),
                ]),
            });
        }

        let (tx, rx) = mpsc::channel(self.config.channel_buffer_size);
        let realtime_converter = self.realtime_converter.clone();
        let config = self.config.clone();

        for (stream_id, stream) in streams.into_iter().enumerate() {
            let tx_clone = tx.clone();
            let converter_clone = realtime_converter.clone();
            // NOTE(review): config_clone is never used inside the task below.
            let config_clone = config.clone();

            tokio::spawn(async move {
                tokio::pin!(stream);

                while let Some(chunk) = stream.next().await {
                    // Shared tokio Mutex serializes processing across tasks.
                    let mut converter = converter_clone.lock().await;
                    match converter.process_chunk(&chunk).await {
                        Ok(processed) => {
                            if !processed.is_empty() && tx_clone.send(Ok(processed)).await.is_err()
                            {
                                debug!("Stream {} receiver dropped", stream_id);
                                break;
                            }
                        }
                        Err(e) => {
                            error!("Stream {} processing error: {}", stream_id, e);
                            if tx_clone.send(Err(e)).await.is_err() {
                                break;
                            }
                        }
                    }
                }
            });
        }

        Ok(ReceiverStream::new(rx))
    }
}
367
/// Pool-based stream processor that load-balances work across
/// [`StreamingConverter`] instances.
#[derive(Debug)]
pub struct StreamProcessor {
    /// Configuration shared by all pooled converters.
    config: StreamConfig,
    /// Converter pool, guarded for concurrent access.
    converters: Arc<RwLock<Vec<StreamingConverter>>>,
    /// Strategy used to pick a converter per stream.
    load_balancer: LoadBalancer,
}
378
/// Converter-selection policy holder for [`StreamProcessor`].
#[derive(Debug, Clone)]
pub struct LoadBalancer {
    /// Active selection strategy.
    strategy: LoadBalancingStrategy,
    /// Cursor for round-robin selection.
    /// NOTE(review): never advanced anywhere in this module, so round-robin
    /// selection is currently static — confirm intended behavior.
    round_robin_index: usize,
}
387
/// Available converter-selection strategies.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoadBalancingStrategy {
    /// Cycle through converters in order.
    RoundRobin,
    /// Pick the converter with the lowest weighted load score.
    LeastLoaded,
    /// Pick a converter uniformly at random.
    Random,
}
398
/// Configuration for streaming conversion.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Samples per processing chunk.
    pub chunk_size: usize,
    /// Audio sample rate in Hz.
    pub sample_rate: u32,
    /// Target per-chunk processing latency, in milliseconds.
    pub target_latency_ms: f32,
    /// Capacity of internal sample buffers.
    pub buffer_capacity: usize,
    /// Capacity of the mpsc channel between producer and consumer.
    pub channel_buffer_size: usize,
    /// Maximum number of streams processed concurrently.
    pub max_concurrent_streams: usize,
    /// Whether the real-time engine may adapt its buffering.
    pub adaptive_buffering: bool,
    /// Quality-vs-latency trade-off factor (not read within this module).
    pub quality_vs_latency: f32,
    /// Whether error recovery is enabled (not read within this module).
    pub enable_error_recovery: bool,
    /// Stream timeout in seconds (not read within this module).
    pub stream_timeout_secs: u64,
}
423
impl StreamProcessor {
    /// Creates a processor with an empty converter pool and the
    /// least-loaded balancing strategy.
    pub fn new(config: StreamConfig) -> Self {
        Self {
            config,
            converters: Arc::new(RwLock::new(Vec::new())),
            load_balancer: LoadBalancer::new(LoadBalancingStrategy::LeastLoaded),
        }
    }

    /// Creates a processor pre-populated with `pool_size` converters built
    /// from `config`.
    ///
    /// # Errors
    /// Propagates the first converter-construction failure.
    pub async fn with_converter_pool(config: StreamConfig, pool_size: usize) -> Result<Self> {
        let mut processor = Self::new(config.clone());

        for _ in 0..pool_size {
            let converter = StreamingConverter::new(config.clone())?;
            processor.converters.write().await.push(converter);
        }

        Ok(processor)
    }

    /// Adds an existing converter to the pool.
    pub async fn add_converter(&self, converter: StreamingConverter) {
        self.converters.write().await.push(converter);
    }

    /// Selects a pool converter and wraps `audio_stream` for chunked
    /// processing.
    ///
    /// # Errors
    /// Fails if the converter pool is empty.
    pub async fn process_stream(&self, audio_stream: AudioStream) -> Result<ProcessedAudioStream> {
        let converter_index = self.select_converter().await?;

        Ok(ProcessedAudioStream::new(
            audio_stream,
            self.config.clone(),
            converter_index,
        ))
    }

    /// Wraps each input stream in turn via [`Self::process_stream`].
    ///
    /// # Errors
    /// Fails if more streams are supplied than `max_concurrent_streams`, or
    /// if converter selection fails.
    pub async fn process_multiple_streams(
        &self,
        streams: Vec<AudioStream>,
    ) -> Result<Vec<ProcessedAudioStream>> {
        if streams.len() > self.config.max_concurrent_streams {
            return Err(Error::Streaming {
                message: format!(
                    "Too many streams: {} > {}",
                    streams.len(),
                    self.config.max_concurrent_streams
                ),
                stream_info: None,
                context: None,
                recovery_suggestions: Box::new(vec![
                    "Reduce the number of concurrent streams".to_string(),
                    "Increase max_concurrent_streams configuration".to_string(),
                ]),
            });
        }

        let mut processed_streams = Vec::new();

        for stream in streams {
            let processed = self.process_stream(stream).await?;
            processed_streams.push(processed);
        }

        Ok(processed_streams)
    }

    /// Picks a converter index according to the active balancing strategy.
    ///
    /// NOTE(review): with `RoundRobin`, `round_robin_index` is never advanced
    /// (this method takes `&self`), so the same converter is always chosen —
    /// confirm intended behavior.
    ///
    /// # Errors
    /// Fails if the pool is empty.
    async fn select_converter(&self) -> Result<usize> {
        let converters = self.converters.read().await;

        if converters.is_empty() {
            return Err(Error::Streaming {
                message: "No converters available".to_string(),
                stream_info: None,
                context: None,
                recovery_suggestions: Box::new(vec![
                    "Initialize converters before processing".to_string(),
                    "Check converter configuration".to_string(),
                ]),
            });
        }

        match self.load_balancer.strategy {
            LoadBalancingStrategy::RoundRobin => {
                Ok(self.load_balancer.round_robin_index % converters.len())
            }
            LoadBalancingStrategy::LeastLoaded => {
                let mut best_index = 0;
                let mut lowest_load_score = f32::MAX;

                for (index, converter) in converters.iter().enumerate() {
                    let stats = converter.get_stats().await;

                    let latency_score = stats.average_chunk_latency_ms();
                    let error_rate = if stats.total_chunks_processed > 0 {
                        stats.total_errors as f32 / stats.total_chunks_processed as f32
                    } else {
                        0.0
                    };
                    // Lower throughput -> higher score -> less attractive.
                    let throughput_score = 1.0 / (stats.throughput_samples_per_sec() + 1.0);
                    // Weighted load: 50% latency, 30% throughput, 20% errors.
                    let load_score = (latency_score * 0.5)
                        + (throughput_score * 0.3)
                        + (error_rate * 100.0 * 0.2);

                    if load_score < lowest_load_score {
                        lowest_load_score = load_score;
                        best_index = index;
                    }
                }

                Ok(best_index)
            }
            LoadBalancingStrategy::Random => Ok(fastrand::usize(0..converters.len())),
        }
    }

    /// Aggregates statistics across all pooled converters.
    pub async fn get_stats(&self) -> ProcessorStats {
        let converters = self.converters.read().await;
        let mut total_processed = 0;
        let mut total_errors = 0;
        let mut avg_latency = 0.0;

        for converter in converters.iter() {
            let stats = converter.get_stats().await;
            total_processed += stats.total_chunks_processed;
            total_errors += stats.total_errors;
            avg_latency += stats.average_chunk_latency_ms();
        }

        // Mean of per-converter averages (not sample-weighted).
        if !converters.is_empty() {
            avg_latency /= converters.len() as f32;
        }

        ProcessorStats {
            total_converters: converters.len(),
            total_processed,
            total_errors,
            average_latency_ms: avg_latency,
        }
    }
}
573
574impl LoadBalancer {
575 pub fn new(strategy: LoadBalancingStrategy) -> Self {
577 Self {
578 strategy,
579 round_robin_index: 0,
580 }
581 }
582
583 pub fn set_strategy(&mut self, strategy: LoadBalancingStrategy) {
585 self.strategy = strategy;
586 }
587}
588
/// An in-memory audio stream with a read cursor.
#[derive(Debug)]
pub struct AudioStream {
    /// Raw samples backing the stream.
    data: Vec<f32>,
    /// Read cursor: index of the next sample to consume.
    position: usize,
    /// Descriptive metadata (id, name, source, creation time).
    metadata: StreamMetadata,
    /// Audio format description of `data`.
    format: AudioFormat,
}
601
/// Descriptive metadata attached to an [`AudioStream`].
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    /// Stream identifier (random by default).
    pub id: String,
    /// Human-readable stream name.
    pub name: String,
    /// Description of the stream's origin.
    pub source: String,
    /// When the stream object was created.
    pub created_at: std::time::SystemTime,
}
614
/// Audio format description.
#[derive(Debug, Clone)]
pub struct AudioFormat {
    /// Sample rate in Hz.
    pub sample_rate: u32,
    /// Number of channels.
    pub channels: u16,
    /// Bits per sample.
    pub bits_per_sample: u16,
    /// Sample encoding.
    pub encoding: AudioEncoding,
}
627
/// Supported audio sample encodings.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AudioEncoding {
    /// Integer PCM samples.
    PCM,
    /// 32-bit floating-point samples.
    Float32,
    /// MP3 compressed audio.
    MP3,
    /// AAC compressed audio.
    AAC,
    /// Opus compressed audio.
    Opus,
}
642
643impl AudioStream {
644 pub fn new(data: Vec<f32>) -> Self {
646 Self {
647 data,
648 position: 0,
649 metadata: StreamMetadata::default(),
650 format: AudioFormat::default(),
651 }
652 }
653
654 pub fn with_metadata(data: Vec<f32>, metadata: StreamMetadata, format: AudioFormat) -> Self {
656 Self {
657 data,
658 position: 0,
659 metadata,
660 format,
661 }
662 }
663
664 pub fn metadata(&self) -> &StreamMetadata {
666 &self.metadata
667 }
668
669 pub fn format(&self) -> &AudioFormat {
671 &self.format
672 }
673
674 pub fn remaining(&self) -> usize {
676 self.data.len().saturating_sub(self.position)
677 }
678
679 pub fn is_finished(&self) -> bool {
681 self.position >= self.data.len()
682 }
683
684 pub fn reset(&mut self) {
686 self.position = 0;
687 }
688
689 pub fn seek(&mut self, position: usize) -> Result<()> {
691 if position > self.data.len() {
692 return Err(Error::Streaming {
693 message: "Seek position out of bounds".to_string(),
694 stream_info: None,
695 context: None,
696 recovery_suggestions: Box::new(vec![
697 "Provide a valid seek position within stream bounds".to_string(),
698 "Check stream length before seeking".to_string(),
699 ]),
700 });
701 }
702 self.position = position;
703 Ok(())
704 }
705}
706
impl Default for StreamMetadata {
    /// Generates placeholder metadata with a random numeric id suffix.
    fn default() -> Self {
        Self {
            id: format!("stream_{}", fastrand::u64(..)),
            name: "Unnamed Stream".to_string(),
            source: "Unknown".to_string(),
            created_at: std::time::SystemTime::now(),
        }
    }
}
717
impl Default for AudioFormat {
    /// Defaults to 22.05 kHz mono 32-bit float samples.
    fn default() -> Self {
        Self {
            sample_rate: 22050,
            channels: 1,
            bits_per_sample: 32,
            encoding: AudioEncoding::Float32,
        }
    }
}
728
/// A chunk-emitting [`Stream`] wrapper around an [`AudioStream`].
#[derive(Debug)]
pub struct ProcessedAudioStream {
    /// Source audio being consumed.
    source: AudioStream,
    /// Streaming configuration (chunk size, buffer capacity, ...).
    config: StreamConfig,
    /// Index of the pool converter assigned by the processor.
    converter_index: usize,
    /// Samples staged between source reads and chunk emission.
    buffer: VecDeque<f32>,
    /// Error-recovery bookkeeping.
    error_recovery: ErrorRecoveryState,
}
743
/// Tracks recent errors and the active recovery strategy.
#[derive(Debug, Clone)]
pub struct ErrorRecoveryState {
    /// Errors seen since the last successfully emitted chunk.
    consecutive_errors: u32,
    /// When the most recent error occurred.
    last_error_time: Option<Instant>,
    /// Strategy applied when an error is handled.
    strategy: ErrorRecoveryStrategy,
}
754
/// How processing errors are handled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorRecoveryStrategy {
    /// Drop the failing chunk and continue.
    Skip,
    /// Propagate the error until the retry budget is exhausted, then
    /// degrade to `Passthrough`.
    Retry,
    /// Continue without output for the failing chunk.
    Passthrough,
    /// Abort processing on the first error.
    Stop,
}
767
impl ProcessedAudioStream {
    /// Wraps `source` for chunked emission using `config`, recording which
    /// pool converter was assigned.
    pub fn new(source: AudioStream, config: StreamConfig, converter_index: usize) -> Self {
        let buffer_capacity = config.buffer_capacity;
        Self {
            source,
            config,
            converter_index,
            buffer: VecDeque::with_capacity(buffer_capacity),
            error_recovery: ErrorRecoveryState::default(),
        }
    }

    /// Index of the pool converter assigned to this stream.
    pub fn converter_index(&self) -> usize {
        self.converter_index
    }

    /// Borrows the current error-recovery state.
    pub fn error_recovery_state(&self) -> &ErrorRecoveryState {
        &self.error_recovery
    }

    /// Applies the active recovery strategy to `error`: `Ok(None)` means
    /// "drop this chunk and continue", `Err` means "propagate".
    ///
    /// NOTE(review): `Passthrough` also returns `Ok(None)` — the original
    /// chunk is not available here, so nothing is actually passed through;
    /// confirm intended semantics.
    fn handle_error(&mut self, error: Error) -> Result<Option<Vec<f32>>> {
        self.error_recovery.consecutive_errors += 1;
        self.error_recovery.last_error_time = Some(Instant::now());

        match self.error_recovery.strategy {
            ErrorRecoveryStrategy::Skip => {
                warn!("Skipping chunk due to error: {}", error);
                Ok(None)
            }
            ErrorRecoveryStrategy::Retry => {
                // Propagate (so the caller can retry) until 3 consecutive
                // errors, then permanently degrade to passthrough.
                if self.error_recovery.consecutive_errors < 3 {
                    Err(error)
                } else {
                    warn!("Too many retries, falling back to passthrough");
                    self.error_recovery.strategy = ErrorRecoveryStrategy::Passthrough;
                    Ok(None)
                }
            }
            ErrorRecoveryStrategy::Passthrough => {
                warn!("Using passthrough due to error: {}", error);
                Ok(None)
            }
            ErrorRecoveryStrategy::Stop => {
                error!("Stopping processing due to error: {}", error);
                Err(error)
            }
        }
    }
}
822
impl Default for ErrorRecoveryState {
    /// Starts with no recorded errors and the `Retry` strategy.
    fn default() -> Self {
        Self {
            consecutive_errors: 0,
            last_error_time: None,
            strategy: ErrorRecoveryStrategy::Retry,
        }
    }
}
832
impl Stream for ProcessedAudioStream {
    type Item = Result<Vec<f32>>;

    /// Pulls samples from the source into the internal buffer and emits
    /// chunks of `config.chunk_size` samples; the final chunk may be
    /// shorter.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Completed: source drained and nothing left buffered.
        if self.source.is_finished() && self.buffer.is_empty() {
            return Poll::Ready(None);
        }

        // Refill the buffer until a full chunk is available or the source is
        // exhausted. Reads `source.data`/`source.position` directly
        // (same-module private fields).
        while self.buffer.len() < self.config.chunk_size && !self.source.is_finished() {
            let remaining = self.source.remaining();
            let chunk_size = self.config.chunk_size.min(remaining);

            if chunk_size == 0 {
                break;
            }

            let start_pos = self.source.position;
            let end_pos = start_pos + chunk_size;

            let samples: Vec<f32> = self.source.data[start_pos..end_pos].to_vec();
            for sample in samples {
                self.buffer.push_back(sample);
            }

            self.source.position = end_pos;
        }

        if self.buffer.len() >= self.config.chunk_size
            || (self.source.is_finished() && !self.buffer.is_empty())
        {
            let chunk_size = self.config.chunk_size.min(self.buffer.len());
            let mut chunk = Vec::with_capacity(chunk_size);

            for _ in 0..chunk_size {
                if let Some(sample) = self.buffer.pop_front() {
                    chunk.push(sample);
                }
            }

            // Any successful emission resets the consecutive-error counter.
            if !chunk.is_empty() {
                self.error_recovery.consecutive_errors = 0;
            }

            Poll::Ready(Some(Ok(chunk)))
        } else {
            // Fallback (appears unreachable for chunk_size > 0): self-wake
            // so the task gets polled again instead of stalling.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
888
impl Default for StreamConfig {
    /// Low-latency defaults: 512-sample chunks at 22.05 kHz with a 20 ms
    /// latency target and up to 4 concurrent streams.
    fn default() -> Self {
        Self {
            chunk_size: 512,
            sample_rate: 22050,
            target_latency_ms: 20.0,
            buffer_capacity: 8192,
            channel_buffer_size: 100,
            max_concurrent_streams: 4,
            adaptive_buffering: true,
            quality_vs_latency: 0.5,
            enable_error_recovery: true,
            stream_timeout_secs: 30,
        }
    }
}
905
/// Aggregate statistics for a streaming conversion session.
#[derive(Debug, Clone, Default)]
pub struct StreamingStats {
    /// Total number of chunks processed so far.
    pub total_chunks_processed: u64,
    /// Accumulated processing time across all chunks, in milliseconds.
    pub total_processing_time_ms: f64,
    /// Total number of processing errors recorded.
    pub total_errors: u64,
    /// Largest single-chunk latency observed, in milliseconds.
    pub max_chunk_latency_ms: f32,
    /// Smallest single-chunk latency observed, in milliseconds.
    pub min_chunk_latency_ms: f32,
    /// Total number of samples processed.
    pub total_samples: u64,
}

impl StreamingStats {
    /// Records one processed chunk of `sample_count` samples that took
    /// `processing_time` to convert.
    ///
    /// Fix: latency is measured with sub-millisecond precision via
    /// `Duration::as_secs_f64()`; the previous `as_millis()` truncated any
    /// chunk finishing in under 1 ms to zero, skewing the average and min
    /// latency toward 0.
    pub fn update_chunk_stats(&mut self, sample_count: usize, processing_time: Duration) {
        let time_ms = processing_time.as_secs_f64() * 1000.0;

        self.total_chunks_processed += 1;
        self.total_processing_time_ms += time_ms;
        self.total_samples += sample_count as u64;

        let time_ms_f32 = time_ms as f32;
        if self.total_chunks_processed == 1 {
            // First chunk seeds both extremes.
            self.max_chunk_latency_ms = time_ms_f32;
            self.min_chunk_latency_ms = time_ms_f32;
        } else {
            self.max_chunk_latency_ms = self.max_chunk_latency_ms.max(time_ms_f32);
            self.min_chunk_latency_ms = self.min_chunk_latency_ms.min(time_ms_f32);
        }
    }

    /// Mean per-chunk latency in milliseconds (0.0 before any chunk).
    pub fn average_chunk_latency_ms(&self) -> f32 {
        if self.total_chunks_processed == 0 {
            0.0
        } else {
            (self.total_processing_time_ms / self.total_chunks_processed as f64) as f32
        }
    }

    /// Overall throughput in samples per second (0.0 before any work).
    pub fn throughput_samples_per_sec(&self) -> f32 {
        if self.total_processing_time_ms == 0.0 {
            0.0
        } else {
            (self.total_samples as f64 / (self.total_processing_time_ms / 1000.0)) as f32
        }
    }

    /// Fraction of processed chunks that errored (0.0 before any chunk).
    pub fn error_rate(&self) -> f32 {
        if self.total_chunks_processed == 0 {
            0.0
        } else {
            self.total_errors as f32 / self.total_chunks_processed as f32
        }
    }
}
969
/// Aggregated statistics across a [`StreamProcessor`]'s converter pool.
#[derive(Debug, Clone)]
pub struct ProcessorStats {
    /// Number of converters in the pool.
    pub total_converters: usize,
    /// Total chunks processed across all converters.
    pub total_processed: u64,
    /// Total errors across all converters.
    pub total_errors: u64,
    /// Mean of the converters' average chunk latencies, in milliseconds.
    pub average_latency_ms: f32,
}
982
impl Default for StreamProcessor {
    /// Builds a processor with the default [`StreamConfig`] and an empty
    /// converter pool.
    fn default() -> Self {
        Self::new(StreamConfig::default())
    }
}
988
#[cfg(test)]
mod tests {
    // Fix: removed the unused `use futures::stream;` and `use tokio_test;`
    // imports — neither crate is referenced by any test in this module.
    use super::*;

    /// Building a converter from the default config must succeed.
    #[tokio::test]
    async fn test_streaming_converter_creation() {
        let config = StreamConfig::default();
        let converter = StreamingConverter::new(config);
        assert!(converter.is_ok());
    }

    /// A fresh AudioStream exposes its full length and is not finished.
    #[tokio::test]
    async fn test_audio_stream_processing() {
        let data = vec![0.1, 0.2, 0.3, 0.4, 0.5];
        let audio_stream = AudioStream::new(data.clone());

        assert_eq!(audio_stream.remaining(), data.len());
        assert!(!audio_stream.is_finished());
    }

    /// A pooled processor wraps an input stream without error.
    #[tokio::test]
    async fn test_stream_processor() {
        let config = StreamConfig {
            chunk_size: 2,
            ..Default::default()
        };
        let processor = StreamProcessor::with_converter_pool(config.clone(), 1).await;
        assert!(processor.is_ok());
        let processor = processor.unwrap();

        let data = vec![0.1, 0.2, 0.3, 0.4];
        let audio_stream = AudioStream::new(data);

        let processed_stream = processor.process_stream(audio_stream).await;
        assert!(processed_stream.is_ok());
    }

    /// Chunk accounting: counters and the latency average update correctly.
    #[tokio::test]
    async fn test_streaming_stats() {
        let mut stats = StreamingStats::default();
        let duration = Duration::from_millis(10);

        stats.update_chunk_stats(100, duration);

        assert_eq!(stats.total_chunks_processed, 1);
        assert_eq!(stats.total_samples, 100);
        assert_eq!(stats.average_chunk_latency_ms(), 10.0);
    }
}