scirs2_series/streaming/
mod.rs1use scirs2_core::ndarray::{Array1, Array2};
17use scirs2_core::numeric::{Float, FromPrimitive};
18use std::collections::{HashMap, VecDeque};
19use std::fmt::Debug;
20use std::time::{Duration, Instant};
21
22use crate::error::{Result, TimeSeriesError};
23use statrs::statistics::Statistics;
24
25pub mod change_detection;
27pub mod cointegration;
28pub mod config;
29pub mod forecasting;
30pub mod memory_management;
31pub mod online_learning;
32pub mod statistics;
33
34pub use change_detection::CusumDetector;
36pub use config::{ChangePoint, ChangeType, StreamConfig};
37pub use forecasting::{ModelState, StreamingForecaster};
38pub use memory_management::{
39 CircularBuffer, MultiSeriesAnalyzer, PatternMatch, StreamingAnomalyDetector,
40 StreamingPatternMatcher,
41};
42pub use online_learning::{AdaptiveARIMA, AdaptiveLinearRegression};
43pub use statistics::{OnlineStats, EWMA};
44
45#[derive(Debug)]
50pub struct StreamingAnalyzer<F: Float + Debug> {
51 config: StreamConfig,
53 stats: OnlineStats<F>,
55 ewma: EWMA<F>,
57 cusum: CusumDetector<F>,
59 buffer: VecDeque<F>,
61 last_update: Instant,
63}
64
65impl<F: Float + Debug + Clone + FromPrimitive> StreamingAnalyzer<F> {
66 pub fn new(config: StreamConfig) -> Result<Self> {
68 let ewma = EWMA::new(F::from(0.1).expect("Failed to convert constant to float"))?;
69 let cusum = CusumDetector::new(
70 F::from(config.change_detection_threshold).expect("Failed to convert to float"),
71 F::from(0.5).expect("Failed to convert constant to float"),
72 );
73
74 let window_size = config.window_size;
75
76 Ok(Self {
77 config,
78 stats: OnlineStats::new(),
79 ewma,
80 cusum,
81 buffer: VecDeque::with_capacity(window_size),
82 last_update: Instant::now(),
83 })
84 }
85
86 pub fn add_observation(&mut self, value: F) -> Result<()> {
88 self.stats.update(value);
90 self.ewma.update(value);
91
92 if self.buffer.len() >= self.config.window_size {
94 self.buffer.pop_front();
95 }
96 self.buffer.push_back(value);
97
98 self.cusum.update(value);
100
101 self.last_update = Instant::now();
102 Ok(())
103 }
104
105 pub fn get_stats(&self) -> &OnlineStats<F> {
107 &self.stats
108 }
109
110 pub fn get_ewma(&self) -> Option<F> {
112 self.ewma.value()
113 }
114
115 pub fn detect_change(&self) -> bool {
117 self.cusum.is_change_detected()
118 }
119
120 pub fn get_change_signal(&self) -> (F, F) {
122 self.cusum.get_signals()
123 }
124
125 pub fn get_buffer(&self) -> &VecDeque<F> {
127 &self.buffer
128 }
129
130 pub fn get_config(&self) -> &StreamConfig {
132 &self.config
133 }
134
135 pub fn time_since_update(&self) -> Duration {
137 self.last_update.elapsed()
138 }
139
140 pub fn reset(&mut self) {
142 self.stats = OnlineStats::new();
143 self.ewma = EWMA::new(F::from(0.1).expect("Failed to convert constant to float"))
144 .expect("Operation failed");
145 self.cusum = CusumDetector::new(
146 F::from(self.config.change_detection_threshold).expect("Failed to convert to float"),
147 F::from(0.5).expect("Failed to convert constant to float"),
148 );
149 self.buffer.clear();
150 self.last_update = Instant::now();
151 }
152
153 pub fn cleanup_memory(&mut self) {
155 if self.buffer.len() > self.config.memory_threshold {
156 let target_size = self.config.memory_threshold / 2;
157 let to_remove = self.buffer.len() - target_size;
158
159 for _ in 0..to_remove {
160 self.buffer.pop_front();
161 }
162 }
163
164 self.last_update = Instant::now();
165 }
166
167 pub fn get_change_points(&self) -> Vec<ChangePoint> {
169 if self.detect_change() {
171 vec![ChangePoint {
172 index: self.stats.count(),
173 timestamp: Some(Instant::now()),
174 confidence: self
175 .cusum
176 .get_signals()
177 .0
178 .max(self.cusum.get_signals().1)
179 .to_f64()
180 .unwrap_or(0.0),
181 change_type: ChangeType::MeanShift,
182 }]
183 } else {
184 Vec::new()
185 }
186 }
187
188 pub fn is_outlier(&self, value: F) -> bool {
190 if self.stats.count() < 10 {
191 return false; }
193
194 let mean = self.stats.mean();
195 let std_dev = self.stats.std_dev();
196 let z_score = ((value - mean) / std_dev).abs();
197
198 z_score > F::from(3.0).expect("Failed to convert constant to float")
200 }
201
202 pub fn forecast(&self, _steps: usize) -> Result<Array1<F>> {
204 if let Some(ewma_value) = self.ewma.value() {
206 Ok(Array1::from_elem(_steps, ewma_value))
207 } else {
208 Ok(Array1::zeros(_steps))
209 }
210 }
211
212 pub fn observation_count(&self) -> usize {
214 self.stats.count()
215 }
216
217 pub fn buffer_size(&self) -> usize {
219 self.buffer.len()
220 }
221
222 pub fn time_since_last_update(&self) -> Duration {
224 self.time_since_update()
225 }
226}
227
228pub mod adaptive {
232 pub use super::online_learning::{AdaptiveARIMA, AdaptiveLinearRegression};
233}
234
235pub mod advanced {
239 pub use super::forecasting::{ModelState, StreamingForecaster};
240 pub use super::memory_management::{
241 CircularBuffer, PatternMatch, StreamingAnomalyDetector, StreamingPatternMatcher,
242 };
243}
244
245#[cfg(test)]
246mod tests {
247 use super::*;
248 use approx::assert_abs_diff_eq;
249
250 #[test]
251 fn test_streaming_analyzer_basic() {
252 let config = StreamConfig::default();
253 let mut analyzer = StreamingAnalyzer::<f64>::new(config).expect("Operation failed");
254
255 for i in 1..=10 {
257 analyzer
258 .add_observation(i as f64)
259 .expect("Operation failed");
260 }
261
262 let stats = analyzer.get_stats();
264 assert_eq!(stats.count(), 10);
265 assert_abs_diff_eq!(stats.mean(), 5.5, epsilon = 1e-10);
266 assert_abs_diff_eq!(stats.min(), 1.0);
267 assert_abs_diff_eq!(stats.max(), 10.0);
268 }
269
270 #[test]
271 fn test_streaming_analyzer_ewma() {
272 let config = StreamConfig::default();
273 let mut analyzer = StreamingAnalyzer::<f64>::new(config).expect("Operation failed");
274
275 analyzer.add_observation(10.0).expect("Operation failed");
276 let ewma1 = analyzer.get_ewma().expect("Operation failed");
277 assert_abs_diff_eq!(ewma1, 10.0);
278
279 analyzer.add_observation(20.0).expect("Operation failed");
280 let ewma2 = analyzer.get_ewma().expect("Operation failed");
281 assert!(ewma2 > 10.0);
283 assert!(ewma2 < 20.0);
284 }
285
286 #[test]
287 fn test_streaming_analyzer_buffer() {
288 let mut config = StreamConfig::default();
289 config.window_size = 3;
290 let mut analyzer = StreamingAnalyzer::<f64>::new(config).expect("Operation failed");
291
292 for i in 1..=5 {
294 analyzer
295 .add_observation(i as f64)
296 .expect("Operation failed");
297 }
298
299 let buffer = analyzer.get_buffer();
300 assert_eq!(buffer.len(), 3);
301 assert_eq!(*buffer.get(0).expect("Operation failed"), 3.0);
303 assert_eq!(*buffer.get(1).expect("Operation failed"), 4.0);
304 assert_eq!(*buffer.get(2).expect("Operation failed"), 5.0);
305 }
306
307 #[test]
308 fn test_memory_cleanup() {
309 let mut config = StreamConfig::default();
310 config.window_size = 1000;
311 config.memory_threshold = 50;
312 let mut analyzer = StreamingAnalyzer::<f64>::new(config).expect("Operation failed");
313
314 for i in 1..=100 {
316 analyzer
317 .add_observation(i as f64)
318 .expect("Operation failed");
319 }
320
321 analyzer.cleanup_memory();
322
323 assert_eq!(analyzer.get_buffer().len(), 25);
325 }
326}