use chrono::{DateTime, Utc, Duration};
use std::collections::VecDeque;
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Trend {
Rising,
Falling,
Stable,
}
#[derive(Debug, Clone)]
pub struct Forecast {
pub timestamp: DateTime<Utc>,
pub predicted_value: f64,
pub confidence_low: f64,
pub confidence_high: f64,
pub trend: Trend,
pub anomaly_probability: f64,
}
pub struct CoherenceForecaster {
history: VecDeque<(DateTime<Utc>, f64)>,
alpha: f64, beta: f64, window: usize, level: Option<f64>,
trend: Option<f64>,
cusum_pos: f64, cusum_neg: f64, }
impl CoherenceForecaster {
pub fn new(alpha: f64, window: usize) -> Self {
Self {
history: VecDeque::with_capacity(window),
alpha: alpha.clamp(0.0, 1.0),
beta: 0.1, window,
level: None,
trend: None,
cusum_pos: 0.0,
cusum_neg: 0.0,
}
}
pub fn with_beta(mut self, beta: f64) -> Self {
self.beta = beta.clamp(0.0, 1.0);
self
}
pub fn add_observation(&mut self, timestamp: DateTime<Utc>, value: f64) {
self.history.push_back((timestamp, value));
if self.history.len() > self.window {
self.history.pop_front();
}
match (self.level, self.trend) {
(None, None) => {
self.level = Some(value);
self.trend = Some(0.0);
}
(Some(prev_level), Some(prev_trend)) => {
let new_level = self.alpha * value + (1.0 - self.alpha) * (prev_level + prev_trend);
let new_trend = self.beta * (new_level - prev_level) + (1.0 - self.beta) * prev_trend;
self.level = Some(new_level);
self.trend = Some(new_trend);
self.update_cusum(value, prev_level);
}
_ => unreachable!(),
}
}
fn update_cusum(&mut self, value: f64, expected: f64) {
let mean = self.get_mean();
let std = self.get_std();
if std > 0.0 {
let threshold = 0.5 * std;
let deviation = value - mean;
self.cusum_pos = (self.cusum_pos + deviation - threshold).max(0.0);
self.cusum_neg = (self.cusum_neg - deviation - threshold).max(0.0);
}
}
pub fn forecast(&self, steps: usize) -> Vec<Forecast> {
if self.history.is_empty() {
return Vec::new();
}
let level = self.level.unwrap_or(0.0);
let trend = self.trend.unwrap_or(0.0);
let std_error = self.get_prediction_error_std();
let time_delta = if self.history.len() >= 2 {
let (t1, _) = self.history[self.history.len() - 1];
let (t0, _) = self.history[self.history.len() - 2];
t1.signed_duration_since(t0)
} else {
Duration::hours(1) };
let last_timestamp = self.history.back().unwrap().0;
let current_trend = self.get_trend();
let mut forecasts = Vec::with_capacity(steps);
for h in 1..=steps {
let forecast_value = level + (h as f64) * trend;
let interval_width = 1.96 * std_error * (h as f64).sqrt();
let anomaly_prob = self.calculate_anomaly_probability(forecast_value);
forecasts.push(Forecast {
timestamp: last_timestamp + time_delta * h as i32,
predicted_value: forecast_value,
confidence_low: forecast_value - interval_width,
confidence_high: forecast_value + interval_width,
trend: current_trend,
anomaly_probability: anomaly_prob,
});
}
forecasts
}
pub fn detect_regime_change_probability(&self) -> f64 {
if self.history.len() < 10 {
return 0.0; }
let std = self.get_std();
if std == 0.0 {
return 0.0;
}
let threshold = 4.0 * std;
let max_cusum = self.cusum_pos.max(self.cusum_neg);
let probability = 1.0 / (1.0 + (-0.5 * (max_cusum - threshold)).exp());
probability.clamp(0.0, 1.0)
}
pub fn get_trend(&self) -> Trend {
let trend_value = self.trend.unwrap_or(0.0);
let std = self.get_std();
let threshold = 0.1 * std;
if trend_value > threshold {
Trend::Rising
} else if trend_value < -threshold {
Trend::Falling
} else {
Trend::Stable
}
}
fn get_mean(&self) -> f64 {
if self.history.is_empty() {
return 0.0;
}
let sum: f64 = self.history.iter().map(|(_, v)| v).sum();
sum / self.history.len() as f64
}
fn get_std(&self) -> f64 {
if self.history.len() < 2 {
return 0.0;
}
let mean = self.get_mean();
let variance: f64 = self.history
.iter()
.map(|(_, v)| (v - mean).powi(2))
.sum::<f64>() / (self.history.len() - 1) as f64;
variance.sqrt()
}
fn get_prediction_error_std(&self) -> f64 {
if self.history.len() < 3 {
return self.get_std();
}
let mut errors = Vec::new();
for i in 2..self.history.len() {
let (_, actual) = self.history[i];
let prev_values: Vec<f64> = self.history.iter()
.take(i)
.map(|(_, v)| *v)
.collect();
if let Some(predicted) = self.simple_forecast(&prev_values, 1) {
errors.push(actual - predicted);
}
}
if errors.is_empty() {
return self.get_std();
}
let mse: f64 = errors.iter().map(|e| e.powi(2)).sum::<f64>() / errors.len() as f64;
mse.sqrt()
}
fn simple_forecast(&self, values: &[f64], steps: usize) -> Option<f64> {
if values.is_empty() {
return None;
}
let mut level = values[0];
for &value in &values[1..] {
level = self.alpha * value + (1.0 - self.alpha) * level;
}
Some(level)
}
fn calculate_anomaly_probability(&self, forecast_value: f64) -> f64 {
let mean = self.get_mean();
let std = self.get_std();
if std == 0.0 {
return 0.0;
}
let z_score = ((forecast_value - mean) / std).abs();
let regime_prob = self.detect_regime_change_probability();
let z_anomaly_prob = if z_score > 2.0 {
1.0 / (1.0 + (-(z_score - 2.0)).exp())
} else {
0.0
};
z_anomaly_prob.max(regime_prob)
}
pub fn len(&self) -> usize {
self.history.len()
}
pub fn is_empty(&self) -> bool {
self.history.is_empty()
}
pub fn get_level(&self) -> Option<f64> {
self.level
}
pub fn get_trend_value(&self) -> Option<f64> {
self.trend
}
}
pub struct CrossDomainForecaster {
forecasters: Vec<(String, CoherenceForecaster)>,
}
impl CrossDomainForecaster {
pub fn new() -> Self {
Self {
forecasters: Vec::new(),
}
}
pub fn add_domain(&mut self, domain: String, forecaster: CoherenceForecaster) {
self.forecasters.push((domain, forecaster));
}
pub fn calculate_correlation(&self, domain1: &str, domain2: &str) -> Option<f64> {
let (_, f1) = self.forecasters.iter().find(|(d, _)| d == domain1)?;
let (_, f2) = self.forecasters.iter().find(|(d, _)| d == domain2)?;
if f1.is_empty() || f2.is_empty() {
return None;
}
let min_len = f1.history.len().min(f2.history.len());
if min_len < 2 {
return None;
}
let values1: Vec<f64> = f1.history.iter().rev().take(min_len).map(|(_, v)| *v).collect();
let values2: Vec<f64> = f2.history.iter().rev().take(min_len).map(|(_, v)| *v).collect();
let mean1 = values1.iter().sum::<f64>() / min_len as f64;
let mean2 = values2.iter().sum::<f64>() / min_len as f64;
let mut numerator = 0.0;
let mut sum_sq1 = 0.0;
let mut sum_sq2 = 0.0;
for i in 0..min_len {
let diff1 = values1[i] - mean1;
let diff2 = values2[i] - mean2;
numerator += diff1 * diff2;
sum_sq1 += diff1 * diff1;
sum_sq2 += diff2 * diff2;
}
let denominator = (sum_sq1 * sum_sq2).sqrt();
if denominator == 0.0 {
return None;
}
Some(numerator / denominator)
}
pub fn forecast_all(&self, steps: usize) -> Vec<(String, Vec<Forecast>)> {
self.forecasters
.iter()
.map(|(domain, forecaster)| {
(domain.clone(), forecaster.forecast(steps))
})
.collect()
}
pub fn detect_synchronized_regime_changes(&self) -> Vec<(String, f64)> {
self.forecasters
.iter()
.map(|(domain, forecaster)| {
(domain.clone(), forecaster.detect_regime_change_probability())
})
.filter(|(_, prob)| *prob > 0.5)
.collect()
}
}
impl Default for CrossDomainForecaster {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_forecaster_creation() {
let forecaster = CoherenceForecaster::new(0.3, 100);
assert!(forecaster.is_empty());
assert_eq!(forecaster.len(), 0);
}
#[test]
fn test_add_observation() {
let mut forecaster = CoherenceForecaster::new(0.3, 100);
let now = Utc::now();
forecaster.add_observation(now, 0.5);
assert_eq!(forecaster.len(), 1);
assert!(forecaster.get_level().is_some());
}
#[test]
fn test_trend_detection() {
let mut forecaster = CoherenceForecaster::new(0.3, 100);
let now = Utc::now();
for i in 0..10 {
forecaster.add_observation(
now + Duration::hours(i),
0.5 + (i as f64) * 0.1
);
}
let trend = forecaster.get_trend();
assert_eq!(trend, Trend::Rising);
}
#[test]
fn test_forecast_generation() {
let mut forecaster = CoherenceForecaster::new(0.3, 100);
let now = Utc::now();
for i in 0..10 {
forecaster.add_observation(
now + Duration::hours(i),
0.5 + (i as f64) * 0.05
);
}
let forecasts = forecaster.forecast(5);
assert_eq!(forecasts.len(), 5);
for forecast in forecasts {
assert!(forecast.timestamp > now + Duration::hours(9));
assert!(forecast.confidence_low < forecast.predicted_value);
assert!(forecast.confidence_high > forecast.predicted_value);
}
}
#[test]
fn test_regime_change_detection() {
let mut forecaster = CoherenceForecaster::new(0.3, 100);
let now = Utc::now();
for i in 0..20 {
forecaster.add_observation(now + Duration::hours(i), 0.5);
}
let prob1 = forecaster.detect_regime_change_probability();
assert!(prob1 < 0.3);
for i in 20..25 {
forecaster.add_observation(now + Duration::hours(i), 0.9);
}
let prob2 = forecaster.detect_regime_change_probability();
assert!(prob2 > prob1);
}
#[test]
fn test_cross_domain_correlation() {
let mut cross = CrossDomainForecaster::new();
let mut f1 = CoherenceForecaster::new(0.3, 100);
let mut f2 = CoherenceForecaster::new(0.3, 100);
let now = Utc::now();
for i in 0..20 {
let value = 0.5 + (i as f64) * 0.01;
f1.add_observation(now + Duration::hours(i), value);
f2.add_observation(now + Duration::hours(i), value + 0.1);
}
cross.add_domain("domain1".to_string(), f1);
cross.add_domain("domain2".to_string(), f2);
let correlation = cross.calculate_correlation("domain1", "domain2");
assert!(correlation.is_some());
let corr_value = correlation.unwrap();
assert!(corr_value > 0.9, "Correlation was {}", corr_value);
}
#[test]
fn test_window_size_limit() {
let mut forecaster = CoherenceForecaster::new(0.3, 10);
let now = Utc::now();
for i in 0..20 {
forecaster.add_observation(now + Duration::hours(i), 0.5);
}
assert_eq!(forecaster.len(), 10);
}
}