
voirs_vocoder/streaming/mod.rs

//! Real-time streaming architecture for neural vocoders
//!
//! This module provides a complete streaming pipeline for real-time audio generation
//! with latency optimization, chunk-based processing, and adaptive buffering.

pub mod buffer;
pub mod chunk_processor;
pub mod interrupt_processor;
pub mod latency;
pub mod latency_optimizer;
pub mod memory_buffer;
pub mod pipeline;
pub mod realtime_scheduler;

pub use buffer::{BufferManager, RingBuffer, StreamingBuffer};
pub use chunk_processor::{
    AdvancedChunkConfig, AdvancedChunkProcessor, AdvancedChunkStats, WindowType,
};
pub use interrupt_processor::{
    BufferEventType, InterruptContext, InterruptController, InterruptData, InterruptPriority,
    InterruptResponse, SystemCommand,
};
pub use latency::{LatencyOptimizer, PredictiveProcessor};
pub use latency_optimizer::{EnhancedLatencyOptimizer, EnhancedLatencyStats};
pub use memory_buffer::{
    AllocationStrategy, LockFreeCircularBuffer, MemoryEfficientBufferManager, MemoryStats,
};
pub use pipeline::{ChunkProcessor, StreamingPipeline};
pub use realtime_scheduler::{
    EnhancedRtScheduler, LoadBalancingStrategy, RtPriority, RtTask, SchedulerConfig, SchedulerStats,
};

// Re-export from config for convenience
pub use crate::config::StreamingConfig;
use crate::{AudioBuffer, MelSpectrogram, Result, VocoderError};
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;

/// Streaming vocoder trait for real-time audio generation
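///
/// # Example
///
/// A minimal usage sketch (not compiled): `MyVocoder` is a hypothetical
/// implementor of this trait, and `StreamingConfig` is assumed to implement
/// `Default`.
///
/// ```ignore
/// async fn synthesize(mut vocoder: MyVocoder, mel: MelSpectrogram) -> Result<AudioBuffer> {
///     // Configure the vocoder before any chunks are processed.
///     vocoder.initialize(StreamingConfig::default()).await?;
///
///     // Convert a single mel chunk into audio.
///     let audio = vocoder.process_chunk(mel).await?;
///
///     // Inspect runtime statistics, then shut the stream down.
///     assert!(vocoder.get_stats().is_real_time());
///     vocoder.stop_stream().await?;
///     Ok(audio)
/// }
/// ```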
#[async_trait]
pub trait StreamingVocoder: Send + Sync {
    /// Initialize the streaming vocoder with configuration
    async fn initialize(&mut self, config: StreamingConfig) -> Result<()>;

    /// Start streaming processing
    async fn start_stream(&self) -> Result<StreamHandle>;

    /// Process a mel spectrogram chunk
    async fn process_chunk(&self, mel_chunk: MelSpectrogram) -> Result<AudioBuffer>;

    /// Process multiple chunks in parallel
    async fn process_batch(&self, mel_chunks: Vec<MelSpectrogram>) -> Result<Vec<AudioBuffer>>;
    /// Stop streaming and clean up resources
    async fn stop_stream(&self) -> Result<()>;

    /// Get current streaming statistics
    fn get_stats(&self) -> StreamingStats;
}

/// Handle for managing an active stream
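///
/// # Example
///
/// A sketch of driving an active stream through its channels (not compiled):
/// `handle` is assumed to be a mutable `StreamHandle` returned by
/// `StreamingVocoder::start_stream`, and `mel` an already-prepared
/// `MelSpectrogram`.
///
/// ```ignore
/// // Feed one mel chunk into the stream.
/// handle.input_tx.send(mel).await?;
///
/// // Pull the corresponding audio once it has been generated.
/// if let Some(audio) = handle.output_rx.recv().await {
///     // ... play or buffer `audio` for stream `handle.stream_id` ...
/// }
///
/// // Flush any buffered audio, then shut the stream down.
/// handle.control_tx.send(StreamCommand::Flush).await?;
/// handle.control_tx.send(StreamCommand::Shutdown).await?;
/// ```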
pub struct StreamHandle {
    /// Input channel for mel spectrograms
    pub input_tx: mpsc::Sender<MelSpectrogram>,

    /// Output channel for audio buffers
    pub output_rx: mpsc::Receiver<AudioBuffer>,

    /// Control channel for stream management
    pub control_tx: mpsc::Sender<StreamCommand>,

    /// Stream ID for tracking
    pub stream_id: u64,
}

/// Commands for controlling the streaming process
#[derive(Debug, Clone)]
pub enum StreamCommand {
    /// Pause streaming
    Pause,
    /// Resume streaming
    Resume,
    /// Flush buffers
    Flush,
    /// Update configuration
    UpdateConfig(StreamingConfig),
    /// Get current statistics
    GetStats,
    /// Shutdown stream
    Shutdown,
}

/// Streaming performance statistics
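///
/// # Example
///
/// An illustrative sketch of updating and interpreting the statistics after a
/// chunk has been processed (the numbers are made up):
///
/// ```ignore
/// let mut stats = StreamingStats::default();
///
/// // A 100 ms audio chunk that took 40 ms to generate => RTF = 0.4.
/// stats.update_rtf(100.0, 40.0);
/// stats.current_latency_ms = 40.0;
/// stats.chunks_processed += 1;
///
/// assert!(stats.is_real_time());
/// assert!(stats.quality_score() > 0.9);
/// ```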
#[derive(Debug, Clone, Default)]
pub struct StreamingStats {
    /// Total processed chunks
    pub chunks_processed: u64,

    /// Average processing time per chunk (ms)
    pub avg_processing_time_ms: f32,

    /// Current latency (ms)
    pub current_latency_ms: f32,

    /// Peak latency (ms)
    pub peak_latency_ms: f32,

    /// Buffer underruns
    pub buffer_underruns: u64,

    /// Buffer overruns
    pub buffer_overruns: u64,

    /// Real-time factor (1.0 = real-time)
    pub real_time_factor: f32,

    /// CPU usage percentage
    pub cpu_usage: f32,

    /// Memory usage (MB)
    pub memory_usage_mb: f32,

    /// Active stream count
    pub active_streams: u32,

    /// Error count
    pub error_count: u64,
}

impl StreamingStats {
    /// Update the real-time factor (processing time / audio duration)
    pub fn update_rtf(&mut self, audio_duration_ms: f32, processing_time_ms: f32) {
        if audio_duration_ms > 0.0 {
            self.real_time_factor = processing_time_ms / audio_duration_ms;
        }
    }

    /// Check if streaming is meeting real-time requirements
    pub fn is_real_time(&self) -> bool {
        self.real_time_factor <= 1.0
    }

    /// Get quality score (0.0-1.0, higher is better)
    pub fn quality_score(&self) -> f32 {
        let rtf_score = if self.real_time_factor <= 1.0 {
            1.0
        } else {
            (2.0 - self.real_time_factor).max(0.0)
        };

        let latency_score = if self.current_latency_ms <= 50.0 {
            1.0
        } else if self.current_latency_ms <= 200.0 {
            1.0 - (self.current_latency_ms - 50.0) / 150.0
        } else {
            0.0
        };

        let error_score = if self.error_count == 0 {
            1.0
        } else {
            (1.0 / (1.0 + self.error_count as f32)).max(0.1)
        };

        (rtf_score + latency_score + error_score) / 3.0
    }
}

/// Streaming error types
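///
/// # Example
///
/// A sketch of surfacing a streaming error as the crate-level `VocoderError`
/// through the `From` impl below (not compiled); `check_latency` is a
/// hypothetical helper.
///
/// ```ignore
/// fn check_latency(latency_ms: f32) -> Result<()> {
///     if latency_ms > 200.0 {
///         // `.into()` converts via `From<StreamingError> for VocoderError`.
///         return Err(StreamingError::LatencyExceeded(latency_ms).into());
///     }
///     Ok(())
/// }
/// ```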
#[derive(Debug, Clone)]
pub enum StreamingError {
    /// Buffer overflow
    BufferOverflow,
    /// Buffer underflow
    BufferUnderflow,
    /// Latency exceeded threshold
    LatencyExceeded(f32),
    /// Processing timeout
    ProcessingTimeout,
    /// Stream not initialized
    NotInitialized,
    /// Invalid chunk size
    InvalidChunkSize(usize),
    /// Configuration error
    ConfigurationError(String),
}

impl std::fmt::Display for StreamingError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            StreamingError::BufferOverflow => write!(f, "Buffer overflow"),
            StreamingError::BufferUnderflow => write!(f, "Buffer underflow"),
            StreamingError::LatencyExceeded(latency) => {
                write!(f, "Latency exceeded: {latency:.2}ms")
            }
            StreamingError::ProcessingTimeout => write!(f, "Processing timeout"),
            StreamingError::NotInitialized => write!(f, "Stream not initialized"),
            StreamingError::InvalidChunkSize(size) => write!(f, "Invalid chunk size: {size}"),
            StreamingError::ConfigurationError(msg) => write!(f, "Configuration error: {msg}"),
        }
    }
}

impl std::error::Error for StreamingError {}

impl From<StreamingError> for VocoderError {
    fn from(error: StreamingError) -> Self {
        VocoderError::StreamingError(error.to_string())
    }
}

/// Simple buffer pool for common buffer sizes to reduce allocation overhead
pub struct BufferPool {
    pools: Arc<Mutex<HashMap<usize, Vec<Vec<f32>>>>>,
    max_buffers_per_size: usize,
}

impl BufferPool {
    /// Create a new buffer pool
    pub fn new() -> Self {
        Self {
            pools: Arc::new(Mutex::new(HashMap::new())),
            max_buffers_per_size: 16,
        }
    }

    /// Get a buffer of the specified size
    pub fn get_buffer(&self, size: usize) -> Vec<f32> {
        let mut pools = self.pools.lock().unwrap();

        if let Some(pool) = pools.get_mut(&size) {
            if let Some(buffer) = pool.pop() {
                return buffer;
            }
        }

        // No pooled buffer of this size is available; allocate a fresh zeroed one
        vec![0.0; size]
    }

    /// Return a buffer to the pool
    pub fn return_buffer(&self, mut buffer: Vec<f32>) {
        let size = buffer.len();
        buffer.clear();
        buffer.resize(size, 0.0);

        let mut pools = self.pools.lock().unwrap();
        let pool = pools.entry(size).or_default();

        if pool.len() < self.max_buffers_per_size {
            pool.push(buffer);
        }
    }
}

impl Default for BufferPool {
    fn default() -> Self {
        Self::new()
    }
}

use once_cell::sync::Lazy;

/// Global buffer pool instance
static GLOBAL_BUFFER_POOL: Lazy<BufferPool> = Lazy::new(BufferPool::new);

/// Convenience function to get a buffer from the global pool
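///
/// # Example
///
/// A sketch of the intended borrow/return pattern in a processing hot path
/// (not compiled); `synthesize_into` is a hypothetical DSP routine.
///
/// ```ignore
/// // Borrow a zeroed scratch buffer instead of allocating one per chunk.
/// let mut samples = get_pooled_buffer(1024);
/// synthesize_into(&mut samples);
///
/// // Hand the allocation back so later chunks can reuse it.
/// return_pooled_buffer(samples);
/// ```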
pub fn get_pooled_buffer(size: usize) -> Vec<f32> {
    GLOBAL_BUFFER_POOL.get_buffer(size)
}

/// Convenience function to return a buffer to the global pool
pub fn return_pooled_buffer(buffer: Vec<f32>) {
    GLOBAL_BUFFER_POOL.return_buffer(buffer);
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_streaming_stats_rtf_calculation() {
        let mut stats = StreamingStats::default();

        // Test real-time performance
        stats.update_rtf(100.0, 50.0); // RTF = 0.5 (faster than real-time)
        assert!(stats.is_real_time());
        assert_eq!(stats.real_time_factor, 0.5);

        // Test slower than real-time
        stats.update_rtf(100.0, 150.0); // RTF = 1.5 (slower than real-time)
        assert!(!stats.is_real_time());
        assert_eq!(stats.real_time_factor, 1.5);
    }

    #[test]
    fn test_quality_score_calculation() {
        // Perfect quality
        let mut stats = StreamingStats {
            real_time_factor: 0.8,
            current_latency_ms: 30.0,
            error_count: 0,
            ..Default::default()
        };

        let score = stats.quality_score();
        assert!(score > 0.9);

        // Poor quality
        stats.real_time_factor = 2.0;
        stats.current_latency_ms = 500.0;
        stats.error_count = 10;

        let score = stats.quality_score();
        assert!(score < 0.5);
    }

    #[test]
    fn test_streaming_error_display() {
        let error = StreamingError::LatencyExceeded(123.45);
        assert_eq!(error.to_string(), "Latency exceeded: 123.45ms");

        let error = StreamingError::InvalidChunkSize(2048);
        assert_eq!(error.to_string(), "Invalid chunk size: 2048");
    }

    #[test]
    fn test_buffer_pool_optimization() {
        let pool = BufferPool::new();

        // Test allocation and deallocation
        let buffer1 = pool.get_buffer(1024);
        assert_eq!(buffer1.len(), 1024);

        pool.return_buffer(buffer1);

        // Test reuse
        let buffer2 = pool.get_buffer(1024);
        assert_eq!(buffer2.len(), 1024);

        pool.return_buffer(buffer2);
    }
}