//! # Advanced ML Stream Processing Example
//!
//! Demonstrates using oxirs-stream with machine learning capabilities:
//! - Online learning from streaming data
//! - Real-time anomaly detection
//! - Predictive analytics and forecasting
//! - Feature engineering pipeline
//! - AutoML for automatic model selection
use anyhow::Result;
use oxirs_stream::{
AnomalyConfig, AnomalyDetector, AutoML, AutoMLConfig, DetectorType, FeaturePipeline,
ForecastAlgorithm, OnlineLearningConfig, OnlineLearningModel, PredictiveAnalytics,
PredictiveStats, Sample, StreamConfig, StreamEvent, TaskType,
};
use tracing::{info, warn};
use tracing_subscriber;
/// Entry point: installs the default `tracing` subscriber and then runs each ML
/// demonstration in sequence, stopping at the first one that returns an error.
#[tokio::main]
async fn main() -> Result<()> {
    // Route `info!`/`warn!` output to stdout via the fmt subscriber.
    tracing_subscriber::fmt::init();
    info!("Starting Advanced ML Stream Processing Example");

    online_learning_example().await?;
    anomaly_detection_example().await?;
    predictive_analytics_example().await?;
    // The final example's result is the program's result.
    automl_example().await
}
/// Example: Online learning from streaming data
///
/// Streams synthetic samples into a linear-regression [`OnlineLearningModel`], one
/// update per sample. The label follows the known relationship `1.5 * i + 10.0`.
/// Every `REPORT_EVERY` samples the example logs a prediction next to the true label,
/// together with the model's MSE. It checks for concept drift after every update and
/// finishes by taking a checkpoint.
///
/// # Errors
/// Propagates any error from model construction, training, prediction, drift
/// detection, or checkpointing.
async fn online_learning_example() -> Result<()> {
    /// Number of synthetic samples streamed through the model.
    const NUM_SAMPLES: usize = 1000;
    /// Log a prediction and metrics on every sample whose index is a multiple of this.
    const REPORT_EVERY: usize = 100;

    info!("=== Online Learning Example ===");

    // NOTE(review): the features below reach ~2000 and are not scaled. With
    // learning_rate 0.01, plain SGD may diverge unless the model normalizes its
    // inputs internally. Confirm this against the oxirs-stream implementation.
    let config = OnlineLearningConfig {
        model_type: oxirs_stream::OnlineModelType::LinearRegression,
        learning_rate: 0.01,
        batch_size: 32,
        enable_drift_detection: true,
        drift_threshold: 0.1,
        checkpoint_interval: 100,
    };
    let mut model = OnlineLearningModel::new(config)?;

    for i in 0..NUM_SAMPLES {
        let x = i as f64;
        // Feature vector for this event: the index, twice the index, and its square root.
        let features = vec![x, (i * 2) as f64, x.sqrt()];
        let label = x * 1.5 + 10.0; // Ground-truth relationship the model should learn
        // Move the features into the sample. Later uses borrow `sample.features`,
        // so no per-sample clone is needed.
        let sample = Sample {
            features,
            label: Some(label),
            weight: 1.0,
            timestamp: chrono::Utc::now(),
        };

        // Incremental model update
        model.train(&sample).await?;

        if i % REPORT_EVERY == 0 {
            let prediction = model.predict(&sample.features).await?;
            info!(
                "Sample {}: Prediction = {:.2}, Actual = {:.2}",
                i, prediction.value, label
            );
            let metrics = model.get_metrics().await;
            info!("Model MSE: {:.4}", metrics.mse);
        }

        // Check for concept drift after every update.
        if model.detect_drift().await? {
            warn!("Concept drift detected! Model adapting...");
        }
    }

    // Save model checkpoint
    let checkpoint = model.checkpoint().await?;
    info!("Model checkpoint saved: {:?}", checkpoint.id);
    Ok(())
}
/// Example: Real-time anomaly detection
///
/// Sends 500 synthetic readings through an [`AnomalyDetector`] configured with
/// several statistical detector types. Readings are `50.0 + fastrand::f64() * 10.0`,
/// except for two injected spikes of `100.0` at samples 100 and 300.
/// Each reported anomaly is logged as a warning. The detector's running statistics
/// are logged on every sample whose index is a multiple of 100.
///
/// # Errors
/// Propagates any error from detector construction or from `detect`.
async fn anomaly_detection_example() -> Result<()> {
    info!("=== Anomaly Detection Example ===");

    let mut detector = AnomalyDetector::new(AnomalyConfig {
        detector_types: vec![
            DetectorType::ZScore,
            DetectorType::ModifiedZScore,
            DetectorType::IQR,
            DetectorType::EWMA,
        ],
        sensitivity: 0.95,
        window_size: 100,
        enable_adaptive_threshold: true,
        alert_threshold: 0.8,
    })?;

    for sample_idx in 0..500 {
        // Fixed spikes at two positions; every other reading is baseline noise.
        let reading = match sample_idx {
            100 | 300 => 100.0,
            _ => 50.0 + fastrand::f64() * 10.0,
        };

        if let Some(hit) = detector.detect(reading).await? {
            warn!(
                "Anomaly detected at sample {}: score = {:.3}, severity = {:?}",
                sample_idx, hit.score, hit.severity
            );
        }

        let is_report_tick = sample_idx % 100 == 0;
        if is_report_tick {
            let stats = detector.get_stats().await;
            info!(
                "Anomaly stats: total={}, rate={:.2}%",
                stats.total_anomalies,
                stats.anomaly_rate * 100.0
            );
        }
    }

    Ok(())
}
/// Example: Predictive analytics and forecasting
///
/// Generates 200 synthetic "hourly" observations made of a linear trend, a sine
/// seasonality with period 24, and uniform noise. It fits an ARIMA(2,1,2)
/// forecaster on all but the last `HOLDOUT_LEN` observations and logs a 10-step
/// forecast with confidence intervals. Accuracy (MAE / RMSE) is then scored
/// against the held-out observations, which the model was not fitted on.
///
/// # Errors
/// Propagates any error from constructing, fitting, forecasting with, or
/// evaluating the [`PredictiveAnalytics`] engine.
async fn predictive_analytics_example() -> Result<()> {
    /// Observations withheld from training and used only for evaluation. Kept
    /// equal to the forecast horizon (10) so that the logged forecast covers
    /// exactly the held-out window.
    const HOLDOUT_LEN: usize = 10;

    info!("=== Predictive Analytics Example ===");

    let config = oxirs_stream::ForecastingConfig {
        algorithm: ForecastAlgorithm::ARIMA { p: 2, d: 1, q: 2 },
        forecast_horizon: 10,
        confidence_level: 0.95,
        enable_seasonality: true,
        // Matches the 24-sample period of the sine term generated below.
        seasonal_period: Some(24),
    };
    let mut analytics = PredictiveAnalytics::new(config)?;

    // Historical time series (e.g. hourly metrics): trend + seasonality + noise.
    let mut time_series: Vec<f64> = (0..200)
        .map(|i| {
            let t = i as f64;
            50.0
                + t / 10.0 // Trend
                + 20.0 * (t * std::f64::consts::PI / 12.0).sin() // Seasonality
                + fastrand::f64() * 5.0 // Noise
        })
        .collect();

    // Hold out the tail so accuracy is measured on observations the model was
    // not fitted on. After this call `time_series` holds only the training part.
    let holdout = time_series.split_off(time_series.len() - HOLDOUT_LEN);

    // Train forecasting model
    analytics.fit(&time_series).await?;

    // Forecast the held-out window
    let forecast = analytics.forecast(10).await?;
    info!("Forecast for next 10 time steps:");
    for (i, pred) in forecast.predictions.iter().enumerate() {
        info!(
            "  t+{}: {:.2} (CI: [{:.2}, {:.2}])",
            i + 1,
            pred.value,
            pred.confidence_interval.0,
            pred.confidence_interval.1
        );
    }

    // NOTE(review): this assumes `evaluate` scores the model's forecast against
    // the supplied actual values. Confirm against the oxirs-stream API.
    let metrics = analytics.evaluate(&holdout[..]).await?;
    info!(
        "Forecast accuracy - MAE: {:.2}, RMSE: {:.2}",
        metrics.mae, metrics.rmse
    );
    Ok(())
}
/// Example: AutoML for automatic model selection
///
/// Generates `N_SAMPLES` rows of three uniform features in `[0, 100)`. The target is
/// `y = 2*x1 + 3*x2 - 0.5*x3 + noise`, with uniform noise in `[0, 10)`. It runs an
/// AutoML regression search (up to 20 trials, 300 s budget, 5-fold CV, optimizing
/// `"mse"`) and logs the best model, a prediction for a fixed test point, and the
/// search statistics.
///
/// # Errors
/// Propagates any error from AutoML construction, fitting, or prediction.
async fn automl_example() -> Result<()> {
    /// Number of synthetic training rows.
    const N_SAMPLES: usize = 1000;

    info!("=== AutoML Example ===");

    let config = AutoMLConfig {
        task_type: TaskType::Regression,
        max_trials: 20,
        max_time_seconds: Some(300),
        optimization_metric: "mse".to_string(),
        cv_folds: 5,
        ensemble_size: 3,
    };
    let mut automl = AutoML::new(config)?;

    // Prepare training data. Both vectors have a known final size, so allocate once.
    // `x_train` is snake_case, so the `non_snake_case` lint stays quiet.
    let mut x_train: Vec<Vec<f64>> = Vec::with_capacity(N_SAMPLES);
    let mut y_train: Vec<f64> = Vec::with_capacity(N_SAMPLES);
    for _ in 0..N_SAMPLES {
        let x1 = fastrand::f64() * 100.0;
        let x2 = fastrand::f64() * 100.0;
        let x3 = fastrand::f64() * 100.0;
        // True relationship: y = 2*x1 + 3*x2 - 0.5*x3 + noise
        let y = 2.0 * x1 + 3.0 * x2 - 0.5 * x3 + fastrand::f64() * 10.0;
        x_train.push(vec![x1, x2, x3]);
        y_train.push(y);
    }

    // Run AutoML search
    info!("Starting AutoML search...");
    let best_model = automl.fit(&x_train, &y_train).await?;
    info!("Best model found: {:?}", best_model.algorithm);
    info!("Best score: {:.4}", best_model.performance.score);
    info!("Hyperparameters: {:?}", best_model.hyperparameters);

    // Noise-free target for this point is 2*50 + 3*60 - 0.5*30 = 265. The uniform
    // [0, 10) noise adds about 5 on average, so a good model should predict close to 270.
    let test_sample = vec![50.0, 60.0, 30.0];
    let prediction = automl.predict(&test_sample).await?;
    info!("Prediction for test sample: {:.2}", prediction);

    // Get AutoML statistics
    let stats = automl.get_stats().await;
    info!(
        "AutoML completed {} trials in {:.2}s",
        stats.total_trials, stats.total_time_seconds
    );
    Ok(())
}