1use crate::error::{InterpolateError, InterpolateResult};
28use scirs2_core::ndarray::{Array1, Array2};
29use scirs2_core::numeric::{Float, FromPrimitive, Zero};
30use scirs2_core::validation::check_finite;
31use std::collections::{HashMap, VecDeque};
32use std::fmt::Debug;
33use std::time::Instant;
34
35#[derive(Debug, Clone)]
37pub struct StreamingConfig {
38 pub max_points: usize,
40 pub window_size: Option<usize>,
42 pub outlier_threshold: f64,
44 pub update_frequency: usize,
46 pub adaptation_rate: f64,
48 pub cleanup_threshold: f64,
50 pub max_latency_us: u64,
52}
53
54impl Default for StreamingConfig {
55 fn default() -> Self {
56 Self {
57 max_points: 10_000,
58 window_size: Some(1_000),
59 outlier_threshold: 3.0,
60 update_frequency: 100,
61 adaptation_rate: 0.01,
62 cleanup_threshold: 0.8,
63 max_latency_us: 1_000, }
65 }
66}
67
68#[derive(Debug, Clone, Default)]
70pub struct StreamingStats {
71 pub points_processed: usize,
73 pub outliers_rejected: usize,
75 pub model_updates: usize,
77 pub memory_usage: usize,
79 pub avg_latency_us: f64,
81 pub max_latency_us: u64,
83 pub last_update: Option<Instant>,
85 pub accuracy_metrics: AccuracyMetrics,
87}
88
89#[derive(Debug, Clone, Default)]
91pub struct AccuracyMetrics {
92 pub mse: f64,
94 pub mae: f64,
96 pub r_squared: f64,
98 pub sample_count: usize,
100}
101
102#[derive(Debug, Clone)]
104pub struct StreamingPoint<T: Float> {
105 pub x: T,
107 pub y: T,
109 pub timestamp: Instant,
111 pub quality: f64,
113 pub metadata: HashMap<String, String>,
115}
116
117#[derive(Debug, Clone, Copy, PartialEq)]
119pub enum StreamingMethod {
120 OnlineSpline,
122 StreamingRBF,
124 AdaptiveKriging,
126 MovingWindow,
128 Ensemble,
130}
131
132pub trait StreamingInterpolator<T: Float + Debug + FromPrimitive> {
134 fn add_point(&mut self, point: StreamingPoint<T>) -> InterpolateResult<()>;
136
137 fn add_points(&mut self, points: &[StreamingPoint<T>]) -> InterpolateResult<()>;
139
140 fn predict(&mut self, x: T) -> InterpolateResult<T>;
142
143 fn predict_batch(&mut self, xvalues: &[T]) -> InterpolateResult<Vec<T>>;
145
146 fn predict_with_uncertainty(&mut self, x: T) -> InterpolateResult<(T, T)>;
148
149 fn update_model(&mut self) -> InterpolateResult<()>;
151
152 fn reset(&mut self) -> InterpolateResult<()>;
154
155 fn get_stats(&self) -> StreamingStats;
157
158 fn get_config(&self) -> &StreamingConfig;
160
161 fn set_config(&mut self, config: StreamingConfig) -> InterpolateResult<()>;
163}
164
165pub struct OnlineSplineInterpolator<T: Float + Debug + FromPrimitive> {
167 config: StreamingConfig,
168 points: VecDeque<StreamingPoint<T>>,
169 spline_coeffs: Option<Array2<T>>,
170 x_sorted: Array1<T>,
171 y_sorted: Array1<T>,
172 stats: StreamingStats,
173 last_update_count: usize,
174}
175
176impl<T: Float + Debug + FromPrimitive + Zero> OnlineSplineInterpolator<T> {
177 pub fn new(config: StreamingConfig) -> Self {
179 Self {
180 config,
181 points: VecDeque::new(),
182 spline_coeffs: None,
183 x_sorted: Array1::zeros(0),
184 y_sorted: Array1::zeros(0),
185 stats: StreamingStats::default(),
186 last_update_count: 0,
187 }
188 }
189
190 pub fn add_point_coords(&mut self, x: T, y: T) -> InterpolateResult<()> {
192 if !x.is_finite() || !y.is_finite() {
193 return Err(InterpolateError::InvalidInput {
194 message: "input point contains non-finite values".to_string(),
195 });
196 }
197
198 let point = StreamingPoint {
199 x,
200 y,
201 timestamp: Instant::now(),
202 quality: 1.0,
203 metadata: HashMap::new(),
204 };
205
206 if !self.is_outlier(&point) {
208 self.points.push_back(point);
209 self.stats.points_processed += 1;
210 } else {
211 self.stats.outliers_rejected += 1;
212 return Ok(()); }
214
215 if self.points.len() > self.config.max_points {
217 self.points.pop_front();
218 self.stats.points_processed += 1;
219 }
220
221 if (self.stats.points_processed - self.last_update_count) >= self.config.update_frequency {
223 self.update_spline_coefficients()?;
224 self.last_update_count = self.stats.points_processed;
225 }
226
227 Ok(())
228 }
229
230 fn is_outlier(&self, point: &StreamingPoint<T>) -> bool {
232 if self.points.len() < 3 {
233 return false; }
235
236 let y_values: Vec<f64> = self
238 .points
239 .iter()
240 .map(|p| p.y.to_f64().unwrap_or(0.0))
241 .collect();
242
243 let mean = y_values.iter().sum::<f64>() / y_values.len() as f64;
244 let variance =
245 y_values.iter().map(|&y| (y - mean).powi(2)).sum::<f64>() / y_values.len() as f64;
246 let std_dev = variance.sqrt();
247
248 if std_dev == 0.0 {
249 return false;
250 }
251
252 let z_score = ((point.y.to_f64().unwrap_or(0.0) - mean) / std_dev).abs();
253 z_score > self.config.outlier_threshold
254 }
255
256 fn update_spline_coefficients(&mut self) -> InterpolateResult<()> {
258 if self.points.len() < 2 {
259 return Ok(());
260 }
261
262 let mut sorted_points: Vec<_> = self.points.iter().collect();
264 sorted_points.sort_by(|a, b| a.x.partial_cmp(&b.x).unwrap_or(std::cmp::Ordering::Equal));
265
266 let n = sorted_points.len();
267 let mut x_vals = Array1::zeros(n);
268 let mut y_vals = Array1::zeros(n);
269
270 for (i, point) in sorted_points.iter().enumerate() {
271 x_vals[i] = point.x;
272 y_vals[i] = point.y;
273 }
274
275 let coeffs = self.compute_natural_spline_coefficients(&x_vals, &y_vals)?;
277
278 self.spline_coeffs = Some(coeffs);
279 self.x_sorted = x_vals;
280 self.y_sorted = y_vals;
281
282 Ok(())
283 }
284
285 fn compute_natural_spline_coefficients(
287 &self,
288 x: &Array1<T>,
289 y: &Array1<T>,
290 ) -> InterpolateResult<Array2<T>> {
291 let n = x.len();
292 if n < 2 {
293 return Err(InterpolateError::invalid_input(
294 "Need at least 2 points".to_string(),
295 ));
296 }
297
298 let segments = n - 1;
299 let mut coeffs = Array2::zeros((segments, 4));
300
301 if n == 2 {
302 let dx = x[1] - x[0];
304 let dy = y[1] - y[0];
305 coeffs[[0, 0]] = y[0];
306 coeffs[[0, 1]] = dy / dx;
307 return Ok(coeffs);
308 }
309
310 let mut h = Array1::zeros(segments);
312 let mut alpha = Array1::zeros(segments);
313
314 for i in 0..segments {
315 h[i] = x[i + 1] - x[i];
316 }
317
318 for i in 1..segments {
319 alpha[i] = (T::from_f64(3.0).expect("Operation failed") * (y[i + 1] - y[i]) / h[i])
320 - (T::from_f64(3.0).expect("Operation failed") * (y[i] - y[i - 1]) / h[i - 1]);
321 }
322
323 let mut l = Array1::zeros(n);
325 let mut mu = Array1::zeros(n);
326 let mut z = Array1::zeros(n);
327
328 l[0] = T::one();
329
330 for i in 1..(n - 1) {
331 l[i] = T::from_f64(2.0).expect("Operation failed") * (x[i + 1] - x[i - 1])
332 - h[i - 1] * mu[i - 1];
333 mu[i] = h[i] / l[i];
334 z[i] = (alpha[i] - h[i - 1] * z[i - 1]) / l[i];
335 }
336
337 l[n - 1] = T::one();
338
339 let mut c = Array1::zeros(n);
341 for i in (0..(n - 1)).rev() {
342 c[i] = z[i] - mu[i] * c[i + 1];
343 }
344
345 for i in 0..segments {
347 coeffs[[i, 0]] = y[i]; coeffs[[i, 1]] = (y[i + 1] - y[i]) / h[i]
349 - h[i] * (c[i + 1] + T::from_f64(2.0).expect("Operation failed") * c[i])
350 / T::from_f64(3.0).expect("Operation failed"); coeffs[[i, 2]] = c[i]; coeffs[[i, 3]] =
353 (c[i + 1] - c[i]) / (T::from_f64(3.0).expect("Operation failed") * h[i]);
354 }
356
357 Ok(coeffs)
358 }
359
360 fn evaluate_spline(&self, x: T) -> InterpolateResult<T> {
362 let coeffs = self.spline_coeffs.as_ref().ok_or_else(|| {
363 InterpolateError::ComputationError("Spline not initialized".to_string())
364 })?;
365
366 if self.x_sorted.len() < 2 {
367 return Err(InterpolateError::ComputationError(
368 "Need at least 2 points".to_string(),
369 ));
370 }
371
372 let mut segment = 0;
374 for i in 0..(self.x_sorted.len() - 1) {
375 if x >= self.x_sorted[i] && x <= self.x_sorted[i + 1] {
376 segment = i;
377 break;
378 }
379 }
380
381 if x < self.x_sorted[0] {
383 segment = 0;
384 } else if x > self.x_sorted[self.x_sorted.len() - 1] {
385 segment = self.x_sorted.len() - 2;
386 }
387
388 let dx = x - self.x_sorted[segment];
390 let a = coeffs[[segment, 0]];
391 let b = coeffs[[segment, 1]];
392 let c = coeffs[[segment, 2]];
393 let d = coeffs[[segment, 3]];
394
395 Ok(a + b * dx + c * dx * dx + d * dx * dx * dx)
396 }
397
398 fn cleanup_memory(&mut self) {
400 let max_size = (self.config.max_points as f64 * self.config.cleanup_threshold) as usize;
401 while self.points.len() > max_size {
402 self.points.pop_front();
403 }
404 }
405}
406
407impl<T: Float + Debug + FromPrimitive + Zero> StreamingInterpolator<T>
408 for OnlineSplineInterpolator<T>
409{
410 fn add_point(&mut self, point: StreamingPoint<T>) -> InterpolateResult<()> {
411 let x_val = point.x.to_f64().unwrap_or(f64::NAN);
413 let y_val = point.y.to_f64().unwrap_or(f64::NAN);
414 check_finite(x_val, "point x coordinate")?;
415 check_finite(y_val, "point y coordinate")?;
416
417 if self.is_outlier(&point) {
419 self.stats.outliers_rejected += 1;
420 return Ok(());
421 }
422
423 self.points.push_back(point);
425 self.stats.points_processed += 1;
426
427 if let Some(window_size) = self.config.window_size {
429 while self.points.len() > window_size {
430 self.points.pop_front();
431 }
432 }
433
434 if self.stats.points_processed - self.last_update_count >= self.config.update_frequency {
436 self.update_model()?;
437 }
438
439 if self.points.len() > self.config.max_points {
441 self.cleanup_memory();
442 }
443
444 self.stats.memory_usage = self.points.len();
445 Ok(())
446 }
447
448 fn add_points(&mut self, points: &[StreamingPoint<T>]) -> InterpolateResult<()> {
449 for point in points {
450 self.add_point(point.clone())?;
451 }
452 Ok(())
453 }
454
455 fn predict(&mut self, x: T) -> InterpolateResult<T> {
456 let start_time = Instant::now();
457
458 let result = if let Some(_) = self.spline_coeffs {
459 self.evaluate_spline(x)
460 } else if self.points.len() >= 2 {
461 let first = &self.points[0];
463 let last = &self.points[self.points.len() - 1];
464 let slope = (last.y - first.y) / (last.x - first.x);
465 Ok(first.y + slope * (x - first.x))
466 } else if self.points.len() == 1 {
467 Ok(self.points[0].y)
468 } else {
469 Err(InterpolateError::ComputationError(
470 "No data points available".to_string(),
471 ))
472 };
473
474 let latency = start_time.elapsed().as_micros() as u64;
475
476 if self.stats.max_latency_us < latency {
478 self.stats.max_latency_us = latency;
479 }
480
481 result
482 }
483
484 fn predict_batch(&mut self, xvalues: &[T]) -> InterpolateResult<Vec<T>> {
485 let mut results = Vec::with_capacity(xvalues.len());
486 for &x in xvalues {
487 results.push(self.predict(x)?);
488 }
489 Ok(results)
490 }
491
492 fn predict_with_uncertainty(&mut self, x: T) -> InterpolateResult<(T, T)> {
493 let prediction = self.predict(x)?;
494
495 let min_distance = self
497 .points
498 .iter()
499 .map(|p| (p.x - x).abs())
500 .fold(T::infinity(), |a, b| if a < b { a } else { b });
501
502 let uncertainty = if min_distance.is_finite() {
503 min_distance * T::from_f64(0.1).expect("Operation failed") } else {
505 T::from_f64(1.0).expect("Operation failed") };
507
508 Ok((prediction, uncertainty))
509 }
510
511 fn update_model(&mut self) -> InterpolateResult<()> {
512 self.update_spline_coefficients()?;
513 self.stats.model_updates += 1;
514 self.stats.last_update = Some(Instant::now());
515 self.last_update_count = self.stats.points_processed;
516 Ok(())
517 }
518
519 fn reset(&mut self) -> InterpolateResult<()> {
520 self.points.clear();
521 self.spline_coeffs = None;
522 self.x_sorted = Array1::zeros(0);
523 self.y_sorted = Array1::zeros(0);
524 self.stats = StreamingStats::default();
525 self.last_update_count = 0;
526 Ok(())
527 }
528
529 fn get_stats(&self) -> StreamingStats {
530 self.stats.clone()
531 }
532
533 fn get_config(&self) -> &StreamingConfig {
534 &self.config
535 }
536
537 fn set_config(&mut self, config: StreamingConfig) -> InterpolateResult<()> {
538 self.config = config;
539 Ok(())
540 }
541}
542
543#[allow(dead_code)]
545pub fn make_online_spline_interpolator<T: Float + Debug + FromPrimitive + Zero>(
546 config: Option<StreamingConfig>,
547) -> OnlineSplineInterpolator<T> {
548 OnlineSplineInterpolator::new(config.unwrap_or_default())
549}
550
551pub struct StreamingRBFInterpolator<T: Float + Debug + FromPrimitive> {
553 config: StreamingConfig,
554 points: VecDeque<StreamingPoint<T>>,
555 centers: Array1<T>,
556 weights: Array1<T>,
557 kernel_width: T,
558 stats: StreamingStats,
559 last_update_count: usize,
560}
561
562impl<T: Float + Debug + FromPrimitive + Zero> StreamingRBFInterpolator<T> {
563 pub fn new(config: StreamingConfig, kernel_width: T) -> Self {
565 Self {
566 config,
567 points: VecDeque::new(),
568 centers: Array1::zeros(0),
569 weights: Array1::zeros(0),
570 kernel_width,
571 stats: StreamingStats::default(),
572 last_update_count: 0,
573 }
574 }
575
576 fn rbf_kernel(&self, r: T) -> T {
578 let neg_r_squared = -(r * r) / (self.kernel_width * self.kernel_width);
579 neg_r_squared.exp()
580 }
581
582 fn update_rbf_model(&mut self) -> InterpolateResult<()> {
584 if self.points.is_empty() {
585 return Ok(());
586 }
587
588 let n = self.points.len();
589 let mut centers = Array1::zeros(n);
590 let mut targets = Array1::zeros(n);
591
592 for (i, point) in self.points.iter().enumerate() {
593 centers[i] = point.x;
594 targets[i] = point.y;
595 }
596
597 let mut rbf_matrix = Array2::zeros((n, n));
599 for i in 0..n {
600 for j in 0..n {
601 let distance = (centers[i] - centers[j]).abs();
602 rbf_matrix[[i, j]] = self.rbf_kernel(distance);
603 }
604 }
605
606 let regularization = T::from_f64(1e-10).expect("Operation failed");
608 for i in 0..n {
609 rbf_matrix[[i, i]] = rbf_matrix[[i, i]] + regularization;
610 }
611
612 let weights = self.solve_linear_system(&rbf_matrix, &targets)?;
614
615 self.centers = centers;
616 self.weights = weights;
617
618 Ok(())
619 }
620
621 fn solve_linear_system(&self, a: &Array2<T>, b: &Array1<T>) -> InterpolateResult<Array1<T>> {
623 let n = a.nrows();
624 let mut aug = Array2::zeros((n, n + 1));
625
626 for i in 0..n {
628 for j in 0..n {
629 aug[[i, j]] = a[[i, j]];
630 }
631 aug[[i, n]] = b[i];
632 }
633
634 for k in 0..n {
636 let mut max_row = k;
638 for i in (k + 1)..n {
639 if aug[[i, k]].abs() > aug[[max_row, k]].abs() {
640 max_row = i;
641 }
642 }
643
644 if max_row != k {
646 for j in 0..=n {
647 let temp = aug[[k, j]];
648 aug[[k, j]] = aug[[max_row, j]];
649 aug[[max_row, j]] = temp;
650 }
651 }
652
653 if aug[[k, k]].abs() < T::from_f64(1e-12).expect("Operation failed") {
655 return Err(InterpolateError::ComputationError(
656 "Singular matrix".to_string(),
657 ));
658 }
659
660 for i in (k + 1)..n {
662 let factor = aug[[i, k]] / aug[[k, k]];
663 for j in k..=n {
664 aug[[i, j]] = aug[[i, j]] - factor * aug[[k, j]];
665 }
666 }
667 }
668
669 let mut x = Array1::zeros(n);
671 for i in (0..n).rev() {
672 x[i] = aug[[i, n]];
673 for j in (i + 1)..n {
674 x[i] = x[i] - aug[[i, j]] * x[j];
675 }
676 x[i] = x[i] / aug[[i, i]];
677 }
678
679 Ok(x)
680 }
681
682 fn evaluate_rbf(&self, x: T) -> InterpolateResult<T> {
684 if self.centers.is_empty() {
685 return Err(InterpolateError::ComputationError(
686 "RBF model not initialized".to_string(),
687 ));
688 }
689
690 let mut result = T::zero();
691 for i in 0..self.centers.len() {
692 let distance = (x - self.centers[i]).abs();
693 result = result + self.weights[i] * self.rbf_kernel(distance);
694 }
695
696 Ok(result)
697 }
698}
699
700impl<T: Float + Debug + FromPrimitive + Zero> StreamingInterpolator<T>
701 for StreamingRBFInterpolator<T>
702{
703 fn add_point(&mut self, point: StreamingPoint<T>) -> InterpolateResult<()> {
704 let x_val = point.x.to_f64().unwrap_or(f64::NAN);
705 let y_val = point.y.to_f64().unwrap_or(f64::NAN);
706 check_finite(x_val, "point x coordinate")?;
707 check_finite(y_val, "point y coordinate")?;
708
709 self.points.push_back(point);
710 self.stats.points_processed += 1;
711
712 if let Some(window_size) = self.config.window_size {
714 while self.points.len() > window_size {
715 self.points.pop_front();
716 }
717 }
718
719 if self.stats.points_processed - self.last_update_count >= self.config.update_frequency {
721 self.update_model()?;
722 }
723
724 self.stats.memory_usage = self.points.len();
725 Ok(())
726 }
727
728 fn add_points(&mut self, points: &[StreamingPoint<T>]) -> InterpolateResult<()> {
729 for point in points {
730 self.add_point(point.clone())?;
731 }
732 Ok(())
733 }
734
735 fn predict(&mut self, x: T) -> InterpolateResult<T> {
736 self.evaluate_rbf(x)
737 }
738
739 fn predict_batch(&mut self, xvalues: &[T]) -> InterpolateResult<Vec<T>> {
740 let mut results = Vec::with_capacity(xvalues.len());
741 for &x in xvalues {
742 results.push(self.predict(x)?);
743 }
744 Ok(results)
745 }
746
747 fn predict_with_uncertainty(&mut self, x: T) -> InterpolateResult<(T, T)> {
748 let prediction = self.predict(x)?;
749
750 let min_distance = self
752 .centers
753 .iter()
754 .map(|&c| (c - x).abs())
755 .fold(T::infinity(), |a, b| if a < b { a } else { b });
756
757 let uncertainty =
758 self.rbf_kernel(min_distance) * T::from_f64(0.5).expect("Operation failed");
759
760 Ok((prediction, uncertainty))
761 }
762
763 fn update_model(&mut self) -> InterpolateResult<()> {
764 self.update_rbf_model()?;
765 self.stats.model_updates += 1;
766 self.stats.last_update = Some(Instant::now());
767 self.last_update_count = self.stats.points_processed;
768 Ok(())
769 }
770
771 fn reset(&mut self) -> InterpolateResult<()> {
772 self.points.clear();
773 self.centers = Array1::zeros(0);
774 self.weights = Array1::zeros(0);
775 self.stats = StreamingStats::default();
776 self.last_update_count = 0;
777 Ok(())
778 }
779
780 fn get_stats(&self) -> StreamingStats {
781 self.stats.clone()
782 }
783
784 fn get_config(&self) -> &StreamingConfig {
785 &self.config
786 }
787
788 fn set_config(&mut self, config: StreamingConfig) -> InterpolateResult<()> {
789 self.config = config;
790 Ok(())
791 }
792}
793
794#[allow(dead_code)]
796pub fn make_streaming_rbf_interpolator<T: Float + Debug + FromPrimitive + Zero>(
797 config: Option<StreamingConfig>,
798 kernel_width: Option<T>,
799) -> StreamingRBFInterpolator<T> {
800 let width = kernel_width.unwrap_or_else(|| T::from_f64(1.0).expect("Operation failed"));
801 StreamingRBFInterpolator::new(config.unwrap_or_default(), width)
802}
803
804pub struct EnsembleStreamingInterpolator<T: Float + Debug + FromPrimitive> {
806 config: StreamingConfig,
807 methods: Vec<Box<dyn StreamingInterpolator<T>>>,
808 weights: Array1<f64>,
809 stats: StreamingStats,
810}
811
812impl<T: Float + Debug + FromPrimitive + Zero> EnsembleStreamingInterpolator<T> {
813 pub fn new(
815 config: StreamingConfig,
816 methods: Vec<Box<dyn StreamingInterpolator<T>>>,
817 weights: Option<Array1<f64>>,
818 ) -> InterpolateResult<Self> {
819 let n_methods = methods.len();
820 if n_methods == 0 {
821 return Err(InterpolateError::invalid_input(
822 "At least one method required".to_string(),
823 ));
824 }
825
826 let weights =
827 weights.unwrap_or_else(|| Array1::from_elem(n_methods, 1.0 / n_methods as f64));
828
829 if weights.len() != n_methods {
830 return Err(InterpolateError::invalid_input(
831 "Weights length must match number of methods".to_string(),
832 ));
833 }
834
835 Ok(Self {
836 config,
837 methods,
838 weights,
839 stats: StreamingStats::default(),
840 })
841 }
842}
843
844impl<T: Float + Debug + FromPrimitive + Zero> StreamingInterpolator<T>
845 for EnsembleStreamingInterpolator<T>
846{
847 fn add_point(&mut self, point: StreamingPoint<T>) -> InterpolateResult<()> {
848 for method in &mut self.methods {
849 method.add_point(point.clone())?;
850 }
851 self.stats.points_processed += 1;
852 Ok(())
853 }
854
855 fn add_points(&mut self, points: &[StreamingPoint<T>]) -> InterpolateResult<()> {
856 for method in &mut self.methods {
857 method.add_points(points)?;
858 }
859 Ok(())
860 }
861
862 fn predict(&mut self, x: T) -> InterpolateResult<T> {
863 let mut weighted_sum = 0.0;
864 for (i, method) in self.methods.iter_mut().enumerate() {
865 let prediction = method.predict(x)?.to_f64().unwrap_or(0.0);
866 weighted_sum += self.weights[i] * prediction;
867 }
868
869 T::from_f64(weighted_sum).ok_or_else(|| {
870 InterpolateError::ComputationError(
871 "Failed to convert prediction to target type".to_string(),
872 )
873 })
874 }
875
876 fn predict_batch(&mut self, xvalues: &[T]) -> InterpolateResult<Vec<T>> {
877 let mut results = Vec::with_capacity(xvalues.len());
878 for &x in xvalues {
879 results.push(self.predict(x)?);
880 }
881 Ok(results)
882 }
883
884 fn predict_with_uncertainty(&mut self, x: T) -> InterpolateResult<(T, T)> {
885 let mut predictions = Vec::new();
886 let mut weighted_sum = 0.0;
887
888 for (i, method) in self.methods.iter_mut().enumerate() {
889 let prediction = method.predict(x)?.to_f64().unwrap_or(0.0);
890 predictions.push(prediction);
891 weighted_sum += self.weights[i] * prediction;
892 }
893
894 let mut variance = 0.0;
896 for (i, &pred) in predictions.iter().enumerate() {
897 let diff = pred - weighted_sum;
898 variance += self.weights[i] * diff * diff;
899 }
900
901 let uncertainty = variance.sqrt();
902
903 let mean_pred = T::from_f64(weighted_sum).unwrap_or_else(T::zero);
904 let uncertainty_t = T::from_f64(uncertainty).unwrap_or_else(T::zero);
905
906 Ok((mean_pred, uncertainty_t))
907 }
908
909 fn update_model(&mut self) -> InterpolateResult<()> {
910 for method in &mut self.methods {
911 method.update_model()?;
912 }
913 self.stats.model_updates += 1;
914 Ok(())
915 }
916
917 fn reset(&mut self) -> InterpolateResult<()> {
918 for method in &mut self.methods {
919 method.reset()?;
920 }
921 self.stats = StreamingStats::default();
922 Ok(())
923 }
924
925 fn get_stats(&self) -> StreamingStats {
926 self.stats.clone()
927 }
928
929 fn get_config(&self) -> &StreamingConfig {
930 &self.config
931 }
932
933 fn set_config(&mut self, config: StreamingConfig) -> InterpolateResult<()> {
934 self.config = config.clone();
935 for method in &mut self.methods {
936 method.set_config(config.clone())?;
937 }
938 Ok(())
939 }
940}
941
942#[allow(dead_code)]
944pub fn make_ensemble_streaming_interpolator<T: Float + Debug + FromPrimitive + Zero + 'static>(
945 config: Option<StreamingConfig>,
946 weights: Option<Array1<f64>>,
947) -> InterpolateResult<EnsembleStreamingInterpolator<T>> {
948 let config = config.unwrap_or_default();
949
950 let methods: Vec<Box<dyn StreamingInterpolator<T>>> = vec![
951 Box::new(make_online_spline_interpolator(Some(config.clone()))),
952 Box::new(make_streaming_rbf_interpolator(Some(config.clone()), None)),
953 ];
954
955 EnsembleStreamingInterpolator::new(config, methods, weights)
956}
957
958#[cfg(test)]
959mod tests {
960 use super::*;
961 use std::time::Instant;
962
963 #[test]
964 fn test_online_spline_basic() {
965 let mut interpolator = make_online_spline_interpolator::<f64>(None);
966
967 let points = vec![
969 StreamingPoint {
970 x: 0.0,
971 y: 0.0,
972 timestamp: Instant::now(),
973 quality: 1.0,
974 metadata: HashMap::new(),
975 },
976 StreamingPoint {
977 x: 1.0,
978 y: 1.0,
979 timestamp: Instant::now(),
980 quality: 1.0,
981 metadata: HashMap::new(),
982 },
983 StreamingPoint {
984 x: 2.0,
985 y: 4.0,
986 timestamp: Instant::now(),
987 quality: 1.0,
988 metadata: HashMap::new(),
989 },
990 ];
991
992 for point in points {
993 interpolator.add_point(point).expect("Operation failed");
994 }
995
996 interpolator.update_model().expect("Operation failed");
997
998 let prediction = interpolator.predict(1.5).expect("Operation failed");
1000 assert!(prediction > 1.0 && prediction < 4.0);
1001
1002 let stats = interpolator.get_stats();
1003 assert_eq!(stats.points_processed, 3);
1004 }
1005
1006 #[test]
1007 fn test_streaming_rbf_basic() {
1008 let mut interpolator = make_streaming_rbf_interpolator::<f64>(None, Some(0.5));
1009
1010 let points = vec![
1012 StreamingPoint {
1013 x: 0.0,
1014 y: 0.0,
1015 timestamp: Instant::now(),
1016 quality: 1.0,
1017 metadata: HashMap::new(),
1018 },
1019 StreamingPoint {
1020 x: 1.0,
1021 y: 1.0,
1022 timestamp: Instant::now(),
1023 quality: 1.0,
1024 metadata: HashMap::new(),
1025 },
1026 ];
1027
1028 for point in points {
1029 interpolator.add_point(point).expect("Operation failed");
1030 }
1031
1032 interpolator.update_model().expect("Operation failed");
1033
1034 let prediction = interpolator.predict(0.5).expect("Operation failed");
1036 assert!(prediction > 0.0 && prediction < 1.0);
1037 }
1038
1039 #[test]
1040 fn test_ensemble_interpolator() {
1041 let ensemble =
1042 make_ensemble_streaming_interpolator::<f64>(None, None).expect("Operation failed");
1043
1044 assert_eq!(ensemble.methods.len(), 2);
1046
1047 let stats = ensemble.get_stats();
1048 assert_eq!(stats.points_processed, 0);
1049 }
1050}