pjson_rs/application/services/performance_analysis_service.rs

//! Service responsible for analyzing streaming performance and metrics
//!
//! This service focuses on collecting, analyzing, and interpreting
//! performance metrics to support optimization decisions.
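//!
//! A minimal usage sketch (marked `ignore` since it is illustrative only;
//! it assumes `SessionId::new()` exists on the domain value object, as the
//! tests below do):
//!
//! ```ignore
//! let mut service = PerformanceAnalysisService::default();
//! let session_id = crate::domain::value_objects::SessionId::new();
//!
//! service.record_latency(session_id, None, 120.0, "frame_processing".to_string())?;
//! service.record_throughput(session_id, 2048, std::time::Duration::from_millis(200), 10)?;
//!
//! let report = service.analyze_performance()?;
//! println!("overall score: {:.1}", report.overall_score);
//! ```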

use crate::{
    application::{ApplicationError, ApplicationResult},
    domain::value_objects::{SessionId, StreamId},
};
use std::{
    collections::{HashMap, VecDeque},
    time::{Duration, SystemTime},
};

/// Service for performance analysis and metrics collection
#[derive(Debug)]
pub struct PerformanceAnalysisService {
    metrics_history: MetricsHistory,
    analysis_config: AnalysisConfig,
}

/// Configuration for performance analysis
#[derive(Debug, Clone)]
pub struct AnalysisConfig {
    pub history_retention_duration: Duration,
    pub sample_window_size: usize,
    pub alerting_thresholds: AlertingThresholds,
    pub analysis_interval: Duration,
}

impl Default for AnalysisConfig {
    fn default() -> Self {
        Self {
            history_retention_duration: Duration::from_secs(3600), // 1 hour
            sample_window_size: 100,
            alerting_thresholds: AlertingThresholds::default(),
            analysis_interval: Duration::from_secs(30),
        }
    }
}

/// Thresholds for performance alerting
#[derive(Debug, Clone)]
pub struct AlertingThresholds {
    pub critical_latency_ms: f64,
    pub warning_latency_ms: f64,
    pub critical_error_rate: f64,
    pub warning_error_rate: f64,
    pub min_throughput_mbps: f64,
    pub max_cpu_usage: f64,
}

impl Default for AlertingThresholds {
    fn default() -> Self {
        Self {
            critical_latency_ms: 2000.0,
            warning_latency_ms: 1000.0,
            critical_error_rate: 0.1,
            warning_error_rate: 0.05,
            min_throughput_mbps: 1.0,
            max_cpu_usage: 0.9,
        }
    }
}

/// Historical metrics storage: fixed-capacity FIFO windows per metric type;
/// once a window is full, the oldest sample is evicted first.
#[derive(Debug)]
struct MetricsHistory {
    latency_samples: VecDeque<LatencySample>,
    throughput_samples: VecDeque<ThroughputSample>,
    error_samples: VecDeque<ErrorSample>,
    resource_samples: VecDeque<ResourceSample>,
    max_samples: usize,
}

impl MetricsHistory {
    fn new(max_samples: usize) -> Self {
        Self {
            latency_samples: VecDeque::with_capacity(max_samples),
            throughput_samples: VecDeque::with_capacity(max_samples),
            error_samples: VecDeque::with_capacity(max_samples),
            resource_samples: VecDeque::with_capacity(max_samples),
            max_samples,
        }
    }

    /// Push a sample into a bounded window, evicting the oldest entry when full.
    fn push_bounded<T>(queue: &mut VecDeque<T>, sample: T, max_samples: usize) {
        if queue.len() >= max_samples {
            queue.pop_front();
        }
        queue.push_back(sample);
    }

    fn add_latency_sample(&mut self, sample: LatencySample) {
        Self::push_bounded(&mut self.latency_samples, sample, self.max_samples);
    }

    fn add_throughput_sample(&mut self, sample: ThroughputSample) {
        Self::push_bounded(&mut self.throughput_samples, sample, self.max_samples);
    }

    fn add_error_sample(&mut self, sample: ErrorSample) {
        Self::push_bounded(&mut self.error_samples, sample, self.max_samples);
    }

    fn add_resource_sample(&mut self, sample: ResourceSample) {
        Self::push_bounded(&mut self.resource_samples, sample, self.max_samples);
    }
}

// Individual metric samples

#[derive(Debug, Clone)]
struct LatencySample {
    timestamp: SystemTime,
    session_id: SessionId,
    stream_id: Option<StreamId>,
    latency_ms: f64,
    operation_type: String,
}

#[derive(Debug, Clone)]
struct ThroughputSample {
    timestamp: SystemTime,
    session_id: SessionId,
    bytes_transferred: u64,
    duration: Duration,
    frame_count: usize,
}

#[derive(Debug, Clone)]
struct ErrorSample {
    timestamp: SystemTime,
    session_id: SessionId,
    stream_id: Option<StreamId>,
    error_type: String,
    error_severity: ErrorSeverity,
}

#[derive(Debug, Clone)]
struct ResourceSample {
    timestamp: SystemTime,
    cpu_usage: f64,
    memory_usage_bytes: u64,
    network_bandwidth_mbps: f64,
    active_connections: usize,
}

#[derive(Debug, Clone, PartialEq)]
pub enum ErrorSeverity {
    Low,
    Medium,
    High,
    Critical,
}

impl PerformanceAnalysisService {
    pub fn new(config: AnalysisConfig) -> Self {
        let history = MetricsHistory::new(config.sample_window_size);
        Self {
            metrics_history: history,
            analysis_config: config,
        }
    }

    /// Record a latency measurement
    pub fn record_latency(
        &mut self,
        session_id: SessionId,
        stream_id: Option<StreamId>,
        latency_ms: f64,
        operation_type: String,
    ) -> ApplicationResult<()> {
        let sample = LatencySample {
            timestamp: SystemTime::now(),
            session_id,
            stream_id,
            latency_ms,
            operation_type,
        };

        self.metrics_history.add_latency_sample(sample);
        Ok(())
    }

    /// Record a throughput measurement
    pub fn record_throughput(
        &mut self,
        session_id: SessionId,
        bytes_transferred: u64,
        duration: Duration,
        frame_count: usize,
    ) -> ApplicationResult<()> {
        let sample = ThroughputSample {
            timestamp: SystemTime::now(),
            session_id,
            bytes_transferred,
            duration,
            frame_count,
        };

        self.metrics_history.add_throughput_sample(sample);
        Ok(())
    }

    /// Record an error occurrence
    pub fn record_error(
        &mut self,
        session_id: SessionId,
        stream_id: Option<StreamId>,
        error_type: String,
        severity: ErrorSeverity,
    ) -> ApplicationResult<()> {
        let sample = ErrorSample {
            timestamp: SystemTime::now(),
            session_id,
            stream_id,
            error_type,
            error_severity: severity,
        };

        self.metrics_history.add_error_sample(sample);
        Ok(())
    }

    /// Record a resource usage snapshot
    pub fn record_resource_usage(
        &mut self,
        cpu_usage: f64,
        memory_usage_bytes: u64,
        network_bandwidth_mbps: f64,
        active_connections: usize,
    ) -> ApplicationResult<()> {
        let sample = ResourceSample {
            timestamp: SystemTime::now(),
            cpu_usage,
            memory_usage_bytes,
            network_bandwidth_mbps,
            active_connections,
        };

        self.metrics_history.add_resource_sample(sample);
        Ok(())
    }

    /// Analyze current performance and generate a report
    pub fn analyze_performance(&self) -> ApplicationResult<PerformanceAnalysisReport> {
        let latency_analysis = self.analyze_latency_metrics()?;
        let throughput_analysis = self.analyze_throughput_metrics()?;
        let error_analysis = self.analyze_error_metrics()?;
        let resource_analysis = self.analyze_resource_metrics()?;

        // Generate overall performance score
        let performance_score = self.calculate_performance_score(
            &latency_analysis,
            &throughput_analysis,
            &error_analysis,
            &resource_analysis,
        );

        // Identify performance issues
        let issues = self.identify_performance_issues(
            &latency_analysis,
            &throughput_analysis,
            &error_analysis,
            &resource_analysis,
        )?;

        // Generate recommendations
        let recommendations = self.generate_recommendations(&issues)?;

        Ok(PerformanceAnalysisReport {
            timestamp: SystemTime::now(),
            overall_score: performance_score,
            latency_analysis,
            throughput_analysis,
            error_analysis,
            resource_analysis,
            issues,
            recommendations,
        })
    }

    /// Get real-time performance context for priority calculations
    pub fn get_performance_context(
        &self,
    ) -> ApplicationResult<crate::application::services::prioritization_service::PerformanceContext>
    {
        let latency_stats = self.calculate_latency_statistics()?;
        let throughput_stats = self.calculate_throughput_statistics()?;
        let error_stats = self.calculate_error_statistics()?;
        let resource_stats = self.calculate_resource_statistics()?;

        Ok(
            crate::application::services::prioritization_service::PerformanceContext {
                average_latency_ms: latency_stats.average,
                available_bandwidth_mbps: throughput_stats.current_mbps,
                error_rate: error_stats.rate,
                cpu_usage: resource_stats.cpu_usage,
                memory_usage_percent: resource_stats.memory_usage_percent,
                connection_count: resource_stats.connection_count,
            },
        )
    }

    /// Calculate batch size recommendations
    pub fn calculate_optimal_batch_size(
        &self,
        base_size: usize,
    ) -> ApplicationResult<BatchSizeRecommendation> {
        let context = self.get_performance_context()?;

        // Analyze current performance to recommend a batch size
        let latency_factor = if context.average_latency_ms < 50.0 {
            0.8 // Smaller batches keep low-latency sessions responsive
        } else if context.average_latency_ms > 500.0 {
            1.5 // Larger batches amortize overhead when latency is already high
        } else {
            1.0
        };

        let bandwidth_factor = (context.available_bandwidth_mbps / 5.0).clamp(0.5, 2.0);
        let cpu_factor = if context.cpu_usage > 0.8 { 0.7 } else { 1.0 };
        let error_factor = if context.error_rate > 0.05 { 0.8 } else { 1.0 };

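        // Worked example: with an empty history every statistic defaults to
        // zero, so latency_factor = 0.8, bandwidth_factor clamps to 0.5, the
        // CPU and error factors stay at 1.0, and a base size of 100 becomes 40.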
        let recommended_size =
            ((base_size as f64) * latency_factor * bandwidth_factor * cpu_factor * error_factor)
                as usize;
        let recommended_size = recommended_size.clamp(1, 1000); // Bounds checking

        Ok(BatchSizeRecommendation {
            recommended_size,
            confidence: self.calculate_recommendation_confidence(&context),
            reasoning: vec![
                format!("Latency factor: {latency_factor:.2}"),
                format!("Bandwidth factor: {bandwidth_factor:.2}"),
                format!("CPU factor: {cpu_factor:.2}"),
                format!("Error factor: {error_factor:.2}"),
            ],
        })
    }

    /// Analyze frame distribution efficiency
    pub fn analyze_frame_distribution(
        &self,
        frames: &[crate::domain::entities::Frame],
    ) -> ApplicationResult<FrameDistributionAnalysis> {
        let mut priority_distribution = HashMap::new();
        let mut size_distribution = Vec::new();
        let mut total_bytes = 0u64;

        for frame in frames {
            // Analyze priority distribution
            let priority = frame.priority();
            *priority_distribution.entry(priority.value()).or_insert(0) += 1;

            // Analyze size distribution
            let frame_size = frame.estimated_size();
            size_distribution.push(frame_size);
            total_bytes += frame_size as u64;
        }

        // Calculate statistics
        size_distribution.sort_unstable();
        let median_size = if size_distribution.is_empty() {
            0
        } else {
            size_distribution[size_distribution.len() / 2]
        };

        let average_size = if frames.is_empty() {
            0.0
        } else {
            total_bytes as f64 / frames.len() as f64
        };

        Ok(FrameDistributionAnalysis {
            total_frames: frames.len(),
            total_bytes,
            average_frame_size: average_size,
            median_frame_size: median_size as f64,
            priority_distribution: priority_distribution.clone(),
            efficiency_score: self
                .calculate_distribution_efficiency(&priority_distribution, frames.len()),
        })
    }

    // Private implementation methods

    fn analyze_latency_metrics(&self) -> ApplicationResult<LatencyAnalysis> {
        if self.metrics_history.latency_samples.is_empty() {
            return Ok(LatencyAnalysis::default());
        }

        let mut latencies: Vec<f64> = self
            .metrics_history
            .latency_samples
            .iter()
            .map(|s| s.latency_ms)
            .collect();

        latencies.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

        let count = latencies.len();
        let average = latencies.iter().sum::<f64>() / count as f64;
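        // Nearest-rank percentiles via index truncation; 0.95 * count and
        // 0.99 * count truncate to at most count - 1, so the indexing below
        // cannot go out of bounds.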
        let p50 = latencies[count / 2];
        let p95 = latencies[(count as f64 * 0.95) as usize];
        let p99 = latencies[(count as f64 * 0.99) as usize];

        Ok(LatencyAnalysis {
            average,
            p50,
            p95,
            p99,
            min: latencies[0],
            max: latencies[count - 1],
            sample_count: count,
        })
    }

    fn analyze_throughput_metrics(&self) -> ApplicationResult<ThroughputAnalysis> {
        if self.metrics_history.throughput_samples.is_empty() {
            return Ok(ThroughputAnalysis::default());
        }

        let mut total_bytes = 0u64;
        let mut total_duration = Duration::ZERO;
        let mut total_frames = 0usize;

        for sample in &self.metrics_history.throughput_samples {
            total_bytes += sample.bytes_transferred;
            total_duration += sample.duration;
            total_frames += sample.frame_count;
        }

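        // Aggregate bytes converted to megabits per second; e.g. 1024 bytes
        // in 100 ms gives (1024 * 8) / (0.1 * 1_000_000) ≈ 0.082 Mbps.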
        let average_mbps = if total_duration.as_secs_f64() > 0.0 {
            (total_bytes as f64 * 8.0) / (total_duration.as_secs_f64() * 1_000_000.0)
        } else {
            0.0
        };

        let frames_per_second = if total_duration.as_secs_f64() > 0.0 {
            total_frames as f64 / total_duration.as_secs_f64()
        } else {
            0.0
        };

        Ok(ThroughputAnalysis {
            average_mbps,
            frames_per_second,
            total_bytes,
            total_frames,
            sample_count: self.metrics_history.throughput_samples.len(),
        })
    }

    fn analyze_error_metrics(&self) -> ApplicationResult<ErrorAnalysis> {
        if self.metrics_history.error_samples.is_empty() {
            return Ok(ErrorAnalysis::default());
        }

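        // Note: this rate is measured against all recorded samples, with the
        // error samples themselves included in the denominator.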
        let total_samples = self.metrics_history.error_samples.len()
            + self.metrics_history.latency_samples.len()
            + self.metrics_history.throughput_samples.len();

        let error_count = self.metrics_history.error_samples.len();
        let error_rate = if total_samples > 0 {
            error_count as f64 / total_samples as f64
        } else {
            0.0
        };

        // Analyze error types
        let mut error_type_distribution = HashMap::new();
        let mut severity_distribution = HashMap::new();

        for sample in &self.metrics_history.error_samples {
            *error_type_distribution
                .entry(sample.error_type.clone())
                .or_insert(0) += 1;
            *severity_distribution
                .entry(format!("{:?}", sample.error_severity))
                .or_insert(0) += 1;
        }

        Ok(ErrorAnalysis {
            error_rate,
            total_errors: error_count,
            error_type_distribution,
            severity_distribution,
        })
    }

    fn analyze_resource_metrics(&self) -> ApplicationResult<ResourceAnalysis> {
        if self.metrics_history.resource_samples.is_empty() {
            return Ok(ResourceAnalysis::default());
        }

        let latest_sample = self
            .metrics_history
            .resource_samples
            .back()
            .ok_or_else(|| {
                ApplicationError::Logic("No resource samples available for analysis".to_string())
            })?;

        let cpu_values: Vec<f64> = self
            .metrics_history
            .resource_samples
            .iter()
            .map(|s| s.cpu_usage)
            .collect();

        let memory_values: Vec<u64> = self
            .metrics_history
            .resource_samples
            .iter()
            .map(|s| s.memory_usage_bytes)
            .collect();

        let average_cpu = cpu_values.iter().sum::<f64>() / cpu_values.len() as f64;
        let average_memory = memory_values.iter().sum::<u64>() / memory_values.len() as u64;

        Ok(ResourceAnalysis {
            current_cpu_usage: latest_sample.cpu_usage,
            average_cpu_usage: average_cpu,
            current_memory_usage: latest_sample.memory_usage_bytes,
            average_memory_usage: average_memory,
            network_bandwidth_mbps: latest_sample.network_bandwidth_mbps,
            active_connections: latest_sample.active_connections,
        })
    }

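    // Worked example of the scoring below: average latency 600 ms (-15),
    // throughput 0.5 Mbps (-20), error rate 7% (-20), and CPU at 85% (-5)
    // yield a score of 40.0.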
    fn calculate_performance_score(
        &self,
        latency: &LatencyAnalysis,
        throughput: &ThroughputAnalysis,
        errors: &ErrorAnalysis,
        resources: &ResourceAnalysis,
    ) -> f64 {
        let mut score: f64 = 100.0;

        // Penalize high latency
        if latency.average > 1000.0 {
            score -= 30.0;
        } else if latency.average > 500.0 {
            score -= 15.0;
        }

        // Penalize low throughput
        if throughput.average_mbps < 1.0 {
            score -= 20.0;
        } else if throughput.average_mbps < 5.0 {
            score -= 10.0;
        }

        // Penalize high error rates
        if errors.error_rate > 0.1 {
            score -= 40.0;
        } else if errors.error_rate > 0.05 {
            score -= 20.0;
        }

        // Penalize high resource usage
        if resources.current_cpu_usage > 0.9 {
            score -= 15.0;
        } else if resources.current_cpu_usage > 0.8 {
            score -= 5.0;
        }

        score.clamp(0.0, 100.0)
    }

    fn identify_performance_issues(
        &self,
        latency: &LatencyAnalysis,
        throughput: &ThroughputAnalysis,
        errors: &ErrorAnalysis,
        resources: &ResourceAnalysis,
    ) -> ApplicationResult<Vec<PerformanceIssue>> {
        let mut issues = Vec::new();

        // Latency issues
        if latency.average > self.analysis_config.alerting_thresholds.critical_latency_ms {
            issues.push(PerformanceIssue {
                issue_type: "High Latency".to_string(),
                severity: IssueSeverity::Critical,
                description: format!(
                    "Average latency {:.1}ms exceeds critical threshold",
                    latency.average
                ),
                impact: "User experience severely degraded".to_string(),
                suggested_action: "Reduce data size, increase priority threshold".to_string(),
            });
        }

        // Throughput issues
        if throughput.average_mbps < self.analysis_config.alerting_thresholds.min_throughput_mbps {
            issues.push(PerformanceIssue {
                issue_type: "Low Throughput".to_string(),
                severity: IssueSeverity::High,
                description: format!(
                    "Throughput {:.1}Mbps below minimum threshold",
                    throughput.average_mbps
                ),
                impact: "Data delivery is slower than expected".to_string(),
                suggested_action: "Optimize batch sizes, check network conditions".to_string(),
            });
        }

        // Error rate issues
        if errors.error_rate > self.analysis_config.alerting_thresholds.critical_error_rate {
            issues.push(PerformanceIssue {
                issue_type: "High Error Rate".to_string(),
                severity: IssueSeverity::Critical,
                description: format!(
                    "Error rate {:.1}% exceeds critical threshold",
                    errors.error_rate * 100.0
                ),
                impact: "System reliability is compromised".to_string(),
                suggested_action: "Investigate error causes, increase priority selectivity"
                    .to_string(),
            });
        }

        // Resource issues
        if resources.current_cpu_usage > self.analysis_config.alerting_thresholds.max_cpu_usage {
            issues.push(PerformanceIssue {
                issue_type: "High CPU Usage".to_string(),
                severity: IssueSeverity::High,
                description: format!(
                    "CPU usage {:.1}% exceeds threshold",
                    resources.current_cpu_usage * 100.0
                ),
                impact: "System performance may degrade".to_string(),
                suggested_action: "Reduce processing load, optimize algorithms".to_string(),
            });
        }

        Ok(issues)
    }

    fn generate_recommendations(
        &self,
        issues: &[PerformanceIssue],
    ) -> ApplicationResult<Vec<OptimizationRecommendation>> {
        let mut recommendations = Vec::new();

        for issue in issues {
            match issue.issue_type.as_str() {
                "High Latency" => {
                    recommendations.push(OptimizationRecommendation {
                        priority: RecommendationPriority::High,
                        category: "Priority Optimization".to_string(),
                        description: "Increase priority threshold to reduce data volume"
                            .to_string(),
                        expected_impact: "Reduce latency by 20-40%".to_string(),
                        implementation_effort: ImplementationEffort::Low,
                    });
                }
                "Low Throughput" => {
                    recommendations.push(OptimizationRecommendation {
                        priority: RecommendationPriority::Medium,
                        category: "Batch Optimization".to_string(),
                        description: "Increase batch size to improve throughput".to_string(),
                        expected_impact: "Improve throughput by 15-30%".to_string(),
                        implementation_effort: ImplementationEffort::Low,
                    });
                }
                "High Error Rate" => {
                    recommendations.push(OptimizationRecommendation {
                        priority: RecommendationPriority::High,
                        category: "Reliability Improvement".to_string(),
                        description: "Implement retry logic and error handling".to_string(),
                        expected_impact: "Reduce error rate by 50-80%".to_string(),
                        implementation_effort: ImplementationEffort::Medium,
                    });
                }
                _ => {}
            }
        }

        Ok(recommendations)
    }

    fn calculate_latency_statistics(&self) -> ApplicationResult<LatencyStatistics> {
        if self.metrics_history.latency_samples.is_empty() {
            return Ok(LatencyStatistics::default());
        }

        let latencies: Vec<f64> = self
            .metrics_history
            .latency_samples
            .iter()
            .map(|s| s.latency_ms)
            .collect();

        let average = latencies.iter().sum::<f64>() / latencies.len() as f64;

        Ok(LatencyStatistics { average })
    }

    fn calculate_throughput_statistics(&self) -> ApplicationResult<ThroughputStatistics> {
        if self.metrics_history.throughput_samples.is_empty() {
            return Ok(ThroughputStatistics::default());
        }

        // Use the most recent sample for current throughput
        let latest_sample = self
            .metrics_history
            .throughput_samples
            .back()
            .ok_or_else(|| {
                ApplicationError::Logic(
                    "No throughput samples available for statistics".to_string(),
                )
            })?;
        let current_mbps = if latest_sample.duration.as_secs_f64() > 0.0 {
            (latest_sample.bytes_transferred as f64 * 8.0)
                / (latest_sample.duration.as_secs_f64() * 1_000_000.0)
        } else {
            0.0
        };

        Ok(ThroughputStatistics { current_mbps })
    }

    fn calculate_error_statistics(&self) -> ApplicationResult<ErrorStatistics> {
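        // Unlike analyze_error_metrics, the denominator here excludes the
        // error samples and counts only latency and throughput samples.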
        let total_operations = self.metrics_history.latency_samples.len()
            + self.metrics_history.throughput_samples.len();

        let error_count = self.metrics_history.error_samples.len();

        let rate = if total_operations > 0 {
            error_count as f64 / total_operations as f64
        } else {
            0.0
        };

        Ok(ErrorStatistics { rate })
    }

    fn calculate_resource_statistics(&self) -> ApplicationResult<ResourceStatistics> {
        if self.metrics_history.resource_samples.is_empty() {
            return Ok(ResourceStatistics::default());
        }

        let latest = self
            .metrics_history
            .resource_samples
            .back()
            .ok_or_else(|| {
                ApplicationError::Logic(
                    "No resource samples available for resource statistics".to_string(),
                )
            })?;

        Ok(ResourceStatistics {
            cpu_usage: latest.cpu_usage,
            // Hard-coded assumption of 8 GB total system memory; a real
            // deployment should probe the machine's actual capacity instead.
            memory_usage_percent: (latest.memory_usage_bytes as f64 / 8_000_000_000.0) * 100.0,
            connection_count: latest.active_connections,
        })
    }

    fn calculate_recommendation_confidence(
        &self,
        context: &crate::application::services::prioritization_service::PerformanceContext,
    ) -> f64 {
        let mut confidence: f64 = 1.0;

        if context.error_rate > 0.1 {
            confidence *= 0.6; // High error rate reduces confidence
        }

        if self.metrics_history.latency_samples.len() < 10 {
            confidence *= 0.7; // Low sample count reduces confidence
        }

        confidence.max(0.1)
    }

    fn calculate_distribution_efficiency(
        &self,
        priority_distribution: &HashMap<u8, usize>,
        total_frames: usize,
    ) -> f64 {
        if total_frames == 0 {
            return 1.0;
        }

        // Calculate how well distributed the priorities are
        let unique_priorities = priority_distribution.len() as f64;
        let max_possible_priorities = 5.0; // Assuming 5 priority levels
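        // e.g. frames spread across 3 of the 5 assumed levels score 3.0 / 5.0 = 0.6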

        // Higher score for more diverse priority usage
        (unique_priorities / max_possible_priorities).min(1.0)
    }
}

impl Default for PerformanceAnalysisService {
    fn default() -> Self {
        Self::new(AnalysisConfig::default())
    }
}

// Supporting types for analysis results

#[derive(Debug, Clone)]
pub struct PerformanceAnalysisReport {
    pub timestamp: SystemTime,
    pub overall_score: f64,
    pub latency_analysis: LatencyAnalysis,
    pub throughput_analysis: ThroughputAnalysis,
    pub error_analysis: ErrorAnalysis,
    pub resource_analysis: ResourceAnalysis,
    pub issues: Vec<PerformanceIssue>,
    pub recommendations: Vec<OptimizationRecommendation>,
}

#[derive(Debug, Clone, Default)]
pub struct LatencyAnalysis {
    pub average: f64,
    pub p50: f64,
    pub p95: f64,
    pub p99: f64,
    pub min: f64,
    pub max: f64,
    pub sample_count: usize,
}

#[derive(Debug, Clone, Default)]
pub struct ThroughputAnalysis {
    pub average_mbps: f64,
    pub frames_per_second: f64,
    pub total_bytes: u64,
    pub total_frames: usize,
    pub sample_count: usize,
}

#[derive(Debug, Clone, Default)]
pub struct ErrorAnalysis {
    pub error_rate: f64,
    pub total_errors: usize,
    pub error_type_distribution: HashMap<String, usize>,
    pub severity_distribution: HashMap<String, usize>,
}

#[derive(Debug, Clone, Default)]
pub struct ResourceAnalysis {
    pub current_cpu_usage: f64,
    pub average_cpu_usage: f64,
    pub current_memory_usage: u64,
    pub average_memory_usage: u64,
    pub network_bandwidth_mbps: f64,
    pub active_connections: usize,
}

#[derive(Debug, Clone)]
pub struct BatchSizeRecommendation {
    pub recommended_size: usize,
    pub confidence: f64,
    pub reasoning: Vec<String>,
}

#[derive(Debug, Clone)]
pub struct FrameDistributionAnalysis {
    pub total_frames: usize,
    pub total_bytes: u64,
    pub average_frame_size: f64,
    pub median_frame_size: f64,
    pub priority_distribution: HashMap<u8, usize>,
    pub efficiency_score: f64,
}

#[derive(Debug, Clone)]
pub struct PerformanceIssue {
    pub issue_type: String,
    pub severity: IssueSeverity,
    pub description: String,
    pub impact: String,
    pub suggested_action: String,
}

#[derive(Debug, Clone)]
pub struct OptimizationRecommendation {
    pub priority: RecommendationPriority,
    pub category: String,
    pub description: String,
    pub expected_impact: String,
    pub implementation_effort: ImplementationEffort,
}

#[derive(Debug, Clone, PartialEq)]
pub enum IssueSeverity {
    Low,
    Medium,
    High,
    Critical,
}

#[derive(Debug, Clone, PartialEq)]
pub enum RecommendationPriority {
    Low,
    Medium,
    High,
    Critical,
}

#[derive(Debug, Clone, PartialEq)]
pub enum ImplementationEffort {
    Low,
    Medium,
    High,
}

// Internal statistics types
#[derive(Debug, Clone, Default)]
struct LatencyStatistics {
    average: f64,
}

#[derive(Debug, Clone, Default)]
struct ThroughputStatistics {
    current_mbps: f64,
}

#[derive(Debug, Clone, Default)]
struct ErrorStatistics {
    rate: f64,
}

#[derive(Debug, Clone, Default)]
struct ResourceStatistics {
    cpu_usage: f64,
    memory_usage_percent: f64,
    connection_count: usize,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_performance_analysis_service_creation() {
        let service = PerformanceAnalysisService::default();
        assert_eq!(service.metrics_history.latency_samples.len(), 0);
    }

    #[test]
    fn test_latency_recording() {
        let mut service = PerformanceAnalysisService::default();
        let session_id = crate::domain::value_objects::SessionId::new();

        service
            .record_latency(session_id, None, 100.0, "test_operation".to_string())
            .unwrap();

        assert_eq!(service.metrics_history.latency_samples.len(), 1);
    }

    #[test]
    fn test_throughput_recording() {
        let mut service = PerformanceAnalysisService::default();
        let session_id = crate::domain::value_objects::SessionId::new();

        service
            .record_throughput(session_id, 1024, Duration::from_millis(100), 5)
            .unwrap();

        assert_eq!(service.metrics_history.throughput_samples.len(), 1);
        let sample = &service.metrics_history.throughput_samples[0];
        assert_eq!(sample.bytes_transferred, 1024);
        assert_eq!(sample.frame_count, 5);
    }

    #[test]
    fn test_error_recording() {
        let mut service = PerformanceAnalysisService::default();
        let session_id = crate::domain::value_objects::SessionId::new();

        service
            .record_error(
                session_id,
                None,
                "Connection timeout after 30s".to_string(),
                ErrorSeverity::High,
            )
            .unwrap();

        assert_eq!(service.metrics_history.error_samples.len(), 1);
        let sample = &service.metrics_history.error_samples[0];
        assert_eq!(sample.error_type, "Connection timeout after 30s");
        assert_eq!(sample.error_severity, ErrorSeverity::High);
    }

    #[test]
    fn test_resource_recording() {
        let mut service = PerformanceAnalysisService::default();

        service
            .record_resource_usage(
                0.75,
                1_000_000_000, // 1GB
                50.0,
                10,
            )
            .unwrap();

        assert_eq!(service.metrics_history.resource_samples.len(), 1);
        let sample = &service.metrics_history.resource_samples[0];
        assert_eq!(sample.cpu_usage, 0.75);
        assert_eq!(sample.memory_usage_bytes, 1_000_000_000);
        assert_eq!(sample.network_bandwidth_mbps, 50.0);
        assert_eq!(sample.active_connections, 10);
    }

    #[test]
    fn test_comprehensive_analysis() {
        let mut service = PerformanceAnalysisService::default();
        let session_id = crate::domain::value_objects::SessionId::new();
        let _stream_id = crate::domain::value_objects::StreamId::new();

        // Add various samples
        service
            .record_latency(session_id, None, 150.0, "frame_processing".to_string())
            .unwrap();
        service
            .record_latency(session_id, None, 200.0, "frame_processing".to_string())
            .unwrap();
        service
            .record_latency(session_id, None, 175.0, "frame_processing".to_string())
            .unwrap();

        service
            .record_throughput(session_id, 2048, Duration::from_millis(200), 10)
            .unwrap();
        service
            .record_throughput(session_id, 4096, Duration::from_millis(400), 20)
            .unwrap();

        service
            .record_error(
                session_id,
                None,
                "Invalid data validation error".to_string(),
                ErrorSeverity::Medium,
            )
            .unwrap();

        service
            .record_resource_usage(0.6, 2_000_000_000, 100.0, 15)
            .unwrap();

        let report = service.analyze_performance().unwrap();

        // Verify analysis results
        assert!(report.overall_score > 0.0);
        assert!(report.latency_analysis.average > 0.0);
        assert!(report.throughput_analysis.average_mbps > 0.0);
        assert!(report.error_analysis.error_rate > 0.0);
        assert!(report.resource_analysis.current_cpu_usage > 0.0);
    }

    #[test]
    fn test_performance_issue_identification() {
        let mut service = PerformanceAnalysisService::default();
        let session_id = crate::domain::value_objects::SessionId::new();

        // Add samples that will trigger various issues
        service
            .record_latency(session_id, None, 2500.0, "slow_operation".to_string())
            .unwrap(); // High latency
        service
            .record_throughput(session_id, 100, Duration::from_secs(1), 1)
            .unwrap(); // Low throughput
        service
            .record_error(
                session_id,
                None,
                "Request timeout".to_string(),
                ErrorSeverity::Critical,
            )
            .unwrap();
        service
            .record_error(
                session_id,
                None,
                "Request timeout".to_string(),
                ErrorSeverity::Critical,
            )
            .unwrap();
        service
            .record_resource_usage(0.95, 4_000_000_000, 10.0, 50)
            .unwrap(); // High CPU

        let report = service.analyze_performance().unwrap();

        // Should identify multiple issues
        assert!(!report.issues.is_empty());
        assert!(!report.recommendations.is_empty());

        // Check for specific issue types
        let has_latency_issue = report
            .issues
            .iter()
            .any(|i| i.issue_type.contains("Latency"));
        let has_throughput_issue = report
            .issues
            .iter()
            .any(|i| i.issue_type.contains("Throughput"));
        let has_error_issue = report.issues.iter().any(|i| i.issue_type.contains("Error"));
        let has_cpu_issue = report.issues.iter().any(|i| i.issue_type.contains("CPU"));

        assert!(has_latency_issue);
        assert!(has_throughput_issue);
        assert!(has_error_issue);
        assert!(has_cpu_issue);
    }

    #[test]
    fn test_optimization_recommendations() {
        let service = PerformanceAnalysisService::default();

        let issues = vec![
            PerformanceIssue {
                issue_type: "High Latency".to_string(),
                severity: IssueSeverity::Critical,
                description: "Latency too high".to_string(),
                impact: "Poor UX".to_string(),
                suggested_action: "Optimize".to_string(),
            },
            PerformanceIssue {
                issue_type: "Low Throughput".to_string(),
                severity: IssueSeverity::High,
                description: "Throughput too low".to_string(),
                impact: "Slow delivery".to_string(),
                suggested_action: "Increase batch size".to_string(),
            },
        ];

        let recommendations = service.generate_recommendations(&issues).unwrap();

        assert_eq!(recommendations.len(), 2);
        assert!(
            recommendations
                .iter()
                .any(|r| r.category.contains("Priority"))
        );
        assert!(recommendations.iter().any(|r| r.category.contains("Batch")));
    }

    #[test]
    fn test_analysis_config_customization() {
        let custom_config = AnalysisConfig {
            history_retention_duration: Duration::from_secs(7200), // 2 hours
            sample_window_size: 200,
            alerting_thresholds: AlertingThresholds {
                critical_latency_ms: 1500.0,
                warning_latency_ms: 750.0,
                critical_error_rate: 0.15,
                warning_error_rate: 0.08,
                min_throughput_mbps: 2.0,
                max_cpu_usage: 0.85,
            },
            analysis_interval: Duration::from_secs(60),
        };

        let service = PerformanceAnalysisService::new(custom_config.clone());

        assert_eq!(service.analysis_config.sample_window_size, 200);
        assert_eq!(
            service
                .analysis_config
                .alerting_thresholds
                .critical_latency_ms,
            1500.0
        );
        assert_eq!(service.metrics_history.max_samples, 200);
    }

    #[test]
    fn test_metrics_history_capacity() {
        let mut service = PerformanceAnalysisService::default();
        let session_id = crate::domain::value_objects::SessionId::new();

        // Add more samples than the configured capacity
        for i in 0..150 {
            service
                .record_latency(
                    session_id,
                    None,
                    (100 + i) as f64,
                    format!("operation_{}", i),
                )
                .unwrap();
        }

        // Should be capped at max_samples (100 by default)
        assert_eq!(service.metrics_history.latency_samples.len(), 100);

        // Should contain the most recent samples
        let last_sample = service.metrics_history.latency_samples.back().unwrap();
        assert_eq!(last_sample.latency_ms, 249.0); // 100 + 149
    }

    #[test]
    fn test_empty_metrics_analysis() {
        let service = PerformanceAnalysisService::default();

        // Should handle empty metrics gracefully
        let report = service.analyze_performance().unwrap();

        assert_eq!(report.overall_score, 80.0); // 100 base minus the 20-point zero-throughput penalty
        assert_eq!(report.latency_analysis.average, 0.0);
        assert_eq!(report.throughput_analysis.average_mbps, 0.0);
        assert_eq!(report.error_analysis.error_rate, 0.0);
        // Even with no samples, the 0 Mbps reading registers as a "Low
        // Throughput" issue, so issues and recommendations are not empty.
    }
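
    // Worked example of the batch-size factor arithmetic: with an empty
    // history the performance context falls back to defaults (0 ms latency,
    // 0 Mbps bandwidth, no errors, idle CPU), so the factors are 0.8, 0.5
    // (after clamping), 1.0, and 1.0 respectively.
    #[test]
    fn test_batch_size_recommendation_with_empty_history() {
        let service = PerformanceAnalysisService::default();

        let recommendation = service.calculate_optimal_batch_size(100).unwrap();

        // 100 * 0.8 * 0.5 * 1.0 * 1.0 = 40
        assert_eq!(recommendation.recommended_size, 40);
        // Confidence drops to 0.7 because fewer than 10 latency samples exist
        assert!((recommendation.confidence - 0.7).abs() < f64::EPSILON);
        assert_eq!(recommendation.reasoning.len(), 4);
    }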

    #[test]
    fn test_percentile_calculation() {
        let mut service = PerformanceAnalysisService::default();
        let session_id = crate::domain::value_objects::SessionId::new();

        // Add known latency values for percentile testing
        let latencies = vec![
            50.0, 100.0, 150.0, 200.0, 250.0, 300.0, 350.0, 400.0, 450.0, 500.0,
        ];

        for latency in latencies {
            service
                .record_latency(session_id, None, latency, "test".to_string())
                .unwrap();
        }

        let report = service.analyze_performance().unwrap();

        // Check percentile calculations are reasonable
        assert!(report.latency_analysis.p50 >= 200.0 && report.latency_analysis.p50 <= 300.0);
        assert!(report.latency_analysis.p95 >= 450.0 && report.latency_analysis.p95 <= 500.0);
        assert!(report.latency_analysis.p99 >= 480.0 && report.latency_analysis.p99 <= 500.0);
        assert_eq!(report.latency_analysis.min, 50.0);
        assert_eq!(report.latency_analysis.max, 500.0);
    }

    #[test]
    fn test_error_severity_distribution() {
        let mut service = PerformanceAnalysisService::default();
        let session_id = crate::domain::value_objects::SessionId::new();

        // Add errors with different severities
        service
            .record_error(
                session_id,
                None,
                "Minor issue".to_string(),
                ErrorSeverity::Low,
            )
            .unwrap();
        service
            .record_error(
                session_id,
                None,
                "Moderate issue".to_string(),
                ErrorSeverity::Medium,
            )
            .unwrap();
        service
            .record_error(
                session_id,
                None,
                "Severe issue".to_string(),
                ErrorSeverity::High,
            )
            .unwrap();
        service
            .record_error(
                session_id,
                None,
                "Critical issue".to_string(),
                ErrorSeverity::Critical,
            )
            .unwrap();

        let report = service.analyze_performance().unwrap();

        // Should categorize errors by severity
        assert_eq!(report.error_analysis.severity_distribution.len(), 4);
        assert_eq!(
            *report
                .error_analysis
                .severity_distribution
                .get("Low")
                .unwrap(),
            1
        );
        assert_eq!(
            *report
                .error_analysis
                .severity_distribution
                .get("Medium")
                .unwrap(),
            1
        );
        assert_eq!(
            *report
                .error_analysis
                .severity_distribution
                .get("High")
                .unwrap(),
            1
        );
        assert_eq!(
            *report
                .error_analysis
                .severity_distribution
                .get("Critical")
                .unwrap(),
            1
        );
    }
}