1use chrono::{DateTime, Utc};
8use scirs2_core::random::{thread_rng, Rng};
9use serde::{Deserialize, Serialize};
10use std::collections::{BTreeMap, HashMap};
11use std::sync::{Arc, Mutex};
12use std::thread;
13use std::time::Duration;
14
/// Profiles pipeline executions: tracks per-stage timing and resource samples
/// for in-flight sessions and archives completed ones for report generation.
#[derive(Debug, Clone)]
pub struct PerformanceProfiler {
    // Feature switches and sampling settings.
    config: ProfilerConfig,
    // Sessions currently being profiled, keyed by session id. Shared with the
    // background sampler thread, hence Arc<Mutex<...>>.
    active_sessions: Arc<Mutex<HashMap<String, ProfileSession>>>,
    // Finished sessions retained for reporting (capped at config.max_sessions).
    completed_sessions: Arc<Mutex<Vec<ProfileSession>>>,
}
22
/// Tuning knobs for the profiler: which subsystems to sample and how often.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfilerConfig {
    /// Record per-stage wall-clock timing.
    pub enable_timing: bool,
    /// Collect memory samples in the background sampler thread.
    pub enable_memory_tracking: bool,
    /// Collect CPU utilization samples in the background sampler thread.
    pub enable_cpu_monitoring: bool,
    /// Collect GPU samples (sampling is currently a stub — see `sample_gpu`).
    pub enable_gpu_monitoring: bool,
    /// Interval between background samples, in milliseconds.
    pub sample_interval_ms: u64,
    /// Maximum number of completed sessions to retain.
    pub max_sessions: usize,
    /// Run bottleneck detection when a session ends.
    pub enable_bottleneck_detection: bool,
    /// Generate optimization hints when a session ends.
    pub enable_optimization_hints: bool,
}
43
impl Default for ProfilerConfig {
    /// Defaults: everything except GPU monitoring enabled, 100 ms sampling,
    /// and up to 100 retained sessions.
    fn default() -> Self {
        Self {
            enable_timing: true,
            enable_memory_tracking: true,
            enable_cpu_monitoring: true,
            enable_gpu_monitoring: false, // GPU sampling is stubbed out
            sample_interval_ms: 100,
            max_sessions: 100,
            enable_bottleneck_detection: true,
            enable_optimization_hints: true,
        }
    }
}
58
/// One profiled pipeline run, from `start_session` to `end_session`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfileSession {
    /// Unique id ("profile_<uuid>") assigned at session start.
    pub session_id: String,
    /// Human-readable name of the profiled pipeline.
    pub pipeline_name: String,
    pub start_time: DateTime<Utc>,
    /// Set by `end_session`; `None` while the session is active.
    pub end_time: Option<DateTime<Utc>>,
    /// Per-stage profiles keyed by stage name (sorted by name).
    pub stages: BTreeMap<String, StageProfile>,
    /// Session-wide aggregates, populated when the session ends.
    pub overall_metrics: OverallMetrics,
    /// Detected bottlenecks, populated when the session ends (if enabled).
    pub bottlenecks: Vec<Bottleneck>,
    /// Suggested optimizations, populated when the session ends (if enabled).
    pub optimization_hints: Vec<OptimizationHint>,
}
71
/// Timing and resource data for a single pipeline stage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageProfile {
    pub stage_name: String,
    /// Free-form component kind (e.g. "transformer", "estimator"); used by
    /// hint/transformation heuristics via substring matching.
    pub component_type: String,
    pub start_time: DateTime<Utc>,
    /// Set by `end_stage`; `None` while the stage is running.
    pub end_time: Option<DateTime<Utc>>,
    /// Wall-clock duration, computed when the stage ends (zero until then).
    pub execution_time: Duration,
    /// Background samples collected while this stage was the running stage.
    pub memory_samples: Vec<MemorySample>,
    pub cpu_samples: Vec<CpuSample>,
    pub gpu_samples: Vec<GpuSample>,
    /// (rows, columns) of stage input, when recorded via `record_data_shapes`.
    pub input_shape: Option<(usize, usize)>,
    /// (rows, columns) of stage output, when recorded via `record_data_shapes`.
    pub output_shape: Option<(usize, usize)>,
    /// Arbitrary stage parameters recorded via `record_stage_parameters`.
    pub parameters: HashMap<String, String>,
    // NOTE(review): nothing in this module increments these two counters yet.
    pub error_count: u32,
    pub warning_count: u32,
}
89
/// Session-wide aggregates derived from per-stage data at session end.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OverallMetrics {
    /// Sum of all stage execution times.
    pub total_execution_time: Duration,
    /// Maximum heap usage observed across all memory samples.
    pub peak_memory_usage_mb: f64,
    /// Mean of all CPU overall-usage samples.
    pub average_cpu_usage: f64,
    /// Always 0 currently — GPU sampling is stubbed out.
    pub average_gpu_usage: f64,
    /// Estimated MB processed, from recorded input shapes (8 bytes/element).
    pub total_data_processed_mb: f64,
    /// Input rows processed per second of total execution time.
    pub throughput_samples_per_second: f64,
    /// Placeholder constant — no cache instrumentation exists yet.
    pub cache_hit_ratio: f64,
    /// Heuristic in [0, 1]; see `calculate_parallel_efficiency`.
    pub parallel_efficiency: f64,
    /// Number of stages in the session.
    pub pipeline_stages: usize,
    /// Number of transformer/preprocessor stages.
    pub data_transformations: usize,
}
104
/// One point-in-time memory reading (values currently simulated — see
/// `sample_memory`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemorySample {
    pub timestamp: DateTime<Utc>,
    pub heap_usage_mb: f64,
    pub stack_usage_mb: f64,
    pub gpu_memory_mb: f64,
    pub virtual_memory_mb: f64,
}
114
/// One point-in-time CPU utilization reading (values currently simulated —
/// see `sample_cpu`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CpuSample {
    pub timestamp: DateTime<Utc>,
    /// Total CPU usage in percent.
    pub overall_usage: f64,
    /// User-mode share of the usage, in percent.
    pub user_usage: f64,
    /// Kernel-mode share of the usage, in percent.
    pub system_usage: f64,
    /// Per-core usage, one entry per logical CPU.
    pub core_usage: Vec<f64>,
    pub thread_count: u32,
}
125
/// One point-in-time GPU reading. Never produced today: `sample_gpu` is a
/// stub that returns `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuSample {
    pub timestamp: DateTime<Utc>,
    pub gpu_utilization: f64,
    pub memory_utilization: f64,
    pub temperature: f64,
    pub power_consumption: f64,
}
135
/// A detected performance problem tied to a specific pipeline stage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bottleneck {
    pub bottleneck_type: BottleneckType,
    /// Name of the stage the bottleneck was observed in.
    pub affected_stage: String,
    pub severity: BottleneckSeverity,
    /// Relative impact weight (larger = worse); scale depends on the detector.
    pub impact_factor: f64,
    /// Human-readable summary for reports.
    pub description: String,
    pub metrics: BottleneckMetrics,
}
146
/// Classification of what is limiting performance.
/// Only `MemoryConstraint` and `ComputationalBottleneck` are produced by the
/// current detector; the remaining variants are reserved for future checks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BottleneckType {
    MemoryConstraint,
    ComputationalBottleneck,
    IOBottleneck,
    CacheInefficiency,
    SynchronizationOverhead,
    DataMovementOverhead,
    AlgorithmicComplexity,
    ConfigurationSuboptimal,
}
167
/// How serious a bottleneck is. Variants are ordered (`Low < Medium < High <
/// Critical`) via the derived `Ord`; `Hash`/`Eq` allow use as a map key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BottleneckSeverity {
    Low,
    Medium,
    High,
    Critical,
}
180
/// Quantitative backing data attached to a detected bottleneck.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BottleneckMetrics {
    pub time_spent_waiting_ms: f64,
    /// Fraction of the relevant resource in use (0.0 – 1.0).
    pub resource_utilization: f64,
    /// Higher is better; see the detector for the exact formula per type.
    pub efficiency_score: f64,
    /// Estimated fraction of time/resource recoverable by fixing this.
    pub improvement_potential: f64,
}
189
/// An actionable tuning suggestion generated from session metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationHint {
    pub category: OptimizationCategory,
    pub priority: OptimizationPriority,
    /// Short headline for display.
    pub title: String,
    /// Longer explanation including the measured values that triggered it.
    pub description: String,
    /// Estimated fractional improvement if applied (0.0 – 1.0).
    pub expected_improvement: f64,
    pub implementation_difficulty: ImplementationDifficulty,
    /// Illustrative snippets/pseudo-code for applying the hint.
    pub code_examples: Vec<String>,
}
201
/// Broad area an optimization hint belongs to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OptimizationCategory {
    AlgorithmSelection,
    ParameterTuning,
    MemoryOptimization,
    ParallelProcessing,
    CacheOptimization,
    DataStructureOptimization,
    HardwareUtilization,
    PipelineRestructuring,
}
222
/// Hint urgency. Ordered (`Low < Medium < High < Critical`) via derived `Ord`,
/// which report generation relies on for sorting recommendations.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OptimizationPriority {
    Low,
    Medium,
    High,
    Critical,
}
235
/// How hard an optimization hint is to put into practice.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ImplementationDifficulty {
    /// Config flag or one-liner.
    Trivial,
    /// Small, localized change.
    Easy,
    /// Requires some refactoring.
    Moderate,
    /// Significant redesign work.
    Hard,
    /// Deep domain/system expertise needed.
    Expert,
}
250
251impl PerformanceProfiler {
    /// Create a profiler with the given configuration and no sessions.
    #[must_use]
    pub fn new(config: ProfilerConfig) -> Self {
        Self {
            config,
            active_sessions: Arc::new(Mutex::new(HashMap::new())),
            completed_sessions: Arc::new(Mutex::new(Vec::new())),
        }
    }
261
    /// Create a profiler with `ProfilerConfig::default()`.
    ///
    /// NOTE(review): an inherent `default` method shadows the standard
    /// `Default` trait (clippy: `should_implement_trait`); an
    /// `impl Default for PerformanceProfiler` would be more idiomatic and
    /// keeps `PerformanceProfiler::default()` call sites working unchanged.
    #[must_use]
    pub fn default() -> Self {
        Self::new(ProfilerConfig::default())
    }
267
268 #[must_use]
270 pub fn start_session(&self, pipeline_name: &str) -> String {
271 let session_id = format!("profile_{}", uuid::Uuid::new_v4());
272 let session = ProfileSession {
273 session_id: session_id.clone(),
274 pipeline_name: pipeline_name.to_string(),
275 start_time: Utc::now(),
276 end_time: None,
277 stages: BTreeMap::new(),
278 overall_metrics: OverallMetrics::default(),
279 bottlenecks: Vec::new(),
280 optimization_hints: Vec::new(),
281 };
282
283 {
284 let mut active = self
285 .active_sessions
286 .lock()
287 .unwrap_or_else(|e| e.into_inner());
288 active.insert(session_id.clone(), session);
289 }
290
291 if self.config.enable_cpu_monitoring || self.config.enable_memory_tracking {
293 self.start_background_monitoring(&session_id);
294 }
295
296 session_id
297 }
298
299 pub fn start_stage(
301 &self,
302 session_id: &str,
303 stage_name: &str,
304 component_type: &str,
305 ) -> Result<(), String> {
306 let mut active = self
307 .active_sessions
308 .lock()
309 .unwrap_or_else(|e| e.into_inner());
310 if let Some(session) = active.get_mut(session_id) {
311 let stage_profile = StageProfile {
312 stage_name: stage_name.to_string(),
313 component_type: component_type.to_string(),
314 start_time: Utc::now(),
315 end_time: None,
316 execution_time: Duration::from_secs(0),
317 memory_samples: Vec::new(),
318 cpu_samples: Vec::new(),
319 gpu_samples: Vec::new(),
320 input_shape: None,
321 output_shape: None,
322 parameters: HashMap::new(),
323 error_count: 0,
324 warning_count: 0,
325 };
326 session.stages.insert(stage_name.to_string(), stage_profile);
327 Ok(())
328 } else {
329 Err(format!("Session {session_id} not found"))
330 }
331 }
332
333 pub fn end_stage(&self, session_id: &str, stage_name: &str) -> Result<Duration, String> {
335 let mut active = self
336 .active_sessions
337 .lock()
338 .unwrap_or_else(|e| e.into_inner());
339 if let Some(session) = active.get_mut(session_id) {
340 if let Some(stage) = session.stages.get_mut(stage_name) {
341 let end_time = Utc::now();
342 stage.end_time = Some(end_time);
343 stage.execution_time = end_time
344 .signed_duration_since(stage.start_time)
345 .to_std()
346 .unwrap_or(Duration::from_secs(0));
347 Ok(stage.execution_time)
348 } else {
349 Err(format!(
350 "Stage {stage_name} not found in session {session_id}"
351 ))
352 }
353 } else {
354 Err(format!("Session {session_id} not found"))
355 }
356 }
357
358 pub fn record_stage_parameters(
360 &self,
361 session_id: &str,
362 stage_name: &str,
363 parameters: HashMap<String, String>,
364 ) -> Result<(), String> {
365 let mut active = self
366 .active_sessions
367 .lock()
368 .unwrap_or_else(|e| e.into_inner());
369 if let Some(session) = active.get_mut(session_id) {
370 if let Some(stage) = session.stages.get_mut(stage_name) {
371 stage.parameters = parameters;
372 Ok(())
373 } else {
374 Err(format!("Stage {stage_name} not found"))
375 }
376 } else {
377 Err(format!("Session {session_id} not found"))
378 }
379 }
380
381 pub fn record_data_shapes(
383 &self,
384 session_id: &str,
385 stage_name: &str,
386 input_shape: Option<(usize, usize)>,
387 output_shape: Option<(usize, usize)>,
388 ) -> Result<(), String> {
389 let mut active = self
390 .active_sessions
391 .lock()
392 .unwrap_or_else(|e| e.into_inner());
393 if let Some(session) = active.get_mut(session_id) {
394 if let Some(stage) = session.stages.get_mut(stage_name) {
395 stage.input_shape = input_shape;
396 stage.output_shape = output_shape;
397 Ok(())
398 } else {
399 Err(format!("Stage {stage_name} not found"))
400 }
401 } else {
402 Err(format!("Session {session_id} not found"))
403 }
404 }
405
406 pub fn end_session(&self, session_id: &str) -> Result<ProfileSession, String> {
408 let mut session = {
409 let mut active = self
410 .active_sessions
411 .lock()
412 .unwrap_or_else(|e| e.into_inner());
413 active
414 .remove(session_id)
415 .ok_or_else(|| format!("Session {session_id} not found"))?
416 };
417
418 session.end_time = Some(Utc::now());
419
420 session.overall_metrics = self.calculate_overall_metrics(&session);
422
423 if self.config.enable_bottleneck_detection {
425 session.bottlenecks = self.detect_bottlenecks(&session);
426 }
427
428 if self.config.enable_optimization_hints {
430 session.optimization_hints = self.generate_optimization_hints(&session);
431 }
432
433 {
435 let mut completed = self
436 .completed_sessions
437 .lock()
438 .unwrap_or_else(|e| e.into_inner());
439 completed.push(session.clone());
440
441 while completed.len() > self.config.max_sessions {
443 completed.remove(0);
444 }
445 }
446
447 Ok(session)
448 }
449
    /// Spawn a detached sampler thread that periodically records memory, CPU,
    /// and (when enabled) GPU samples into the session's running stage.
    ///
    /// The thread exits on its own once the session id disappears from
    /// `active_sessions` — i.e. after `end_session` removes it.
    fn start_background_monitoring(&self, session_id: &str) {
        let session_id = session_id.to_string();
        let active_sessions = Arc::clone(&self.active_sessions);
        let config = self.config.clone();

        thread::spawn(move || {
            let sample_interval = Duration::from_millis(config.sample_interval_ms);

            loop {
                // Check liveness in a scoped block so the lock is released
                // before the sampling calls below re-acquire it.
                let should_continue = {
                    let active = active_sessions.lock().unwrap_or_else(|e| e.into_inner());
                    active.contains_key(&session_id)
                };

                if !should_continue {
                    break;
                }

                if config.enable_memory_tracking {
                    let memory_sample = Self::sample_memory();
                    Self::add_memory_sample(&active_sessions, &session_id, memory_sample);
                }

                if config.enable_cpu_monitoring {
                    let cpu_sample = Self::sample_cpu();
                    Self::add_cpu_sample(&active_sessions, &session_id, cpu_sample);
                }

                if config.enable_gpu_monitoring {
                    // `sample_gpu` is currently a stub returning `None`, so no
                    // GPU samples are recorded yet.
                    if let Some(gpu_sample) = Self::sample_gpu() {
                        Self::add_gpu_sample(&active_sessions, &session_id, gpu_sample);
                    }
                }

                thread::sleep(sample_interval);
            }
        });
    }
490
    /// Take one memory sample.
    ///
    /// NOTE(review): values are simulated — `get_process_memory` returns a
    /// random placeholder, stack usage is a hard-coded 5.2 MB, GPU memory is
    /// always 0, and virtual memory is 1.5x a second, *independent* heap
    /// reading. Replace with real process metrics before trusting reports.
    fn sample_memory() -> MemorySample {
        MemorySample {
            timestamp: Utc::now(),
            heap_usage_mb: Self::get_process_memory(),
            stack_usage_mb: 5.2,
            gpu_memory_mb: 0.0,
            virtual_memory_mb: Self::get_process_memory() * 1.5,
        }
    }
504
505 fn sample_cpu() -> CpuSample {
507 CpuSample {
510 timestamp: Utc::now(),
511 overall_usage: Self::get_cpu_usage(),
512 user_usage: Self::get_cpu_usage() * 0.8,
513 system_usage: Self::get_cpu_usage() * 0.2,
514 core_usage: (0..num_cpus::get())
515 .map(|_| Self::get_cpu_usage())
516 .collect(),
517 thread_count: 8, }
519 }
520
    /// GPU sampling stub — always returns `None` (no GPU instrumentation yet).
    fn sample_gpu() -> Option<GpuSample> {
        None
    }
526
    /// Simulated process heap usage in MB: 150 plus up to 50 MB of random
    /// jitter. TODO: query the real process RSS instead of a placeholder.
    fn get_process_memory() -> f64 {
        150.0 + (thread_rng().random::<f64>() * 50.0)
    }
533
    /// Simulated CPU usage in percent: 30 plus up to 40 points of random
    /// jitter. TODO: read real utilization from the OS instead.
    fn get_cpu_usage() -> f64 {
        30.0 + (thread_rng().random::<f64>() * 40.0)
    }
539
540 fn add_memory_sample(
542 active_sessions: &Arc<Mutex<HashMap<String, ProfileSession>>>,
543 session_id: &str,
544 sample: MemorySample,
545 ) {
546 let mut active = active_sessions.lock().unwrap_or_else(|e| e.into_inner());
547 if let Some(session) = active.get_mut(session_id) {
548 if let Some((_, stage)) = session.stages.iter_mut().last() {
550 if stage.end_time.is_none() {
551 stage.memory_samples.push(sample);
552 }
553 }
554 }
555 }
556
557 fn add_cpu_sample(
559 active_sessions: &Arc<Mutex<HashMap<String, ProfileSession>>>,
560 session_id: &str,
561 sample: CpuSample,
562 ) {
563 let mut active = active_sessions.lock().unwrap_or_else(|e| e.into_inner());
564 if let Some(session) = active.get_mut(session_id) {
565 if let Some((_, stage)) = session.stages.iter_mut().last() {
566 if stage.end_time.is_none() {
567 stage.cpu_samples.push(sample);
568 }
569 }
570 }
571 }
572
573 fn add_gpu_sample(
575 active_sessions: &Arc<Mutex<HashMap<String, ProfileSession>>>,
576 session_id: &str,
577 sample: GpuSample,
578 ) {
579 let mut active = active_sessions.lock().unwrap_or_else(|e| e.into_inner());
580 if let Some(session) = active.get_mut(session_id) {
581 if let Some((_, stage)) = session.stages.iter_mut().last() {
582 if stage.end_time.is_none() {
583 stage.gpu_samples.push(sample);
584 }
585 }
586 }
587 }
588
589 fn calculate_overall_metrics(&self, session: &ProfileSession) -> OverallMetrics {
591 let total_execution_time = session
592 .stages
593 .values()
594 .map(|stage| stage.execution_time)
595 .fold(Duration::from_secs(0), |acc, dur| acc + dur);
596
597 let peak_memory = session
598 .stages
599 .values()
600 .flat_map(|stage| &stage.memory_samples)
601 .map(|sample| sample.heap_usage_mb)
602 .fold(0.0, f64::max);
603
604 let avg_cpu = session
605 .stages
606 .values()
607 .flat_map(|stage| &stage.cpu_samples)
608 .map(|sample| sample.overall_usage)
609 .collect::<Vec<_>>();
610 let average_cpu_usage = if avg_cpu.is_empty() {
611 0.0
612 } else {
613 avg_cpu.iter().sum::<f64>() / avg_cpu.len() as f64
614 };
615
616 OverallMetrics {
618 total_execution_time,
619 peak_memory_usage_mb: peak_memory,
620 average_cpu_usage,
621 average_gpu_usage: 0.0, total_data_processed_mb: Self::estimate_data_processed(session),
623 throughput_samples_per_second: Self::calculate_throughput(session),
624 cache_hit_ratio: 0.75, parallel_efficiency: Self::calculate_parallel_efficiency(session),
626 pipeline_stages: session.stages.len(),
627 data_transformations: Self::count_transformations(session),
628 }
629 }
630
631 fn estimate_data_processed(session: &ProfileSession) -> f64 {
633 session
634 .stages
635 .values()
636 .filter_map(|stage| stage.input_shape)
637 .map(|(samples, features)| (samples * features * 8) as f64 / (1024.0 * 1024.0)) .sum()
639 }
640
    /// Input rows processed per second, based on summed input-shape row counts
    /// and `overall_metrics.total_execution_time`.
    ///
    /// NOTE(review): this reads `session.overall_metrics`, so it only yields a
    /// non-zero value once the overall metrics have been populated; calling it
    /// while `overall_metrics` still holds its default (zero) total time —
    /// e.g. from inside `calculate_overall_metrics` — returns 0.0.
    fn calculate_throughput(session: &ProfileSession) -> f64 {
        let total_samples: usize = session
            .stages
            .values()
            .filter_map(|stage| stage.input_shape)
            .map(|(samples, _)| samples)
            .sum();

        let total_time_seconds = session.overall_metrics.total_execution_time.as_secs_f64();

        if total_time_seconds > 0.0 {
            total_samples as f64 / total_time_seconds
        } else {
            0.0
        }
    }
658
659 fn calculate_parallel_efficiency(session: &ProfileSession) -> f64 {
661 let ideal_parallel_stages = session.stages.len().min(num_cpus::get());
663 let avg_cpu_per_core = session
664 .stages
665 .values()
666 .flat_map(|stage| &stage.cpu_samples)
667 .map(|sample| sample.overall_usage / sample.core_usage.len() as f64)
668 .collect::<Vec<_>>();
669
670 if avg_cpu_per_core.is_empty() {
671 0.5 } else {
673 let actual_efficiency =
674 avg_cpu_per_core.iter().sum::<f64>() / avg_cpu_per_core.len() as f64;
675 (actual_efficiency / 100.0).min(1.0)
676 }
677 }
678
679 fn count_transformations(session: &ProfileSession) -> usize {
681 session
682 .stages
683 .values()
684 .filter(|stage| {
685 stage.component_type.contains("transformer")
686 || stage.component_type.contains("preprocessor")
687 })
688 .count()
689 }
690
691 fn detect_bottlenecks(&self, session: &ProfileSession) -> Vec<Bottleneck> {
693 let mut bottlenecks = Vec::new();
694
695 for (stage_name, stage) in &session.stages {
697 if let Some(max_memory) = stage
698 .memory_samples
699 .iter()
700 .map(|s| s.heap_usage_mb)
701 .fold(None, |acc, x| Some(acc.map_or(x, |acc: f64| acc.max(x))))
702 {
703 if max_memory > 1000.0 {
704 bottlenecks.push(Bottleneck {
706 bottleneck_type: BottleneckType::MemoryConstraint,
707 affected_stage: stage_name.clone(),
708 severity: if max_memory > 4000.0 {
709 BottleneckSeverity::Critical
710 } else {
711 BottleneckSeverity::High
712 },
713 impact_factor: (max_memory / 1000.0).min(5.0),
714 description: format!(
715 "High memory usage: {max_memory:.1}MB in stage '{stage_name}'"
716 ),
717 metrics: BottleneckMetrics {
718 time_spent_waiting_ms: 0.0,
719 resource_utilization: max_memory / 8192.0, efficiency_score: 1.0 - (max_memory / 8192.0),
721 improvement_potential: 0.3,
722 },
723 });
724 }
725 }
726
727 if stage.execution_time.as_secs_f64() > 10.0 {
729 let severity = if stage.execution_time.as_secs_f64() > 60.0 {
730 BottleneckSeverity::Critical
731 } else if stage.execution_time.as_secs_f64() > 30.0 {
732 BottleneckSeverity::High
733 } else {
734 BottleneckSeverity::Medium
735 };
736
737 bottlenecks.push(Bottleneck {
738 bottleneck_type: BottleneckType::ComputationalBottleneck,
739 affected_stage: stage_name.clone(),
740 severity,
741 impact_factor: stage.execution_time.as_secs_f64() / 10.0,
742 description: format!(
743 "Slow execution: {:.1}s in stage '{}'",
744 stage.execution_time.as_secs_f64(),
745 stage_name
746 ),
747 metrics: BottleneckMetrics {
748 time_spent_waiting_ms: stage.execution_time.as_millis() as f64,
749 resource_utilization: 0.8,
750 efficiency_score: 1.0 / stage.execution_time.as_secs_f64().max(1.0),
751 improvement_potential: 0.5,
752 },
753 });
754 }
755 }
756
757 bottlenecks
758 }
759
    /// Turn session metrics into actionable tuning suggestions.
    ///
    /// Three heuristics today: peak memory above 2 GB, parallel efficiency
    /// below 50%, and any estimator stage slower than 30 s.
    fn generate_optimization_hints(&self, session: &ProfileSession) -> Vec<OptimizationHint> {
        let mut hints = Vec::new();

        // Memory pressure: suggest chunking/streaming above 2 GB peak.
        if session.overall_metrics.peak_memory_usage_mb > 2000.0 {
            hints.push(OptimizationHint {
                category: OptimizationCategory::MemoryOptimization,
                priority: OptimizationPriority::High,
                title: "High Memory Usage Detected".to_string(),
                description: format!(
                    "Pipeline uses {:.1}MB peak memory. Consider chunked processing or streaming.",
                    session.overall_metrics.peak_memory_usage_mb
                ),
                expected_improvement: 0.4,
                implementation_difficulty: ImplementationDifficulty::Moderate,
                code_examples: vec![
                    "Use streaming: pipeline.enable_streaming(chunk_size=1000)".to_string(),
                    "Enable memory optimization: config.memory_efficient = true".to_string(),
                ],
            });
        }

        // Poor parallelism: suggest enabling more parallel execution.
        if session.overall_metrics.parallel_efficiency < 0.5 {
            hints.push(OptimizationHint {
                category: OptimizationCategory::ParallelProcessing,
                priority: OptimizationPriority::Medium,
                title: "Low Parallel Efficiency".to_string(),
                description: format!(
                    "Parallel efficiency is {:.1}%. Consider enabling more parallelization.",
                    session.overall_metrics.parallel_efficiency * 100.0
                ),
                expected_improvement: 0.6,
                implementation_difficulty: ImplementationDifficulty::Easy,
                code_examples: vec![
                    "Set parallel jobs: pipeline.set_n_jobs(-1)".to_string(),
                    "Enable SIMD: config.enable_simd = true".to_string(),
                ],
            });
        }

        // Slow estimators: one per-stage hint for estimator stages above 30 s.
        for (stage_name, stage) in &session.stages {
            if stage.execution_time.as_secs_f64() > 30.0
                && stage.component_type.contains("estimator")
            {
                hints.push(OptimizationHint {
                    category: OptimizationCategory::AlgorithmSelection,
                    priority: OptimizationPriority::High,
                    title: format!("Slow Algorithm in {stage_name}"),
                    description: format!(
                        "Stage '{}' takes {:.1}s. Consider faster algorithms or approximations.",
                        stage_name,
                        stage.execution_time.as_secs_f64()
                    ),
                    expected_improvement: 0.7,
                    implementation_difficulty: ImplementationDifficulty::Moderate,
                    code_examples: vec![
                        "Use approximate algorithms where applicable".to_string(),
                        "Consider ensemble methods for better speed/accuracy trade-off".to_string(),
                    ],
                });
            }
        }

        hints
    }
828
829 #[must_use]
831 pub fn get_completed_sessions(&self) -> Vec<ProfileSession> {
832 let completed = self
833 .completed_sessions
834 .lock()
835 .unwrap_or_else(|e| e.into_inner());
836 completed.clone()
837 }
838
839 #[must_use]
841 pub fn generate_report(&self, session_id: Option<&str>) -> PerformanceReport {
842 let sessions = if let Some(id) = session_id {
843 let completed = self
844 .completed_sessions
845 .lock()
846 .unwrap_or_else(|e| e.into_inner());
847 completed
848 .iter()
849 .filter(|s| s.session_id == id)
850 .cloned()
851 .collect()
852 } else {
853 self.get_completed_sessions()
854 };
855
856 PerformanceReport::from_sessions(sessions)
857 }
858}
859
impl Default for OverallMetrics {
    /// All-zero metrics — the placeholder held until a session is finalized.
    fn default() -> Self {
        Self {
            total_execution_time: Duration::from_secs(0),
            peak_memory_usage_mb: 0.0,
            average_cpu_usage: 0.0,
            average_gpu_usage: 0.0,
            total_data_processed_mb: 0.0,
            throughput_samples_per_second: 0.0,
            cache_hit_ratio: 0.0,
            parallel_efficiency: 0.0,
            pipeline_stages: 0,
            data_transformations: 0,
        }
    }
}
876
/// Aggregated analysis over one or more completed profiling sessions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceReport {
    /// Unique id ("report_<uuid>") assigned at generation time.
    pub report_id: String,
    pub generated_at: DateTime<Utc>,
    /// Number of sessions that went into this report.
    pub sessions_analyzed: usize,
    pub summary_metrics: SummaryMetrics,
    pub bottleneck_analysis: BottleneckAnalysis,
    /// Top optimization hints across all sessions (highest priority first,
    /// capped at 10).
    pub optimization_recommendations: Vec<OptimizationHint>,
    pub trend_analysis: TrendAnalysis,
    pub comparative_analysis: ComparativeAnalysis,
}
889
/// Cross-session summary statistics (all zero when no sessions were analyzed).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SummaryMetrics {
    pub average_execution_time: Duration,
    pub fastest_execution_time: Duration,
    pub slowest_execution_time: Duration,
    /// Mean of per-session peak memory, in MB.
    pub average_memory_usage: f64,
    /// Maximum peak memory over all sessions, in MB.
    pub peak_memory_across_sessions: f64,
    pub average_throughput: f64,
    pub best_parallel_efficiency: f64,
}
900
/// Cross-session bottleneck statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BottleneckAnalysis {
    /// The bottleneck type seen most often across all sessions.
    pub most_common_bottleneck: BottleneckType,
    /// Bottleneck count per affected stage name.
    pub bottleneck_frequency: HashMap<String, u32>,
    /// Bottleneck count per severity level.
    pub severity_distribution: HashMap<BottleneckSeverity, u32>,
    /// Summed impact factor per affected stage name.
    pub impact_analysis: HashMap<String, f64>,
}
908
/// Direction of change across sessions (requires at least three sessions;
/// otherwise every trend is `InsufficientData`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendAnalysis {
    pub performance_trend: TrendDirection,
    /// NOTE(review): "Improving" here means peak memory *increased* — callers
    /// must interpret the direction; see `calculate_trend_direction`.
    pub memory_usage_trend: TrendDirection,
    pub throughput_trend: TrendDirection,
    /// Per-session score: 1000 / execution seconds (clamped at 1 s).
    pub session_performance_scores: Vec<f64>,
}
916
/// Coarse trend classification produced by comparing first- vs second-half
/// averages of a metric series.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrendDirection {
    /// Values rose by more than 5% between halves.
    Improving,
    /// Change stayed within ±5%.
    Stable,
    /// Values fell by more than 5% between halves.
    Degrading,
    /// Fewer than three data points available.
    InsufficientData,
}
928
/// Best/worst session comparison and run-to-run consistency.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComparativeAnalysis {
    /// Session id with the shortest total execution time ("none" when empty).
    pub best_performing_session: String,
    /// Session id with the longest total execution time ("none" when empty).
    pub worst_performing_session: String,
    /// Variance of total execution times, in seconds squared.
    pub performance_variance: f64,
    /// 1 / (1 + coefficient of variation); closer to 1 = more consistent.
    pub consistency_score: f64,
}
936
937impl PerformanceReport {
938 #[must_use]
939 pub fn from_sessions(sessions: Vec<ProfileSession>) -> Self {
940 let report_id = format!("report_{}", uuid::Uuid::new_v4());
941
942 let summary_metrics = Self::calculate_summary_metrics(&sessions);
943 let bottleneck_analysis = Self::analyze_bottlenecks(&sessions);
944 let trend_analysis = Self::analyze_trends(&sessions);
945 let comparative_analysis = Self::comparative_analysis(&sessions);
946
947 let mut all_hints: Vec<OptimizationHint> = sessions
949 .iter()
950 .flat_map(|s| s.optimization_hints.iter())
951 .cloned()
952 .collect();
953
954 all_hints.sort_by(|a, b| b.priority.cmp(&a.priority));
956 all_hints.truncate(10); Self {
959 report_id,
960 generated_at: Utc::now(),
961 sessions_analyzed: sessions.len(),
962 summary_metrics,
963 bottleneck_analysis,
964 optimization_recommendations: all_hints,
965 trend_analysis,
966 comparative_analysis,
967 }
968 }
969
    /// Compute cross-session summary statistics; all-zero when `sessions` is
    /// empty (which also guards the divisions below).
    fn calculate_summary_metrics(sessions: &[ProfileSession]) -> SummaryMetrics {
        if sessions.is_empty() {
            return SummaryMetrics {
                average_execution_time: Duration::from_secs(0),
                fastest_execution_time: Duration::from_secs(0),
                slowest_execution_time: Duration::from_secs(0),
                average_memory_usage: 0.0,
                peak_memory_across_sessions: 0.0,
                average_throughput: 0.0,
                best_parallel_efficiency: 0.0,
            };
        }

        let execution_times: Vec<Duration> = sessions
            .iter()
            .map(|s| s.overall_metrics.total_execution_time)
            .collect();

        // Mean computed in f64 seconds to avoid Duration division limits.
        let average_execution = Duration::from_secs_f64(
            execution_times
                .iter()
                .map(std::time::Duration::as_secs_f64)
                .sum::<f64>()
                / sessions.len() as f64,
        );

        SummaryMetrics {
            average_execution_time: average_execution,
            fastest_execution_time: execution_times.iter().min().copied().unwrap_or_default(),
            slowest_execution_time: execution_times.iter().max().copied().unwrap_or_default(),
            average_memory_usage: sessions
                .iter()
                .map(|s| s.overall_metrics.peak_memory_usage_mb)
                .sum::<f64>()
                / sessions.len() as f64,
            peak_memory_across_sessions: sessions
                .iter()
                .map(|s| s.overall_metrics.peak_memory_usage_mb)
                .fold(0.0, f64::max),
            average_throughput: sessions
                .iter()
                .map(|s| s.overall_metrics.throughput_samples_per_second)
                .sum::<f64>()
                / sessions.len() as f64,
            best_parallel_efficiency: sessions
                .iter()
                .map(|s| s.overall_metrics.parallel_efficiency)
                .fold(0.0, f64::max),
        }
    }
1021
1022 fn analyze_bottlenecks(sessions: &[ProfileSession]) -> BottleneckAnalysis {
1023 let all_bottlenecks: Vec<&Bottleneck> =
1024 sessions.iter().flat_map(|s| &s.bottlenecks).collect();
1025
1026 let mut bottleneck_frequency = HashMap::new();
1027 let mut severity_distribution = HashMap::new();
1028 let mut impact_analysis = HashMap::new();
1029
1030 for bottleneck in &all_bottlenecks {
1031 *bottleneck_frequency
1032 .entry(bottleneck.affected_stage.clone())
1033 .or_insert(0) += 1;
1034 *severity_distribution
1035 .entry(bottleneck.severity.clone())
1036 .or_insert(0) += 1;
1037 *impact_analysis
1038 .entry(bottleneck.affected_stage.clone())
1039 .or_insert(0.0) += bottleneck.impact_factor;
1040 }
1041
1042 let most_common_bottleneck = all_bottlenecks
1043 .iter()
1044 .fold(HashMap::new(), |mut acc, b| {
1045 *acc.entry(format!("{:?}", b.bottleneck_type)).or_insert(0) += 1;
1046 acc
1047 })
1048 .into_iter()
1049 .max_by_key(|(_, count)| *count)
1050 .map_or(
1051 BottleneckType::ComputationalBottleneck,
1052 |(bottleneck_type, _)| match bottleneck_type.as_str() {
1053 "MemoryConstraint" => BottleneckType::MemoryConstraint,
1054 "ComputationalBottleneck" => BottleneckType::ComputationalBottleneck,
1055 _ => BottleneckType::ComputationalBottleneck,
1056 },
1057 );
1058
1059 BottleneckAnalysis {
1061 most_common_bottleneck,
1062 bottleneck_frequency,
1063 severity_distribution,
1064 impact_analysis,
1065 }
1066 }
1067
1068 fn analyze_trends(sessions: &[ProfileSession]) -> TrendAnalysis {
1069 if sessions.len() < 3 {
1070 return TrendAnalysis {
1071 performance_trend: TrendDirection::InsufficientData,
1072 memory_usage_trend: TrendDirection::InsufficientData,
1073 throughput_trend: TrendDirection::InsufficientData,
1074 session_performance_scores: Vec::new(),
1075 };
1076 }
1077
1078 let performance_scores: Vec<f64> = sessions
1079 .iter()
1080 .map(|s| {
1081 1000.0
1082 / s.overall_metrics
1083 .total_execution_time
1084 .as_secs_f64()
1085 .max(1.0)
1086 })
1087 .collect();
1088
1089 let performance_trend = Self::calculate_trend_direction(&performance_scores);
1090
1091 let memory_scores: Vec<f64> = sessions
1092 .iter()
1093 .map(|s| s.overall_metrics.peak_memory_usage_mb)
1094 .collect();
1095 let memory_usage_trend = Self::calculate_trend_direction(&memory_scores);
1096
1097 let throughput_scores: Vec<f64> = sessions
1098 .iter()
1099 .map(|s| s.overall_metrics.throughput_samples_per_second)
1100 .collect();
1101 let throughput_trend = Self::calculate_trend_direction(&throughput_scores);
1102
1103 TrendAnalysis {
1105 performance_trend,
1106 memory_usage_trend,
1107 throughput_trend,
1108 session_performance_scores: performance_scores,
1109 }
1110 }
1111
1112 fn calculate_trend_direction(values: &[f64]) -> TrendDirection {
1113 if values.len() < 3 {
1114 return TrendDirection::InsufficientData;
1115 }
1116
1117 let mid_point = values.len() / 2;
1118 let first_half_avg = values[..mid_point].iter().sum::<f64>() / mid_point as f64;
1119 let second_half_avg =
1120 values[mid_point..].iter().sum::<f64>() / (values.len() - mid_point) as f64;
1121
1122 let change_percentage =
1123 (second_half_avg - first_half_avg) / first_half_avg.abs().max(1e-10);
1124
1125 if change_percentage > 0.05 {
1126 TrendDirection::Improving
1127 } else if change_percentage < -0.05 {
1128 TrendDirection::Degrading
1129 } else {
1130 TrendDirection::Stable
1131 }
1132 }
1133
1134 fn comparative_analysis(sessions: &[ProfileSession]) -> ComparativeAnalysis {
1135 if sessions.is_empty() {
1136 return ComparativeAnalysis {
1137 best_performing_session: "none".to_string(),
1138 worst_performing_session: "none".to_string(),
1139 performance_variance: 0.0,
1140 consistency_score: 0.0,
1141 };
1142 }
1143
1144 let (best_session, worst_session) = sessions.iter().fold(
1145 (sessions[0].clone(), sessions[0].clone()),
1146 |(best, worst), session| {
1147 let best_next = if session.overall_metrics.total_execution_time
1148 < best.overall_metrics.total_execution_time
1149 {
1150 session.clone()
1151 } else {
1152 best
1153 };
1154
1155 let worst_next = if session.overall_metrics.total_execution_time
1156 > worst.overall_metrics.total_execution_time
1157 {
1158 session.clone()
1159 } else {
1160 worst
1161 };
1162
1163 (best_next, worst_next)
1164 },
1165 );
1166
1167 let execution_times: Vec<f64> = sessions
1168 .iter()
1169 .map(|s| s.overall_metrics.total_execution_time.as_secs_f64())
1170 .collect();
1171
1172 let mean_time = execution_times.iter().sum::<f64>() / sessions.len() as f64;
1173 let variance = execution_times
1174 .iter()
1175 .map(|t| (t - mean_time).powi(2))
1176 .sum::<f64>()
1177 / sessions.len() as f64;
1178
1179 let consistency_score = 1.0 / (1.0 + variance.sqrt() / mean_time);
1180
1181 ComparativeAnalysis {
1183 best_performing_session: best_session.session_id,
1184 worst_performing_session: worst_session.session_id,
1185 performance_variance: variance,
1186 consistency_score,
1187 }
1188 }
1189}
1190
#[cfg(test)]
mod tests {
    use super::*;

    // (Removed a dead `#[allow(non_snake_case)]` — every test is snake_case.)

    /// Default configuration enables timing and memory tracking.
    #[test]
    fn test_profiler_creation() {
        let profiler = PerformanceProfiler::default();
        // `assert!` instead of `assert_eq!(…, true)` — same check, idiomatic.
        assert!(profiler.config.enable_timing);
        assert!(profiler.config.enable_memory_tracking);
    }

    /// A stage can be started, timed, ended, and the session archived.
    #[test]
    fn test_session_lifecycle() {
        let profiler = PerformanceProfiler::default();
        let session_id = profiler.start_session("test_pipeline");

        // `expect` instead of `unwrap_or_default`: a setup failure should
        // fail the test loudly, not be silently swallowed.
        profiler
            .start_stage(&session_id, "preprocessing", "transformer")
            .expect("stage should start");
        thread::sleep(Duration::from_millis(10));
        let stage_duration = profiler
            .end_stage(&session_id, "preprocessing")
            .expect("stage should end");

        assert!(stage_duration > Duration::from_millis(5));

        let completed_session = profiler
            .end_session(&session_id)
            .expect("operation should succeed");
        assert_eq!(completed_session.pipeline_name, "test_pipeline");
        assert_eq!(completed_session.stages.len(), 1);
    }

    /// Bottleneck detection runs at session end without losing stage data.
    #[test]
    fn test_bottleneck_detection() {
        let profiler = PerformanceProfiler::default();
        let session_id = profiler.start_session("test_pipeline");

        profiler
            .start_stage(&session_id, "slow_stage", "estimator")
            .expect("stage should start");
        thread::sleep(Duration::from_millis(50));
        profiler
            .end_stage(&session_id, "slow_stage")
            .expect("stage should end");

        let completed_session = profiler
            .end_session(&session_id)
            .expect("operation should succeed");

        assert_eq!(completed_session.stages.len(), 1);
    }

    /// A report over several completed sessions aggregates them all.
    #[test]
    fn test_performance_report_generation() {
        let profiler = PerformanceProfiler::default();

        for i in 0..3 {
            let session_id = profiler.start_session(&format!("pipeline_{i}"));
            profiler
                .start_stage(&session_id, "stage", "transformer")
                .expect("stage should start");
            thread::sleep(Duration::from_millis(10));
            profiler
                .end_stage(&session_id, "stage")
                .expect("stage should end");
            profiler
                .end_session(&session_id)
                .expect("operation should succeed");
        }

        let report = profiler.generate_report(None);
        assert_eq!(report.sessions_analyzed, 3);
        assert!(report.summary_metrics.average_execution_time > Duration::from_secs(0));
    }
}