#[allow(unused_imports)]
use crate::error::{FusekiError, FusekiResult};
use crate::store::Store;
use chrono::{DateTime, Duration, Timelike, Utc};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, info, instrument};
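/// Configuration options for the time-series analytics engine.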
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsConfig {
pub columnar_storage: bool,
pub max_time_window_hours: u32,
pub default_window_size_seconds: u32,
pub streaming_enabled: bool,
pub max_concurrent_queries: usize,
pub advanced_stats_enabled: bool,
pub ml_predictions_enabled: bool,
}
impl Default for AnalyticsConfig {
fn default() -> Self {
Self {
columnar_storage: true,
max_time_window_hours: 24 * 30,
default_window_size_seconds: 300,
streaming_enabled: true,
max_concurrent_queries: 10,
advanced_stats_enabled: true,
ml_predictions_enabled: true,
}
}
}
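/// A single observation of a named metric at a point in time, with optional tags and metadata.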
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeriesPoint {
pub timestamp: DateTime<Utc>,
pub metric: String,
pub value: f64,
pub tags: HashMap<String, String>,
pub metadata: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeriesQuery {
pub metric: String,
pub start_time: DateTime<Utc>,
pub end_time: DateTime<Utc>,
pub aggregation: AggregationFunction,
pub window_size: Option<Duration>,
pub tag_filters: HashMap<String, String>,
pub limit: Option<usize>,
}
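/// Aggregation functions that can be applied to a set of time-series values.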
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AggregationFunction {
Avg,
Sum,
Min,
Max,
Count,
StdDev,
Variance,
Median,
Percentile(f64),
Rate,
Derivative,
MovingAverage(usize),
ExponentialMovingAverage(f64),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeriesResult {
pub query: TimeSeriesQuery,
pub data_points: Vec<TimeSeriesPoint>,
pub statistics: Option<StatisticalSummary>,
pub execution_time_ms: u64,
pub raw_points_processed: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatisticalSummary {
pub mean: f64,
pub std_dev: f64,
pub min: f64,
pub max: f64,
pub median: f64,
pub p25: f64,
pub p75: f64,
pub count: usize,
pub variance: f64,
pub skewness: f64,
pub kurtosis: f64,
}
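/// Sliding time window over recent points, used for streaming aggregation.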
#[derive(Debug)]
pub struct StreamingWindow {
pub window_size: Duration,
pub data_points: VecDeque<TimeSeriesPoint>,
pub last_aggregation: Option<f64>,
pub window_start: DateTime<Utc>,
}
impl StreamingWindow {
pub fn new(window_size: Duration) -> Self {
Self {
window_size,
data_points: VecDeque::new(),
last_aggregation: None,
window_start: Utc::now(),
}
}
pub fn add_point(&mut self, point: TimeSeriesPoint) {
let cutoff_time = Utc::now() - self.window_size;
while let Some(front) = self.data_points.front() {
if front.timestamp < cutoff_time {
self.data_points.pop_front();
} else {
break;
}
}
self.data_points.push_back(point);
}
pub fn compute_aggregation(&mut self, function: &AggregationFunction) -> Option<f64> {
if self.data_points.is_empty() {
return None;
}
let values: Vec<f64> = self.data_points.iter().map(|p| p.value).collect();
let result = compute_aggregation(&values, function);
self.last_aggregation = result;
result
}
}
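/// Time-series analytics engine with query caching, streaming windows, and runtime metrics.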
#[derive(Debug)]
#[allow(dead_code)]
pub struct AnalyticsEngine {
config: AnalyticsConfig,
store: Arc<Store>,
streaming_windows: Arc<Mutex<HashMap<String, StreamingWindow>>>,
query_cache: Arc<RwLock<HashMap<String, TimeSeriesResult>>>,
metrics: Arc<Mutex<AnalyticsMetrics>>,
}
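/// Runtime counters describing how the analytics engine has been used.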
#[derive(Debug, Default, Clone)]
pub struct AnalyticsMetrics {
pub total_queries: u64,
pub avg_execution_time_ms: f64,
pub cache_hit_ratio: f64,
pub total_data_points: u64,
pub active_streams: usize,
}
impl AnalyticsEngine {
pub fn new(config: AnalyticsConfig, store: Arc<Store>) -> Self {
Self {
config,
store,
streaming_windows: Arc::new(Mutex::new(HashMap::new())),
query_cache: Arc::new(RwLock::new(HashMap::new())),
metrics: Arc::new(Mutex::new(AnalyticsMetrics::default())),
}
}
#[instrument(skip(self))]
pub async fn execute_query(&self, query: TimeSeriesQuery) -> FusekiResult<TimeSeriesResult> {
let start_time = std::time::Instant::now();
let cache_key = self.generate_cache_key(&query);
if let Some(cached_result) = self.get_cached_result(&cache_key).await {
info!("Cache hit for query: {}", query.metric);
return Ok(cached_result);
}
let data_points = self.fetch_time_series_data(&query).await?;
let aggregated_points = self.apply_aggregation(&data_points, &query).await?;
let statistics = if self.config.advanced_stats_enabled {
Some(self.compute_statistics(&aggregated_points))
} else {
None
};
let execution_time = start_time.elapsed();
let result = TimeSeriesResult {
query: query.clone(),
data_points: aggregated_points,
statistics,
execution_time_ms: execution_time.as_millis() as u64,
raw_points_processed: data_points.len(),
};
self.cache_result(cache_key, result.clone()).await;
self.update_metrics(execution_time.as_millis() as u64, data_points.len())
.await;
Ok(result)
}
async fn fetch_time_series_data(
&self,
query: &TimeSeriesQuery,
) -> FusekiResult<Vec<TimeSeriesPoint>> {
debug!("Fetching time-series data for metric: {}", query.metric);
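// The store is not queried here yet; points are synthesized as a stand-in (base value + linear trend + daily seasonality + random noise).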
let mut data_points = Vec::new();
let mut current_time = query.start_time;
let interval = Duration::seconds(60);
use scirs2_core::random::{Random, Rng};
// Seed the RNG once, outside the loop; re-seeding with the same seed on every iteration would make the noise term identical for every point.
let mut rng = Random::seed(42);
while current_time <= query.end_time {
let base_value = 100.0;
let trend = (current_time - query.start_time).num_hours() as f64 * 0.1;
let noise = (rng.gen_range(0.0..1.0) - 0.5) * 20.0;
let seasonal = 10.0 * (current_time.hour() as f64 * std::f64::consts::PI / 12.0).sin();
let value = base_value + trend + seasonal + noise;
data_points.push(TimeSeriesPoint {
timestamp: current_time,
metric: query.metric.clone(),
value,
tags: HashMap::new(),
metadata: None,
});
current_time += interval;
if let Some(limit) = query.limit {
if data_points.len() >= limit {
break;
}
}
}
Ok(data_points)
}
async fn apply_aggregation(
&self,
data_points: &[TimeSeriesPoint],
query: &TimeSeriesQuery,
) -> FusekiResult<Vec<TimeSeriesPoint>> {
if let Some(window_size) = query.window_size {
self.windowed_aggregation(data_points, &query.aggregation, window_size)
.await
} else {
let values: Vec<f64> = data_points.iter().map(|p| p.value).collect();
if let Some(aggregated_value) = compute_aggregation(&values, &query.aggregation) {
Ok(vec![TimeSeriesPoint {
timestamp: query.end_time,
metric: query.metric.clone(),
value: aggregated_value,
tags: HashMap::new(),
metadata: None,
}])
} else {
Ok(Vec::new())
}
}
}
async fn windowed_aggregation(
&self,
data_points: &[TimeSeriesPoint],
function: &AggregationFunction,
window_size: Duration,
) -> FusekiResult<Vec<TimeSeriesPoint>> {
let mut result = Vec::new();
let mut window_start = data_points
.first()
.map(|p| p.timestamp)
.unwrap_or_else(Utc::now);
while window_start
< data_points
.last()
.map(|p| p.timestamp)
.unwrap_or_else(Utc::now)
{
let window_end = window_start + window_size;
let window_points: Vec<&TimeSeriesPoint> = data_points
.iter()
.filter(|p| p.timestamp >= window_start && p.timestamp < window_end)
.collect();
if !window_points.is_empty() {
let values: Vec<f64> = window_points.iter().map(|p| p.value).collect();
if let Some(aggregated_value) = compute_aggregation(&values, function) {
result.push(TimeSeriesPoint {
timestamp: window_start + window_size / 2,
metric: window_points[0].metric.clone(),
value: aggregated_value,
tags: HashMap::new(),
metadata: None,
});
}
}
window_start = window_end;
}
Ok(result)
}
fn compute_statistics(&self, data_points: &[TimeSeriesPoint]) -> StatisticalSummary {
let values: Vec<f64> = data_points.iter().map(|p| p.value).collect();
compute_statistics(&values)
}
fn generate_cache_key(&self, query: &TimeSeriesQuery) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
query.metric.hash(&mut hasher);
query.start_time.hash(&mut hasher);
query.end_time.hash(&mut hasher);
format!("{:?}", query.aggregation).hash(&mut hasher);
format!("analytics_{:x}", hasher.finish())
}
async fn get_cached_result(&self, cache_key: &str) -> Option<TimeSeriesResult> {
let cache = self.query_cache.read().await;
cache.get(cache_key).cloned()
}
async fn cache_result(&self, cache_key: String, result: TimeSeriesResult) {
let mut cache = self.query_cache.write().await;
cache.insert(cache_key, result);
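// Naive size bound: once the cache exceeds 1000 entries, evict an arbitrary batch of 100 keys.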
if cache.len() > 1000 {
let keys_to_remove: Vec<String> = cache.keys().take(100).cloned().collect();
for key in keys_to_remove {
cache.remove(&key);
}
}
}
async fn update_metrics(&self, execution_time_ms: u64, data_points_processed: usize) {
let mut metrics = self.metrics.lock().await;
metrics.total_queries += 1;
metrics.total_data_points += data_points_processed as u64;
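// Track average latency as an exponentially weighted moving average (alpha = 0.1).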
if metrics.total_queries == 1 {
metrics.avg_execution_time_ms = execution_time_ms as f64;
} else {
let alpha = 0.1;
metrics.avg_execution_time_ms =
alpha * execution_time_ms as f64 + (1.0 - alpha) * metrics.avg_execution_time_ms;
}
}
pub async fn start_streaming(&self, metric: String, window_size: Duration) -> FusekiResult<()> {
let mut windows = self.streaming_windows.lock().await;
windows.insert(metric.clone(), StreamingWindow::new(window_size));
info!("Started streaming analytics for metric: {}", metric);
Ok(())
}
pub async fn add_streaming_point(&self, point: TimeSeriesPoint) -> FusekiResult<()> {
let mut windows = self.streaming_windows.lock().await;
if let Some(window) = windows.get_mut(&point.metric) {
window.add_point(point);
}
Ok(())
}
pub async fn get_streaming_aggregation(
&self,
metric: &str,
function: &AggregationFunction,
) -> Option<f64> {
let mut windows = self.streaming_windows.lock().await;
if let Some(window) = windows.get_mut(metric) {
window.compute_aggregation(function)
} else {
None
}
}
pub async fn get_metrics(&self) -> AnalyticsMetrics {
let metrics = self.metrics.lock().await;
let mut result = (*metrics).clone();
let windows = self.streaming_windows.lock().await;
result.active_streams = windows.len();
result
}
}
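/// Applies `function` to `values`, returning `None` for empty input or when the function cannot be computed.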
fn compute_aggregation(values: &[f64], function: &AggregationFunction) -> Option<f64> {
if values.is_empty() {
return None;
}
match function {
AggregationFunction::Avg => Some(values.iter().sum::<f64>() / values.len() as f64),
AggregationFunction::Sum => Some(values.iter().sum()),
AggregationFunction::Min => Some(values.iter().fold(f64::INFINITY, |a, &b| a.min(b))),
AggregationFunction::Max => Some(values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b))),
AggregationFunction::Count => Some(values.len() as f64),
AggregationFunction::StdDev => {
let mean = values.iter().sum::<f64>() / values.len() as f64;
let variance =
values.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / values.len() as f64;
Some(variance.sqrt())
}
AggregationFunction::Variance => {
let mean = values.iter().sum::<f64>() / values.len() as f64;
Some(values.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / values.len() as f64)
}
AggregationFunction::Median => {
let mut sorted = values.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let mid = sorted.len() / 2;
if sorted.len() % 2 == 0 {
Some((sorted[mid - 1] + sorted[mid]) / 2.0)
} else {
Some(sorted[mid])
}
}
AggregationFunction::Percentile(p) => {
let mut sorted = values.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let index = ((*p / 100.0) * (sorted.len() - 1) as f64).round() as usize;
sorted.get(index).copied()
}
AggregationFunction::Rate => {
if values.len() >= 2 {
let last = values.last().expect("values has at least 2 elements");
let first = values.first().expect("values has at least 2 elements");
Some(last - first)
} else {
None
}
}
AggregationFunction::Derivative => {
if values.len() >= 2 {
let diffs: Vec<f64> = values.windows(2).map(|w| w[1] - w[0]).collect();
Some(diffs.iter().sum::<f64>() / diffs.len() as f64)
} else {
None
}
}
AggregationFunction::MovingAverage(window_size) => {
if values.len() >= *window_size {
let window_values = &values[values.len() - window_size..];
Some(window_values.iter().sum::<f64>() / window_values.len() as f64)
} else {
Some(values.iter().sum::<f64>() / values.len() as f64)
}
}
AggregationFunction::ExponentialMovingAverage(alpha) => {
let mut ema = values[0];
for &value in &values[1..] {
ema = alpha * value + (1.0 - alpha) * ema;
}
Some(ema)
}
}
}
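/// Computes summary statistics (mean, spread, quartiles, skewness, excess kurtosis); returns an all-zero summary for empty input.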
fn compute_statistics(values: &[f64]) -> StatisticalSummary {
if values.is_empty() {
return StatisticalSummary {
mean: 0.0,
std_dev: 0.0,
min: 0.0,
max: 0.0,
median: 0.0,
p25: 0.0,
p75: 0.0,
count: 0,
variance: 0.0,
skewness: 0.0,
kurtosis: 0.0,
};
}
let count = values.len();
let mean = values.iter().sum::<f64>() / count as f64;
let variance = values.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / count as f64;
let std_dev = variance.sqrt();
let mut sorted = values.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let min = sorted[0];
let max = sorted[count - 1];
let median = if count % 2 == 0 {
(sorted[count / 2 - 1] + sorted[count / 2]) / 2.0
} else {
sorted[count / 2]
};
let p25_idx = (0.25 * (count - 1) as f64).round() as usize;
let p75_idx = (0.75 * (count - 1) as f64).round() as usize;
let p25 = sorted[p25_idx];
let p75 = sorted[p75_idx];
let skewness = if std_dev > 0.0 {
values
.iter()
.map(|&x| ((x - mean) / std_dev).powi(3))
.sum::<f64>()
/ count as f64
} else {
0.0
};
let kurtosis = if std_dev > 0.0 {
values
.iter()
.map(|&x| ((x - mean) / std_dev).powi(4))
.sum::<f64>()
/ count as f64
- 3.0
} else {
0.0
};
StatisticalSummary {
mean,
std_dev,
min,
max,
median,
p25,
p75,
count,
variance,
skewness,
kurtosis,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_aggregation_functions() {
let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];
assert_eq!(
compute_aggregation(&values, &AggregationFunction::Avg),
Some(3.0)
);
assert_eq!(
compute_aggregation(&values, &AggregationFunction::Sum),
Some(15.0)
);
assert_eq!(
compute_aggregation(&values, &AggregationFunction::Min),
Some(1.0)
);
assert_eq!(
compute_aggregation(&values, &AggregationFunction::Max),
Some(5.0)
);
assert_eq!(
compute_aggregation(&values, &AggregationFunction::Count),
Some(5.0)
);
assert_eq!(
compute_aggregation(&values, &AggregationFunction::Median),
Some(3.0)
);
}
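// Additional coverage for the remaining aggregation variants; expected values follow directly from the definitions above.
#[test]
fn test_extended_aggregation_functions() {
let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];
// Percentile(50) rounds to the middle index of the sorted values.
assert_eq!(
compute_aggregation(&values, &AggregationFunction::Percentile(50.0)),
Some(3.0)
);
// Rate is the last value minus the first value.
assert_eq!(
compute_aggregation(&values, &AggregationFunction::Rate),
Some(4.0)
);
// MovingAverage(3) averages the trailing three values.
assert_eq!(
compute_aggregation(&values, &AggregationFunction::MovingAverage(3)),
Some(4.0)
);
// Empty input yields no aggregate.
assert_eq!(compute_aggregation(&[], &AggregationFunction::Avg), None);
}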
#[test]
fn test_statistics_computation() {
let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];
let stats = compute_statistics(&values);
assert_eq!(stats.mean, 3.0);
assert_eq!(stats.min, 1.0);
assert_eq!(stats.max, 5.0);
assert_eq!(stats.median, 3.0);
assert_eq!(stats.count, 5);
}
#[tokio::test]
async fn test_streaming_window() {
let mut window = StreamingWindow::new(Duration::seconds(60));
let point = TimeSeriesPoint {
timestamp: Utc::now(),
metric: "test_metric".to_string(),
value: 42.0,
tags: HashMap::new(),
metadata: None,
};
window.add_point(point);
let avg = window.compute_aggregation(&AggregationFunction::Avg);
assert_eq!(avg, Some(42.0));
}
}
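/// Configuration for anomaly detection over time-series metrics.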
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnomalyDetector {
pub sensitivity: f64,
pub baseline_window_hours: u32,
pub detection_methods: Vec<AnomalyDetectionMethod>,
pub confidence_threshold: f64,
}
impl Default for AnomalyDetector {
fn default() -> Self {
Self {
sensitivity: 0.95,
baseline_window_hours: 24 * 7,
detection_methods: vec![
AnomalyDetectionMethod::StatisticalOutlier,
AnomalyDetectionMethod::IsolationForest,
AnomalyDetectionMethod::SeasonalDecomposition,
],
confidence_threshold: 0.8,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AnomalyDetectionMethod {
StatisticalOutlier,
IsolationForest,
SeasonalDecomposition,
LstmTimeSeriesAnomaly,
MultivariateAnomaly,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnomalyResult {
pub timestamp: DateTime<Utc>,
pub metric: String,
pub observed_value: f64,
pub expected_value: f64,
pub confidence_score: f64,
pub anomaly_type: AnomalyType,
pub severity: AnomalySeverity,
pub detection_method: AnomalyDetectionMethod,
pub context: HashMap<String, serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::enum_variant_names)]
pub enum AnomalyType {
PointAnomaly,
ContextualAnomaly,
CollectiveAnomaly,
TrendAnomaly,
SeasonalAnomaly,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AnomalySeverity {
Low,
Medium,
High,
Critical,
}
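/// Configuration for forecasting, including methods, prediction horizon, and confidence intervals.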
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredictiveAnalytics {
pub forecasting_methods: Vec<ForecastingMethod>,
pub prediction_horizon_hours: u32,
pub confidence_intervals: Vec<f64>,
pub ensemble_weights: HashMap<String, f64>,
}
impl Default for PredictiveAnalytics {
fn default() -> Self {
Self {
forecasting_methods: vec![
ForecastingMethod::LinearRegression,
ForecastingMethod::ExponentialSmoothing,
ForecastingMethod::Arima,
ForecastingMethod::SeasonalDecomposition,
],
prediction_horizon_hours: 24,
confidence_intervals: vec![0.8, 0.9, 0.95],
ensemble_weights: HashMap::new(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ForecastingMethod {
LinearRegression,
ExponentialSmoothing,
Arima,
SeasonalDecomposition,
LstmForecasting,
Prophet,
EnsembleForecasting,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForecastResult {
pub timestamps: Vec<DateTime<Utc>>,
pub predicted_values: Vec<f64>,
pub confidence_intervals: HashMap<String, (Vec<f64>, Vec<f64>)>,
pub accuracy_metrics: AccuracyMetrics,
pub method: ForecastingMethod,
pub computation_time_ms: u64,
}
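/// Standard regression accuracy metrics: mean absolute error, mean squared error, root MSE, mean absolute percentage error, and R-squared.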
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccuracyMetrics {
pub mae: f64,
pub mse: f64,
pub rmse: f64,
pub mape: f64,
pub r_squared: f64,
}
impl AnalyticsEngine {
pub async fn detect_anomalies(
&self,
data_points: &[TimeSeriesPoint],
detector: &AnomalyDetector,
) -> FusekiResult<Vec<AnomalyResult>> {
let mut anomalies = Vec::new();
for method in &detector.detection_methods {
let method_anomalies = match method {
AnomalyDetectionMethod::StatisticalOutlier => {
self.detect_statistical_outliers(data_points, detector)
.await?
}
AnomalyDetectionMethod::IsolationForest => {
self.detect_statistical_outliers(data_points, detector)
.await?
}
AnomalyDetectionMethod::SeasonalDecomposition => {
self.detect_statistical_outliers(data_points, detector)
.await?
}
AnomalyDetectionMethod::LstmTimeSeriesAnomaly => {
self.detect_statistical_outliers(data_points, detector)
.await?
}
AnomalyDetectionMethod::MultivariateAnomaly => {
self.detect_statistical_outliers(data_points, detector)
.await?
}
};
anomalies.extend(method_anomalies);
}
anomalies.sort_by(|a, b| {
b.confidence_score
.partial_cmp(&a.confidence_score)
.unwrap_or(std::cmp::Ordering::Equal)
});
anomalies.dedup_by(|a, b| a.timestamp == b.timestamp && a.metric == b.metric);
Ok(anomalies)
}
async fn detect_statistical_outliers(
&self,
data_points: &[TimeSeriesPoint],
detector: &AnomalyDetector,
) -> FusekiResult<Vec<AnomalyResult>> {
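// Flag points whose z-score exceeds a sensitivity-scaled threshold; severity and confidence grow with the z-score.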
let values: Vec<f64> = data_points.iter().map(|p| p.value).collect();
let stats = compute_statistics(&values);
let threshold = 2.5 * detector.sensitivity;
let mut anomalies = Vec::new();
for point in data_points {
let z_score = if stats.std_dev > 0.0 {
(point.value - stats.mean).abs() / stats.std_dev
} else {
0.0
};
if z_score > threshold {
let confidence = (z_score / (threshold * 2.0)).clamp(0.0, 1.0);
let severity = match z_score {
z if z > threshold * 2.0 => AnomalySeverity::Critical,
z if z > threshold * 1.5 => AnomalySeverity::High,
z if z > threshold * 1.2 => AnomalySeverity::Medium,
_ => AnomalySeverity::Low,
};
anomalies.push(AnomalyResult {
timestamp: point.timestamp,
metric: point.metric.clone(),
observed_value: point.value,
expected_value: stats.mean,
confidence_score: confidence,
anomaly_type: AnomalyType::PointAnomaly,
severity,
detection_method: AnomalyDetectionMethod::StatisticalOutlier,
context: HashMap::from([
(
"z_score".to_string(),
serde_json::Value::Number(
serde_json::Number::from_f64(z_score)
.unwrap_or_else(|| serde_json::Number::from(0)),
),
),
(
"threshold".to_string(),
serde_json::Value::Number(
serde_json::Number::from_f64(threshold)
.unwrap_or_else(|| serde_json::Number::from(0)),
),
),
]),
});
}
}
Ok(anomalies)
}
pub async fn generate_forecast(
&self,
data_points: &[TimeSeriesPoint],
config: &PredictiveAnalytics,
) -> FusekiResult<ForecastResult> {
let start_time = std::time::Instant::now();
let method = config
.forecasting_methods
.first()
.unwrap_or(&ForecastingMethod::LinearRegression);
let forecast = match method {
ForecastingMethod::LinearRegression => {
self.linear_regression_forecast(data_points, config).await?
}
_ => {
self.linear_regression_forecast(data_points, config).await?
}
};
let computation_time = start_time.elapsed().as_millis() as u64;
Ok(ForecastResult {
timestamps: forecast.0,
predicted_values: forecast.1,
confidence_intervals: forecast.2,
accuracy_metrics: forecast.3,
method: method.clone(),
computation_time_ms: computation_time,
})
}
async fn linear_regression_forecast(
&self,
data_points: &[TimeSeriesPoint],
config: &PredictiveAnalytics,
) -> FusekiResult<(
Vec<DateTime<Utc>>,
Vec<f64>,
HashMap<String, (Vec<f64>, Vec<f64>)>,
AccuracyMetrics,
)> {
if data_points.len() < 2 {
return Err(FusekiError::invalid_query(
"Insufficient data for forecasting",
));
}
let base_time = data_points[0].timestamp.timestamp() as f64;
let x_values: Vec<f64> = data_points
.iter()
.map(|p| (p.timestamp.timestamp() as f64 - base_time) / 3600.0)
.collect();
let y_values: Vec<f64> = data_points.iter().map(|p| p.value).collect();
let n = x_values.len() as f64;
let sum_x: f64 = x_values.iter().sum();
let sum_y: f64 = y_values.iter().sum();
let sum_xy: f64 = x_values.iter().zip(&y_values).map(|(x, y)| x * y).sum();
let sum_x2: f64 = x_values.iter().map(|x| x * x).sum();
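// Ordinary least squares fit: slope = (n*sum_xy - sum_x*sum_y) / (n*sum_x2 - sum_x^2), intercept = (sum_y - slope*sum_x) / n.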
let slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x);
let intercept = (sum_y - slope * sum_x) / n;
let last_time = data_points
.last()
.expect("data_points validated to have at least 2 elements")
.timestamp;
let forecast_hours = config.prediction_horizon_hours as i64;
let mut forecast_timestamps = Vec::new();
let mut predicted_values = Vec::new();
for hour in 1..=forecast_hours {
let forecast_time = last_time + Duration::hours(hour);
let x_forecast = (forecast_time.timestamp() as f64 - base_time) / 3600.0;
let y_forecast = slope * x_forecast + intercept;
forecast_timestamps.push(forecast_time);
predicted_values.push(y_forecast);
}
let mut confidence_intervals = HashMap::new();
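// Approximate prediction intervals from normal-distribution z-scores and the residual standard error of the fit.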
for &confidence_level in &config.confidence_intervals {
let z_score = match confidence_level {
0.8 => 1.28,
0.9 => 1.645,
0.95 => 1.96,
0.99 => 2.576,
_ => 1.96,
};
let residuals: Vec<f64> = x_values
.iter()
.zip(&y_values)
.map(|(x, y)| y - (slope * x + intercept))
.collect();
// Guard the degrees of freedom: with exactly two points, n - 2 would be zero.
let mse = residuals.iter().map(|r| r * r).sum::<f64>() / (n - 2.0).max(1.0);
let std_error = mse.sqrt();
let lower_bounds: Vec<f64> = predicted_values
.iter()
.map(|y| y - z_score * std_error)
.collect();
let upper_bounds: Vec<f64> = predicted_values
.iter()
.map(|y| y + z_score * std_error)
.collect();
let confidence_level_percent = (confidence_level * 100.0) as u32;
confidence_intervals.insert(
format!("{confidence_level_percent}%"),
(lower_bounds, upper_bounds),
);
}
let predictions: Vec<f64> = x_values.iter().map(|x| slope * x + intercept).collect();
let accuracy_metrics = self.calculate_accuracy_metrics(&y_values, &predictions);
Ok((
forecast_timestamps,
predicted_values,
confidence_intervals,
accuracy_metrics,
))
}
fn calculate_accuracy_metrics(&self, actual: &[f64], predicted: &[f64]) -> AccuracyMetrics {
if actual.len() != predicted.len() || actual.is_empty() {
return AccuracyMetrics {
mae: 0.0,
mse: 0.0,
rmse: 0.0,
mape: 0.0,
r_squared: 0.0,
};
}
let n = actual.len() as f64;
let mae = actual
.iter()
.zip(predicted)
.map(|(a, p)| (a - p).abs())
.sum::<f64>()
/ n;
let mse = actual
.iter()
.zip(predicted)
.map(|(a, p)| (a - p).powi(2))
.sum::<f64>()
/ n;
let rmse = mse.sqrt();
// MAPE is averaged over points with nonzero actuals, so zero values neither divide by zero nor dilute the average.
let nonzero_count = actual.iter().filter(|a| **a != 0.0).count();
let mape = if nonzero_count > 0 {
actual
.iter()
.zip(predicted)
.filter(|(a, _)| **a != 0.0)
.map(|(a, p)| ((a - p) / a).abs())
.sum::<f64>()
/ nonzero_count as f64
* 100.0
} else {
0.0
};
let actual_mean = actual.iter().sum::<f64>() / n;
let ss_tot = actual
.iter()
.map(|a| (a - actual_mean).powi(2))
.sum::<f64>();
let ss_res = actual
.iter()
.zip(predicted)
.map(|(a, p)| (a - p).powi(2))
.sum::<f64>();
let r_squared = if ss_tot != 0.0 {
1.0 - ss_res / ss_tot
} else {
0.0
};
AccuracyMetrics {
mae,
mse,
rmse,
mape,
r_squared,
}
}
}
#[cfg(test)]
mod advanced_tests {
use super::*;
#[tokio::test]
async fn test_anomaly_detection() {
let mut data_points = Vec::new();
let base_time = Utc::now();
for i in 0..100 {
let value = if i == 50 {
1000.0
} else {
100.0 + (i as f64 * 0.1)
};
data_points.push(TimeSeriesPoint {
timestamp: base_time + Duration::hours(i),
metric: "test_metric".to_string(),
value,
tags: HashMap::new(),
metadata: None,
});
}
let detector = AnomalyDetector::default();
let engine =
AnalyticsEngine::new(AnalyticsConfig::default(), Arc::new(Store::new().unwrap()));
let anomalies = engine
.detect_anomalies(&data_points, &detector)
.await
.unwrap();
assert!(!anomalies.is_empty());
let anomaly = &anomalies[0];
assert_eq!(anomaly.observed_value, 1000.0);
assert!(anomaly.confidence_score > 0.5);
}
#[tokio::test]
async fn test_linear_regression_forecast() {
let mut data_points = Vec::new();
let base_time = Utc::now();
for i in 0..48 {
data_points.push(TimeSeriesPoint {
timestamp: base_time + Duration::hours(i),
metric: "test_metric".to_string(),
value: 100.0 + i as f64 * 2.0,
tags: HashMap::new(),
metadata: None,
});
}
let config = PredictiveAnalytics::default();
let engine =
AnalyticsEngine::new(AnalyticsConfig::default(), Arc::new(Store::new().unwrap()));
let forecast = engine
.generate_forecast(&data_points, &config)
.await
.unwrap();
assert_eq!(forecast.predicted_values.len(), 24);
assert!(!forecast.confidence_intervals.is_empty());
assert!(forecast.accuracy_metrics.r_squared > 0.8);
}
}