oxirs-stream 0.2.4

Real-time streaming support with Kafka/NATS/MQTT/OPC-UA I/O, RDF Patch, and SPARQL Update delta
Documentation
//! # Advanced ML Stream Processing Example
//!
//! Demonstrates using oxirs-stream with machine learning capabilities:
//! - Online learning from streaming data
//! - Real-time anomaly detection
//! - Predictive analytics and forecasting
//! - Feature engineering pipeline (`FeaturePipeline` is imported but not yet exercised by `main`)
//! - AutoML for automatic model selection

use anyhow::Result;
use oxirs_stream::{
    AnomalyConfig, AnomalyDetector, AutoML, AutoMLConfig, DetectorType, FeaturePipeline,
    ForecastAlgorithm, OnlineLearningConfig, OnlineLearningModel, PredictiveAnalytics,
    PredictiveStats, Sample, StreamConfig, StreamEvent, TaskType,
};
use tracing::{info, warn};
use tracing_subscriber;

/// Program entry point.
///
/// Installs the default `tracing` formatter subscriber, announces the run, and
/// then executes every ML demonstration in order. The first demo that returns
/// an error aborts the remaining ones and the error is returned from `main`.
#[tokio::main]
async fn main() -> Result<()> {
    // Send `info!` / `warn!` events to the default fmt subscriber.
    tracing_subscriber::fmt::init();
    info!("Starting Advanced ML Stream Processing Example");

    // Demos run strictly one after another; `?` short-circuits on failure.
    online_learning_example().await?;
    anomaly_detection_example().await?;
    predictive_analytics_example().await?;
    automl_example().await?;

    Ok(())
}

/// Example: online learning from streaming data.
///
/// Streams 1000 synthetic samples, one at a time, into an
/// [`OnlineLearningModel`] configured for linear regression. Sample `i` has
/// features `[i, 2i, sqrt(i)]` and label `1.5 * i + 10`.
///
/// - Every 100 samples, the model's prediction for the current sample and its
///   MSE are logged.
/// - A warning is logged whenever drift detection reports drift.
/// - A checkpoint is taken at the end and its id is logged.
///
/// # Errors
/// Propagates any error from model construction, training, prediction,
/// drift detection, or checkpointing.
async fn online_learning_example() -> Result<()> {
    info!("=== Online Learning Example ===");

    // Create online learning configuration
    let config = OnlineLearningConfig {
        model_type: oxirs_stream::OnlineModelType::LinearRegression,
        learning_rate: 0.01,
        batch_size: 32,
        enable_drift_detection: true,
        drift_threshold: 0.1,
        checkpoint_interval: 100,
    };

    let mut model = OnlineLearningModel::new(config)?;

    for i in 0..1000 {
        let x = i as f64;
        let label = x * 1.5 + 10.0; // ground-truth relationship

        // NOTE(review): feature magnitudes grow to ~2000 and are passed
        // unscaled. This file does not show whether the model normalizes its
        // inputs; plain SGD at learning_rate 0.01 can diverge on inputs this
        // large. Confirm, or scale the features.
        let sample = Sample {
            features: vec![x, x * 2.0, x.sqrt()],
            label: Some(label),
            weight: 1.0,
            timestamp: chrono::Utc::now(),
        };

        // Incremental model update
        model.train(&sample).await?;

        // Make prediction every 100 samples
        if i % 100 == 0 {
            // Borrow the sample's own feature vector rather than cloning a
            // separate copy on every iteration.
            let prediction = model.predict(&sample.features).await?;
            info!(
                "Sample {}: Prediction = {:.2}, Actual = {:.2}",
                i, prediction.value, label
            );

            let metrics = model.get_metrics().await;
            info!("Model MSE: {:.4}", metrics.mse);
        }

        // Check for concept drift after every update
        if model.detect_drift().await? {
            warn!("Concept drift detected! Model adapting...");
        }
    }

    // Save model checkpoint
    let checkpoint = model.checkpoint().await?;
    info!("Model checkpoint saved: {:?}", checkpoint.id);

    Ok(())
}

/// Example: real-time anomaly detection.
///
/// Feeds 500 synthetic readings through an ensemble [`AnomalyDetector`] that
/// combines Z-score, modified Z-score, IQR and EWMA detectors. Readings are
/// `50.0 + U[0,1) * 10.0`, except for two injected spikes of `100.0` at
/// samples 100 and 300.
///
/// - Every anomaly the detector reports is logged as a warning.
/// - Cumulative detector statistics are logged every 100 samples, starting at
///   sample 0.
///
/// # Errors
/// Propagates any error from detector construction or detection.
async fn anomaly_detection_example() -> Result<()> {
    info!("=== Anomaly Detection Example ===");

    let mut detector = AnomalyDetector::new(AnomalyConfig {
        detector_types: vec![
            DetectorType::ZScore,
            DetectorType::ModifiedZScore,
            DetectorType::IQR,
            DetectorType::EWMA,
        ],
        sensitivity: 0.95,
        window_size: 100,
        enable_adaptive_threshold: true,
        alert_threshold: 0.8,
    })?;

    for step in 0..500 {
        // Two hand-placed outliers; every other step draws baseline noise.
        let reading = match step {
            100 | 300 => 100.0,
            _ => 50.0 + fastrand::f64() * 10.0,
        };

        if let Some(hit) = detector.detect(reading).await? {
            warn!(
                "Anomaly detected at sample {}: score = {:.3}, severity = {:?}",
                step, hit.score, hit.severity
            );
        }

        // Only report summary statistics on multiples of 100.
        if step % 100 != 0 {
            continue;
        }
        let summary = detector.get_stats().await;
        info!(
            "Anomaly stats: total={}, rate={:.2}%",
            summary.total_anomalies,
            summary.anomaly_rate * 100.0
        );
    }

    Ok(())
}

/// Example: predictive analytics and forecasting.
///
/// Generates 200 synthetic observations made of a linear trend, a sinusoid
/// with period 24 and uniform noise. The steps are:
///
/// 1. Fit a seasonal ARIMA(2,1,2) model on the first 190 points.
/// 2. Forecast the next 10 steps and log each prediction with its confidence
///    interval.
/// 3. Score the forecast against the 10 held-out observations.
///
/// # Errors
/// Propagates any error from engine construction, fitting, forecasting or
/// evaluation.
async fn predictive_analytics_example() -> Result<()> {
    info!("=== Predictive Analytics Example ===");

    // Create predictive analytics engine
    let config = oxirs_stream::ForecastingConfig {
        algorithm: ForecastAlgorithm::ARIMA { p: 2, d: 1, q: 2 },
        forecast_horizon: 10,
        confidence_level: 0.95,
        enable_seasonality: true,
        seasonal_period: Some(24),
    };

    let mut analytics = PredictiveAnalytics::new(config)?;

    // Synthetic series: trend + seasonality + noise. sin(t * PI / 12) has
    // period 24, matching `seasonal_period` above.
    let mut time_series: Vec<f64> = (0..200)
        .map(|i| {
            let t = i as f64;
            50.0 + t / 10.0 // Trend
                + 20.0 * (t * std::f64::consts::PI / 12.0).sin() // Seasonality
                + fastrand::f64() * 5.0 // Noise
        })
        .collect();

    // Hold out the final 10 observations (one forecast horizon) so that
    // accuracy is measured on data the model never saw. Scoring against the
    // tail of the training data would only report in-sample error.
    let holdout = time_series.split_off(time_series.len() - 10);

    // Train forecasting model on the first 190 points only
    analytics.fit(&time_series).await?;

    // Forecast exactly as many steps as were held out
    let forecast = analytics.forecast(10).await?;

    info!("Forecast for next 10 time steps:");
    for (i, pred) in forecast.predictions.iter().enumerate() {
        info!(
            "  t+{}: {:.2} (CI: [{:.2}, {:.2}])",
            i + 1,
            pred.value,
            pred.confidence_interval.0,
            pred.confidence_interval.1
        );
    }

    // Out-of-sample accuracy against the held-out observations
    let metrics = analytics.evaluate(&holdout).await?;
    info!(
        "Forecast accuracy - MAE: {:.2}, RMSE: {:.2}",
        metrics.mae, metrics.rmse
    );

    Ok(())
}

/// Example: AutoML for automatic model selection.
///
/// Builds 1000 random regression samples. Each has three features drawn
/// uniformly from `[0, 100)` and target `y = 2*x1 + 3*x2 - 0.5*x3 + noise`,
/// where the noise is zero-mean and uniform in `[-5, 5)`.
///
/// The AutoML search is limited to 20 trials or 300 s, uses 5-fold CV and
/// optimizes `"mse"`. Afterwards the function logs:
///
/// - the best model;
/// - a prediction for one test point;
/// - the search statistics.
///
/// # Errors
/// Propagates any error from AutoML construction, fitting or prediction.
async fn automl_example() -> Result<()> {
    info!("=== AutoML Example ===");

    // Create AutoML configuration
    let config = AutoMLConfig {
        task_type: TaskType::Regression,
        max_trials: 20,
        max_time_seconds: Some(300),
        optimization_metric: "mse".to_string(),
        cv_folds: 5,
        ensemble_size: 3,
    };

    let mut automl = AutoML::new(config)?;

    // Prepare training data; sizes are known up front, so preallocate.
    const N_SAMPLES: usize = 1000;
    let mut x_train = Vec::with_capacity(N_SAMPLES);
    let mut y_train = Vec::with_capacity(N_SAMPLES);

    for _ in 0..N_SAMPLES {
        let x1 = fastrand::f64() * 100.0;
        let x2 = fastrand::f64() * 100.0;
        let x3 = fastrand::f64() * 100.0;

        // Zero-mean noise so the stated linear relationship is the true
        // conditional mean (no hidden +5 intercept from one-sided noise).
        let noise = (fastrand::f64() - 0.5) * 10.0;

        // True relationship: y = 2*x1 + 3*x2 - 0.5*x3 + noise
        let y = 2.0 * x1 + 3.0 * x2 - 0.5 * x3 + noise;

        x_train.push(vec![x1, x2, x3]);
        y_train.push(y);
    }

    // Run AutoML search
    info!("Starting AutoML search...");
    let best_model = automl.fit(&x_train, &y_train).await?;

    info!("Best model found: {:?}", best_model.algorithm);
    info!("Best score: {:.4}", best_model.performance.score);
    info!("Hyperparameters: {:?}", best_model.hyperparameters);

    // Make predictions with best model
    let test_sample = vec![50.0, 60.0, 30.0];
    let prediction = automl.predict(&test_sample).await?;
    info!("Prediction for test sample: {:.2}", prediction);

    // Get AutoML statistics
    let stats = automl.get_stats().await;
    info!(
        "AutoML completed {} trials in {:.2}s",
        stats.total_trials, stats.total_time_seconds
    );

    Ok(())
}