cbtop/correlation_analysis/
analyzer.rs1use std::collections::VecDeque;
4
5use super::{
6 CorrelationResult, EventSample, EventType, InterferenceResult, IsolationAction,
7 IsolationRecommendation, PerformanceSample, SystemSnapshot,
8};
9
10#[derive(Debug)]
12pub struct CorrelationAnalyzer {
13 perf_samples: VecDeque<PerformanceSample>,
15 event_samples: VecDeque<EventSample>,
17 snapshots: VecDeque<SystemSnapshot>,
19 max_samples: usize,
21 spike_threshold: f64,
23 window_sec: f64,
25}
26
27impl Default for CorrelationAnalyzer {
28 fn default() -> Self {
29 Self {
30 perf_samples: VecDeque::new(),
31 event_samples: VecDeque::new(),
32 snapshots: VecDeque::new(),
33 max_samples: 1000,
34 spike_threshold: 15.0,
35 window_sec: 60.0,
36 }
37 }
38}
39
40impl CorrelationAnalyzer {
41 pub fn new() -> Self {
43 Self::default()
44 }
45
46 pub fn with_max_samples(mut self, max: usize) -> Self {
48 self.max_samples = max;
49 self
50 }
51
52 pub fn with_spike_threshold(mut self, percent: f64) -> Self {
54 self.spike_threshold = percent;
55 self
56 }
57
58 pub fn with_window(mut self, seconds: f64) -> Self {
60 self.window_sec = seconds;
61 self
62 }
63
64 pub fn add_perf_sample(&mut self, sample: PerformanceSample) {
66 if self.perf_samples.len() >= self.max_samples {
67 self.perf_samples.pop_front();
68 }
69 self.perf_samples.push_back(sample);
70 }
71
72 pub fn add_event_sample(&mut self, sample: EventSample) {
74 if self.event_samples.len() >= self.max_samples {
75 self.event_samples.pop_front();
76 }
77 self.event_samples.push_back(sample);
78 }
79
80 pub fn add_snapshot(&mut self, snapshot: SystemSnapshot) {
82 if self.snapshots.len() >= self.max_samples / 10 {
83 self.snapshots.pop_front();
84 }
85 self.snapshots.push_back(snapshot);
86 }
87
88 pub fn perf_sample_count(&self) -> usize {
90 self.perf_samples.len()
91 }
92
93 pub fn event_sample_count(&self) -> usize {
95 self.event_samples.len()
96 }
97
98 pub fn correlate_events(&self, event_type: EventType) -> Option<CorrelationResult> {
100 let events: Vec<_> = self
101 .event_samples
102 .iter()
103 .filter(|e| e.event_type == event_type)
104 .collect();
105
106 if events.len() < 5 || self.perf_samples.len() < 5 {
107 return None;
108 }
109
110 let pearson_r = self.compute_correlation(&events);
112
113 let is_significant = pearson_r.abs() > 0.3 && events.len() >= 10;
114
115 Some(CorrelationResult {
116 event_type,
117 pearson_r,
118 sample_count: events.len().min(self.perf_samples.len()),
119 is_significant,
120 optimal_lag: 0.0,
121 })
122 }
123
124 fn compute_correlation(&self, events: &[&EventSample]) -> f64 {
126 let mut paired: Vec<(f64, f64)> = Vec::new();
128
129 for event in events {
130 if let Some(perf) = self.find_closest_perf(event.timestamp) {
131 paired.push((event.value, perf.cv_percent));
132 }
133 }
134
135 if paired.len() < 3 {
136 return 0.0;
137 }
138
139 let n = paired.len() as f64;
140 let sum_x: f64 = paired.iter().map(|(x, _)| x).sum();
141 let sum_y: f64 = paired.iter().map(|(_, y)| y).sum();
142 let sum_xy: f64 = paired.iter().map(|(x, y)| x * y).sum();
143 let sum_xx: f64 = paired.iter().map(|(x, _)| x * x).sum();
144 let sum_yy: f64 = paired.iter().map(|(_, y)| y * y).sum();
145
146 let numerator = n * sum_xy - sum_x * sum_y;
147 let denominator = ((n * sum_xx - sum_x * sum_x) * (n * sum_yy - sum_y * sum_y)).sqrt();
148
149 if denominator.abs() < 1e-10 {
150 0.0
151 } else {
152 numerator / denominator
153 }
154 }
155
156 fn find_closest_perf(&self, timestamp: f64) -> Option<&PerformanceSample> {
158 self.perf_samples
159 .iter()
160 .min_by(|a, b| {
161 (a.timestamp - timestamp)
162 .abs()
163 .partial_cmp(&(b.timestamp - timestamp).abs())
164 .expect("values should be comparable")
165 })
166 .filter(|p| (p.timestamp - timestamp).abs() < self.window_sec)
167 }
168
169 pub fn detect_interference(&self) -> Option<InterferenceResult> {
171 let event_types = [
172 EventType::Interrupt,
173 EventType::DiskIo,
174 EventType::Network,
175 EventType::ContextSwitch,
176 EventType::PageFault,
177 EventType::ProcessCpu,
178 ];
179
180 let mut correlations: Vec<(EventType, f64)> = Vec::new();
181
182 for event_type in event_types {
183 if let Some(result) = self.correlate_events(event_type) {
184 if result.pearson_r.abs() > 0.1 {
185 correlations.push((event_type, result.pearson_r));
186 }
187 }
188 }
189
190 if correlations.is_empty() {
191 return None;
192 }
193
194 correlations.sort_by(|a, b| {
195 b.1.abs()
196 .partial_cmp(&a.1.abs())
197 .expect("values should be comparable")
198 });
199
200 let (primary_source, correlation) = correlations[0];
201 let secondary_sources: Vec<_> = correlations.into_iter().skip(1).collect();
202
203 let confidence = if correlation.abs() > 0.5 && self.perf_samples.len() > 20 {
204 0.9
205 } else if correlation.abs() > 0.3 {
206 0.7
207 } else {
208 0.5
209 };
210
211 Some(InterferenceResult {
212 primary_source,
213 correlation,
214 confidence,
215 secondary_sources,
216 })
217 }
218
219 pub fn recommend_isolation(&self) -> IsolationRecommendation {
221 if let Some(interference) = self.detect_interference() {
222 let (action, expected_improvement) = match interference.primary_source {
223 EventType::Interrupt => (IsolationAction::CpuPin, 20.0),
224 EventType::ContextSwitch => (IsolationAction::RealtimePriority, 15.0),
225 EventType::ProcessCpu => (IsolationAction::CpuPin, 25.0),
226 EventType::DiskIo => (IsolationAction::MemoryIsolation, 10.0),
227 EventType::Network => (IsolationAction::NetworkIsolation, 15.0),
228 EventType::PageFault => (IsolationAction::MemoryIsolation, 20.0),
229 };
230
231 let reason = format!(
232 "{} correlation ({:.2}) with {} detected",
233 interference.category().name(),
234 interference.correlation,
235 interference.primary_source.name()
236 );
237
238 IsolationRecommendation {
239 action,
240 reason,
241 expected_improvement: expected_improvement * interference.correlation.abs(),
242 confidence: interference.confidence,
243 }
244 } else {
245 IsolationRecommendation {
246 action: IsolationAction::None,
247 reason: "No significant interference detected".to_string(),
248 expected_improvement: 0.0,
249 confidence: 0.9,
250 }
251 }
252 }
253
254 pub fn capture_system_state(&mut self, timestamp: f64) -> SystemSnapshot {
256 let snapshot = SystemSnapshot::new(timestamp);
259 self.add_snapshot(snapshot.clone());
260 snapshot
261 }
262
263 pub fn get_spikes(&self) -> Vec<&PerformanceSample> {
265 self.perf_samples
266 .iter()
267 .filter(|s| s.is_spike(self.spike_threshold))
268 .collect()
269 }
270
271 pub fn clear(&mut self) {
273 self.perf_samples.clear();
274 self.event_samples.clear();
275 self.snapshots.clear();
276 }
277}