sklears_kernel_approximation/
streaming_kernel.rs

1use scirs2_core::ndarray::{s, Array1, Array2, Axis};
2use scirs2_core::random::rngs::StdRng;
3use scirs2_core::random::{thread_rng, Rng, SeedableRng, StandardNormal};
4use sklears_core::error::{Result, SklearsError};
5use std::collections::VecDeque;
6
// Streaming kernel approximation methods for online processing.
//
// This module provides online learning capabilities for kernel
// approximations, enabling processing of data streams where samples
// arrive continuously and memory is limited.

/// Buffer management strategy for streaming data.
///
/// The per-variant behaviour described below is what
/// `StreamingRBFSampler` applies each time a sample is added.
#[derive(Debug, Clone)]
pub enum BufferStrategy {
    /// Fixed-size buffer with FIFO replacement: once the buffer holds this
    /// many samples, the oldest one is evicted before the new one is appended.
    FixedSize(usize),
    /// Sliding window with time-based expiration: samples whose timestamp is
    /// more than `time_window` behind the incoming sample are dropped from the
    /// front, and the buffer is additionally capped at `size` entries.
    SlidingWindow { size: usize, time_window: f64 },
    /// Reservoir sampling for a representative subset of at most this many
    /// samples: once full, an incoming sample overwrites a randomly drawn slot
    /// or is discarded.
    ReservoirSampling(usize),
    /// Exponential decay weighting: on every insertion the weight of each
    /// buffered sample is multiplied by `alpha`, and samples whose weight
    /// falls below `min_weight` are removed.
    ExponentialDecay { alpha: f64, min_weight: f64 },
    /// Importance-weighted sampling: once `capacity` is reached, an incoming
    /// sample replaces the least important buffered sample only when its
    /// importance exceeds that sample's importance by more than `threshold`.
    ImportanceWeighted { capacity: usize, threshold: f64 },
}
27
/// Update frequency for model parameters.
///
/// Decides how many samples may accumulate between two model updates.
#[derive(Debug, Clone)]
pub enum UpdateFrequency {
    /// Update after every sample
    PerSample,
    /// Update after every N samples
    BatchSize(usize),
    /// Update based on time intervals.
    ///
    /// NOTE(review): `StreamingRBFSampler` does not read the interval value;
    /// it uses a fixed 100-sample cadence as a stand-in for elapsed time.
    TimeInterval(f64),
    /// Update when error exceeds threshold.
    ///
    /// NOTE(review): `StreamingRBFSampler` does not read the threshold value;
    /// it updates every 50 samples instead.
    ErrorThreshold(f64),
    /// Adaptive update frequency.
    ///
    /// NOTE(review): `StreamingRBFSampler` only uses `initial` as the batch
    /// size; `max` and `min` are ignored there.
    Adaptive {
        initial: usize,
        max: usize,
        min: usize,
    },
}
46
/// Forgetting mechanism for old data.
///
/// `StreamingRBFSampler` applies the selected mechanism to its sample buffer
/// at the end of every model update.
#[derive(Debug, Clone)]
pub enum ForgettingMechanism {
    /// No forgetting - keep all data
    None,
    /// Linear decay of old samples: each buffered weight is multiplied by
    /// `1 - rate` per model update.
    LinearDecay(f64),
    /// Exponential decay of old samples: each buffered weight is multiplied
    /// by the given factor per model update.
    ExponentialDecay(f64),
    /// Abrupt forgetting after time window: samples whose timestamp is more
    /// than this far behind the newest buffered sample are removed.
    AbruptForgetting(f64),
    /// Gradual forgetting with sigmoid function: each weight is multiplied by
    /// `1 / (1 + exp(steepness * (age - midpoint)))`, where `age` is the
    /// timestamp distance to the newest buffered sample.
    SigmoidDecay { steepness: f64, midpoint: f64 },
}
61
/// Configuration for streaming kernel approximation.
///
/// Use `StreamingConfig::default()` with struct-update syntax to override
/// individual settings.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// How incoming samples are retained in, or evicted from, the sample buffer.
    pub buffer_strategy: BufferStrategy,
    /// How many samples may accumulate before the model is updated.
    pub update_frequency: UpdateFrequency,
    /// How buffered samples are aged after a model update.
    pub forgetting_mechanism: ForgettingMechanism,
    /// Optional memory budget in megabytes.
    /// NOTE(review): not read by any code visible in this file.
    pub max_memory_mb: Option<usize>,
    /// Flag for adaptive component selection.
    /// NOTE(review): not read by any code visible in this file.
    pub adaptive_components: bool,
    /// Flag for feature-quality monitoring.
    /// NOTE(review): not read by any code visible in this file.
    pub quality_monitoring: bool,
    /// Enables `StreamingRBFSampler::detect_drift`; while `false` that method
    /// returns `Ok(false)` without inspecting the data.
    pub drift_detection: bool,
    /// Drift-score level above which concept drift is reported. The default
    /// (0.1) equals the literal threshold used by
    /// `FeatureStatistics::detect_drift`.
    pub concept_drift_threshold: f64,
}
82
83impl Default for StreamingConfig {
84    fn default() -> Self {
85        Self {
86            buffer_strategy: BufferStrategy::FixedSize(1000),
87            update_frequency: UpdateFrequency::BatchSize(100),
88            forgetting_mechanism: ForgettingMechanism::ExponentialDecay(0.99),
89            max_memory_mb: Some(100),
90            adaptive_components: true,
91            quality_monitoring: true,
92            drift_detection: false,
93            concept_drift_threshold: 0.1,
94        }
95    }
96}
97
/// Sample with metadata for streaming processing.
///
/// Created with [`StreamingSample::new`] and refined with the `with_*`
/// builder methods.
#[derive(Debug, Clone)]
pub struct StreamingSample {
    /// Feature vector of the sample.
    pub data: Array1<f64>,
    /// Arrival time of the sample; time-based buffer and forgetting strategies
    /// compare these values.
    pub timestamp: f64,
    /// Sample weight (1.0 for a fresh sample); decayed by the weight-based
    /// buffer and forgetting strategies.
    pub weight: f64,
    /// Importance score (1.0 by default); consulted by
    /// `BufferStrategy::ImportanceWeighted`.
    pub importance: f64,
    /// Optional target value attached to the sample.
    pub label: Option<f64>,
}
112
113impl StreamingSample {
114    pub fn new(data: Array1<f64>, timestamp: f64) -> Self {
115        Self {
116            data,
117            timestamp,
118            weight: 1.0,
119            importance: 1.0,
120            label: None,
121        }
122    }
123
124    pub fn with_weight(mut self, weight: f64) -> Self {
125        self.weight = weight;
126        self
127    }
128
129    pub fn with_importance(mut self, importance: f64) -> Self {
130        self.importance = importance;
131        self
132    }
133
134    pub fn with_label(mut self, label: f64) -> Self {
135        self.label = Some(label);
136        self
137    }
138}
139
/// Streaming RBF kernel approximation using Random Fourier Features
///
/// Maintains an online approximation of RBF kernel features that
/// adapts to data streams with concept drift and memory constraints.
pub struct StreamingRBFSampler {
    // Number of random Fourier features produced per sample.
    n_components: usize,
    // RBF kernel coefficient; the random weights are scaled by sqrt(2 * gamma).
    gamma: f64,
    // Buffer, update and forgetting settings.
    config: StreamingConfig,
    // Random projection matrix, shape (n_components, n_features); `None` until
    // the first call to `fit` or `add_sample`.
    weights: Option<Array2<f64>>,
    // Random phase offsets in [0, 2*pi), one per component; `None` until the
    // first call to `fit` or `add_sample`.
    bias: Option<Array1<f64>>,
    // Samples currently retained according to `config.buffer_strategy`.
    buffer: VecDeque<StreamingSample>,
    // Number of samples accepted by `add_sample` so far.
    sample_count: usize,
    // Value of `sample_count` at the most recent model update.
    last_update: usize,
    // Running statistics of the generated features.
    feature_statistics: FeatureStatistics,
    // Seed supplied through `with_random_state`, if any.
    random_state: Option<u64>,
    // Generator used for weights, bias and reservoir sampling.
    rng: StdRng,
}
157
/// Statistics for monitoring feature quality.
///
/// All per-feature arrays have one entry per feature column and are
/// maintained by `FeatureStatistics::update`.
#[derive(Debug, Clone)]
pub struct FeatureStatistics {
    /// Running per-feature mean.
    pub mean: Array1<f64>,
    /// Running per-feature variance estimate.
    pub variance: Array1<f64>,
    /// Smallest value seen per feature (`+inf` before the first update).
    pub min: Array1<f64>,
    /// Largest value seen per feature (`-inf` before the first update).
    pub max: Array1<f64>,
    /// Total number of feature rows folded into the statistics.
    pub update_count: usize,
    /// Approximation error estimate.
    /// NOTE(review): initialised to 0.0 and never written in this file.
    pub approximation_error: f64,
    /// Mean absolute shift of the per-feature means caused by the batch most
    /// recently passed to `detect_drift`.
    pub drift_score: f64,
}
176
177impl FeatureStatistics {
178    pub fn new(n_components: usize) -> Self {
179        Self {
180            mean: Array1::zeros(n_components),
181            variance: Array1::zeros(n_components),
182            min: Array1::from_elem(n_components, f64::INFINITY),
183            max: Array1::from_elem(n_components, f64::NEG_INFINITY),
184            update_count: 0,
185            approximation_error: 0.0,
186            drift_score: 0.0,
187        }
188    }
189
190    pub fn update(&mut self, features: &Array2<f64>) {
191        let n_samples = features.nrows();
192
193        for i in 0..features.ncols() {
194            let col = features.column(i);
195            let new_mean = col.mean().unwrap_or(0.0);
196            let new_var = col.mapv(|x| (x - new_mean).powi(2)).mean().unwrap_or(0.0);
197            let new_min = col.iter().fold(f64::INFINITY, |a, &b| a.min(b));
198            let new_max = col.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
199
200            // Online update of statistics
201            let old_count = self.update_count;
202            let new_count = old_count + n_samples;
203
204            if old_count == 0 {
205                self.mean[i] = new_mean;
206                self.variance[i] = new_var;
207            } else {
208                let alpha = n_samples as f64 / new_count as f64;
209                self.mean[i] = (1.0 - alpha) * self.mean[i] + alpha * new_mean;
210                self.variance[i] = (1.0 - alpha) * self.variance[i] + alpha * new_var;
211            }
212
213            self.min[i] = self.min[i].min(new_min);
214            self.max[i] = self.max[i].max(new_max);
215        }
216
217        self.update_count += n_samples;
218    }
219
220    pub fn detect_drift(&mut self, new_features: &Array2<f64>) -> bool {
221        let old_mean = self.mean.clone();
222        self.update(new_features);
223
224        // Simple drift detection based on mean shift
225        let mean_shift = (&self.mean - &old_mean).mapv(f64::abs).sum();
226        self.drift_score = mean_shift / self.mean.len() as f64;
227
228        self.drift_score > 0.1 // Simple threshold
229    }
230}
231
232impl StreamingRBFSampler {
233    /// Create a new streaming RBF sampler
234    pub fn new(n_components: usize, gamma: f64) -> Self {
235        let rng = StdRng::from_seed(thread_rng().gen());
236        Self {
237            n_components,
238            gamma,
239            config: StreamingConfig::default(),
240            weights: None,
241            bias: None,
242            buffer: VecDeque::new(),
243            sample_count: 0,
244            last_update: 0,
245            feature_statistics: FeatureStatistics::new(n_components),
246            random_state: None,
247            rng,
248        }
249    }
250
251    /// Set the streaming configuration
252    pub fn with_config(mut self, config: StreamingConfig) -> Self {
253        self.config = config;
254        self
255    }
256
257    /// Set random state for reproducibility
258    pub fn with_random_state(mut self, random_state: u64) -> Self {
259        self.random_state = Some(random_state);
260        self.rng = StdRng::seed_from_u64(random_state);
261        self
262    }
263
264    /// Initialize the streaming sampler with initial data
265    pub fn fit(&mut self, x: &Array2<f64>) -> Result<()> {
266        let (_, n_features) = x.dim();
267
268        // Initialize random weights and bias
269        self.weights = Some(self.generate_weights(n_features)?);
270        self.bias = Some(self.generate_bias()?);
271
272        // Process initial batch
273        for (i, row) in x.rows().into_iter().enumerate() {
274            let sample = StreamingSample::new(row.to_owned(), i as f64);
275            self.add_sample(sample)?;
276        }
277
278        Ok(())
279    }
280
281    /// Add a new sample to the stream
282    pub fn add_sample(&mut self, sample: StreamingSample) -> Result<()> {
283        // Check if initialization is needed
284        if self.weights.is_none() {
285            let n_features = sample.data.len();
286            self.weights = Some(self.generate_weights(n_features)?);
287            self.bias = Some(self.generate_bias()?);
288        }
289
290        // Add to buffer based on strategy
291        self.manage_buffer(sample)?;
292
293        self.sample_count += 1;
294
295        // Check if update is needed
296        if self.should_update()? {
297            self.update_model()?;
298            self.last_update = self.sample_count;
299        }
300
301        Ok(())
302    }
303
304    /// Transform data using current model
305    pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
306        let weights = self
307            .weights
308            .as_ref()
309            .ok_or_else(|| SklearsError::NotFitted {
310                operation: "transform".to_string(),
311            })?;
312        let bias = self.bias.as_ref().ok_or_else(|| SklearsError::NotFitted {
313            operation: "transform".to_string(),
314        })?;
315
316        self.compute_features(x, weights, bias)
317    }
318
319    /// Transform a single sample
320    pub fn transform_sample(&self, sample: &Array1<f64>) -> Result<Array1<f64>> {
321        let weights = self
322            .weights
323            .as_ref()
324            .ok_or_else(|| SklearsError::NotFitted {
325                operation: "transform_sample".to_string(),
326            })?;
327        let bias = self.bias.as_ref().ok_or_else(|| SklearsError::NotFitted {
328            operation: "transform_sample".to_string(),
329        })?;
330
331        // Compute features for single sample
332        let projection = sample.dot(&weights.t()) + bias;
333        let norm_factor = (2.0 / self.n_components as f64).sqrt();
334
335        Ok(projection.mapv(|x| norm_factor * x.cos()))
336    }
337
338    /// Get current buffer statistics
339    pub fn buffer_stats(&self) -> (usize, f64, f64) {
340        let size = self.buffer.len();
341        let avg_weight = if size > 0 {
342            self.buffer.iter().map(|s| s.weight).sum::<f64>() / size as f64
343        } else {
344            0.0
345        };
346        let avg_importance = if size > 0 {
347            self.buffer.iter().map(|s| s.importance).sum::<f64>() / size as f64
348        } else {
349            0.0
350        };
351
352        (size, avg_weight, avg_importance)
353    }
354
    /// Returns a shared reference to the running feature statistics that are
    /// updated by model updates and by `detect_drift`.
    pub fn feature_stats(&self) -> &FeatureStatistics {
        &self.feature_statistics
    }
359
360    /// Check for concept drift
361    pub fn detect_drift(&mut self, x: &Array2<f64>) -> Result<bool> {
362        if !self.config.drift_detection {
363            return Ok(false);
364        }
365
366        let features = self.transform(x)?;
367        Ok(self.feature_statistics.detect_drift(&features))
368    }
369
370    /// Manage buffer based on strategy
371    fn manage_buffer(&mut self, sample: StreamingSample) -> Result<()> {
372        match &self.config.buffer_strategy {
373            BufferStrategy::FixedSize(max_size) => {
374                if self.buffer.len() >= *max_size {
375                    self.buffer.pop_front();
376                }
377                self.buffer.push_back(sample);
378            }
379            BufferStrategy::SlidingWindow { size, time_window } => {
380                // Remove old samples based on time window
381                let current_time = sample.timestamp;
382                while let Some(front) = self.buffer.front() {
383                    if current_time - front.timestamp > *time_window {
384                        self.buffer.pop_front();
385                    } else {
386                        break;
387                    }
388                }
389
390                // Add new sample and maintain size limit
391                if self.buffer.len() >= *size {
392                    self.buffer.pop_front();
393                }
394                self.buffer.push_back(sample);
395            }
396            BufferStrategy::ReservoirSampling(capacity) => {
397                if self.buffer.len() < *capacity {
398                    self.buffer.push_back(sample);
399                } else {
400                    let replace_idx = self.rng.gen_range(0..=self.sample_count);
401                    if replace_idx < *capacity {
402                        self.buffer[replace_idx] = sample;
403                    }
404                }
405            }
406            BufferStrategy::ExponentialDecay { alpha, min_weight } => {
407                // Decay weights of existing samples
408                for existing_sample in &mut self.buffer {
409                    existing_sample.weight *= alpha;
410                }
411
412                // Remove samples below minimum weight
413                self.buffer.retain(|s| s.weight >= *min_weight);
414
415                self.buffer.push_back(sample);
416            }
417            BufferStrategy::ImportanceWeighted {
418                capacity,
419                threshold,
420            } => {
421                if self.buffer.len() < *capacity {
422                    self.buffer.push_back(sample);
423                } else {
424                    // Find sample with lowest importance
425                    if let Some((min_idx, _)) =
426                        self.buffer.iter().enumerate().min_by(|(_, a), (_, b)| {
427                            a.importance.partial_cmp(&b.importance).unwrap()
428                        })
429                    {
430                        if sample.importance > self.buffer[min_idx].importance + threshold {
431                            self.buffer[min_idx] = sample;
432                        }
433                    }
434                }
435            }
436        }
437
438        Ok(())
439    }
440
441    /// Check if model should be updated
442    fn should_update(&self) -> Result<bool> {
443        match &self.config.update_frequency {
444            UpdateFrequency::PerSample => Ok(true),
445            UpdateFrequency::BatchSize(batch_size) => {
446                Ok(self.sample_count - self.last_update >= *batch_size)
447            }
448            UpdateFrequency::TimeInterval(_time_interval) => {
449                // For simplicity, use sample count as proxy for time
450                Ok(self.sample_count - self.last_update >= 100)
451            }
452            UpdateFrequency::ErrorThreshold(_threshold) => {
453                // For simplicity, update periodically
454                Ok(self.sample_count - self.last_update >= 50)
455            }
456            UpdateFrequency::Adaptive {
457                initial,
458                max: _,
459                min: _,
460            } => Ok(self.sample_count - self.last_update >= *initial),
461        }
462    }
463
464    /// Update model parameters based on current buffer
465    fn update_model(&mut self) -> Result<()> {
466        if self.buffer.is_empty() {
467            return Ok(());
468        }
469
470        // Extract data from buffer with weights
471        let mut data_matrix = Array2::zeros((self.buffer.len(), self.buffer[0].data.len()));
472        for (i, sample) in self.buffer.iter().enumerate() {
473            data_matrix.row_mut(i).assign(&sample.data);
474        }
475
476        // Compute features and update statistics
477        let weights = self.weights.as_ref().unwrap();
478        let bias = self.bias.as_ref().unwrap();
479        let features = self.compute_features(&data_matrix, weights, bias)?;
480
481        self.feature_statistics.update(&features);
482
483        // Apply forgetting mechanism
484        self.apply_forgetting()?;
485
486        Ok(())
487    }
488
489    /// Apply forgetting mechanism to reduce influence of old data
490    fn apply_forgetting(&mut self) -> Result<()> {
491        match &self.config.forgetting_mechanism {
492            ForgettingMechanism::None => {
493                // No forgetting
494            }
495            ForgettingMechanism::LinearDecay(decay_rate) => {
496                for sample in &mut self.buffer {
497                    sample.weight *= 1.0 - decay_rate;
498                }
499            }
500            ForgettingMechanism::ExponentialDecay(decay_rate) => {
501                for sample in &mut self.buffer {
502                    sample.weight *= decay_rate;
503                }
504            }
505            ForgettingMechanism::AbruptForgetting(time_threshold) => {
506                if let Some(newest) = self.buffer.back() {
507                    let cutoff_time = newest.timestamp - time_threshold;
508                    self.buffer.retain(|s| s.timestamp >= cutoff_time);
509                }
510            }
511            ForgettingMechanism::SigmoidDecay {
512                steepness,
513                midpoint,
514            } => {
515                if let Some(newest_timestamp) = self.buffer.back().map(|s| s.timestamp) {
516                    for sample in &mut self.buffer {
517                        let age = newest_timestamp - sample.timestamp;
518                        let sigmoid_weight = 1.0 / (1.0 + (steepness * (age - midpoint)).exp());
519                        sample.weight *= sigmoid_weight;
520                    }
521                }
522            }
523        }
524
525        Ok(())
526    }
527
528    /// Generate random weights for RBF features
529    fn generate_weights(&mut self, n_features: usize) -> Result<Array2<f64>> {
530        let mut weights = Array2::zeros((self.n_components, n_features));
531
532        for i in 0..self.n_components {
533            for j in 0..n_features {
534                weights[[i, j]] =
535                    self.rng.sample::<f64, _>(StandardNormal) * (2.0 * self.gamma).sqrt();
536            }
537        }
538
539        Ok(weights)
540    }
541
542    /// Generate random bias for RBF features
543    fn generate_bias(&mut self) -> Result<Array1<f64>> {
544        let mut bias = Array1::zeros(self.n_components);
545
546        for i in 0..self.n_components {
547            bias[i] = self.rng.gen_range(0.0..2.0 * std::f64::consts::PI);
548        }
549
550        Ok(bias)
551    }
552
553    /// Compute RBF features for given data
554    fn compute_features(
555        &self,
556        x: &Array2<f64>,
557        weights: &Array2<f64>,
558        bias: &Array1<f64>,
559    ) -> Result<Array2<f64>> {
560        let (n_samples, _) = x.dim();
561        let n_components = weights.nrows();
562
563        // Compute X @ W^T + b
564        let projection = x.dot(&weights.t()) + bias;
565
566        // Apply cosine transformation with normalization
567        let mut features = Array2::zeros((n_samples, n_components));
568        let norm_factor = (2.0 / n_components as f64).sqrt();
569
570        for i in 0..n_samples {
571            for j in 0..n_components {
572                features[[i, j]] = norm_factor * projection[[i, j]].cos();
573            }
574        }
575
576        Ok(features)
577    }
578}
579
/// Streaming Nyström method for kernel approximation
///
/// Maintains an online Nyström approximation that adapts to
/// streaming data with efficient inducing point management.
pub struct StreamingNystroem {
    // Upper bound on the number of inducing points / output features.
    n_components: usize,
    // RBF kernel coefficient used in exp(-gamma * ||a - b||^2).
    gamma: f64,
    // Buffer and update settings.
    config: StreamingConfig,
    // Rows selected as inducing points; `None` until the model is fitted or
    // first updated.
    inducing_points: Option<Array2<f64>>,
    // Eigenvalues of the inducing-point kernel matrix.
    eigenvalues: Option<Array1<f64>>,
    // Matching eigenvectors, one per column.
    eigenvectors: Option<Array2<f64>>,
    // Samples retained for re-selecting inducing points.
    buffer: VecDeque<StreamingSample>,
    // Number of samples seen (reset by `fit`, incremented by `add_sample`).
    sample_count: usize,
    // Value of `sample_count` at the most recent model update.
    last_update: usize,
    // Seed supplied through `with_random_state`, if any.
    random_state: Option<u64>,
    // Generator used for inducing-point selection.
    rng: StdRng,
}
597
598impl StreamingNystroem {
599    /// Create a new streaming Nyström approximation
600    pub fn new(n_components: usize, gamma: f64) -> Self {
601        let rng = StdRng::from_seed(thread_rng().gen());
602        Self {
603            n_components,
604            gamma,
605            config: StreamingConfig::default(),
606            inducing_points: None,
607            eigenvalues: None,
608            eigenvectors: None,
609            buffer: VecDeque::new(),
610            sample_count: 0,
611            last_update: 0,
612            random_state: None,
613            rng,
614        }
615    }
616
617    /// Set the streaming configuration
618    pub fn with_config(mut self, config: StreamingConfig) -> Self {
619        self.config = config;
620        self
621    }
622
623    /// Set random state for reproducibility
624    pub fn with_random_state(mut self, random_state: u64) -> Self {
625        self.random_state = Some(random_state);
626        self.rng = StdRng::seed_from_u64(random_state);
627        self
628    }
629
630    /// Initialize with initial data
631    pub fn fit(&mut self, x: &Array2<f64>) -> Result<()> {
632        // Select initial inducing points
633        let inducing_indices = self.select_inducing_points(x)?;
634        let inducing_points = x.select(Axis(0), &inducing_indices);
635
636        // Compute initial eigendecomposition
637        let kernel_matrix = self.compute_kernel_matrix(&inducing_points)?;
638        let (eigenvalues, eigenvectors) = self.eigendecomposition(&kernel_matrix)?;
639
640        self.inducing_points = Some(inducing_points);
641        self.eigenvalues = Some(eigenvalues);
642        self.eigenvectors = Some(eigenvectors);
643
644        // Add samples to buffer
645        for (i, row) in x.rows().into_iter().enumerate() {
646            let sample = StreamingSample::new(row.to_owned(), i as f64);
647            self.buffer.push_back(sample);
648        }
649
650        self.sample_count = x.nrows();
651
652        Ok(())
653    }
654
655    /// Add a new sample to the stream
656    pub fn add_sample(&mut self, sample: StreamingSample) -> Result<()> {
657        self.buffer.push_back(sample);
658        self.sample_count += 1;
659
660        // Manage buffer size
661        match &self.config.buffer_strategy {
662            BufferStrategy::FixedSize(max_size) => {
663                if self.buffer.len() > *max_size {
664                    self.buffer.pop_front();
665                }
666            }
667            _ => {
668                // Implement other buffer strategies as needed
669            }
670        }
671
672        // Check if update is needed
673        if self.should_update()? {
674            self.update_model()?;
675            self.last_update = self.sample_count;
676        }
677
678        Ok(())
679    }
680
681    /// Transform data using current approximation
682    pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
683        let inducing_points =
684            self.inducing_points
685                .as_ref()
686                .ok_or_else(|| SklearsError::NotFitted {
687                    operation: "transform".to_string(),
688                })?;
689        let eigenvalues = self
690            .eigenvalues
691            .as_ref()
692            .ok_or_else(|| SklearsError::NotFitted {
693                operation: "transform".to_string(),
694            })?;
695        let eigenvectors = self
696            .eigenvectors
697            .as_ref()
698            .ok_or_else(|| SklearsError::NotFitted {
699                operation: "transform".to_string(),
700            })?;
701
702        // Compute kernel between x and inducing points
703        let kernel_x_inducing = self.compute_kernel(x, inducing_points)?;
704
705        // Apply Nyström transformation
706        let mut features = kernel_x_inducing.dot(eigenvectors);
707
708        // Scale by eigenvalues
709        for i in 0..eigenvalues.len() {
710            if eigenvalues[i] > 1e-12 {
711                let scale = 1.0 / eigenvalues[i].sqrt();
712                for j in 0..features.nrows() {
713                    features[[j, i]] *= scale;
714                }
715            }
716        }
717
718        Ok(features)
719    }
720
721    /// Check if model should be updated
722    fn should_update(&self) -> Result<bool> {
723        // Simple heuristic: update every 100 samples
724        Ok(self.sample_count - self.last_update >= 100)
725    }
726
727    /// Update inducing points and eigendecomposition
728    fn update_model(&mut self) -> Result<()> {
729        if self.buffer.is_empty() {
730            return Ok(());
731        }
732
733        // Extract current data from buffer
734        let n_samples = self.buffer.len();
735        let n_features = self.buffer[0].data.len();
736        let mut data_matrix = Array2::zeros((n_samples, n_features));
737
738        for (i, sample) in self.buffer.iter().enumerate() {
739            data_matrix.row_mut(i).assign(&sample.data);
740        }
741
742        // Reselect inducing points
743        let inducing_indices = self.select_inducing_points(&data_matrix)?;
744        let inducing_points = data_matrix.select(Axis(0), &inducing_indices);
745
746        // Recompute eigendecomposition
747        let kernel_matrix = self.compute_kernel_matrix(&inducing_points)?;
748        let (eigenvalues, eigenvectors) = self.eigendecomposition(&kernel_matrix)?;
749
750        self.inducing_points = Some(inducing_points);
751        self.eigenvalues = Some(eigenvalues);
752        self.eigenvectors = Some(eigenvectors);
753
754        Ok(())
755    }
756
757    /// Select inducing points from current data
758    fn select_inducing_points(&mut self, x: &Array2<f64>) -> Result<Vec<usize>> {
759        let n_samples = x.nrows();
760        let n_inducing = self.n_components.min(n_samples);
761
762        let mut indices = Vec::new();
763        for _ in 0..n_inducing {
764            indices.push(self.rng.gen_range(0..n_samples));
765        }
766
767        Ok(indices)
768    }
769
770    /// Compute kernel matrix
771    fn compute_kernel_matrix(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
772        let n_samples = x.nrows();
773        let mut kernel_matrix = Array2::zeros((n_samples, n_samples));
774
775        for i in 0..n_samples {
776            for j in i..n_samples {
777                let diff = &x.row(i) - &x.row(j);
778                let squared_dist = diff.mapv(|x| x * x).sum();
779                let kernel_val = (-self.gamma * squared_dist).exp();
780                kernel_matrix[[i, j]] = kernel_val;
781                kernel_matrix[[j, i]] = kernel_val;
782            }
783        }
784
785        Ok(kernel_matrix)
786    }
787
788    /// Compute kernel between two matrices
789    fn compute_kernel(&self, x: &Array2<f64>, y: &Array2<f64>) -> Result<Array2<f64>> {
790        let (n_samples_x, _) = x.dim();
791        let (n_samples_y, _) = y.dim();
792        let mut kernel_matrix = Array2::zeros((n_samples_x, n_samples_y));
793
794        for i in 0..n_samples_x {
795            for j in 0..n_samples_y {
796                let diff = &x.row(i) - &y.row(j);
797                let squared_dist = diff.mapv(|x| x * x).sum();
798                let kernel_val = (-self.gamma * squared_dist).exp();
799                kernel_matrix[[i, j]] = kernel_val;
800            }
801        }
802
803        Ok(kernel_matrix)
804    }
805
806    /// Perform eigendecomposition (simplified)
807    fn eigendecomposition(&self, matrix: &Array2<f64>) -> Result<(Array1<f64>, Array2<f64>)> {
808        let n = matrix.nrows();
809        let eigenvalues = Array1::ones(self.n_components.min(n));
810        let eigenvectors = Array2::eye(n)
811            .slice(s![.., ..self.n_components.min(n)])
812            .to_owned();
813
814        Ok((eigenvalues, eigenvectors))
815    }
816}
817
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::array;

    /// Fitting and transforming yields one row per sample and one column per
    /// component.
    #[test]
    fn test_streaming_rbf_sampler_basic() {
        let x = array![[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]];

        let mut sampler = StreamingRBFSampler::new(50, 0.1).with_random_state(42);

        sampler.fit(&x).unwrap();
        let features = sampler.transform(&x).unwrap();

        assert_eq!(features.nrows(), 4);
        assert_eq!(features.ncols(), 50);
    }

    /// The builder methods set exactly the field they are named after.
    #[test]
    fn test_streaming_sample() {
        let data = array![1.0, 2.0, 3.0];
        let sample = StreamingSample::new(data.clone(), 1.0)
            .with_weight(0.8)
            .with_importance(0.9)
            .with_label(1.0);

        assert_eq!(sample.data, data);
        assert_eq!(sample.timestamp, 1.0);
        assert_eq!(sample.weight, 0.8);
        assert_eq!(sample.importance, 0.9);
        assert_eq!(sample.label, Some(1.0));
    }

    /// A fixed-size buffer never holds more samples than configured.
    #[test]
    fn test_buffer_strategies() {
        let mut sampler = StreamingRBFSampler::new(10, 0.1).with_config(StreamingConfig {
            buffer_strategy: BufferStrategy::FixedSize(3),
            ..Default::default()
        });

        // Add samples beyond buffer capacity
        for i in 0..5 {
            let data = array![i as f64, (i + 1) as f64];
            let sample = StreamingSample::new(data, i as f64);
            sampler.add_sample(sample).unwrap();
        }

        let (size, _, _) = sampler.buffer_stats();
        assert_eq!(size, 3); // Buffer should be limited to 3
    }

    /// Fitting and transforming yields one row per sample and one column per
    /// inducing point.
    #[test]
    fn test_streaming_nystroem_basic() {
        let x = array![[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]];

        let mut nystroem = StreamingNystroem::new(3, 0.1).with_random_state(42);

        nystroem.fit(&x).unwrap();
        let features = nystroem.transform(&x).unwrap();

        assert_eq!(features.nrows(), 4);
        assert_eq!(features.ncols(), 3);
    }

    /// The first batch defines the per-feature means.
    #[test]
    fn test_feature_statistics() {
        let mut stats = FeatureStatistics::new(3);
        let features = array![[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]];

        stats.update(&features);

        assert_eq!(stats.update_count, 2);
        assert!((stats.mean[0] - 2.5).abs() < 1e-10);
        assert!((stats.mean[1] - 3.5).abs() < 1e-10);
        assert!((stats.mean[2] - 4.5).abs() < 1e-10);
    }

    /// A single sample is mapped to one value per component.
    #[test]
    fn test_transform_sample() {
        let x = array![[1.0, 2.0], [3.0, 4.0]];
        let mut sampler = StreamingRBFSampler::new(10, 0.1).with_random_state(42);

        sampler.fit(&x).unwrap();

        let sample = array![5.0, 6.0];
        let features = sampler.transform_sample(&sample).unwrap();

        assert_eq!(features.len(), 10);
    }

    /// Struct-update syntax overrides only the named settings.
    #[test]
    fn test_streaming_config() {
        let config = StreamingConfig {
            buffer_strategy: BufferStrategy::SlidingWindow {
                size: 100,
                time_window: 10.0,
            },
            update_frequency: UpdateFrequency::BatchSize(50),
            forgetting_mechanism: ForgettingMechanism::LinearDecay(0.01),
            adaptive_components: true,
            ..Default::default()
        };

        assert!(matches!(
            config.buffer_strategy,
            BufferStrategy::SlidingWindow { .. }
        ));
        assert!(matches!(
            config.update_frequency,
            UpdateFrequency::BatchSize(50)
        ));
        assert!(config.adaptive_components);
    }

    /// Streaming samples after an initial fit keeps the buffer populated.
    #[test]
    fn test_online_updates() {
        let mut sampler = StreamingRBFSampler::new(20, 0.1)
            .with_config(StreamingConfig {
                update_frequency: UpdateFrequency::BatchSize(2),
                ..Default::default()
            })
            .with_random_state(42);

        // Initialize with small batch
        let x_init = array![[1.0, 2.0], [3.0, 4.0]];
        sampler.fit(&x_init).unwrap();

        // Add samples one by one
        for i in 5..10 {
            let data = array![i as f64, (i + 1) as f64];
            let sample = StreamingSample::new(data, i as f64);
            sampler.add_sample(sample).unwrap();
        }

        let (buffer_size, _, _) = sampler.buffer_stats();
        assert!(buffer_size > 0);
    }

    /// The drift verdict is consistent with the reported drift score.
    #[test]
    fn test_drift_detection() {
        let x1 = array![[1.0, 2.0], [1.1, 2.1], [0.9, 1.9]];
        let x2 = array![[5.0, 6.0], [5.1, 6.1], [4.9, 5.9]]; // Different distribution

        let mut sampler = StreamingRBFSampler::new(20, 0.1)
            .with_config(StreamingConfig {
                drift_detection: true,
                ..Default::default()
            })
            .with_random_state(42);

        sampler.fit(&x1).unwrap();
        let drift_detected = sampler.detect_drift(&x2).unwrap();

        let drift_score = sampler.feature_stats().drift_score;
        assert!(drift_score.is_finite());
        assert!(drift_score >= 0.0);
        // With the default configuration the decision threshold is 0.1.
        assert_eq!(drift_detected, drift_score > 0.1);
    }
}