1pub mod buffer;
7pub mod chunk_processor;
8pub mod interrupt_processor;
9pub mod latency;
10pub mod latency_optimizer;
11pub mod memory_buffer;
12pub mod pipeline;
13pub mod realtime_scheduler;
14
15pub use buffer::{BufferManager, RingBuffer, StreamingBuffer};
16pub use chunk_processor::{
17 AdvancedChunkConfig, AdvancedChunkProcessor, AdvancedChunkStats, WindowType,
18};
19pub use interrupt_processor::{
20 BufferEventType, InterruptContext, InterruptController, InterruptData, InterruptPriority,
21 InterruptResponse, SystemCommand,
22};
23pub use latency::{LatencyOptimizer, PredictiveProcessor};
24pub use latency_optimizer::{EnhancedLatencyOptimizer, EnhancedLatencyStats};
25pub use memory_buffer::{
26 AllocationStrategy, LockFreeCircularBuffer, MemoryEfficientBufferManager, MemoryStats,
27};
28pub use pipeline::{ChunkProcessor, StreamingPipeline};
29pub use realtime_scheduler::{
30 EnhancedRtScheduler, LoadBalancingStrategy, RtPriority, RtTask, SchedulerConfig, SchedulerStats,
31};
32
33pub use crate::config::StreamingConfig;
35use crate::{AudioBuffer, MelSpectrogram, Result, VocoderError};
36use async_trait::async_trait;
37use std::collections::HashMap;
38use std::sync::{Arc, Mutex};
39use tokio::sync::mpsc;
40
41#[async_trait]
43pub trait StreamingVocoder: Send + Sync {
44 async fn initialize(&mut self, config: StreamingConfig) -> Result<()>;
46
47 async fn start_stream(&self) -> Result<StreamHandle>;
49
50 async fn process_chunk(&self, mel_chunk: MelSpectrogram) -> Result<AudioBuffer>;
52
53 async fn process_batch(&self, mel_chunks: Vec<MelSpectrogram>) -> Result<Vec<AudioBuffer>>;
55
56 async fn stop_stream(&self) -> Result<()>;
58
59 fn get_stats(&self) -> StreamingStats;
61}
62
63pub struct StreamHandle {
65 pub input_tx: mpsc::Sender<MelSpectrogram>,
67
68 pub output_rx: mpsc::Receiver<AudioBuffer>,
70
71 pub control_tx: mpsc::Sender<StreamCommand>,
73
74 pub stream_id: u64,
76}
77
78#[derive(Debug, Clone)]
80pub enum StreamCommand {
81 Pause,
83 Resume,
85 Flush,
87 UpdateConfig(StreamingConfig),
89 GetStats,
91 Shutdown,
93}
94
95#[derive(Debug, Clone, Default)]
97pub struct StreamingStats {
98 pub chunks_processed: u64,
100
101 pub avg_processing_time_ms: f32,
103
104 pub current_latency_ms: f32,
106
107 pub peak_latency_ms: f32,
109
110 pub buffer_underruns: u64,
112
113 pub buffer_overruns: u64,
115
116 pub real_time_factor: f32,
118
119 pub cpu_usage: f32,
121
122 pub memory_usage_mb: f32,
124
125 pub active_streams: u32,
127
128 pub error_count: u64,
130}
131
132impl StreamingStats {
133 pub fn update_rtf(&mut self, audio_duration_ms: f32, processing_time_ms: f32) {
135 if audio_duration_ms > 0.0 {
136 self.real_time_factor = processing_time_ms / audio_duration_ms;
137 }
138 }
139
140 pub fn is_real_time(&self) -> bool {
142 self.real_time_factor <= 1.0
143 }
144
145 pub fn quality_score(&self) -> f32 {
147 let rtf_score = if self.real_time_factor <= 1.0 {
148 1.0
149 } else {
150 (2.0 - self.real_time_factor).max(0.0)
151 };
152
153 let latency_score = if self.current_latency_ms <= 50.0 {
154 1.0
155 } else if self.current_latency_ms <= 200.0 {
156 1.0 - (self.current_latency_ms - 50.0) / 150.0
157 } else {
158 0.0
159 };
160
161 let error_score = if self.error_count == 0 {
162 1.0
163 } else {
164 (1.0 / (1.0 + self.error_count as f32)).max(0.1)
165 };
166
167 (rtf_score + latency_score + error_score) / 3.0
168 }
169}
170
171#[derive(Debug, Clone)]
173pub enum StreamingError {
174 BufferOverflow,
176 BufferUnderflow,
178 LatencyExceeded(f32),
180 ProcessingTimeout,
182 NotInitialized,
184 InvalidChunkSize(usize),
186 ConfigurationError(String),
188}
189
190impl std::fmt::Display for StreamingError {
191 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192 match self {
193 StreamingError::BufferOverflow => write!(f, "Buffer overflow"),
194 StreamingError::BufferUnderflow => write!(f, "Buffer underflow"),
195 StreamingError::LatencyExceeded(latency) => {
196 write!(f, "Latency exceeded: {latency:.2}ms")
197 }
198 StreamingError::ProcessingTimeout => write!(f, "Processing timeout"),
199 StreamingError::NotInitialized => write!(f, "Stream not initialized"),
200 StreamingError::InvalidChunkSize(size) => write!(f, "Invalid chunk size: {size}"),
201 StreamingError::ConfigurationError(msg) => write!(f, "Configuration error: {msg}"),
202 }
203 }
204}
205
206impl std::error::Error for StreamingError {}
207
208impl From<StreamingError> for VocoderError {
209 fn from(error: StreamingError) -> Self {
210 VocoderError::StreamingError(error.to_string())
211 }
212}
213
214pub struct BufferPool {
216 pools: Arc<Mutex<HashMap<usize, Vec<Vec<f32>>>>>,
217 max_buffers_per_size: usize,
218}
219
220impl BufferPool {
221 pub fn new() -> Self {
223 Self {
224 pools: Arc::new(Mutex::new(HashMap::new())),
225 max_buffers_per_size: 16,
226 }
227 }
228
229 pub fn get_buffer(&self, size: usize) -> Vec<f32> {
231 let mut pools = self.pools.lock().unwrap();
232
233 if let Some(pool) = pools.get_mut(&size) {
234 if let Some(buffer) = pool.pop() {
235 return buffer;
236 }
237 }
238
239 vec![0.0; size]
241 }
242
243 pub fn return_buffer(&self, mut buffer: Vec<f32>) {
245 let size = buffer.len();
246 buffer.clear();
247 buffer.resize(size, 0.0);
248
249 let mut pools = self.pools.lock().unwrap();
250 let pool = pools.entry(size).or_default();
251
252 if pool.len() < self.max_buffers_per_size {
253 pool.push(buffer);
254 }
255 }
256}
257
258impl Default for BufferPool {
259 fn default() -> Self {
260 Self::new()
261 }
262}
263
264use once_cell::sync::Lazy;
266static GLOBAL_BUFFER_POOL: Lazy<BufferPool> = Lazy::new(BufferPool::new);
267
268pub fn get_pooled_buffer(size: usize) -> Vec<f32> {
270 GLOBAL_BUFFER_POOL.get_buffer(size)
271}
272
273pub fn return_pooled_buffer(buffer: Vec<f32>) {
275 GLOBAL_BUFFER_POOL.return_buffer(buffer);
276}
277
278#[cfg(test)]
279mod tests {
280 use super::*;
281
282 #[test]
283 fn test_streaming_stats_rtf_calculation() {
284 let mut stats = StreamingStats::default();
285
286 stats.update_rtf(100.0, 50.0); assert!(stats.is_real_time());
289 assert_eq!(stats.real_time_factor, 0.5);
290
291 stats.update_rtf(100.0, 150.0); assert!(!stats.is_real_time());
294 assert_eq!(stats.real_time_factor, 1.5);
295 }
296
297 #[test]
298 fn test_quality_score_calculation() {
299 let mut stats = StreamingStats {
301 real_time_factor: 0.8,
302 current_latency_ms: 30.0,
303 error_count: 0,
304 ..Default::default()
305 };
306
307 let score = stats.quality_score();
308 assert!(score > 0.9);
309
310 stats.real_time_factor = 2.0;
312 stats.current_latency_ms = 500.0;
313 stats.error_count = 10;
314
315 let score = stats.quality_score();
316 assert!(score < 0.5);
317 }
318
319 #[test]
320 fn test_streaming_error_display() {
321 let error = StreamingError::LatencyExceeded(123.45);
322 assert_eq!(error.to_string(), "Latency exceeded: 123.45ms");
323
324 let error = StreamingError::InvalidChunkSize(2048);
325 assert_eq!(error.to_string(), "Invalid chunk size: 2048");
326 }
327
328 #[test]
329 fn test_buffer_pool_optimization() {
330 let pool = BufferPool::new();
331
332 let buffer1 = pool.get_buffer(1024);
334 assert_eq!(buffer1.len(), 1024);
335
336 pool.return_buffer(buffer1);
337
338 let buffer2 = pool.get_buffer(1024);
340 assert_eq!(buffer2.len(), 1024);
341
342 pool.return_buffer(buffer2);
343 }
344}