1use chrono::{DateTime, Utc};
8use scirs2_core::random::{thread_rng, Rng};
9use serde::{Deserialize, Serialize};
10use std::collections::{BTreeMap, HashMap};
11use std::sync::{Arc, Mutex};
12use std::thread;
13use std::time::Duration;
14
/// Profiles pipeline runs: tracks per-stage timing plus resource samples for
/// active sessions, and retains completed sessions for later reporting.
///
/// Cloning a profiler shares the underlying session stores (`Arc`), so clones
/// observe the same sessions.
#[derive(Debug, Clone)]
pub struct PerformanceProfiler {
    /// Behaviour switches and sampling settings.
    config: ProfilerConfig,
    /// Sessions currently being profiled, keyed by session id.
    /// Shared with the background monitoring thread.
    active_sessions: Arc<Mutex<HashMap<String, ProfileSession>>>,
    /// Finished sessions, oldest first; bounded by `config.max_sessions`.
    completed_sessions: Arc<Mutex<Vec<ProfileSession>>>,
}
22
/// Tunable settings controlling what a [`PerformanceProfiler`] collects.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfilerConfig {
    /// Record wall-clock timing for stages.
    pub enable_timing: bool,
    /// Collect memory samples on the background monitoring thread.
    pub enable_memory_tracking: bool,
    /// Collect CPU samples on the background monitoring thread.
    pub enable_cpu_monitoring: bool,
    /// Collect GPU samples (GPU sampling currently yields no data).
    pub enable_gpu_monitoring: bool,
    /// Interval between background samples, in milliseconds.
    pub sample_interval_ms: u64,
    /// Maximum number of completed sessions retained; oldest are evicted.
    pub max_sessions: usize,
    /// Run bottleneck detection when a session ends.
    pub enable_bottleneck_detection: bool,
    /// Generate optimization hints when a session ends.
    pub enable_optimization_hints: bool,
}
43
44impl Default for ProfilerConfig {
45 fn default() -> Self {
46 Self {
47 enable_timing: true,
48 enable_memory_tracking: true,
49 enable_cpu_monitoring: true,
50 enable_gpu_monitoring: false,
51 sample_interval_ms: 100,
52 max_sessions: 100,
53 enable_bottleneck_detection: true,
54 enable_optimization_hints: true,
55 }
56 }
57}
58
/// One profiled pipeline run: per-stage profiles plus post-run analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfileSession {
    /// Unique id, formatted as `profile_<uuid>`.
    pub session_id: String,
    /// Name of the pipeline being profiled.
    pub pipeline_name: String,
    /// When the session started (UTC).
    pub start_time: DateTime<Utc>,
    /// Set when the session ends; `None` while active.
    pub end_time: Option<DateTime<Utc>>,
    /// Stage profiles keyed by stage name (iterated in key order, not start order).
    pub stages: BTreeMap<String, StageProfile>,
    /// Aggregate metrics, computed when the session ends.
    pub overall_metrics: OverallMetrics,
    /// Bottlenecks detected at session end (when enabled).
    pub bottlenecks: Vec<Bottleneck>,
    /// Optimization hints generated at session end (when enabled).
    pub optimization_hints: Vec<OptimizationHint>,
}
71
/// Profiling data for a single pipeline stage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageProfile {
    /// Stage name (also the key in [`ProfileSession::stages`]).
    pub stage_name: String,
    /// Free-form component kind, e.g. "transformer" or "estimator".
    pub component_type: String,
    /// When the stage started (UTC).
    pub start_time: DateTime<Utc>,
    /// Set when the stage ends; `None` while running.
    pub end_time: Option<DateTime<Utc>>,
    /// Wall-clock duration; zero until the stage has ended.
    pub execution_time: Duration,
    /// Memory samples attributed to this stage while it ran.
    pub memory_samples: Vec<MemorySample>,
    /// CPU samples attributed to this stage while it ran.
    pub cpu_samples: Vec<CpuSample>,
    /// GPU samples attributed to this stage while it ran.
    pub gpu_samples: Vec<GpuSample>,
    /// Input (samples, features) shape, if recorded.
    pub input_shape: Option<(usize, usize)>,
    /// Output (samples, features) shape, if recorded.
    pub output_shape: Option<(usize, usize)>,
    /// Arbitrary stage parameters recorded by the caller.
    pub parameters: HashMap<String, String>,
    /// Number of errors observed in this stage.
    pub error_count: u32,
    /// Number of warnings observed in this stage.
    pub warning_count: u32,
}
89
/// Session-level aggregate metrics derived from the stage profiles.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OverallMetrics {
    /// Sum of all stage execution times.
    pub total_execution_time: Duration,
    /// Highest heap usage seen across all memory samples.
    pub peak_memory_usage_mb: f64,
    /// Mean of all CPU `overall_usage` samples.
    pub average_cpu_usage: f64,
    /// Mean GPU utilization (currently always 0 — GPU sampling is a stub).
    pub average_gpu_usage: f64,
    /// Estimated MB processed, derived from recorded input shapes.
    pub total_data_processed_mb: f64,
    /// Input samples per second over the total execution time.
    pub throughput_samples_per_second: f64,
    /// Cache hit ratio (currently a fixed estimate, not measured).
    pub cache_hit_ratio: f64,
    /// Estimated parallel efficiency in [0, 1].
    pub parallel_efficiency: f64,
    /// Number of stages in the session.
    pub pipeline_stages: usize,
    /// Number of stages whose component type is a transformer/preprocessor.
    pub data_transformations: usize,
}
104
/// Point-in-time memory snapshot taken by the background monitor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemorySample {
    /// When the sample was taken (UTC).
    pub timestamp: DateTime<Utc>,
    /// Heap usage in MB (currently a simulated value, not a real reading).
    pub heap_usage_mb: f64,
    /// Stack usage in MB (currently a fixed placeholder).
    pub stack_usage_mb: f64,
    /// GPU memory in MB (currently always 0 — GPU sampling is a stub).
    pub gpu_memory_mb: f64,
    /// Virtual memory in MB (estimated from heap usage).
    pub virtual_memory_mb: f64,
}
114
/// Point-in-time CPU snapshot taken by the background monitor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CpuSample {
    /// When the sample was taken (UTC).
    pub timestamp: DateTime<Utc>,
    /// Overall CPU usage percentage (currently simulated).
    pub overall_usage: f64,
    /// User-mode share of usage (derived from `overall_usage`).
    pub user_usage: f64,
    /// Kernel-mode share of usage (derived from `overall_usage`).
    pub system_usage: f64,
    /// Per-core usage percentages, one entry per logical CPU.
    pub core_usage: Vec<f64>,
    /// Thread count (currently a fixed placeholder, not measured).
    pub thread_count: u32,
}
125
/// Point-in-time GPU snapshot. Currently never produced — GPU sampling
/// returns `None` — but kept for forward compatibility of the data model.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuSample {
    /// When the sample was taken (UTC).
    pub timestamp: DateTime<Utc>,
    /// GPU compute utilization percentage.
    pub gpu_utilization: f64,
    /// GPU memory utilization percentage.
    pub memory_utilization: f64,
    /// GPU temperature (units unspecified here — presumably °C; confirm).
    pub temperature: f64,
    /// Power draw (units unspecified here — presumably watts; confirm).
    pub power_consumption: f64,
}
135
/// A performance problem detected in one stage of a completed session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bottleneck {
    /// What kind of constraint caused the slowdown.
    pub bottleneck_type: BottleneckType,
    /// Name of the stage where the bottleneck was observed.
    pub affected_stage: String,
    /// How serious the bottleneck is.
    pub severity: BottleneckSeverity,
    /// Relative impact multiplier (capped at 5.0 for memory bottlenecks).
    pub impact_factor: f64,
    /// Human-readable summary of the problem.
    pub description: String,
    /// Supporting quantitative detail.
    pub metrics: BottleneckMetrics,
}
146
/// Categories of performance bottleneck the detector can report.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BottleneckType {
    /// Limited by available memory.
    MemoryConstraint,
    /// Limited by raw compute time.
    ComputationalBottleneck,
    /// Limited by input/output throughput.
    IOBottleneck,
    /// Poor cache utilization.
    CacheInefficiency,
    /// Time lost to locking/coordination between threads.
    SynchronizationOverhead,
    /// Time lost moving data between components or devices.
    DataMovementOverhead,
    /// Algorithm scales poorly with input size.
    AlgorithmicComplexity,
    /// Suboptimal configuration settings.
    ConfigurationSuboptimal,
}
167
/// Severity ranking for a [`Bottleneck`]; variants are ordered from least
/// to most severe so `Ord` comparisons are meaningful.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BottleneckSeverity {
    /// Minor impact.
    Low,
    /// Noticeable impact.
    Medium,
    /// Significant impact.
    High,
    /// Dominates session performance.
    Critical,
}
180
/// Quantitative detail backing a [`Bottleneck`] report.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BottleneckMetrics {
    /// Time spent blocked/waiting, in milliseconds.
    pub time_spent_waiting_ms: f64,
    /// Fraction of the relevant resource in use (0.0–1.0).
    pub resource_utilization: f64,
    /// Efficiency estimate (higher is better).
    pub efficiency_score: f64,
    /// Estimated fraction of time recoverable by fixing the bottleneck.
    pub improvement_potential: f64,
}
189
/// An actionable suggestion generated from a completed session's metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationHint {
    /// Which area of the pipeline the hint targets.
    pub category: OptimizationCategory,
    /// How urgent the hint is; used to rank hints in reports.
    pub priority: OptimizationPriority,
    /// Short headline for the hint.
    pub title: String,
    /// Detailed explanation including the observed metric values.
    pub description: String,
    /// Estimated fractional improvement if applied (0.0–1.0).
    pub expected_improvement: f64,
    /// How hard the change is to implement.
    pub implementation_difficulty: ImplementationDifficulty,
    /// Illustrative snippets showing how to apply the hint.
    pub code_examples: Vec<String>,
}
201
/// Areas an [`OptimizationHint`] can target.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OptimizationCategory {
    /// Choose a different or faster algorithm.
    AlgorithmSelection,
    /// Tune existing parameters.
    ParameterTuning,
    /// Reduce memory footprint (chunking, streaming, etc.).
    MemoryOptimization,
    /// Increase parallelism.
    ParallelProcessing,
    /// Improve cache behaviour.
    CacheOptimization,
    /// Use better-suited data structures.
    DataStructureOptimization,
    /// Make fuller use of available hardware.
    HardwareUtilization,
    /// Reorganize the pipeline itself.
    PipelineRestructuring,
}
222
/// Urgency ranking for an [`OptimizationHint`]; variants are ordered from
/// least to most urgent so `Ord`-based sorting ranks hints correctly.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OptimizationPriority {
    /// Nice to have.
    Low,
    /// Worth scheduling.
    Medium,
    /// Should be addressed soon.
    High,
    /// Blocking acceptable performance.
    Critical,
}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
238pub enum ImplementationDifficulty {
239 Trivial, Easy, Moderate, Hard, Expert, }
250
251impl PerformanceProfiler {
252 #[must_use]
254 pub fn new(config: ProfilerConfig) -> Self {
255 Self {
256 config,
257 active_sessions: Arc::new(Mutex::new(HashMap::new())),
258 completed_sessions: Arc::new(Mutex::new(Vec::new())),
259 }
260 }
261
262 #[must_use]
264 pub fn default() -> Self {
265 Self::new(ProfilerConfig::default())
266 }
267
268 #[must_use]
270 pub fn start_session(&self, pipeline_name: &str) -> String {
271 let session_id = format!("profile_{}", uuid::Uuid::new_v4());
272 let session = ProfileSession {
273 session_id: session_id.clone(),
274 pipeline_name: pipeline_name.to_string(),
275 start_time: Utc::now(),
276 end_time: None,
277 stages: BTreeMap::new(),
278 overall_metrics: OverallMetrics::default(),
279 bottlenecks: Vec::new(),
280 optimization_hints: Vec::new(),
281 };
282
283 {
284 let mut active = self.active_sessions.lock().unwrap();
285 active.insert(session_id.clone(), session);
286 }
287
288 if self.config.enable_cpu_monitoring || self.config.enable_memory_tracking {
290 self.start_background_monitoring(&session_id);
291 }
292
293 session_id
294 }
295
296 pub fn start_stage(
298 &self,
299 session_id: &str,
300 stage_name: &str,
301 component_type: &str,
302 ) -> Result<(), String> {
303 let mut active = self.active_sessions.lock().unwrap();
304 if let Some(session) = active.get_mut(session_id) {
305 let stage_profile = StageProfile {
306 stage_name: stage_name.to_string(),
307 component_type: component_type.to_string(),
308 start_time: Utc::now(),
309 end_time: None,
310 execution_time: Duration::from_secs(0),
311 memory_samples: Vec::new(),
312 cpu_samples: Vec::new(),
313 gpu_samples: Vec::new(),
314 input_shape: None,
315 output_shape: None,
316 parameters: HashMap::new(),
317 error_count: 0,
318 warning_count: 0,
319 };
320 session.stages.insert(stage_name.to_string(), stage_profile);
321 Ok(())
322 } else {
323 Err(format!("Session {session_id} not found"))
324 }
325 }
326
327 pub fn end_stage(&self, session_id: &str, stage_name: &str) -> Result<Duration, String> {
329 let mut active = self.active_sessions.lock().unwrap();
330 if let Some(session) = active.get_mut(session_id) {
331 if let Some(stage) = session.stages.get_mut(stage_name) {
332 let end_time = Utc::now();
333 stage.end_time = Some(end_time);
334 stage.execution_time = end_time
335 .signed_duration_since(stage.start_time)
336 .to_std()
337 .unwrap_or(Duration::from_secs(0));
338 Ok(stage.execution_time)
339 } else {
340 Err(format!(
341 "Stage {stage_name} not found in session {session_id}"
342 ))
343 }
344 } else {
345 Err(format!("Session {session_id} not found"))
346 }
347 }
348
349 pub fn record_stage_parameters(
351 &self,
352 session_id: &str,
353 stage_name: &str,
354 parameters: HashMap<String, String>,
355 ) -> Result<(), String> {
356 let mut active = self.active_sessions.lock().unwrap();
357 if let Some(session) = active.get_mut(session_id) {
358 if let Some(stage) = session.stages.get_mut(stage_name) {
359 stage.parameters = parameters;
360 Ok(())
361 } else {
362 Err(format!("Stage {stage_name} not found"))
363 }
364 } else {
365 Err(format!("Session {session_id} not found"))
366 }
367 }
368
369 pub fn record_data_shapes(
371 &self,
372 session_id: &str,
373 stage_name: &str,
374 input_shape: Option<(usize, usize)>,
375 output_shape: Option<(usize, usize)>,
376 ) -> Result<(), String> {
377 let mut active = self.active_sessions.lock().unwrap();
378 if let Some(session) = active.get_mut(session_id) {
379 if let Some(stage) = session.stages.get_mut(stage_name) {
380 stage.input_shape = input_shape;
381 stage.output_shape = output_shape;
382 Ok(())
383 } else {
384 Err(format!("Stage {stage_name} not found"))
385 }
386 } else {
387 Err(format!("Session {session_id} not found"))
388 }
389 }
390
391 pub fn end_session(&self, session_id: &str) -> Result<ProfileSession, String> {
393 let mut session = {
394 let mut active = self.active_sessions.lock().unwrap();
395 active
396 .remove(session_id)
397 .ok_or_else(|| format!("Session {session_id} not found"))?
398 };
399
400 session.end_time = Some(Utc::now());
401
402 session.overall_metrics = self.calculate_overall_metrics(&session);
404
405 if self.config.enable_bottleneck_detection {
407 session.bottlenecks = self.detect_bottlenecks(&session);
408 }
409
410 if self.config.enable_optimization_hints {
412 session.optimization_hints = self.generate_optimization_hints(&session);
413 }
414
415 {
417 let mut completed = self.completed_sessions.lock().unwrap();
418 completed.push(session.clone());
419
420 while completed.len() > self.config.max_sessions {
422 completed.remove(0);
423 }
424 }
425
426 Ok(session)
427 }
428
429 fn start_background_monitoring(&self, session_id: &str) {
431 let session_id = session_id.to_string();
432 let active_sessions = Arc::clone(&self.active_sessions);
433 let config = self.config.clone();
434
435 thread::spawn(move || {
436 let sample_interval = Duration::from_millis(config.sample_interval_ms);
437
438 loop {
439 let should_continue = {
440 let active = active_sessions.lock().unwrap();
441 active.contains_key(&session_id)
442 };
443
444 if !should_continue {
445 break;
446 }
447
448 if config.enable_memory_tracking {
450 let memory_sample = Self::sample_memory();
451 Self::add_memory_sample(&active_sessions, &session_id, memory_sample);
452 }
453
454 if config.enable_cpu_monitoring {
455 let cpu_sample = Self::sample_cpu();
456 Self::add_cpu_sample(&active_sessions, &session_id, cpu_sample);
457 }
458
459 if config.enable_gpu_monitoring {
460 if let Some(gpu_sample) = Self::sample_gpu() {
461 Self::add_gpu_sample(&active_sessions, &session_id, gpu_sample);
462 }
463 }
464
465 thread::sleep(sample_interval);
466 }
467 });
468 }
469
470 fn sample_memory() -> MemorySample {
472 MemorySample {
476 timestamp: Utc::now(),
477 heap_usage_mb: Self::get_process_memory(),
478 stack_usage_mb: 5.2, gpu_memory_mb: 0.0, virtual_memory_mb: Self::get_process_memory() * 1.5,
481 }
482 }
483
484 fn sample_cpu() -> CpuSample {
486 CpuSample {
489 timestamp: Utc::now(),
490 overall_usage: Self::get_cpu_usage(),
491 user_usage: Self::get_cpu_usage() * 0.8,
492 system_usage: Self::get_cpu_usage() * 0.2,
493 core_usage: (0..num_cpus::get())
494 .map(|_| Self::get_cpu_usage())
495 .collect(),
496 thread_count: 8, }
498 }
499
500 fn sample_gpu() -> Option<GpuSample> {
502 None
504 }
505
506 fn get_process_memory() -> f64 {
508 150.0 + (thread_rng().gen::<f64>() * 50.0)
511 }
512
513 fn get_cpu_usage() -> f64 {
515 30.0 + (thread_rng().gen::<f64>() * 40.0)
517 }
518
519 fn add_memory_sample(
521 active_sessions: &Arc<Mutex<HashMap<String, ProfileSession>>>,
522 session_id: &str,
523 sample: MemorySample,
524 ) {
525 let mut active = active_sessions.lock().unwrap();
526 if let Some(session) = active.get_mut(session_id) {
527 if let Some((_, stage)) = session.stages.iter_mut().last() {
529 if stage.end_time.is_none() {
530 stage.memory_samples.push(sample);
531 }
532 }
533 }
534 }
535
536 fn add_cpu_sample(
538 active_sessions: &Arc<Mutex<HashMap<String, ProfileSession>>>,
539 session_id: &str,
540 sample: CpuSample,
541 ) {
542 let mut active = active_sessions.lock().unwrap();
543 if let Some(session) = active.get_mut(session_id) {
544 if let Some((_, stage)) = session.stages.iter_mut().last() {
545 if stage.end_time.is_none() {
546 stage.cpu_samples.push(sample);
547 }
548 }
549 }
550 }
551
552 fn add_gpu_sample(
554 active_sessions: &Arc<Mutex<HashMap<String, ProfileSession>>>,
555 session_id: &str,
556 sample: GpuSample,
557 ) {
558 let mut active = active_sessions.lock().unwrap();
559 if let Some(session) = active.get_mut(session_id) {
560 if let Some((_, stage)) = session.stages.iter_mut().last() {
561 if stage.end_time.is_none() {
562 stage.gpu_samples.push(sample);
563 }
564 }
565 }
566 }
567
568 fn calculate_overall_metrics(&self, session: &ProfileSession) -> OverallMetrics {
570 let total_execution_time = session
571 .stages
572 .values()
573 .map(|stage| stage.execution_time)
574 .fold(Duration::from_secs(0), |acc, dur| acc + dur);
575
576 let peak_memory = session
577 .stages
578 .values()
579 .flat_map(|stage| &stage.memory_samples)
580 .map(|sample| sample.heap_usage_mb)
581 .fold(0.0, f64::max);
582
583 let avg_cpu = session
584 .stages
585 .values()
586 .flat_map(|stage| &stage.cpu_samples)
587 .map(|sample| sample.overall_usage)
588 .collect::<Vec<_>>();
589 let average_cpu_usage = if avg_cpu.is_empty() {
590 0.0
591 } else {
592 avg_cpu.iter().sum::<f64>() / avg_cpu.len() as f64
593 };
594
595 OverallMetrics {
597 total_execution_time,
598 peak_memory_usage_mb: peak_memory,
599 average_cpu_usage,
600 average_gpu_usage: 0.0, total_data_processed_mb: Self::estimate_data_processed(session),
602 throughput_samples_per_second: Self::calculate_throughput(session),
603 cache_hit_ratio: 0.75, parallel_efficiency: Self::calculate_parallel_efficiency(session),
605 pipeline_stages: session.stages.len(),
606 data_transformations: Self::count_transformations(session),
607 }
608 }
609
610 fn estimate_data_processed(session: &ProfileSession) -> f64 {
612 session
613 .stages
614 .values()
615 .filter_map(|stage| stage.input_shape)
616 .map(|(samples, features)| (samples * features * 8) as f64 / (1024.0 * 1024.0)) .sum()
618 }
619
620 fn calculate_throughput(session: &ProfileSession) -> f64 {
622 let total_samples: usize = session
623 .stages
624 .values()
625 .filter_map(|stage| stage.input_shape)
626 .map(|(samples, _)| samples)
627 .sum();
628
629 let total_time_seconds = session.overall_metrics.total_execution_time.as_secs_f64();
630
631 if total_time_seconds > 0.0 {
632 total_samples as f64 / total_time_seconds
633 } else {
634 0.0
635 }
636 }
637
638 fn calculate_parallel_efficiency(session: &ProfileSession) -> f64 {
640 let ideal_parallel_stages = session.stages.len().min(num_cpus::get());
642 let avg_cpu_per_core = session
643 .stages
644 .values()
645 .flat_map(|stage| &stage.cpu_samples)
646 .map(|sample| sample.overall_usage / sample.core_usage.len() as f64)
647 .collect::<Vec<_>>();
648
649 if avg_cpu_per_core.is_empty() {
650 0.5 } else {
652 let actual_efficiency =
653 avg_cpu_per_core.iter().sum::<f64>() / avg_cpu_per_core.len() as f64;
654 (actual_efficiency / 100.0).min(1.0)
655 }
656 }
657
658 fn count_transformations(session: &ProfileSession) -> usize {
660 session
661 .stages
662 .values()
663 .filter(|stage| {
664 stage.component_type.contains("transformer")
665 || stage.component_type.contains("preprocessor")
666 })
667 .count()
668 }
669
670 fn detect_bottlenecks(&self, session: &ProfileSession) -> Vec<Bottleneck> {
672 let mut bottlenecks = Vec::new();
673
674 for (stage_name, stage) in &session.stages {
676 if let Some(max_memory) = stage
677 .memory_samples
678 .iter()
679 .map(|s| s.heap_usage_mb)
680 .fold(None, |acc, x| Some(acc.map_or(x, |acc: f64| acc.max(x))))
681 {
682 if max_memory > 1000.0 {
683 bottlenecks.push(Bottleneck {
685 bottleneck_type: BottleneckType::MemoryConstraint,
686 affected_stage: stage_name.clone(),
687 severity: if max_memory > 4000.0 {
688 BottleneckSeverity::Critical
689 } else {
690 BottleneckSeverity::High
691 },
692 impact_factor: (max_memory / 1000.0).min(5.0),
693 description: format!(
694 "High memory usage: {max_memory:.1}MB in stage '{stage_name}'"
695 ),
696 metrics: BottleneckMetrics {
697 time_spent_waiting_ms: 0.0,
698 resource_utilization: max_memory / 8192.0, efficiency_score: 1.0 - (max_memory / 8192.0),
700 improvement_potential: 0.3,
701 },
702 });
703 }
704 }
705
706 if stage.execution_time.as_secs_f64() > 10.0 {
708 let severity = if stage.execution_time.as_secs_f64() > 60.0 {
709 BottleneckSeverity::Critical
710 } else if stage.execution_time.as_secs_f64() > 30.0 {
711 BottleneckSeverity::High
712 } else {
713 BottleneckSeverity::Medium
714 };
715
716 bottlenecks.push(Bottleneck {
717 bottleneck_type: BottleneckType::ComputationalBottleneck,
718 affected_stage: stage_name.clone(),
719 severity,
720 impact_factor: stage.execution_time.as_secs_f64() / 10.0,
721 description: format!(
722 "Slow execution: {:.1}s in stage '{}'",
723 stage.execution_time.as_secs_f64(),
724 stage_name
725 ),
726 metrics: BottleneckMetrics {
727 time_spent_waiting_ms: stage.execution_time.as_millis() as f64,
728 resource_utilization: 0.8,
729 efficiency_score: 1.0 / stage.execution_time.as_secs_f64().max(1.0),
730 improvement_potential: 0.5,
731 },
732 });
733 }
734 }
735
736 bottlenecks
737 }
738
739 fn generate_optimization_hints(&self, session: &ProfileSession) -> Vec<OptimizationHint> {
741 let mut hints = Vec::new();
742
743 if session.overall_metrics.peak_memory_usage_mb > 2000.0 {
745 hints.push(OptimizationHint {
746 category: OptimizationCategory::MemoryOptimization,
747 priority: OptimizationPriority::High,
748 title: "High Memory Usage Detected".to_string(),
749 description: format!(
750 "Pipeline uses {:.1}MB peak memory. Consider chunked processing or streaming.",
751 session.overall_metrics.peak_memory_usage_mb
752 ),
753 expected_improvement: 0.4,
754 implementation_difficulty: ImplementationDifficulty::Moderate,
755 code_examples: vec![
756 "Use streaming: pipeline.enable_streaming(chunk_size=1000)".to_string(),
757 "Enable memory optimization: config.memory_efficient = true".to_string(),
758 ],
759 });
760 }
761
762 if session.overall_metrics.parallel_efficiency < 0.5 {
764 hints.push(OptimizationHint {
765 category: OptimizationCategory::ParallelProcessing,
766 priority: OptimizationPriority::Medium,
767 title: "Low Parallel Efficiency".to_string(),
768 description: format!(
769 "Parallel efficiency is {:.1}%. Consider enabling more parallelization.",
770 session.overall_metrics.parallel_efficiency * 100.0
771 ),
772 expected_improvement: 0.6,
773 implementation_difficulty: ImplementationDifficulty::Easy,
774 code_examples: vec![
775 "Set parallel jobs: pipeline.set_n_jobs(-1)".to_string(),
776 "Enable SIMD: config.enable_simd = true".to_string(),
777 ],
778 });
779 }
780
781 for (stage_name, stage) in &session.stages {
783 if stage.execution_time.as_secs_f64() > 30.0
784 && stage.component_type.contains("estimator")
785 {
786 hints.push(OptimizationHint {
787 category: OptimizationCategory::AlgorithmSelection,
788 priority: OptimizationPriority::High,
789 title: format!("Slow Algorithm in {stage_name}"),
790 description: format!(
791 "Stage '{}' takes {:.1}s. Consider faster algorithms or approximations.",
792 stage_name,
793 stage.execution_time.as_secs_f64()
794 ),
795 expected_improvement: 0.7,
796 implementation_difficulty: ImplementationDifficulty::Moderate,
797 code_examples: vec![
798 "Use approximate algorithms where applicable".to_string(),
799 "Consider ensemble methods for better speed/accuracy trade-off".to_string(),
800 ],
801 });
802 }
803 }
804
805 hints
806 }
807
808 #[must_use]
810 pub fn get_completed_sessions(&self) -> Vec<ProfileSession> {
811 let completed = self.completed_sessions.lock().unwrap();
812 completed.clone()
813 }
814
815 #[must_use]
817 pub fn generate_report(&self, session_id: Option<&str>) -> PerformanceReport {
818 let sessions = if let Some(id) = session_id {
819 let completed = self.completed_sessions.lock().unwrap();
820 completed
821 .iter()
822 .filter(|s| s.session_id == id)
823 .cloned()
824 .collect()
825 } else {
826 self.get_completed_sessions()
827 };
828
829 PerformanceReport::from_sessions(sessions)
830 }
831}
832
833impl Default for OverallMetrics {
834 fn default() -> Self {
835 Self {
836 total_execution_time: Duration::from_secs(0),
837 peak_memory_usage_mb: 0.0,
838 average_cpu_usage: 0.0,
839 average_gpu_usage: 0.0,
840 total_data_processed_mb: 0.0,
841 throughput_samples_per_second: 0.0,
842 cache_hit_ratio: 0.0,
843 parallel_efficiency: 0.0,
844 pipeline_stages: 0,
845 data_transformations: 0,
846 }
847 }
848}
849
/// Aggregated analysis over one or more completed profiling sessions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceReport {
    /// Unique id, formatted as `report_<uuid>`.
    pub report_id: String,
    /// When the report was generated (UTC).
    pub generated_at: DateTime<Utc>,
    /// Number of sessions the report covers.
    pub sessions_analyzed: usize,
    /// Cross-session summary statistics.
    pub summary_metrics: SummaryMetrics,
    /// Frequency/severity/impact breakdown of detected bottlenecks.
    pub bottleneck_analysis: BottleneckAnalysis,
    /// Up to the 10 highest-priority hints collected across sessions.
    pub optimization_recommendations: Vec<OptimizationHint>,
    /// Direction of performance/memory/throughput over session order.
    pub trend_analysis: TrendAnalysis,
    /// Best/worst session and run-to-run consistency.
    pub comparative_analysis: ComparativeAnalysis,
}
862
/// Cross-session summary statistics; all zero when no sessions were analyzed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SummaryMetrics {
    /// Mean total execution time across sessions.
    pub average_execution_time: Duration,
    /// Shortest session execution time.
    pub fastest_execution_time: Duration,
    /// Longest session execution time.
    pub slowest_execution_time: Duration,
    /// Mean of per-session peak memory usage (MB).
    pub average_memory_usage: f64,
    /// Highest peak memory usage seen in any session (MB).
    pub peak_memory_across_sessions: f64,
    /// Mean throughput (samples/second) across sessions.
    pub average_throughput: f64,
    /// Best parallel efficiency achieved by any session.
    pub best_parallel_efficiency: f64,
}
873
/// Breakdown of bottlenecks detected across the analyzed sessions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BottleneckAnalysis {
    /// The bottleneck type observed most often.
    pub most_common_bottleneck: BottleneckType,
    /// Bottleneck count per affected stage name.
    pub bottleneck_frequency: HashMap<String, u32>,
    /// Bottleneck count per severity level.
    pub severity_distribution: HashMap<BottleneckSeverity, u32>,
    /// Summed impact factor per affected stage name.
    pub impact_analysis: HashMap<String, f64>,
}
881
/// Trend directions computed by comparing the first and second halves of the
/// session sequence; requires at least 3 sessions for a verdict.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendAnalysis {
    /// Trend of the inverse-execution-time performance score.
    pub performance_trend: TrendDirection,
    /// Trend of peak memory usage (note: "Improving" here means increasing).
    pub memory_usage_trend: TrendDirection,
    /// Trend of throughput (samples/second).
    pub throughput_trend: TrendDirection,
    /// Per-session performance scores (1000 / total seconds, floored at 1 s).
    pub session_performance_scores: Vec<f64>,
}
889
/// Direction of a metric over time, from first-half vs. second-half averages.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrendDirection {
    /// Second-half average is more than 5% above the first half.
    Improving,
    /// Change stayed within ±5%.
    Stable,
    /// Second-half average is more than 5% below the first half.
    Degrading,
    /// Fewer than 3 data points; no verdict possible.
    InsufficientData,
}
901
/// Best/worst session comparison and run-to-run consistency.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComparativeAnalysis {
    /// Session id with the shortest total execution time ("none" if empty).
    pub best_performing_session: String,
    /// Session id with the longest total execution time ("none" if empty).
    pub worst_performing_session: String,
    /// Variance of total execution times (seconds squared).
    pub performance_variance: f64,
    /// 1 / (1 + coefficient of variation); closer to 1 means more consistent.
    pub consistency_score: f64,
}
909
910impl PerformanceReport {
911 #[must_use]
912 pub fn from_sessions(sessions: Vec<ProfileSession>) -> Self {
913 let report_id = format!("report_{}", uuid::Uuid::new_v4());
914
915 let summary_metrics = Self::calculate_summary_metrics(&sessions);
916 let bottleneck_analysis = Self::analyze_bottlenecks(&sessions);
917 let trend_analysis = Self::analyze_trends(&sessions);
918 let comparative_analysis = Self::comparative_analysis(&sessions);
919
920 let mut all_hints: Vec<OptimizationHint> = sessions
922 .iter()
923 .flat_map(|s| s.optimization_hints.iter())
924 .cloned()
925 .collect();
926
927 all_hints.sort_by(|a, b| b.priority.cmp(&a.priority));
929 all_hints.truncate(10); Self {
932 report_id,
933 generated_at: Utc::now(),
934 sessions_analyzed: sessions.len(),
935 summary_metrics,
936 bottleneck_analysis,
937 optimization_recommendations: all_hints,
938 trend_analysis,
939 comparative_analysis,
940 }
941 }
942
943 fn calculate_summary_metrics(sessions: &[ProfileSession]) -> SummaryMetrics {
944 if sessions.is_empty() {
945 return SummaryMetrics {
946 average_execution_time: Duration::from_secs(0),
947 fastest_execution_time: Duration::from_secs(0),
948 slowest_execution_time: Duration::from_secs(0),
949 average_memory_usage: 0.0,
950 peak_memory_across_sessions: 0.0,
951 average_throughput: 0.0,
952 best_parallel_efficiency: 0.0,
953 };
954 }
955
956 let execution_times: Vec<Duration> = sessions
957 .iter()
958 .map(|s| s.overall_metrics.total_execution_time)
959 .collect();
960
961 let average_execution = Duration::from_secs_f64(
962 execution_times
963 .iter()
964 .map(std::time::Duration::as_secs_f64)
965 .sum::<f64>()
966 / sessions.len() as f64,
967 );
968
969 SummaryMetrics {
971 average_execution_time: average_execution,
972 fastest_execution_time: *execution_times.iter().min().unwrap(),
973 slowest_execution_time: *execution_times.iter().max().unwrap(),
974 average_memory_usage: sessions
975 .iter()
976 .map(|s| s.overall_metrics.peak_memory_usage_mb)
977 .sum::<f64>()
978 / sessions.len() as f64,
979 peak_memory_across_sessions: sessions
980 .iter()
981 .map(|s| s.overall_metrics.peak_memory_usage_mb)
982 .fold(0.0, f64::max),
983 average_throughput: sessions
984 .iter()
985 .map(|s| s.overall_metrics.throughput_samples_per_second)
986 .sum::<f64>()
987 / sessions.len() as f64,
988 best_parallel_efficiency: sessions
989 .iter()
990 .map(|s| s.overall_metrics.parallel_efficiency)
991 .fold(0.0, f64::max),
992 }
993 }
994
995 fn analyze_bottlenecks(sessions: &[ProfileSession]) -> BottleneckAnalysis {
996 let all_bottlenecks: Vec<&Bottleneck> =
997 sessions.iter().flat_map(|s| &s.bottlenecks).collect();
998
999 let mut bottleneck_frequency = HashMap::new();
1000 let mut severity_distribution = HashMap::new();
1001 let mut impact_analysis = HashMap::new();
1002
1003 for bottleneck in &all_bottlenecks {
1004 *bottleneck_frequency
1005 .entry(bottleneck.affected_stage.clone())
1006 .or_insert(0) += 1;
1007 *severity_distribution
1008 .entry(bottleneck.severity.clone())
1009 .or_insert(0) += 1;
1010 *impact_analysis
1011 .entry(bottleneck.affected_stage.clone())
1012 .or_insert(0.0) += bottleneck.impact_factor;
1013 }
1014
1015 let most_common_bottleneck = all_bottlenecks
1016 .iter()
1017 .fold(HashMap::new(), |mut acc, b| {
1018 *acc.entry(format!("{:?}", b.bottleneck_type)).or_insert(0) += 1;
1019 acc
1020 })
1021 .into_iter()
1022 .max_by_key(|(_, count)| *count)
1023 .map_or(
1024 BottleneckType::ComputationalBottleneck,
1025 |(bottleneck_type, _)| match bottleneck_type.as_str() {
1026 "MemoryConstraint" => BottleneckType::MemoryConstraint,
1027 "ComputationalBottleneck" => BottleneckType::ComputationalBottleneck,
1028 _ => BottleneckType::ComputationalBottleneck,
1029 },
1030 );
1031
1032 BottleneckAnalysis {
1034 most_common_bottleneck,
1035 bottleneck_frequency,
1036 severity_distribution,
1037 impact_analysis,
1038 }
1039 }
1040
1041 fn analyze_trends(sessions: &[ProfileSession]) -> TrendAnalysis {
1042 if sessions.len() < 3 {
1043 return TrendAnalysis {
1044 performance_trend: TrendDirection::InsufficientData,
1045 memory_usage_trend: TrendDirection::InsufficientData,
1046 throughput_trend: TrendDirection::InsufficientData,
1047 session_performance_scores: Vec::new(),
1048 };
1049 }
1050
1051 let performance_scores: Vec<f64> = sessions
1052 .iter()
1053 .map(|s| {
1054 1000.0
1055 / s.overall_metrics
1056 .total_execution_time
1057 .as_secs_f64()
1058 .max(1.0)
1059 })
1060 .collect();
1061
1062 let performance_trend = Self::calculate_trend_direction(&performance_scores);
1063
1064 let memory_scores: Vec<f64> = sessions
1065 .iter()
1066 .map(|s| s.overall_metrics.peak_memory_usage_mb)
1067 .collect();
1068 let memory_usage_trend = Self::calculate_trend_direction(&memory_scores);
1069
1070 let throughput_scores: Vec<f64> = sessions
1071 .iter()
1072 .map(|s| s.overall_metrics.throughput_samples_per_second)
1073 .collect();
1074 let throughput_trend = Self::calculate_trend_direction(&throughput_scores);
1075
1076 TrendAnalysis {
1078 performance_trend,
1079 memory_usage_trend,
1080 throughput_trend,
1081 session_performance_scores: performance_scores,
1082 }
1083 }
1084
1085 fn calculate_trend_direction(values: &[f64]) -> TrendDirection {
1086 if values.len() < 3 {
1087 return TrendDirection::InsufficientData;
1088 }
1089
1090 let mid_point = values.len() / 2;
1091 let first_half_avg = values[..mid_point].iter().sum::<f64>() / mid_point as f64;
1092 let second_half_avg =
1093 values[mid_point..].iter().sum::<f64>() / (values.len() - mid_point) as f64;
1094
1095 let change_percentage =
1096 (second_half_avg - first_half_avg) / first_half_avg.abs().max(1e-10);
1097
1098 if change_percentage > 0.05 {
1099 TrendDirection::Improving
1100 } else if change_percentage < -0.05 {
1101 TrendDirection::Degrading
1102 } else {
1103 TrendDirection::Stable
1104 }
1105 }
1106
1107 fn comparative_analysis(sessions: &[ProfileSession]) -> ComparativeAnalysis {
1108 if sessions.is_empty() {
1109 return ComparativeAnalysis {
1110 best_performing_session: "none".to_string(),
1111 worst_performing_session: "none".to_string(),
1112 performance_variance: 0.0,
1113 consistency_score: 0.0,
1114 };
1115 }
1116
1117 let (best_session, worst_session) = sessions.iter().fold(
1118 (sessions[0].clone(), sessions[0].clone()),
1119 |(best, worst), session| {
1120 let best_next = if session.overall_metrics.total_execution_time
1121 < best.overall_metrics.total_execution_time
1122 {
1123 session.clone()
1124 } else {
1125 best
1126 };
1127
1128 let worst_next = if session.overall_metrics.total_execution_time
1129 > worst.overall_metrics.total_execution_time
1130 {
1131 session.clone()
1132 } else {
1133 worst
1134 };
1135
1136 (best_next, worst_next)
1137 },
1138 );
1139
1140 let execution_times: Vec<f64> = sessions
1141 .iter()
1142 .map(|s| s.overall_metrics.total_execution_time.as_secs_f64())
1143 .collect();
1144
1145 let mean_time = execution_times.iter().sum::<f64>() / sessions.len() as f64;
1146 let variance = execution_times
1147 .iter()
1148 .map(|t| (t - mean_time).powi(2))
1149 .sum::<f64>()
1150 / sessions.len() as f64;
1151
1152 let consistency_score = 1.0 / (1.0 + variance.sqrt() / mean_time);
1153
1154 ComparativeAnalysis {
1156 best_performing_session: best_session.session_id,
1157 worst_performing_session: worst_session.session_id,
1158 performance_variance: variance,
1159 consistency_score,
1160 }
1161 }
1162}
1163
// Removed dead `#[allow(non_snake_case)]`: every identifier in this module
// is already snake_case, so the allow suppressed nothing.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_profiler_creation() {
        let profiler = PerformanceProfiler::default();
        // Plain boolean asserts instead of `assert_eq!(…, true)`
        // (clippy `bool_assert_comparison`).
        assert!(profiler.config.enable_timing);
        assert!(profiler.config.enable_memory_tracking);
    }

    #[test]
    fn test_session_lifecycle() {
        let profiler = PerformanceProfiler::default();
        let session_id = profiler.start_session("test_pipeline");

        profiler
            .start_stage(&session_id, "preprocessing", "transformer")
            .unwrap();
        thread::sleep(Duration::from_millis(10));
        let stage_duration = profiler.end_stage(&session_id, "preprocessing").unwrap();

        assert!(stage_duration > Duration::from_millis(5));

        let completed_session = profiler.end_session(&session_id).unwrap();
        assert_eq!(completed_session.pipeline_name, "test_pipeline");
        assert_eq!(completed_session.stages.len(), 1);
    }

    #[test]
    fn test_bottleneck_detection() {
        let profiler = PerformanceProfiler::default();
        let session_id = profiler.start_session("test_pipeline");

        profiler
            .start_stage(&session_id, "slow_stage", "estimator")
            .unwrap();
        thread::sleep(Duration::from_millis(50));
        profiler.end_stage(&session_id, "slow_stage").unwrap();

        let completed_session = profiler.end_session(&session_id).unwrap();

        assert_eq!(completed_session.stages.len(), 1);
    }

    #[test]
    fn test_performance_report_generation() {
        let profiler = PerformanceProfiler::default();

        for i in 0..3 {
            let session_id = profiler.start_session(&format!("pipeline_{i}"));
            profiler
                .start_stage(&session_id, "stage", "transformer")
                .unwrap();
            thread::sleep(Duration::from_millis(10));
            profiler.end_stage(&session_id, "stage").unwrap();
            profiler.end_session(&session_id).unwrap();
        }

        let report = profiler.generate_report(None);
        assert_eq!(report.sessions_analyzed, 3);
        assert!(report.summary_metrics.average_execution_time > Duration::from_secs(0));
    }
}