ruvector_data_framework/
forecasting.rs

1use chrono::{DateTime, Utc, Duration};
2use std::collections::VecDeque;
3
4/// Trend direction for coherence values
5#[derive(Debug, Clone, Copy, PartialEq)]
6pub enum Trend {
7    Rising,
8    Falling,
9    Stable,
10}
11
12/// Forecast result with confidence intervals and anomaly detection
13#[derive(Debug, Clone)]
14pub struct Forecast {
15    pub timestamp: DateTime<Utc>,
16    pub predicted_value: f64,
17    pub confidence_low: f64,
18    pub confidence_high: f64,
19    pub trend: Trend,
20    pub anomaly_probability: f64,
21}
22
23/// Coherence forecaster using exponential smoothing methods
24pub struct CoherenceForecaster {
25    history: VecDeque<(DateTime<Utc>, f64)>,
26    alpha: f64,      // Level smoothing parameter
27    beta: f64,       // Trend smoothing parameter
28    window: usize,   // Maximum history size
29    level: Option<f64>,
30    trend: Option<f64>,
31    cusum_pos: f64,  // Positive CUSUM for regime change detection
32    cusum_neg: f64,  // Negative CUSUM for regime change detection
33}
34
35impl CoherenceForecaster {
36    /// Create a new forecaster with smoothing parameters
37    ///
38    /// # Arguments
39    /// * `alpha` - Level smoothing parameter (0.0 to 1.0). Higher = more weight on recent values
40    /// * `window` - Maximum number of historical observations to keep
41    pub fn new(alpha: f64, window: usize) -> Self {
42        Self {
43            history: VecDeque::with_capacity(window),
44            alpha: alpha.clamp(0.0, 1.0),
45            beta: 0.1, // Default trend smoothing
46            window,
47            level: None,
48            trend: None,
49            cusum_pos: 0.0,
50            cusum_neg: 0.0,
51        }
52    }
53
54    /// Create a forecaster with custom trend smoothing parameter
55    pub fn with_beta(mut self, beta: f64) -> Self {
56        self.beta = beta.clamp(0.0, 1.0);
57        self
58    }
59
60    /// Add a new observation to the forecaster
61    pub fn add_observation(&mut self, timestamp: DateTime<Utc>, value: f64) {
62        // Add to history
63        self.history.push_back((timestamp, value));
64        if self.history.len() > self.window {
65            self.history.pop_front();
66        }
67
68        // Update smoothed level and trend (Holt's method)
69        match (self.level, self.trend) {
70            (None, None) => {
71                // Initialize with first observation
72                self.level = Some(value);
73                self.trend = Some(0.0);
74            }
75            (Some(prev_level), Some(prev_trend)) => {
76                // Update level: L_t = α * Y_t + (1 - α) * (L_{t-1} + T_{t-1})
77                let new_level = self.alpha * value + (1.0 - self.alpha) * (prev_level + prev_trend);
78
79                // Update trend: T_t = β * (L_t - L_{t-1}) + (1 - β) * T_{t-1}
80                let new_trend = self.beta * (new_level - prev_level) + (1.0 - self.beta) * prev_trend;
81
82                self.level = Some(new_level);
83                self.trend = Some(new_trend);
84
85                // Update CUSUM for regime change detection
86                self.update_cusum(value, prev_level);
87            }
88            _ => unreachable!(),
89        }
90    }
91
92    /// Update CUSUM statistics for regime change detection
93    fn update_cusum(&mut self, value: f64, expected: f64) {
94        let mean = self.get_mean();
95        let std = self.get_std();
96
97        if std > 0.0 {
98            let threshold = 0.5 * std;
99            let deviation = value - mean;
100
101            // Positive CUSUM (detects upward shifts)
102            self.cusum_pos = (self.cusum_pos + deviation - threshold).max(0.0);
103
104            // Negative CUSUM (detects downward shifts)
105            self.cusum_neg = (self.cusum_neg - deviation - threshold).max(0.0);
106        }
107    }
108
109    /// Generate forecasts for future time steps
110    ///
111    /// # Arguments
112    /// * `steps` - Number of future time steps to forecast
113    ///
114    /// # Returns
115    /// Vector of forecast results with confidence intervals
116    pub fn forecast(&self, steps: usize) -> Vec<Forecast> {
117        if self.history.is_empty() {
118            return Vec::new();
119        }
120
121        let level = self.level.unwrap_or(0.0);
122        let trend = self.trend.unwrap_or(0.0);
123        let std_error = self.get_prediction_error_std();
124
125        // Get time delta from last two observations
126        let time_delta = if self.history.len() >= 2 {
127            let (t1, _) = self.history[self.history.len() - 1];
128            let (t0, _) = self.history[self.history.len() - 2];
129            t1.signed_duration_since(t0)
130        } else {
131            Duration::hours(1) // Default 1 hour
132        };
133
134        let last_timestamp = self.history.back().unwrap().0;
135        let current_trend = self.get_trend();
136
137        let mut forecasts = Vec::with_capacity(steps);
138
139        for h in 1..=steps {
140            // Holt's linear trend forecast: F_{t+h} = L_t + h * T_t
141            let forecast_value = level + (h as f64) * trend;
142
143            // Prediction interval widens with horizon (sqrt(h))
144            let interval_width = 1.96 * std_error * (h as f64).sqrt();
145
146            // Calculate anomaly probability based on deviation and CUSUM
147            let anomaly_prob = self.calculate_anomaly_probability(forecast_value);
148
149            forecasts.push(Forecast {
150                timestamp: last_timestamp + time_delta * h as i32,
151                predicted_value: forecast_value,
152                confidence_low: forecast_value - interval_width,
153                confidence_high: forecast_value + interval_width,
154                trend: current_trend,
155                anomaly_probability: anomaly_prob,
156            });
157        }
158
159        forecasts
160    }
161
162    /// Detect probability of regime change using CUSUM statistics
163    ///
164    /// # Returns
165    /// Probability between 0.0 and 1.0 that a regime change is occurring
166    pub fn detect_regime_change_probability(&self) -> f64 {
167        if self.history.len() < 10 {
168            return 0.0; // Not enough data
169        }
170
171        let std = self.get_std();
172        if std == 0.0 {
173            return 0.0;
174        }
175
176        // CUSUM threshold (typically 4-5 standard deviations)
177        let threshold = 4.0 * std;
178
179        // Combine positive and negative CUSUM
180        let max_cusum = self.cusum_pos.max(self.cusum_neg);
181
182        // Convert to probability using sigmoid
183        let probability = 1.0 / (1.0 + (-0.5 * (max_cusum - threshold)).exp());
184
185        probability.clamp(0.0, 1.0)
186    }
187
188    /// Get current trend direction
189    pub fn get_trend(&self) -> Trend {
190        let trend_value = self.trend.unwrap_or(0.0);
191        let std = self.get_std();
192
193        // Use a fraction of std as threshold for "stable"
194        let threshold = 0.1 * std;
195
196        if trend_value > threshold {
197            Trend::Rising
198        } else if trend_value < -threshold {
199            Trend::Falling
200        } else {
201            Trend::Stable
202        }
203    }
204
205    /// Calculate mean of historical values
206    fn get_mean(&self) -> f64 {
207        if self.history.is_empty() {
208            return 0.0;
209        }
210
211        let sum: f64 = self.history.iter().map(|(_, v)| v).sum();
212        sum / self.history.len() as f64
213    }
214
215    /// Calculate standard deviation of historical values
216    fn get_std(&self) -> f64 {
217        if self.history.len() < 2 {
218            return 0.0;
219        }
220
221        let mean = self.get_mean();
222        let variance: f64 = self.history
223            .iter()
224            .map(|(_, v)| (v - mean).powi(2))
225            .sum::<f64>() / (self.history.len() - 1) as f64;
226
227        variance.sqrt()
228    }
229
230    /// Calculate standard error of predictions
231    fn get_prediction_error_std(&self) -> f64 {
232        if self.history.len() < 3 {
233            return self.get_std();
234        }
235
236        // Calculate residuals from one-step-ahead forecasts
237        let mut errors = Vec::new();
238
239        for i in 2..self.history.len() {
240            let (_, actual) = self.history[i];
241
242            // Simple exponential smoothing forecast using previous data
243            let prev_values: Vec<f64> = self.history.iter()
244                .take(i)
245                .map(|(_, v)| *v)
246                .collect();
247
248            if let Some(predicted) = self.simple_forecast(&prev_values, 1) {
249                errors.push(actual - predicted);
250            }
251        }
252
253        if errors.is_empty() {
254            return self.get_std();
255        }
256
257        // Root mean squared error
258        let mse: f64 = errors.iter().map(|e| e.powi(2)).sum::<f64>() / errors.len() as f64;
259        mse.sqrt()
260    }
261
262    /// Simple exponential smoothing forecast (for error calculation)
263    fn simple_forecast(&self, values: &[f64], steps: usize) -> Option<f64> {
264        if values.is_empty() {
265            return None;
266        }
267
268        let mut level = values[0];
269        for &value in &values[1..] {
270            level = self.alpha * value + (1.0 - self.alpha) * level;
271        }
272
273        // For SES, forecast is constant
274        Some(level)
275    }
276
277    /// Calculate anomaly probability for a forecasted value
278    fn calculate_anomaly_probability(&self, forecast_value: f64) -> f64 {
279        let mean = self.get_mean();
280        let std = self.get_std();
281
282        if std == 0.0 {
283            return 0.0;
284        }
285
286        // Z-score of the forecast
287        let z_score = ((forecast_value - mean) / std).abs();
288
289        // Combine with regime change probability
290        let regime_prob = self.detect_regime_change_probability();
291
292        // Anomaly if z-score > 2 (95% confidence) or regime change detected
293        let z_anomaly_prob = if z_score > 2.0 {
294            1.0 / (1.0 + (-(z_score - 2.0)).exp())
295        } else {
296            0.0
297        };
298
299        // Combine probabilities (max gives more sensitivity)
300        z_anomaly_prob.max(regime_prob)
301    }
302
303    /// Get the number of observations in history
304    pub fn len(&self) -> usize {
305        self.history.len()
306    }
307
308    /// Check if forecaster has no observations
309    pub fn is_empty(&self) -> bool {
310        self.history.is_empty()
311    }
312
313    /// Get the smoothed level value
314    pub fn get_level(&self) -> Option<f64> {
315        self.level
316    }
317
318    /// Get the smoothed trend value
319    pub fn get_trend_value(&self) -> Option<f64> {
320        self.trend
321    }
322}
323
324/// Cross-domain correlation forecaster
325pub struct CrossDomainForecaster {
326    forecasters: Vec<(String, CoherenceForecaster)>,
327}
328
329impl CrossDomainForecaster {
330    /// Create a new cross-domain forecaster
331    pub fn new() -> Self {
332        Self {
333            forecasters: Vec::new(),
334        }
335    }
336
337    /// Add a domain with its own forecaster
338    pub fn add_domain(&mut self, domain: String, forecaster: CoherenceForecaster) {
339        self.forecasters.push((domain, forecaster));
340    }
341
342    /// Calculate correlation between domains
343    pub fn calculate_correlation(&self, domain1: &str, domain2: &str) -> Option<f64> {
344        let (_, f1) = self.forecasters.iter().find(|(d, _)| d == domain1)?;
345        let (_, f2) = self.forecasters.iter().find(|(d, _)| d == domain2)?;
346
347        if f1.is_empty() || f2.is_empty() {
348            return None;
349        }
350
351        // Calculate Pearson correlation coefficient
352        let min_len = f1.history.len().min(f2.history.len());
353        if min_len < 2 {
354            return None;
355        }
356
357        let values1: Vec<f64> = f1.history.iter().rev().take(min_len).map(|(_, v)| *v).collect();
358        let values2: Vec<f64> = f2.history.iter().rev().take(min_len).map(|(_, v)| *v).collect();
359
360        let mean1 = values1.iter().sum::<f64>() / min_len as f64;
361        let mean2 = values2.iter().sum::<f64>() / min_len as f64;
362
363        let mut numerator = 0.0;
364        let mut sum_sq1 = 0.0;
365        let mut sum_sq2 = 0.0;
366
367        for i in 0..min_len {
368            let diff1 = values1[i] - mean1;
369            let diff2 = values2[i] - mean2;
370            numerator += diff1 * diff2;
371            sum_sq1 += diff1 * diff1;
372            sum_sq2 += diff2 * diff2;
373        }
374
375        let denominator = (sum_sq1 * sum_sq2).sqrt();
376        if denominator == 0.0 {
377            return None;
378        }
379
380        Some(numerator / denominator)
381    }
382
383    /// Forecast all domains and return combined results
384    pub fn forecast_all(&self, steps: usize) -> Vec<(String, Vec<Forecast>)> {
385        self.forecasters
386            .iter()
387            .map(|(domain, forecaster)| {
388                (domain.clone(), forecaster.forecast(steps))
389            })
390            .collect()
391    }
392
393    /// Detect synchronized regime changes across domains
394    pub fn detect_synchronized_regime_changes(&self) -> Vec<(String, f64)> {
395        self.forecasters
396            .iter()
397            .map(|(domain, forecaster)| {
398                (domain.clone(), forecaster.detect_regime_change_probability())
399            })
400            .filter(|(_, prob)| *prob > 0.5)
401            .collect()
402    }
403}
404
405impl Default for CrossDomainForecaster {
406    fn default() -> Self {
407        Self::new()
408    }
409}
410
411#[cfg(test)]
412mod tests {
413    use super::*;
414
415    #[test]
416    fn test_forecaster_creation() {
417        let forecaster = CoherenceForecaster::new(0.3, 100);
418        assert!(forecaster.is_empty());
419        assert_eq!(forecaster.len(), 0);
420    }
421
422    #[test]
423    fn test_add_observation() {
424        let mut forecaster = CoherenceForecaster::new(0.3, 100);
425        let now = Utc::now();
426
427        forecaster.add_observation(now, 0.5);
428        assert_eq!(forecaster.len(), 1);
429        assert!(forecaster.get_level().is_some());
430    }
431
432    #[test]
433    fn test_trend_detection() {
434        let mut forecaster = CoherenceForecaster::new(0.3, 100);
435        let now = Utc::now();
436
437        // Add rising values
438        for i in 0..10 {
439            forecaster.add_observation(
440                now + Duration::hours(i),
441                0.5 + (i as f64) * 0.1
442            );
443        }
444
445        let trend = forecaster.get_trend();
446        assert_eq!(trend, Trend::Rising);
447    }
448
449    #[test]
450    fn test_forecast_generation() {
451        let mut forecaster = CoherenceForecaster::new(0.3, 100);
452        let now = Utc::now();
453
454        // Add some observations
455        for i in 0..10 {
456            forecaster.add_observation(
457                now + Duration::hours(i),
458                0.5 + (i as f64) * 0.05
459            );
460        }
461
462        let forecasts = forecaster.forecast(5);
463        assert_eq!(forecasts.len(), 5);
464
465        // Check that forecasts are in the future
466        for forecast in forecasts {
467            assert!(forecast.timestamp > now + Duration::hours(9));
468            assert!(forecast.confidence_low < forecast.predicted_value);
469            assert!(forecast.confidence_high > forecast.predicted_value);
470        }
471    }
472
473    #[test]
474    fn test_regime_change_detection() {
475        let mut forecaster = CoherenceForecaster::new(0.3, 100);
476        let now = Utc::now();
477
478        // Add stable values
479        for i in 0..20 {
480            forecaster.add_observation(now + Duration::hours(i), 0.5);
481        }
482
483        // Should have low regime change probability
484        let prob1 = forecaster.detect_regime_change_probability();
485        assert!(prob1 < 0.3);
486
487        // Add sudden shift
488        for i in 20..25 {
489            forecaster.add_observation(now + Duration::hours(i), 0.9);
490        }
491
492        // Should detect regime change
493        let prob2 = forecaster.detect_regime_change_probability();
494        assert!(prob2 > prob1);
495    }
496
497    #[test]
498    fn test_cross_domain_correlation() {
499        let mut cross = CrossDomainForecaster::new();
500
501        let mut f1 = CoherenceForecaster::new(0.3, 100);
502        let mut f2 = CoherenceForecaster::new(0.3, 100);
503        let now = Utc::now();
504
505        // Add correlated data
506        for i in 0..20 {
507            let value = 0.5 + (i as f64) * 0.01;
508            f1.add_observation(now + Duration::hours(i), value);
509            f2.add_observation(now + Duration::hours(i), value + 0.1);
510        }
511
512        cross.add_domain("domain1".to_string(), f1);
513        cross.add_domain("domain2".to_string(), f2);
514
515        let correlation = cross.calculate_correlation("domain1", "domain2");
516        assert!(correlation.is_some());
517
518        // Should be highly correlated
519        let corr_value = correlation.unwrap();
520        assert!(corr_value > 0.9, "Correlation was {}", corr_value);
521    }
522
523    #[test]
524    fn test_window_size_limit() {
525        let mut forecaster = CoherenceForecaster::new(0.3, 10);
526        let now = Utc::now();
527
528        // Add more observations than window size
529        for i in 0..20 {
530            forecaster.add_observation(now + Duration::hours(i), 0.5);
531        }
532
533        // Should only keep last 10
534        assert_eq!(forecaster.len(), 10);
535    }
536}