scirs2_io/pipeline/advanced_optimization/performance.rs

//! Performance history tracking, auto-tuning, and performance prediction
//! for optimized pipeline execution.

use crate::error::Result;
use chrono::Utc;
use std::collections::HashMap;

use super::config::{
    AutoTuningParameters, ExecutionRecord, OptimizedPipelineConfig, PipelinePerformanceMetrics,
    RegressionDetector, SystemMetrics,
};

/// Bounded history of pipeline executions with per-pipeline profiles.
#[derive(Debug)]
pub struct PerformanceHistory {
    executions: Vec<ExecutionRecord>,
    pipeline_profiles: HashMap<String, PipelineProfile>,
    max_history_size: usize,
}

impl Default for PerformanceHistory {
    fn default() -> Self {
        Self::new()
    }
}

impl PerformanceHistory {
    pub fn new() -> Self {
        Self {
            executions: Vec::new(),
            pipeline_profiles: HashMap::new(),
            max_history_size: 10000,
        }
    }

    /// Records one pipeline execution and updates that pipeline's profile.
    pub fn record_execution(
        &mut self,
        pipeline_id: &str,
        config: &OptimizedPipelineConfig,
        metrics: &PipelinePerformanceMetrics,
    ) -> Result<()> {
        let record = ExecutionRecord {
            timestamp: Utc::now(),
            pipeline_id: pipeline_id.to_string(),
            config: config.clone(),
            metrics: metrics.clone(),
        };

        self.executions.push(record);

        // Keep the history bounded by evicting the oldest record.
        if self.executions.len() > self.max_history_size {
            self.executions.remove(0);
        }

        self.update_pipeline_profile(pipeline_id, config, metrics);

        Ok(())
    }

    /// Returns past executions of the same pipeline whose data size is within
    /// 20% of `data_size`.
    pub fn get_similar_configurations(
        &self,
        pipeline_id: &str,
        data_size: usize,
    ) -> Vec<&ExecutionRecord> {
        let size_threshold = 0.2;
        // Guard against a division by zero when `data_size` is 0.
        let reference_size = (data_size as f64).max(1.0);

        self.executions
            .iter()
            .filter(|record| {
                record.pipeline_id == pipeline_id
                    && (record.metrics.data_size as f64 - data_size as f64).abs()
                        / reference_size
                        < size_threshold
            })
            .collect()
    }
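
    // Example: for `data_size = 1_000`, only records whose `metrics.data_size`
    // lies strictly between 800 and 1_200 pass the 20% relative-size filter above.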

    fn update_pipeline_profile(
        &mut self,
        pipeline_id: &str,
        config: &OptimizedPipelineConfig,
        metrics: &PipelinePerformanceMetrics,
    ) {
        let profile = self
            .pipeline_profiles
            .entry(pipeline_id.to_string())
            .or_insert_with(|| PipelineProfile::new(pipeline_id));

        profile.update(config, metrics);
    }

    pub fn get_pipeline_profile(&self, pipeline_id: &str) -> Option<&PipelineProfile> {
        self.pipeline_profiles.get(pipeline_id)
    }

    /// Returns up to `limit` executions of `pipeline_id`, best throughput first.
    pub fn get_best_configurations(
        &self,
        pipeline_id: &str,
        limit: usize,
    ) -> Vec<&ExecutionRecord> {
        let mut records: Vec<&ExecutionRecord> = self
            .executions
            .iter()
            .filter(|record| record.pipeline_id == pipeline_id)
            .collect();

        // `total_cmp` gives a total order even for NaN throughput values,
        // where `partial_cmp(...).unwrap()` would panic.
        records.sort_by(|a, b| b.metrics.throughput.total_cmp(&a.metrics.throughput));

        records.into_iter().take(limit).collect()
    }
}
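
// Usage sketch (hypothetical pipeline id; assumes `OptimizedPipelineConfig` and
// `PipelinePerformanceMetrics` from `super::config` implement `Default`):
//
//     let mut history = PerformanceHistory::new();
//     history.record_execution(
//         "csv_ingest",
//         &OptimizedPipelineConfig::default(),
//         &PipelinePerformanceMetrics::default(),
//     )?;
//     let top_runs = history.get_best_configurations("csv_ingest", 3);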

/// Aggregated performance statistics for a single pipeline.
#[derive(Debug)]
pub struct PipelineProfile {
    pub pipeline_id: String,
    pub execution_count: usize,
    pub avg_throughput: f64,
    pub avg_memory_usage: f64,
    pub avg_cpu_utilization: f64,
    pub optimal_configurations: Vec<OptimizedPipelineConfig>,
    pub performance_regression_detector: RegressionDetector,
}

impl PipelineProfile {
    pub fn new(pipeline_id: &str) -> Self {
        Self {
            pipeline_id: pipeline_id.to_string(),
            execution_count: 0,
            avg_throughput: 0.0,
            avg_memory_usage: 0.0,
            avg_cpu_utilization: 0.0,
            optimal_configurations: Vec::new(),
            performance_regression_detector: RegressionDetector::new(),
        }
    }

    pub fn update(
        &mut self,
        config: &OptimizedPipelineConfig,
        metrics: &PipelinePerformanceMetrics,
    ) {
        self.execution_count += 1;

        // Incremental running mean: avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n.
        let weight = 1.0 / self.execution_count as f64;
        self.avg_throughput += weight * (metrics.throughput - self.avg_throughput);
        self.avg_memory_usage +=
            weight * (metrics.peak_memory_usage as f64 - self.avg_memory_usage);
        self.avg_cpu_utilization += weight * (metrics.cpu_utilization - self.avg_cpu_utilization);

        self.performance_regression_detector
            .check_regression(metrics);

        // Remember configurations that clearly beat the running averages,
        // keeping only the five most recent of them.
        if self.is_better_configuration(config, metrics) {
            self.optimal_configurations.push(config.clone());
            if self.optimal_configurations.len() > 5 {
                self.optimal_configurations.remove(0);
            }
        }
    }

    /// Scores a run as a weighted mix of throughput, inverse memory footprint,
    /// and CPU utilization; a run counts as "better" when it beats the
    /// running-average score by at least 10%.
    fn is_better_configuration(
        &self,
        _config: &OptimizedPipelineConfig,
        metrics: &PipelinePerformanceMetrics,
    ) -> bool {
        // `.max(1.0)` guards against division by zero for zero-memory readings.
        let score = metrics.throughput * 0.5
            + (1.0 / (metrics.peak_memory_usage as f64).max(1.0)) * 0.3
            + metrics.cpu_utilization * 0.2;

        let avg_score = self.avg_throughput * 0.5
            + (1.0 / self.avg_memory_usage.max(1.0)) * 0.3
            + self.avg_cpu_utilization * 0.2;

        score > avg_score * 1.1
    }

    /// Returns the current performance trend.
    ///
    /// Placeholder: always reports a stable trend. A slope-based
    /// implementation could instead fit recent throughput samples; see the
    /// sketch following the type definitions below.
    pub fn get_performance_trend(&self) -> PerformanceTrend {
        PerformanceTrend {
            direction: TrendDirection::Stable,
            magnitude: 0.0,
            confidence: 0.8,
        }
    }
}

#[derive(Debug)]
pub struct PerformanceTrend {
    pub direction: TrendDirection,
    pub magnitude: f64,
    pub confidence: f64,
}

#[derive(Debug)]
pub enum TrendDirection {
    Improving,
    Degrading,
    Stable,
}
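
// A minimal sketch of how `get_performance_trend` could compute a real trend
// (hypothetical addition, not part of the original module): fit a
// least-squares slope over a window of recent throughput samples and map its
// sign to a `TrendDirection`.
#[allow(dead_code)]
fn estimate_trend(recent_throughputs: &[f64]) -> PerformanceTrend {
    let n = recent_throughputs.len();
    if n < 2 {
        return PerformanceTrend {
            direction: TrendDirection::Stable,
            magnitude: 0.0,
            confidence: 0.0,
        };
    }

    // Least-squares slope with x = 0, 1, ..., n-1.
    let nf = n as f64;
    let mean_x = (nf - 1.0) / 2.0;
    let mean_y = recent_throughputs.iter().sum::<f64>() / nf;
    let (mut sxy, mut sxx) = (0.0, 0.0);
    for (i, &y) in recent_throughputs.iter().enumerate() {
        let dx = i as f64 - mean_x;
        sxy += dx * (y - mean_y);
        sxx += dx * dx;
    }
    let slope = sxy / sxx;

    // Treat per-step changes below 0.1% of the mean as stable.
    let threshold = mean_y.abs() * 0.001;
    let direction = if slope > threshold {
        TrendDirection::Improving
    } else if slope < -threshold {
        TrendDirection::Degrading
    } else {
        TrendDirection::Stable
    };

    PerformanceTrend {
        direction,
        magnitude: slope.abs(),
        // Heuristic: confidence grows with the number of samples.
        confidence: nf / (nf + 10.0),
    }
}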

/// Learns pipeline tuning parameters from observed executions using a
/// simple linear model trained by gradient descent.
#[derive(Debug)]
pub struct AutoTuner {
    /// One weight per extracted feature.
    feature_weights: Vec<f64>,
    /// Step size for gradient-descent updates.
    learning_rate: f64,
    /// Retained (features, score) observations.
    training_data: Vec<TrainingExample>,
    /// Cap on retained observations; oldest are dropped first.
    max_training_data: usize,
}

impl Default for AutoTuner {
    fn default() -> Self {
        Self::new()
    }
}

impl AutoTuner {
    pub fn new() -> Self {
        Self {
            // Ten features, uniformly initialized.
            feature_weights: vec![0.1; 10],
            learning_rate: 0.01,
            training_data: Vec::new(),
            max_training_data: 1000,
        }
    }

    /// Suggests tuning parameters for an upcoming run from current system
    /// metrics, similar past executions, and the estimated input size.
    pub fn optimize_parameters(
        &mut self,
        system_metrics: &SystemMetrics,
        historical_data: &[&ExecutionRecord],
        estimated_data_size: usize,
    ) -> Result<AutoTuningParameters> {
        let features = self.extract_features(system_metrics, historical_data, estimated_data_size);

        let predicted_params = self.predict_optimal_parameters(&features)?;

        Ok(predicted_params)
    }

    /// Feeds one observed execution back into the model.
    pub fn update_model(
        &mut self,
        config: &OptimizedPipelineConfig,
        metrics: &PipelinePerformanceMetrics,
    ) -> Result<()> {
        let training_example = TrainingExample {
            features: self.config_to_features(config),
            performance_score: metrics.throughput,
        };

        self.training_data.push(training_example);

        // Bound the training set by evicting the oldest example.
        if self.training_data.len() > self.max_training_data {
            self.training_data.remove(0);
        }

        self.update_weights()?;

        Ok(())
    }

    fn extract_features(
        &self,
        system_metrics: &SystemMetrics,
        historical_data: &[&ExecutionRecord],
        estimated_data_size: usize,
    ) -> Vec<f64> {
        let mut features = Vec::new();

        // System-state features.
        features.push(system_metrics.cpu_usage);
        features.push(system_metrics.memory_usage.utilization);
        features.push(system_metrics.io_utilization);
        features.push(system_metrics.cache_performance.l1_hit_rate);

        // Log-scaled data size, normalized to roughly [0, 1];
        // `.max(1.0)` avoids ln(0) for empty inputs.
        features.push((estimated_data_size as f64).max(1.0).ln() / 20.0);

        // Average historical throughput, scaled down.
        if !historical_data.is_empty() {
            let avg_throughput: f64 = historical_data
                .iter()
                .map(|record| record.metrics.throughput)
                .sum::<f64>()
                / historical_data.len() as f64;
            features.push(avg_throughput / 1000.0);
        } else {
            features.push(0.0);
        }

        // Zero-pad to the model's feature dimension.
        while features.len() < self.feature_weights.len() {
            features.push(0.0);
        }

        features
    }

    /// Maps a scalar prediction score onto concrete tuning parameters.
    fn predict_optimal_parameters(&self, features: &[f64]) -> Result<AutoTuningParameters> {
        // Linear model: dot product of features and weights.
        let prediction_score: f64 = features
            .iter()
            .zip(self.feature_weights.iter())
            .map(|(f, w)| f * w)
            .sum();

        let thread_count = ((prediction_score * 8.0).exp() as usize).clamp(1, num_cpus::get());
        let chunk_size = ((prediction_score * 1000.0).abs() as usize).clamp(256, 8192);
        let simd_enabled = prediction_score > 0.0;
        let gpu_enabled = prediction_score > 0.5 && self.is_gpu_beneficial(features);

        Ok(AutoTuningParameters {
            thread_count,
            chunk_size,
            simd_enabled,
            gpu_enabled,
            prefetch_strategy: super::config::PrefetchStrategy::Sequential { distance: 64 },
            // Clamp so a large score cannot overflow the maximum level of 9.
            compression_level: (prediction_score * 9.0).abs().min(9.0) as u8,
            io_buffer_size: ((prediction_score * 64.0 * 1024.0).abs() as usize)
                .clamp(4096, 1024 * 1024),
            batch_processing: super::config::BatchProcessingMode::Disabled,
        })
    }
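
    // Worked example: a prediction score of 0.3 maps to
    //   thread_count      = exp(0.3 * 8) ≈ 11, clamped to [1, num_cpus::get()],
    //   chunk_size        = |0.3 * 1000| = 300 (within [256, 8192]),
    //   simd_enabled      = true  (0.3 > 0.0),
    //   gpu_enabled       = false (0.3 <= 0.5),
    //   compression_level = min(|0.3 * 9|, 9) as u8 = 2.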

    /// Encodes a configuration as a normalized feature vector.
    fn config_to_features(&self, config: &OptimizedPipelineConfig) -> Vec<f64> {
        let mut features = Vec::new();
        features.push(config.thread_count as f64 / num_cpus::get() as f64);
        features.push((config.chunk_size as f64).ln() / 10.0);
        features.push(if config.simd_optimization { 1.0 } else { 0.0 });
        features.push(if config.gpu_acceleration { 1.0 } else { 0.0 });
        features.push(config.compression_level as f64 / 9.0);

        // Zero-pad to the model's feature dimension.
        while features.len() < self.feature_weights.len() {
            features.push(0.0);
        }

        features
    }

    /// Runs one gradient-descent pass over the retained examples.
    fn update_weights(&mut self) -> Result<()> {
        // Wait for a minimal sample before fitting.
        if self.training_data.len() < 10 {
            return Ok(());
        }

        // Standard least-mean-squares update: w_i += lr * (target - predicted) * x_i.
        for example in &self.training_data {
            let predicted = self.predict_score(&example.features);
            let error = example.performance_score - predicted;

            for (i, &feature) in example.features.iter().enumerate() {
                if i < self.feature_weights.len() {
                    self.feature_weights[i] += self.learning_rate * error * feature;
                }
            }
        }

        Ok(())
    }

    /// Linear-model prediction: dot product of features and weights.
    fn predict_score(&self, features: &[f64]) -> f64 {
        features
            .iter()
            .zip(self.feature_weights.iter())
            .map(|(f, w)| f * w)
            .sum()
    }

    /// Heuristic: GPU offload pays off only for sufficiently large inputs
    /// (feature index 4 is the log-scaled data size).
    fn is_gpu_beneficial(&self, features: &[f64]) -> bool {
        if features.len() >= 5 {
            features[4] > 0.5
        } else {
            false
        }
    }

    /// Rough model accuracy: 1 minus the mean absolute error relative to the
    /// mean performance score, clamped to [0, 1].
    pub fn get_model_accuracy(&self) -> f64 {
        if self.training_data.len() < 10 {
            return 0.0;
        }

        let mut total_error = 0.0;
        for example in &self.training_data {
            let predicted = self.predict_score(&example.features);
            let error = (example.performance_score - predicted).abs();
            total_error += error;
        }

        let mean_error = total_error / self.training_data.len() as f64;
        let mean_performance: f64 = self
            .training_data
            .iter()
            .map(|e| e.performance_score)
            .sum::<f64>()
            / self.training_data.len() as f64;

        if mean_performance > 0.0 {
            // Clamp so a mean error larger than the mean score reads as 0.
            (1.0 - (mean_error / mean_performance)).clamp(0.0, 1.0)
        } else {
            0.0
        }
    }
}
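
// Usage sketch (hypothetical; assumes a `SystemMetrics` snapshot is available,
// e.g. from the surrounding optimizer):
//
//     let mut tuner = AutoTuner::new();
//     let history: Vec<&ExecutionRecord> = Vec::new(); // cold start
//     let params = tuner.optimize_parameters(&system_metrics, &history, 1 << 20)?;
//     // ...execute the pipeline with `params`, then close the loop:
//     tuner.update_model(&config, &metrics)?;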

/// One (feature vector, observed performance) training pair.
#[derive(Debug, Clone)]
struct TrainingExample {
    features: Vec<f64>,
    performance_score: f64,
}

/// Predicts expected performance for a candidate configuration using three
/// independent linear models.
#[derive(Debug)]
pub struct PerformancePredictor {
    /// Predicts throughput.
    throughput_model: LinearRegressionModel,
    /// Predicts peak memory usage.
    memory_model: LinearRegressionModel,
    /// Predicts CPU utilization.
    cpu_model: LinearRegressionModel,
}

impl PerformancePredictor {
    pub fn new() -> Self {
        Self {
            throughput_model: LinearRegressionModel::new(8),
            memory_model: LinearRegressionModel::new(8),
            cpu_model: LinearRegressionModel::new(8),
        }
    }

    /// Predicts throughput, memory, and CPU metrics for `config` on an input
    /// of `data_size` bytes; remaining metric fields fall back to defaults.
    pub fn predict_performance(
        &self,
        config: &OptimizedPipelineConfig,
        data_size: usize,
    ) -> PipelinePerformanceMetrics {
        let features = self.extract_prediction_features(config, data_size);

        let predicted_throughput = self.throughput_model.predict(&features).max(0.0);
        let predicted_memory = self.memory_model.predict(&features).max(0.0) as usize;
        let predicted_cpu = self.cpu_model.predict(&features).clamp(0.0, 1.0);

        PipelinePerformanceMetrics {
            throughput: predicted_throughput,
            peak_memory_usage: predicted_memory,
            cpu_utilization: predicted_cpu,
            data_size,
            ..Default::default()
        }
    }

    /// Eight prediction features, matching the models' dimension.
    fn extract_prediction_features(
        &self,
        config: &OptimizedPipelineConfig,
        data_size: usize,
    ) -> Vec<f64> {
        vec![
            config.thread_count as f64,
            (config.chunk_size as f64).ln(),
            if config.simd_optimization { 1.0 } else { 0.0 },
            if config.gpu_acceleration { 1.0 } else { 0.0 },
            (data_size as f64).ln(),
            config.compression_level as f64,
            (config.io_buffer_size as f64).ln(),
            match config.batch_processing {
                super::config::BatchProcessingMode::Disabled => 0.0,
                _ => 1.0,
            },
        ]
    }
}

impl Default for PerformancePredictor {
    fn default() -> Self {
        Self::new()
    }
}
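
// Usage sketch (hypothetical; `config` is any `OptimizedPipelineConfig`):
//
//     let predictor = PerformancePredictor::new();
//     let expected = predictor.predict_performance(&config, 64 * 1024 * 1024);
//     println!("expected throughput: {:.1}", expected.throughput);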

/// Minimal linear model: prediction = weights · features + bias.
#[derive(Debug)]
struct LinearRegressionModel {
    weights: Vec<f64>,
    bias: f64,
}

impl LinearRegressionModel {
    fn new(num_features: usize) -> Self {
        Self {
            weights: vec![0.1; num_features],
            bias: 0.0,
        }
    }

    fn predict(&self, features: &[f64]) -> f64 {
        let prediction: f64 = features
            .iter()
            .zip(self.weights.iter())
            .map(|(f, w)| f * w)
            .sum();
        prediction + self.bias
    }
}
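
// Minimal tests added for illustration; they only exercise behavior fully
// determined by this file (initial weights of 0.1 and zero bias).
#[cfg(test)]
mod tests {
    use super::*;

    // With uniform initial weights of 0.1 and zero bias, `predict` reduces to
    // 0.1 * sum(features): 0.1 * (1 + 2 + 3) = 0.6.
    #[test]
    fn linear_model_predicts_weighted_sum() {
        let model = LinearRegressionModel::new(3);
        assert!((model.predict(&[1.0, 2.0, 3.0]) - 0.6).abs() < 1e-12);
    }

    // `AutoTuner::predict_score` is the same dot product, so ten unit features
    // against ten initial weights of 0.1 score exactly 1.0.
    #[test]
    fn autotuner_initial_score_is_scaled_sum() {
        let tuner = AutoTuner::new();
        assert!((tuner.predict_score(&[1.0; 10]) - 1.0).abs() < 1e-12);
    }
}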