Skip to main content

voirs_conversion/
streaming.rs

1//! Streaming voice conversion
2
3use crate::{
4    realtime::{ProcessingMode, RealtimeConfig, RealtimeConverter},
5    types::{ConversionTarget, ConversionType},
6    Error, Result,
7};
8use async_stream::stream;
9use fastrand;
10use futures::{Stream, StreamExt};
11use std::collections::VecDeque;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use std::time::{Duration, Instant};
16use tokio::sync::{mpsc, Mutex, RwLock};
17use tokio_stream::wrappers::ReceiverStream;
18use tracing::{debug, error, info, warn};
19
20/// Streaming converter for continuous audio processing
21#[derive(Debug)]
22pub struct StreamingConverter {
23    /// Real-time converter
24    realtime_converter: Arc<Mutex<RealtimeConverter>>,
25    /// Stream configuration
26    config: StreamConfig,
27    /// Buffer for audio accumulation
28    accumulation_buffer: Arc<Mutex<VecDeque<f32>>>,
29    /// Processing statistics
30    stats: Arc<RwLock<StreamingStats>>,
31    /// Conversion target
32    conversion_target: Option<ConversionTarget>,
33    /// Stream state
34    state: StreamState,
35}
36
37/// Stream processing state tracking current operation status
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum StreamState {
40    /// Stream is idle
41    Idle,
42    /// Stream is actively processing
43    Processing,
44    /// Stream is paused
45    Paused,
46    /// Stream encountered an error
47    Error,
48    /// Stream is stopped
49    Stopped,
50}
51
52impl StreamingConverter {
53    /// Create new streaming converter
54    pub fn new(config: StreamConfig) -> Result<Self> {
55        let realtime_config = RealtimeConfig {
56            buffer_size: config.chunk_size,
57            sample_rate: config.sample_rate,
58            target_latency_ms: config.target_latency_ms,
59            overlap_factor: 0.25,
60            adaptive_buffering: config.adaptive_buffering,
61            max_threads: config.max_concurrent_streams.min(4),
62            enable_lookahead: true,
63            lookahead_size: config.chunk_size / 4,
64        };
65
66        let realtime_converter = Arc::new(Mutex::new(RealtimeConverter::new(realtime_config)?));
67        let accumulation_buffer =
68            Arc::new(Mutex::new(VecDeque::with_capacity(config.buffer_capacity)));
69        let stats = Arc::new(RwLock::new(StreamingStats::default()));
70
71        Ok(Self {
72            realtime_converter,
73            config,
74            accumulation_buffer,
75            stats,
76            conversion_target: None,
77            state: StreamState::Idle,
78        })
79    }
80
81    /// Set conversion target for voice transformation
82    pub fn set_conversion_target(&mut self, target: ConversionTarget) {
83        self.conversion_target = Some(target);
84    }
85
86    /// Set processing mode
87    pub async fn set_processing_mode(&self, mode: ProcessingMode) {
88        let mut converter = self.realtime_converter.lock().await;
89        converter.set_processing_mode(mode);
90    }
91
92    /// Get current stream state
93    pub fn state(&self) -> StreamState {
94        self.state
95    }
96
97    /// Start streaming processing
98    pub async fn start(&mut self) -> Result<()> {
99        if self.state != StreamState::Idle && self.state != StreamState::Stopped {
100            return Err(Error::Streaming {
101                message: "Stream is already active".to_string(),
102                stream_info: None,
103                context: None,
104                recovery_suggestions: Box::new(vec![
105                    "Stop the current stream before starting a new one".to_string(),
106                    "Check stream state before calling start()".to_string(),
107                ]),
108            });
109        }
110
111        self.state = StreamState::Processing;
112        info!("Started streaming converter");
113        Ok(())
114    }
115
116    /// Pause streaming processing
117    pub async fn pause(&mut self) -> Result<()> {
118        if self.state != StreamState::Processing {
119            return Err(Error::Streaming {
120                message: "Stream is not processing".to_string(),
121                stream_info: None,
122                context: None,
123                recovery_suggestions: Box::new(vec![
124                    "Start the stream before trying to pause".to_string(),
125                    "Check stream state before calling pause()".to_string(),
126                ]),
127            });
128        }
129
130        self.state = StreamState::Paused;
131        info!("Paused streaming converter");
132        Ok(())
133    }
134
135    /// Stop streaming processing
136    pub async fn stop(&mut self) -> Result<()> {
137        self.state = StreamState::Stopped;
138
139        // Clear buffers
140        let mut buffer = self.accumulation_buffer.lock().await;
141        buffer.clear();
142
143        info!("Stopped streaming converter");
144        Ok(())
145    }
146
147    /// Process streaming audio
148    pub async fn process_stream<S>(
149        &mut self,
150        mut stream: S,
151    ) -> Result<impl Stream<Item = Result<Vec<f32>>>>
152    where
153        S: Stream<Item = Vec<f32>> + Unpin + Send + 'static,
154    {
155        if self.state != StreamState::Processing {
156            return Err(Error::streaming(
157                "Stream is not in processing state".to_string(),
158            ));
159        }
160
161        let realtime_converter = self.realtime_converter.clone();
162        let config = self.config.clone();
163        let stats = self.stats.clone();
164        let conversion_target = self.conversion_target.clone();
165
166        let (tx, rx) = mpsc::channel(config.channel_buffer_size);
167
168        // Spawn processing task
169        tokio::spawn(async move {
170            let mut converter = realtime_converter.lock().await;
171            if let Some(target) = conversion_target {
172                converter.set_conversion_target(target);
173            }
174            drop(converter);
175
176            let mut chunk_count = 0u64;
177            let start_time = Instant::now();
178
179            while let Some(chunk) = stream.next().await {
180                let chunk_start = Instant::now();
181
182                if chunk.is_empty() {
183                    continue;
184                }
185
186                // Process chunk through real-time converter
187                let mut converter = realtime_converter.lock().await;
188                match converter.process_chunk(&chunk).await {
189                    Ok(processed_chunk) => {
190                        if !processed_chunk.is_empty()
191                            && tx.send(Ok(processed_chunk)).await.is_err()
192                        {
193                            warn!("Receiver dropped, stopping processing");
194                            break;
195                        }
196                    }
197                    Err(e) => {
198                        error!("Error processing chunk: {}", e);
199                        if tx.send(Err(e)).await.is_err() {
200                            break;
201                        }
202                    }
203                }
204                drop(converter);
205
206                // Update statistics
207                chunk_count += 1;
208                let chunk_duration = chunk_start.elapsed();
209                let mut stats_guard = stats.write().await;
210                stats_guard.update_chunk_stats(chunk.len(), chunk_duration);
211
212                // Check if we need to throttle
213                if chunk_duration > Duration::from_millis(config.target_latency_ms as u64) {
214                    warn!(
215                        "Processing slower than real-time: {:.2}ms",
216                        chunk_duration.as_millis()
217                    );
218                }
219            }
220
221            let total_duration = start_time.elapsed();
222            info!(
223                "Processed {} chunks in {:.2}s",
224                chunk_count,
225                total_duration.as_secs_f32()
226            );
227        });
228
229        Ok(ReceiverStream::new(rx))
230    }
231
232    /// Process audio stream with backpressure handling
233    pub async fn process_stream_with_backpressure<S>(
234        &mut self,
235        stream: S,
236    ) -> Result<impl Stream<Item = Result<Vec<f32>>>>
237    where
238        S: Stream<Item = Vec<f32>> + Unpin + Send + 'static,
239    {
240        let throttled_stream = self.throttle_stream(stream).await?;
241        self.process_stream(throttled_stream).await
242    }
243
244    /// Apply throttling to prevent buffer overflow
245    async fn throttle_stream<S>(
246        &self,
247        stream: S,
248    ) -> Result<std::pin::Pin<Box<dyn Stream<Item = Vec<f32>> + Send>>>
249    where
250        S: Stream<Item = Vec<f32>> + Unpin + Send + 'static,
251    {
252        let config = self.config.clone();
253        let throttle_interval = Duration::from_millis(
254            (config.chunk_size as f64 / config.sample_rate as f64 * 1000.0) as u64,
255        );
256
257        Ok(Box::pin(stream! {
258            tokio::pin!(stream);
259
260            let mut last_yield = Instant::now();
261
262            while let Some(chunk) = stream.next().await {
263                let now = Instant::now();
264                let time_since_last = now - last_yield;
265
266                if time_since_last < throttle_interval {
267                    let sleep_time = throttle_interval - time_since_last;
268                    tokio::time::sleep(sleep_time).await;
269                }
270
271                yield chunk;
272                last_yield = Instant::now();
273            }
274        }))
275    }
276
277    /// Get streaming statistics
278    pub async fn get_stats(&self) -> StreamingStats {
279        self.stats.read().await.clone()
280    }
281
282    /// Reset streaming statistics
283    pub async fn reset_stats(&self) {
284        let mut stats = self.stats.write().await;
285        *stats = StreamingStats::default();
286    }
287
288    /// Check if streaming is healthy (meeting performance targets)
289    pub async fn is_healthy(&self) -> bool {
290        let stats = self.stats.read().await;
291        let avg_latency = stats.average_chunk_latency_ms();
292        avg_latency <= self.config.target_latency_ms * 1.5 // 50% tolerance
293    }
294
295    /// Create a processing stream from multiple input streams
296    pub async fn multiplex_streams<S>(
297        &mut self,
298        streams: Vec<S>,
299    ) -> Result<impl Stream<Item = Result<Vec<f32>>>>
300    where
301        S: Stream<Item = Vec<f32>> + Unpin + Send + 'static,
302    {
303        if streams.is_empty() {
304            return Err(Error::Streaming {
305                message: "No input streams provided".to_string(),
306                stream_info: None,
307                context: None,
308                recovery_suggestions: Box::new(vec![
309                    "Provide at least one input stream".to_string(),
310                    "Check stream configuration".to_string(),
311                ]),
312            });
313        }
314
315        if streams.len() > self.config.max_concurrent_streams {
316            return Err(Error::Streaming {
317                message: format!(
318                    "Too many streams: {} > {}",
319                    streams.len(),
320                    self.config.max_concurrent_streams
321                ),
322                stream_info: None,
323                context: None,
324                recovery_suggestions: Box::new(vec![
325                    "Reduce the number of concurrent streams".to_string(),
326                    "Increase max_concurrent_streams configuration".to_string(),
327                ]),
328            });
329        }
330
331        let (tx, rx) = mpsc::channel(self.config.channel_buffer_size);
332        let realtime_converter = self.realtime_converter.clone();
333        let config = self.config.clone();
334
335        for (stream_id, stream) in streams.into_iter().enumerate() {
336            let tx_clone = tx.clone();
337            let converter_clone = realtime_converter.clone();
338            let config_clone = config.clone();
339
340            tokio::spawn(async move {
341                tokio::pin!(stream);
342
343                while let Some(chunk) = stream.next().await {
344                    let mut converter = converter_clone.lock().await;
345                    match converter.process_chunk(&chunk).await {
346                        Ok(processed) => {
347                            if !processed.is_empty() && tx_clone.send(Ok(processed)).await.is_err()
348                            {
349                                debug!("Stream {} receiver dropped", stream_id);
350                                break;
351                            }
352                        }
353                        Err(e) => {
354                            error!("Stream {} processing error: {}", stream_id, e);
355                            if tx_clone.send(Err(e)).await.is_err() {
356                                break;
357                            }
358                        }
359                    }
360                }
361            });
362        }
363
364        Ok(ReceiverStream::new(rx))
365    }
366}
367
368/// Stream processor for handling audio streams
369#[derive(Debug)]
370pub struct StreamProcessor {
371    /// Processing configuration
372    config: StreamConfig,
373    /// Active streaming converters
374    converters: Arc<RwLock<Vec<StreamingConverter>>>,
375    /// Load balancer for distributing streams
376    load_balancer: LoadBalancer,
377}
378
379/// Load balancer for distributing processing load across converters
380#[derive(Debug, Clone)]
381pub struct LoadBalancer {
382    /// Load balancing strategy
383    strategy: LoadBalancingStrategy,
384    /// Current round-robin index
385    round_robin_index: usize,
386}
387
388/// Load balancing strategies for distributing stream processing
389#[derive(Debug, Clone, Copy, PartialEq, Eq)]
390pub enum LoadBalancingStrategy {
391    /// Round-robin distribution
392    RoundRobin,
393    /// Least loaded converter
394    LeastLoaded,
395    /// Random distribution
396    Random,
397}
398
399/// Configuration for stream processing with buffering and latency settings
400#[derive(Debug, Clone)]
401pub struct StreamConfig {
402    /// Chunk size for processing
403    pub chunk_size: usize,
404    /// Sample rate
405    pub sample_rate: u32,
406    /// Target latency in milliseconds
407    pub target_latency_ms: f32,
408    /// Buffer capacity for accumulation
409    pub buffer_capacity: usize,
410    /// Channel buffer size for async communication
411    pub channel_buffer_size: usize,
412    /// Maximum concurrent streams
413    pub max_concurrent_streams: usize,
414    /// Enable adaptive buffering
415    pub adaptive_buffering: bool,
416    /// Quality vs latency trade-off (0.0 = lowest latency, 1.0 = highest quality)
417    pub quality_vs_latency: f32,
418    /// Enable error recovery
419    pub enable_error_recovery: bool,
420    /// Stream timeout in seconds
421    pub stream_timeout_secs: u64,
422}
423
424impl StreamProcessor {
425    /// Create new stream processor
426    pub fn new(config: StreamConfig) -> Self {
427        Self {
428            config,
429            converters: Arc::new(RwLock::new(Vec::new())),
430            load_balancer: LoadBalancer::new(LoadBalancingStrategy::LeastLoaded),
431        }
432    }
433
434    /// Create stream processor with multiple converters
435    pub async fn with_converter_pool(config: StreamConfig, pool_size: usize) -> Result<Self> {
436        let mut processor = Self::new(config.clone());
437
438        for _ in 0..pool_size {
439            let converter = StreamingConverter::new(config.clone())?;
440            processor.converters.write().await.push(converter);
441        }
442
443        Ok(processor)
444    }
445
446    /// Add a streaming converter to the pool
447    pub async fn add_converter(&self, converter: StreamingConverter) {
448        self.converters.write().await.push(converter);
449    }
450
451    /// Process audio stream using load balancing
452    pub async fn process_stream(&self, audio_stream: AudioStream) -> Result<ProcessedAudioStream> {
453        let converter_index = self.select_converter().await?;
454
455        Ok(ProcessedAudioStream::new(
456            audio_stream,
457            self.config.clone(),
458            converter_index,
459        ))
460    }
461
462    /// Process multiple streams concurrently
463    pub async fn process_multiple_streams(
464        &self,
465        streams: Vec<AudioStream>,
466    ) -> Result<Vec<ProcessedAudioStream>> {
467        if streams.len() > self.config.max_concurrent_streams {
468            return Err(Error::Streaming {
469                message: format!(
470                    "Too many streams: {} > {}",
471                    streams.len(),
472                    self.config.max_concurrent_streams
473                ),
474                stream_info: None,
475                context: None,
476                recovery_suggestions: Box::new(vec![
477                    "Reduce the number of concurrent streams".to_string(),
478                    "Increase max_concurrent_streams configuration".to_string(),
479                ]),
480            });
481        }
482
483        let mut processed_streams = Vec::new();
484
485        for stream in streams {
486            let processed = self.process_stream(stream).await?;
487            processed_streams.push(processed);
488        }
489
490        Ok(processed_streams)
491    }
492
493    /// Select the best converter for load balancing
494    async fn select_converter(&self) -> Result<usize> {
495        let converters = self.converters.read().await;
496
497        if converters.is_empty() {
498            return Err(Error::Streaming {
499                message: "No converters available".to_string(),
500                stream_info: None,
501                context: None,
502                recovery_suggestions: Box::new(vec![
503                    "Initialize converters before processing".to_string(),
504                    "Check converter configuration".to_string(),
505                ]),
506            });
507        }
508
509        match self.load_balancer.strategy {
510            LoadBalancingStrategy::RoundRobin => {
511                Ok(self.load_balancer.round_robin_index % converters.len())
512            }
513            LoadBalancingStrategy::LeastLoaded => {
514                // Select converter with lowest load based on comprehensive metrics
515                let mut best_index = 0;
516                let mut lowest_load_score = f32::MAX;
517
518                for (index, converter) in converters.iter().enumerate() {
519                    let stats = converter.get_stats().await;
520
521                    // Calculate composite load score (lower is better)
522                    let latency_score = stats.average_chunk_latency_ms();
523                    let error_rate = if stats.total_chunks_processed > 0 {
524                        stats.total_errors as f32 / stats.total_chunks_processed as f32
525                    } else {
526                        0.0
527                    };
528                    let throughput_score = 1.0 / (stats.throughput_samples_per_sec() + 1.0); // Inverse throughput
529
530                    // Weighted composite score (prioritize latency and throughput)
531                    let load_score = (latency_score * 0.5)
532                        + (throughput_score * 0.3)
533                        + (error_rate * 100.0 * 0.2);
534
535                    if load_score < lowest_load_score {
536                        lowest_load_score = load_score;
537                        best_index = index;
538                    }
539                }
540
541                Ok(best_index)
542            }
543            LoadBalancingStrategy::Random => Ok(fastrand::usize(0..converters.len())),
544        }
545    }
546
547    /// Get processor statistics
548    pub async fn get_stats(&self) -> ProcessorStats {
549        let converters = self.converters.read().await;
550        let mut total_processed = 0;
551        let mut total_errors = 0;
552        let mut avg_latency = 0.0;
553
554        for converter in converters.iter() {
555            let stats = converter.get_stats().await;
556            total_processed += stats.total_chunks_processed;
557            total_errors += stats.total_errors;
558            avg_latency += stats.average_chunk_latency_ms();
559        }
560
561        if !converters.is_empty() {
562            avg_latency /= converters.len() as f32;
563        }
564
565        ProcessorStats {
566            total_converters: converters.len(),
567            total_processed,
568            total_errors,
569            average_latency_ms: avg_latency,
570        }
571    }
572}
573
574impl LoadBalancer {
575    /// Create new load balancer
576    pub fn new(strategy: LoadBalancingStrategy) -> Self {
577        Self {
578            strategy,
579            round_robin_index: 0,
580        }
581    }
582
583    /// Set load balancing strategy
584    pub fn set_strategy(&mut self, strategy: LoadBalancingStrategy) {
585        self.strategy = strategy;
586    }
587}
588
589/// Audio stream wrapper providing buffered audio playback
590#[derive(Debug)]
591pub struct AudioStream {
592    /// Audio data
593    data: Vec<f32>,
594    /// Current position
595    position: usize,
596    /// Stream metadata
597    metadata: StreamMetadata,
598    /// Stream format
599    format: AudioFormat,
600}
601
602/// Stream metadata containing identification and source information
603#[derive(Debug, Clone)]
604pub struct StreamMetadata {
605    /// Stream identifier
606    pub id: String,
607    /// Stream name
608    pub name: String,
609    /// Source information
610    pub source: String,
611    /// Creation timestamp
612    pub created_at: std::time::SystemTime,
613}
614
615/// Audio format specification defining sample rate and encoding
616#[derive(Debug, Clone)]
617pub struct AudioFormat {
618    /// Sample rate in Hz
619    pub sample_rate: u32,
620    /// Number of channels
621    pub channels: u16,
622    /// Bits per sample
623    pub bits_per_sample: u16,
624    /// Audio encoding
625    pub encoding: AudioEncoding,
626}
627
628/// Audio encoding types supported for streaming
629#[derive(Debug, Clone, Copy, PartialEq, Eq)]
630pub enum AudioEncoding {
631    /// Linear PCM
632    PCM,
633    /// IEEE 754 floating point
634    Float32,
635    /// Compressed formats
636    MP3,
637    /// Advanced Audio Coding
638    AAC,
639    /// Opus audio codec
640    Opus,
641}
642
643impl AudioStream {
644    /// Create new audio stream
645    pub fn new(data: Vec<f32>) -> Self {
646        Self {
647            data,
648            position: 0,
649            metadata: StreamMetadata::default(),
650            format: AudioFormat::default(),
651        }
652    }
653
654    /// Create audio stream with metadata
655    pub fn with_metadata(data: Vec<f32>, metadata: StreamMetadata, format: AudioFormat) -> Self {
656        Self {
657            data,
658            position: 0,
659            metadata,
660            format,
661        }
662    }
663
664    /// Get stream metadata
665    pub fn metadata(&self) -> &StreamMetadata {
666        &self.metadata
667    }
668
669    /// Get audio format
670    pub fn format(&self) -> &AudioFormat {
671        &self.format
672    }
673
674    /// Get remaining samples
675    pub fn remaining(&self) -> usize {
676        self.data.len().saturating_sub(self.position)
677    }
678
679    /// Check if stream has ended
680    pub fn is_finished(&self) -> bool {
681        self.position >= self.data.len()
682    }
683
684    /// Reset stream position
685    pub fn reset(&mut self) {
686        self.position = 0;
687    }
688
689    /// Seek to position
690    pub fn seek(&mut self, position: usize) -> Result<()> {
691        if position > self.data.len() {
692            return Err(Error::Streaming {
693                message: "Seek position out of bounds".to_string(),
694                stream_info: None,
695                context: None,
696                recovery_suggestions: Box::new(vec![
697                    "Provide a valid seek position within stream bounds".to_string(),
698                    "Check stream length before seeking".to_string(),
699                ]),
700            });
701        }
702        self.position = position;
703        Ok(())
704    }
705}
706
707impl Default for StreamMetadata {
708    fn default() -> Self {
709        Self {
710            id: format!("stream_{}", fastrand::u64(..)),
711            name: "Unnamed Stream".to_string(),
712            source: "Unknown".to_string(),
713            created_at: std::time::SystemTime::now(),
714        }
715    }
716}
717
718impl Default for AudioFormat {
719    fn default() -> Self {
720        Self {
721            sample_rate: 22050,
722            channels: 1,
723            bits_per_sample: 32,
724            encoding: AudioEncoding::Float32,
725        }
726    }
727}
728
729/// Processed audio stream with conversion applied
730#[derive(Debug)]
731pub struct ProcessedAudioStream {
732    /// Source stream
733    source: AudioStream,
734    /// Processing config
735    config: StreamConfig,
736    /// Converter index for load balancing
737    converter_index: usize,
738    /// Processing buffer
739    buffer: VecDeque<f32>,
740    /// Error recovery state
741    error_recovery: ErrorRecoveryState,
742}
743
744/// Error recovery state tracking failures and recovery strategy
745#[derive(Debug, Clone)]
746pub struct ErrorRecoveryState {
747    /// Number of consecutive errors
748    consecutive_errors: u32,
749    /// Last error time
750    last_error_time: Option<Instant>,
751    /// Recovery strategy
752    strategy: ErrorRecoveryStrategy,
753}
754
755/// Error recovery strategies for handling stream errors
756#[derive(Debug, Clone, Copy, PartialEq, Eq)]
757pub enum ErrorRecoveryStrategy {
758    /// Skip problematic chunks
759    Skip,
760    /// Retry processing
761    Retry,
762    /// Fallback to pass-through
763    Passthrough,
764    /// Stop processing
765    Stop,
766}
767
768impl ProcessedAudioStream {
769    /// Create new processed stream
770    pub fn new(source: AudioStream, config: StreamConfig, converter_index: usize) -> Self {
771        let buffer_capacity = config.buffer_capacity;
772        Self {
773            source,
774            config,
775            converter_index,
776            buffer: VecDeque::with_capacity(buffer_capacity),
777            error_recovery: ErrorRecoveryState::default(),
778        }
779    }
780
781    /// Get converter index
782    pub fn converter_index(&self) -> usize {
783        self.converter_index
784    }
785
786    /// Get error recovery state
787    pub fn error_recovery_state(&self) -> &ErrorRecoveryState {
788        &self.error_recovery
789    }
790
791    /// Handle processing error
792    fn handle_error(&mut self, error: Error) -> Result<Option<Vec<f32>>> {
793        self.error_recovery.consecutive_errors += 1;
794        self.error_recovery.last_error_time = Some(Instant::now());
795
796        match self.error_recovery.strategy {
797            ErrorRecoveryStrategy::Skip => {
798                warn!("Skipping chunk due to error: {}", error);
799                Ok(None) // Skip this chunk
800            }
801            ErrorRecoveryStrategy::Retry => {
802                if self.error_recovery.consecutive_errors < 3 {
803                    Err(error) // Will trigger retry
804                } else {
805                    warn!("Too many retries, falling back to passthrough");
806                    self.error_recovery.strategy = ErrorRecoveryStrategy::Passthrough;
807                    Ok(None)
808                }
809            }
810            ErrorRecoveryStrategy::Passthrough => {
811                warn!("Using passthrough due to error: {}", error);
812                // Return original chunk if available
813                Ok(None)
814            }
815            ErrorRecoveryStrategy::Stop => {
816                error!("Stopping processing due to error: {}", error);
817                Err(error)
818            }
819        }
820    }
821}
822
823impl Default for ErrorRecoveryState {
824    fn default() -> Self {
825        Self {
826            consecutive_errors: 0,
827            last_error_time: None,
828            strategy: ErrorRecoveryStrategy::Retry,
829        }
830    }
831}
832
833impl Stream for ProcessedAudioStream {
834    type Item = Result<Vec<f32>>;
835
836    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
837        // Check if stream is finished
838        if self.source.is_finished() && self.buffer.is_empty() {
839            return Poll::Ready(None);
840        }
841
842        // Fill buffer if needed
843        while self.buffer.len() < self.config.chunk_size && !self.source.is_finished() {
844            let remaining = self.source.remaining();
845            let chunk_size = self.config.chunk_size.min(remaining);
846
847            if chunk_size == 0 {
848                break;
849            }
850
851            let start_pos = self.source.position;
852            let end_pos = start_pos + chunk_size;
853
854            let samples: Vec<f32> = self.source.data[start_pos..end_pos].to_vec();
855            for sample in samples {
856                self.buffer.push_back(sample);
857            }
858
859            self.source.position = end_pos;
860        }
861
862        // Return chunk if buffer has enough data
863        if self.buffer.len() >= self.config.chunk_size
864            || (self.source.is_finished() && !self.buffer.is_empty())
865        {
866            let chunk_size = self.config.chunk_size.min(self.buffer.len());
867            let mut chunk = Vec::with_capacity(chunk_size);
868
869            for _ in 0..chunk_size {
870                if let Some(sample) = self.buffer.pop_front() {
871                    chunk.push(sample);
872                }
873            }
874
875            // Reset error count on successful processing
876            if !chunk.is_empty() {
877                self.error_recovery.consecutive_errors = 0;
878            }
879
880            Poll::Ready(Some(Ok(chunk)))
881        } else {
882            // Not ready yet, register waker
883            cx.waker().wake_by_ref();
884            Poll::Pending
885        }
886    }
887}
888
889impl Default for StreamConfig {
890    fn default() -> Self {
891        Self {
892            chunk_size: 512,
893            sample_rate: 22050,
894            target_latency_ms: 20.0,
895            buffer_capacity: 8192,
896            channel_buffer_size: 100,
897            max_concurrent_streams: 4,
898            adaptive_buffering: true,
899            quality_vs_latency: 0.5,
900            enable_error_recovery: true,
901            stream_timeout_secs: 30,
902        }
903    }
904}
905
906/// Streaming statistics for performance monitoring
907#[derive(Debug, Clone, Default)]
908pub struct StreamingStats {
909    /// Total chunks processed
910    pub total_chunks_processed: u64,
911    /// Total processing time
912    pub total_processing_time_ms: f64,
913    /// Total errors encountered
914    pub total_errors: u64,
915    /// Maximum chunk latency
916    pub max_chunk_latency_ms: f32,
917    /// Minimum chunk latency
918    pub min_chunk_latency_ms: f32,
919    /// Total samples processed
920    pub total_samples: u64,
921}
922
923impl StreamingStats {
924    /// Update statistics with chunk processing information
925    pub fn update_chunk_stats(&mut self, sample_count: usize, processing_time: Duration) {
926        let time_ms = processing_time.as_millis() as f64;
927
928        self.total_chunks_processed += 1;
929        self.total_processing_time_ms += time_ms;
930        self.total_samples += sample_count as u64;
931
932        let time_ms_f32 = time_ms as f32;
933        if self.total_chunks_processed == 1 {
934            self.max_chunk_latency_ms = time_ms_f32;
935            self.min_chunk_latency_ms = time_ms_f32;
936        } else {
937            self.max_chunk_latency_ms = self.max_chunk_latency_ms.max(time_ms_f32);
938            self.min_chunk_latency_ms = self.min_chunk_latency_ms.min(time_ms_f32);
939        }
940    }
941
942    /// Get average chunk processing latency
943    pub fn average_chunk_latency_ms(&self) -> f32 {
944        if self.total_chunks_processed == 0 {
945            0.0
946        } else {
947            (self.total_processing_time_ms / self.total_chunks_processed as f64) as f32
948        }
949    }
950
951    /// Get processing throughput (samples per second)
952    pub fn throughput_samples_per_sec(&self) -> f32 {
953        if self.total_processing_time_ms == 0.0 {
954            0.0
955        } else {
956            (self.total_samples as f64 / (self.total_processing_time_ms / 1000.0)) as f32
957        }
958    }
959
960    /// Get error rate
961    pub fn error_rate(&self) -> f32 {
962        if self.total_chunks_processed == 0 {
963            0.0
964        } else {
965            self.total_errors as f32 / self.total_chunks_processed as f32
966        }
967    }
968}
969
970/// Processor statistics aggregating metrics across converters
971#[derive(Debug, Clone)]
972pub struct ProcessorStats {
973    /// Total number of converters
974    pub total_converters: usize,
975    /// Total chunks processed across all converters
976    pub total_processed: u64,
977    /// Total errors across all converters
978    pub total_errors: u64,
979    /// Average latency across all converters
980    pub average_latency_ms: f32,
981}
982
983impl Default for StreamProcessor {
984    fn default() -> Self {
985        Self::new(StreamConfig::default())
986    }
987}
988
989#[cfg(test)]
990mod tests {
991    use super::*;
992    use futures::stream;
993    use tokio_test;
994
995    #[tokio::test]
996    async fn test_streaming_converter_creation() {
997        let config = StreamConfig::default();
998        let converter = StreamingConverter::new(config);
999        assert!(converter.is_ok());
1000    }
1001
1002    #[tokio::test]
1003    async fn test_audio_stream_processing() {
1004        let data = vec![0.1, 0.2, 0.3, 0.4, 0.5];
1005        let audio_stream = AudioStream::new(data.clone());
1006
1007        assert_eq!(audio_stream.remaining(), data.len());
1008        assert!(!audio_stream.is_finished());
1009    }
1010
1011    #[tokio::test]
1012    async fn test_stream_processor() {
1013        let config = StreamConfig {
1014            chunk_size: 2,
1015            ..Default::default()
1016        };
1017        let processor = StreamProcessor::with_converter_pool(config.clone(), 1).await;
1018        assert!(processor.is_ok());
1019        let processor = processor.unwrap();
1020
1021        let data = vec![0.1, 0.2, 0.3, 0.4];
1022        let audio_stream = AudioStream::new(data);
1023
1024        let processed_stream = processor.process_stream(audio_stream).await;
1025        assert!(processed_stream.is_ok());
1026    }
1027
1028    #[tokio::test]
1029    async fn test_streaming_stats() {
1030        let mut stats = StreamingStats::default();
1031        let duration = Duration::from_millis(10);
1032
1033        stats.update_chunk_stats(100, duration);
1034
1035        assert_eq!(stats.total_chunks_processed, 1);
1036        assert_eq!(stats.total_samples, 100);
1037        assert_eq!(stats.average_chunk_latency_ms(), 10.0);
1038    }
1039}