1use crate::{Error, Result};
7use candle_core::{Device, Tensor};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::{Arc, RwLock};
11use std::time::{Duration, Instant};
12
/// Top-level configuration for the real-time ML optimizer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RealtimeMLConfig {
    /// Latency the optimizer tries to stay at or below (milliseconds).
    pub target_latency_ms: f32,
    /// Hard latency ceiling in milliseconds.
    pub max_latency_ms: f32,
    /// How the numeric optimization level is chosen.
    pub optimization_strategy: OptimizationStrategy,
    /// Model-level adaptation switches (quantization, pruning, ...).
    pub model_adaptation: ModelAdaptationConfig,
    /// Streaming / chunking parameters.
    pub streaming_config: StreamingOptimizationConfig,
    /// Caching behavior and limits.
    pub cache_config: CacheOptimizationConfig,
    /// Parallelism switches and thread counts.
    pub parallel_processing: ParallelProcessingConfig,
    /// Used directly as the optimization level under
    /// `OptimizationStrategy::Custom`; higher values optimize more
    /// aggressively for speed.
    pub quality_speed_tradeoff: f32,
}
33
/// How aggressively the optimizer trades quality for speed.
///
/// Mapped to a numeric level in `determine_optimization_level`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum OptimizationStrategy {
    /// Fixed low level (0.3): favor quality.
    Conservative,
    /// Fixed middle level (0.6).
    Balanced,
    /// Fixed high level (0.9): favor speed.
    Aggressive,
    /// Level adjusted at runtime from observed latency.
    Adaptive,
    /// Level taken from `RealtimeMLConfig::quality_speed_tradeoff`.
    Custom,
}
48
/// Feature switches for model-level adaptations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelAdaptationConfig {
    /// Allow switching between model variants at runtime.
    pub dynamic_model_switching: bool,
    /// Allow adapting model complexity.
    pub complexity_adaptation: bool,
    /// Allow input-resolution adaptation (gates `apply_resolution_adaptation`).
    pub resolution_adaptation: bool,
    /// Allow pruning model layers.
    pub layer_pruning: bool,
    /// Allow dynamic quantization (gates `apply_quantization`).
    pub quantization_adaptation: bool,
    /// Allow attention-specific optimizations.
    pub attention_optimization: bool,
}
65
/// Chunking and buffering parameters for streaming optimization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingOptimizationConfig {
    /// Chunk length in milliseconds (converted to element counts in
    /// `optimize_streaming` assuming a 22.05 kHz rate).
    pub chunk_size_ms: f32,
    /// Overlap between consecutive chunks, in milliseconds.
    pub chunk_overlap_ms: f32,
    /// Lookahead buffer length in milliseconds — not yet read in this file.
    pub lookahead_buffer_ms: f32,
    /// Enable speculative processing of upcoming chunks.
    pub predictive_processing: bool,
    /// Enable pipelined execution of stages.
    pub pipeline_parallelism: bool,
    /// Buffering scheme for the stream.
    pub buffer_strategy: BufferStrategy,
}
82
/// Caching behavior and limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheOptimizationConfig {
    /// Cache intermediate computation results.
    pub intermediate_caching: bool,
    /// Cache model weights.
    pub weight_caching: bool,
    /// Cache computation graphs.
    pub graph_caching: bool,
    /// Upper bound on total cache size, in megabytes.
    pub cache_size_limit_mb: usize,
    /// Policy for choosing entries to evict when full.
    pub eviction_policy: CacheEvictionPolicy,
    /// Precompute values likely to be needed soon.
    pub precomputation_enabled: bool,
}
99
/// Parallelism switches and sizing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelProcessingConfig {
    /// Number of CPU worker threads.
    pub worker_threads: usize,
    /// Batch work for GPU execution.
    pub gpu_batch_processing: bool,
    /// Use SIMD-optimized code paths.
    pub simd_optimization: bool,
    /// Parallelize memory transfers/operations.
    pub memory_parallelism: bool,
    /// Split the model itself across devices.
    pub model_parallelism: bool,
    /// Split data batches across workers (also enables pipeline
    /// `parallel_execution` in `RealtimeMLOptimizer::new`).
    pub data_parallelism: bool,
}
116
/// Buffering scheme used by the streaming pipeline.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum BufferStrategy {
    /// Fixed-size ring buffer.
    CircularBuffer,
    /// Buffer that resizes on demand.
    DynamicBuffer,
    /// Two buffers swapped between producer and consumer.
    DoubleBuffer,
    /// Three rotating buffers.
    TripleBuffer,
    /// Lock-free buffer for concurrent access.
    LockFreeBuffer,
}
131
/// Policy for choosing which cache entries to evict.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum CacheEvictionPolicy {
    /// Least recently used.
    LRU,
    /// Least frequently used.
    LFU,
    /// Evict after a fixed time-to-live.
    TTL,
    /// Weigh recomputation cost when evicting.
    CostAware,
    /// Predict future accesses and evict accordingly.
    Predictive,
}
146
/// Rolling runtime metrics reported by the optimizer.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct RealtimeMetrics {
    /// Latency of the most recent computation (ms).
    pub current_latency_ms: f32,
    /// Exponential moving average latency (ms), alpha = 0.1.
    pub avg_latency_ms: f32,
    /// 95th-percentile latency (ms) — not yet updated in this file.
    pub p95_latency_ms: f32,
    /// 99th-percentile latency (ms) — not yet updated in this file.
    pub p99_latency_ms: f32,
    /// Throughput; presumably samples per second — TODO confirm unit.
    pub throughput_sps: f32,
    /// CPU utilization — not yet updated in this file.
    pub cpu_utilization: f32,
    /// GPU utilization, when a GPU is in use.
    pub gpu_utilization: Option<f32>,
    /// Memory usage in megabytes.
    pub memory_usage_mb: f32,
    /// Exponential moving average of cache hits (0.0–1.0).
    pub cache_hit_rate: f32,
    /// Output quality estimate.
    pub quality_score: f32,
    /// Time spent inside the optimizer itself (ms).
    pub optimization_overhead_ms: f32,
}
173
/// Mutable state driving the `Adaptive` optimization strategy.
#[derive(Debug, Clone)]
pub struct AdaptiveOptimizationState {
    /// Recent samples; bounded to 1000 entries by
    /// `adaptive_optimization_check`.
    pub performance_history: VecDeque<PerformanceSample>,
    /// Current level, kept within [0.1, 1.0]; higher = more aggressive.
    pub optimization_level: f32,
    /// Minimum acceptable quality score.
    pub quality_threshold: f32,
    /// Latency budget in milliseconds.
    pub latency_budget_ms: f32,
    /// Estimated overall system load.
    pub system_load_factor: f32,
    /// When the optimization level last changed.
    pub last_adaptation: Instant,
}
190
/// One observation recorded into the adaptive performance history.
#[derive(Debug, Clone)]
pub struct PerformanceSample {
    /// When the sample was taken.
    pub timestamp: Instant,
    /// Observed latency in milliseconds.
    pub latency_ms: f32,
    /// Quality estimate for the produced output.
    pub quality_score: f32,
    /// Resource consumption at sample time.
    pub resource_usage: ResourceUsage,
    /// Optimizer settings in effect when the sample was taken.
    pub config_snapshot: OptimizationSnapshot,
}
205
/// System resource consumption at a point in time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    /// CPU usage in percent.
    pub cpu_percent: f32,
    /// Memory usage in megabytes.
    pub memory_mb: f32,
    /// GPU usage in percent, when a GPU is present.
    pub gpu_percent: Option<f32>,
    /// GPU memory usage in megabytes, when a GPU is present.
    pub gpu_memory_mb: Option<f32>,
}
218
/// Snapshot of the optimizer settings active for one sample.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationSnapshot {
    /// Relative model complexity in effect.
    pub model_complexity: f32,
    /// Streaming chunk size in milliseconds.
    pub chunk_size_ms: f32,
    /// Quantization precision in effect.
    pub quantization_level: QuantizationLevel,
    /// Degree of parallelism in effect.
    pub parallelism_factor: f32,
    /// Estimated cache effectiveness.
    pub cache_effectiveness: f32,
}
233
/// Numeric precision used when quantizing tensors.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum QuantizationLevel {
    /// Keep full f32 precision.
    FullPrecision,
    /// Convert to f16.
    HalfPrecision,
    /// 8-bit symmetric quantization.
    Int8,
    /// 4-bit quantization — currently falls back to int8 in
    /// `apply_quantization`.
    Int4,
    /// Per-channel dynamic-range quantization.
    Dynamic,
}
248
/// Real-time ML optimizer: caches computations, adapts the optimization
/// level to observed latency, and coordinates the processing pipeline.
pub struct RealtimeMLOptimizer {
    /// Static configuration (replaceable via `update_config`).
    config: RealtimeMLConfig,
    /// Shared state for the adaptive strategy.
    adaptive_state: Arc<RwLock<AdaptiveOptimizationState>>,
    /// Shared rolling metrics.
    metrics: Arc<RwLock<RealtimeMetrics>>,
    /// Result / weight / graph caches.
    optimization_cache: Arc<RwLock<OptimizationCache>>,
    /// Execution pipeline description.
    pipeline: Arc<RwLock<OptimizedPipeline>>,
    /// Latency tracking; not behind a lock, mutated via `&mut self` only.
    latency_monitor: LatencyMonitor,
}
264
/// The three cache layers plus their shared statistics.
#[derive(Debug)]
struct OptimizationCache {
    /// Cached computation results, keyed by `generate_cache_key` output.
    intermediate_cache: HashMap<String, CachedComputation>,
    /// Cached model weights — not yet populated in this file.
    weight_cache: HashMap<String, Tensor>,
    /// Cached computation graphs — not yet populated in this file.
    graph_cache: HashMap<String, ComputationGraph>,
    /// Hit/miss/size accounting.
    cache_stats: CacheStatistics,
}
277
/// A single cached result with bookkeeping for eviction decisions.
#[derive(Debug, Clone)]
struct CachedComputation {
    /// The cached output tensor.
    result: Tensor,
    /// Hash of the input — currently always 0 (see `cache_computation`).
    input_hash: u64,
    /// Insertion time; used for TTL checks.
    timestamp: Instant,
    /// Number of times this entry has been served.
    access_count: usize,
    /// Original computation cost in milliseconds.
    computation_cost: f32,
}
292
/// A cached, pre-optimized computation graph.
#[derive(Debug, Clone)]
struct ComputationGraph {
    /// Graph nodes, indexed by `GraphNode::id`.
    nodes: Vec<GraphNode>,
    /// Node indices in execution order.
    execution_order: Vec<usize>,
    /// Optimization level the graph was built for.
    optimization_level: f32,
    /// Estimated end-to-end latency in milliseconds.
    expected_latency_ms: f32,
}
305
/// One operation within a `ComputationGraph`.
#[derive(Debug, Clone)]
struct GraphNode {
    /// Index of this node within the graph.
    id: usize,
    /// The operation this node performs.
    operation: GraphOperation,
    /// Indices of the nodes whose outputs feed this one.
    inputs: Vec<usize>,
    /// Shape of this node's output tensor.
    output_shape: Vec<usize>,
    /// Estimated execution cost.
    cost_estimate: f32,
}
320
/// The operation kinds a `GraphNode` can represent.
#[derive(Debug, Clone)]
enum GraphOperation {
    /// Matrix multiplication with optional operand transposes.
    MatMul {
        transpose_a: bool,
        transpose_b: bool,
    },
    /// 2-D convolution.
    Conv2D {
        kernel_size: usize,
        stride: usize,
        padding: usize,
    },
    /// Elementwise activation.
    Activation { function: ActivationFunction },
    /// Batch normalization.
    BatchNorm,
    /// Multi-head attention.
    Attention { num_heads: usize },
    /// Residual (skip) connection.
    Residual,
    /// Escape hatch for operations not modeled above.
    Custom {
        name: String,
        params: HashMap<String, f32>,
    },
}
349
/// Supported elementwise activation functions.
#[derive(Debug, Clone, Copy)]
enum ActivationFunction {
    ReLU,
    Gelu,
    Swish,
    Tanh,
    Sigmoid,
}
359
/// Aggregate counters for the optimization cache.
#[derive(Debug, Default, Clone)]
struct CacheStatistics {
    /// Number of cache hits served.
    hits: u64,
    /// Number of cache misses.
    misses: u64,
    /// Approximate total bytes held by cached results.
    total_size_bytes: u64,
    /// Derived effectiveness score — not yet computed in this file.
    effectiveness_score: f32,
}
372
/// Description of the staged execution pipeline.
#[derive(Debug)]
struct OptimizedPipeline {
    /// Ordered pipeline stages.
    stages: Vec<PipelineStage>,
    /// Whether stages may run in parallel (from the data-parallelism flag).
    parallel_execution: bool,
    /// Measured per-stage timings.
    stage_timings: Vec<Duration>,
    /// Overall pipeline efficiency estimate.
    efficiency_score: f32,
}
385
/// One stage of the execution pipeline.
#[derive(Debug)]
struct PipelineStage {
    /// Human-readable stage name.
    name: String,
    /// What the stage does.
    operation: StageOperation,
    /// Tensors the stage needs as input.
    input_requirements: Vec<TensorRequirement>,
    /// Tensors the stage produces.
    output_specs: Vec<TensorSpec>,
    /// Optimization level applied to this stage.
    optimization_level: f32,
}
400
/// The kinds of work a pipeline stage can perform.
#[derive(Debug)]
enum StageOperation {
    AudioPreprocessing,
    FeatureExtraction,
    ModelInference,
    Postprocessing,
    /// Named escape hatch for stages not modeled above.
    Custom(String),
}
415
/// A tensor a pipeline stage requires as input.
#[derive(Debug, Clone)]
struct TensorRequirement {
    /// Logical name of the tensor.
    name: String,
    /// Expected shape; convention for dynamic dims not shown here —
    /// presumably negative values, TODO confirm.
    shape: Vec<i64>,
    /// Expected element type.
    data_type: TensorDataType,
    /// Expected memory layout.
    layout: MemoryLayout,
}
428
/// A tensor a pipeline stage produces as output.
#[derive(Debug, Clone)]
struct TensorSpec {
    /// Logical name of the tensor.
    name: String,
    /// Produced shape.
    shape: Vec<i64>,
    /// Produced element type.
    data_type: TensorDataType,
    /// Produced memory layout.
    layout: MemoryLayout,
}
441
/// Element types used in tensor requirements/specs.
#[derive(Debug, Clone, Copy)]
enum TensorDataType {
    Float32,
    Float16,
    Int32,
    Int8,
    UInt8,
}
451
/// Memory layouts used in tensor requirements/specs.
#[derive(Debug, Clone, Copy)]
enum MemoryLayout {
    /// Dense, contiguous storage.
    Contiguous,
    /// Channels-first (e.g. NCHW-style) layout.
    ChannelFirst,
    /// Channels-last (e.g. NHWC-style) layout.
    ChannelLast,
    /// Any other layout.
    Custom,
}
464
/// Sliding-window latency tracker.
///
/// NOTE(review): fields are initialized in `new` but no code in this file
/// records samples into it yet.
#[derive(Debug)]
struct LatencyMonitor {
    /// Most recent latency samples (ms), up to `window_size`.
    latency_samples: VecDeque<f32>,
    /// Target latency (ms), copied from the config.
    target_latency_ms: f32,
    /// Slack between target and hard ceiling (ms).
    latency_budget_ms: f32,
    /// Maximum number of samples retained.
    window_size: usize,
}
477
478impl Default for RealtimeMLConfig {
479 fn default() -> Self {
480 Self {
481 target_latency_ms: 50.0,
482 max_latency_ms: 100.0,
483 optimization_strategy: OptimizationStrategy::Balanced,
484 model_adaptation: ModelAdaptationConfig::default(),
485 streaming_config: StreamingOptimizationConfig::default(),
486 cache_config: CacheOptimizationConfig::default(),
487 parallel_processing: ParallelProcessingConfig::default(),
488 quality_speed_tradeoff: 0.7, }
490 }
491}
492
493impl Default for ModelAdaptationConfig {
494 fn default() -> Self {
495 Self {
496 dynamic_model_switching: true,
497 complexity_adaptation: true,
498 resolution_adaptation: true,
499 layer_pruning: false,
500 quantization_adaptation: true,
501 attention_optimization: true,
502 }
503 }
504}
505
506impl Default for StreamingOptimizationConfig {
507 fn default() -> Self {
508 Self {
509 chunk_size_ms: 25.0,
510 chunk_overlap_ms: 5.0,
511 lookahead_buffer_ms: 10.0,
512 predictive_processing: true,
513 pipeline_parallelism: true,
514 buffer_strategy: BufferStrategy::DoubleBuffer,
515 }
516 }
517}
518
519impl Default for CacheOptimizationConfig {
520 fn default() -> Self {
521 Self {
522 intermediate_caching: true,
523 weight_caching: true,
524 graph_caching: true,
525 cache_size_limit_mb: 512,
526 eviction_policy: CacheEvictionPolicy::LRU,
527 precomputation_enabled: true,
528 }
529 }
530}
531
532impl Default for ParallelProcessingConfig {
533 fn default() -> Self {
534 Self {
535 worker_threads: num_cpus::get(),
536 gpu_batch_processing: true,
537 simd_optimization: true,
538 memory_parallelism: true,
539 model_parallelism: false,
540 data_parallelism: true,
541 }
542 }
543}
544
impl RealtimeMLOptimizer {
    /// Creates an optimizer with neutral adaptive state, empty caches, and a
    /// latency monitor seeded from the config's latency targets.
    ///
    /// `config` is stored by move; the previous implementation cloned it and
    /// then kept reading the original (a redundant clone).
    pub fn new(config: RealtimeMLConfig) -> Self {
        let adaptive_state = Arc::new(RwLock::new(AdaptiveOptimizationState {
            performance_history: VecDeque::with_capacity(1000),
            optimization_level: 0.5,
            quality_threshold: 0.8,
            latency_budget_ms: config.target_latency_ms,
            system_load_factor: 0.5,
            last_adaptation: Instant::now(),
        }));

        let optimization_cache = Arc::new(RwLock::new(OptimizationCache {
            intermediate_cache: HashMap::new(),
            weight_cache: HashMap::new(),
            graph_cache: HashMap::new(),
            cache_stats: CacheStatistics::default(),
        }));

        let pipeline = Arc::new(RwLock::new(OptimizedPipeline {
            stages: Vec::new(),
            parallel_execution: config.parallel_processing.data_parallelism,
            stage_timings: Vec::new(),
            efficiency_score: 0.8,
        }));

        let latency_monitor = LatencyMonitor {
            latency_samples: VecDeque::with_capacity(100),
            target_latency_ms: config.target_latency_ms,
            // Budget is the slack between the soft target and the hard ceiling.
            latency_budget_ms: config.max_latency_ms - config.target_latency_ms,
            window_size: 100,
        };

        Self {
            adaptive_state,
            metrics: Arc::new(RwLock::new(RealtimeMetrics::default())),
            optimization_cache,
            pipeline,
            latency_monitor,
            // Moved in last so everything above can read it without a clone.
            config,
        }
    }
581
582 pub fn optimize_computation(
584 &self,
585 input: &Tensor,
586 operation: &str,
587 parameters: &HashMap<String, f32>,
588 ) -> Result<Tensor> {
589 let start_time = Instant::now();
590
591 let cache_key = self.generate_cache_key(input, operation, parameters)?;
593 if let Some(cached_result) = self.get_cached_computation(&cache_key)? {
594 self.update_metrics(start_time.elapsed(), true)?;
595 return Ok(cached_result);
596 }
597
598 let optimization_level = self.determine_optimization_level()?;
600
601 let optimized_input = self.apply_input_optimizations(input, optimization_level)?;
603
604 let result = self.execute_optimized_computation(
606 &optimized_input,
607 operation,
608 parameters,
609 optimization_level,
610 )?;
611
612 self.cache_computation(cache_key, &result, start_time)?;
614
615 self.update_metrics(start_time.elapsed(), false)?;
617
618 self.adaptive_optimization_check(start_time.elapsed(), &result)?;
620
621 Ok(result)
622 }
623
624 pub fn optimize_streaming(
626 &self,
627 input_stream: &[Tensor],
628 operation: &str,
629 parameters: &HashMap<String, f32>,
630 ) -> Result<Vec<Tensor>> {
631 let chunk_size = (self.config.streaming_config.chunk_size_ms / 1000.0 * 22050.0) as usize; let overlap_size =
633 (self.config.streaming_config.chunk_overlap_ms / 1000.0 * 22050.0) as usize;
634
635 let mut results = Vec::new();
636 let mut chunk_start = 0;
637
638 while chunk_start < input_stream.len() {
639 let chunk_end = (chunk_start + chunk_size).min(input_stream.len());
640 let chunk = &input_stream[chunk_start..chunk_end];
641
642 let chunk_result = self.process_streaming_chunk(chunk, operation, parameters)?;
644 results.extend(chunk_result);
645
646 chunk_start += chunk_size - overlap_size;
648 }
649
650 Ok(results)
651 }
652
653 fn generate_cache_key(
655 &self,
656 input: &Tensor,
657 operation: &str,
658 parameters: &HashMap<String, f32>,
659 ) -> Result<String> {
660 use std::collections::hash_map::DefaultHasher;
661 use std::hash::{Hash, Hasher};
662
663 let mut hasher = DefaultHasher::new();
664
665 for dim in input.shape().dims() {
667 dim.hash(&mut hasher);
668 }
669 operation.hash(&mut hasher);
670
671 for (key, value) in parameters {
673 key.hash(&mut hasher);
674 value.to_bits().hash(&mut hasher);
675 }
676
677 Ok(format!("{}_{:x}", operation, hasher.finish()))
678 }
679
680 fn get_cached_computation(&self, cache_key: &str) -> Result<Option<Tensor>> {
682 let cache = self.optimization_cache.read().map_err(|_| {
683 Error::runtime("Failed to acquire read lock on optimization cache".to_string())
684 })?;
685
686 if let Some(cached) = cache.intermediate_cache.get(cache_key) {
687 let cache_age = cached.timestamp.elapsed();
689 if cache_age < Duration::from_secs(300) {
690 return Ok(Some(cached.result.clone()));
692 }
693 }
694
695 Ok(None)
696 }
697
698 fn determine_optimization_level(&self) -> Result<f32> {
700 match self.config.optimization_strategy {
701 OptimizationStrategy::Conservative => Ok(0.3),
702 OptimizationStrategy::Balanced => Ok(0.6),
703 OptimizationStrategy::Aggressive => Ok(0.9),
704 OptimizationStrategy::Adaptive => {
705 let state = self.adaptive_state.read().map_err(|_| {
706 Error::runtime("Failed to acquire read lock on adaptive state".to_string())
707 })?;
708 Ok(state.optimization_level)
709 }
710 OptimizationStrategy::Custom => Ok(self.config.quality_speed_tradeoff),
711 }
712 }
713
714 fn apply_input_optimizations(&self, input: &Tensor, optimization_level: f32) -> Result<Tensor> {
716 let mut optimized = input.clone();
717
718 if optimization_level > 0.5 && self.config.model_adaptation.quantization_adaptation {
720 optimized = self.apply_quantization(&optimized, optimization_level)?;
721 }
722
723 if optimization_level > 0.7 && self.config.model_adaptation.resolution_adaptation {
725 optimized = self.apply_resolution_adaptation(&optimized, optimization_level)?;
726 }
727
728 Ok(optimized)
729 }
730
731 fn apply_quantization(&self, tensor: &Tensor, optimization_level: f32) -> Result<Tensor> {
734 let quantization_level = if optimization_level > 0.8 {
736 QuantizationLevel::Int8
737 } else if optimization_level > 0.6 {
738 QuantizationLevel::HalfPrecision
739 } else {
740 QuantizationLevel::FullPrecision
741 };
742
743 match quantization_level {
744 QuantizationLevel::FullPrecision => Ok(tensor.clone()),
745 QuantizationLevel::HalfPrecision => {
746 tensor.to_dtype(candle_core::DType::F16).map_err(|e| {
748 Error::processing(format!("Failed to convert to half precision: {e}"))
749 })
750 }
751 QuantizationLevel::Int8 => {
752 self.quantize_to_int8_symmetric(tensor)
755 }
756 QuantizationLevel::Dynamic => {
757 self.quantize_to_int8_dynamic(tensor)
759 }
760 QuantizationLevel::Int4 => {
761 self.quantize_to_int8_symmetric(tensor)
763 }
764 }
765 }
766
767 fn quantize_to_int8_symmetric(&self, tensor: &Tensor) -> Result<Tensor> {
770 use candle_core::Tensor as CandleTensor;
771
772 let abs_tensor = tensor
774 .abs()
775 .map_err(|e| Error::processing(format!("Failed to compute absolute values: {e}")))?;
776
777 let max_val = abs_tensor
778 .max(0)
779 .map_err(|e| Error::processing(format!("Failed to find max value: {e}")))?;
780
781 let scale = (max_val / 127.0)
784 .map_err(|e| Error::processing(format!("Failed to compute scale: {e}")))?;
785
786 let quantized = (tensor / &scale)
788 .map_err(|e| Error::processing(format!("Failed to scale tensor: {e}")))?;
789
790 let quantized = quantized
791 .round()
792 .map_err(|e| Error::processing(format!("Failed to round tensor: {e}")))?;
793
794 let quantized = quantized
796 .clamp(-127.0, 127.0)
797 .map_err(|e| Error::processing(format!("Failed to clamp tensor: {e}")))?;
798
799 let dequantized = (quantized * scale)
801 .map_err(|e| Error::processing(format!("Failed to dequantize tensor: {e}")))?;
802
803 Ok(dequantized)
804 }
805
806 fn quantize_to_int8_dynamic(&self, tensor: &Tensor) -> Result<Tensor> {
809 use candle_core::Tensor as CandleTensor;
810
811 let shape = tensor.shape();
812 if shape.dims().is_empty() {
813 return self.quantize_to_int8_symmetric(tensor);
814 }
815
816 let channel_dim = if shape.dims().len() > 1 { 1 } else { 0 };
821
822 let min_vals = tensor
824 .min(channel_dim)
825 .map_err(|e| Error::processing(format!("Failed to compute min values: {e}")))?;
826
827 let max_vals = tensor
828 .max(channel_dim)
829 .map_err(|e| Error::processing(format!("Failed to compute max values: {e}")))?;
830
831 let range = (&max_vals - &min_vals)
833 .map_err(|e| Error::processing(format!("Failed to compute range: {e}")))?;
834
835 let scale = (range / 255.0)
836 .map_err(|e| Error::processing(format!("Failed to compute scale: {e}")))?;
837
838 let scale =
840 (scale + 1e-8).map_err(|e| Error::processing(format!("Failed to add epsilon: {e}")))?;
841
842 let zero_point = (&min_vals / &scale)
843 .map_err(|e| Error::processing(format!("Failed to compute zero point: {e}")))?;
844
845 let zero_point = zero_point
846 .neg()
847 .map_err(|e| Error::processing(format!("Failed to negate zero point: {e}")))?;
848
849 let quantized = (tensor / &scale)
851 .map_err(|e| Error::processing(format!("Failed to scale tensor: {e}")))?;
852
853 let quantized = (&quantized + &zero_point)
854 .map_err(|e| Error::processing(format!("Failed to add zero point: {e}")))?;
855
856 let quantized = quantized
857 .round()
858 .map_err(|e| Error::processing(format!("Failed to round tensor: {e}")))?;
859
860 let quantized = quantized
862 .clamp(0.0, 255.0)
863 .map_err(|e| Error::processing(format!("Failed to clamp tensor: {e}")))?;
864
865 let dequantized = (&quantized - &zero_point)
867 .map_err(|e| Error::processing(format!("Failed to subtract zero point: {e}")))?;
868
869 let dequantized = (dequantized * scale)
870 .map_err(|e| Error::processing(format!("Failed to dequantize tensor: {e}")))?;
871
872 Ok(dequantized)
873 }
874
    /// Placeholder for resolution (e.g. sample-rate / feature-size)
    /// adaptation.
    ///
    /// Currently a no-op: both branches return an unchanged clone of the
    /// input. The early return preserves the intended gating (adaptation
    /// only at optimization level >= 0.7) for when a real implementation
    /// lands.
    fn apply_resolution_adaptation(
        &self,
        tensor: &Tensor,
        optimization_level: f32,
    ) -> Result<Tensor> {
        if optimization_level < 0.7 {
            return Ok(tensor.clone());
        }

        // TODO: actual downsampling/resizing; currently returns the input as-is.
        Ok(tensor.clone())
    }
889
890 fn execute_optimized_computation(
892 &self,
893 input: &Tensor,
894 operation: &str,
895 parameters: &HashMap<String, f32>,
896 optimization_level: f32,
897 ) -> Result<Tensor> {
898 match operation {
900 "conv2d" => self.execute_optimized_conv2d(input, parameters, optimization_level),
901 "linear" => self.execute_optimized_linear(input, parameters, optimization_level),
902 "attention" => self.execute_optimized_attention(input, parameters, optimization_level),
903 _ => Ok(input.clone()), }
905 }
906
    /// Placeholder for an optimized 2-D convolution.
    ///
    /// Currently an identity pass-through; `parameters` and
    /// `optimization_level` are accepted for interface stability but unused.
    fn execute_optimized_conv2d(
        &self,
        input: &Tensor,
        parameters: &HashMap<String, f32>,
        optimization_level: f32,
    ) -> Result<Tensor> {
        // TODO: real optimized kernel selection and execution.
        Ok(input.clone())
    }
918
    /// Placeholder for an optimized linear (fully connected) layer.
    ///
    /// Currently an identity pass-through; `parameters` and
    /// `optimization_level` are accepted for interface stability but unused.
    fn execute_optimized_linear(
        &self,
        input: &Tensor,
        parameters: &HashMap<String, f32>,
        optimization_level: f32,
    ) -> Result<Tensor> {
        // TODO: real optimized matmul path.
        Ok(input.clone())
    }
930
    /// Placeholder for an optimized attention operation.
    ///
    /// Currently an identity pass-through; `parameters` and
    /// `optimization_level` are accepted for interface stability but unused.
    fn execute_optimized_attention(
        &self,
        input: &Tensor,
        parameters: &HashMap<String, f32>,
        optimization_level: f32,
    ) -> Result<Tensor> {
        // TODO: real optimized attention kernel.
        Ok(input.clone())
    }
942
943 fn cache_computation(
945 &self,
946 cache_key: String,
947 result: &Tensor,
948 start_time: Instant,
949 ) -> Result<()> {
950 let mut cache = self.optimization_cache.write().map_err(|_| {
951 Error::runtime("Failed to acquire write lock on optimization cache".to_string())
952 })?;
953
954 let cached_computation = CachedComputation {
955 result: result.clone(),
956 input_hash: 0, timestamp: Instant::now(),
958 access_count: 1,
959 computation_cost: start_time.elapsed().as_millis() as f32,
960 };
961
962 cache
963 .intermediate_cache
964 .insert(cache_key, cached_computation);
965
966 cache.cache_stats.total_size_bytes += (result.elem_count() * 4) as u64; Ok(())
970 }
971
972 fn process_streaming_chunk(
974 &self,
975 chunk: &[Tensor],
976 operation: &str,
977 parameters: &HashMap<String, f32>,
978 ) -> Result<Vec<Tensor>> {
979 let mut results = Vec::new();
980
981 for tensor in chunk {
982 let result = self.optimize_computation(tensor, operation, parameters)?;
983 results.push(result);
984 }
985
986 Ok(results)
987 }
988
989 fn update_metrics(&self, elapsed: Duration, cache_hit: bool) -> Result<()> {
991 let mut metrics = self
992 .metrics
993 .write()
994 .map_err(|_| Error::runtime("Failed to acquire write lock on metrics".to_string()))?;
995
996 let latency_ms = elapsed.as_millis() as f32;
997 metrics.current_latency_ms = latency_ms;
998
999 metrics.avg_latency_ms = (metrics.avg_latency_ms * 0.9) + (latency_ms * 0.1);
1001
1002 if cache_hit {
1004 metrics.cache_hit_rate = (metrics.cache_hit_rate * 0.9) + 0.1;
1005 } else {
1006 metrics.cache_hit_rate *= 0.9;
1007 }
1008
1009 Ok(())
1010 }
1011
1012 fn adaptive_optimization_check(&self, elapsed: Duration, result: &Tensor) -> Result<()> {
1014 if !matches!(
1015 self.config.optimization_strategy,
1016 OptimizationStrategy::Adaptive
1017 ) {
1018 return Ok(());
1019 }
1020
1021 let mut state = self.adaptive_state.write().map_err(|_| {
1022 Error::runtime("Failed to acquire write lock on adaptive state".to_string())
1023 })?;
1024
1025 let sample = PerformanceSample {
1027 timestamp: Instant::now(),
1028 latency_ms: elapsed.as_millis() as f32,
1029 quality_score: 0.8, resource_usage: ResourceUsage {
1031 cpu_percent: 50.0, memory_mb: 100.0,
1033 gpu_percent: Some(30.0),
1034 gpu_memory_mb: Some(200.0),
1035 },
1036 config_snapshot: OptimizationSnapshot {
1037 model_complexity: 0.8,
1038 chunk_size_ms: self.config.streaming_config.chunk_size_ms,
1039 quantization_level: QuantizationLevel::HalfPrecision,
1040 parallelism_factor: 0.7,
1041 cache_effectiveness: 0.6,
1042 },
1043 };
1044
1045 state.performance_history.push_back(sample);
1046
1047 if state.performance_history.len() > 1000 {
1049 state.performance_history.pop_front();
1050 }
1051
1052 let latency_ms = elapsed.as_millis() as f32;
1054 if latency_ms > self.config.target_latency_ms {
1055 state.optimization_level = (state.optimization_level + 0.1).min(1.0);
1057 state.last_adaptation = Instant::now();
1058 } else if latency_ms < self.config.target_latency_ms * 0.7 && state.optimization_level > 0.3
1059 {
1060 state.optimization_level = (state.optimization_level - 0.05).max(0.1);
1062 state.last_adaptation = Instant::now();
1063 }
1064
1065 Ok(())
1066 }
1067
1068 pub fn get_metrics(&self) -> Result<RealtimeMetrics> {
1070 let metrics = self
1071 .metrics
1072 .read()
1073 .map_err(|_| Error::runtime("Failed to acquire read lock on metrics".to_string()))?;
1074
1075 Ok(metrics.clone())
1076 }
1077
1078 pub fn get_adaptive_state(&self) -> Result<AdaptiveOptimizationState> {
1080 let state = self.adaptive_state.read().map_err(|_| {
1081 Error::runtime("Failed to acquire read lock on adaptive state".to_string())
1082 })?;
1083
1084 Ok(state.clone())
1085 }
1086
1087 pub fn update_config(&mut self, new_config: RealtimeMLConfig) -> Result<()> {
1089 self.config = new_config;
1090
1091 let mut state = self.adaptive_state.write().map_err(|_| {
1093 Error::runtime("Failed to acquire write lock on adaptive state".to_string())
1094 })?;
1095
1096 state.latency_budget_ms = self.config.target_latency_ms;
1097
1098 Ok(())
1099 }
1100}
1101
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_realtime_ml_config_default() {
        let config = RealtimeMLConfig::default();
        assert_eq!(config.target_latency_ms, 50.0);
        assert_eq!(config.max_latency_ms, 100.0);
        assert!(config.model_adaptation.dynamic_model_switching);
    }

    #[test]
    fn test_optimization_strategy() {
        let conservative = OptimizationStrategy::Conservative;
        let aggressive = OptimizationStrategy::Aggressive;

        assert!(matches!(conservative, OptimizationStrategy::Conservative));
        assert!(matches!(aggressive, OptimizationStrategy::Aggressive));
    }

    // Plain #[test]: nothing here awaits, so the tokio runtime the previous
    // #[tokio::test] spun up was pure overhead.
    #[test]
    fn test_realtime_optimizer_creation() {
        let config = RealtimeMLConfig::default();
        let optimizer = RealtimeMLOptimizer::new(config);

        let metrics = optimizer.get_metrics().unwrap();
        assert_eq!(metrics.current_latency_ms, 0.0);
    }

    // The arrays below now cover *all* variants, so adding or removing one
    // fails the corresponding count assertion.
    #[test]
    fn test_quantization_levels() {
        let levels = [
            QuantizationLevel::FullPrecision,
            QuantizationLevel::HalfPrecision,
            QuantizationLevel::Int8,
            QuantizationLevel::Int4,
            QuantizationLevel::Dynamic,
        ];

        assert_eq!(levels.len(), 5);
    }

    #[test]
    fn test_buffer_strategies() {
        let strategies = [
            BufferStrategy::CircularBuffer,
            BufferStrategy::DynamicBuffer,
            BufferStrategy::DoubleBuffer,
            BufferStrategy::TripleBuffer,
            BufferStrategy::LockFreeBuffer,
        ];

        assert_eq!(strategies.len(), 5);
    }

    #[test]
    fn test_cache_eviction_policies() {
        let policies = [
            CacheEvictionPolicy::LRU,
            CacheEvictionPolicy::LFU,
            CacheEvictionPolicy::TTL,
            CacheEvictionPolicy::CostAware,
            CacheEvictionPolicy::Predictive,
        ];

        assert_eq!(policies.len(), 5);
    }
}