use serde::{Deserialize, Serialize};
use std::time::Duration;
/// Aggregate settings for stream processing: window sizing, drift detection,
/// anomaly detection, monitoring cadence and alerting.
///
/// The struct performs no validation itself; relationships such as
/// `min_window_size <= base_window_size <= max_window_size` are not enforced
/// by the type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
    /// Base window size (unit not stated here; presumably samples — confirm with the consumer).
    pub base_window_size: usize,
    /// Largest window size setting.
    pub max_window_size: usize,
    /// Smallest window size setting.
    pub min_window_size: usize,
    /// Sensitivity setting for drift detection; its exact interpretation is
    /// defined by whichever component reads it.
    pub drift_sensitivity: f64,
    /// Threshold associated with the warning level.
    pub warning_threshold: f64,
    /// Threshold associated with the drift level.
    pub drift_threshold: f64,
    /// Toggle for adaptive windowing.
    pub adaptive_windowing: bool,
    /// Strategy selected for window adaptation.
    pub adaptation_strategy: WindowAdaptationStrategy,
    /// Toggle for drift detection.
    pub enable_drift_detection: bool,
    /// Drift detectors to configure, each with its own parameters.
    pub drift_detection_methods: Vec<DriftDetectionMethod>,
    /// Toggle for anomaly detection.
    pub enable_anomaly_detection: bool,
    /// Anomaly detection algorithm and its parameters.
    pub anomaly_algorithm: AnomalyDetectionAlgorithm,
    /// Interval setting for monitoring.
    pub monitoring_interval: Duration,
    /// Toggle for alerting.
    pub enable_alerts: bool,
    /// Detailed alerting settings.
    pub alert_config: AlertConfig,
}
impl Default for StreamingConfig {
fn default() -> Self {
Self {
base_window_size: 1000,
max_window_size: 10000,
min_window_size: 100,
drift_sensitivity: 0.05,
warning_threshold: 0.95,
drift_threshold: 0.99,
adaptive_windowing: true,
adaptation_strategy: WindowAdaptationStrategy::DriftBased,
enable_drift_detection: true,
drift_detection_methods: vec![
DriftDetectionMethod::Adwin { delta: 0.002 },
DriftDetectionMethod::Ddm { alpha: 0.05, beta: 0.0 },
],
enable_anomaly_detection: true,
anomaly_algorithm: AnomalyDetectionAlgorithm::IsolationForest {
n_trees: 100,
subsample_size: 256,
},
monitoring_interval: Duration::from_secs(10),
enable_alerts: true,
alert_config: AlertConfig::default(),
}
}
}
/// Selects how the window is adapted.
///
/// This type only carries the selection and its parameters; the adaptation
/// logic itself lives wherever the configuration is consumed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WindowAdaptationStrategy {
    /// Fixed strategy; carries no parameters.
    Fixed,
    /// Exponential-decay strategy parameterised by a decay rate.
    ExponentialDecay { decay_rate: f64 },
    /// Performance-based strategy parameterised by a target accuracy.
    PerformanceBased { target_accuracy: f64 },
    /// Drift-based strategy; carries no parameters.
    DriftBased,
    /// Combination of several strategies with accompanying weights.
    ///
    /// NOTE(review): presumably `weights[i]` belongs to `strategies[i]`; the
    /// type does not enforce equal lengths — confirm in the consumer.
    Hybrid {
        strategies: Vec<WindowAdaptationStrategy>,
        weights: Vec<f64>,
    },
    /// Model-driven strategy identified by a free-form model type name.
    MLBased { model_type: String },
}
/// A drift detector selection together with its parameters.
///
/// Parameter meanings follow whichever detector implementation consumes this
/// value; nothing is validated here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DriftDetectionMethod {
    /// ADWIN detector with its `delta` parameter.
    Adwin { delta: f64 },
    /// DDM detector with `alpha` and `beta` parameters.
    Ddm { alpha: f64, beta: f64 },
    /// Page-Hinkley detector.
    PageHinkley {
        min_instances: usize,
        threshold: f64,
        alpha: f64,
    },
    /// KSWIN detector with a window size and a statistic size.
    Kswin { window_size: usize, stat_size: usize },
    /// HDDM-A detector with drift and warning confidence levels.
    HddmA {
        drift_confidence: f64,
        warning_confidence: f64,
    },
    /// HDDM-W detector; same confidences as [`Self::HddmA`] plus `lambda`.
    HddmW {
        drift_confidence: f64,
        warning_confidence: f64,
        lambda: f64,
    },
    /// STEPD detector with a window size and `alpha`.
    Stepd { window_size: usize, alpha: f64 },
    /// User-defined detector identified by name, with named numeric parameters.
    Custom {
        name: String,
        parameters: std::collections::HashMap<String, f64>,
    },
}
/// An anomaly detection algorithm selection together with its parameters.
///
/// Only the configuration is described here; parameter ranges are not checked.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AnomalyDetectionAlgorithm {
    /// Isolation forest with a tree count and a subsample size.
    IsolationForest {
        n_trees: usize,
        subsample_size: usize,
    },
    /// One-class SVM with `nu` and `gamma` parameters.
    OneClassSvm { nu: f64, gamma: f64 },
    /// Local outlier factor with a neighbour count and a contamination value.
    LocalOutlierFactor {
        n_neighbors: usize,
        contamination: f64,
    },
    /// Elliptic envelope with a contamination value.
    EllipticEnvelope { contamination: f64 },
    /// Statistical detector with a z threshold and a `modified` flag.
    ///
    /// NOTE(review): `modified` presumably selects a modified z-score — confirm.
    Statistical { z_threshold: f64, modified: bool },
    /// Robust covariance with a support fraction.
    RobustCovariance { support_fraction: f64 },
    /// User-defined algorithm identified by name, with named numeric parameters.
    Custom {
        name: String,
        parameters: std::collections::HashMap<String, f64>,
    },
}
/// Alerting settings: which alert categories are enabled, throttling
/// parameters and notification channels.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
    /// Toggle for drift alerts.
    pub drift_alerts: bool,
    /// Toggle for anomaly alerts.
    pub anomaly_alerts: bool,
    /// Toggle for performance alerts.
    pub performance_alerts: bool,
    /// Minimum severity setting (see [`AlertSeverity`] for the ordering).
    pub min_severity: AlertSeverity,
    /// Cooldown period setting for alerts.
    pub cooldown_period: Duration,
    /// Cap on the number of alerts per window.
    pub max_alerts_per_window: usize,
    /// Length of the aggregation window.
    pub aggregation_window: Duration,
    /// Toggle for e-mail notifications.
    pub email_notifications: bool,
    /// Toggle for webhook notifications.
    pub webhook_notifications: bool,
    /// Webhook target, if any. The type does not require it to be set when
    /// `webhook_notifications` is `true`.
    pub webhook_url: Option<String>,
    /// Names of custom handlers; how they are resolved is up to the consumer.
    pub custom_handlers: Vec<String>,
}
impl Default for AlertConfig {
fn default() -> Self {
Self {
drift_alerts: true,
anomaly_alerts: true,
performance_alerts: true,
min_severity: AlertSeverity::Warning,
cooldown_period: Duration::from_secs(300), max_alerts_per_window: 10,
aggregation_window: Duration::from_secs(3600), email_notifications: false,
webhook_notifications: false,
webhook_url: None,
custom_handlers: vec![],
}
}
}
/// Severity level attached to an alert.
///
/// Variants are declared from least to most severe. The derived
/// `PartialOrd`/`Ord` follow declaration order, so
/// `Info < Warning < Error < Critical`; do not reorder the variants.
///
/// The enum is fieldless, so it is also `Copy` and `Hash`: values can be
/// passed by value and used as map or set keys.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AlertSeverity {
    /// Lowest level.
    Info,
    /// Second level; the default `AlertConfig::min_severity`.
    Warning,
    /// Third level.
    Error,
    /// Highest level.
    Critical,
}
/// Describes a trigger for adaptation.
///
/// Only the trigger description is modelled here; evaluating it is left to
/// the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AdaptationTrigger {
    /// Time-based trigger carrying a duration.
    Time(Duration),
    /// Trigger carrying a sample count.
    SampleCount(usize),
    /// Performance-based trigger with accuracy and latency thresholds.
    Performance {
        accuracy_threshold: f64,
        latency_threshold: Duration,
    },
    /// Drift-based trigger with a confidence value.
    Drift { confidence: f64 },
    /// Manual trigger; carries no parameters.
    Manual,
    /// Several triggers grouped together.
    ///
    /// NOTE(review): whether the group fires on any or on all of its members
    /// is not defined by this type — confirm in the consumer.
    Combined(Vec<AdaptationTrigger>),
}
/// Selects how ensemble outputs are aggregated.
///
/// The aggregation itself is implemented by the consumer of this value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EnsembleAggregation {
    /// Plain average.
    Average,
    /// Weighted average with the given weights (presumably one per ensemble
    /// member — not enforced here).
    WeightedAverage(Vec<f64>),
    /// Maximum.
    Maximum,
    /// Minimum.
    Minimum,
    /// Median.
    Median,
    /// Majority voting.
    MajorityVoting,
    /// Soft voting.
    SoftVoting,
    /// Custom aggregation identified by a free-form name.
    Custom(String),
}
/// Settings for performance monitoring: which metrics are watched, sampling,
/// baseline refresh and degradation alerting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMonitoringConfig {
    /// Toggle for latency monitoring.
    pub monitor_latency: bool,
    /// Toggle for throughput monitoring.
    pub monitor_throughput: bool,
    /// Toggle for memory monitoring.
    pub monitor_memory: bool,
    /// Toggle for accuracy monitoring.
    pub monitor_accuracy: bool,
    /// Sampling rate (presumably a fraction in `0.0..=1.0`; range is not
    /// checked here).
    pub sampling_rate: f64,
    /// Interval setting for baseline updates.
    pub baseline_update_interval: Duration,
    /// Threshold for degradation (presumably a relative change — confirm).
    pub degradation_threshold: f64,
    /// Toggle for alerting on degradation.
    pub alert_on_degradation: bool,
}
impl Default for PerformanceMonitoringConfig {
fn default() -> Self {
Self {
monitor_latency: true,
monitor_throughput: true,
monitor_memory: true,
monitor_accuracy: true,
sampling_rate: 1.0,
baseline_update_interval: Duration::from_secs(3600), degradation_threshold: 0.1, alert_on_degradation: true,
}
}
}
/// Buffer settings: capacity, eviction, compression and persistence.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BufferConfig {
    /// Maximum buffer size (unit not stated here; presumably entries — confirm).
    pub max_size: usize,
    /// Eviction policy selection.
    pub eviction_policy: EvictionPolicy,
    /// Toggle for compression.
    pub enable_compression: bool,
    /// Compression algorithm selection. It is stored independently of
    /// `enable_compression`.
    pub compression_algorithm: CompressionAlgorithm,
    /// Persistence settings.
    pub persistence: PersistenceConfig,
}
impl Default for BufferConfig {
fn default() -> Self {
Self {
max_size: 100000,
eviction_policy: EvictionPolicy::Lru,
enable_compression: false,
compression_algorithm: CompressionAlgorithm::Gzip,
persistence: PersistenceConfig::default(),
}
}
}
/// Selects the eviction policy for a buffer.
///
/// Derives `PartialEq`/`Eq` so policies can be compared directly with `==`
/// or `assert_eq!` instead of needing `matches!`. Both payload types
/// (`Duration`, `usize`) support exact equality.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum EvictionPolicy {
    /// LRU policy.
    Lru,
    /// LFU policy.
    Lfu,
    /// FIFO policy.
    Fifo,
    /// Random policy.
    Random,
    /// Time-based policy carrying a duration.
    TimeBased(Duration),
    /// Size-based policy carrying a size value (unit not stated here).
    SizeBased(usize),
}
/// Selects a compression algorithm.
///
/// The enum is fieldless, so it derives `Copy`, `PartialEq`, `Eq` and `Hash`:
/// values can be passed by value, compared with `==` and used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CompressionAlgorithm {
    /// No compression.
    None,
    /// Gzip.
    Gzip,
    /// LZ4.
    Lz4,
    /// Zstandard.
    Zstd,
    /// Snappy.
    Snappy,
}
/// Persistence settings: on/off switch, backend, sync cadence, compression
/// and retention.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistenceConfig {
    /// Toggle for persistence.
    pub enabled: bool,
    /// Backend selection and its connection details.
    pub backend: PersistenceBackend,
    /// Interval setting for syncing.
    pub sync_interval: Duration,
    /// Toggle for compressing persisted data.
    pub compress_data: bool,
    /// Retention policy selection.
    pub retention_policy: RetentionPolicy,
}
impl Default for PersistenceConfig {
fn default() -> Self {
Self {
enabled: false,
backend: PersistenceBackend::File { path: "/tmp/streaming_data".to_string() },
sync_interval: Duration::from_secs(60),
compress_data: true,
retention_policy: RetentionPolicy::TimeBasedRetention(Duration::from_secs(86400 * 7)), }
}
}
/// Selects a persistence backend together with its connection details.
///
/// Nothing is opened or validated here; the values are plain configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PersistenceBackend {
    /// File backend identified by a path string.
    File { path: String },
    /// Database backend identified by a connection string and a table name.
    Database {
        connection_string: String,
        table_name: String,
    },
    /// Redis backend identified by host, port and database index.
    Redis {
        host: String,
        port: u16,
        database: usize,
    },
    /// Custom backend identified by a type name, with string key/value settings.
    Custom {
        backend_type: String,
        config: std::collections::HashMap<String, String>,
    },
}
/// Selects a retention policy for persisted data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RetentionPolicy {
    /// Keep everything; carries no parameters.
    KeepAll,
    /// Time-based retention carrying a duration.
    TimeBasedRetention(Duration),
    /// Size-based retention carrying a size value (unit not stated here).
    SizeBasedRetention(usize),
    /// Count-based retention carrying a count.
    CountBasedRetention(usize),
    /// Custom policy identified by name, with string key/value parameters.
    Custom {
        policy_name: String,
        parameters: std::collections::HashMap<String, String>,
    },
}
/// Batching settings: on/off switch, sizes, timeout and strategy.
///
/// The type does not enforce `batch_size <= max_batch_size`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchingConfig {
    /// Toggle for batching.
    pub enabled: bool,
    /// Batch size setting.
    pub batch_size: usize,
    /// Timeout setting for a batch.
    pub batch_timeout: Duration,
    /// Maximum batch size setting.
    pub max_batch_size: usize,
    /// Batching strategy selection.
    pub strategy: BatchingStrategy,
    /// Toggle for adaptive batching.
    pub adaptive_batching: bool,
}
impl Default for BatchingConfig {
fn default() -> Self {
Self {
enabled: true,
batch_size: 100,
batch_timeout: Duration::from_millis(100),
max_batch_size: 1000,
strategy: BatchingStrategy::SizeBased,
adaptive_batching: true,
}
}
}
/// Selects a batching strategy.
///
/// The enum is fieldless, so it derives `Copy`, `PartialEq`, `Eq` and `Hash`:
/// values can be passed by value, compared with `==` and used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum BatchingStrategy {
    /// Size-based batching; the default in `BatchingConfig`.
    SizeBased,
    /// Time-based batching.
    TimeBased,
    /// Hybrid batching.
    Hybrid,
    /// Load-based batching.
    LoadBased,
    /// Latency-optimized batching.
    LatencyOptimized,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Default streaming config: base window of 1000 and all features on.
    #[test]
    fn test_streaming_config_default() {
        let StreamingConfig {
            base_window_size,
            adaptive_windowing,
            enable_drift_detection,
            enable_anomaly_detection,
            ..
        } = StreamingConfig::default();

        assert_eq!(base_window_size, 1000);
        assert!(adaptive_windowing);
        assert!(enable_drift_detection);
        assert!(enable_anomaly_detection);
    }

    /// Default alert config: drift and anomaly alerts on, `Warning` minimum.
    #[test]
    fn test_alert_config_default() {
        let AlertConfig {
            drift_alerts,
            anomaly_alerts,
            min_severity,
            ..
        } = AlertConfig::default();

        assert!(drift_alerts);
        assert!(anomaly_alerts);
        assert_eq!(min_severity, AlertSeverity::Warning);
    }

    /// The decay rate survives construction and destructuring.
    #[test]
    fn test_window_adaptation_strategy() {
        let strategy = WindowAdaptationStrategy::ExponentialDecay { decay_rate: 0.95 };
        if let WindowAdaptationStrategy::ExponentialDecay { decay_rate } = strategy {
            assert_eq!(decay_rate, 0.95);
        } else {
            panic!("Unexpected strategy type");
        }
    }

    /// The ADWIN delta survives construction and destructuring.
    #[test]
    fn test_drift_detection_method() {
        let method = DriftDetectionMethod::Adwin { delta: 0.002 };
        if let DriftDetectionMethod::Adwin { delta } = method {
            assert_eq!(delta, 0.002);
        } else {
            panic!("Unexpected method type");
        }
    }

    /// Isolation forest parameters survive construction and destructuring.
    #[test]
    fn test_anomaly_detection_algorithm() {
        let algorithm = AnomalyDetectionAlgorithm::IsolationForest {
            n_trees: 100,
            subsample_size: 256,
        };
        if let AnomalyDetectionAlgorithm::IsolationForest {
            n_trees,
            subsample_size,
        } = algorithm
        {
            assert_eq!(n_trees, 100);
            assert_eq!(subsample_size, 256);
        } else {
            panic!("Unexpected algorithm type");
        }
    }

    /// Severities compare in ascending order: Info < Warning < Error < Critical.
    #[test]
    fn test_alert_severity_ordering() {
        let ascending = [
            AlertSeverity::Info,
            AlertSeverity::Warning,
            AlertSeverity::Error,
            AlertSeverity::Critical,
        ];
        // Each adjacent pair must be strictly increasing.
        for pair in ascending.windows(2) {
            assert!(pair[1] > pair[0]);
        }
    }

    /// Default buffer config: 100000 capacity with LRU eviction.
    #[test]
    fn test_buffer_config_default() {
        let BufferConfig {
            max_size,
            eviction_policy,
            ..
        } = BufferConfig::default();

        assert_eq!(max_size, 100000);
        assert!(matches!(eviction_policy, EvictionPolicy::Lru));
    }

    /// Default monitoring config: latency and throughput on, full sampling.
    #[test]
    fn test_performance_monitoring_config() {
        let PerformanceMonitoringConfig {
            monitor_latency,
            monitor_throughput,
            sampling_rate,
            ..
        } = PerformanceMonitoringConfig::default();

        assert!(monitor_latency);
        assert!(monitor_throughput);
        assert_eq!(sampling_rate, 1.0);
    }

    /// Default batching config: enabled, batch size 100, adaptive.
    #[test]
    fn test_batching_config_default() {
        let BatchingConfig {
            enabled,
            batch_size,
            adaptive_batching,
            ..
        } = BatchingConfig::default();

        assert!(enabled);
        assert_eq!(batch_size, 100);
        assert!(adaptive_batching);
    }
}