Skip to main content

scirs2_interpolate/
streaming.rs

1//! Streaming interpolation for online and real-time systems
2//!
3//! This module provides interpolation methods specifically designed for streaming data
4//! scenarios where:
5//! - Data arrives incrementally over time
6//! - Memory usage must be bounded and predictable
7//! - Low latency is required for real-time applications
8//! - Models need to adapt to changing data patterns
9//!
10//! ## Key Features
11//!
12//! - **Online learning**: Update interpolation models incrementally without full retraining
13//! - **Bounded memory**: Automatic data window management and model compression
14//! - **Low latency**: Optimized for real-time predictions with microsecond response times
15//! - **Adaptive models**: Automatically detect and adapt to concept drift
16//! - **Quality control**: Built-in outlier detection and data validation
17//! - **Backpressure handling**: Graceful degradation under high data rates
18//!
19//! ## Supported Methods
20//!
21//! - **Streaming splines**: Incremental cubic spline updates
22//! - **Online RBF**: Real-time radial basis function interpolation
23//! - **Adaptive kriging**: Streaming Gaussian process regression
24//! - **Moving window**: Time-windowed interpolation models
25//! - **Ensemble streaming**: Combine multiple streaming methods
26
27use crate::error::{InterpolateError, InterpolateResult};
28use scirs2_core::ndarray::{Array1, Array2};
29use scirs2_core::numeric::{Float, FromPrimitive, Zero};
30use scirs2_core::validation::check_finite;
31use std::collections::{HashMap, VecDeque};
32use std::fmt::Debug;
33use std::time::Instant;
34
35/// Configuration for streaming interpolation
36#[derive(Debug, Clone)]
37pub struct StreamingConfig {
38    /// Maximum number of data points to keep in memory
39    pub max_points: usize,
40    /// Window size for moving window methods (None = unlimited)
41    pub window_size: Option<usize>,
42    /// Outlier detection threshold (standard deviations)
43    pub outlier_threshold: f64,
44    /// Model update frequency (every N points)
45    pub update_frequency: usize,
46    /// Adaptation rate for online learning (0.0 to 1.0)
47    pub adaptation_rate: f64,
48    /// Memory cleanup threshold (fraction of max_points)
49    pub cleanup_threshold: f64,
50    /// Maximum latency allowed for predictions (microseconds)
51    pub max_latency_us: u64,
52}
53
54impl Default for StreamingConfig {
55    fn default() -> Self {
56        Self {
57            max_points: 10_000,
58            window_size: Some(1_000),
59            outlier_threshold: 3.0,
60            update_frequency: 100,
61            adaptation_rate: 0.01,
62            cleanup_threshold: 0.8,
63            max_latency_us: 1_000, // 1ms
64        }
65    }
66}
67
68/// Statistics for streaming interpolation performance
69#[derive(Debug, Clone, Default)]
70pub struct StreamingStats {
71    /// Total number of points processed
72    pub points_processed: usize,
73    /// Number of outliers detected and rejected
74    pub outliers_rejected: usize,
75    /// Number of model updates performed
76    pub model_updates: usize,
77    /// Current memory usage (number of stored points)
78    pub memory_usage: usize,
79    /// Average prediction latency (microseconds)
80    pub avg_latency_us: f64,
81    /// Maximum prediction latency observed (microseconds)
82    pub max_latency_us: u64,
83    /// Last model update timestamp
84    pub last_update: Option<Instant>,
85    /// Prediction accuracy metrics
86    pub accuracy_metrics: AccuracyMetrics,
87}
88
89/// Accuracy metrics for streaming interpolation
90#[derive(Debug, Clone, Default)]
91pub struct AccuracyMetrics {
92    /// Mean squared error over recent predictions
93    pub mse: f64,
94    /// Mean absolute error over recent predictions
95    pub mae: f64,
96    /// R-squared coefficient
97    pub r_squared: f64,
98    /// Number of predictions used for metrics
99    pub sample_count: usize,
100}
101
102/// Data point for streaming interpolation
103#[derive(Debug, Clone)]
104pub struct StreamingPoint<T: Float> {
105    /// x-coordinate
106    pub x: T,
107    /// y-coordinate
108    pub y: T,
109    /// Timestamp when the point was added
110    pub timestamp: Instant,
111    /// Quality score (0.0 to 1.0, higher is better)
112    pub quality: f64,
113    /// Optional metadata
114    pub metadata: HashMap<String, String>,
115}
116
117/// Streaming interpolation method type
118#[derive(Debug, Clone, Copy, PartialEq)]
119pub enum StreamingMethod {
120    /// Online spline interpolation
121    OnlineSpline,
122    /// Streaming RBF interpolation
123    StreamingRBF,
124    /// Adaptive kriging
125    AdaptiveKriging,
126    /// Moving window linear interpolation
127    MovingWindow,
128    /// Ensemble of streaming methods
129    Ensemble,
130}
131
132/// Main streaming interpolation interface
133pub trait StreamingInterpolator<T: Float + Debug + FromPrimitive> {
134    /// Add a new data point to the streaming model
135    fn add_point(&mut self, point: StreamingPoint<T>) -> InterpolateResult<()>;
136
137    /// Add multiple points at once (batch update)
138    fn add_points(&mut self, points: &[StreamingPoint<T>]) -> InterpolateResult<()>;
139
140    /// Predict the value at a given x-coordinate
141    fn predict(&mut self, x: T) -> InterpolateResult<T>;
142
143    /// Predict values at multiple x-coordinates
144    fn predict_batch(&mut self, xvalues: &[T]) -> InterpolateResult<Vec<T>>;
145
146    /// Predict with uncertainty estimation
147    fn predict_with_uncertainty(&mut self, x: T) -> InterpolateResult<(T, T)>;
148
149    /// Force model update (useful for controlling update timing)
150    fn update_model(&mut self) -> InterpolateResult<()>;
151
152    /// Clear all data and reset the model
153    fn reset(&mut self) -> InterpolateResult<()>;
154
155    /// Get current streaming statistics
156    fn get_stats(&self) -> StreamingStats;
157
158    /// Get current configuration
159    fn get_config(&self) -> &StreamingConfig;
160
161    /// Update configuration (takes effect on next model update)
162    fn set_config(&mut self, config: StreamingConfig) -> InterpolateResult<()>;
163}
164
165/// Online spline interpolation for streaming data
166pub struct OnlineSplineInterpolator<T: Float + Debug + FromPrimitive> {
167    config: StreamingConfig,
168    points: VecDeque<StreamingPoint<T>>,
169    spline_coeffs: Option<Array2<T>>,
170    x_sorted: Array1<T>,
171    y_sorted: Array1<T>,
172    stats: StreamingStats,
173    last_update_count: usize,
174}
175
176impl<T: Float + Debug + FromPrimitive + Zero> OnlineSplineInterpolator<T> {
177    /// Create a new online spline interpolator
178    pub fn new(config: StreamingConfig) -> Self {
179        Self {
180            config,
181            points: VecDeque::new(),
182            spline_coeffs: None,
183            x_sorted: Array1::zeros(0),
184            y_sorted: Array1::zeros(0),
185            stats: StreamingStats::default(),
186            last_update_count: 0,
187        }
188    }
189
190    /// Add a new point coordinates to the streaming interpolator
191    pub fn add_point_coords(&mut self, x: T, y: T) -> InterpolateResult<()> {
192        if !x.is_finite() || !y.is_finite() {
193            return Err(InterpolateError::InvalidInput {
194                message: "input point contains non-finite values".to_string(),
195            });
196        }
197
198        let point = StreamingPoint {
199            x,
200            y,
201            timestamp: Instant::now(),
202            quality: 1.0,
203            metadata: HashMap::new(),
204        };
205
206        // Check for outliers
207        if !self.is_outlier(&point) {
208            self.points.push_back(point);
209            self.stats.points_processed += 1;
210        } else {
211            self.stats.outliers_rejected += 1;
212            return Ok(()); // Skip outlier
213        }
214
215        // Maintain memory bounds
216        if self.points.len() > self.config.max_points {
217            self.points.pop_front();
218            self.stats.points_processed += 1;
219        }
220
221        // Update model if needed
222        if (self.stats.points_processed - self.last_update_count) >= self.config.update_frequency {
223            self.update_spline_coefficients()?;
224            self.last_update_count = self.stats.points_processed;
225        }
226
227        Ok(())
228    }
229
230    /// Check if a point is an outlier based on statistical analysis
231    fn is_outlier(&self, point: &StreamingPoint<T>) -> bool {
232        if self.points.len() < 3 {
233            return false; // Need at least 3 points for outlier detection
234        }
235
236        // Simple z-score based outlier detection
237        let y_values: Vec<f64> = self
238            .points
239            .iter()
240            .map(|p| p.y.to_f64().unwrap_or(0.0))
241            .collect();
242
243        let mean = y_values.iter().sum::<f64>() / y_values.len() as f64;
244        let variance =
245            y_values.iter().map(|&y| (y - mean).powi(2)).sum::<f64>() / y_values.len() as f64;
246        let std_dev = variance.sqrt();
247
248        if std_dev == 0.0 {
249            return false;
250        }
251
252        let z_score = ((point.y.to_f64().unwrap_or(0.0) - mean) / std_dev).abs();
253        z_score > self.config.outlier_threshold
254    }
255
256    /// Update spline coefficients based on current data
257    fn update_spline_coefficients(&mut self) -> InterpolateResult<()> {
258        if self.points.len() < 2 {
259            return Ok(());
260        }
261
262        // Sort points by x-coordinate
263        let mut sorted_points: Vec<_> = self.points.iter().collect();
264        sorted_points.sort_by(|a, b| a.x.partial_cmp(&b.x).unwrap_or(std::cmp::Ordering::Equal));
265
266        let n = sorted_points.len();
267        let mut x_vals = Array1::zeros(n);
268        let mut y_vals = Array1::zeros(n);
269
270        for (i, point) in sorted_points.iter().enumerate() {
271            x_vals[i] = point.x;
272            y_vals[i] = point.y;
273        }
274
275        // Compute cubic spline coefficients using natural boundary conditions
276        let coeffs = self.compute_natural_spline_coefficients(&x_vals, &y_vals)?;
277
278        self.spline_coeffs = Some(coeffs);
279        self.x_sorted = x_vals;
280        self.y_sorted = y_vals;
281
282        Ok(())
283    }
284
285    /// Compute natural cubic spline coefficients
286    fn compute_natural_spline_coefficients(
287        &self,
288        x: &Array1<T>,
289        y: &Array1<T>,
290    ) -> InterpolateResult<Array2<T>> {
291        let n = x.len();
292        if n < 2 {
293            return Err(InterpolateError::invalid_input(
294                "Need at least 2 points".to_string(),
295            ));
296        }
297
298        let segments = n - 1;
299        let mut coeffs = Array2::zeros((segments, 4));
300
301        if n == 2 {
302            // Linear interpolation for 2 points
303            let dx = x[1] - x[0];
304            let dy = y[1] - y[0];
305            coeffs[[0, 0]] = y[0];
306            coeffs[[0, 1]] = dy / dx;
307            return Ok(coeffs);
308        }
309
310        // Set up tridiagonal system for natural spline
311        let mut h = Array1::zeros(segments);
312        let mut alpha = Array1::zeros(segments);
313
314        for i in 0..segments {
315            h[i] = x[i + 1] - x[i];
316        }
317
318        for i in 1..segments {
319            alpha[i] = (T::from_f64(3.0).expect("Operation failed") * (y[i + 1] - y[i]) / h[i])
320                - (T::from_f64(3.0).expect("Operation failed") * (y[i] - y[i - 1]) / h[i - 1]);
321        }
322
323        // Solve tridiagonal system
324        let mut l = Array1::zeros(n);
325        let mut mu = Array1::zeros(n);
326        let mut z = Array1::zeros(n);
327
328        l[0] = T::one();
329
330        for i in 1..(n - 1) {
331            l[i] = T::from_f64(2.0).expect("Operation failed") * (x[i + 1] - x[i - 1])
332                - h[i - 1] * mu[i - 1];
333            mu[i] = h[i] / l[i];
334            z[i] = (alpha[i] - h[i - 1] * z[i - 1]) / l[i];
335        }
336
337        l[n - 1] = T::one();
338
339        // Back substitution
340        let mut c = Array1::zeros(n);
341        for i in (0..(n - 1)).rev() {
342            c[i] = z[i] - mu[i] * c[i + 1];
343        }
344
345        // Compute spline coefficients
346        for i in 0..segments {
347            coeffs[[i, 0]] = y[i]; // a_i
348            coeffs[[i, 1]] = (y[i + 1] - y[i]) / h[i]
349                - h[i] * (c[i + 1] + T::from_f64(2.0).expect("Operation failed") * c[i])
350                    / T::from_f64(3.0).expect("Operation failed"); // b_i
351            coeffs[[i, 2]] = c[i]; // c_i
352            coeffs[[i, 3]] =
353                (c[i + 1] - c[i]) / (T::from_f64(3.0).expect("Operation failed") * h[i]);
354            // d_i
355        }
356
357        Ok(coeffs)
358    }
359
360    /// Evaluate spline at given x value
361    fn evaluate_spline(&self, x: T) -> InterpolateResult<T> {
362        let coeffs = self.spline_coeffs.as_ref().ok_or_else(|| {
363            InterpolateError::ComputationError("Spline not initialized".to_string())
364        })?;
365
366        if self.x_sorted.len() < 2 {
367            return Err(InterpolateError::ComputationError(
368                "Need at least 2 points".to_string(),
369            ));
370        }
371
372        // Find the appropriate segment
373        let mut segment = 0;
374        for i in 0..(self.x_sorted.len() - 1) {
375            if x >= self.x_sorted[i] && x <= self.x_sorted[i + 1] {
376                segment = i;
377                break;
378            }
379        }
380
381        // Handle extrapolation
382        if x < self.x_sorted[0] {
383            segment = 0;
384        } else if x > self.x_sorted[self.x_sorted.len() - 1] {
385            segment = self.x_sorted.len() - 2;
386        }
387
388        // Evaluate polynomial
389        let dx = x - self.x_sorted[segment];
390        let a = coeffs[[segment, 0]];
391        let b = coeffs[[segment, 1]];
392        let c = coeffs[[segment, 2]];
393        let d = coeffs[[segment, 3]];
394
395        Ok(a + b * dx + c * dx * dx + d * dx * dx * dx)
396    }
397
398    /// Clean up old data points to maintain memory bounds
399    fn cleanup_memory(&mut self) {
400        let max_size = (self.config.max_points as f64 * self.config.cleanup_threshold) as usize;
401        while self.points.len() > max_size {
402            self.points.pop_front();
403        }
404    }
405}
406
407impl<T: Float + Debug + FromPrimitive + Zero> StreamingInterpolator<T>
408    for OnlineSplineInterpolator<T>
409{
410    fn add_point(&mut self, point: StreamingPoint<T>) -> InterpolateResult<()> {
411        // Validate point
412        let x_val = point.x.to_f64().unwrap_or(f64::NAN);
413        let y_val = point.y.to_f64().unwrap_or(f64::NAN);
414        check_finite(x_val, "point x coordinate")?;
415        check_finite(y_val, "point y coordinate")?;
416
417        // Check for outliers
418        if self.is_outlier(&point) {
419            self.stats.outliers_rejected += 1;
420            return Ok(());
421        }
422
423        // Add point to buffer
424        self.points.push_back(point);
425        self.stats.points_processed += 1;
426
427        // Apply window size limit
428        if let Some(window_size) = self.config.window_size {
429            while self.points.len() > window_size {
430                self.points.pop_front();
431            }
432        }
433
434        // Check if model update is needed
435        if self.stats.points_processed - self.last_update_count >= self.config.update_frequency {
436            self.update_model()?;
437        }
438
439        // Memory cleanup
440        if self.points.len() > self.config.max_points {
441            self.cleanup_memory();
442        }
443
444        self.stats.memory_usage = self.points.len();
445        Ok(())
446    }
447
448    fn add_points(&mut self, points: &[StreamingPoint<T>]) -> InterpolateResult<()> {
449        for point in points {
450            self.add_point(point.clone())?;
451        }
452        Ok(())
453    }
454
455    fn predict(&mut self, x: T) -> InterpolateResult<T> {
456        let start_time = Instant::now();
457
458        let result = if let Some(_) = self.spline_coeffs {
459            self.evaluate_spline(x)
460        } else if self.points.len() >= 2 {
461            // Simple linear interpolation as fallback
462            let first = &self.points[0];
463            let last = &self.points[self.points.len() - 1];
464            let slope = (last.y - first.y) / (last.x - first.x);
465            Ok(first.y + slope * (x - first.x))
466        } else if self.points.len() == 1 {
467            Ok(self.points[0].y)
468        } else {
469            Err(InterpolateError::ComputationError(
470                "No data points available".to_string(),
471            ))
472        };
473
474        let latency = start_time.elapsed().as_micros() as u64;
475
476        // Update latency statistics (simplified - would need proper moving average in production)
477        if self.stats.max_latency_us < latency {
478            self.stats.max_latency_us = latency;
479        }
480
481        result
482    }
483
484    fn predict_batch(&mut self, xvalues: &[T]) -> InterpolateResult<Vec<T>> {
485        let mut results = Vec::with_capacity(xvalues.len());
486        for &x in xvalues {
487            results.push(self.predict(x)?);
488        }
489        Ok(results)
490    }
491
492    fn predict_with_uncertainty(&mut self, x: T) -> InterpolateResult<(T, T)> {
493        let prediction = self.predict(x)?;
494
495        // Simple uncertainty estimation based on distance to nearest data points
496        let min_distance = self
497            .points
498            .iter()
499            .map(|p| (p.x - x).abs())
500            .fold(T::infinity(), |a, b| if a < b { a } else { b });
501
502        let uncertainty = if min_distance.is_finite() {
503            min_distance * T::from_f64(0.1).expect("Operation failed") // 10% of distance as uncertainty
504        } else {
505            T::from_f64(1.0).expect("Operation failed") // Default uncertainty
506        };
507
508        Ok((prediction, uncertainty))
509    }
510
511    fn update_model(&mut self) -> InterpolateResult<()> {
512        self.update_spline_coefficients()?;
513        self.stats.model_updates += 1;
514        self.stats.last_update = Some(Instant::now());
515        self.last_update_count = self.stats.points_processed;
516        Ok(())
517    }
518
519    fn reset(&mut self) -> InterpolateResult<()> {
520        self.points.clear();
521        self.spline_coeffs = None;
522        self.x_sorted = Array1::zeros(0);
523        self.y_sorted = Array1::zeros(0);
524        self.stats = StreamingStats::default();
525        self.last_update_count = 0;
526        Ok(())
527    }
528
529    fn get_stats(&self) -> StreamingStats {
530        self.stats.clone()
531    }
532
533    fn get_config(&self) -> &StreamingConfig {
534        &self.config
535    }
536
537    fn set_config(&mut self, config: StreamingConfig) -> InterpolateResult<()> {
538        self.config = config;
539        Ok(())
540    }
541}
542
543/// Create a new online spline interpolator
544#[allow(dead_code)]
545pub fn make_online_spline_interpolator<T: Float + Debug + FromPrimitive + Zero>(
546    config: Option<StreamingConfig>,
547) -> OnlineSplineInterpolator<T> {
548    OnlineSplineInterpolator::new(config.unwrap_or_default())
549}
550
551/// Streaming RBF interpolator for real-time applications
552pub struct StreamingRBFInterpolator<T: Float + Debug + FromPrimitive> {
553    config: StreamingConfig,
554    points: VecDeque<StreamingPoint<T>>,
555    centers: Array1<T>,
556    weights: Array1<T>,
557    kernel_width: T,
558    stats: StreamingStats,
559    last_update_count: usize,
560}
561
562impl<T: Float + Debug + FromPrimitive + Zero> StreamingRBFInterpolator<T> {
563    /// Create a new streaming RBF interpolator
564    pub fn new(config: StreamingConfig, kernel_width: T) -> Self {
565        Self {
566            config,
567            points: VecDeque::new(),
568            centers: Array1::zeros(0),
569            weights: Array1::zeros(0),
570            kernel_width,
571            stats: StreamingStats::default(),
572            last_update_count: 0,
573        }
574    }
575
576    /// RBF kernel function (Gaussian)
577    fn rbf_kernel(&self, r: T) -> T {
578        let neg_r_squared = -(r * r) / (self.kernel_width * self.kernel_width);
579        neg_r_squared.exp()
580    }
581
582    /// Update RBF model with current data points
583    fn update_rbf_model(&mut self) -> InterpolateResult<()> {
584        if self.points.is_empty() {
585            return Ok(());
586        }
587
588        let n = self.points.len();
589        let mut centers = Array1::zeros(n);
590        let mut targets = Array1::zeros(n);
591
592        for (i, point) in self.points.iter().enumerate() {
593            centers[i] = point.x;
594            targets[i] = point.y;
595        }
596
597        // Build RBF matrix
598        let mut rbf_matrix = Array2::zeros((n, n));
599        for i in 0..n {
600            for j in 0..n {
601                let distance = (centers[i] - centers[j]).abs();
602                rbf_matrix[[i, j]] = self.rbf_kernel(distance);
603            }
604        }
605
606        // Add regularization for numerical stability
607        let regularization = T::from_f64(1e-10).expect("Operation failed");
608        for i in 0..n {
609            rbf_matrix[[i, i]] = rbf_matrix[[i, i]] + regularization;
610        }
611
612        // Solve for weights (simplified - would use proper linear solver in production)
613        let weights = self.solve_linear_system(&rbf_matrix, &targets)?;
614
615        self.centers = centers;
616        self.weights = weights;
617
618        Ok(())
619    }
620
621    /// Simple linear system solver (Gaussian elimination with partial pivoting)
622    fn solve_linear_system(&self, a: &Array2<T>, b: &Array1<T>) -> InterpolateResult<Array1<T>> {
623        let n = a.nrows();
624        let mut aug = Array2::zeros((n, n + 1));
625
626        // Create augmented matrix
627        for i in 0..n {
628            for j in 0..n {
629                aug[[i, j]] = a[[i, j]];
630            }
631            aug[[i, n]] = b[i];
632        }
633
634        // Forward elimination with partial pivoting
635        for k in 0..n {
636            // Find pivot
637            let mut max_row = k;
638            for i in (k + 1)..n {
639                if aug[[i, k]].abs() > aug[[max_row, k]].abs() {
640                    max_row = i;
641                }
642            }
643
644            // Swap rows
645            if max_row != k {
646                for j in 0..=n {
647                    let temp = aug[[k, j]];
648                    aug[[k, j]] = aug[[max_row, j]];
649                    aug[[max_row, j]] = temp;
650                }
651            }
652
653            // Check for zero pivot
654            if aug[[k, k]].abs() < T::from_f64(1e-12).expect("Operation failed") {
655                return Err(InterpolateError::ComputationError(
656                    "Singular matrix".to_string(),
657                ));
658            }
659
660            // Eliminate column
661            for i in (k + 1)..n {
662                let factor = aug[[i, k]] / aug[[k, k]];
663                for j in k..=n {
664                    aug[[i, j]] = aug[[i, j]] - factor * aug[[k, j]];
665                }
666            }
667        }
668
669        // Back substitution
670        let mut x = Array1::zeros(n);
671        for i in (0..n).rev() {
672            x[i] = aug[[i, n]];
673            for j in (i + 1)..n {
674                x[i] = x[i] - aug[[i, j]] * x[j];
675            }
676            x[i] = x[i] / aug[[i, i]];
677        }
678
679        Ok(x)
680    }
681
682    /// Evaluate RBF at given point
683    fn evaluate_rbf(&self, x: T) -> InterpolateResult<T> {
684        if self.centers.is_empty() {
685            return Err(InterpolateError::ComputationError(
686                "RBF model not initialized".to_string(),
687            ));
688        }
689
690        let mut result = T::zero();
691        for i in 0..self.centers.len() {
692            let distance = (x - self.centers[i]).abs();
693            result = result + self.weights[i] * self.rbf_kernel(distance);
694        }
695
696        Ok(result)
697    }
698}
699
700impl<T: Float + Debug + FromPrimitive + Zero> StreamingInterpolator<T>
701    for StreamingRBFInterpolator<T>
702{
703    fn add_point(&mut self, point: StreamingPoint<T>) -> InterpolateResult<()> {
704        let x_val = point.x.to_f64().unwrap_or(f64::NAN);
705        let y_val = point.y.to_f64().unwrap_or(f64::NAN);
706        check_finite(x_val, "point x coordinate")?;
707        check_finite(y_val, "point y coordinate")?;
708
709        self.points.push_back(point);
710        self.stats.points_processed += 1;
711
712        // Apply window size limit
713        if let Some(window_size) = self.config.window_size {
714            while self.points.len() > window_size {
715                self.points.pop_front();
716            }
717        }
718
719        // Update model if needed
720        if self.stats.points_processed - self.last_update_count >= self.config.update_frequency {
721            self.update_model()?;
722        }
723
724        self.stats.memory_usage = self.points.len();
725        Ok(())
726    }
727
728    fn add_points(&mut self, points: &[StreamingPoint<T>]) -> InterpolateResult<()> {
729        for point in points {
730            self.add_point(point.clone())?;
731        }
732        Ok(())
733    }
734
735    fn predict(&mut self, x: T) -> InterpolateResult<T> {
736        self.evaluate_rbf(x)
737    }
738
739    fn predict_batch(&mut self, xvalues: &[T]) -> InterpolateResult<Vec<T>> {
740        let mut results = Vec::with_capacity(xvalues.len());
741        for &x in xvalues {
742            results.push(self.predict(x)?);
743        }
744        Ok(results)
745    }
746
747    fn predict_with_uncertainty(&mut self, x: T) -> InterpolateResult<(T, T)> {
748        let prediction = self.predict(x)?;
749
750        // Estimate uncertainty based on distance to nearest center
751        let min_distance = self
752            .centers
753            .iter()
754            .map(|&c| (c - x).abs())
755            .fold(T::infinity(), |a, b| if a < b { a } else { b });
756
757        let uncertainty =
758            self.rbf_kernel(min_distance) * T::from_f64(0.5).expect("Operation failed");
759
760        Ok((prediction, uncertainty))
761    }
762
763    fn update_model(&mut self) -> InterpolateResult<()> {
764        self.update_rbf_model()?;
765        self.stats.model_updates += 1;
766        self.stats.last_update = Some(Instant::now());
767        self.last_update_count = self.stats.points_processed;
768        Ok(())
769    }
770
771    fn reset(&mut self) -> InterpolateResult<()> {
772        self.points.clear();
773        self.centers = Array1::zeros(0);
774        self.weights = Array1::zeros(0);
775        self.stats = StreamingStats::default();
776        self.last_update_count = 0;
777        Ok(())
778    }
779
780    fn get_stats(&self) -> StreamingStats {
781        self.stats.clone()
782    }
783
784    fn get_config(&self) -> &StreamingConfig {
785        &self.config
786    }
787
788    fn set_config(&mut self, config: StreamingConfig) -> InterpolateResult<()> {
789        self.config = config;
790        Ok(())
791    }
792}
793
794/// Create a new streaming RBF interpolator
795#[allow(dead_code)]
796pub fn make_streaming_rbf_interpolator<T: Float + Debug + FromPrimitive + Zero>(
797    config: Option<StreamingConfig>,
798    kernel_width: Option<T>,
799) -> StreamingRBFInterpolator<T> {
800    let width = kernel_width.unwrap_or_else(|| T::from_f64(1.0).expect("Operation failed"));
801    StreamingRBFInterpolator::new(config.unwrap_or_default(), width)
802}
803
804/// Ensemble streaming interpolator that combines multiple methods
805pub struct EnsembleStreamingInterpolator<T: Float + Debug + FromPrimitive> {
806    config: StreamingConfig,
807    methods: Vec<Box<dyn StreamingInterpolator<T>>>,
808    weights: Array1<f64>,
809    stats: StreamingStats,
810}
811
812impl<T: Float + Debug + FromPrimitive + Zero> EnsembleStreamingInterpolator<T> {
813    /// Create a new ensemble streaming interpolator
814    pub fn new(
815        config: StreamingConfig,
816        methods: Vec<Box<dyn StreamingInterpolator<T>>>,
817        weights: Option<Array1<f64>>,
818    ) -> InterpolateResult<Self> {
819        let n_methods = methods.len();
820        if n_methods == 0 {
821            return Err(InterpolateError::invalid_input(
822                "At least one method required".to_string(),
823            ));
824        }
825
826        let weights =
827            weights.unwrap_or_else(|| Array1::from_elem(n_methods, 1.0 / n_methods as f64));
828
829        if weights.len() != n_methods {
830            return Err(InterpolateError::invalid_input(
831                "Weights length must match number of methods".to_string(),
832            ));
833        }
834
835        Ok(Self {
836            config,
837            methods,
838            weights,
839            stats: StreamingStats::default(),
840        })
841    }
842}
843
844impl<T: Float + Debug + FromPrimitive + Zero> StreamingInterpolator<T>
845    for EnsembleStreamingInterpolator<T>
846{
847    fn add_point(&mut self, point: StreamingPoint<T>) -> InterpolateResult<()> {
848        for method in &mut self.methods {
849            method.add_point(point.clone())?;
850        }
851        self.stats.points_processed += 1;
852        Ok(())
853    }
854
855    fn add_points(&mut self, points: &[StreamingPoint<T>]) -> InterpolateResult<()> {
856        for method in &mut self.methods {
857            method.add_points(points)?;
858        }
859        Ok(())
860    }
861
862    fn predict(&mut self, x: T) -> InterpolateResult<T> {
863        let mut weighted_sum = 0.0;
864        for (i, method) in self.methods.iter_mut().enumerate() {
865            let prediction = method.predict(x)?.to_f64().unwrap_or(0.0);
866            weighted_sum += self.weights[i] * prediction;
867        }
868
869        T::from_f64(weighted_sum).ok_or_else(|| {
870            InterpolateError::ComputationError(
871                "Failed to convert prediction to target type".to_string(),
872            )
873        })
874    }
875
876    fn predict_batch(&mut self, xvalues: &[T]) -> InterpolateResult<Vec<T>> {
877        let mut results = Vec::with_capacity(xvalues.len());
878        for &x in xvalues {
879            results.push(self.predict(x)?);
880        }
881        Ok(results)
882    }
883
884    fn predict_with_uncertainty(&mut self, x: T) -> InterpolateResult<(T, T)> {
885        let mut predictions = Vec::new();
886        let mut weighted_sum = 0.0;
887
888        for (i, method) in self.methods.iter_mut().enumerate() {
889            let prediction = method.predict(x)?.to_f64().unwrap_or(0.0);
890            predictions.push(prediction);
891            weighted_sum += self.weights[i] * prediction;
892        }
893
894        // Calculate uncertainty as weighted standard deviation
895        let mut variance = 0.0;
896        for (i, &pred) in predictions.iter().enumerate() {
897            let diff = pred - weighted_sum;
898            variance += self.weights[i] * diff * diff;
899        }
900
901        let uncertainty = variance.sqrt();
902
903        let mean_pred = T::from_f64(weighted_sum).unwrap_or_else(T::zero);
904        let uncertainty_t = T::from_f64(uncertainty).unwrap_or_else(T::zero);
905
906        Ok((mean_pred, uncertainty_t))
907    }
908
909    fn update_model(&mut self) -> InterpolateResult<()> {
910        for method in &mut self.methods {
911            method.update_model()?;
912        }
913        self.stats.model_updates += 1;
914        Ok(())
915    }
916
917    fn reset(&mut self) -> InterpolateResult<()> {
918        for method in &mut self.methods {
919            method.reset()?;
920        }
921        self.stats = StreamingStats::default();
922        Ok(())
923    }
924
925    fn get_stats(&self) -> StreamingStats {
926        self.stats.clone()
927    }
928
929    fn get_config(&self) -> &StreamingConfig {
930        &self.config
931    }
932
933    fn set_config(&mut self, config: StreamingConfig) -> InterpolateResult<()> {
934        self.config = config.clone();
935        for method in &mut self.methods {
936            method.set_config(config.clone())?;
937        }
938        Ok(())
939    }
940}
941
942/// Create a new ensemble streaming interpolator with default methods
943#[allow(dead_code)]
944pub fn make_ensemble_streaming_interpolator<T: Float + Debug + FromPrimitive + Zero + 'static>(
945    config: Option<StreamingConfig>,
946    weights: Option<Array1<f64>>,
947) -> InterpolateResult<EnsembleStreamingInterpolator<T>> {
948    let config = config.unwrap_or_default();
949
950    let methods: Vec<Box<dyn StreamingInterpolator<T>>> = vec![
951        Box::new(make_online_spline_interpolator(Some(config.clone()))),
952        Box::new(make_streaming_rbf_interpolator(Some(config.clone()), None)),
953    ];
954
955    EnsembleStreamingInterpolator::new(config, methods, weights)
956}
957
958#[cfg(test)]
959mod tests {
960    use super::*;
961    use std::time::Instant;
962
963    #[test]
964    fn test_online_spline_basic() {
965        let mut interpolator = make_online_spline_interpolator::<f64>(None);
966
967        // Add some test points
968        let points = vec![
969            StreamingPoint {
970                x: 0.0,
971                y: 0.0,
972                timestamp: Instant::now(),
973                quality: 1.0,
974                metadata: HashMap::new(),
975            },
976            StreamingPoint {
977                x: 1.0,
978                y: 1.0,
979                timestamp: Instant::now(),
980                quality: 1.0,
981                metadata: HashMap::new(),
982            },
983            StreamingPoint {
984                x: 2.0,
985                y: 4.0,
986                timestamp: Instant::now(),
987                quality: 1.0,
988                metadata: HashMap::new(),
989            },
990        ];
991
992        for point in points {
993            interpolator.add_point(point).expect("Operation failed");
994        }
995
996        interpolator.update_model().expect("Operation failed");
997
998        // Test prediction
999        let prediction = interpolator.predict(1.5).expect("Operation failed");
1000        assert!(prediction > 1.0 && prediction < 4.0);
1001
1002        let stats = interpolator.get_stats();
1003        assert_eq!(stats.points_processed, 3);
1004    }
1005
1006    #[test]
1007    fn test_streaming_rbf_basic() {
1008        let mut interpolator = make_streaming_rbf_interpolator::<f64>(None, Some(0.5));
1009
1010        // Add test points
1011        let points = vec![
1012            StreamingPoint {
1013                x: 0.0,
1014                y: 0.0,
1015                timestamp: Instant::now(),
1016                quality: 1.0,
1017                metadata: HashMap::new(),
1018            },
1019            StreamingPoint {
1020                x: 1.0,
1021                y: 1.0,
1022                timestamp: Instant::now(),
1023                quality: 1.0,
1024                metadata: HashMap::new(),
1025            },
1026        ];
1027
1028        for point in points {
1029            interpolator.add_point(point).expect("Operation failed");
1030        }
1031
1032        interpolator.update_model().expect("Operation failed");
1033
1034        // Test prediction
1035        let prediction = interpolator.predict(0.5).expect("Operation failed");
1036        assert!(prediction > 0.0 && prediction < 1.0);
1037    }
1038
1039    #[test]
1040    fn test_ensemble_interpolator() {
1041        let ensemble =
1042            make_ensemble_streaming_interpolator::<f64>(None, None).expect("Operation failed");
1043
1044        // Test basic functionality
1045        assert_eq!(ensemble.methods.len(), 2);
1046
1047        let stats = ensemble.get_stats();
1048        assert_eq!(stats.points_processed, 0);
1049    }
1050}