1use chrono::{DateTime, Utc, Duration};
2use std::collections::VecDeque;
3
4#[derive(Debug, Clone, Copy, PartialEq)]
6pub enum Trend {
7 Rising,
8 Falling,
9 Stable,
10}
11
12#[derive(Debug, Clone)]
14pub struct Forecast {
15 pub timestamp: DateTime<Utc>,
16 pub predicted_value: f64,
17 pub confidence_low: f64,
18 pub confidence_high: f64,
19 pub trend: Trend,
20 pub anomaly_probability: f64,
21}
22
23pub struct CoherenceForecaster {
25 history: VecDeque<(DateTime<Utc>, f64)>,
26 alpha: f64, beta: f64, window: usize, level: Option<f64>,
30 trend: Option<f64>,
31 cusum_pos: f64, cusum_neg: f64, }
34
35impl CoherenceForecaster {
36 pub fn new(alpha: f64, window: usize) -> Self {
42 Self {
43 history: VecDeque::with_capacity(window),
44 alpha: alpha.clamp(0.0, 1.0),
45 beta: 0.1, window,
47 level: None,
48 trend: None,
49 cusum_pos: 0.0,
50 cusum_neg: 0.0,
51 }
52 }
53
54 pub fn with_beta(mut self, beta: f64) -> Self {
56 self.beta = beta.clamp(0.0, 1.0);
57 self
58 }
59
60 pub fn add_observation(&mut self, timestamp: DateTime<Utc>, value: f64) {
62 self.history.push_back((timestamp, value));
64 if self.history.len() > self.window {
65 self.history.pop_front();
66 }
67
68 match (self.level, self.trend) {
70 (None, None) => {
71 self.level = Some(value);
73 self.trend = Some(0.0);
74 }
75 (Some(prev_level), Some(prev_trend)) => {
76 let new_level = self.alpha * value + (1.0 - self.alpha) * (prev_level + prev_trend);
78
79 let new_trend = self.beta * (new_level - prev_level) + (1.0 - self.beta) * prev_trend;
81
82 self.level = Some(new_level);
83 self.trend = Some(new_trend);
84
85 self.update_cusum(value, prev_level);
87 }
88 _ => unreachable!(),
89 }
90 }
91
92 fn update_cusum(&mut self, value: f64, expected: f64) {
94 let mean = self.get_mean();
95 let std = self.get_std();
96
97 if std > 0.0 {
98 let threshold = 0.5 * std;
99 let deviation = value - mean;
100
101 self.cusum_pos = (self.cusum_pos + deviation - threshold).max(0.0);
103
104 self.cusum_neg = (self.cusum_neg - deviation - threshold).max(0.0);
106 }
107 }
108
109 pub fn forecast(&self, steps: usize) -> Vec<Forecast> {
117 if self.history.is_empty() {
118 return Vec::new();
119 }
120
121 let level = self.level.unwrap_or(0.0);
122 let trend = self.trend.unwrap_or(0.0);
123 let std_error = self.get_prediction_error_std();
124
125 let time_delta = if self.history.len() >= 2 {
127 let (t1, _) = self.history[self.history.len() - 1];
128 let (t0, _) = self.history[self.history.len() - 2];
129 t1.signed_duration_since(t0)
130 } else {
131 Duration::hours(1) };
133
134 let last_timestamp = self.history.back().unwrap().0;
135 let current_trend = self.get_trend();
136
137 let mut forecasts = Vec::with_capacity(steps);
138
139 for h in 1..=steps {
140 let forecast_value = level + (h as f64) * trend;
142
143 let interval_width = 1.96 * std_error * (h as f64).sqrt();
145
146 let anomaly_prob = self.calculate_anomaly_probability(forecast_value);
148
149 forecasts.push(Forecast {
150 timestamp: last_timestamp + time_delta * h as i32,
151 predicted_value: forecast_value,
152 confidence_low: forecast_value - interval_width,
153 confidence_high: forecast_value + interval_width,
154 trend: current_trend,
155 anomaly_probability: anomaly_prob,
156 });
157 }
158
159 forecasts
160 }
161
162 pub fn detect_regime_change_probability(&self) -> f64 {
167 if self.history.len() < 10 {
168 return 0.0; }
170
171 let std = self.get_std();
172 if std == 0.0 {
173 return 0.0;
174 }
175
176 let threshold = 4.0 * std;
178
179 let max_cusum = self.cusum_pos.max(self.cusum_neg);
181
182 let probability = 1.0 / (1.0 + (-0.5 * (max_cusum - threshold)).exp());
184
185 probability.clamp(0.0, 1.0)
186 }
187
188 pub fn get_trend(&self) -> Trend {
190 let trend_value = self.trend.unwrap_or(0.0);
191 let std = self.get_std();
192
193 let threshold = 0.1 * std;
195
196 if trend_value > threshold {
197 Trend::Rising
198 } else if trend_value < -threshold {
199 Trend::Falling
200 } else {
201 Trend::Stable
202 }
203 }
204
205 fn get_mean(&self) -> f64 {
207 if self.history.is_empty() {
208 return 0.0;
209 }
210
211 let sum: f64 = self.history.iter().map(|(_, v)| v).sum();
212 sum / self.history.len() as f64
213 }
214
215 fn get_std(&self) -> f64 {
217 if self.history.len() < 2 {
218 return 0.0;
219 }
220
221 let mean = self.get_mean();
222 let variance: f64 = self.history
223 .iter()
224 .map(|(_, v)| (v - mean).powi(2))
225 .sum::<f64>() / (self.history.len() - 1) as f64;
226
227 variance.sqrt()
228 }
229
230 fn get_prediction_error_std(&self) -> f64 {
232 if self.history.len() < 3 {
233 return self.get_std();
234 }
235
236 let mut errors = Vec::new();
238
239 for i in 2..self.history.len() {
240 let (_, actual) = self.history[i];
241
242 let prev_values: Vec<f64> = self.history.iter()
244 .take(i)
245 .map(|(_, v)| *v)
246 .collect();
247
248 if let Some(predicted) = self.simple_forecast(&prev_values, 1) {
249 errors.push(actual - predicted);
250 }
251 }
252
253 if errors.is_empty() {
254 return self.get_std();
255 }
256
257 let mse: f64 = errors.iter().map(|e| e.powi(2)).sum::<f64>() / errors.len() as f64;
259 mse.sqrt()
260 }
261
262 fn simple_forecast(&self, values: &[f64], steps: usize) -> Option<f64> {
264 if values.is_empty() {
265 return None;
266 }
267
268 let mut level = values[0];
269 for &value in &values[1..] {
270 level = self.alpha * value + (1.0 - self.alpha) * level;
271 }
272
273 Some(level)
275 }
276
277 fn calculate_anomaly_probability(&self, forecast_value: f64) -> f64 {
279 let mean = self.get_mean();
280 let std = self.get_std();
281
282 if std == 0.0 {
283 return 0.0;
284 }
285
286 let z_score = ((forecast_value - mean) / std).abs();
288
289 let regime_prob = self.detect_regime_change_probability();
291
292 let z_anomaly_prob = if z_score > 2.0 {
294 1.0 / (1.0 + (-(z_score - 2.0)).exp())
295 } else {
296 0.0
297 };
298
299 z_anomaly_prob.max(regime_prob)
301 }
302
303 pub fn len(&self) -> usize {
305 self.history.len()
306 }
307
308 pub fn is_empty(&self) -> bool {
310 self.history.is_empty()
311 }
312
313 pub fn get_level(&self) -> Option<f64> {
315 self.level
316 }
317
318 pub fn get_trend_value(&self) -> Option<f64> {
320 self.trend
321 }
322}
323
324pub struct CrossDomainForecaster {
326 forecasters: Vec<(String, CoherenceForecaster)>,
327}
328
329impl CrossDomainForecaster {
330 pub fn new() -> Self {
332 Self {
333 forecasters: Vec::new(),
334 }
335 }
336
337 pub fn add_domain(&mut self, domain: String, forecaster: CoherenceForecaster) {
339 self.forecasters.push((domain, forecaster));
340 }
341
342 pub fn calculate_correlation(&self, domain1: &str, domain2: &str) -> Option<f64> {
344 let (_, f1) = self.forecasters.iter().find(|(d, _)| d == domain1)?;
345 let (_, f2) = self.forecasters.iter().find(|(d, _)| d == domain2)?;
346
347 if f1.is_empty() || f2.is_empty() {
348 return None;
349 }
350
351 let min_len = f1.history.len().min(f2.history.len());
353 if min_len < 2 {
354 return None;
355 }
356
357 let values1: Vec<f64> = f1.history.iter().rev().take(min_len).map(|(_, v)| *v).collect();
358 let values2: Vec<f64> = f2.history.iter().rev().take(min_len).map(|(_, v)| *v).collect();
359
360 let mean1 = values1.iter().sum::<f64>() / min_len as f64;
361 let mean2 = values2.iter().sum::<f64>() / min_len as f64;
362
363 let mut numerator = 0.0;
364 let mut sum_sq1 = 0.0;
365 let mut sum_sq2 = 0.0;
366
367 for i in 0..min_len {
368 let diff1 = values1[i] - mean1;
369 let diff2 = values2[i] - mean2;
370 numerator += diff1 * diff2;
371 sum_sq1 += diff1 * diff1;
372 sum_sq2 += diff2 * diff2;
373 }
374
375 let denominator = (sum_sq1 * sum_sq2).sqrt();
376 if denominator == 0.0 {
377 return None;
378 }
379
380 Some(numerator / denominator)
381 }
382
383 pub fn forecast_all(&self, steps: usize) -> Vec<(String, Vec<Forecast>)> {
385 self.forecasters
386 .iter()
387 .map(|(domain, forecaster)| {
388 (domain.clone(), forecaster.forecast(steps))
389 })
390 .collect()
391 }
392
393 pub fn detect_synchronized_regime_changes(&self) -> Vec<(String, f64)> {
395 self.forecasters
396 .iter()
397 .map(|(domain, forecaster)| {
398 (domain.clone(), forecaster.detect_regime_change_probability())
399 })
400 .filter(|(_, prob)| *prob > 0.5)
401 .collect()
402 }
403}
404
405impl Default for CrossDomainForecaster {
406 fn default() -> Self {
407 Self::new()
408 }
409}
410
411#[cfg(test)]
412mod tests {
413 use super::*;
414
415 #[test]
416 fn test_forecaster_creation() {
417 let forecaster = CoherenceForecaster::new(0.3, 100);
418 assert!(forecaster.is_empty());
419 assert_eq!(forecaster.len(), 0);
420 }
421
422 #[test]
423 fn test_add_observation() {
424 let mut forecaster = CoherenceForecaster::new(0.3, 100);
425 let now = Utc::now();
426
427 forecaster.add_observation(now, 0.5);
428 assert_eq!(forecaster.len(), 1);
429 assert!(forecaster.get_level().is_some());
430 }
431
432 #[test]
433 fn test_trend_detection() {
434 let mut forecaster = CoherenceForecaster::new(0.3, 100);
435 let now = Utc::now();
436
437 for i in 0..10 {
439 forecaster.add_observation(
440 now + Duration::hours(i),
441 0.5 + (i as f64) * 0.1
442 );
443 }
444
445 let trend = forecaster.get_trend();
446 assert_eq!(trend, Trend::Rising);
447 }
448
449 #[test]
450 fn test_forecast_generation() {
451 let mut forecaster = CoherenceForecaster::new(0.3, 100);
452 let now = Utc::now();
453
454 for i in 0..10 {
456 forecaster.add_observation(
457 now + Duration::hours(i),
458 0.5 + (i as f64) * 0.05
459 );
460 }
461
462 let forecasts = forecaster.forecast(5);
463 assert_eq!(forecasts.len(), 5);
464
465 for forecast in forecasts {
467 assert!(forecast.timestamp > now + Duration::hours(9));
468 assert!(forecast.confidence_low < forecast.predicted_value);
469 assert!(forecast.confidence_high > forecast.predicted_value);
470 }
471 }
472
473 #[test]
474 fn test_regime_change_detection() {
475 let mut forecaster = CoherenceForecaster::new(0.3, 100);
476 let now = Utc::now();
477
478 for i in 0..20 {
480 forecaster.add_observation(now + Duration::hours(i), 0.5);
481 }
482
483 let prob1 = forecaster.detect_regime_change_probability();
485 assert!(prob1 < 0.3);
486
487 for i in 20..25 {
489 forecaster.add_observation(now + Duration::hours(i), 0.9);
490 }
491
492 let prob2 = forecaster.detect_regime_change_probability();
494 assert!(prob2 > prob1);
495 }
496
497 #[test]
498 fn test_cross_domain_correlation() {
499 let mut cross = CrossDomainForecaster::new();
500
501 let mut f1 = CoherenceForecaster::new(0.3, 100);
502 let mut f2 = CoherenceForecaster::new(0.3, 100);
503 let now = Utc::now();
504
505 for i in 0..20 {
507 let value = 0.5 + (i as f64) * 0.01;
508 f1.add_observation(now + Duration::hours(i), value);
509 f2.add_observation(now + Duration::hours(i), value + 0.1);
510 }
511
512 cross.add_domain("domain1".to_string(), f1);
513 cross.add_domain("domain2".to_string(), f2);
514
515 let correlation = cross.calculate_correlation("domain1", "domain2");
516 assert!(correlation.is_some());
517
518 let corr_value = correlation.unwrap();
520 assert!(corr_value > 0.9, "Correlation was {}", corr_value);
521 }
522
523 #[test]
524 fn test_window_size_limit() {
525 let mut forecaster = CoherenceForecaster::new(0.3, 10);
526 let now = Utc::now();
527
528 for i in 0..20 {
530 forecaster.add_observation(now + Duration::hours(i), 0.5);
531 }
532
533 assert_eq!(forecaster.len(), 10);
535 }
536}