scirs2_transform/
streaming.rs

1//! Streaming transformations for continuous data processing
2//!
3//! This module provides utilities for processing data streams in real-time,
4//! maintaining running statistics and transforming data incrementally.
5
6use scirs2_core::ndarray::{Array1, Array2};
7use scirs2_linalg::eigh;
8use std::collections::VecDeque;
9
10use crate::error::{Result, TransformError};
11
12/// Trait for transformers that support streaming/incremental updates
13pub trait StreamingTransformer: Send + Sync {
14    /// Update the transformer with a new batch of data
15    fn partial_fit(&mut self, x: &Array2<f64>) -> Result<()>;
16
17    /// Transform a batch of data using current statistics
18    fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>>;
19
20    /// Reset the transformer to initial state
21    fn reset(&mut self);
22
23    /// Get the number of samples seen so far
24    fn n_samples_seen(&self) -> usize;
25}
26
27/// Streaming standard scaler that maintains running statistics
28pub struct StreamingStandardScaler {
29    /// Running mean for each feature
30    mean: Array1<f64>,
31    /// Running variance for each feature
32    variance: Array1<f64>,
33    /// Number of samples seen
34    n_samples: usize,
35    /// Whether to center the data
36    with_mean: bool,
37    /// Whether to scale to unit variance
38    withstd: bool,
39    /// Epsilon for numerical stability
40    epsilon: f64,
41}
42
43impl StreamingStandardScaler {
44    /// Create a new streaming standard scaler
45    pub fn new(_nfeatures: usize, with_mean: bool, withstd: bool) -> Self {
46        StreamingStandardScaler {
47            mean: Array1::zeros(_nfeatures),
48            variance: Array1::zeros(_nfeatures),
49            n_samples: 0,
50            with_mean,
51            withstd,
52            epsilon: 1e-8,
53        }
54    }
55
56    /// Update statistics using Welford's online algorithm
57    fn update_statistics(&mut self, x: &Array2<f64>) {
58        let batch_size = x.shape()[0];
59        let nfeatures = x.shape()[1];
60
61        for i in 0..batch_size {
62            self.n_samples += 1;
63            let n = self.n_samples as f64;
64
65            for j in 0..nfeatures {
66                let value = x[[i, j]];
67                let delta = value - self.mean[j];
68                self.mean[j] += delta / n;
69
70                if self.withstd {
71                    let delta2 = value - self.mean[j];
72                    self.variance[j] += delta * delta2;
73                }
74            }
75        }
76    }
77
78    /// Get the current standard deviation
79    fn get_std(&self) -> Array1<f64> {
80        if self.n_samples <= 1 {
81            Array1::ones(self.mean.len())
82        } else {
83            self.variance
84                .mapv(|v| (v / (self.n_samples - 1) as f64).sqrt().max(self.epsilon))
85        }
86    }
87}
88
89impl StreamingTransformer for StreamingStandardScaler {
90    fn partial_fit(&mut self, x: &Array2<f64>) -> Result<()> {
91        if x.shape()[1] != self.mean.len() {
92            return Err(TransformError::InvalidInput(format!(
93                "Expected {} features, got {}",
94                self.mean.len(),
95                x.shape()[1]
96            )));
97        }
98
99        self.update_statistics(x);
100        Ok(())
101    }
102
103    fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
104        if x.shape()[1] != self.mean.len() {
105            return Err(TransformError::InvalidInput(format!(
106                "Expected {} features, got {}",
107                self.mean.len(),
108                x.shape()[1]
109            )));
110        }
111
112        let mut result = x.to_owned();
113
114        if self.with_mean {
115            for i in 0..result.shape()[0] {
116                for j in 0..result.shape()[1] {
117                    result[[i, j]] -= self.mean[j];
118                }
119            }
120        }
121
122        if self.withstd {
123            let std = self.get_std();
124            for i in 0..result.shape()[0] {
125                for j in 0..result.shape()[1] {
126                    result[[i, j]] /= std[j];
127                }
128            }
129        }
130
131        Ok(result)
132    }
133
134    fn reset(&mut self) {
135        self.mean.fill(0.0);
136        self.variance.fill(0.0);
137        self.n_samples = 0;
138    }
139
140    fn n_samples_seen(&self) -> usize {
141        self.n_samples
142    }
143}
144
145/// Streaming min-max scaler that tracks min and max values
146pub struct StreamingMinMaxScaler {
147    /// Minimum values for each feature
148    min: Array1<f64>,
149    /// Maximum values for each feature
150    max: Array1<f64>,
151    /// Target range
152    featurerange: (f64, f64),
153    /// Number of samples seen
154    n_samples: usize,
155}
156
157impl StreamingMinMaxScaler {
158    /// Create a new streaming min-max scaler
159    pub fn new(_nfeatures: usize, featurerange: (f64, f64)) -> Self {
160        StreamingMinMaxScaler {
161            min: Array1::from_elem(_nfeatures, f64::INFINITY),
162            max: Array1::from_elem(_nfeatures, f64::NEG_INFINITY),
163            featurerange,
164            n_samples: 0,
165        }
166    }
167
168    /// Update min and max values
169    fn update_bounds(&mut self, x: &Array2<f64>) {
170        for i in 0..x.shape()[0] {
171            for j in 0..x.shape()[1] {
172                let value = x[[i, j]];
173                self.min[j] = self.min[j].min(value);
174                self.max[j] = self.max[j].max(value);
175            }
176            self.n_samples += 1;
177        }
178    }
179}
180
181impl StreamingTransformer for StreamingMinMaxScaler {
182    fn partial_fit(&mut self, x: &Array2<f64>) -> Result<()> {
183        if x.shape()[1] != self.min.len() {
184            return Err(TransformError::InvalidInput(format!(
185                "Expected {} features, got {}",
186                self.min.len(),
187                x.shape()[1]
188            )));
189        }
190
191        self.update_bounds(x);
192        Ok(())
193    }
194
195    fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
196        if x.shape()[1] != self.min.len() {
197            return Err(TransformError::InvalidInput(format!(
198                "Expected {} features, got {}",
199                self.min.len(),
200                x.shape()[1]
201            )));
202        }
203
204        let mut result = Array2::zeros((x.nrows(), x.ncols()));
205        let (min_val, max_val) = self.featurerange;
206        let scale = max_val - min_val;
207
208        for i in 0..x.shape()[0] {
209            for j in 0..x.shape()[1] {
210                let range = self.max[j] - self.min[j];
211                if range > 1e-10 {
212                    result[[i, j]] = (x[[i, j]] - self.min[j]) / range * scale + min_val;
213                } else {
214                    result[[i, j]] = (min_val + max_val) / 2.0;
215                }
216            }
217        }
218
219        Ok(result)
220    }
221
222    fn reset(&mut self) {
223        self.min.fill(f64::INFINITY);
224        self.max.fill(f64::NEG_INFINITY);
225        self.n_samples = 0;
226    }
227
228    fn n_samples_seen(&self) -> usize {
229        self.n_samples
230    }
231}
232
233/// Streaming quantile tracker using P² algorithm
234pub struct StreamingQuantileTracker {
235    /// Quantiles to track
236    quantiles: Vec<f64>,
237    /// P² algorithm state for each feature and quantile
238    p2_states: Vec<Vec<P2State>>,
239    /// Number of features
240    nfeatures: usize,
241}
242
243/// P² algorithm state for a single quantile
244struct P2State {
245    /// Marker positions
246    n: [f64; 5],
247    /// Marker values
248    q: [f64; 5],
249    /// Desired marker positions
250    n_prime: [f64; 5],
251    /// Number of observations
252    count: usize,
253    /// Target quantile
254    p: f64,
255}
256
257impl P2State {
258    fn new(p: f64) -> Self {
259        P2State {
260            n: [1.0, 2.0, 3.0, 4.0, 5.0],
261            q: [0.0; 5],
262            n_prime: [1.0, 1.0 + 2.0 * p, 1.0 + 4.0 * p, 3.0 + 2.0 * p, 5.0],
263            count: 0,
264            p,
265        }
266    }
267
268    fn update(&mut self, value: f64) {
269        if self.count < 5 {
270            self.q[self.count] = value;
271            self.count += 1;
272
273            if self.count == 5 {
274                // Sort initial observations
275                self.q.sort_by(|a, b| a.partial_cmp(b).unwrap());
276            }
277            return;
278        }
279
280        // Find cell k such that q[k] <= value < q[k+1]
281        let mut k = 0;
282        for i in 1..5 {
283            if value < self.q[i] {
284                k = i - 1;
285                break;
286            }
287        }
288        if value >= self.q[4] {
289            k = 3;
290        }
291
292        // Update marker positions
293        for i in (k + 1)..5 {
294            self.n[i] += 1.0;
295        }
296
297        // Update desired marker positions
298        for i in 0..5 {
299            self.n_prime[i] += match i {
300                0 => 0.0,
301                1 => self.p / 2.0,
302                2 => self.p,
303                3 => (1.0 + self.p) / 2.0,
304                4 => 1.0,
305                _ => unreachable!(),
306            };
307        }
308
309        // Adjust marker values
310        for i in 1..4 {
311            let d = self.n_prime[i] - self.n[i];
312
313            if (d >= 1.0 && self.n[i + 1] - self.n[i] > 1.0)
314                || (d <= -1.0 && self.n[i - 1] - self.n[i] < -1.0)
315            {
316                let d_sign = d.signum();
317
318                // Try parabolic interpolation
319                let qi = self.parabolic_interpolation(i, d_sign);
320
321                if self.q[i - 1] < qi && qi < self.q[i + 1] {
322                    self.q[i] = qi;
323                } else {
324                    // Fall back to linear interpolation
325                    self.q[i] = self.linear_interpolation(i, d_sign);
326                }
327
328                self.n[i] += d_sign;
329            }
330        }
331    }
332
333    fn parabolic_interpolation(&self, i: usize, d: f64) -> f64 {
334        let qi = self.q[i];
335        let qim1 = self.q[i - 1];
336        let qip1 = self.q[i + 1];
337        let ni = self.n[i];
338        let nim1 = self.n[i - 1];
339        let nip1 = self.n[i + 1];
340
341        qi + d / (nip1 - nim1)
342            * ((ni - nim1 + d) * (qip1 - qi) / (nip1 - ni)
343                + (nip1 - ni - d) * (qi - qim1) / (ni - nim1))
344    }
345
346    fn linear_interpolation(&self, i: usize, d: f64) -> f64 {
347        let j = if d > 0.0 { i + 1 } else { i - 1 };
348        self.q[i] + d * (self.q[j] - self.q[i]) / (self.n[j] - self.n[i])
349    }
350
351    fn quantile(&self) -> f64 {
352        if self.count < 5 {
353            // Not enough data, return median of available values
354            let mut sorted = self.q[..self.count].to_vec();
355            sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
356            sorted[sorted.len() / 2]
357        } else {
358            self.q[2] // Middle marker estimates the quantile
359        }
360    }
361}
362
363impl StreamingQuantileTracker {
364    /// Create a new streaming quantile tracker
365    pub fn new(nfeatures: usize, quantiles: Vec<f64>) -> Result<Self> {
366        // Validate quantiles
367        for &q in &quantiles {
368            if !(0.0..=1.0).contains(&q) {
369                return Err(TransformError::InvalidInput(format!(
370                    "Quantile {q} must be between 0 and 1"
371                )));
372            }
373        }
374
375        let mut p2_states = Vec::with_capacity(nfeatures);
376        for _ in 0..nfeatures {
377            let feature_states: Vec<P2State> = quantiles.iter().map(|&q| P2State::new(q)).collect();
378            p2_states.push(feature_states);
379        }
380
381        Ok(StreamingQuantileTracker {
382            quantiles,
383            p2_states,
384            nfeatures,
385        })
386    }
387
388    /// Update quantile estimates with new data
389    pub fn update(&mut self, x: &Array2<f64>) -> Result<()> {
390        if x.shape()[1] != self.nfeatures {
391            return Err(TransformError::InvalidInput(format!(
392                "Expected {} features, got {}",
393                self.nfeatures,
394                x.shape()[1]
395            )));
396        }
397
398        for i in 0..x.shape()[0] {
399            for j in 0..x.shape()[1] {
400                let value = x[[i, j]];
401                for k in 0..self.quantiles.len() {
402                    self.p2_states[j][k].update(value);
403                }
404            }
405        }
406
407        Ok(())
408    }
409
410    /// Get current quantile estimates
411    pub fn get_quantiles(&self) -> Array2<f64> {
412        let mut result = Array2::zeros((self.nfeatures, self.quantiles.len()));
413
414        for j in 0..self.nfeatures {
415            for k in 0..self.quantiles.len() {
416                result[[j, k]] = self.p2_states[j][k].quantile();
417            }
418        }
419
420        result
421    }
422}
423
424/// Window-based streaming transformer that maintains a sliding window
425pub struct WindowedStreamingTransformer<T: StreamingTransformer> {
426    /// Underlying transformer
427    transformer: T,
428    /// Sliding window of recent data
429    window: VecDeque<Array2<f64>>,
430    /// Maximum window size
431    windowsize: usize,
432    /// Current number of samples in window
433    current_size: usize,
434}
435
436impl<T: StreamingTransformer> WindowedStreamingTransformer<T> {
437    /// Create a new windowed streaming transformer
438    pub fn new(_transformer: T, windowsize: usize) -> Self {
439        WindowedStreamingTransformer {
440            transformer: _transformer,
441            window: VecDeque::with_capacity(windowsize),
442            windowsize,
443            current_size: 0,
444        }
445    }
446
447    /// Update the transformer with new data
448    pub fn update(&mut self, x: &Array2<f64>) -> Result<()> {
449        // Add new data to window
450        self.window.push_back(x.to_owned());
451        self.current_size += x.shape()[0];
452
453        // Remove old data if window is full
454        while self.current_size > self.windowsize && !self.window.is_empty() {
455            if let Some(old_data) = self.window.pop_front() {
456                self.current_size -= old_data.shape()[0];
457            }
458        }
459
460        // Refit transformer on window data
461        self.transformer.reset();
462        for data in &self.window {
463            self.transformer.partial_fit(data)?;
464        }
465
466        Ok(())
467    }
468
469    /// Transform data using the windowed statistics
470    pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
471        self.transformer.transform(x)
472    }
473}
474
475/// Streaming Principal Component Analysis using incremental SVD
476pub struct StreamingPCA {
477    /// Current mean of the data
478    mean: Array1<f64>,
479    /// Principal components (loading vectors)
480    components: Option<Array2<f64>>,
481    /// Explained variance for each component
482    explained_variance: Option<Array1<f64>>,
483    /// Number of components to keep
484    n_components: usize,
485    /// Number of features
486    nfeatures: usize,
487    /// Number of samples seen
488    n_samples: usize,
489    /// Forgetting factor for incremental updates (0 < alpha <= 1)
490    alpha: f64,
491    /// Minimum number of samples before PCA is computed
492    min_samples: usize,
493    /// Accumulated covariance matrix
494    cov_matrix: Array2<f64>,
495}
496
497impl StreamingPCA {
498    /// Create a new streaming PCA
499    pub fn new(
500        nfeatures: usize,
501        n_components: usize,
502        alpha: f64,
503        min_samples: usize,
504    ) -> Result<Self> {
505        if n_components > nfeatures {
506            return Err(TransformError::InvalidInput(
507                "n_components cannot be larger than nfeatures".to_string(),
508            ));
509        }
510        if alpha <= 0.0 || alpha > 1.0 {
511            return Err(TransformError::InvalidInput(
512                "alpha must be in (0, 1]".to_string(),
513            ));
514        }
515
516        Ok(StreamingPCA {
517            mean: Array1::zeros(nfeatures),
518            components: None,
519            explained_variance: None,
520            n_components,
521            nfeatures,
522            n_samples: 0,
523            alpha,
524            min_samples,
525            cov_matrix: Array2::zeros((nfeatures, nfeatures)),
526        })
527    }
528
529    /// Update PCA with new batch of data
530    pub fn update(&mut self, x: &Array2<f64>) -> Result<()> {
531        if x.shape()[1] != self.nfeatures {
532            return Err(TransformError::InvalidInput(format!(
533                "Expected {} features, got {}",
534                self.nfeatures,
535                x.shape()[1]
536            )));
537        }
538
539        let _batch_size = x.shape()[0];
540
541        // Update mean using exponential moving average
542        for sample in x.rows() {
543            self.n_samples += 1;
544            let weight = if self.n_samples == 1 { 1.0 } else { self.alpha };
545
546            for (i, &value) in sample.iter().enumerate() {
547                self.mean[i] = (1.0 - weight) * self.mean[i] + weight * value;
548            }
549        }
550
551        // Update covariance matrix
552        if self.n_samples >= self.min_samples {
553            for sample in x.rows() {
554                let centered = &sample.to_owned() - &self.mean;
555                let outer_product = centered
556                    .clone()
557                    .insert_axis(scirs2_core::ndarray::Axis(1))
558                    .dot(&centered.insert_axis(scirs2_core::ndarray::Axis(0)));
559
560                let weight = self.alpha;
561                self.cov_matrix = (1.0 - weight) * &self.cov_matrix + weight * outer_product;
562            }
563
564            // Compute PCA from covariance matrix
565            self.compute_pca()?;
566        }
567
568        Ok(())
569    }
570
571    fn compute_pca(&mut self) -> Result<()> {
572        // Perform proper eigendecomposition of covariance matrix
573        let (eigenvalues, eigenvectors) = eigh(&self.cov_matrix.view(), None).map_err(|e| {
574            TransformError::ComputationError(format!("Eigendecomposition failed: {e}"))
575        })?;
576
577        // Sort by eigenvalues in descending order
578        let mut eigen_pairs: Vec<(f64, Array1<f64>)> = eigenvalues
579            .iter()
580            .zip(eigenvectors.columns())
581            .map(|(&val, vec)| (val, vec.to_owned()))
582            .collect();
583
584        eigen_pairs.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
585
586        // Take top n_components
587        let mut components = Array2::zeros((self.n_components, self.nfeatures));
588        let mut explained_var = Array1::zeros(self.n_components);
589
590        for (i, (eigenval, eigenvec)) in eigen_pairs.iter().take(self.n_components).enumerate() {
591            components.row_mut(i).assign(eigenvec);
592            explained_var[i] = eigenval.max(0.0);
593        }
594
595        self.components = Some(components);
596        self.explained_variance = Some(explained_var);
597        Ok(())
598    }
599
600    /// Transform data using current PCA
601    pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
602        if let Some(ref components) = self.components {
603            if x.shape()[1] != self.nfeatures {
604                return Err(TransformError::InvalidInput(format!(
605                    "Expected {} features, got {}",
606                    self.nfeatures,
607                    x.shape()[1]
608                )));
609            }
610
611            let mut result = Array2::zeros((x.shape()[0], self.n_components));
612
613            for (i, sample) in x.rows().into_iter().enumerate() {
614                let centered = &sample.to_owned() - &self.mean;
615                let transformed = components.dot(&centered);
616                result.row_mut(i).assign(&transformed);
617            }
618
619            Ok(result)
620        } else {
621            Err(TransformError::TransformationError(
622                "PCA not computed yet, need more samples".to_string(),
623            ))
624        }
625    }
626
627    /// Get explained variance ratio
628    pub fn explained_variance_ratio(&self) -> Option<Array1<f64>> {
629        self.explained_variance.as_ref().map(|var| {
630            let total_var = var.sum();
631            if total_var > 0.0 {
632                var / total_var
633            } else {
634                Array1::zeros(var.len())
635            }
636        })
637    }
638
639    /// Reset the PCA to initial state
640    pub fn reset(&mut self) {
641        self.mean.fill(0.0);
642        self.components = None;
643        self.explained_variance = None;
644        self.n_samples = 0;
645        self.cov_matrix.fill(0.0);
646    }
647}
648
649/// Streaming outlier detector using statistical methods
650pub struct StreamingOutlierDetector {
651    /// Running statistics for each feature
652    means: Array1<f64>,
653    variances: Array1<f64>,
654    /// Number of samples seen
655    n_samples: usize,
656    /// Number of features
657    nfeatures: usize,
658    /// Threshold for outlier detection (standard deviations)
659    threshold: f64,
660    /// Method for outlier detection
661    method: OutlierMethod,
662}
663
664/// Methods for outlier detection in streaming data
665#[derive(Debug, Clone)]
666pub enum OutlierMethod {
667    /// Z-score based detection
668    ZScore,
669    /// Modified Z-score using median absolute deviation
670    ModifiedZScore,
671    /// Isolation forest-like scoring
672    IsolationScore,
673}
674
675impl StreamingOutlierDetector {
676    /// Create a new streaming outlier detector
677    pub fn new(nfeatures: usize, threshold: f64, method: OutlierMethod) -> Self {
678        StreamingOutlierDetector {
679            means: Array1::zeros(nfeatures),
680            variances: Array1::zeros(nfeatures),
681            n_samples: 0,
682            nfeatures,
683            threshold,
684            method,
685        }
686    }
687
688    /// Update statistics with new data
689    pub fn update(&mut self, x: &Array2<f64>) -> Result<()> {
690        if x.shape()[1] != self.nfeatures {
691            return Err(TransformError::InvalidInput(format!(
692                "Expected {} features, got {}",
693                self.nfeatures,
694                x.shape()[1]
695            )));
696        }
697
698        // Update running statistics using Welford's algorithm
699        for sample in x.rows() {
700            self.n_samples += 1;
701            let n = self.n_samples as f64;
702
703            for (i, &value) in sample.iter().enumerate() {
704                let delta = value - self.means[i];
705                self.means[i] += delta / n;
706                let delta2 = value - self.means[i];
707                self.variances[i] += delta * delta2;
708            }
709        }
710
711        Ok(())
712    }
713
714    /// Detect outliers in new data
715    pub fn detect_outliers(&self, x: &Array2<f64>) -> Result<Array1<bool>> {
716        if x.shape()[1] != self.nfeatures {
717            return Err(TransformError::InvalidInput(format!(
718                "Expected {} features, got {}",
719                self.nfeatures,
720                x.shape()[1]
721            )));
722        }
723
724        if self.n_samples < 2 {
725            // Not enough data for outlier detection
726            return Ok(Array1::from_elem(x.shape()[0], false));
727        }
728
729        let mut outliers = Array1::from_elem(x.shape()[0], false);
730
731        match self.method {
732            OutlierMethod::ZScore => {
733                let stds = self.get_standard_deviations();
734
735                for (i, sample) in x.rows().into_iter().enumerate() {
736                    let mut is_outlier = false;
737                    for (j, &value) in sample.iter().enumerate() {
738                        if stds[j] > 1e-8 {
739                            let z_score = (value - self.means[j]).abs() / stds[j];
740                            if z_score > self.threshold {
741                                is_outlier = true;
742                                break;
743                            }
744                        }
745                    }
746                    outliers[i] = is_outlier;
747                }
748            }
749            OutlierMethod::ModifiedZScore => {
750                // Simplified modified z-score using running estimates
751                for (i, sample) in x.rows().into_iter().enumerate() {
752                    let mut is_outlier = false;
753                    for (j, &value) in sample.iter().enumerate() {
754                        let mad_estimate =
755                            (self.variances[j] / (self.n_samples - 1) as f64).sqrt() * 0.6745;
756                        if mad_estimate > 1e-8 {
757                            let modified_z = 0.6745 * (value - self.means[j]).abs() / mad_estimate;
758                            if modified_z > self.threshold {
759                                is_outlier = true;
760                                break;
761                            }
762                        }
763                    }
764                    outliers[i] = is_outlier;
765                }
766            }
767            OutlierMethod::IsolationScore => {
768                // Enhanced isolation forest-like scoring using path length estimation
769                for (i, sample) in x.rows().into_iter().enumerate() {
770                    let anomaly_score = self.compute_isolation_score(sample);
771                    outliers[i] = anomaly_score > self.threshold;
772                }
773            }
774        }
775
776        Ok(outliers)
777    }
778
779    fn get_standard_deviations(&self) -> Array1<f64> {
780        if self.n_samples <= 1 {
781            Array1::ones(self.nfeatures)
782        } else {
783            self.variances
784                .mapv(|v| (v / (self.n_samples - 1) as f64).sqrt().max(1e-8))
785        }
786    }
787
788    /// Compute isolation score using path length estimation
789    fn compute_isolation_score(&self, sample: scirs2_core::ndarray::ArrayView1<f64>) -> f64 {
790        let mut path_length = 0.0;
791        let mut current_sample = sample.to_owned();
792
793        // Simulate isolation tree path length with statistical approximation
794        let max_depth = ((self.n_samples as f64).log2().ceil() as usize).min(20);
795
796        for depth in 0..max_depth {
797            let mut min_split_distance = f64::INFINITY;
798
799            // Find the most isolating dimension
800            for j in 0..self.nfeatures {
801                let std_dev = (self.variances[j] / (self.n_samples - 1) as f64).sqrt();
802                if std_dev > 1e-8 {
803                    // Distance from mean normalized by standard deviation
804                    let normalized_distance = (current_sample[j] - self.means[j]).abs() / std_dev;
805
806                    // Estimate how "isolating" this split would be
807                    let split_effectiveness = normalized_distance * (1.0 + depth as f64 * 0.1);
808                    min_split_distance = min_split_distance.min(split_effectiveness);
809                }
810            }
811
812            // If sample is well-isolated in any dimension, break early
813            if min_split_distance > 3.0 {
814                path_length += depth as f64 + min_split_distance / 3.0;
815                break;
816            }
817
818            path_length += 1.0;
819
820            // Adjust sample position for next iteration (simulating tree traversal)
821            for j in 0..self.nfeatures {
822                let adjustment = (current_sample[j] - self.means[j]) * 0.1;
823                current_sample[j] -= adjustment;
824            }
825        }
826
827        // Shorter path lengths indicate anomalies
828        // Normalize by expected path length for a dataset of this size
829        let expected_path_length = if self.n_samples > 2 {
830            2.0 * ((self.n_samples - 1) as f64).ln() + (std::f64::consts::E * 0.57721566)
831                - 2.0 * (self.n_samples - 1) as f64 / self.n_samples as f64
832        } else {
833            1.0
834        };
835
836        // Return anomaly score (higher = more anomalous)
837        2.0_f64.powf(-path_length / expected_path_length)
838    }
839
840    /// Get anomaly scores for samples
841    pub fn anomaly_scores(&self, x: &Array2<f64>) -> Result<Array1<f64>> {
842        if x.shape()[1] != self.nfeatures {
843            return Err(TransformError::InvalidInput(format!(
844                "Expected {} features, got {}",
845                self.nfeatures,
846                x.shape()[1]
847            )));
848        }
849
850        if self.n_samples < 2 {
851            return Ok(Array1::zeros(x.shape()[0]));
852        }
853
854        let mut scores = Array1::zeros(x.shape()[0]);
855        let stds = self.get_standard_deviations();
856
857        for (i, sample) in x.rows().into_iter().enumerate() {
858            let mut score = 0.0;
859            for (j, &value) in sample.iter().enumerate() {
860                if stds[j] > 1e-8 {
861                    let z_score = (value - self.means[j]).abs() / stds[j];
862                    score += z_score;
863                }
864            }
865            scores[i] = score / self.nfeatures as f64;
866        }
867
868        Ok(scores)
869    }
870
871    /// Reset detector to initial state
872    pub fn reset(&mut self) {
873        self.means.fill(0.0);
874        self.variances.fill(0.0);
875        self.n_samples = 0;
876    }
877}
878
879/// Streaming feature selector based on variance or correlation thresholds
880pub struct StreamingFeatureSelector {
881    /// Feature variances
882    variances: Array1<f64>,
883    /// Feature means
884    means: Array1<f64>,
885    /// Correlation matrix (upper triangular)
886    correlations: Array2<f64>,
887    /// Number of samples seen
888    n_samples: usize,
889    /// Number of features
890    nfeatures: usize,
891    /// Variance threshold for feature selection
892    variance_threshold: f64,
893    /// Correlation threshold for removing highly correlated features
894    correlationthreshold: f64,
895    /// Selected feature indices
896    selected_features: Option<Vec<usize>>,
897}
898
899impl StreamingFeatureSelector {
900    /// Create a new streaming feature selector
901    pub fn new(nfeatures: usize, variance_threshold: f64, correlationthreshold: f64) -> Self {
902        StreamingFeatureSelector {
903            variances: Array1::zeros(nfeatures),
904            means: Array1::zeros(nfeatures),
905            correlations: Array2::zeros((nfeatures, nfeatures)),
906            n_samples: 0,
907            nfeatures,
908            variance_threshold,
909            correlationthreshold,
910            selected_features: None,
911        }
912    }
913
914    /// Update statistics with new data
915    pub fn update(&mut self, x: &Array2<f64>) -> Result<()> {
916        if x.shape()[1] != self.nfeatures {
917            return Err(TransformError::InvalidInput(format!(
918                "Expected {} features, got {}",
919                self.nfeatures,
920                x.shape()[1]
921            )));
922        }
923
924        // Update running statistics
925        for sample in x.rows() {
926            self.n_samples += 1;
927            let n = self.n_samples as f64;
928
929            // Update means and variances
930            for (i, &value) in sample.iter().enumerate() {
931                let delta = value - self.means[i];
932                self.means[i] += delta / n;
933                let delta2 = value - self.means[i];
934                self.variances[i] += delta * delta2;
935            }
936
937            // Update correlations (simplified running correlation)
938            if self.n_samples > 1 {
939                for i in 0..self.nfeatures {
940                    for j in (i + 1)..self.nfeatures {
941                        let val_i = sample[i] - self.means[i];
942                        let val_j = sample[j] - self.means[j];
943                        let covar_update = val_i * val_j / (n - 1.0);
944                        self.correlations[[i, j]] =
945                            (self.correlations[[i, j]] * (n - 2.0) + covar_update) / (n - 1.0);
946                    }
947                }
948            }
949        }
950
951        // Update selected features based on current statistics
952        if self.n_samples >= 10 {
953            // Minimum samples for stable statistics
954            self.update_selected_features();
955        }
956
957        Ok(())
958    }
959
960    fn update_selected_features(&mut self) {
961        let mut selected = Vec::new();
962        let current_variances = self.get_current_variances();
963
964        // First pass: select features based on variance threshold
965        for i in 0..self.nfeatures {
966            if current_variances[i] > self.variance_threshold {
967                selected.push(i);
968            }
969        }
970
971        // Second pass: remove highly correlated features
972        let mut final_selected = Vec::new();
973        for &i in &selected {
974            let mut should_include = true;
975
976            for &j in &final_selected {
977                if i != j {
978                    let corr = self.get_correlation(i, j, &current_variances);
979                    if corr.abs() > self.correlationthreshold {
980                        // Keep feature with higher variance
981                        if current_variances[i] <= current_variances[j] {
982                            should_include = false;
983                            break;
984                        }
985                    }
986                }
987            }
988
989            if should_include {
990                final_selected.push(i);
991            }
992        }
993
994        self.selected_features = Some(final_selected);
995    }
996
997    fn get_current_variances(&self) -> Array1<f64> {
998        if self.n_samples <= 1 {
999            Array1::zeros(self.nfeatures)
1000        } else {
1001            self.variances.mapv(|v| v / (self.n_samples - 1) as f64)
1002        }
1003    }
1004
1005    fn get_correlation(&self, i: usize, j: usize, variances: &Array1<f64>) -> f64 {
1006        let var_i = variances[i];
1007        let var_j = variances[j];
1008
1009        if var_i > 1e-8 && var_j > 1e-8 {
1010            let idx = if i < j { (i, j) } else { (j, i) };
1011            self.correlations[idx] / (var_i * var_j).sqrt()
1012        } else {
1013            0.0
1014        }
1015    }
1016
1017    /// Transform data by selecting features
1018    pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
1019        if let Some(ref selected) = self.selected_features {
1020            if x.shape()[1] != self.nfeatures {
1021                return Err(TransformError::InvalidInput(format!(
1022                    "Expected {} features, got {}",
1023                    self.nfeatures,
1024                    x.shape()[1]
1025                )));
1026            }
1027
1028            if selected.is_empty() {
1029                return Ok(Array2::zeros((x.shape()[0], 0)));
1030            }
1031
1032            let mut result = Array2::zeros((x.shape()[0], selected.len()));
1033
1034            for (row_idx, sample) in x.rows().into_iter().enumerate() {
1035                for (col_idx, &feature_idx) in selected.iter().enumerate() {
1036                    result[[row_idx, col_idx]] = sample[feature_idx];
1037                }
1038            }
1039
1040            Ok(result)
1041        } else {
1042            // No features selected yet, return original data
1043            Ok(x.to_owned())
1044        }
1045    }
1046
1047    /// Get indices of selected features
1048    pub fn get_selected_features(&self) -> Option<&Vec<usize>> {
1049        self.selected_features.as_ref()
1050    }
1051
1052    /// Get number of selected features
1053    pub fn n_features_selected(&self) -> usize {
1054        self.selected_features
1055            .as_ref()
1056            .map_or(self.nfeatures, |s| s.len())
1057    }
1058
1059    /// Reset selector to initial state
1060    pub fn reset(&mut self) {
1061        self.variances.fill(0.0);
1062        self.means.fill(0.0);
1063        self.correlations.fill(0.0);
1064        self.n_samples = 0;
1065        self.selected_features = None;
1066    }
1067}