1use scirs2_core::ndarray::Array1;
7use scirs2_core::numeric::{Float, FromPrimitive};
8use std::collections::{HashMap, VecDeque};
9use std::fmt::Debug;
10
11use super::config::StreamConfig;
12use super::statistics::OnlineStats;
13use crate::error::{Result, TimeSeriesError};
14use crate::streaming::StreamingAnalyzer;
15
16#[derive(Debug)]
18pub struct MultiSeriesAnalyzer<F: Float + Debug> {
19 analyzers: HashMap<String, StreamingAnalyzer<F>>,
20 config: StreamConfig,
21}
22
23impl<F: Float + Debug + Clone + FromPrimitive> MultiSeriesAnalyzer<F> {
24 pub fn new(config: StreamConfig) -> Self {
26 Self {
27 analyzers: HashMap::new(),
28 config,
29 }
30 }
31
32 pub fn add_series(&mut self, seriesid: String) -> Result<()> {
34 let analyzer = StreamingAnalyzer::new(self.config.clone())?;
35 self.analyzers.insert(seriesid, analyzer);
36 Ok(())
37 }
38
39 pub fn add_observation(&mut self, seriesid: &str, value: F) -> Result<()> {
41 if let Some(analyzer) = self.analyzers.get_mut(seriesid) {
42 analyzer.add_observation(value)
43 } else {
44 Err(TimeSeriesError::InvalidInput(format!(
45 "Series '{seriesid}' not found"
46 )))
47 }
48 }
49
50 pub fn get_analyzer(&self, seriesid: &str) -> Option<&StreamingAnalyzer<F>> {
52 self.analyzers.get(seriesid)
53 }
54
55 pub fn get_analyzer_mut(&mut self, seriesid: &str) -> Option<&mut StreamingAnalyzer<F>> {
57 self.analyzers.get_mut(seriesid)
58 }
59
60 pub fn get_series_ids(&self) -> Vec<String> {
62 self.analyzers.keys().cloned().collect()
63 }
64
65 pub fn remove_series(&mut self, seriesid: &str) -> bool {
67 self.analyzers.remove(seriesid).is_some()
68 }
69
70 pub fn get_correlation(&self, series1: &str, series2: &str) -> Result<F> {
72 let analyzer1 = self.analyzers.get(series1).ok_or_else(|| {
73 TimeSeriesError::InvalidInput(format!("Series '{series1}' not found"))
74 })?;
75
76 let analyzer2 = self.analyzers.get(series2).ok_or_else(|| {
77 TimeSeriesError::InvalidInput(format!("Series '{series2}' not found"))
78 })?;
79
80 let buffer1 = analyzer1.get_buffer();
81 let buffer2 = analyzer2.get_buffer();
82
83 let min_len = std::cmp::min(buffer1.len(), buffer2.len());
84 if min_len < 2 {
85 return Ok(F::zero());
86 }
87
88 let mean1 = buffer1
90 .iter()
91 .take(min_len)
92 .cloned()
93 .fold(F::zero(), |acc, x| acc + x)
94 / F::from(min_len).expect("Failed to convert to float");
95 let mean2 = buffer2
96 .iter()
97 .take(min_len)
98 .cloned()
99 .fold(F::zero(), |acc, x| acc + x)
100 / F::from(min_len).expect("Failed to convert to float");
101
102 let mut numerator = F::zero();
103 let mut sum1_sq = F::zero();
104 let mut sum2_sq = F::zero();
105
106 for i in 0..min_len {
107 let diff1 = buffer1[i] - mean1;
108 let diff2 = buffer2[i] - mean2;
109 numerator = numerator + diff1 * diff2;
110 sum1_sq = sum1_sq + diff1 * diff1;
111 sum2_sq = sum2_sq + diff2 * diff2;
112 }
113
114 let denominator = (sum1_sq * sum2_sq).sqrt();
115 if denominator > F::epsilon() {
116 Ok(numerator / denominator)
117 } else {
118 Ok(F::zero())
119 }
120 }
121}
122
123#[derive(Debug)]
125pub struct StreamingAnomalyDetector<F: Float + Debug> {
126 feature_buffer: VecDeque<Vec<F>>,
128 max_buffer_size: usize,
130 threshold: F,
132 window_size: usize,
134 num_features: usize,
136}
137
138impl<F: Float + Debug + Clone> StreamingAnomalyDetector<F> {
139 pub fn new(
141 max_buffer_size: usize,
142 threshold: F,
143 window_size: usize,
144 num_features: usize,
145 ) -> Self {
146 Self {
147 feature_buffer: VecDeque::with_capacity(max_buffer_size),
148 max_buffer_size,
149 threshold,
150 window_size,
151 num_features,
152 }
153 }
154
155 fn extract_features(&self, window: &[F]) -> Vec<F> {
157 if window.is_empty() {
158 return vec![F::zero(); self.num_features];
159 }
160
161 let mut features = Vec::with_capacity(self.num_features);
162 let n = F::from(window.len()).expect("Operation failed");
163
164 let mean = window.iter().fold(F::zero(), |acc, &x| acc + x) / n;
166 features.push(mean);
167
168 let variance = window
170 .iter()
171 .map(|&x| (x - mean) * (x - mean))
172 .fold(F::zero(), |acc, x| acc + x)
173 / n;
174 features.push(variance.sqrt());
175
176 let skewness = window
178 .iter()
179 .map(|&x| {
180 let normalized = (x - mean) / variance.sqrt();
181 normalized * normalized * normalized
182 })
183 .fold(F::zero(), |acc, x| acc + x)
184 / n;
185 features.push(skewness);
186
187 let min_val = window.iter().fold(F::infinity(), |acc, &x| acc.min(x));
189 let max_val = window.iter().fold(F::neg_infinity(), |acc, &x| acc.max(x));
190 features.push(max_val - min_val);
191
192 if window.len() > 1 {
194 let x_mean = F::from(window.len() - 1).expect("Operation failed")
195 / F::from(2).expect("Failed to convert constant to float");
196 let mut num = F::zero();
197 let mut den = F::zero();
198
199 for (i, &y) in window.iter().enumerate() {
200 let x = F::from(i).expect("Failed to convert to float");
201 num = num + (x - x_mean) * (y - mean);
202 den = den + (x - x_mean) * (x - x_mean);
203 }
204
205 let slope = if den > F::zero() {
206 num / den
207 } else {
208 F::zero()
209 };
210 features.push(slope);
211 } else {
212 features.push(F::zero());
213 }
214
215 features
216 }
217
218 pub fn update(&mut self, window: &[F]) -> Result<bool> {
220 if window.len() < self.window_size {
221 return Ok(false); }
223
224 let features = self.extract_features(&window[window.len() - self.window_size..]);
225
226 if self.feature_buffer.is_empty() {
227 if self.feature_buffer.len() >= self.max_buffer_size {
229 self.feature_buffer.pop_front();
230 }
231 self.feature_buffer.push_back(features);
232 return Ok(false);
233 }
234
235 let mut min_distance = F::infinity();
237 for stored_features in &self.feature_buffer {
238 let distance = features
239 .iter()
240 .zip(stored_features.iter())
241 .map(|(&a, &b)| (a - b) * (a - b))
242 .fold(F::zero(), |acc, x| acc + x)
243 .sqrt();
244 min_distance = min_distance.min(distance);
245 }
246
247 if self.feature_buffer.len() >= self.max_buffer_size {
249 self.feature_buffer.pop_front();
250 }
251 self.feature_buffer.push_back(features);
252
253 Ok(min_distance > self.threshold)
255 }
256
257 pub fn adapt_threshold(&mut self, factor: F) {
259 if self.feature_buffer.len() > 2 {
260 let mut total_distance = F::zero();
262 let mut count = 0;
263
264 for i in 0..self.feature_buffer.len() {
265 for j in i + 1..self.feature_buffer.len() {
266 let distance = self.feature_buffer[i]
267 .iter()
268 .zip(self.feature_buffer[j].iter())
269 .map(|(&a, &b)| (a - b) * (a - b))
270 .fold(F::zero(), |acc, x| acc + x)
271 .sqrt();
272 total_distance = total_distance + distance;
273 count += 1;
274 }
275 }
276
277 if count > 0 {
278 let avg_distance =
279 total_distance / F::from(count).expect("Failed to convert to float");
280 self.threshold = avg_distance * factor;
281 }
282 }
283 }
284}
285
286#[derive(Debug)]
288pub struct StreamingPatternMatcher<F: Float + Debug> {
289 patterns: Vec<Vec<F>>,
291 pattern_names: Vec<String>,
293 buffer: VecDeque<F>,
295 max_buffer_size: usize,
297 threshold: F,
299}
300
301impl<F: Float + Debug + Clone> StreamingPatternMatcher<F> {
302 pub fn new(_max_buffersize: usize, threshold: F) -> Self {
304 Self {
305 patterns: Vec::new(),
306 pattern_names: Vec::new(),
307 buffer: VecDeque::with_capacity(_max_buffersize),
308 max_buffer_size: _max_buffersize,
309 threshold,
310 }
311 }
312
313 pub fn add_pattern(&mut self, pattern: Vec<F>, name: String) -> Result<()> {
315 if pattern.is_empty() {
316 return Err(TimeSeriesError::InvalidInput(
317 "Pattern cannot be empty".to_string(),
318 ));
319 }
320 self.patterns.push(pattern);
321 self.pattern_names.push(name);
322 Ok(())
323 }
324
325 pub fn update(&mut self, value: F) -> Vec<PatternMatch> {
327 if self.buffer.len() >= self.max_buffer_size {
329 self.buffer.pop_front();
330 }
331 self.buffer.push_back(value);
332
333 let mut matches = Vec::new();
334
335 for (i, pattern) in self.patterns.iter().enumerate() {
337 if self.buffer.len() >= pattern.len() {
338 let recent_data: Vec<F> = self
339 .buffer
340 .iter()
341 .rev()
342 .take(pattern.len())
343 .rev()
344 .cloned()
345 .collect();
346
347 if let Ok(correlation) = self.normalized_correlation(&recent_data, pattern) {
348 if correlation >= self.threshold {
349 matches.push(PatternMatch {
350 pattern_name: self.pattern_names[i].clone(),
351 correlation: correlation.to_f64().expect("Operation failed"),
352 start_index: self.buffer.len() - pattern.len(),
353 pattern_length: pattern.len(),
354 });
355 }
356 }
357 }
358 }
359
360 matches
361 }
362
363 fn normalized_correlation(&self, a: &[F], b: &[F]) -> Result<F> {
365 if a.len() != b.len() || a.is_empty() {
366 return Err(TimeSeriesError::InvalidInput(
367 "Sequences must have the same non-zero length".to_string(),
368 ));
369 }
370
371 let n = F::from(a.len()).expect("Operation failed");
372
373 let mean_a = a.iter().fold(F::zero(), |acc, &x| acc + x) / n;
375 let mean_b = b.iter().fold(F::zero(), |acc, &x| acc + x) / n;
376
377 let mut num = F::zero();
379 let mut den_a = F::zero();
380 let mut den_b = F::zero();
381
382 for (&val_a, &val_b) in a.iter().zip(b.iter()) {
383 let diff_a = val_a - mean_a;
384 let diff_b = val_b - mean_b;
385
386 num = num + diff_a * diff_b;
387 den_a = den_a + diff_a * diff_a;
388 den_b = den_b + diff_b * diff_b;
389 }
390
391 let denominator = (den_a * den_b).sqrt();
392 if denominator > F::zero() {
393 Ok(num / denominator)
394 } else {
395 Ok(F::zero())
396 }
397 }
398}
399
/// A single match reported by `StreamingPatternMatcher::update`.
#[derive(Debug, Clone, PartialEq)]
pub struct PatternMatch {
    /// Name the pattern was registered under.
    pub pattern_name: String,
    /// Correlation between the pattern and the matched stream segment.
    pub correlation: f64,
    /// Index, within the matcher's buffer at the time of the match, where the
    /// matched segment starts.
    pub start_index: usize,
    /// Length of the matched pattern, in samples.
    pub pattern_length: usize,
}
412
413#[derive(Debug)]
415pub struct CircularBuffer<F: Float> {
416 buffer: Vec<F>,
418 position: usize,
420 capacity: usize,
422 is_full: bool,
424}
425
426impl<F: Float + Debug + Clone + Default> CircularBuffer<F> {
427 pub fn new(capacity: usize) -> Self {
429 Self {
430 buffer: vec![F::default(); capacity],
431 position: 0,
432 capacity,
433 is_full: false,
434 }
435 }
436
437 pub fn push(&mut self, value: F) {
439 self.buffer[self.position] = value;
440 self.position = (self.position + 1) % self.capacity;
441
442 if self.position == 0 {
443 self.is_full = true;
444 }
445 }
446
447 pub fn len(&self) -> usize {
449 if self.is_full {
450 self.capacity
451 } else {
452 self.position
453 }
454 }
455
456 pub fn is_empty(&self) -> bool {
458 !self.is_full && self.position == 0
459 }
460
461 pub fn recent(&self, n: usize) -> Vec<F> {
463 let available = self.len();
464 let take = n.min(available);
465 let mut result = Vec::with_capacity(take);
466
467 if self.is_full {
468 let start_pos = (self.position + self.capacity - take) % self.capacity;
470
471 if start_pos + take <= self.capacity {
472 result.extend_from_slice(&self.buffer[start_pos..start_pos + take]);
474 } else {
475 let first_part = self.capacity - start_pos;
477 result.extend_from_slice(&self.buffer[start_pos..]);
478 result.extend_from_slice(&self.buffer[..take - first_part]);
479 }
480 } else {
481 let start = self.position.saturating_sub(take);
483 result.extend_from_slice(&self.buffer[start..self.position]);
484 }
485
486 result
487 }
488
489 pub fn to_vec(&self) -> Vec<F> {
491 self.recent(self.len())
492 }
493
494 pub fn window_stats(&self, windowsize: usize) -> OnlineStats<F> {
496 let recent_data = self.recent(windowsize);
497 let mut stats = OnlineStats::new();
498
499 for value in recent_data {
500 stats.update(value);
501 }
502
503 stats
504 }
505}
506
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_anomaly_detector() {
        let mut detector = StreamingAnomalyDetector::new(100, 2.0, 10, 5);

        // Consecutive windows of a linear ramp differ only slightly in their features.
        let normal_data: Vec<f64> = (0..20).map(|x| x as f64).collect();
        for window in normal_data.windows(10) {
            let is_anomaly = detector.update(window).expect("Operation failed");
            assert!(!is_anomaly, "Normal data should not be anomalous");
        }

        // A level shift to 1000 is far from every stored window.
        let anomalous_window = vec![1000.0; 10];
        let result = detector
            .update(&anomalous_window)
            .expect("Operation failed");
        assert!(result, "Clear anomaly should be detected");
    }

    #[test]
    fn test_anomaly_detector_ignores_short_windows() {
        let mut detector = StreamingAnomalyDetector::new(10, 0.5, 10, 5);

        // Shorter than `window_size`: not analysed and not recorded.
        let short = [1.0, 2.0, 3.0];
        assert!(!detector.update(&short).expect("Operation failed"));

        // The first full window therefore has no history and is not anomalous.
        let full: Vec<f64> = (0..10).map(|x| x as f64).collect();
        assert!(!detector.update(&full).expect("Operation failed"));
    }

    #[test]
    fn test_pattern_matcher() {
        let mut matcher = StreamingPatternMatcher::new(100, 0.8);

        let pattern = vec![1.0, 2.0, 3.0, 2.0, 1.0];
        matcher
            .add_pattern(pattern.clone(), "triangle".to_string())
            .expect("Operation failed");

        let (last, prefix) = pattern.split_last().expect("pattern is non-empty");

        // Until the buffer holds a full pattern length nothing can match.
        for &value in prefix {
            assert!(matcher.update(value).is_empty());
        }

        // Once the buffer reproduces the pattern exactly it must be reported.
        let matches = matcher.update(*last);
        assert_eq!(matches.len(), 1);
        let m = &matches[0];
        assert_eq!(m.pattern_name, "triangle");
        assert!((m.correlation - 1.0).abs() < 1e-9);
        assert_eq!(m.start_index, 0);
        assert_eq!(m.pattern_length, 5);
    }

    #[test]
    fn test_pattern_matcher_rejects_empty_pattern() {
        let mut matcher = StreamingPatternMatcher::<f64>::new(10, 0.8);
        assert!(matcher
            .add_pattern(Vec::new(), "empty".to_string())
            .is_err());
    }

    #[test]
    fn test_circular_buffer() {
        let mut buffer = CircularBuffer::new(5);
        assert!(buffer.is_empty());

        for i in 1..=3 {
            buffer.push(i as f64);
        }

        assert_eq!(buffer.len(), 3);
        assert_eq!(buffer.recent(2), vec![2.0, 3.0]);

        for i in 4..=7 {
            buffer.push(i as f64);
        }

        assert_eq!(buffer.len(), 5);
        assert_eq!(buffer.to_vec(), vec![3.0, 4.0, 5.0, 6.0, 7.0]);
        // A request that straddles the physical end of storage wraps correctly.
        assert_eq!(buffer.recent(3), vec![5.0, 6.0, 7.0]);
        // Requests larger than the contents are clamped; zero yields nothing.
        assert_eq!(buffer.recent(10), vec![3.0, 4.0, 5.0, 6.0, 7.0]);
        assert!(buffer.recent(0).is_empty());
    }

    #[test]
    fn test_circular_buffer_empty() {
        let buffer: CircularBuffer<f64> = CircularBuffer::new(4);
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
        assert!(buffer.recent(3).is_empty());
        assert!(buffer.to_vec().is_empty());
    }
}