// do_memory_mcp/patterns/statistical/analysis/engine.rs

use anyhow::{anyhow, Result};
use std::collections::HashMap;
use tracing::{debug, info, instrument, warn};

use super::bocpd::SimpleBOCPD;
use super::types::{
    AnalysisMetadata, BOCPDConfig, ChangeType, ChangepointResult, CorrelationResult,
    StatisticalConfig, StatisticalResults, TrendDirection, TrendResult,
};

/// Core engine for correlation, changepoint, and trend analysis over
/// named time series.
#[derive(Debug)]
pub struct StatisticalEngine {
    config: StatisticalConfig,
}

impl StatisticalEngine {
    pub fn new() -> Result<Self> {
        Self::with_config(StatisticalConfig::default())
    }

    pub fn with_config(config: StatisticalConfig) -> Result<Self> {
        Ok(Self { config })
    }

    #[instrument(skip(self, data), fields(variables = data.len()))]
    pub fn analyze_time_series(
        &mut self,
        data: &HashMap<String, Vec<f64>>,
    ) -> Result<StatisticalResults> {
        let start_time = std::time::Instant::now();

        info!("Starting statistical analysis of {} variables", data.len());

        self.validate_data(data)?;

        let correlations = self.analyze_correlations(data)?;
        let changepoints = self.detect_changepoints(data)?;
        let trends = self.analyze_trends(data)?;

        let duration = start_time.elapsed();
        let metadata = AnalysisMetadata {
            // Report the length of the first series as a representative count.
            data_points: data.values().next().map(|v| v.len()).unwrap_or(0),
            duration_ms: duration.as_millis() as u64,
            memory_usage: self.estimate_memory_usage(data),
            processing_method: if self.config.parallel_processing {
                "parallel".to_string()
            } else {
                "sequential".to_string()
            },
        };

        let results = StatisticalResults {
            correlations,
            changepoints,
            trends,
            metadata,
        };

        info!(
            "Statistical analysis completed in {}ms",
            results.metadata.duration_ms
        );

        Ok(results)
    }

    fn analyze_correlations(
        &self,
        data: &HashMap<String, Vec<f64>>,
    ) -> Result<Vec<CorrelationResult>> {
        let mut results = Vec::new();
        let variables: Vec<&String> = data.keys().collect();

        // Enumerate each unordered pair of variables exactly once
        // (the upper triangle of the correlation matrix).
        let pairs: Vec<_> = variables
            .iter()
            .enumerate()
            .flat_map(|(i, &var1)| variables[i + 1..].iter().map(move |&var2| (var1, var2)))
            .collect();

        for (var1, var2) in pairs {
            if let (Some(data1), Some(data2)) = (data.get(var1), data.get(var2)) {
                if let Some(corr_result) = self.calculate_correlation(var1, var2, data1, data2)? {
                    results.push(corr_result);
                }
            }
        }

        debug!("Calculated {} correlation pairs", results.len());
        Ok(results)
    }

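    // Illustrative note (not from the original source): for k variables the
    // upper-triangle enumeration above yields k * (k - 1) / 2 unique pairs,
    // e.g. 4 variables -> (4 * 3) / 2 = 6 correlation pairs.
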
    /// Pearson correlation between two equal-length series, with a t-test
    /// based p-value and an approximate 95% confidence interval.
    fn calculate_correlation(
        &self,
        var1: &str,
        var2: &str,
        data1: &[f64],
        data2: &[f64],
    ) -> Result<Option<CorrelationResult>> {
        // Require matched lengths and at least 3 points (so df = n - 2 >= 1).
        if data1.len() != data2.len() || data1.len() < 3 {
            return Ok(None);
        }

        let mean1 = data1.iter().sum::<f64>() / data1.len() as f64;
        let mean2 = data2.iter().sum::<f64>() / data2.len() as f64;

        let mut numerator = 0.0;
        let mut sum_sq1 = 0.0;
        let mut sum_sq2 = 0.0;

        for (&x, &y) in data1.iter().zip(data2.iter()) {
            let dx = x - mean1;
            let dy = y - mean2;
            numerator += dx * dy;
            sum_sq1 += dx * dx;
            sum_sq2 += dy * dy;
        }

        // A constant series has zero variance; its correlation is undefined.
        let denominator = (sum_sq1 * sum_sq2).sqrt();
        if denominator == 0.0 {
            return Ok(None);
        }

        let coefficient = numerator / denominator;

        let n = data1.len() as f64;

        // Two-sided p-value from t = r * sqrt((n - 2) / (1 - r^2)). A
        // (numerically) perfect correlation makes the statistic diverge, so
        // report p = 0 directly instead of dividing by zero.
        let p_value = if coefficient.abs() >= 1.0 {
            0.0
        } else {
            let t_stat = coefficient * ((n - 2.0) / (1.0 - coefficient * coefficient)).sqrt();
            2.0 * (1.0 - Self::t_cdf(t_stat.abs(), n - 2.0))
        };

        // Treat near-perfect correlations as significant even when the sample
        // is too small for the t-test to have much power.
        let significant = if coefficient.abs() > 0.9 {
            true
        } else {
            p_value < self.config.significance_level
        };

        // Approximate 95% CI from the large-sample standard error of r,
        // se = sqrt((1 - r^2) / (n - 2)); the max(0.0) guards against
        // floating-point rounding pushing r^2 slightly above 1.
        let se = ((1.0 - coefficient * coefficient).max(0.0) / (n - 2.0)).sqrt();
        let margin = 1.96 * se;
        let confidence_interval = (
            (coefficient - margin).max(-1.0),
            (coefficient + margin).min(1.0),
        );

        Ok(Some(CorrelationResult {
            variables: (var1.to_string(), var2.to_string()),
            coefficient,
            p_value,
            significant,
            confidence_interval,
        }))
    }

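    // Worked reference for the significance test above (standard Pearson r
    // theory, not specific to this codebase): under the null hypothesis of no
    // correlation, t = r * sqrt((n - 2) / (1 - r^2)) follows a Student t
    // distribution with n - 2 degrees of freedom, so the two-sided p-value is
    // 2 * (1 - F_t(|t|, n - 2)). For example, r = 0.8 with n = 12 gives
    // t = 0.8 * sqrt(10 / 0.36) ≈ 4.22 and p ≈ 0.002.
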
    fn detect_changepoints(
        &mut self,
        data: &HashMap<String, Vec<f64>>,
    ) -> Result<Vec<ChangepointResult>> {
        let mut results = Vec::new();

        for (var_name, series) in data {
            // BOCPD needs a minimum amount of history to be meaningful.
            if series.len() < 10 {
                continue;
            }

            let bocpd_config = BOCPDConfig {
                hazard_rate: self.config.changepoint_config.hazard_rate,
                expected_run_length: self.config.changepoint_config.expected_run_length as usize,
                // The caps and thresholds below are fixed engine-level defaults.
                max_run_length_hypotheses: 500,
                alert_threshold: 0.7,
                buffer_size: 100,
            };

            let mut bocpd = SimpleBOCPD::new(bocpd_config);
            let bocpd_results = bocpd.detect_changepoints(series)?;

            let found_before = results.len();
            for bocpd_result in bocpd_results {
                if let Some(changepoint_index) = bocpd_result.changepoint_index {
                    if changepoint_index < series.len() {
                        results.push(ChangepointResult {
                            index: changepoint_index,
                            confidence: bocpd_result.confidence,
                            change_type: ChangeType::MeanShift,
                        });
                    }
                }
            }

            // Log the per-variable count, not the running total.
            debug!(
                "Detected {} changepoints in {}",
                results.len() - found_before,
                var_name
            );
        }

        Ok(results)
    }

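    // Background note (assuming SimpleBOCPD follows the usual Adams & MacKay
    // (2007) BOCPD formulation): a constant hazard rate h places a geometric
    // prior on run lengths with mean 1 / h, so hazard_rate = 0.01 encodes an
    // expectation of roughly 100 observations between changepoints.
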
    fn analyze_trends(&self, data: &HashMap<String, Vec<f64>>) -> Result<Vec<TrendResult>> {
        let mut results = Vec::new();

        for (var_name, series) in data {
            if series.len() < 5 {
                continue;
            }

            let trend_result = self.calculate_trend(var_name, series)?;
            results.push(trend_result);
        }

        Ok(results)
    }

    /// Ordinary least-squares trend fit over the series index (0, 1, 2, ...).
    fn calculate_trend(&self, variable: &str, series: &[f64]) -> Result<TrendResult> {
        let n = series.len() as f64;
        let x_sum: f64 = (0..series.len()).map(|i| i as f64).sum();
        let y_sum: f64 = series.iter().sum();
        let xy_sum: f64 = series.iter().enumerate().map(|(i, &y)| i as f64 * y).sum();
        let x_sq_sum: f64 = (0..series.len()).map(|i| (i as f64).powi(2)).sum();

        // Closed-form OLS slope; the denominator is nonzero for n >= 2
        // because the x values (indices) are distinct.
        let slope = (n * xy_sum - x_sum * y_sum) / (n * x_sq_sum - x_sum * x_sum);

        let y_mean = y_sum / n;
        let ss_res: f64 = series
            .iter()
            .enumerate()
            .map(|(i, &y)| {
                // intercept = y_mean - slope * x_mean
                let predicted = slope * i as f64 + (y_mean - slope * x_sum / n);
                (y - predicted).powi(2)
            })
            .sum();
        let ss_tot: f64 = series.iter().map(|&y| (y - y_mean).powi(2)).sum();
        // A constant series has ss_tot == 0; treat that as "no explained
        // variance" rather than dividing by zero.
        let r_squared = if ss_tot == 0.0 {
            0.0
        } else {
            1.0 - (ss_res / ss_tot)
        };

        let direction = if slope.abs() < 0.001 {
            TrendDirection::Stationary
        } else if slope > 0.0 {
            TrendDirection::Increasing
        } else {
            TrendDirection::Decreasing
        };

        let significant = r_squared > 0.7 && n >= 3.0;

        Ok(TrendResult {
            variable: variable.to_string(),
            direction,
            strength: r_squared.clamp(0.0, 1.0),
            significant,
        })
    }

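    // Worked example for the closed-form OLS fit above (illustrative only):
    // for series = [1.0, 2.0, 3.0] with x = [0, 1, 2]:
    //   n = 3, Σx = 3, Σy = 6, Σxy = 8, Σx² = 5
    //   slope = (3·8 − 3·6) / (3·5 − 3²) = 6 / 6 = 1.0
    //   intercept = ȳ − slope·x̄ = 2 − 1 = 1.0
    // The fit is exact, so ss_res = 0 and r² = 1.0, a significant Increasing
    // trend.
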
    pub(crate) fn validate_data(&self, data: &HashMap<String, Vec<f64>>) -> Result<()> {
        if data.is_empty() {
            return Err(anyhow!("No data provided for analysis"));
        }

        let first_len = data
            .values()
            .next()
            .ok_or_else(|| anyhow!("No data values found"))?
            .len();
        // Oversized inputs are allowed through; the limit is advisory here.
        if first_len > self.config.max_data_points {
            warn!(
                "Data size {} exceeds maximum {}; proceeding without truncation",
                first_len, self.config.max_data_points
            );
        }

        for (var, series) in data {
            if series.is_empty() {
                return Err(anyhow!("Variable '{}' has no data points", var));
            }
            if !series.iter().all(|&x| x.is_finite()) {
                return Err(anyhow!("Variable '{}' contains non-finite values", var));
            }
        }

        Ok(())
    }

    fn estimate_memory_usage(&self, data: &HashMap<String, Vec<f64>>) -> usize {
        // Rough estimate: 8 bytes per f64 sample plus ~100 bytes of map and
        // key overhead per variable.
        let total_points: usize = data.values().map(|v| v.len()).sum();
        total_points * 8 + data.len() * 100
    }

    /// Approximate CDF of Student's t distribution with `df` degrees of
    /// freedom. For large `df` the t distribution is effectively normal; for
    /// small `df` a moment-corrected normal approximation is used, which is
    /// accurate enough for the significance screening done here.
    fn t_cdf(t: f64, df: f64) -> f64 {
        if df > 30.0 {
            Self::normal_cdf(t)
        } else {
            // z = t * (1 - 1/(4 df)) / sqrt(1 + t^2 / (2 df)), a well-known
            // normal approximation to the t distribution.
            let z = t * (1.0 - 1.0 / (4.0 * df)) / (1.0 + t * t / (2.0 * df)).sqrt();
            Self::normal_cdf(z).clamp(0.0, 1.0)
        }
    }
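
    // Spot check for the small-df branch above: at df = 5, t = 2.0 the exact
    // CDF is ≈ 0.949 (t tables give P(T > 2.015) = 0.05 at df = 5), while the
    // moment-corrected normal approximation yields ≈ 0.946, close enough for
    // the significance screening done here.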

    fn normal_cdf(x: f64) -> f64 {
        0.5 * (1.0 + Self::erf(x / 2.0_f64.sqrt()))
    }

    /// Abramowitz & Stegun 7.1.26 polynomial approximation of erf
    /// (maximum absolute error about 1.5e-7).
    fn erf(x: f64) -> f64 {
        let a1 = 0.254829592;
        let a2 = -0.284496736;
        let a3 = 1.421413741;
        let a4 = -1.453152027;
        let a5 = 1.061405429;
        let p = 0.3275911;

        // erf is odd: erf(-x) = -erf(x).
        let sign = if x < 0.0 { -1.0 } else { 1.0 };
        let x = x.abs();

        let t = 1.0 / (1.0 + p * x);
        let y = 1.0 - (((((a5 * t + a4) * t) + a3) * t + a2) * t + a1) * t * (-x * x).exp();

        sign * y
    }
}

/// Convenience wrapper that runs the full analysis pipeline and returns only
/// the changepoint results.
#[derive(Debug)]
pub struct ChangepointDetector {
    engine: StatisticalEngine,
}

impl ChangepointDetector {
    pub fn new() -> Result<Self> {
        Ok(Self {
            engine: StatisticalEngine::new()?,
        })
    }

    pub fn detect(&mut self, data: &HashMap<String, Vec<f64>>) -> Result<Vec<ChangepointResult>> {
        let results = self.engine.analyze_time_series(data)?;
        Ok(results.changepoints)
    }
}

/// Convenience wrapper that runs the full analysis pipeline and returns only
/// the correlation results.
#[derive(Debug)]
pub struct CorrelationAnalyzer {
    engine: StatisticalEngine,
}

impl CorrelationAnalyzer {
    pub fn new() -> Result<Self> {
        Ok(Self {
            engine: StatisticalEngine::new()?,
        })
    }

    pub fn analyze(&mut self, data: &HashMap<String, Vec<f64>>) -> Result<Vec<CorrelationResult>> {
        let results = self.engine.analyze_time_series(data)?;
        Ok(results.correlations)
    }
}
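
// A minimal usage sketch (illustrative; it assumes StatisticalConfig::default()
// imposes no thresholds that would reject series of this length, and makes no
// assertions about SimpleBOCPD's changepoint output, which depends on its
// internals).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn analyze_two_linearly_related_series() {
        // Two perfectly linearly related series: y = 2x + 1.
        let xs: Vec<f64> = (0..20).map(|i| i as f64).collect();
        let ys: Vec<f64> = xs.iter().map(|&x| 2.0 * x + 1.0).collect();

        let mut data = HashMap::new();
        data.insert("x".to_string(), xs);
        data.insert("y".to_string(), ys);

        let mut engine = StatisticalEngine::new().expect("engine construction");
        let results = engine.analyze_time_series(&data).expect("analysis");

        // Exactly one variable pair, with a near-perfect positive correlation.
        assert_eq!(results.correlations.len(), 1);
        assert!(results.correlations[0].coefficient > 0.99);

        // Both series are strictly increasing straight lines.
        assert_eq!(results.trends.len(), 2);
        assert!(results
            .trends
            .iter()
            .all(|t| matches!(t.direction, TrendDirection::Increasing)));

        assert_eq!(results.metadata.data_points, 20);
    }
}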