sklears_kernel_approximation/
streaming_kernel.rs

1use scirs2_core::ndarray::{s, Array1, Array2, Axis};
2use scirs2_core::random::rngs::StdRng;
3use scirs2_core::random::{thread_rng, Rng, SeedableRng};
4use scirs2_core::StandardNormal;
5use sklears_core::error::{Result, SklearsError};
6use std::collections::VecDeque;
7
/// Buffer management strategy for streaming data.
///
/// Part of the streaming kernel approximation methods for online
/// processing: this module provides online learning capabilities for
/// kernel approximations, enabling processing of data streams where
/// samples arrive continuously and memory is limited.
///
/// The per-variant behaviour described below is the one implemented by
/// `StreamingRBFSampler::manage_buffer`.
#[derive(Debug, Clone)]
pub enum BufferStrategy {
    /// Fixed-size buffer with FIFO replacement; the payload is the size
    /// at which the oldest sample is evicted before a new one is appended.
    FixedSize(usize),
    /// Sliding window with time-based expiration: samples whose timestamp
    /// is more than `time_window` behind the incoming sample are dropped,
    /// and the oldest sample is evicted once `size` is reached.
    SlidingWindow { size: usize, time_window: f64 },
    /// Reservoir sampling for representative subset; the payload is the
    /// reservoir capacity.
    ReservoirSampling(usize),
    /// Exponential decay weighting: every buffered weight is multiplied by
    /// `alpha` on each insertion and samples below `min_weight` are removed.
    ExponentialDecay { alpha: f64, min_weight: f64 },
    /// Importance-weighted sampling: once `capacity` is reached, a new
    /// sample replaces the least important one only if its importance
    /// exceeds that sample's importance by more than `threshold`.
    ImportanceWeighted { capacity: usize, threshold: f64 },
}
28
/// Update frequency for model parameters
///
/// Selects how many newly added samples must accumulate before the model
/// is refreshed. `StreamingRBFSampler::should_update` currently counts
/// samples for every variant; see the notes on the individual variants.
#[derive(Debug, Clone)]
pub enum UpdateFrequency {
    /// Update after every sample
    PerSample,
    /// Update after every N samples
    BatchSize(usize),
    /// Update based on time intervals
    ///
    /// NOTE: `StreamingRBFSampler` currently ignores the interval value and
    /// updates every 100 samples as a proxy for time.
    TimeInterval(f64),
    /// Update when error exceeds threshold
    ///
    /// NOTE: `StreamingRBFSampler` currently ignores the threshold value
    /// and updates every 50 samples.
    ErrorThreshold(f64),
    /// Adaptive update frequency
    ///
    /// NOTE: `StreamingRBFSampler` currently uses only `initial` as a fixed
    /// batch size; `max` and `min` are not consulted.
    Adaptive {
        initial: usize,
        max: usize,
        min: usize,
    },
}
47
/// Forgetting mechanism for old data
///
/// Applied to the buffered samples by `StreamingRBFSampler::apply_forgetting`
/// after each model update.
#[derive(Debug, Clone)]
pub enum ForgettingMechanism {
    /// No forgetting - keep all data
    None,
    /// Linear decay of old samples: each update multiplies every buffered
    /// weight by `1.0 - rate`.
    LinearDecay(f64),
    /// Exponential decay of old samples: each update multiplies every
    /// buffered weight by the given rate.
    ExponentialDecay(f64),
    /// Abrupt forgetting after time window: samples whose timestamp is more
    /// than the given amount behind the newest buffered sample are removed.
    AbruptForgetting(f64),
    /// Gradual forgetting with sigmoid function: each weight is multiplied
    /// by `1 / (1 + exp(steepness * (age - midpoint)))`, where `age` is the
    /// timestamp distance to the newest buffered sample.
    SigmoidDecay { steepness: f64, midpoint: f64 },
}
62
/// Configuration for streaming kernel approximation
///
/// See the `Default` implementation for the baseline values.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// How incoming samples are retained in, and evicted from, the buffer.
    pub buffer_strategy: BufferStrategy,
    /// How often the model is refreshed as samples arrive.
    pub update_frequency: UpdateFrequency,
    /// How the weights of buffered samples are reduced over time.
    pub forgetting_mechanism: ForgettingMechanism,
    /// Optional memory budget in megabytes.
    /// NOTE(review): not consulted by the code visible in this file.
    pub max_memory_mb: Option<usize>,
    /// Whether the number of components may adapt.
    /// NOTE(review): not consulted by the code visible in this file.
    pub adaptive_components: bool,
    /// Whether feature quality should be monitored.
    /// NOTE(review): not consulted by the code visible in this file.
    pub quality_monitoring: bool,
    /// Enables `StreamingRBFSampler::detect_drift`; when `false` that
    /// method returns `Ok(false)` without inspecting the data.
    pub drift_detection: bool,
    /// Intended drift-score threshold for reporting concept drift.
    pub concept_drift_threshold: f64,
}
83
84impl Default for StreamingConfig {
85    fn default() -> Self {
86        Self {
87            buffer_strategy: BufferStrategy::FixedSize(1000),
88            update_frequency: UpdateFrequency::BatchSize(100),
89            forgetting_mechanism: ForgettingMechanism::ExponentialDecay(0.99),
90            max_memory_mb: Some(100),
91            adaptive_components: true,
92            quality_monitoring: true,
93            drift_detection: false,
94            concept_drift_threshold: 0.1,
95        }
96    }
97}
98
/// Sample with metadata for streaming processing
#[derive(Debug, Clone)]
pub struct StreamingSample {
    /// Feature vector of the sample.
    pub data: Array1<f64>,
    /// Arrival time of the sample; used by the time-based buffer and
    /// forgetting strategies. The unit is whatever the caller supplies.
    pub timestamp: f64,
    /// Current weight of the sample (starts at 1.0 and is reduced by the
    /// decay/forgetting mechanisms).
    pub weight: f64,
    /// Importance score used by `BufferStrategy::ImportanceWeighted`
    /// (starts at 1.0).
    pub importance: f64,
    /// Optional target value attached to the sample.
    pub label: Option<f64>,
}
113
114impl StreamingSample {
115    pub fn new(data: Array1<f64>, timestamp: f64) -> Self {
116        Self {
117            data,
118            timestamp,
119            weight: 1.0,
120            importance: 1.0,
121            label: None,
122        }
123    }
124
125    pub fn with_weight(mut self, weight: f64) -> Self {
126        self.weight = weight;
127        self
128    }
129
130    pub fn with_importance(mut self, importance: f64) -> Self {
131        self.importance = importance;
132        self
133    }
134
135    pub fn with_label(mut self, label: f64) -> Self {
136        self.label = Some(label);
137        self
138    }
139}
140
/// Streaming RBF kernel approximation using Random Fourier Features
///
/// Maintains an online approximation of RBF kernel features that
/// adapts to data streams with concept drift and memory constraints.
pub struct StreamingRBFSampler {
    /// Number of random Fourier features (rows of `weights`, length of `bias`).
    n_components: usize,
    /// RBF kernel parameter; the random weights are scaled by `sqrt(2 * gamma)`.
    gamma: f64,
    /// Buffering, update and forgetting configuration.
    config: StreamingConfig,
    /// Random projection of shape `(n_components, n_features)`; `None`
    /// until `fit` or the first `add_sample`.
    weights: Option<Array2<f64>>,
    /// Random phase offsets drawn from `[0, 2π)`; `None` until initialised.
    bias: Option<Array1<f64>>,
    /// Samples currently retained according to `config.buffer_strategy`.
    buffer: VecDeque<StreamingSample>,
    /// Number of samples passed to `add_sample` so far.
    sample_count: usize,
    /// Value of `sample_count` at the most recent model update.
    last_update: usize,
    /// Running statistics of the transformed features.
    feature_statistics: FeatureStatistics,
    /// Seed supplied through `with_random_state`, if any.
    random_state: Option<u64>,
    /// Generator used for weights, bias and reservoir sampling.
    rng: StdRng,
}
158
/// Statistics for monitoring feature quality
///
/// All array fields hold one entry per feature column.
#[derive(Debug, Clone)]
pub struct FeatureStatistics {
    /// Running mean of each feature.
    pub mean: Array1<f64>,
    /// Running variance estimate of each feature.
    pub variance: Array1<f64>,
    /// Smallest value seen for each feature (starts at `+inf`).
    pub min: Array1<f64>,
    /// Largest value seen for each feature (starts at `-inf`).
    pub max: Array1<f64>,
    /// Total number of feature rows folded into the statistics.
    pub update_count: usize,
    /// Approximation error estimate; initialised to 0.0.
    /// NOTE(review): never written by the code visible in this file.
    pub approximation_error: f64,
    /// Mean absolute shift of the per-feature means measured by the most
    /// recent `detect_drift` call.
    pub drift_score: f64,
}
177
178impl FeatureStatistics {
179    pub fn new(n_components: usize) -> Self {
180        Self {
181            mean: Array1::zeros(n_components),
182            variance: Array1::zeros(n_components),
183            min: Array1::from_elem(n_components, f64::INFINITY),
184            max: Array1::from_elem(n_components, f64::NEG_INFINITY),
185            update_count: 0,
186            approximation_error: 0.0,
187            drift_score: 0.0,
188        }
189    }
190
191    pub fn update(&mut self, features: &Array2<f64>) {
192        let n_samples = features.nrows();
193
194        for i in 0..features.ncols() {
195            let col = features.column(i);
196            let new_mean = col.mean().unwrap_or(0.0);
197            let new_var = col.mapv(|x| (x - new_mean).powi(2)).mean().unwrap_or(0.0);
198            let new_min = col.iter().fold(f64::INFINITY, |a, &b| a.min(b));
199            let new_max = col.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
200
201            // Online update of statistics
202            let old_count = self.update_count;
203            let new_count = old_count + n_samples;
204
205            if old_count == 0 {
206                self.mean[i] = new_mean;
207                self.variance[i] = new_var;
208            } else {
209                let alpha = n_samples as f64 / new_count as f64;
210                self.mean[i] = (1.0 - alpha) * self.mean[i] + alpha * new_mean;
211                self.variance[i] = (1.0 - alpha) * self.variance[i] + alpha * new_var;
212            }
213
214            self.min[i] = self.min[i].min(new_min);
215            self.max[i] = self.max[i].max(new_max);
216        }
217
218        self.update_count += n_samples;
219    }
220
221    pub fn detect_drift(&mut self, new_features: &Array2<f64>) -> bool {
222        let old_mean = self.mean.clone();
223        self.update(new_features);
224
225        // Simple drift detection based on mean shift
226        let mean_shift = (&self.mean - &old_mean).mapv(f64::abs).sum();
227        self.drift_score = mean_shift / self.mean.len() as f64;
228
229        self.drift_score > 0.1 // Simple threshold
230    }
231}
232
233impl StreamingRBFSampler {
234    /// Create a new streaming RBF sampler
235    pub fn new(n_components: usize, gamma: f64) -> Self {
236        let rng = StdRng::from_seed(thread_rng().gen());
237        Self {
238            n_components,
239            gamma,
240            config: StreamingConfig::default(),
241            weights: None,
242            bias: None,
243            buffer: VecDeque::new(),
244            sample_count: 0,
245            last_update: 0,
246            feature_statistics: FeatureStatistics::new(n_components),
247            random_state: None,
248            rng,
249        }
250    }
251
252    /// Set the streaming configuration
253    pub fn with_config(mut self, config: StreamingConfig) -> Self {
254        self.config = config;
255        self
256    }
257
258    /// Set random state for reproducibility
259    pub fn with_random_state(mut self, random_state: u64) -> Self {
260        self.random_state = Some(random_state);
261        self.rng = StdRng::seed_from_u64(random_state);
262        self
263    }
264
265    /// Initialize the streaming sampler with initial data
266    pub fn fit(&mut self, x: &Array2<f64>) -> Result<()> {
267        let (_, n_features) = x.dim();
268
269        // Initialize random weights and bias
270        self.weights = Some(self.generate_weights(n_features)?);
271        self.bias = Some(self.generate_bias()?);
272
273        // Process initial batch
274        for (i, row) in x.rows().into_iter().enumerate() {
275            let sample = StreamingSample::new(row.to_owned(), i as f64);
276            self.add_sample(sample)?;
277        }
278
279        Ok(())
280    }
281
282    /// Add a new sample to the stream
283    pub fn add_sample(&mut self, sample: StreamingSample) -> Result<()> {
284        // Check if initialization is needed
285        if self.weights.is_none() {
286            let n_features = sample.data.len();
287            self.weights = Some(self.generate_weights(n_features)?);
288            self.bias = Some(self.generate_bias()?);
289        }
290
291        // Add to buffer based on strategy
292        self.manage_buffer(sample)?;
293
294        self.sample_count += 1;
295
296        // Check if update is needed
297        if self.should_update()? {
298            self.update_model()?;
299            self.last_update = self.sample_count;
300        }
301
302        Ok(())
303    }
304
305    /// Transform data using current model
306    pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
307        let weights = self
308            .weights
309            .as_ref()
310            .ok_or_else(|| SklearsError::NotFitted {
311                operation: "transform".to_string(),
312            })?;
313        let bias = self.bias.as_ref().ok_or_else(|| SklearsError::NotFitted {
314            operation: "transform".to_string(),
315        })?;
316
317        self.compute_features(x, weights, bias)
318    }
319
320    /// Transform a single sample
321    pub fn transform_sample(&self, sample: &Array1<f64>) -> Result<Array1<f64>> {
322        let weights = self
323            .weights
324            .as_ref()
325            .ok_or_else(|| SklearsError::NotFitted {
326                operation: "transform_sample".to_string(),
327            })?;
328        let bias = self.bias.as_ref().ok_or_else(|| SklearsError::NotFitted {
329            operation: "transform_sample".to_string(),
330        })?;
331
332        // Compute features for single sample
333        let projection = sample.dot(&weights.t()) + bias;
334        let norm_factor = (2.0 / self.n_components as f64).sqrt();
335
336        Ok(projection.mapv(|x| norm_factor * x.cos()))
337    }
338
339    /// Get current buffer statistics
340    pub fn buffer_stats(&self) -> (usize, f64, f64) {
341        let size = self.buffer.len();
342        let avg_weight = if size > 0 {
343            self.buffer.iter().map(|s| s.weight).sum::<f64>() / size as f64
344        } else {
345            0.0
346        };
347        let avg_importance = if size > 0 {
348            self.buffer.iter().map(|s| s.importance).sum::<f64>() / size as f64
349        } else {
350            0.0
351        };
352
353        (size, avg_weight, avg_importance)
354    }
355
356    /// Get feature statistics
357    pub fn feature_stats(&self) -> &FeatureStatistics {
358        &self.feature_statistics
359    }
360
361    /// Check for concept drift
362    pub fn detect_drift(&mut self, x: &Array2<f64>) -> Result<bool> {
363        if !self.config.drift_detection {
364            return Ok(false);
365        }
366
367        let features = self.transform(x)?;
368        Ok(self.feature_statistics.detect_drift(&features))
369    }
370
371    /// Manage buffer based on strategy
372    fn manage_buffer(&mut self, sample: StreamingSample) -> Result<()> {
373        match &self.config.buffer_strategy {
374            BufferStrategy::FixedSize(max_size) => {
375                if self.buffer.len() >= *max_size {
376                    self.buffer.pop_front();
377                }
378                self.buffer.push_back(sample);
379            }
380            BufferStrategy::SlidingWindow { size, time_window } => {
381                // Remove old samples based on time window
382                let current_time = sample.timestamp;
383                while let Some(front) = self.buffer.front() {
384                    if current_time - front.timestamp > *time_window {
385                        self.buffer.pop_front();
386                    } else {
387                        break;
388                    }
389                }
390
391                // Add new sample and maintain size limit
392                if self.buffer.len() >= *size {
393                    self.buffer.pop_front();
394                }
395                self.buffer.push_back(sample);
396            }
397            BufferStrategy::ReservoirSampling(capacity) => {
398                if self.buffer.len() < *capacity {
399                    self.buffer.push_back(sample);
400                } else {
401                    let replace_idx = self.rng.gen_range(0..self.sample_count + 1);
402                    if replace_idx < *capacity {
403                        self.buffer[replace_idx] = sample;
404                    }
405                }
406            }
407            BufferStrategy::ExponentialDecay { alpha, min_weight } => {
408                // Decay weights of existing samples
409                for existing_sample in &mut self.buffer {
410                    existing_sample.weight *= alpha;
411                }
412
413                // Remove samples below minimum weight
414                self.buffer.retain(|s| s.weight >= *min_weight);
415
416                self.buffer.push_back(sample);
417            }
418            BufferStrategy::ImportanceWeighted {
419                capacity,
420                threshold,
421            } => {
422                if self.buffer.len() < *capacity {
423                    self.buffer.push_back(sample);
424                } else {
425                    // Find sample with lowest importance
426                    if let Some((min_idx, _)) =
427                        self.buffer.iter().enumerate().min_by(|(_, a), (_, b)| {
428                            a.importance.partial_cmp(&b.importance).unwrap()
429                        })
430                    {
431                        if sample.importance > self.buffer[min_idx].importance + threshold {
432                            self.buffer[min_idx] = sample;
433                        }
434                    }
435                }
436            }
437        }
438
439        Ok(())
440    }
441
442    /// Check if model should be updated
443    fn should_update(&self) -> Result<bool> {
444        match &self.config.update_frequency {
445            UpdateFrequency::PerSample => Ok(true),
446            UpdateFrequency::BatchSize(batch_size) => {
447                Ok(self.sample_count - self.last_update >= *batch_size)
448            }
449            UpdateFrequency::TimeInterval(_time_interval) => {
450                // For simplicity, use sample count as proxy for time
451                Ok(self.sample_count - self.last_update >= 100)
452            }
453            UpdateFrequency::ErrorThreshold(_threshold) => {
454                // For simplicity, update periodically
455                Ok(self.sample_count - self.last_update >= 50)
456            }
457            UpdateFrequency::Adaptive {
458                initial,
459                max: _,
460                min: _,
461            } => Ok(self.sample_count - self.last_update >= *initial),
462        }
463    }
464
465    /// Update model parameters based on current buffer
466    fn update_model(&mut self) -> Result<()> {
467        if self.buffer.is_empty() {
468            return Ok(());
469        }
470
471        // Extract data from buffer with weights
472        let mut data_matrix = Array2::zeros((self.buffer.len(), self.buffer[0].data.len()));
473        for (i, sample) in self.buffer.iter().enumerate() {
474            data_matrix.row_mut(i).assign(&sample.data);
475        }
476
477        // Compute features and update statistics
478        let weights = self.weights.as_ref().unwrap();
479        let bias = self.bias.as_ref().unwrap();
480        let features = self.compute_features(&data_matrix, weights, bias)?;
481
482        self.feature_statistics.update(&features);
483
484        // Apply forgetting mechanism
485        self.apply_forgetting()?;
486
487        Ok(())
488    }
489
490    /// Apply forgetting mechanism to reduce influence of old data
491    fn apply_forgetting(&mut self) -> Result<()> {
492        match &self.config.forgetting_mechanism {
493            ForgettingMechanism::None => {
494                // No forgetting
495            }
496            ForgettingMechanism::LinearDecay(decay_rate) => {
497                for sample in &mut self.buffer {
498                    sample.weight *= 1.0 - decay_rate;
499                }
500            }
501            ForgettingMechanism::ExponentialDecay(decay_rate) => {
502                for sample in &mut self.buffer {
503                    sample.weight *= decay_rate;
504                }
505            }
506            ForgettingMechanism::AbruptForgetting(time_threshold) => {
507                if let Some(newest) = self.buffer.back() {
508                    let cutoff_time = newest.timestamp - time_threshold;
509                    self.buffer.retain(|s| s.timestamp >= cutoff_time);
510                }
511            }
512            ForgettingMechanism::SigmoidDecay {
513                steepness,
514                midpoint,
515            } => {
516                if let Some(newest_timestamp) = self.buffer.back().map(|s| s.timestamp) {
517                    for sample in &mut self.buffer {
518                        let age = newest_timestamp - sample.timestamp;
519                        let sigmoid_weight = 1.0 / (1.0 + (steepness * (age - midpoint)).exp());
520                        sample.weight *= sigmoid_weight;
521                    }
522                }
523            }
524        }
525
526        Ok(())
527    }
528
529    /// Generate random weights for RBF features
530    fn generate_weights(&mut self, n_features: usize) -> Result<Array2<f64>> {
531        let mut weights = Array2::zeros((self.n_components, n_features));
532
533        for i in 0..self.n_components {
534            for j in 0..n_features {
535                weights[[i, j]] =
536                    self.rng.sample::<f64, _>(StandardNormal) * (2.0 * self.gamma).sqrt();
537            }
538        }
539
540        Ok(weights)
541    }
542
543    /// Generate random bias for RBF features
544    fn generate_bias(&mut self) -> Result<Array1<f64>> {
545        let mut bias = Array1::zeros(self.n_components);
546
547        for i in 0..self.n_components {
548            bias[i] = self.rng.gen_range(0.0..2.0 * std::f64::consts::PI);
549        }
550
551        Ok(bias)
552    }
553
554    /// Compute RBF features for given data
555    fn compute_features(
556        &self,
557        x: &Array2<f64>,
558        weights: &Array2<f64>,
559        bias: &Array1<f64>,
560    ) -> Result<Array2<f64>> {
561        let (n_samples, _) = x.dim();
562        let n_components = weights.nrows();
563
564        // Compute X @ W^T + b
565        let projection = x.dot(&weights.t()) + bias;
566
567        // Apply cosine transformation with normalization
568        let mut features = Array2::zeros((n_samples, n_components));
569        let norm_factor = (2.0 / n_components as f64).sqrt();
570
571        for i in 0..n_samples {
572            for j in 0..n_components {
573                features[[i, j]] = norm_factor * projection[[i, j]].cos();
574            }
575        }
576
577        Ok(features)
578    }
579}
580
/// Streaming Nyström method for kernel approximation
///
/// Maintains an online Nyström approximation that adapts to
/// streaming data with efficient inducing point management.
pub struct StreamingNystroem {
    /// Upper bound on the number of inducing points / output features.
    n_components: usize,
    /// RBF kernel parameter used as `exp(-gamma * ||x - y||^2)`.
    gamma: f64,
    /// Streaming configuration.
    config: StreamingConfig,
    /// Rows selected as inducing points; `None` until the model is built.
    inducing_points: Option<Array2<f64>>,
    /// Eigenvalues of the inducing-point kernel matrix.
    eigenvalues: Option<Array1<f64>>,
    /// Matching eigenvectors, one per column.
    eigenvectors: Option<Array2<f64>>,
    /// Samples retained for re-selecting inducing points.
    buffer: VecDeque<StreamingSample>,
    /// Number of samples seen (set by `fit`, advanced by `add_sample`).
    sample_count: usize,
    /// Value of `sample_count` at the most recent model update.
    last_update: usize,
    /// Seed supplied through `with_random_state`, if any.
    random_state: Option<u64>,
    /// Generator used to pick inducing points.
    rng: StdRng,
}
598
599impl StreamingNystroem {
600    /// Create a new streaming Nyström approximation
601    pub fn new(n_components: usize, gamma: f64) -> Self {
602        let rng = StdRng::from_seed(thread_rng().gen());
603        Self {
604            n_components,
605            gamma,
606            config: StreamingConfig::default(),
607            inducing_points: None,
608            eigenvalues: None,
609            eigenvectors: None,
610            buffer: VecDeque::new(),
611            sample_count: 0,
612            last_update: 0,
613            random_state: None,
614            rng,
615        }
616    }
617
618    /// Set the streaming configuration
619    pub fn with_config(mut self, config: StreamingConfig) -> Self {
620        self.config = config;
621        self
622    }
623
624    /// Set random state for reproducibility
625    pub fn with_random_state(mut self, random_state: u64) -> Self {
626        self.random_state = Some(random_state);
627        self.rng = StdRng::seed_from_u64(random_state);
628        self
629    }
630
631    /// Initialize with initial data
632    pub fn fit(&mut self, x: &Array2<f64>) -> Result<()> {
633        // Select initial inducing points
634        let inducing_indices = self.select_inducing_points(x)?;
635        let inducing_points = x.select(Axis(0), &inducing_indices);
636
637        // Compute initial eigendecomposition
638        let kernel_matrix = self.compute_kernel_matrix(&inducing_points)?;
639        let (eigenvalues, eigenvectors) = self.eigendecomposition(&kernel_matrix)?;
640
641        self.inducing_points = Some(inducing_points);
642        self.eigenvalues = Some(eigenvalues);
643        self.eigenvectors = Some(eigenvectors);
644
645        // Add samples to buffer
646        for (i, row) in x.rows().into_iter().enumerate() {
647            let sample = StreamingSample::new(row.to_owned(), i as f64);
648            self.buffer.push_back(sample);
649        }
650
651        self.sample_count = x.nrows();
652
653        Ok(())
654    }
655
656    /// Add a new sample to the stream
657    pub fn add_sample(&mut self, sample: StreamingSample) -> Result<()> {
658        self.buffer.push_back(sample);
659        self.sample_count += 1;
660
661        // Manage buffer size
662        match &self.config.buffer_strategy {
663            BufferStrategy::FixedSize(max_size) => {
664                if self.buffer.len() > *max_size {
665                    self.buffer.pop_front();
666                }
667            }
668            _ => {
669                // Implement other buffer strategies as needed
670            }
671        }
672
673        // Check if update is needed
674        if self.should_update()? {
675            self.update_model()?;
676            self.last_update = self.sample_count;
677        }
678
679        Ok(())
680    }
681
682    /// Transform data using current approximation
683    pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
684        let inducing_points =
685            self.inducing_points
686                .as_ref()
687                .ok_or_else(|| SklearsError::NotFitted {
688                    operation: "transform".to_string(),
689                })?;
690        let eigenvalues = self
691            .eigenvalues
692            .as_ref()
693            .ok_or_else(|| SklearsError::NotFitted {
694                operation: "transform".to_string(),
695            })?;
696        let eigenvectors = self
697            .eigenvectors
698            .as_ref()
699            .ok_or_else(|| SklearsError::NotFitted {
700                operation: "transform".to_string(),
701            })?;
702
703        // Compute kernel between x and inducing points
704        let kernel_x_inducing = self.compute_kernel(x, inducing_points)?;
705
706        // Apply Nyström transformation
707        let mut features = kernel_x_inducing.dot(eigenvectors);
708
709        // Scale by eigenvalues
710        for i in 0..eigenvalues.len() {
711            if eigenvalues[i] > 1e-12 {
712                let scale = 1.0 / eigenvalues[i].sqrt();
713                for j in 0..features.nrows() {
714                    features[[j, i]] *= scale;
715                }
716            }
717        }
718
719        Ok(features)
720    }
721
722    /// Check if model should be updated
723    fn should_update(&self) -> Result<bool> {
724        // Simple heuristic: update every 100 samples
725        Ok(self.sample_count - self.last_update >= 100)
726    }
727
728    /// Update inducing points and eigendecomposition
729    fn update_model(&mut self) -> Result<()> {
730        if self.buffer.is_empty() {
731            return Ok(());
732        }
733
734        // Extract current data from buffer
735        let n_samples = self.buffer.len();
736        let n_features = self.buffer[0].data.len();
737        let mut data_matrix = Array2::zeros((n_samples, n_features));
738
739        for (i, sample) in self.buffer.iter().enumerate() {
740            data_matrix.row_mut(i).assign(&sample.data);
741        }
742
743        // Reselect inducing points
744        let inducing_indices = self.select_inducing_points(&data_matrix)?;
745        let inducing_points = data_matrix.select(Axis(0), &inducing_indices);
746
747        // Recompute eigendecomposition
748        let kernel_matrix = self.compute_kernel_matrix(&inducing_points)?;
749        let (eigenvalues, eigenvectors) = self.eigendecomposition(&kernel_matrix)?;
750
751        self.inducing_points = Some(inducing_points);
752        self.eigenvalues = Some(eigenvalues);
753        self.eigenvectors = Some(eigenvectors);
754
755        Ok(())
756    }
757
758    /// Select inducing points from current data
759    fn select_inducing_points(&mut self, x: &Array2<f64>) -> Result<Vec<usize>> {
760        let n_samples = x.nrows();
761        let n_inducing = self.n_components.min(n_samples);
762
763        let mut indices = Vec::new();
764        for _ in 0..n_inducing {
765            indices.push(self.rng.gen_range(0..n_samples));
766        }
767
768        Ok(indices)
769    }
770
771    /// Compute kernel matrix
772    fn compute_kernel_matrix(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
773        let n_samples = x.nrows();
774        let mut kernel_matrix = Array2::zeros((n_samples, n_samples));
775
776        for i in 0..n_samples {
777            for j in i..n_samples {
778                let diff = &x.row(i) - &x.row(j);
779                let squared_dist = diff.mapv(|x| x * x).sum();
780                let kernel_val = (-self.gamma * squared_dist).exp();
781                kernel_matrix[[i, j]] = kernel_val;
782                kernel_matrix[[j, i]] = kernel_val;
783            }
784        }
785
786        Ok(kernel_matrix)
787    }
788
789    /// Compute kernel between two matrices
790    fn compute_kernel(&self, x: &Array2<f64>, y: &Array2<f64>) -> Result<Array2<f64>> {
791        let (n_samples_x, _) = x.dim();
792        let (n_samples_y, _) = y.dim();
793        let mut kernel_matrix = Array2::zeros((n_samples_x, n_samples_y));
794
795        for i in 0..n_samples_x {
796            for j in 0..n_samples_y {
797                let diff = &x.row(i) - &y.row(j);
798                let squared_dist = diff.mapv(|x| x * x).sum();
799                let kernel_val = (-self.gamma * squared_dist).exp();
800                kernel_matrix[[i, j]] = kernel_val;
801            }
802        }
803
804        Ok(kernel_matrix)
805    }
806
807    /// Perform eigendecomposition (simplified)
808    fn eigendecomposition(&self, matrix: &Array2<f64>) -> Result<(Array1<f64>, Array2<f64>)> {
809        let n = matrix.nrows();
810        let eigenvalues = Array1::ones(self.n_components.min(n));
811        let eigenvectors = Array2::eye(n)
812            .slice(s![.., ..self.n_components.min(n)])
813            .to_owned();
814
815        Ok((eigenvalues, eigenvectors))
816    }
817}
818
// Unit tests for the streaming samplers. Most of them are smoke tests that
// check output shapes and buffer sizes rather than numerical values.
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::array;

    /// Fitting on 4 samples and transforming them yields a 4 x 50 matrix.
    #[test]
    fn test_streaming_rbf_sampler_basic() {
        let x = array![[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]];

        let mut sampler = StreamingRBFSampler::new(50, 0.1).with_random_state(42);

        sampler.fit(&x).unwrap();
        let features = sampler.transform(&x).unwrap();

        assert_eq!(features.nrows(), 4);
        assert_eq!(features.ncols(), 50);
    }

    /// The builder methods on `StreamingSample` store exactly what they
    /// are given.
    #[test]
    fn test_streaming_sample() {
        let data = array![1.0, 2.0, 3.0];
        let sample = StreamingSample::new(data.clone(), 1.0)
            .with_weight(0.8)
            .with_importance(0.9)
            .with_label(1.0);

        assert_eq!(sample.data, data);
        assert_eq!(sample.timestamp, 1.0);
        assert_eq!(sample.weight, 0.8);
        assert_eq!(sample.importance, 0.9);
        assert_eq!(sample.label, Some(1.0));
    }

    /// A `FixedSize(3)` buffer never holds more than 3 samples.
    #[test]
    fn test_buffer_strategies() {
        let mut sampler = StreamingRBFSampler::new(10, 0.1).with_config(StreamingConfig {
            buffer_strategy: BufferStrategy::FixedSize(3),
            ..Default::default()
        });

        // Add samples beyond buffer capacity
        for i in 0..5 {
            let data = array![i as f64, (i + 1) as f64];
            let sample = StreamingSample::new(data, i as f64);
            sampler.add_sample(sample).unwrap();
        }

        let (size, _, _) = sampler.buffer_stats();
        assert_eq!(size, 3); // Buffer should be limited to 3
    }

    /// Nyström features have one column per requested component.
    #[test]
    fn test_streaming_nystroem_basic() {
        let x = array![[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]];

        let mut nystroem = StreamingNystroem::new(3, 0.1).with_random_state(42);

        nystroem.fit(&x).unwrap();
        let features = nystroem.transform(&x).unwrap();

        assert_eq!(features.nrows(), 4);
        assert_eq!(features.ncols(), 3);
    }

    /// A single update records the row count and the column means.
    #[test]
    fn test_feature_statistics() {
        let mut stats = FeatureStatistics::new(3);
        let features = array![[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]];

        stats.update(&features);

        assert_eq!(stats.update_count, 2);
        assert!((stats.mean[0] - 2.5).abs() < 1e-10);
        assert!((stats.mean[1] - 3.5).abs() < 1e-10);
        assert!((stats.mean[2] - 4.5).abs() < 1e-10);
    }

    /// Transforming one vector yields one value per component.
    #[test]
    fn test_transform_sample() {
        let x = array![[1.0, 2.0], [3.0, 4.0]];
        let mut sampler = StreamingRBFSampler::new(10, 0.1).with_random_state(42);

        sampler.fit(&x).unwrap();

        let sample = array![5.0, 6.0];
        let features = sampler.transform_sample(&sample).unwrap();

        assert_eq!(features.len(), 10);
    }

    /// Struct-update syntax over `Default` keeps the explicitly set fields.
    #[test]
    fn test_streaming_config() {
        let config = StreamingConfig {
            buffer_strategy: BufferStrategy::SlidingWindow {
                size: 100,
                time_window: 10.0,
            },
            update_frequency: UpdateFrequency::BatchSize(50),
            forgetting_mechanism: ForgettingMechanism::LinearDecay(0.01),
            adaptive_components: true,
            ..Default::default()
        };

        assert!(matches!(
            config.buffer_strategy,
            BufferStrategy::SlidingWindow { .. }
        ));
        assert!(matches!(
            config.update_frequency,
            UpdateFrequency::BatchSize(50)
        ));
        assert!(config.adaptive_components);
    }

    /// Streaming samples after `fit` with a batch size of 2 exercises the
    /// model-update path without errors.
    #[test]
    fn test_online_updates() {
        let mut sampler = StreamingRBFSampler::new(20, 0.1)
            .with_config(StreamingConfig {
                update_frequency: UpdateFrequency::BatchSize(2),
                ..Default::default()
            })
            .with_random_state(42);

        // Initialize with small batch
        let x_init = array![[1.0, 2.0], [3.0, 4.0]];
        sampler.fit(&x_init).unwrap();

        // Add samples one by one
        for i in 5..10 {
            let data = array![i as f64, (i + 1) as f64];
            let sample = StreamingSample::new(data, i as f64);
            sampler.add_sample(sample).unwrap();
        }

        let (buffer_size, _, _) = sampler.buffer_stats();
        assert!(buffer_size > 0);
    }

    /// Drift detection runs end to end when enabled; only the sign of the
    /// resulting score is asserted, not whether drift was reported.
    #[test]
    fn test_drift_detection() {
        let x1 = array![[1.0, 2.0], [1.1, 2.1], [0.9, 1.9]];
        let x2 = array![[5.0, 6.0], [5.1, 6.1], [4.9, 5.9]]; // Different distribution

        let mut sampler = StreamingRBFSampler::new(20, 0.1)
            .with_config(StreamingConfig {
                drift_detection: true,
                ..Default::default()
            })
            .with_random_state(42);

        sampler.fit(&x1).unwrap();
        let _drift_detected = sampler.detect_drift(&x2).unwrap();

        // Should detect some drift in feature statistics
        assert!(sampler.feature_stats().drift_score >= 0.0);
    }
}