// scirs2_series/streaming/memory_management.rs

//! Memory management and multi-series utilities for streaming analysis
//!
//! This module provides memory-efficient data structures, anomaly detection,
//! pattern matching, and utilities for managing multiple time series streams.
5
6use scirs2_core::ndarray::Array1;
7use scirs2_core::numeric::{Float, FromPrimitive};
8use std::collections::{HashMap, VecDeque};
9use std::fmt::Debug;
10
11use super::config::StreamConfig;
12use super::statistics::OnlineStats;
13use crate::error::{Result, TimeSeriesError};
14use crate::streaming::StreamingAnalyzer;
15
/// Multi-series streaming analyzer for handling multiple time series simultaneously
#[derive(Debug)]
pub struct MultiSeriesAnalyzer<F: Float + Debug> {
    /// Per-series streaming analyzers, keyed by series ID
    analyzers: HashMap<String, StreamingAnalyzer<F>>,
    /// Configuration cloned into every analyzer created by `add_series`
    config: StreamConfig,
}
22
23impl<F: Float + Debug + Clone + FromPrimitive> MultiSeriesAnalyzer<F> {
24    /// Create new multi-series analyzer
25    pub fn new(config: StreamConfig) -> Self {
26        Self {
27            analyzers: HashMap::new(),
28            config,
29        }
30    }
31
32    /// Add new time series to track
33    pub fn add_series(&mut self, seriesid: String) -> Result<()> {
34        let analyzer = StreamingAnalyzer::new(self.config.clone())?;
35        self.analyzers.insert(seriesid, analyzer);
36        Ok(())
37    }
38
39    /// Add observation to specific series
40    pub fn add_observation(&mut self, seriesid: &str, value: F) -> Result<()> {
41        if let Some(analyzer) = self.analyzers.get_mut(seriesid) {
42            analyzer.add_observation(value)
43        } else {
44            Err(TimeSeriesError::InvalidInput(format!(
45                "Series '{seriesid}' not found"
46            )))
47        }
48    }
49
50    /// Get analyzer for specific series
51    pub fn get_analyzer(&self, seriesid: &str) -> Option<&StreamingAnalyzer<F>> {
52        self.analyzers.get(seriesid)
53    }
54
55    /// Get mutable analyzer for specific series
56    pub fn get_analyzer_mut(&mut self, seriesid: &str) -> Option<&mut StreamingAnalyzer<F>> {
57        self.analyzers.get_mut(seriesid)
58    }
59
60    /// Get all series IDs
61    pub fn get_series_ids(&self) -> Vec<String> {
62        self.analyzers.keys().cloned().collect()
63    }
64
65    /// Remove series
66    pub fn remove_series(&mut self, seriesid: &str) -> bool {
67        self.analyzers.remove(seriesid).is_some()
68    }
69
70    /// Get cross-series correlation (simplified)
71    pub fn get_correlation(&self, series1: &str, series2: &str) -> Result<F> {
72        let analyzer1 = self.analyzers.get(series1).ok_or_else(|| {
73            TimeSeriesError::InvalidInput(format!("Series '{series1}' not found"))
74        })?;
75
76        let analyzer2 = self.analyzers.get(series2).ok_or_else(|| {
77            TimeSeriesError::InvalidInput(format!("Series '{series2}' not found"))
78        })?;
79
80        let buffer1 = analyzer1.get_buffer();
81        let buffer2 = analyzer2.get_buffer();
82
83        let min_len = std::cmp::min(buffer1.len(), buffer2.len());
84        if min_len < 2 {
85            return Ok(F::zero());
86        }
87
88        // Calculate Pearson correlation
89        let mean1 = buffer1
90            .iter()
91            .take(min_len)
92            .cloned()
93            .fold(F::zero(), |acc, x| acc + x)
94            / F::from(min_len).expect("Failed to convert to float");
95        let mean2 = buffer2
96            .iter()
97            .take(min_len)
98            .cloned()
99            .fold(F::zero(), |acc, x| acc + x)
100            / F::from(min_len).expect("Failed to convert to float");
101
102        let mut numerator = F::zero();
103        let mut sum1_sq = F::zero();
104        let mut sum2_sq = F::zero();
105
106        for i in 0..min_len {
107            let diff1 = buffer1[i] - mean1;
108            let diff2 = buffer2[i] - mean2;
109            numerator = numerator + diff1 * diff2;
110            sum1_sq = sum1_sq + diff1 * diff1;
111            sum2_sq = sum2_sq + diff2 * diff2;
112        }
113
114        let denominator = (sum1_sq * sum2_sq).sqrt();
115        if denominator > F::epsilon() {
116            Ok(numerator / denominator)
117        } else {
118            Ok(F::zero())
119        }
120    }
121}
122
/// Online anomaly detection using Isolation Forest-like approach
#[derive(Debug)]
pub struct StreamingAnomalyDetector<F: Float + Debug> {
    /// Recent feature vectors for comparison
    feature_buffer: VecDeque<Vec<F>>,
    /// Maximum number of feature vectors retained
    max_buffer_size: usize,
    /// Minimum-distance threshold above which a point is flagged anomalous
    threshold: F,
    /// Length of the trailing window features are extracted from
    window_size: usize,
    /// Number of features to extract (used for the empty-window fallback;
    /// `extract_features` always produces 5 features for non-empty windows)
    num_features: usize,
}
137
138impl<F: Float + Debug + Clone> StreamingAnomalyDetector<F> {
139    /// Create new anomaly detector
140    pub fn new(
141        max_buffer_size: usize,
142        threshold: F,
143        window_size: usize,
144        num_features: usize,
145    ) -> Self {
146        Self {
147            feature_buffer: VecDeque::with_capacity(max_buffer_size),
148            max_buffer_size,
149            threshold,
150            window_size,
151            num_features,
152        }
153    }
154
155    /// Extract features from a time series window
156    fn extract_features(&self, window: &[F]) -> Vec<F> {
157        if window.is_empty() {
158            return vec![F::zero(); self.num_features];
159        }
160
161        let mut features = Vec::with_capacity(self.num_features);
162        let n = F::from(window.len()).expect("Operation failed");
163
164        // Feature 1: Mean
165        let mean = window.iter().fold(F::zero(), |acc, &x| acc + x) / n;
166        features.push(mean);
167
168        // Feature 2: Standard deviation
169        let variance = window
170            .iter()
171            .map(|&x| (x - mean) * (x - mean))
172            .fold(F::zero(), |acc, x| acc + x)
173            / n;
174        features.push(variance.sqrt());
175
176        // Feature 3: Skewness (simplified)
177        let skewness = window
178            .iter()
179            .map(|&x| {
180                let normalized = (x - mean) / variance.sqrt();
181                normalized * normalized * normalized
182            })
183            .fold(F::zero(), |acc, x| acc + x)
184            / n;
185        features.push(skewness);
186
187        // Feature 4: Range
188        let min_val = window.iter().fold(F::infinity(), |acc, &x| acc.min(x));
189        let max_val = window.iter().fold(F::neg_infinity(), |acc, &x| acc.max(x));
190        features.push(max_val - min_val);
191
192        // Feature 5: Trend (slope of linear regression)
193        if window.len() > 1 {
194            let x_mean = F::from(window.len() - 1).expect("Operation failed")
195                / F::from(2).expect("Failed to convert constant to float");
196            let mut num = F::zero();
197            let mut den = F::zero();
198
199            for (i, &y) in window.iter().enumerate() {
200                let x = F::from(i).expect("Failed to convert to float");
201                num = num + (x - x_mean) * (y - mean);
202                den = den + (x - x_mean) * (x - x_mean);
203            }
204
205            let slope = if den > F::zero() {
206                num / den
207            } else {
208                F::zero()
209            };
210            features.push(slope);
211        } else {
212            features.push(F::zero());
213        }
214
215        features
216    }
217
218    /// Update detector with new window and check for anomalies
219    pub fn update(&mut self, window: &[F]) -> Result<bool> {
220        if window.len() < self.window_size {
221            return Ok(false); // Not enough data
222        }
223
224        let features = self.extract_features(&window[window.len() - self.window_size..]);
225
226        if self.feature_buffer.is_empty() {
227            // First observation - just store
228            if self.feature_buffer.len() >= self.max_buffer_size {
229                self.feature_buffer.pop_front();
230            }
231            self.feature_buffer.push_back(features);
232            return Ok(false);
233        }
234
235        // Calculate isolation score (simplified)
236        let mut min_distance = F::infinity();
237        for stored_features in &self.feature_buffer {
238            let distance = features
239                .iter()
240                .zip(stored_features.iter())
241                .map(|(&a, &b)| (a - b) * (a - b))
242                .fold(F::zero(), |acc, x| acc + x)
243                .sqrt();
244            min_distance = min_distance.min(distance);
245        }
246
247        // Add current features to buffer
248        if self.feature_buffer.len() >= self.max_buffer_size {
249            self.feature_buffer.pop_front();
250        }
251        self.feature_buffer.push_back(features);
252
253        // Check if anomaly (isolated point)
254        Ok(min_distance > self.threshold)
255    }
256
257    /// Update threshold based on recent observations
258    pub fn adapt_threshold(&mut self, factor: F) {
259        if self.feature_buffer.len() > 2 {
260            // Calculate average distance between recent features
261            let mut total_distance = F::zero();
262            let mut count = 0;
263
264            for i in 0..self.feature_buffer.len() {
265                for j in i + 1..self.feature_buffer.len() {
266                    let distance = self.feature_buffer[i]
267                        .iter()
268                        .zip(self.feature_buffer[j].iter())
269                        .map(|(&a, &b)| (a - b) * (a - b))
270                        .fold(F::zero(), |acc, x| acc + x)
271                        .sqrt();
272                    total_distance = total_distance + distance;
273                    count += 1;
274                }
275            }
276
277            if count > 0 {
278                let avg_distance =
279                    total_distance / F::from(count).expect("Failed to convert to float");
280                self.threshold = avg_distance * factor;
281            }
282        }
283    }
284}
285
/// Online pattern matching for streaming time series
#[derive(Debug)]
pub struct StreamingPatternMatcher<F: Float + Debug> {
    /// Template patterns to match against
    patterns: Vec<Vec<F>>,
    /// Pattern names (parallel to `patterns` by index)
    pattern_names: Vec<String>,
    /// Recent data buffer for pattern matching (oldest value at the front)
    buffer: VecDeque<F>,
    /// Maximum buffer size before the oldest value is evicted
    max_buffer_size: usize,
    /// Matching threshold (normalized correlation)
    threshold: F,
}
300
301impl<F: Float + Debug + Clone> StreamingPatternMatcher<F> {
302    /// Create new pattern matcher
303    pub fn new(_max_buffersize: usize, threshold: F) -> Self {
304        Self {
305            patterns: Vec::new(),
306            pattern_names: Vec::new(),
307            buffer: VecDeque::with_capacity(_max_buffersize),
308            max_buffer_size: _max_buffersize,
309            threshold,
310        }
311    }
312
313    /// Add a pattern to match against
314    pub fn add_pattern(&mut self, pattern: Vec<F>, name: String) -> Result<()> {
315        if pattern.is_empty() {
316            return Err(TimeSeriesError::InvalidInput(
317                "Pattern cannot be empty".to_string(),
318            ));
319        }
320        self.patterns.push(pattern);
321        self.pattern_names.push(name);
322        Ok(())
323    }
324
325    /// Update buffer and check for pattern matches
326    pub fn update(&mut self, value: F) -> Vec<PatternMatch> {
327        // Add to buffer
328        if self.buffer.len() >= self.max_buffer_size {
329            self.buffer.pop_front();
330        }
331        self.buffer.push_back(value);
332
333        let mut matches = Vec::new();
334
335        // Check each pattern
336        for (i, pattern) in self.patterns.iter().enumerate() {
337            if self.buffer.len() >= pattern.len() {
338                let recent_data: Vec<F> = self
339                    .buffer
340                    .iter()
341                    .rev()
342                    .take(pattern.len())
343                    .rev()
344                    .cloned()
345                    .collect();
346
347                if let Ok(correlation) = self.normalized_correlation(&recent_data, pattern) {
348                    if correlation >= self.threshold {
349                        matches.push(PatternMatch {
350                            pattern_name: self.pattern_names[i].clone(),
351                            correlation: correlation.to_f64().expect("Operation failed"),
352                            start_index: self.buffer.len() - pattern.len(),
353                            pattern_length: pattern.len(),
354                        });
355                    }
356                }
357            }
358        }
359
360        matches
361    }
362
363    /// Calculate normalized correlation between two sequences
364    fn normalized_correlation(&self, a: &[F], b: &[F]) -> Result<F> {
365        if a.len() != b.len() || a.is_empty() {
366            return Err(TimeSeriesError::InvalidInput(
367                "Sequences must have the same non-zero length".to_string(),
368            ));
369        }
370
371        let n = F::from(a.len()).expect("Operation failed");
372
373        // Calculate means
374        let mean_a = a.iter().fold(F::zero(), |acc, &x| acc + x) / n;
375        let mean_b = b.iter().fold(F::zero(), |acc, &x| acc + x) / n;
376
377        // Calculate correlation components
378        let mut num = F::zero();
379        let mut den_a = F::zero();
380        let mut den_b = F::zero();
381
382        for (&val_a, &val_b) in a.iter().zip(b.iter()) {
383            let diff_a = val_a - mean_a;
384            let diff_b = val_b - mean_b;
385
386            num = num + diff_a * diff_b;
387            den_a = den_a + diff_a * diff_a;
388            den_b = den_b + diff_b * diff_b;
389        }
390
391        let denominator = (den_a * den_b).sqrt();
392        if denominator > F::zero() {
393            Ok(num / denominator)
394        } else {
395            Ok(F::zero())
396        }
397    }
398}
399
/// Pattern match result
#[derive(Debug, Clone)]
pub struct PatternMatch {
    /// Name of the matched pattern
    pub pattern_name: String,
    /// Correlation coefficient with the pattern
    pub correlation: f64,
    /// Starting index of the match within the matcher's rolling buffer
    /// (computed as `buffer.len() - pattern.len()`), not the full stream
    pub start_index: usize,
    /// Length of the matched pattern
    pub pattern_length: usize,
}
412
/// Memory-efficient circular buffer for streaming data
#[derive(Debug)]
pub struct CircularBuffer<F: Float> {
    /// Internal fixed-size storage (pre-filled with `F::default()`)
    buffer: Vec<F>,
    /// Index of the next slot to write (wraps modulo `capacity`)
    position: usize,
    /// Maximum capacity
    capacity: usize,
    /// Whether every slot has been written at least once
    is_full: bool,
}
425
426impl<F: Float + Debug + Clone + Default> CircularBuffer<F> {
427    /// Create new circular buffer
428    pub fn new(capacity: usize) -> Self {
429        Self {
430            buffer: vec![F::default(); capacity],
431            position: 0,
432            capacity,
433            is_full: false,
434        }
435    }
436
437    /// Add new value to buffer
438    pub fn push(&mut self, value: F) {
439        self.buffer[self.position] = value;
440        self.position = (self.position + 1) % self.capacity;
441
442        if self.position == 0 {
443            self.is_full = true;
444        }
445    }
446
447    /// Get current size of buffer
448    pub fn len(&self) -> usize {
449        if self.is_full {
450            self.capacity
451        } else {
452            self.position
453        }
454    }
455
456    /// Check if buffer is empty
457    pub fn is_empty(&self) -> bool {
458        !self.is_full && self.position == 0
459    }
460
461    /// Get slice of recent n values
462    pub fn recent(&self, n: usize) -> Vec<F> {
463        let available = self.len();
464        let take = n.min(available);
465        let mut result = Vec::with_capacity(take);
466
467        if self.is_full {
468            // Buffer is full, need to handle wrap-around
469            let start_pos = (self.position + self.capacity - take) % self.capacity;
470
471            if start_pos + take <= self.capacity {
472                // No wrap-around needed
473                result.extend_from_slice(&self.buffer[start_pos..start_pos + take]);
474            } else {
475                // Need to handle wrap-around
476                let first_part = self.capacity - start_pos;
477                result.extend_from_slice(&self.buffer[start_pos..]);
478                result.extend_from_slice(&self.buffer[..take - first_part]);
479            }
480        } else {
481            // Buffer not full, simple case
482            let start = self.position.saturating_sub(take);
483            result.extend_from_slice(&self.buffer[start..self.position]);
484        }
485
486        result
487    }
488
489    /// Get all values in chronological order
490    pub fn to_vec(&self) -> Vec<F> {
491        self.recent(self.len())
492    }
493
494    /// Calculate statistics over recent window
495    pub fn window_stats(&self, windowsize: usize) -> OnlineStats<F> {
496        let recent_data = self.recent(windowsize);
497        let mut stats = OnlineStats::new();
498
499        for value in recent_data {
500            stats.update(value);
501        }
502
503        stats
504    }
505}
506
#[cfg(test)]
mod tests {
    use super::*;

    // Feature-distance detector: sliding windows over a linear ramp stay
    // close to each other, while a constant block of 1000s is far from all
    // previously stored feature vectors.
    #[test]
    fn test_anomaly_detector() {
        let mut detector = StreamingAnomalyDetector::new(100, 2.0, 10, 5);

        // Add normal data
        let normal_data: Vec<f64> = (0..20).map(|x| x as f64).collect();

        for window in normal_data.windows(10) {
            let is_anomaly = detector.update(window).expect("Operation failed");
            assert!(!is_anomaly, "Normal data should not be anomalous");
        }

        // Add anomalous data
        let mut anomalous_data = normal_data.clone();
        anomalous_data.extend(vec![1000.0; 10]); // Clear anomaly

        let result = detector
            .update(&anomalous_data[anomalous_data.len() - 10..])
            .expect("Operation failed");
        assert!(result, "Clear anomaly should be detected");
    }

    // A registered template should be reported once the rolling buffer
    // holds a window whose normalized correlation reaches the threshold.
    #[test]
    fn test_pattern_matcher() {
        let mut matcher = StreamingPatternMatcher::new(100, 0.8);

        // Add a simple pattern
        let pattern = vec![1.0, 2.0, 3.0, 2.0, 1.0];
        matcher
            .add_pattern(pattern.clone(), "triangle".to_string())
            .expect("Operation failed");

        // Add matching data
        for &value in &pattern {
            let matches = matcher.update(value);
            if !matches.is_empty() {
                assert_eq!(matches[0].pattern_name, "triangle");
                assert!(matches[0].correlation >= 0.8);
            }
        }
    }

    // Exercises partial fill, wrap-around, and chronological ordering.
    #[test]
    fn test_circular_buffer() {
        let mut buffer = CircularBuffer::new(5);

        // Add data (partial fill)
        for i in 1..=3 {
            buffer.push(i as f64);
        }

        assert_eq!(buffer.len(), 3);
        assert_eq!(buffer.recent(2), vec![2.0, 3.0]);

        // Fill buffer completely (forces wrap-around)
        for i in 4..=7 {
            buffer.push(i as f64);
        }

        assert_eq!(buffer.len(), 5);
        assert_eq!(buffer.to_vec(), vec![3.0, 4.0, 5.0, 6.0, 7.0]);
    }
}