use scirs2_core::ndarray::{Array1, Array2};
use scirs2_core::numeric::{Float, FromPrimitive};
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::time::{Duration, Instant};
use crate::error::{Result, TimeSeriesError};
use statrs::statistics::Statistics;
pub mod change_detection;
pub mod config;
pub mod forecasting;
pub mod memory_management;
pub mod online_learning;
pub mod statistics;
pub use change_detection::CusumDetector;
pub use config::{ChangePoint, ChangeType, StreamConfig};
pub use forecasting::{ModelState, StreamingForecaster};
pub use memory_management::{
CircularBuffer, MultiSeriesAnalyzer, PatternMatch, StreamingAnomalyDetector,
StreamingPatternMatcher,
};
pub use online_learning::{AdaptiveARIMA, AdaptiveLinearRegression};
pub use statistics::{OnlineStats, EWMA};
#[derive(Debug)]
pub struct StreamingAnalyzer<F: Float + Debug> {
config: StreamConfig,
stats: OnlineStats<F>,
ewma: EWMA<F>,
cusum: CusumDetector<F>,
buffer: VecDeque<F>,
last_update: Instant,
}
impl<F: Float + Debug + Clone + FromPrimitive> StreamingAnalyzer<F> {
pub fn new(config: StreamConfig) -> Result<Self> {
let ewma = EWMA::new(F::from(0.1).expect("Failed to convert constant to float"))?;
let cusum = CusumDetector::new(
F::from(config.change_detection_threshold).expect("Failed to convert to float"),
F::from(0.5).expect("Failed to convert constant to float"),
);
let window_size = config.window_size;
Ok(Self {
config,
stats: OnlineStats::new(),
ewma,
cusum,
buffer: VecDeque::with_capacity(window_size),
last_update: Instant::now(),
})
}
pub fn add_observation(&mut self, value: F) -> Result<()> {
self.stats.update(value);
self.ewma.update(value);
if self.buffer.len() >= self.config.window_size {
self.buffer.pop_front();
}
self.buffer.push_back(value);
self.cusum.update(value);
self.last_update = Instant::now();
Ok(())
}
pub fn get_stats(&self) -> &OnlineStats<F> {
&self.stats
}
pub fn get_ewma(&self) -> Option<F> {
self.ewma.value()
}
pub fn detect_change(&self) -> bool {
self.cusum.is_change_detected()
}
pub fn get_change_signal(&self) -> (F, F) {
self.cusum.get_signals()
}
pub fn get_buffer(&self) -> &VecDeque<F> {
&self.buffer
}
pub fn get_config(&self) -> &StreamConfig {
&self.config
}
pub fn time_since_update(&self) -> Duration {
self.last_update.elapsed()
}
pub fn reset(&mut self) {
self.stats = OnlineStats::new();
self.ewma = EWMA::new(F::from(0.1).expect("Failed to convert constant to float"))
.expect("Operation failed");
self.cusum = CusumDetector::new(
F::from(self.config.change_detection_threshold).expect("Failed to convert to float"),
F::from(0.5).expect("Failed to convert constant to float"),
);
self.buffer.clear();
self.last_update = Instant::now();
}
pub fn cleanup_memory(&mut self) {
if self.buffer.len() > self.config.memory_threshold {
let target_size = self.config.memory_threshold / 2;
let to_remove = self.buffer.len() - target_size;
for _ in 0..to_remove {
self.buffer.pop_front();
}
}
self.last_update = Instant::now();
}
pub fn get_change_points(&self) -> Vec<ChangePoint> {
if self.detect_change() {
vec![ChangePoint {
index: self.stats.count(),
timestamp: Some(Instant::now()),
confidence: self
.cusum
.get_signals()
.0
.max(self.cusum.get_signals().1)
.to_f64()
.unwrap_or(0.0),
change_type: ChangeType::MeanShift,
}]
} else {
Vec::new()
}
}
pub fn is_outlier(&self, value: F) -> bool {
if self.stats.count() < 10 {
return false; }
let mean = self.stats.mean();
let std_dev = self.stats.std_dev();
let z_score = ((value - mean) / std_dev).abs();
z_score > F::from(3.0).expect("Failed to convert constant to float")
}
pub fn forecast(&self, _steps: usize) -> Result<Array1<F>> {
if let Some(ewma_value) = self.ewma.value() {
Ok(Array1::from_elem(_steps, ewma_value))
} else {
Ok(Array1::zeros(_steps))
}
}
pub fn observation_count(&self) -> usize {
self.stats.count()
}
pub fn buffer_size(&self) -> usize {
self.buffer.len()
}
pub fn time_since_last_update(&self) -> Duration {
self.time_since_update()
}
}
pub mod adaptive {
pub use super::online_learning::{AdaptiveARIMA, AdaptiveLinearRegression};
}
pub mod advanced {
pub use super::forecasting::{ModelState, StreamingForecaster};
pub use super::memory_management::{
CircularBuffer, PatternMatch, StreamingAnomalyDetector, StreamingPatternMatcher,
};
}
#[cfg(test)]
mod tests {
use super::*;
use approx::assert_abs_diff_eq;
#[test]
fn test_streaming_analyzer_basic() {
let config = StreamConfig::default();
let mut analyzer = StreamingAnalyzer::<f64>::new(config).expect("Operation failed");
for i in 1..=10 {
analyzer
.add_observation(i as f64)
.expect("Operation failed");
}
let stats = analyzer.get_stats();
assert_eq!(stats.count(), 10);
assert_abs_diff_eq!(stats.mean(), 5.5, epsilon = 1e-10);
assert_abs_diff_eq!(stats.min(), 1.0);
assert_abs_diff_eq!(stats.max(), 10.0);
}
#[test]
fn test_streaming_analyzer_ewma() {
let config = StreamConfig::default();
let mut analyzer = StreamingAnalyzer::<f64>::new(config).expect("Operation failed");
analyzer.add_observation(10.0).expect("Operation failed");
let ewma1 = analyzer.get_ewma().expect("Operation failed");
assert_abs_diff_eq!(ewma1, 10.0);
analyzer.add_observation(20.0).expect("Operation failed");
let ewma2 = analyzer.get_ewma().expect("Operation failed");
assert!(ewma2 > 10.0);
assert!(ewma2 < 20.0);
}
#[test]
fn test_streaming_analyzer_buffer() {
let mut config = StreamConfig::default();
config.window_size = 3;
let mut analyzer = StreamingAnalyzer::<f64>::new(config).expect("Operation failed");
for i in 1..=5 {
analyzer
.add_observation(i as f64)
.expect("Operation failed");
}
let buffer = analyzer.get_buffer();
assert_eq!(buffer.len(), 3);
assert_eq!(*buffer.get(0).expect("Operation failed"), 3.0);
assert_eq!(*buffer.get(1).expect("Operation failed"), 4.0);
assert_eq!(*buffer.get(2).expect("Operation failed"), 5.0);
}
#[test]
fn test_memory_cleanup() {
let mut config = StreamConfig::default();
config.window_size = 1000;
config.memory_threshold = 50;
let mut analyzer = StreamingAnalyzer::<f64>::new(config).expect("Operation failed");
for i in 1..=100 {
analyzer
.add_observation(i as f64)
.expect("Operation failed");
}
analyzer.cleanup_memory();
assert_eq!(analyzer.get_buffer().len(), 25);
}
}