// voirs_cli/performance/streaming_optimizer.rs

//! Advanced streaming synthesis optimization system
//!
//! This module provides sophisticated optimizations for real-time streaming synthesis,
//! focusing on latency reduction, buffering strategies, and adaptive quality control.
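//!
//! # Example
//!
//! A minimal sketch of driving the optimizer from an async context. It assumes a
//! Tokio runtime and that this module is reachable at
//! `voirs_cli::performance::streaming_optimizer` (inferred from the file path);
//! the latency values are illustrative.
//!
//! ```no_run
//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
//! use voirs_cli::performance::streaming_optimizer::{
//!     StreamingOptimizer, StreamingOptimizerConfig,
//! };
//!
//! let optimizer = StreamingOptimizer::new(StreamingOptimizerConfig::default());
//! optimizer.start().await?;
//!
//! // Feed latency samples as synthesis chunks complete.
//! optimizer.record_latency(120, "chunk_synthesis".to_string()).await;
//!
//! let metrics = optimizer.get_metrics().await;
//! println!("current latency: {} ms", metrics.current_latency_ms);
//!
//! optimizer.stop().await?;
//! # Ok(())
//! # }
//! ```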

use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::{RwLock, Semaphore};

/// Streaming optimization configuration
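///
/// # Example
///
/// A minimal sketch of overriding selected defaults with struct-update syntax
/// (the values are illustrative, not tuned recommendations; the module path is
/// assumed from this file's location):
///
/// ```no_run
/// use voirs_cli::performance::streaming_optimizer::StreamingOptimizerConfig;
///
/// let config = StreamingOptimizerConfig {
///     target_latency_ms: 80,
///     max_latency_ms: 160,
///     ..StreamingOptimizerConfig::default()
/// };
/// assert!(config.enabled);
/// ```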
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingOptimizerConfig {
    /// Enable streaming optimization
    pub enabled: bool,
    /// Target latency in milliseconds
    pub target_latency_ms: u64,
    /// Maximum acceptable latency in milliseconds
    pub max_latency_ms: u64,
    /// Buffer configuration
    pub buffer_config: BufferConfig,
    /// Quality adaptation settings
    pub quality_adaptation: QualityAdaptationConfig,
    /// Prefetching configuration
    pub prefetching: PrefetchingConfig,
    /// Chunk processing settings
    pub chunk_processing: ChunkProcessingConfig,
    /// Pipeline optimization settings
    pub pipeline_optimization: PipelineOptimizationConfig,
}

/// Buffer configuration for streaming
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BufferConfig {
    /// Initial buffer size in milliseconds
    pub initial_buffer_ms: u64,
    /// Minimum buffer size in milliseconds
    pub min_buffer_ms: u64,
    /// Maximum buffer size in milliseconds
    pub max_buffer_ms: u64,
    /// Buffer adaptation sensitivity (0.0-1.0)
    pub adaptation_sensitivity: f64,
    /// Enable adaptive buffering
    pub adaptive_buffering: bool,
    /// Underrun recovery strategy
    pub underrun_recovery: UnderrunRecoveryStrategy,
    /// Buffer monitoring interval
    pub monitoring_interval_ms: u64,
}

/// Quality adaptation configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityAdaptationConfig {
    /// Enable adaptive quality control
    pub enabled: bool,
    /// Quality levels available
    pub quality_levels: Vec<QualityLevel>,
    /// Adaptation trigger threshold (latency increase)
    pub adaptation_threshold_ms: u64,
    /// Quality adjustment aggressiveness (0.0-1.0)
    pub adjustment_aggressiveness: f64,
    /// Minimum quality level (never go below this)
    pub min_quality_level: usize,
    /// Quality recovery speed
    pub recovery_speed: QualityRecoverySpeed,
}

/// Prefetching configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrefetchingConfig {
    /// Enable prefetching
    pub enabled: bool,
    /// Look-ahead distance in characters
    pub lookahead_chars: usize,
    /// Prefetch trigger threshold (buffer percentage)
    pub trigger_threshold: f64,
    /// Maximum concurrent prefetch operations
    pub max_concurrent_prefetch: usize,
    /// Prefetch cache size
    pub cache_size_mb: u32,
    /// Prefetch strategy
    pub strategy: PrefetchStrategy,
}

/// Chunk processing configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkProcessingConfig {
    /// Chunk size in characters
    pub chunk_size_chars: usize,
    /// Chunk overlap in characters
    pub chunk_overlap_chars: usize,
    /// Enable parallel chunk processing
    pub parallel_processing: bool,
    /// Maximum parallel chunks
    pub max_parallel_chunks: usize,
    /// Chunk priority scheduling
    pub priority_scheduling: bool,
    /// Dynamic chunk sizing
    pub dynamic_sizing: bool,
}

/// Pipeline optimization configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineOptimizationConfig {
    /// Enable pipeline parallelization
    pub pipeline_parallel: bool,
    /// Number of pipeline stages
    pub pipeline_stages: usize,
    /// Enable stage skipping for low latency
    pub stage_skipping: bool,
    /// CPU affinity optimization
    pub cpu_affinity: bool,
    /// GPU pipeline acceleration
    pub gpu_acceleration: bool,
    /// Memory optimization for pipeline
    pub memory_optimization: bool,
}

/// Quality level definition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityLevel {
    /// Quality level identifier
    pub level: usize,
    /// Quality level name
    pub name: String,
    /// Expected synthesis time multiplier
    pub synthesis_time_multiplier: f64,
    /// Audio quality score (0.0-1.0)
    pub quality_score: f64,
    /// Memory usage multiplier
    pub memory_multiplier: f64,
    /// CPU usage multiplier
    pub cpu_multiplier: f64,
}

/// Underrun recovery strategies
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum UnderrunRecoveryStrategy {
    /// Increase buffer size
    IncreaseBuffer,
    /// Reduce quality temporarily
    ReduceQuality,
    /// Skip frames to catch up
    SkipFrames,
    /// Hybrid approach
    Hybrid,
}

/// Quality recovery speed settings
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum QualityRecoverySpeed {
    /// Conservative recovery (slow)
    Conservative,
    /// Moderate recovery speed
    Moderate,
    /// Aggressive recovery (fast)
    Aggressive,
}

/// Prefetch strategies
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PrefetchStrategy {
    /// Linear prefetching (next chunks)
    Linear,
    /// Predictive prefetching (based on patterns)
    Predictive,
    /// Adaptive prefetching (learns from usage)
    Adaptive,
}

/// Streaming performance metrics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingMetrics {
    /// Current latency in milliseconds
    pub current_latency_ms: u64,
    /// Average latency over time window
    pub average_latency_ms: f64,
    /// 95th percentile latency in milliseconds
    pub latency_p95_ms: u64,
    /// 99th percentile latency in milliseconds
    pub latency_p99_ms: u64,
    /// Buffer fill percentage
    pub buffer_fill_percent: f64,
    /// Buffer underruns count
    pub buffer_underruns: u64,
    /// Current quality level
    pub current_quality_level: usize,
    /// Quality adaptations count
    pub quality_adaptations: u64,
    /// Prefetch hit rate (percent)
    pub prefetch_hit_rate: f64,
    /// Chunk processing throughput
    pub chunk_throughput: f64,
    /// Pipeline efficiency (percent)
    pub pipeline_efficiency: f64,
    /// Real-time factor
    pub real_time_factor: f64,
}

/// Streaming optimization result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingOptimizationResult {
    /// Optimization type applied
    pub optimization: StreamingOptimization,
    /// Latency improvement in milliseconds
    pub latency_improvement_ms: i64,
    /// Quality impact (-1.0 to 1.0)
    pub quality_impact: f64,
    /// Memory impact in bytes (can be negative for savings)
    pub memory_impact_bytes: i64,
    /// CPU impact percentage
    pub cpu_impact_percent: f64,
    /// Success status
    pub success: bool,
    /// Error description if failed
    pub error: Option<String>,
}

/// Types of streaming optimizations
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum StreamingOptimization {
    /// Adaptive buffer sizing
    AdaptiveBuffering,
    /// Quality level adjustment
    QualityAdjustment,
    /// Prefetch optimization
    PrefetchOptimization,
    /// Chunk size optimization
    ChunkSizeOptimization,
    /// Pipeline parallelization
    PipelineParallelization,
    /// Memory optimization
    MemoryOptimization,
    /// CPU affinity optimization
    CpuAffinityOptimization,
    /// GPU acceleration
    GpuAcceleration,
}

/// Advanced streaming optimizer
pub struct StreamingOptimizer {
    /// Configuration
    config: StreamingOptimizerConfig,
    /// Current streaming metrics
    metrics: Arc<RwLock<StreamingMetrics>>,
    /// Latency history for analysis
    latency_history: Arc<RwLock<VecDeque<LatencyMeasurement>>>,
    /// Buffer state tracking
    buffer_state: Arc<RwLock<BufferState>>,
    /// Quality adaptation state
    quality_state: Arc<RwLock<QualityState>>,
    /// Prefetch cache
    prefetch_cache: Arc<RwLock<PrefetchCache>>,
    /// Optimization results history
    optimization_history: Arc<RwLock<VecDeque<StreamingOptimizationResult>>>,
    /// Is running
    is_running: Arc<RwLock<bool>>,
    /// Processing semaphore
    processing_semaphore: Arc<Semaphore>,
}

/// Latency measurement point
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LatencyMeasurement {
    /// Timestamp as seconds since epoch
    timestamp: u64,
    /// Latency in milliseconds
    latency_ms: u64,
    /// Quality level when measured
    quality_level: usize,
    /// Buffer fill when measured
    buffer_fill: f64,
    /// Processing context
    context: String,
}

/// Buffer state tracking
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BufferState {
    /// Current buffer size in milliseconds
    current_size_ms: u64,
    /// Buffer fill percentage
    fill_percentage: f64,
    /// Last underrun time
    last_underrun: Option<u64>,
    /// Underrun count
    underrun_count: u64,
    /// Buffer adaptation history
    adaptation_history: VecDeque<BufferAdaptation>,
}

/// Buffer adaptation record
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BufferAdaptation {
    /// Timestamp
    timestamp: u64,
    /// Old buffer size
    old_size_ms: u64,
    /// New buffer size
    new_size_ms: u64,
    /// Reason for adaptation
    reason: String,
    /// Success of adaptation
    success: bool,
}

/// Quality adaptation state
#[derive(Debug, Clone, Serialize, Deserialize)]
struct QualityState {
    /// Current quality level
    current_level: usize,
    /// Quality level history
    level_history: VecDeque<QualityChange>,
    /// Last quality change time
    last_change: Option<u64>,
    /// Quality adaptation statistics
    adaptation_stats: QualityAdaptationStats,
}

/// Quality level change record
#[derive(Debug, Clone, Serialize, Deserialize)]
struct QualityChange {
    /// Timestamp
    timestamp: u64,
    /// Old quality level
    old_level: usize,
    /// New quality level
    new_level: usize,
    /// Trigger reason
    trigger: String,
    /// Latency at time of change
    latency_ms: u64,
}

/// Quality adaptation statistics
#[derive(Debug, Clone, Serialize, Deserialize)]
struct QualityAdaptationStats {
    /// Total adaptations
    total_adaptations: u64,
    /// Successful adaptations
    successful_adaptations: u64,
    /// Average adaptation effect on latency
    avg_latency_improvement_ms: f64,
    /// Quality stability score
    stability_score: f64,
}

/// Prefetch cache implementation
#[derive(Debug)]
struct PrefetchCache {
    /// Cached synthesis results
    cache: HashMap<String, CachedSynthesis>,
    /// Cache size in bytes
    current_size_bytes: u64,
    /// Maximum cache size in bytes
    max_size_bytes: u64,
    /// Cache hit statistics
    hits: u64,
    /// Cache miss statistics
    misses: u64,
    /// LRU tracking
    lru_order: VecDeque<String>,
}

/// Cached synthesis result
#[derive(Debug, Clone)]
struct CachedSynthesis {
    /// Cache key (text hash + quality level)
    key: String,
    /// Synthesized audio data
    audio_data: Vec<u8>,
    /// Cache timestamp
    timestamp: u64,
    /// Quality level used
    quality_level: usize,
    /// Access count
    access_count: u64,
}

impl StreamingOptimizer {
    /// Create a new streaming optimizer
    pub fn new(config: StreamingOptimizerConfig) -> Self {
        let processing_permits = config.chunk_processing.max_parallel_chunks;

        Self {
            config,
            metrics: Arc::new(RwLock::new(StreamingMetrics::default())),
            latency_history: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
            buffer_state: Arc::new(RwLock::new(BufferState::default())),
            quality_state: Arc::new(RwLock::new(QualityState::default())),
            prefetch_cache: Arc::new(RwLock::new(PrefetchCache::default())),
            optimization_history: Arc::new(RwLock::new(VecDeque::with_capacity(100))),
            is_running: Arc::new(RwLock::new(false)),
            processing_semaphore: Arc::new(Semaphore::new(processing_permits)),
        }
    }

    /// Start the streaming optimizer
    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
        let mut is_running = self.is_running.write().await;
        if *is_running {
            return Ok(());
        }
        *is_running = true;
        drop(is_running);

        tracing::info!("Starting streaming optimizer");

        // Initialize quality state
        self.initialize_quality_state().await;

        // Start monitoring tasks
        self.start_latency_monitoring().await;
        self.start_buffer_monitoring().await;
        self.start_quality_adaptation().await;
        self.start_prefetch_management().await;

        Ok(())
    }

    /// Stop the streaming optimizer
    pub async fn stop(&self) -> Result<(), Box<dyn std::error::Error>> {
        let mut is_running = self.is_running.write().await;
        if !*is_running {
            return Ok(());
        }
        *is_running = false;

        tracing::info!("Stopped streaming optimizer");
        Ok(())
    }

    /// Record a latency measurement
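    ///
    /// Updates the current latency and the rolling average over the most recent
    /// measurements; a measurement above the configured `max_latency_ms` also
    /// triggers an automatic optimization pass.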
    pub async fn record_latency(&self, latency_ms: u64, context: String) {
        let measurement = LatencyMeasurement {
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
            latency_ms,
            quality_level: self.get_current_quality_level().await,
            buffer_fill: self.get_buffer_fill_percentage().await,
            context,
        };

        let mut history = self.latency_history.write().await;
        history.push_back(measurement);

        // Maintain history size
        if history.len() > 1000 {
            history.pop_front();
        }

        // Update current metrics
        let mut metrics = self.metrics.write().await;
        metrics.current_latency_ms = latency_ms;

        // Calculate average
        let recent_measurements: Vec<u64> = history
            .iter()
            .rev()
            .take(60) // Last 60 measurements
            .map(|m| m.latency_ms)
            .collect();

        if !recent_measurements.is_empty() {
            metrics.average_latency_ms =
                recent_measurements.iter().sum::<u64>() as f64 / recent_measurements.len() as f64;
        }

        // Trigger optimization if needed
        if latency_ms > self.config.max_latency_ms {
            self.trigger_latency_optimization().await;
        }
    }

    /// Get streaming performance recommendations
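    ///
    /// Recommendations are returned sorted by descending priority. A minimal
    /// sketch of consuming them (module path assumed as in the crate-level
    /// example):
    ///
    /// ```no_run
    /// # use voirs_cli::performance::streaming_optimizer::StreamingOptimizer;
    /// # async fn demo(optimizer: &StreamingOptimizer) {
    /// for rec in optimizer.get_performance_recommendations().await {
    ///     println!(
    ///         "[priority {}] {:?}: {} (expected -{} ms)",
    ///         rec.priority, rec.optimization, rec.description, rec.expected_improvement_ms
    ///     );
    /// }
    /// # }
    /// ```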
    pub async fn get_performance_recommendations(&self) -> Vec<StreamingRecommendation> {
        let mut recommendations = Vec::new();

        let metrics = self.metrics.read().await;
        let buffer_state = self.buffer_state.read().await;

        // Check latency
        if metrics.current_latency_ms > self.config.target_latency_ms {
            let excess_latency = metrics.current_latency_ms - self.config.target_latency_ms;
            recommendations.push(StreamingRecommendation {
                optimization: StreamingOptimization::QualityAdjustment,
                priority: if excess_latency > 100 { 9 } else { 6 },
                description: format!("Latency exceeds target by {} ms", excess_latency),
                expected_improvement_ms: (excess_latency as f64 * 0.6) as u64,
                quality_impact: -0.2,
                implementation_complexity: ImplementationComplexity::Low,
            });
        }

        // Check buffer underruns
        if buffer_state.underrun_count > 0 {
            recommendations.push(StreamingRecommendation {
                optimization: StreamingOptimization::AdaptiveBuffering,
                priority: 8,
                description: format!("{} buffer underruns detected", buffer_state.underrun_count),
                expected_improvement_ms: 50,
                quality_impact: 0.0,
                implementation_complexity: ImplementationComplexity::Medium,
            });
        }

        // Check prefetch effectiveness
        if metrics.prefetch_hit_rate < 70.0 {
            recommendations.push(StreamingRecommendation {
                optimization: StreamingOptimization::PrefetchOptimization,
                priority: 5,
                description: format!("Low prefetch hit rate: {:.1}%", metrics.prefetch_hit_rate),
                expected_improvement_ms: 30,
                quality_impact: 0.1,
                implementation_complexity: ImplementationComplexity::High,
            });
        }

        // Check pipeline efficiency
        if metrics.pipeline_efficiency < 80.0 {
            recommendations.push(StreamingRecommendation {
                optimization: StreamingOptimization::PipelineParallelization,
                priority: 7,
                description: format!(
                    "Pipeline efficiency low: {:.1}%",
                    metrics.pipeline_efficiency
                ),
                expected_improvement_ms: 40,
                quality_impact: 0.0,
                implementation_complexity: ImplementationComplexity::High,
            });
        }

        recommendations.sort_by(|a, b| b.priority.cmp(&a.priority));
        recommendations
    }

    /// Apply streaming optimization
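    ///
    /// Returns a [`StreamingOptimizationResult`] describing the estimated impact
    /// and records it in the optimizer's history. A minimal sketch (module path
    /// assumed as in the crate-level example):
    ///
    /// ```no_run
    /// # use voirs_cli::performance::streaming_optimizer::{
    /// #     StreamingOptimization, StreamingOptimizer,
    /// # };
    /// # async fn demo(optimizer: &StreamingOptimizer) {
    /// let result = optimizer
    ///     .apply_optimization(StreamingOptimization::AdaptiveBuffering)
    ///     .await;
    /// if result.success {
    ///     println!("latency improved by {} ms", result.latency_improvement_ms);
    /// }
    /// # }
    /// ```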
    pub async fn apply_optimization(
        &self,
        optimization: StreamingOptimization,
    ) -> StreamingOptimizationResult {
        let result = match optimization {
            StreamingOptimization::AdaptiveBuffering => self.optimize_adaptive_buffering().await,
            StreamingOptimization::QualityAdjustment => self.optimize_quality_adjustment().await,
            StreamingOptimization::PrefetchOptimization => self.optimize_prefetching().await,
            StreamingOptimization::ChunkSizeOptimization => self.optimize_chunk_size().await,
            StreamingOptimization::PipelineParallelization => {
                self.optimize_pipeline_parallelization().await
            }
            StreamingOptimization::MemoryOptimization => self.optimize_memory_usage().await,
            StreamingOptimization::CpuAffinityOptimization => self.optimize_cpu_affinity().await,
            StreamingOptimization::GpuAcceleration => self.optimize_gpu_acceleration().await,
        };

        let optimization_result = StreamingOptimizationResult {
            optimization,
            latency_improvement_ms: result.0,
            quality_impact: result.1,
            memory_impact_bytes: result.2,
            cpu_impact_percent: result.3,
            success: result.4,
            error: result.5,
        };

        // Record result
        let mut history = self.optimization_history.write().await;
        history.push_back(optimization_result.clone());
        if history.len() > 100 {
            history.pop_front();
        }

        optimization_result
    }

    /// Get current streaming metrics
    pub async fn get_metrics(&self) -> StreamingMetrics {
        self.metrics.read().await.clone()
    }

    /// Initialize quality state
    async fn initialize_quality_state(&self) {
        let mut quality_state = self.quality_state.write().await;
        let default_level = self.config.quality_adaptation.quality_levels.len() / 2; // Start with middle quality
        quality_state.current_level = default_level;
    }

    /// Start latency monitoring task
    async fn start_latency_monitoring(&self) {
        let is_running = self.is_running.clone();
        let metrics = self.metrics.clone();
        let latency_history = self.latency_history.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(100));

            loop {
                interval.tick().await;

                let running = is_running.read().await;
                if !*running {
                    break;
                }
                drop(running);

                // Update latency percentiles
                let history = latency_history.read().await;
                if history.len() >= 20 {
                    let mut recent_latencies: Vec<u64> = history
                        .iter()
                        .rev()
                        .take(100)
                        .map(|m| m.latency_ms)
                        .collect();
                    recent_latencies.sort_unstable();

                    let p95_index = (recent_latencies.len() as f64 * 0.95) as usize;
                    let p99_index = (recent_latencies.len() as f64 * 0.99) as usize;

                    let mut metrics = metrics.write().await;
                    metrics.latency_p95_ms = recent_latencies.get(p95_index).cloned().unwrap_or(0);
                    metrics.latency_p99_ms = recent_latencies.get(p99_index).cloned().unwrap_or(0);
                }
            }
        });
    }

    /// Start buffer monitoring task
    async fn start_buffer_monitoring(&self) {
        let is_running = self.is_running.clone();
        let buffer_state = self.buffer_state.clone();
        let metrics = self.metrics.clone();
        let config = self.config.buffer_config.clone();

        tokio::spawn(async move {
            let mut interval =
                tokio::time::interval(Duration::from_millis(config.monitoring_interval_ms));

            loop {
                interval.tick().await;

                let running = is_running.read().await;
                if !*running {
                    break;
                }
                drop(running);

                // Monitor buffer state and update metrics
                let buffer = buffer_state.read().await;
                let mut metrics = metrics.write().await;
                metrics.buffer_fill_percent = buffer.fill_percentage;
                metrics.buffer_underruns = buffer.underrun_count;
            }
        });
    }

    /// Start quality adaptation task
    async fn start_quality_adaptation(&self) {
        let is_running = self.is_running.clone();
        let quality_state = self.quality_state.clone();
        let metrics = self.metrics.clone();
        let config = self.config.quality_adaptation.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(500));

            loop {
                interval.tick().await;

                let running = is_running.read().await;
                if !*running {
                    break;
                }
                drop(running);

                if !config.enabled {
                    continue;
                }

                // Check if quality adaptation is needed
                let current_metrics = metrics.read().await;
                let mut quality = quality_state.write().await;

                if current_metrics.current_latency_ms > config.adaptation_threshold_ms {
                    // Consider reducing quality
                    if quality.current_level > config.min_quality_level {
                        let old_level = quality.current_level;
                        let new_level = (quality.current_level - 1).max(config.min_quality_level);
                        quality.current_level = new_level;

                        quality.level_history.push_back(QualityChange {
                            timestamp: SystemTime::now()
                                .duration_since(UNIX_EPOCH)
                                .unwrap_or_default()
                                .as_secs(),
                            old_level,
                            new_level,
                            trigger: "high_latency".to_string(),
                            latency_ms: current_metrics.current_latency_ms,
                        });

                        quality.adaptation_stats.total_adaptations += 1;
                        tracing::info!(
                            "Reduced quality level from {} to {} due to high latency",
                            old_level,
                            quality.current_level
                        );
                    }
                }
            }
        });
    }

    /// Start prefetch management task
    async fn start_prefetch_management(&self) {
        let is_running = self.is_running.clone();
        let prefetch_cache = self.prefetch_cache.clone();
        let config = self.config.prefetching.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));

            loop {
                interval.tick().await;

                let running = is_running.read().await;
                if !*running {
                    break;
                }
                drop(running);

                if !config.enabled {
                    continue;
                }

                // Clean up expired cache entries
                let mut cache = prefetch_cache.write().await;
                let current_timestamp = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs();
                let expiry_seconds = 300u64; // 5 minutes

                let expired_keys: Vec<String> = cache
                    .cache
                    .iter()
                    .filter(|(_, entry)| {
                        current_timestamp.saturating_sub(entry.timestamp) > expiry_seconds
                    })
                    .map(|(key, _)| key.clone())
                    .collect();

                for key in expired_keys {
                    if let Some(entry) = cache.cache.remove(&key) {
                        cache.current_size_bytes = cache
                            .current_size_bytes
                            .saturating_sub(entry.audio_data.len() as u64);
                        cache.lru_order.retain(|k| k != &key);
                    }
                }
            }
        });
    }

    /// Trigger latency optimization
    async fn trigger_latency_optimization(&self) {
        // Implement automatic optimization triggers
        tracing::warn!("High latency detected, triggering optimization");

        // Apply the most effective optimization for latency
        let _ = self
            .apply_optimization(StreamingOptimization::QualityAdjustment)
            .await;
    }

    /// Get current quality level
    async fn get_current_quality_level(&self) -> usize {
        self.quality_state.read().await.current_level
    }

    /// Get buffer fill percentage
    async fn get_buffer_fill_percentage(&self) -> f64 {
        self.buffer_state.read().await.fill_percentage
    }

    // Optimization implementation methods. These are placeholder estimates; each
    // returns (latency_improvement_ms, quality_impact, memory_impact_bytes,
    // cpu_impact_percent, success, error).
    async fn optimize_adaptive_buffering(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing adaptive buffering");
        (25, 0.0, 1024 * 1024, 5.0, true, None) // 25ms improvement, no quality impact, 1MB memory, 5% CPU
    }

    async fn optimize_quality_adjustment(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing quality adjustment");
        (60, -0.15, -512 * 1024, -10.0, true, None) // 60ms improvement, slight quality reduction, memory savings
    }

    async fn optimize_prefetching(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing prefetching");
        (35, 0.05, 2 * 1024 * 1024, 8.0, true, None) // 35ms improvement, slight quality boost, 2MB memory
    }

    async fn optimize_chunk_size(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing chunk size");
        (20, 0.0, 0, 3.0, true, None) // 20ms improvement, no quality/memory impact
    }

    async fn optimize_pipeline_parallelization(
        &self,
    ) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing pipeline parallelization");
        (45, 0.0, 512 * 1024, 15.0, true, None) // 45ms improvement, 15% more CPU usage
    }

    async fn optimize_memory_usage(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing memory usage");
        (15, 0.0, -1024 * 1024, -2.0, true, None) // 15ms improvement, 1MB memory savings
    }

    async fn optimize_cpu_affinity(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing CPU affinity");
        (30, 0.0, 0, -5.0, true, None) // 30ms improvement, 5% CPU savings
    }

    async fn optimize_gpu_acceleration(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing GPU acceleration");
        (80, 0.1, 4 * 1024 * 1024, -20.0, true, None) // 80ms improvement, quality boost, GPU memory
    }
}

/// Streaming optimization recommendation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingRecommendation {
    /// Optimization type
    pub optimization: StreamingOptimization,
    /// Priority level (1-10)
    pub priority: u8,
    /// Description
    pub description: String,
    /// Expected latency improvement in milliseconds
    pub expected_improvement_ms: u64,
    /// Quality impact (-1.0 to 1.0)
    pub quality_impact: f64,
    /// Implementation complexity
    pub implementation_complexity: ImplementationComplexity,
}

/// Implementation complexity levels
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ImplementationComplexity {
    /// Low complexity, can be applied immediately
    Low,
    /// Medium complexity, requires some planning
    Medium,
    /// High complexity, requires significant changes
    High,
}

// Default implementations
impl Default for StreamingMetrics {
    fn default() -> Self {
        Self {
            current_latency_ms: 0,
            average_latency_ms: 0.0,
            latency_p95_ms: 0,
            latency_p99_ms: 0,
            buffer_fill_percent: 50.0,
            buffer_underruns: 0,
            current_quality_level: 2,
            quality_adaptations: 0,
            prefetch_hit_rate: 0.0,
            chunk_throughput: 0.0,
            pipeline_efficiency: 100.0,
            real_time_factor: 1.0,
        }
    }
}

impl Default for BufferState {
    fn default() -> Self {
        Self {
            current_size_ms: 200,
            fill_percentage: 50.0,
            last_underrun: None,
            underrun_count: 0,
            adaptation_history: VecDeque::new(),
        }
    }
}

impl Default for QualityState {
    fn default() -> Self {
        Self {
            current_level: 2,
            level_history: VecDeque::new(),
            last_change: None,
            adaptation_stats: QualityAdaptationStats {
                total_adaptations: 0,
                successful_adaptations: 0,
                avg_latency_improvement_ms: 0.0,
                stability_score: 100.0,
            },
        }
    }
}

impl Default for PrefetchCache {
    fn default() -> Self {
        Self {
            cache: HashMap::new(),
            current_size_bytes: 0,
            max_size_bytes: 100 * 1024 * 1024, // 100MB
            hits: 0,
            misses: 0,
            lru_order: VecDeque::new(),
        }
    }
}

impl Default for StreamingOptimizerConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            target_latency_ms: 100,
            max_latency_ms: 200,
            buffer_config: BufferConfig::default(),
            quality_adaptation: QualityAdaptationConfig::default(),
            prefetching: PrefetchingConfig::default(),
            chunk_processing: ChunkProcessingConfig::default(),
            pipeline_optimization: PipelineOptimizationConfig::default(),
        }
    }
}

impl Default for BufferConfig {
    fn default() -> Self {
        Self {
            initial_buffer_ms: 200,
            min_buffer_ms: 50,
            max_buffer_ms: 1000,
            adaptation_sensitivity: 0.7,
            adaptive_buffering: true,
            underrun_recovery: UnderrunRecoveryStrategy::Hybrid,
            monitoring_interval_ms: 100,
        }
    }
}

impl Default for QualityAdaptationConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            quality_levels: vec![
                QualityLevel {
                    level: 0,
                    name: "Low".to_string(),
                    synthesis_time_multiplier: 0.5,
                    quality_score: 0.6,
                    memory_multiplier: 0.7,
                    cpu_multiplier: 0.6,
                },
                QualityLevel {
                    level: 1,
                    name: "Medium".to_string(),
                    synthesis_time_multiplier: 0.8,
                    quality_score: 0.8,
                    memory_multiplier: 0.9,
                    cpu_multiplier: 0.8,
                },
                QualityLevel {
                    level: 2,
                    name: "High".to_string(),
                    synthesis_time_multiplier: 1.0,
                    quality_score: 1.0,
                    memory_multiplier: 1.0,
                    cpu_multiplier: 1.0,
                },
                QualityLevel {
                    level: 3,
                    name: "Ultra".to_string(),
                    synthesis_time_multiplier: 1.5,
                    quality_score: 1.0,
                    memory_multiplier: 1.3,
                    cpu_multiplier: 1.4,
                },
            ],
            adaptation_threshold_ms: 150,
            adjustment_aggressiveness: 0.6,
            min_quality_level: 0,
            recovery_speed: QualityRecoverySpeed::Moderate,
        }
    }
}

impl Default for PrefetchingConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            lookahead_chars: 200,
            trigger_threshold: 0.3,
            max_concurrent_prefetch: 3,
            cache_size_mb: 50,
            strategy: PrefetchStrategy::Adaptive,
        }
    }
}

impl Default for ChunkProcessingConfig {
    fn default() -> Self {
        Self {
            chunk_size_chars: 100,
            chunk_overlap_chars: 10,
            parallel_processing: true,
            max_parallel_chunks: 4,
            priority_scheduling: true,
            dynamic_sizing: true,
        }
    }
}

impl Default for PipelineOptimizationConfig {
    fn default() -> Self {
        Self {
            pipeline_parallel: true,
            pipeline_stages: 4,
            stage_skipping: false,
            cpu_affinity: true,
            gpu_acceleration: false,
            memory_optimization: true,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_streaming_optimizer_creation() {
        let config = StreamingOptimizerConfig::default();
        let optimizer = StreamingOptimizer::new(config);

        assert!(!*optimizer.is_running.read().await);
    }

    #[tokio::test]
    async fn test_latency_recording() {
        let config = StreamingOptimizerConfig::default();
        let optimizer = StreamingOptimizer::new(config);

        optimizer
            .record_latency(150, "test_context".to_string())
            .await;

        let metrics = optimizer.get_metrics().await;
        assert_eq!(metrics.current_latency_ms, 150);
    }

    #[tokio::test]
    async fn test_performance_recommendations() {
        let config = StreamingOptimizerConfig::default();
        let optimizer = StreamingOptimizer::new(config);

        // Record high latency to trigger recommendations
        optimizer.record_latency(300, "test".to_string()).await;

        let recommendations = optimizer.get_performance_recommendations().await;
        assert!(!recommendations.is_empty());

        // Should recommend quality adjustment for high latency
        assert!(recommendations
            .iter()
            .any(|r| r.optimization == StreamingOptimization::QualityAdjustment));
    }

    #[tokio::test]
    async fn test_optimization_application() {
        let config = StreamingOptimizerConfig::default();
        let optimizer = StreamingOptimizer::new(config);

        let result = optimizer
            .apply_optimization(StreamingOptimization::AdaptiveBuffering)
            .await;

        assert!(result.success);
        assert!(result.latency_improvement_ms > 0);
    }

    #[test]
    fn test_config_defaults() {
        let config = StreamingOptimizerConfig::default();

        assert!(config.enabled);
        assert_eq!(config.target_latency_ms, 100);
        assert_eq!(config.max_latency_ms, 200);
        assert!(config.quality_adaptation.enabled);
    }

    #[test]
    fn test_quality_levels() {
        let config = QualityAdaptationConfig::default();

        assert_eq!(config.quality_levels.len(), 4);
        assert_eq!(config.quality_levels[0].name, "Low");
        assert_eq!(config.quality_levels[3].name, "Ultra");
    }
}