use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::{RwLock, Semaphore};

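/// Top-level configuration for the streaming optimizer.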
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingOptimizerConfig {
    pub enabled: bool,
    /// Latency the optimizer steers toward.
    pub target_latency_ms: u64,
    /// Hard ceiling; exceeding it triggers an immediate optimization pass.
    pub max_latency_ms: u64,
    pub buffer_config: BufferConfig,
    pub quality_adaptation: QualityAdaptationConfig,
    pub prefetching: PrefetchingConfig,
    pub chunk_processing: ChunkProcessingConfig,
    pub pipeline_optimization: PipelineOptimizationConfig,
}

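/// Audio buffer sizing and underrun-recovery settings.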
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BufferConfig {
    pub initial_buffer_ms: u64,
    pub min_buffer_ms: u64,
    pub max_buffer_ms: u64,
    /// How strongly buffer size reacts to fill-level changes; higher is more aggressive.
    pub adaptation_sensitivity: f64,
    pub adaptive_buffering: bool,
    pub underrun_recovery: UnderrunRecoveryStrategy,
    pub monitoring_interval_ms: u64,
}

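/// Settings for latency-driven quality adaptation.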
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityAdaptationConfig {
    pub enabled: bool,
    pub quality_levels: Vec<QualityLevel>,
    /// Latency above which quality is stepped down.
    pub adaptation_threshold_ms: u64,
    /// How strongly quality is adjusted in response to latency pressure.
    pub adjustment_aggressiveness: f64,
    pub min_quality_level: usize,
    pub recovery_speed: QualityRecoverySpeed,
}

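/// Settings for speculative synthesis ahead of playback.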
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrefetchingConfig {
    pub enabled: bool,
    /// How far ahead of the current position to prefetch, in characters.
    pub lookahead_chars: usize,
    /// Threshold that triggers prefetching.
    pub trigger_threshold: f64,
    pub max_concurrent_prefetch: usize,
    pub cache_size_mb: u32,
    pub strategy: PrefetchStrategy,
}

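/// Settings for splitting input text into processing chunks.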
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkProcessingConfig {
    pub chunk_size_chars: usize,
    pub chunk_overlap_chars: usize,
    pub parallel_processing: bool,
    pub max_parallel_chunks: usize,
    pub priority_scheduling: bool,
    pub dynamic_sizing: bool,
}

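/// Settings for synthesis pipeline parallelism and hardware usage.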
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineOptimizationConfig {
    pub pipeline_parallel: bool,
    pub pipeline_stages: usize,
    pub stage_skipping: bool,
    pub cpu_affinity: bool,
    pub gpu_acceleration: bool,
    pub memory_optimization: bool,
}

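/// A selectable quality tier with its relative resource costs.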
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityLevel {
    pub level: usize,
    pub name: String,
    /// Synthesis time relative to the baseline level (1.0).
    pub synthesis_time_multiplier: f64,
    pub quality_score: f64,
    pub memory_multiplier: f64,
    pub cpu_multiplier: f64,
}

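/// How to recover when the playback buffer underruns.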
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum UnderrunRecoveryStrategy {
    IncreaseBuffer,
    ReduceQuality,
    SkipFrames,
    Hybrid,
}

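/// How quickly quality is restored after a downgrade.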
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum QualityRecoverySpeed {
    Conservative,
    Moderate,
    Aggressive,
}

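/// Strategy used to decide what to prefetch.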
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PrefetchStrategy {
    Linear,
    Predictive,
    Adaptive,
}

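/// A snapshot of streaming performance metrics.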
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingMetrics {
    pub current_latency_ms: u64,
    pub average_latency_ms: f64,
    pub latency_p95_ms: u64,
    pub latency_p99_ms: u64,
    pub buffer_fill_percent: f64,
    pub buffer_underruns: u64,
    pub current_quality_level: usize,
    pub quality_adaptations: u64,
    /// Prefetch cache hit rate, as a percentage.
    pub prefetch_hit_rate: f64,
    pub chunk_throughput: f64,
    pub pipeline_efficiency: f64,
    pub real_time_factor: f64,
}

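/// Outcome of applying a single streaming optimization.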
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingOptimizationResult {
    pub optimization: StreamingOptimization,
    /// Positive values mean latency was reduced.
    pub latency_improvement_ms: i64,
    pub quality_impact: f64,
    /// Positive values mean additional memory is consumed.
    pub memory_impact_bytes: i64,
    pub cpu_impact_percent: f64,
    pub success: bool,
    pub error: Option<String>,
}

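/// The streaming optimizations this optimizer can apply.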
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum StreamingOptimization {
    AdaptiveBuffering,
    QualityAdjustment,
    PrefetchOptimization,
    ChunkSizeOptimization,
    PipelineParallelization,
    MemoryOptimization,
    CpuAffinityOptimization,
    GpuAcceleration,
}

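/// Real-time streaming optimizer that monitors latency, buffer health, and
/// quality, and applies targeted optimizations to stay within the configured
/// latency budget.
///
/// A minimal usage sketch (assumes a tokio runtime; paths are illustrative):
///
/// ```ignore
/// let optimizer = StreamingOptimizer::new(StreamingOptimizerConfig::default());
/// optimizer.start().await?;
/// optimizer.record_latency(120, "first_chunk".to_string()).await;
/// let recommendations = optimizer.get_performance_recommendations().await;
/// optimizer.stop().await?;
/// ```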
pub struct StreamingOptimizer {
    config: StreamingOptimizerConfig,
    metrics: Arc<RwLock<StreamingMetrics>>,
    latency_history: Arc<RwLock<VecDeque<LatencyMeasurement>>>,
    buffer_state: Arc<RwLock<BufferState>>,
    quality_state: Arc<RwLock<QualityState>>,
    prefetch_cache: Arc<RwLock<PrefetchCache>>,
    optimization_history: Arc<RwLock<VecDeque<StreamingOptimizationResult>>>,
    is_running: Arc<RwLock<bool>>,
    processing_semaphore: Arc<Semaphore>,
}

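/// A single latency observation with its context.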
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LatencyMeasurement {
    timestamp: u64,
    latency_ms: u64,
    quality_level: usize,
    buffer_fill: f64,
    context: String,
}

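/// Current playback buffer state.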
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BufferState {
    current_size_ms: u64,
    fill_percentage: f64,
    last_underrun: Option<u64>,
    underrun_count: u64,
    adaptation_history: VecDeque<BufferAdaptation>,
}

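/// Record of one buffer size adaptation.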
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BufferAdaptation {
    timestamp: u64,
    old_size_ms: u64,
    new_size_ms: u64,
    reason: String,
    success: bool,
}

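/// Current quality level and adaptation history.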
#[derive(Debug, Clone, Serialize, Deserialize)]
struct QualityState {
    current_level: usize,
    level_history: VecDeque<QualityChange>,
    last_change: Option<u64>,
    adaptation_stats: QualityAdaptationStats,
}

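/// Record of one quality level change.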
#[derive(Debug, Clone, Serialize, Deserialize)]
struct QualityChange {
    timestamp: u64,
    old_level: usize,
    new_level: usize,
    trigger: String,
    latency_ms: u64,
}

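/// Aggregate statistics over quality adaptations.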
#[derive(Debug, Clone, Serialize, Deserialize)]
struct QualityAdaptationStats {
    total_adaptations: u64,
    successful_adaptations: u64,
    avg_latency_improvement_ms: f64,
    stability_score: f64,
}

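/// LRU cache of prefetched synthesis results.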
#[derive(Debug)]
struct PrefetchCache {
    cache: HashMap<String, CachedSynthesis>,
    current_size_bytes: u64,
    max_size_bytes: u64,
    hits: u64,
    misses: u64,
    lru_order: VecDeque<String>,
}

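/// One cached synthesis result.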
#[derive(Debug, Clone)]
struct CachedSynthesis {
    key: String,
    audio_data: Vec<u8>,
    timestamp: u64,
    quality_level: usize,
    access_count: u64,
}

impl StreamingOptimizer {
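    /// Creates a new optimizer; parallel chunk permits are taken from the config.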
    pub fn new(config: StreamingOptimizerConfig) -> Self {
        let processing_permits = config.chunk_processing.max_parallel_chunks;

        Self {
            config,
            metrics: Arc::new(RwLock::new(StreamingMetrics::default())),
            latency_history: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
            buffer_state: Arc::new(RwLock::new(BufferState::default())),
            quality_state: Arc::new(RwLock::new(QualityState::default())),
            prefetch_cache: Arc::new(RwLock::new(PrefetchCache::default())),
            optimization_history: Arc::new(RwLock::new(VecDeque::with_capacity(100))),
            is_running: Arc::new(RwLock::new(false)),
            processing_semaphore: Arc::new(Semaphore::new(processing_permits)),
        }
    }

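    /// Starts the background monitoring and adaptation tasks. Idempotent.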
    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
        let mut is_running = self.is_running.write().await;
        if *is_running {
            return Ok(());
        }
        *is_running = true;
        drop(is_running);

        tracing::info!("Starting streaming optimizer");

        self.initialize_quality_state().await;

        self.start_latency_monitoring().await;
        self.start_buffer_monitoring().await;
        self.start_quality_adaptation().await;
        self.start_prefetch_management().await;

        Ok(())
    }

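    /// Signals all background tasks to stop. Idempotent.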
    pub async fn stop(&self) -> Result<(), Box<dyn std::error::Error>> {
        let mut is_running = self.is_running.write().await;
        if !*is_running {
            return Ok(());
        }
        *is_running = false;

        tracing::info!("Stopped streaming optimizer");
        Ok(())
    }

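    /// Records a latency measurement, updates rolling statistics, and triggers
    /// an optimization pass if the measurement exceeds the configured maximum.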
    pub async fn record_latency(&self, latency_ms: u64, context: String) {
        let measurement = LatencyMeasurement {
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
            latency_ms,
            quality_level: self.get_current_quality_level().await,
            buffer_fill: self.get_buffer_fill_percentage().await,
            context,
        };

        let mut history = self.latency_history.write().await;
        history.push_back(measurement);

        if history.len() > 1000 {
            history.pop_front();
        }

        let mut metrics = self.metrics.write().await;
        metrics.current_latency_ms = latency_ms;

        // Average over the most recent measurements.
        let recent_measurements: Vec<u64> = history
            .iter()
            .rev()
            .take(60)
            .map(|m| m.latency_ms)
            .collect();

        if !recent_measurements.is_empty() {
            metrics.average_latency_ms =
                recent_measurements.iter().sum::<u64>() as f64 / recent_measurements.len() as f64;
        }

        // Release the locks before potentially applying an optimization.
        drop(metrics);
        drop(history);

        if latency_ms > self.config.max_latency_ms {
            self.trigger_latency_optimization().await;
        }
    }

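    /// Builds optimization recommendations from current metrics, highest priority first.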
    pub async fn get_performance_recommendations(&self) -> Vec<StreamingRecommendation> {
        let mut recommendations = Vec::new();

        let metrics = self.metrics.read().await;
        let buffer_state = self.buffer_state.read().await;

        if metrics.current_latency_ms > self.config.target_latency_ms {
            let excess_latency = metrics.current_latency_ms - self.config.target_latency_ms;
            recommendations.push(StreamingRecommendation {
                optimization: StreamingOptimization::QualityAdjustment,
                priority: if excess_latency > 100 { 9 } else { 6 },
                description: format!("Latency {} ms above target", excess_latency),
                expected_improvement_ms: (excess_latency as f64 * 0.6) as u64,
                quality_impact: -0.2,
                implementation_complexity: ImplementationComplexity::Low,
            });
        }

        if buffer_state.underrun_count > 0 {
            recommendations.push(StreamingRecommendation {
                optimization: StreamingOptimization::AdaptiveBuffering,
                priority: 8,
                description: format!("{} buffer underruns detected", buffer_state.underrun_count),
                expected_improvement_ms: 50,
                quality_impact: 0.0,
                implementation_complexity: ImplementationComplexity::Medium,
            });
        }

        if metrics.prefetch_hit_rate < 70.0 {
            recommendations.push(StreamingRecommendation {
                optimization: StreamingOptimization::PrefetchOptimization,
                priority: 5,
                description: format!("Low prefetch hit rate: {:.1}%", metrics.prefetch_hit_rate),
                expected_improvement_ms: 30,
                quality_impact: 0.1,
                implementation_complexity: ImplementationComplexity::High,
            });
        }

        if metrics.pipeline_efficiency < 80.0 {
            recommendations.push(StreamingRecommendation {
                optimization: StreamingOptimization::PipelineParallelization,
                priority: 7,
                description: format!(
                    "Pipeline efficiency low: {:.1}%",
                    metrics.pipeline_efficiency
                ),
                expected_improvement_ms: 40,
                quality_impact: 0.0,
                implementation_complexity: ImplementationComplexity::High,
            });
        }

        // Highest priority first.
        recommendations.sort_by(|a, b| b.priority.cmp(&a.priority));
        recommendations
    }

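    /// Applies one optimization and records the outcome in the bounded history.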
    pub async fn apply_optimization(
        &self,
        optimization: StreamingOptimization,
    ) -> StreamingOptimizationResult {
        let result = match optimization {
            StreamingOptimization::AdaptiveBuffering => self.optimize_adaptive_buffering().await,
            StreamingOptimization::QualityAdjustment => self.optimize_quality_adjustment().await,
            StreamingOptimization::PrefetchOptimization => self.optimize_prefetching().await,
            StreamingOptimization::ChunkSizeOptimization => self.optimize_chunk_size().await,
            StreamingOptimization::PipelineParallelization => {
                self.optimize_pipeline_parallelization().await
            }
            StreamingOptimization::MemoryOptimization => self.optimize_memory_usage().await,
            StreamingOptimization::CpuAffinityOptimization => self.optimize_cpu_affinity().await,
            StreamingOptimization::GpuAcceleration => self.optimize_gpu_acceleration().await,
        };

        // Destructure the result tuple into named fields.
        let (latency_improvement_ms, quality_impact, memory_impact_bytes, cpu_impact_percent, success, error) = result;

        let optimization_result = StreamingOptimizationResult {
            optimization,
            latency_improvement_ms,
            quality_impact,
            memory_impact_bytes,
            cpu_impact_percent,
            success,
            error,
        };

        let mut history = self.optimization_history.write().await;
        history.push_back(optimization_result.clone());
        if history.len() > 100 {
            history.pop_front();
        }

        optimization_result
    }

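    /// Returns a snapshot of the current streaming metrics.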
    pub async fn get_metrics(&self) -> StreamingMetrics {
        self.metrics.read().await.clone()
    }

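    /// Seeds the quality level at the midpoint of the configured levels.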
    async fn initialize_quality_state(&self) {
        let mut quality_state = self.quality_state.write().await;
        // Start in the middle of the configured quality range.
        let default_level = self.config.quality_adaptation.quality_levels.len() / 2;
        quality_state.current_level = default_level;
    }

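    /// Spawns a background task that recomputes p95/p99 latency percentiles.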
    async fn start_latency_monitoring(&self) {
        let is_running = self.is_running.clone();
        let metrics = self.metrics.clone();
        let latency_history = self.latency_history.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(100));

            loop {
                interval.tick().await;

                let running = is_running.read().await;
                if !*running {
                    break;
                }
                drop(running);

                let history = latency_history.read().await;
                // Wait for enough samples before reporting percentiles.
                if history.len() >= 20 {
                    let mut recent_latencies: Vec<u64> = history
                        .iter()
                        .rev()
                        .take(100)
                        .map(|m| m.latency_ms)
                        .collect();
                    recent_latencies.sort_unstable();

                    let p95_index = (recent_latencies.len() as f64 * 0.95) as usize;
                    let p99_index = (recent_latencies.len() as f64 * 0.99) as usize;

                    let mut metrics = metrics.write().await;
                    metrics.latency_p95_ms = recent_latencies.get(p95_index).copied().unwrap_or(0);
                    metrics.latency_p99_ms = recent_latencies.get(p99_index).copied().unwrap_or(0);
                }
            }
        });
    }

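    /// Spawns a background task that mirrors buffer state into the metrics.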
    async fn start_buffer_monitoring(&self) {
        let is_running = self.is_running.clone();
        let buffer_state = self.buffer_state.clone();
        let metrics = self.metrics.clone();
        let config = self.config.buffer_config.clone();

        tokio::spawn(async move {
            let mut interval =
                tokio::time::interval(Duration::from_millis(config.monitoring_interval_ms));

            loop {
                interval.tick().await;

                let running = is_running.read().await;
                if !*running {
                    break;
                }
                drop(running);

                let buffer = buffer_state.read().await;
                let mut metrics = metrics.write().await;
                metrics.buffer_fill_percent = buffer.fill_percentage;
                metrics.buffer_underruns = buffer.underrun_count;
            }
        });
    }

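    /// Spawns a background task that lowers quality when latency exceeds the threshold.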
    async fn start_quality_adaptation(&self) {
        let is_running = self.is_running.clone();
        let quality_state = self.quality_state.clone();
        let metrics = self.metrics.clone();
        let config = self.config.quality_adaptation.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(500));

            loop {
                interval.tick().await;

                let running = is_running.read().await;
                if !*running {
                    break;
                }
                drop(running);

                if !config.enabled {
                    continue;
                }

                let current_metrics = metrics.read().await;
                let mut quality = quality_state.write().await;

                if current_metrics.current_latency_ms > config.adaptation_threshold_ms
                    && quality.current_level > config.min_quality_level
                {
                    let old_level = quality.current_level;
                    let new_level = (quality.current_level - 1).max(config.min_quality_level);
                    quality.current_level = new_level;

                    quality.level_history.push_back(QualityChange {
                        timestamp: SystemTime::now()
                            .duration_since(UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_secs(),
                        old_level,
                        new_level,
                        trigger: "high_latency".to_string(),
                        latency_ms: current_metrics.current_latency_ms,
                    });

                    quality.adaptation_stats.total_adaptations += 1;
                    tracing::info!(
                        "Reduced quality level from {} to {} due to high latency",
                        old_level,
                        new_level
                    );
                }
            }
        });
    }

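    /// Spawns a background task that evicts expired prefetch cache entries.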
    async fn start_prefetch_management(&self) {
        let is_running = self.is_running.clone();
        let prefetch_cache = self.prefetch_cache.clone();
        let config = self.config.prefetching.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));

            loop {
                interval.tick().await;

                let running = is_running.read().await;
                if !*running {
                    break;
                }
                drop(running);

                if !config.enabled {
                    continue;
                }

                let mut cache = prefetch_cache.write().await;
                let current_timestamp = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs();
                // Entries older than five minutes are evicted.
                let expiry_seconds = 300u64;
                let expired_keys: Vec<String> = cache
                    .cache
                    .iter()
                    .filter(|(_, entry)| {
                        current_timestamp.saturating_sub(entry.timestamp) > expiry_seconds
                    })
                    .map(|(key, _)| key.clone())
                    .collect();

                for key in expired_keys {
                    if let Some(entry) = cache.cache.remove(&key) {
                        cache.current_size_bytes -= entry.audio_data.len() as u64;
                        cache.lru_order.retain(|k| k != &key);
                    }
                }
            }
        });
    }

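    /// Responds to a latency spike by applying a quality adjustment.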
    async fn trigger_latency_optimization(&self) {
        tracing::warn!("High latency detected, triggering optimization");

        let _ = self
            .apply_optimization(StreamingOptimization::QualityAdjustment)
            .await;
    }

    async fn get_current_quality_level(&self) -> usize {
        self.quality_state.read().await.current_level
    }

    async fn get_buffer_fill_percentage(&self) -> f64 {
        self.buffer_state.read().await.fill_percentage
    }

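    // Each optimize_* helper below returns placeholder estimates as a tuple of
    // (latency_improvement_ms, quality_impact, memory_impact_bytes,
    // cpu_impact_percent, success, error).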
    async fn optimize_adaptive_buffering(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing adaptive buffering");
        (25, 0.0, 1024 * 1024, 5.0, true, None)
    }

    async fn optimize_quality_adjustment(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing quality adjustment");
        (60, -0.15, -512 * 1024, -10.0, true, None)
    }

    async fn optimize_prefetching(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing prefetching");
        (35, 0.05, 2 * 1024 * 1024, 8.0, true, None)
    }

    async fn optimize_chunk_size(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing chunk size");
        (20, 0.0, 0, 3.0, true, None)
    }

    async fn optimize_pipeline_parallelization(
        &self,
    ) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing pipeline parallelization");
        (45, 0.0, 512 * 1024, 15.0, true, None)
    }

    async fn optimize_memory_usage(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing memory usage");
        (15, 0.0, -1024 * 1024, -2.0, true, None)
    }

    async fn optimize_cpu_affinity(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing CPU affinity");
        (30, 0.0, 0, -5.0, true, None)
    }

    async fn optimize_gpu_acceleration(&self) -> (i64, f64, i64, f64, bool, Option<String>) {
        tracing::info!("Optimizing GPU acceleration");
        (80, 0.1, 4 * 1024 * 1024, -20.0, true, None)
    }
}

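/// A recommended optimization with its priority and expected impact.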
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingRecommendation {
    pub optimization: StreamingOptimization,
    /// Higher values indicate greater urgency.
    pub priority: u8,
    pub description: String,
    pub expected_improvement_ms: u64,
    pub quality_impact: f64,
    pub implementation_complexity: ImplementationComplexity,
}

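/// Relative effort required to implement a recommendation.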
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ImplementationComplexity {
    Low,
    Medium,
    High,
}

impl Default for StreamingMetrics {
    fn default() -> Self {
        Self {
            current_latency_ms: 0,
            average_latency_ms: 0.0,
            latency_p95_ms: 0,
            latency_p99_ms: 0,
            buffer_fill_percent: 50.0,
            buffer_underruns: 0,
            current_quality_level: 2,
            quality_adaptations: 0,
            prefetch_hit_rate: 0.0,
            chunk_throughput: 0.0,
            pipeline_efficiency: 100.0,
            real_time_factor: 1.0,
        }
    }
}

impl Default for BufferState {
    fn default() -> Self {
        Self {
            current_size_ms: 200,
            fill_percentage: 50.0,
            last_underrun: None,
            underrun_count: 0,
            adaptation_history: VecDeque::new(),
        }
    }
}

impl Default for QualityState {
    fn default() -> Self {
        Self {
            current_level: 2,
            level_history: VecDeque::new(),
            last_change: None,
            adaptation_stats: QualityAdaptationStats {
                total_adaptations: 0,
                successful_adaptations: 0,
                avg_latency_improvement_ms: 0.0,
                stability_score: 100.0,
            },
        }
    }
}

impl Default for PrefetchCache {
    fn default() -> Self {
        Self {
            cache: HashMap::new(),
            current_size_bytes: 0,
            max_size_bytes: 100 * 1024 * 1024, // 100 MB
            hits: 0,
            misses: 0,
            lru_order: VecDeque::new(),
        }
    }
}

impl Default for StreamingOptimizerConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            target_latency_ms: 100,
            max_latency_ms: 200,
            buffer_config: BufferConfig::default(),
            quality_adaptation: QualityAdaptationConfig::default(),
            prefetching: PrefetchingConfig::default(),
            chunk_processing: ChunkProcessingConfig::default(),
            pipeline_optimization: PipelineOptimizationConfig::default(),
        }
    }
}

impl Default for BufferConfig {
    fn default() -> Self {
        Self {
            initial_buffer_ms: 200,
            min_buffer_ms: 50,
            max_buffer_ms: 1000,
            adaptation_sensitivity: 0.7,
            adaptive_buffering: true,
            underrun_recovery: UnderrunRecoveryStrategy::Hybrid,
            monitoring_interval_ms: 100,
        }
    }
}

impl Default for QualityAdaptationConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            quality_levels: vec![
                QualityLevel {
                    level: 0,
                    name: "Low".to_string(),
                    synthesis_time_multiplier: 0.5,
                    quality_score: 0.6,
                    memory_multiplier: 0.7,
                    cpu_multiplier: 0.6,
                },
                QualityLevel {
                    level: 1,
                    name: "Medium".to_string(),
                    synthesis_time_multiplier: 0.8,
                    quality_score: 0.8,
                    memory_multiplier: 0.9,
                    cpu_multiplier: 0.8,
                },
                QualityLevel {
                    level: 2,
                    name: "High".to_string(),
                    synthesis_time_multiplier: 1.0,
                    quality_score: 1.0,
                    memory_multiplier: 1.0,
                    cpu_multiplier: 1.0,
                },
                QualityLevel {
                    level: 3,
                    name: "Ultra".to_string(),
                    synthesis_time_multiplier: 1.5,
                    quality_score: 1.0,
                    memory_multiplier: 1.3,
                    cpu_multiplier: 1.4,
                },
            ],
            adaptation_threshold_ms: 150,
            adjustment_aggressiveness: 0.6,
            min_quality_level: 0,
            recovery_speed: QualityRecoverySpeed::Moderate,
        }
    }
}

impl Default for PrefetchingConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            lookahead_chars: 200,
            trigger_threshold: 0.3,
            max_concurrent_prefetch: 3,
            cache_size_mb: 50,
            strategy: PrefetchStrategy::Adaptive,
        }
    }
}

impl Default for ChunkProcessingConfig {
    fn default() -> Self {
        Self {
            chunk_size_chars: 100,
            chunk_overlap_chars: 10,
            parallel_processing: true,
            max_parallel_chunks: 4,
            priority_scheduling: true,
            dynamic_sizing: true,
        }
    }
}

impl Default for PipelineOptimizationConfig {
    fn default() -> Self {
        Self {
            pipeline_parallel: true,
            pipeline_stages: 4,
            stage_skipping: false,
            cpu_affinity: true,
            gpu_acceleration: false,
            memory_optimization: true,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_streaming_optimizer_creation() {
        let config = StreamingOptimizerConfig::default();
        let optimizer = StreamingOptimizer::new(config);

        assert!(!*optimizer.is_running.read().await);
    }

    #[tokio::test]
    async fn test_latency_recording() {
        let config = StreamingOptimizerConfig::default();
        let optimizer = StreamingOptimizer::new(config);

        optimizer
            .record_latency(150, "test_context".to_string())
            .await;

        let metrics = optimizer.get_metrics().await;
        assert_eq!(metrics.current_latency_ms, 150);
    }

    #[tokio::test]
    async fn test_performance_recommendations() {
        let config = StreamingOptimizerConfig::default();
        let optimizer = StreamingOptimizer::new(config);

        optimizer.record_latency(300, "test".to_string()).await;

        let recommendations = optimizer.get_performance_recommendations().await;
        assert!(!recommendations.is_empty());

        assert!(recommendations
            .iter()
            .any(|r| r.optimization == StreamingOptimization::QualityAdjustment));
    }

    #[tokio::test]
    async fn test_optimization_application() {
        let config = StreamingOptimizerConfig::default();
        let optimizer = StreamingOptimizer::new(config);

        let result = optimizer
            .apply_optimization(StreamingOptimization::AdaptiveBuffering)
            .await;

        assert!(result.success);
        assert!(result.latency_improvement_ms > 0);
    }

    #[test]
    fn test_config_defaults() {
        let config = StreamingOptimizerConfig::default();

        assert!(config.enabled);
        assert_eq!(config.target_latency_ms, 100);
        assert_eq!(config.max_latency_ms, 200);
        assert!(config.quality_adaptation.enabled);
    }

    #[test]
    fn test_quality_levels() {
        let config = QualityAdaptationConfig::default();

        assert_eq!(config.quality_levels.len(), 4);
        assert_eq!(config.quality_levels[0].name, "Low");
        assert_eq!(config.quality_levels[3].name, "Ultra");
    }
}