Skip to main content

scirs2_transform/
streaming.rs

1//! Streaming transformations for continuous data processing
2//!
3//! This module provides utilities for processing data streams in real-time,
4//! maintaining running statistics and transforming data incrementally.
5
6use scirs2_core::ndarray::{Array1, Array2};
7use scirs2_linalg::eigh;
8use std::collections::VecDeque;
9
10use crate::error::{Result, TransformError};
11
12/// Trait for transformers that support streaming/incremental updates
13pub trait StreamingTransformer: Send + Sync {
14    /// Update the transformer with a new batch of data
15    fn partial_fit(&mut self, x: &Array2<f64>) -> Result<()>;
16
17    /// Transform a batch of data using current statistics
18    fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>>;
19
20    /// Reset the transformer to initial state
21    fn reset(&mut self);
22
23    /// Get the number of samples seen so far
24    fn n_samples_seen(&self) -> usize;
25}
26
27/// Streaming standard scaler that maintains running statistics
28pub struct StreamingStandardScaler {
29    /// Running mean for each feature
30    mean: Array1<f64>,
31    /// Running variance for each feature
32    variance: Array1<f64>,
33    /// Number of samples seen
34    n_samples: usize,
35    /// Whether to center the data
36    with_mean: bool,
37    /// Whether to scale to unit variance
38    withstd: bool,
39    /// Epsilon for numerical stability
40    epsilon: f64,
41}
42
43impl StreamingStandardScaler {
44    /// Create a new streaming standard scaler
45    pub fn new(_nfeatures: usize, with_mean: bool, withstd: bool) -> Self {
46        StreamingStandardScaler {
47            mean: Array1::zeros(_nfeatures),
48            variance: Array1::zeros(_nfeatures),
49            n_samples: 0,
50            with_mean,
51            withstd,
52            epsilon: 1e-8,
53        }
54    }
55
56    /// Update statistics using Welford's online algorithm
57    fn update_statistics(&mut self, x: &Array2<f64>) {
58        let batch_size = x.shape()[0];
59        let nfeatures = x.shape()[1];
60
61        for i in 0..batch_size {
62            self.n_samples += 1;
63            let n = self.n_samples as f64;
64
65            for j in 0..nfeatures {
66                let value = x[[i, j]];
67                let delta = value - self.mean[j];
68                self.mean[j] += delta / n;
69
70                if self.withstd {
71                    let delta2 = value - self.mean[j];
72                    self.variance[j] += delta * delta2;
73                }
74            }
75        }
76    }
77
78    /// Get the current standard deviation
79    fn get_std(&self) -> Array1<f64> {
80        if self.n_samples <= 1 {
81            Array1::ones(self.mean.len())
82        } else {
83            self.variance
84                .mapv(|v| (v / (self.n_samples - 1) as f64).sqrt().max(self.epsilon))
85        }
86    }
87}
88
89impl StreamingTransformer for StreamingStandardScaler {
90    fn partial_fit(&mut self, x: &Array2<f64>) -> Result<()> {
91        if x.shape()[1] != self.mean.len() {
92            return Err(TransformError::InvalidInput(format!(
93                "Expected {} features, got {}",
94                self.mean.len(),
95                x.shape()[1]
96            )));
97        }
98
99        self.update_statistics(x);
100        Ok(())
101    }
102
103    fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
104        if x.shape()[1] != self.mean.len() {
105            return Err(TransformError::InvalidInput(format!(
106                "Expected {} features, got {}",
107                self.mean.len(),
108                x.shape()[1]
109            )));
110        }
111
112        let mut result = x.to_owned();
113
114        if self.with_mean {
115            for i in 0..result.shape()[0] {
116                for j in 0..result.shape()[1] {
117                    result[[i, j]] -= self.mean[j];
118                }
119            }
120        }
121
122        if self.withstd {
123            let std = self.get_std();
124            for i in 0..result.shape()[0] {
125                for j in 0..result.shape()[1] {
126                    result[[i, j]] /= std[j];
127                }
128            }
129        }
130
131        Ok(result)
132    }
133
134    fn reset(&mut self) {
135        self.mean.fill(0.0);
136        self.variance.fill(0.0);
137        self.n_samples = 0;
138    }
139
140    fn n_samples_seen(&self) -> usize {
141        self.n_samples
142    }
143}
144
145/// Streaming min-max scaler that tracks min and max values
146pub struct StreamingMinMaxScaler {
147    /// Minimum values for each feature
148    min: Array1<f64>,
149    /// Maximum values for each feature
150    max: Array1<f64>,
151    /// Target range
152    featurerange: (f64, f64),
153    /// Number of samples seen
154    n_samples: usize,
155}
156
157impl StreamingMinMaxScaler {
158    /// Create a new streaming min-max scaler
159    pub fn new(_nfeatures: usize, featurerange: (f64, f64)) -> Self {
160        StreamingMinMaxScaler {
161            min: Array1::from_elem(_nfeatures, f64::INFINITY),
162            max: Array1::from_elem(_nfeatures, f64::NEG_INFINITY),
163            featurerange,
164            n_samples: 0,
165        }
166    }
167
168    /// Update min and max values
169    fn update_bounds(&mut self, x: &Array2<f64>) {
170        for i in 0..x.shape()[0] {
171            for j in 0..x.shape()[1] {
172                let value = x[[i, j]];
173                self.min[j] = self.min[j].min(value);
174                self.max[j] = self.max[j].max(value);
175            }
176            self.n_samples += 1;
177        }
178    }
179}
180
181impl StreamingTransformer for StreamingMinMaxScaler {
182    fn partial_fit(&mut self, x: &Array2<f64>) -> Result<()> {
183        if x.shape()[1] != self.min.len() {
184            return Err(TransformError::InvalidInput(format!(
185                "Expected {} features, got {}",
186                self.min.len(),
187                x.shape()[1]
188            )));
189        }
190
191        self.update_bounds(x);
192        Ok(())
193    }
194
195    fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
196        if x.shape()[1] != self.min.len() {
197            return Err(TransformError::InvalidInput(format!(
198                "Expected {} features, got {}",
199                self.min.len(),
200                x.shape()[1]
201            )));
202        }
203
204        let mut result = Array2::zeros((x.nrows(), x.ncols()));
205        let (min_val, max_val) = self.featurerange;
206        let scale = max_val - min_val;
207
208        for i in 0..x.shape()[0] {
209            for j in 0..x.shape()[1] {
210                let range = self.max[j] - self.min[j];
211                if range > 1e-10 {
212                    result[[i, j]] = (x[[i, j]] - self.min[j]) / range * scale + min_val;
213                } else {
214                    result[[i, j]] = (min_val + max_val) / 2.0;
215                }
216            }
217        }
218
219        Ok(result)
220    }
221
222    fn reset(&mut self) {
223        self.min.fill(f64::INFINITY);
224        self.max.fill(f64::NEG_INFINITY);
225        self.n_samples = 0;
226    }
227
228    fn n_samples_seen(&self) -> usize {
229        self.n_samples
230    }
231}
232
233/// Streaming quantile tracker using P² algorithm
234pub struct StreamingQuantileTracker {
235    /// Quantiles to track
236    quantiles: Vec<f64>,
237    /// P² algorithm state for each feature and quantile
238    p2_states: Vec<Vec<P2State>>,
239    /// Number of features
240    nfeatures: usize,
241}
242
243/// P² algorithm state for a single quantile
244struct P2State {
245    /// Marker positions
246    n: [f64; 5],
247    /// Marker values
248    q: [f64; 5],
249    /// Desired marker positions
250    n_prime: [f64; 5],
251    /// Number of observations
252    count: usize,
253    /// Target quantile
254    p: f64,
255}
256
257impl P2State {
258    fn new(p: f64) -> Self {
259        P2State {
260            n: [1.0, 2.0, 3.0, 4.0, 5.0],
261            q: [0.0; 5],
262            n_prime: [1.0, 1.0 + 2.0 * p, 1.0 + 4.0 * p, 3.0 + 2.0 * p, 5.0],
263            count: 0,
264            p,
265        }
266    }
267
268    fn update(&mut self, value: f64) {
269        if self.count < 5 {
270            self.q[self.count] = value;
271            self.count += 1;
272
273            if self.count == 5 {
274                // Sort initial observations
275                self.q
276                    .sort_by(|a, b| a.partial_cmp(b).expect("Operation failed"));
277            }
278            return;
279        }
280
281        // Find cell k such that q[k] <= value < q[k+1]
282        let mut k = 0;
283        for i in 1..5 {
284            if value < self.q[i] {
285                k = i - 1;
286                break;
287            }
288        }
289        if value >= self.q[4] {
290            k = 3;
291        }
292
293        // Update marker positions
294        for i in (k + 1)..5 {
295            self.n[i] += 1.0;
296        }
297
298        // Update desired marker positions
299        for i in 0..5 {
300            self.n_prime[i] += match i {
301                0 => 0.0,
302                1 => self.p / 2.0,
303                2 => self.p,
304                3 => (1.0 + self.p) / 2.0,
305                4 => 1.0,
306                _ => unreachable!(),
307            };
308        }
309
310        // Adjust marker values
311        for i in 1..4 {
312            let d = self.n_prime[i] - self.n[i];
313
314            if (d >= 1.0 && self.n[i + 1] - self.n[i] > 1.0)
315                || (d <= -1.0 && self.n[i - 1] - self.n[i] < -1.0)
316            {
317                let d_sign = d.signum();
318
319                // Try parabolic interpolation
320                let qi = self.parabolic_interpolation(i, d_sign);
321
322                if self.q[i - 1] < qi && qi < self.q[i + 1] {
323                    self.q[i] = qi;
324                } else {
325                    // Fall back to linear interpolation
326                    self.q[i] = self.linear_interpolation(i, d_sign);
327                }
328
329                self.n[i] += d_sign;
330            }
331        }
332    }
333
334    fn parabolic_interpolation(&self, i: usize, d: f64) -> f64 {
335        let qi = self.q[i];
336        let qim1 = self.q[i - 1];
337        let qip1 = self.q[i + 1];
338        let ni = self.n[i];
339        let nim1 = self.n[i - 1];
340        let nip1 = self.n[i + 1];
341
342        qi + d / (nip1 - nim1)
343            * ((ni - nim1 + d) * (qip1 - qi) / (nip1 - ni)
344                + (nip1 - ni - d) * (qi - qim1) / (ni - nim1))
345    }
346
347    fn linear_interpolation(&self, i: usize, d: f64) -> f64 {
348        let j = if d > 0.0 { i + 1 } else { i - 1 };
349        self.q[i] + d * (self.q[j] - self.q[i]) / (self.n[j] - self.n[i])
350    }
351
352    fn quantile(&self) -> f64 {
353        if self.count < 5 {
354            // Not enough data, return median of available values
355            let mut sorted = self.q[..self.count].to_vec();
356            sorted.sort_by(|a, b| a.partial_cmp(b).expect("Operation failed"));
357            sorted[sorted.len() / 2]
358        } else {
359            self.q[2] // Middle marker estimates the quantile
360        }
361    }
362}
363
364impl StreamingQuantileTracker {
365    /// Create a new streaming quantile tracker
366    pub fn new(nfeatures: usize, quantiles: Vec<f64>) -> Result<Self> {
367        // Validate quantiles
368        for &q in &quantiles {
369            if !(0.0..=1.0).contains(&q) {
370                return Err(TransformError::InvalidInput(format!(
371                    "Quantile {q} must be between 0 and 1"
372                )));
373            }
374        }
375
376        let mut p2_states = Vec::with_capacity(nfeatures);
377        for _ in 0..nfeatures {
378            let feature_states: Vec<P2State> = quantiles.iter().map(|&q| P2State::new(q)).collect();
379            p2_states.push(feature_states);
380        }
381
382        Ok(StreamingQuantileTracker {
383            quantiles,
384            p2_states,
385            nfeatures,
386        })
387    }
388
389    /// Update quantile estimates with new data
390    pub fn update(&mut self, x: &Array2<f64>) -> Result<()> {
391        if x.shape()[1] != self.nfeatures {
392            return Err(TransformError::InvalidInput(format!(
393                "Expected {} features, got {}",
394                self.nfeatures,
395                x.shape()[1]
396            )));
397        }
398
399        for i in 0..x.shape()[0] {
400            for j in 0..x.shape()[1] {
401                let value = x[[i, j]];
402                for k in 0..self.quantiles.len() {
403                    self.p2_states[j][k].update(value);
404                }
405            }
406        }
407
408        Ok(())
409    }
410
411    /// Get current quantile estimates
412    pub fn get_quantiles(&self) -> Array2<f64> {
413        let mut result = Array2::zeros((self.nfeatures, self.quantiles.len()));
414
415        for j in 0..self.nfeatures {
416            for k in 0..self.quantiles.len() {
417                result[[j, k]] = self.p2_states[j][k].quantile();
418            }
419        }
420
421        result
422    }
423}
424
425/// Window-based streaming transformer that maintains a sliding window
426pub struct WindowedStreamingTransformer<T: StreamingTransformer> {
427    /// Underlying transformer
428    transformer: T,
429    /// Sliding window of recent data
430    window: VecDeque<Array2<f64>>,
431    /// Maximum window size
432    windowsize: usize,
433    /// Current number of samples in window
434    current_size: usize,
435}
436
437impl<T: StreamingTransformer> WindowedStreamingTransformer<T> {
438    /// Create a new windowed streaming transformer
439    pub fn new(_transformer: T, windowsize: usize) -> Self {
440        WindowedStreamingTransformer {
441            transformer: _transformer,
442            window: VecDeque::with_capacity(windowsize),
443            windowsize,
444            current_size: 0,
445        }
446    }
447
448    /// Update the transformer with new data
449    pub fn update(&mut self, x: &Array2<f64>) -> Result<()> {
450        // Add new data to window
451        self.window.push_back(x.to_owned());
452        self.current_size += x.shape()[0];
453
454        // Remove old data if window is full
455        while self.current_size > self.windowsize && !self.window.is_empty() {
456            if let Some(old_data) = self.window.pop_front() {
457                self.current_size -= old_data.shape()[0];
458            }
459        }
460
461        // Refit transformer on window data
462        self.transformer.reset();
463        for data in &self.window {
464            self.transformer.partial_fit(data)?;
465        }
466
467        Ok(())
468    }
469
470    /// Transform data using the windowed statistics
471    pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
472        self.transformer.transform(x)
473    }
474}
475
476/// Streaming Principal Component Analysis using incremental SVD
477pub struct StreamingPCA {
478    /// Current mean of the data
479    mean: Array1<f64>,
480    /// Principal components (loading vectors)
481    components: Option<Array2<f64>>,
482    /// Explained variance for each component
483    explained_variance: Option<Array1<f64>>,
484    /// Number of components to keep
485    n_components: usize,
486    /// Number of features
487    nfeatures: usize,
488    /// Number of samples seen
489    n_samples: usize,
490    /// Forgetting factor for incremental updates (0 < alpha <= 1)
491    alpha: f64,
492    /// Minimum number of samples before PCA is computed
493    min_samples: usize,
494    /// Accumulated covariance matrix
495    cov_matrix: Array2<f64>,
496}
497
498impl StreamingPCA {
499    /// Create a new streaming PCA
500    pub fn new(
501        nfeatures: usize,
502        n_components: usize,
503        alpha: f64,
504        min_samples: usize,
505    ) -> Result<Self> {
506        if n_components > nfeatures {
507            return Err(TransformError::InvalidInput(
508                "n_components cannot be larger than nfeatures".to_string(),
509            ));
510        }
511        if alpha <= 0.0 || alpha > 1.0 {
512            return Err(TransformError::InvalidInput(
513                "alpha must be in (0, 1]".to_string(),
514            ));
515        }
516
517        Ok(StreamingPCA {
518            mean: Array1::zeros(nfeatures),
519            components: None,
520            explained_variance: None,
521            n_components,
522            nfeatures,
523            n_samples: 0,
524            alpha,
525            min_samples,
526            cov_matrix: Array2::zeros((nfeatures, nfeatures)),
527        })
528    }
529
530    /// Update PCA with new batch of data
531    pub fn update(&mut self, x: &Array2<f64>) -> Result<()> {
532        if x.shape()[1] != self.nfeatures {
533            return Err(TransformError::InvalidInput(format!(
534                "Expected {} features, got {}",
535                self.nfeatures,
536                x.shape()[1]
537            )));
538        }
539
540        let _batch_size = x.shape()[0];
541
542        // Update mean using exponential moving average
543        for sample in x.rows() {
544            self.n_samples += 1;
545            let weight = if self.n_samples == 1 { 1.0 } else { self.alpha };
546
547            for (i, &value) in sample.iter().enumerate() {
548                self.mean[i] = (1.0 - weight) * self.mean[i] + weight * value;
549            }
550        }
551
552        // Update covariance matrix
553        if self.n_samples >= self.min_samples {
554            for sample in x.rows() {
555                let centered = &sample.to_owned() - &self.mean;
556                let outer_product = centered
557                    .clone()
558                    .insert_axis(scirs2_core::ndarray::Axis(1))
559                    .dot(&centered.insert_axis(scirs2_core::ndarray::Axis(0)));
560
561                let weight = self.alpha;
562                self.cov_matrix = (1.0 - weight) * &self.cov_matrix + weight * outer_product;
563            }
564
565            // Compute PCA from covariance matrix
566            self.compute_pca()?;
567        }
568
569        Ok(())
570    }
571
572    fn compute_pca(&mut self) -> Result<()> {
573        // Perform proper eigendecomposition of covariance matrix
574        let (eigenvalues, eigenvectors) = eigh(&self.cov_matrix.view(), None).map_err(|e| {
575            TransformError::ComputationError(format!("Eigendecomposition failed: {e}"))
576        })?;
577
578        // Sort by eigenvalues in descending order
579        let mut eigen_pairs: Vec<(f64, Array1<f64>)> = eigenvalues
580            .iter()
581            .zip(eigenvectors.columns())
582            .map(|(&val, vec)| (val, vec.to_owned()))
583            .collect();
584
585        eigen_pairs.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
586
587        // Take top n_components
588        let mut components = Array2::zeros((self.n_components, self.nfeatures));
589        let mut explained_var = Array1::zeros(self.n_components);
590
591        for (i, (eigenval, eigenvec)) in eigen_pairs.iter().take(self.n_components).enumerate() {
592            components.row_mut(i).assign(eigenvec);
593            explained_var[i] = eigenval.max(0.0);
594        }
595
596        self.components = Some(components);
597        self.explained_variance = Some(explained_var);
598        Ok(())
599    }
600
601    /// Transform data using current PCA
602    pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
603        if let Some(ref components) = self.components {
604            if x.shape()[1] != self.nfeatures {
605                return Err(TransformError::InvalidInput(format!(
606                    "Expected {} features, got {}",
607                    self.nfeatures,
608                    x.shape()[1]
609                )));
610            }
611
612            let mut result = Array2::zeros((x.shape()[0], self.n_components));
613
614            for (i, sample) in x.rows().into_iter().enumerate() {
615                let centered = &sample.to_owned() - &self.mean;
616                let transformed = components.dot(&centered);
617                result.row_mut(i).assign(&transformed);
618            }
619
620            Ok(result)
621        } else {
622            Err(TransformError::TransformationError(
623                "PCA not computed yet, need more samples".to_string(),
624            ))
625        }
626    }
627
628    /// Get explained variance ratio
629    pub fn explained_variance_ratio(&self) -> Option<Array1<f64>> {
630        self.explained_variance.as_ref().map(|var| {
631            let total_var = var.sum();
632            if total_var > 0.0 {
633                var / total_var
634            } else {
635                Array1::zeros(var.len())
636            }
637        })
638    }
639
640    /// Reset the PCA to initial state
641    pub fn reset(&mut self) {
642        self.mean.fill(0.0);
643        self.components = None;
644        self.explained_variance = None;
645        self.n_samples = 0;
646        self.cov_matrix.fill(0.0);
647    }
648}
649
650/// Streaming outlier detector using statistical methods
651pub struct StreamingOutlierDetector {
652    /// Running statistics for each feature
653    means: Array1<f64>,
654    variances: Array1<f64>,
655    /// Number of samples seen
656    n_samples: usize,
657    /// Number of features
658    nfeatures: usize,
659    /// Threshold for outlier detection (standard deviations)
660    threshold: f64,
661    /// Method for outlier detection
662    method: OutlierMethod,
663}
664
665/// Methods for outlier detection in streaming data
666#[derive(Debug, Clone)]
667pub enum OutlierMethod {
668    /// Z-score based detection
669    ZScore,
670    /// Modified Z-score using median absolute deviation
671    ModifiedZScore,
672    /// Isolation forest-like scoring
673    IsolationScore,
674}
675
676impl StreamingOutlierDetector {
677    /// Create a new streaming outlier detector
678    pub fn new(nfeatures: usize, threshold: f64, method: OutlierMethod) -> Self {
679        StreamingOutlierDetector {
680            means: Array1::zeros(nfeatures),
681            variances: Array1::zeros(nfeatures),
682            n_samples: 0,
683            nfeatures,
684            threshold,
685            method,
686        }
687    }
688
689    /// Update statistics with new data
690    pub fn update(&mut self, x: &Array2<f64>) -> Result<()> {
691        if x.shape()[1] != self.nfeatures {
692            return Err(TransformError::InvalidInput(format!(
693                "Expected {} features, got {}",
694                self.nfeatures,
695                x.shape()[1]
696            )));
697        }
698
699        // Update running statistics using Welford's algorithm
700        for sample in x.rows() {
701            self.n_samples += 1;
702            let n = self.n_samples as f64;
703
704            for (i, &value) in sample.iter().enumerate() {
705                let delta = value - self.means[i];
706                self.means[i] += delta / n;
707                let delta2 = value - self.means[i];
708                self.variances[i] += delta * delta2;
709            }
710        }
711
712        Ok(())
713    }
714
715    /// Detect outliers in new data
716    pub fn detect_outliers(&self, x: &Array2<f64>) -> Result<Array1<bool>> {
717        if x.shape()[1] != self.nfeatures {
718            return Err(TransformError::InvalidInput(format!(
719                "Expected {} features, got {}",
720                self.nfeatures,
721                x.shape()[1]
722            )));
723        }
724
725        if self.n_samples < 2 {
726            // Not enough data for outlier detection
727            return Ok(Array1::from_elem(x.shape()[0], false));
728        }
729
730        let mut outliers = Array1::from_elem(x.shape()[0], false);
731
732        match self.method {
733            OutlierMethod::ZScore => {
734                let stds = self.get_standard_deviations();
735
736                for (i, sample) in x.rows().into_iter().enumerate() {
737                    let mut is_outlier = false;
738                    for (j, &value) in sample.iter().enumerate() {
739                        if stds[j] > 1e-8 {
740                            let z_score = (value - self.means[j]).abs() / stds[j];
741                            if z_score > self.threshold {
742                                is_outlier = true;
743                                break;
744                            }
745                        }
746                    }
747                    outliers[i] = is_outlier;
748                }
749            }
750            OutlierMethod::ModifiedZScore => {
751                // Simplified modified z-score using running estimates
752                for (i, sample) in x.rows().into_iter().enumerate() {
753                    let mut is_outlier = false;
754                    for (j, &value) in sample.iter().enumerate() {
755                        let mad_estimate =
756                            (self.variances[j] / (self.n_samples - 1) as f64).sqrt() * 0.6745;
757                        if mad_estimate > 1e-8 {
758                            let modified_z = 0.6745 * (value - self.means[j]).abs() / mad_estimate;
759                            if modified_z > self.threshold {
760                                is_outlier = true;
761                                break;
762                            }
763                        }
764                    }
765                    outliers[i] = is_outlier;
766                }
767            }
768            OutlierMethod::IsolationScore => {
769                // Enhanced isolation forest-like scoring using path length estimation
770                for (i, sample) in x.rows().into_iter().enumerate() {
771                    let anomaly_score = self.compute_isolation_score(sample);
772                    outliers[i] = anomaly_score > self.threshold;
773                }
774            }
775        }
776
777        Ok(outliers)
778    }
779
780    fn get_standard_deviations(&self) -> Array1<f64> {
781        if self.n_samples <= 1 {
782            Array1::ones(self.nfeatures)
783        } else {
784            self.variances
785                .mapv(|v| (v / (self.n_samples - 1) as f64).sqrt().max(1e-8))
786        }
787    }
788
789    /// Compute isolation score using path length estimation
790    fn compute_isolation_score(&self, sample: scirs2_core::ndarray::ArrayView1<f64>) -> f64 {
791        let mut path_length = 0.0;
792        let mut current_sample = sample.to_owned();
793
794        // Simulate isolation tree path length with statistical approximation
795        let max_depth = ((self.n_samples as f64).log2().ceil() as usize).min(20);
796
797        for depth in 0..max_depth {
798            let mut min_split_distance = f64::INFINITY;
799
800            // Find the most isolating dimension
801            for j in 0..self.nfeatures {
802                let std_dev = (self.variances[j] / (self.n_samples - 1) as f64).sqrt();
803                if std_dev > 1e-8 {
804                    // Distance from mean normalized by standard deviation
805                    let normalized_distance = (current_sample[j] - self.means[j]).abs() / std_dev;
806
807                    // Estimate how "isolating" this split would be
808                    let split_effectiveness = normalized_distance * (1.0 + depth as f64 * 0.1);
809                    min_split_distance = min_split_distance.min(split_effectiveness);
810                }
811            }
812
813            // If sample is well-isolated in any dimension, break early
814            if min_split_distance > 3.0 {
815                path_length += depth as f64 + min_split_distance / 3.0;
816                break;
817            }
818
819            path_length += 1.0;
820
821            // Adjust sample position for next iteration (simulating tree traversal)
822            for j in 0..self.nfeatures {
823                let adjustment = (current_sample[j] - self.means[j]) * 0.1;
824                current_sample[j] -= adjustment;
825            }
826        }
827
828        // Shorter path lengths indicate anomalies
829        // Normalize by expected path length for a dataset of this size
830        let expected_path_length = if self.n_samples > 2 {
831            2.0 * ((self.n_samples - 1) as f64).ln() + (std::f64::consts::E * 0.57721566)
832                - 2.0 * (self.n_samples - 1) as f64 / self.n_samples as f64
833        } else {
834            1.0
835        };
836
837        // Return anomaly score (higher = more anomalous)
838        2.0_f64.powf(-path_length / expected_path_length)
839    }
840
841    /// Get anomaly scores for samples
842    pub fn anomaly_scores(&self, x: &Array2<f64>) -> Result<Array1<f64>> {
843        if x.shape()[1] != self.nfeatures {
844            return Err(TransformError::InvalidInput(format!(
845                "Expected {} features, got {}",
846                self.nfeatures,
847                x.shape()[1]
848            )));
849        }
850
851        if self.n_samples < 2 {
852            return Ok(Array1::zeros(x.shape()[0]));
853        }
854
855        let mut scores = Array1::zeros(x.shape()[0]);
856        let stds = self.get_standard_deviations();
857
858        for (i, sample) in x.rows().into_iter().enumerate() {
859            let mut score = 0.0;
860            for (j, &value) in sample.iter().enumerate() {
861                if stds[j] > 1e-8 {
862                    let z_score = (value - self.means[j]).abs() / stds[j];
863                    score += z_score;
864                }
865            }
866            scores[i] = score / self.nfeatures as f64;
867        }
868
869        Ok(scores)
870    }
871
872    /// Reset detector to initial state
873    pub fn reset(&mut self) {
874        self.means.fill(0.0);
875        self.variances.fill(0.0);
876        self.n_samples = 0;
877    }
878}
879
880/// Streaming feature selector based on variance or correlation thresholds
881pub struct StreamingFeatureSelector {
882    /// Feature variances
883    variances: Array1<f64>,
884    /// Feature means
885    means: Array1<f64>,
886    /// Correlation matrix (upper triangular)
887    correlations: Array2<f64>,
888    /// Number of samples seen
889    n_samples: usize,
890    /// Number of features
891    nfeatures: usize,
892    /// Variance threshold for feature selection
893    variance_threshold: f64,
894    /// Correlation threshold for removing highly correlated features
895    correlationthreshold: f64,
896    /// Selected feature indices
897    selected_features: Option<Vec<usize>>,
898}
899
900impl StreamingFeatureSelector {
901    /// Create a new streaming feature selector
902    pub fn new(nfeatures: usize, variance_threshold: f64, correlationthreshold: f64) -> Self {
903        StreamingFeatureSelector {
904            variances: Array1::zeros(nfeatures),
905            means: Array1::zeros(nfeatures),
906            correlations: Array2::zeros((nfeatures, nfeatures)),
907            n_samples: 0,
908            nfeatures,
909            variance_threshold,
910            correlationthreshold,
911            selected_features: None,
912        }
913    }
914
915    /// Update statistics with new data
916    pub fn update(&mut self, x: &Array2<f64>) -> Result<()> {
917        if x.shape()[1] != self.nfeatures {
918            return Err(TransformError::InvalidInput(format!(
919                "Expected {} features, got {}",
920                self.nfeatures,
921                x.shape()[1]
922            )));
923        }
924
925        // Update running statistics
926        for sample in x.rows() {
927            self.n_samples += 1;
928            let n = self.n_samples as f64;
929
930            // Update means and variances
931            for (i, &value) in sample.iter().enumerate() {
932                let delta = value - self.means[i];
933                self.means[i] += delta / n;
934                let delta2 = value - self.means[i];
935                self.variances[i] += delta * delta2;
936            }
937
938            // Update correlations (simplified running correlation)
939            if self.n_samples > 1 {
940                for i in 0..self.nfeatures {
941                    for j in (i + 1)..self.nfeatures {
942                        let val_i = sample[i] - self.means[i];
943                        let val_j = sample[j] - self.means[j];
944                        let covar_update = val_i * val_j / (n - 1.0);
945                        self.correlations[[i, j]] =
946                            (self.correlations[[i, j]] * (n - 2.0) + covar_update) / (n - 1.0);
947                    }
948                }
949            }
950        }
951
952        // Update selected features based on current statistics
953        if self.n_samples >= 10 {
954            // Minimum samples for stable statistics
955            self.update_selected_features();
956        }
957
958        Ok(())
959    }
960
961    fn update_selected_features(&mut self) {
962        let mut selected = Vec::new();
963        let current_variances = self.get_current_variances();
964
965        // First pass: select features based on variance threshold
966        for i in 0..self.nfeatures {
967            if current_variances[i] > self.variance_threshold {
968                selected.push(i);
969            }
970        }
971
972        // Second pass: remove highly correlated features
973        let mut final_selected = Vec::new();
974        for &i in &selected {
975            let mut should_include = true;
976
977            for &j in &final_selected {
978                if i != j {
979                    let corr = self.get_correlation(i, j, &current_variances);
980                    if corr.abs() > self.correlationthreshold {
981                        // Keep feature with higher variance
982                        if current_variances[i] <= current_variances[j] {
983                            should_include = false;
984                            break;
985                        }
986                    }
987                }
988            }
989
990            if should_include {
991                final_selected.push(i);
992            }
993        }
994
995        self.selected_features = Some(final_selected);
996    }
997
998    fn get_current_variances(&self) -> Array1<f64> {
999        if self.n_samples <= 1 {
1000            Array1::zeros(self.nfeatures)
1001        } else {
1002            self.variances.mapv(|v| v / (self.n_samples - 1) as f64)
1003        }
1004    }
1005
1006    fn get_correlation(&self, i: usize, j: usize, variances: &Array1<f64>) -> f64 {
1007        let var_i = variances[i];
1008        let var_j = variances[j];
1009
1010        if var_i > 1e-8 && var_j > 1e-8 {
1011            let idx = if i < j { (i, j) } else { (j, i) };
1012            self.correlations[idx] / (var_i * var_j).sqrt()
1013        } else {
1014            0.0
1015        }
1016    }
1017
1018    /// Transform data by selecting features
1019    pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
1020        if let Some(ref selected) = self.selected_features {
1021            if x.shape()[1] != self.nfeatures {
1022                return Err(TransformError::InvalidInput(format!(
1023                    "Expected {} features, got {}",
1024                    self.nfeatures,
1025                    x.shape()[1]
1026                )));
1027            }
1028
1029            if selected.is_empty() {
1030                return Ok(Array2::zeros((x.shape()[0], 0)));
1031            }
1032
1033            let mut result = Array2::zeros((x.shape()[0], selected.len()));
1034
1035            for (row_idx, sample) in x.rows().into_iter().enumerate() {
1036                for (col_idx, &feature_idx) in selected.iter().enumerate() {
1037                    result[[row_idx, col_idx]] = sample[feature_idx];
1038                }
1039            }
1040
1041            Ok(result)
1042        } else {
1043            // No features selected yet, return original data
1044            Ok(x.to_owned())
1045        }
1046    }
1047
1048    /// Get indices of selected features
1049    pub fn get_selected_features(&self) -> Option<&Vec<usize>> {
1050        self.selected_features.as_ref()
1051    }
1052
1053    /// Get number of selected features
1054    pub fn n_features_selected(&self) -> usize {
1055        self.selected_features
1056            .as_ref()
1057            .map_or(self.nfeatures, |s| s.len())
1058    }
1059
1060    /// Reset selector to initial state
1061    pub fn reset(&mut self) {
1062        self.variances.fill(0.0);
1063        self.means.fill(0.0);
1064        self.correlations.fill(0.0);
1065        self.n_samples = 0;
1066        self.selected_features = None;
1067    }
1068}