Skip to main content

scirs2_series/streaming/
mod.rs

1//! Real-time streaming time series analysis module
2//!
3//! This module provides capabilities for analyzing time series data in real-time,
4//! including online learning algorithms, streaming forecasting, incremental statistics,
5//! change point detection, anomaly detection, and pattern matching.
6//!
7//! # Module Structure
8//!
9//! - `config` - Configuration types and change point definitions
10//! - `statistics` - Online statistics and exponential weighted moving averages
11//! - `change_detection` - Change point detection algorithms
12//! - `online_learning` - Adaptive learning algorithms (regression, ARIMA)
13//! - `forecasting` - Real-time forecasting with exponential smoothing
14//! - `memory_management` - Memory-efficient utilities, anomaly detection, and pattern matching
15
16use scirs2_core::ndarray::{Array1, Array2};
17use scirs2_core::numeric::{Float, FromPrimitive};
18use std::collections::{HashMap, VecDeque};
19use std::fmt::Debug;
20use std::time::{Duration, Instant};
21
22use crate::error::{Result, TimeSeriesError};
23use statrs::statistics::Statistics;
24
25// Declare sub-modules
26pub mod change_detection;
27pub mod cointegration;
28pub mod config;
29pub mod forecasting;
30pub mod memory_management;
31pub mod online_learning;
32pub mod statistics;
33
34// Re-export all public types for backward compatibility
35pub use change_detection::CusumDetector;
36pub use config::{ChangePoint, ChangeType, StreamConfig};
37pub use forecasting::{ModelState, StreamingForecaster};
38pub use memory_management::{
39    CircularBuffer, MultiSeriesAnalyzer, PatternMatch, StreamingAnomalyDetector,
40    StreamingPatternMatcher,
41};
42pub use online_learning::{AdaptiveARIMA, AdaptiveLinearRegression};
43pub use statistics::{OnlineStats, EWMA};
44
45/// Core streaming time series analyzer
46///
47/// This is the main interface for streaming time series analysis, providing
48/// a unified API for real-time analysis capabilities.
49#[derive(Debug)]
50pub struct StreamingAnalyzer<F: Float + Debug> {
51    /// Configuration parameters
52    config: StreamConfig,
53    /// Online statistics tracker
54    stats: OnlineStats<F>,
55    /// Exponential weighted moving average
56    ewma: EWMA<F>,
57    /// Change point detector
58    cusum: CusumDetector<F>,
59    /// Recent observations buffer
60    buffer: VecDeque<F>,
61    /// Last update timestamp
62    last_update: Instant,
63}
64
65impl<F: Float + Debug + Clone + FromPrimitive> StreamingAnalyzer<F> {
66    /// Create new streaming analyzer with default configuration
67    pub fn new(config: StreamConfig) -> Result<Self> {
68        let ewma = EWMA::new(F::from(0.1).expect("Failed to convert constant to float"))?;
69        let cusum = CusumDetector::new(
70            F::from(config.change_detection_threshold).expect("Failed to convert to float"),
71            F::from(0.5).expect("Failed to convert constant to float"),
72        );
73
74        let window_size = config.window_size;
75
76        Ok(Self {
77            config,
78            stats: OnlineStats::new(),
79            ewma,
80            cusum,
81            buffer: VecDeque::with_capacity(window_size),
82            last_update: Instant::now(),
83        })
84    }
85
86    /// Add new observation to the analyzer
87    pub fn add_observation(&mut self, value: F) -> Result<()> {
88        // Update statistics
89        self.stats.update(value);
90        self.ewma.update(value);
91
92        // Add to buffer
93        if self.buffer.len() >= self.config.window_size {
94            self.buffer.pop_front();
95        }
96        self.buffer.push_back(value);
97
98        // Update change detection
99        self.cusum.update(value);
100
101        self.last_update = Instant::now();
102        Ok(())
103    }
104
105    /// Get current statistics
106    pub fn get_stats(&self) -> &OnlineStats<F> {
107        &self.stats
108    }
109
110    /// Get current EWMA value
111    pub fn get_ewma(&self) -> Option<F> {
112        self.ewma.value()
113    }
114
115    /// Check for change points
116    pub fn detect_change(&self) -> bool {
117        self.cusum.is_change_detected()
118    }
119
120    /// Get change detection signal value
121    pub fn get_change_signal(&self) -> (F, F) {
122        self.cusum.get_signals()
123    }
124
125    /// Get recent observations buffer
126    pub fn get_buffer(&self) -> &VecDeque<F> {
127        &self.buffer
128    }
129
130    /// Get configuration
131    pub fn get_config(&self) -> &StreamConfig {
132        &self.config
133    }
134
135    /// Get time since last update
136    pub fn time_since_update(&self) -> Duration {
137        self.last_update.elapsed()
138    }
139
140    /// Reset the analyzer state
141    pub fn reset(&mut self) {
142        self.stats = OnlineStats::new();
143        self.ewma = EWMA::new(F::from(0.1).expect("Failed to convert constant to float"))
144            .expect("Operation failed");
145        self.cusum = CusumDetector::new(
146            F::from(self.config.change_detection_threshold).expect("Failed to convert to float"),
147            F::from(0.5).expect("Failed to convert constant to float"),
148        );
149        self.buffer.clear();
150        self.last_update = Instant::now();
151    }
152
153    /// Perform automatic memory cleanup if threshold is reached
154    pub fn cleanup_memory(&mut self) {
155        if self.buffer.len() > self.config.memory_threshold {
156            let target_size = self.config.memory_threshold / 2;
157            let to_remove = self.buffer.len() - target_size;
158
159            for _ in 0..to_remove {
160                self.buffer.pop_front();
161            }
162        }
163
164        self.last_update = Instant::now();
165    }
166
167    /// Get change points (compatibility method)
168    pub fn get_change_points(&self) -> Vec<ChangePoint> {
169        // This is a simple implementation - in the original it might have been more sophisticated
170        if self.detect_change() {
171            vec![ChangePoint {
172                index: self.stats.count(),
173                timestamp: Some(Instant::now()),
174                confidence: self
175                    .cusum
176                    .get_signals()
177                    .0
178                    .max(self.cusum.get_signals().1)
179                    .to_f64()
180                    .unwrap_or(0.0),
181                change_type: ChangeType::MeanShift,
182            }]
183        } else {
184            Vec::new()
185        }
186    }
187
188    /// Check if a value is an outlier (simple implementation)
189    pub fn is_outlier(&self, value: F) -> bool {
190        if self.stats.count() < 10 {
191            return false; // Not enough data
192        }
193
194        let mean = self.stats.mean();
195        let std_dev = self.stats.std_dev();
196        let z_score = ((value - mean) / std_dev).abs();
197
198        // Consider outlier if z-score > 3
199        z_score > F::from(3.0).expect("Failed to convert constant to float")
200    }
201
202    /// Simple forecast method (compatibility)
203    pub fn forecast(&self, _steps: usize) -> Result<Array1<F>> {
204        // Simple implementation using EWMA as forecast
205        if let Some(ewma_value) = self.ewma.value() {
206            Ok(Array1::from_elem(_steps, ewma_value))
207        } else {
208            Ok(Array1::zeros(_steps))
209        }
210    }
211
212    /// Get observation count
213    pub fn observation_count(&self) -> usize {
214        self.stats.count()
215    }
216
217    /// Get buffer size
218    pub fn buffer_size(&self) -> usize {
219        self.buffer.len()
220    }
221
222    /// Get time since last update (alias for compatibility)
223    pub fn time_since_last_update(&self) -> Duration {
224        self.time_since_update()
225    }
226}
227
228/// Adaptive learning models for streaming data
229///
230/// Re-exports adaptive model types (maintaining backward compatibility)
231pub mod adaptive {
232    pub use super::online_learning::{AdaptiveARIMA, AdaptiveLinearRegression};
233}
234
235/// Advanced streaming analytics and forecasting
236///
237/// Re-exports advanced model types (maintaining backward compatibility)
238pub mod advanced {
239    pub use super::forecasting::{ModelState, StreamingForecaster};
240    pub use super::memory_management::{
241        CircularBuffer, PatternMatch, StreamingAnomalyDetector, StreamingPatternMatcher,
242    };
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    use approx::assert_abs_diff_eq;

    /// Build an `f64` analyzer from the default configuration.
    fn default_analyzer() -> StreamingAnalyzer<f64> {
        StreamingAnalyzer::<f64>::new(StreamConfig::default()).expect("Operation failed")
    }

    /// Feed the integers `1..=upto`, converted to `f64`, into `analyzer` in order.
    fn feed_sequence(analyzer: &mut StreamingAnalyzer<f64>, upto: i32) {
        (1..=upto)
            .map(f64::from)
            .try_for_each(|sample| analyzer.add_observation(sample))
            .expect("Operation failed");
    }

    #[test]
    fn test_streaming_analyzer_basic() {
        let mut analyzer = default_analyzer();
        feed_sequence(&mut analyzer, 10);

        let summary = analyzer.get_stats();
        assert_eq!(summary.count(), 10);
        assert_abs_diff_eq!(summary.mean(), 5.5, epsilon = 1e-10);
        assert_abs_diff_eq!(summary.min(), 1.0);
        assert_abs_diff_eq!(summary.max(), 10.0);
    }

    #[test]
    fn test_streaming_analyzer_ewma() {
        let mut analyzer = default_analyzer();

        analyzer.add_observation(10.0).expect("Operation failed");
        let first = analyzer.get_ewma().expect("Operation failed");
        assert_abs_diff_eq!(first, 10.0);

        analyzer.add_observation(20.0).expect("Operation failed");
        let second = analyzer.get_ewma().expect("Operation failed");
        // A larger second sample must pull the average strictly between both inputs.
        assert!(second > 10.0 && second < 20.0);
    }

    #[test]
    fn test_streaming_analyzer_buffer() {
        let mut config = StreamConfig::default();
        config.window_size = 3;
        let mut analyzer = StreamingAnalyzer::<f64>::new(config).expect("Operation failed");

        // Overflow the three-slot window.
        feed_sequence(&mut analyzer, 5);

        let window = analyzer.get_buffer();
        assert_eq!(window.len(), 3);
        // Only the three most recent observations survive, oldest first.
        for (slot, expected) in [3.0, 4.0, 5.0].iter().enumerate() {
            assert_eq!(*window.get(slot).expect("Operation failed"), *expected);
        }
    }

    #[test]
    fn test_memory_cleanup() {
        let mut config = StreamConfig::default();
        config.window_size = 1000;
        config.memory_threshold = 50;
        let mut analyzer = StreamingAnalyzer::<f64>::new(config).expect("Operation failed");

        // 100 observations fit in the window but exceed the memory threshold.
        feed_sequence(&mut analyzer, 100);

        analyzer.cleanup_memory();

        // Cleanup trims the buffer down to half the threshold.
        assert_eq!(analyzer.get_buffer().len(), 25);
    }
}