Skip to main content

cbtop/correlation_analysis/
analyzer.rs

1//! Correlation analyzer for multi-metric performance analysis.
2
3use std::collections::VecDeque;
4
5use super::{
6    CorrelationResult, EventSample, EventType, InterferenceResult, IsolationAction,
7    IsolationRecommendation, PerformanceSample, SystemSnapshot,
8};
9
10/// Correlation analyzer
11#[derive(Debug)]
12pub struct CorrelationAnalyzer {
13    /// Performance samples
14    perf_samples: VecDeque<PerformanceSample>,
15    /// Event samples by type
16    event_samples: VecDeque<EventSample>,
17    /// System snapshots
18    snapshots: VecDeque<SystemSnapshot>,
19    /// Maximum samples to keep
20    max_samples: usize,
21    /// CV spike threshold (%)
22    spike_threshold: f64,
23    /// Correlation window (seconds)
24    window_sec: f64,
25}
26
27impl Default for CorrelationAnalyzer {
28    fn default() -> Self {
29        Self {
30            perf_samples: VecDeque::new(),
31            event_samples: VecDeque::new(),
32            snapshots: VecDeque::new(),
33            max_samples: 1000,
34            spike_threshold: 15.0,
35            window_sec: 60.0,
36        }
37    }
38}
39
40impl CorrelationAnalyzer {
41    /// Create new analyzer
42    pub fn new() -> Self {
43        Self::default()
44    }
45
46    /// Set maximum samples
47    pub fn with_max_samples(mut self, max: usize) -> Self {
48        self.max_samples = max;
49        self
50    }
51
52    /// Set CV spike threshold
53    pub fn with_spike_threshold(mut self, percent: f64) -> Self {
54        self.spike_threshold = percent;
55        self
56    }
57
58    /// Set correlation window
59    pub fn with_window(mut self, seconds: f64) -> Self {
60        self.window_sec = seconds;
61        self
62    }
63
64    /// Add performance sample
65    pub fn add_perf_sample(&mut self, sample: PerformanceSample) {
66        if self.perf_samples.len() >= self.max_samples {
67            self.perf_samples.pop_front();
68        }
69        self.perf_samples.push_back(sample);
70    }
71
72    /// Add event sample
73    pub fn add_event_sample(&mut self, sample: EventSample) {
74        if self.event_samples.len() >= self.max_samples {
75            self.event_samples.pop_front();
76        }
77        self.event_samples.push_back(sample);
78    }
79
80    /// Add system snapshot
81    pub fn add_snapshot(&mut self, snapshot: SystemSnapshot) {
82        if self.snapshots.len() >= self.max_samples / 10 {
83            self.snapshots.pop_front();
84        }
85        self.snapshots.push_back(snapshot);
86    }
87
88    /// Get performance sample count
89    pub fn perf_sample_count(&self) -> usize {
90        self.perf_samples.len()
91    }
92
93    /// Get event sample count
94    pub fn event_sample_count(&self) -> usize {
95        self.event_samples.len()
96    }
97
98    /// Calculate correlation between CV and event type
99    pub fn correlate_events(&self, event_type: EventType) -> Option<CorrelationResult> {
100        let events: Vec<_> = self
101            .event_samples
102            .iter()
103            .filter(|e| e.event_type == event_type)
104            .collect();
105
106        if events.len() < 5 || self.perf_samples.len() < 5 {
107            return None;
108        }
109
110        // Simple Pearson correlation at lag 0
111        let pearson_r = self.compute_correlation(&events);
112
113        let is_significant = pearson_r.abs() > 0.3 && events.len() >= 10;
114
115        Some(CorrelationResult {
116            event_type,
117            pearson_r,
118            sample_count: events.len().min(self.perf_samples.len()),
119            is_significant,
120            optimal_lag: 0.0,
121        })
122    }
123
124    /// Compute Pearson correlation
125    fn compute_correlation(&self, events: &[&EventSample]) -> f64 {
126        // Match events to closest perf samples by timestamp
127        let mut paired: Vec<(f64, f64)> = Vec::new();
128
129        for event in events {
130            if let Some(perf) = self.find_closest_perf(event.timestamp) {
131                paired.push((event.value, perf.cv_percent));
132            }
133        }
134
135        if paired.len() < 3 {
136            return 0.0;
137        }
138
139        let n = paired.len() as f64;
140        let sum_x: f64 = paired.iter().map(|(x, _)| x).sum();
141        let sum_y: f64 = paired.iter().map(|(_, y)| y).sum();
142        let sum_xy: f64 = paired.iter().map(|(x, y)| x * y).sum();
143        let sum_xx: f64 = paired.iter().map(|(x, _)| x * x).sum();
144        let sum_yy: f64 = paired.iter().map(|(_, y)| y * y).sum();
145
146        let numerator = n * sum_xy - sum_x * sum_y;
147        let denominator = ((n * sum_xx - sum_x * sum_x) * (n * sum_yy - sum_y * sum_y)).sqrt();
148
149        if denominator.abs() < 1e-10 {
150            0.0
151        } else {
152            numerator / denominator
153        }
154    }
155
156    /// Find closest performance sample
157    fn find_closest_perf(&self, timestamp: f64) -> Option<&PerformanceSample> {
158        self.perf_samples
159            .iter()
160            .min_by(|a, b| {
161                (a.timestamp - timestamp)
162                    .abs()
163                    .partial_cmp(&(b.timestamp - timestamp).abs())
164                    .expect("values should be comparable")
165            })
166            .filter(|p| (p.timestamp - timestamp).abs() < self.window_sec)
167    }
168
169    /// Detect interference source
170    pub fn detect_interference(&self) -> Option<InterferenceResult> {
171        let event_types = [
172            EventType::Interrupt,
173            EventType::DiskIo,
174            EventType::Network,
175            EventType::ContextSwitch,
176            EventType::PageFault,
177            EventType::ProcessCpu,
178        ];
179
180        let mut correlations: Vec<(EventType, f64)> = Vec::new();
181
182        for event_type in event_types {
183            if let Some(result) = self.correlate_events(event_type) {
184                if result.pearson_r.abs() > 0.1 {
185                    correlations.push((event_type, result.pearson_r));
186                }
187            }
188        }
189
190        if correlations.is_empty() {
191            return None;
192        }
193
194        correlations.sort_by(|a, b| {
195            b.1.abs()
196                .partial_cmp(&a.1.abs())
197                .expect("values should be comparable")
198        });
199
200        let (primary_source, correlation) = correlations[0];
201        let secondary_sources: Vec<_> = correlations.into_iter().skip(1).collect();
202
203        let confidence = if correlation.abs() > 0.5 && self.perf_samples.len() > 20 {
204            0.9
205        } else if correlation.abs() > 0.3 {
206            0.7
207        } else {
208            0.5
209        };
210
211        Some(InterferenceResult {
212            primary_source,
213            correlation,
214            confidence,
215            secondary_sources,
216        })
217    }
218
219    /// Recommend isolation strategy
220    pub fn recommend_isolation(&self) -> IsolationRecommendation {
221        if let Some(interference) = self.detect_interference() {
222            let (action, expected_improvement) = match interference.primary_source {
223                EventType::Interrupt => (IsolationAction::CpuPin, 20.0),
224                EventType::ContextSwitch => (IsolationAction::RealtimePriority, 15.0),
225                EventType::ProcessCpu => (IsolationAction::CpuPin, 25.0),
226                EventType::DiskIo => (IsolationAction::MemoryIsolation, 10.0),
227                EventType::Network => (IsolationAction::NetworkIsolation, 15.0),
228                EventType::PageFault => (IsolationAction::MemoryIsolation, 20.0),
229            };
230
231            let reason = format!(
232                "{} correlation ({:.2}) with {} detected",
233                interference.category().name(),
234                interference.correlation,
235                interference.primary_source.name()
236            );
237
238            IsolationRecommendation {
239                action,
240                reason,
241                expected_improvement: expected_improvement * interference.correlation.abs(),
242                confidence: interference.confidence,
243            }
244        } else {
245            IsolationRecommendation {
246                action: IsolationAction::None,
247                reason: "No significant interference detected".to_string(),
248                expected_improvement: 0.0,
249                confidence: 0.9,
250            }
251        }
252    }
253
254    /// Capture current system state
255    pub fn capture_system_state(&mut self, timestamp: f64) -> SystemSnapshot {
256        // In real implementation, would read from /proc
257        // For now, return empty snapshot
258        let snapshot = SystemSnapshot::new(timestamp);
259        self.add_snapshot(snapshot.clone());
260        snapshot
261    }
262
263    /// Get CV spikes
264    pub fn get_spikes(&self) -> Vec<&PerformanceSample> {
265        self.perf_samples
266            .iter()
267            .filter(|s| s.is_spike(self.spike_threshold))
268            .collect()
269    }
270
271    /// Clear all samples
272    pub fn clear(&mut self) {
273        self.perf_samples.clear();
274        self.event_samples.clear();
275        self.snapshots.clear();
276    }
277}