use crate::error::StatsError;
use scirs2_core::ndarray::{Array1, Array2};
/// Configuration for the streaming sparse Gaussian process.
#[derive(Debug, Clone)]
pub struct StreamingGpConfig {
    /// Maximum number of inducing points retained (memory/compute budget).
    pub max_inducing: usize,
    /// RBF kernel length scale.
    pub length_scale: f64,
    /// Kernel signal variance (prior variance at any single point).
    pub signal_var: f64,
    /// Observation noise variance added to the predictive variance.
    pub noise_var: f64,
    /// Minimum noise-free predictive variance a candidate point must have
    /// before it is admitted to the inducing set (novelty criterion).
    pub novelty_threshold: f64,
}
impl Default for StreamingGpConfig {
fn default() -> Self {
Self {
max_inducing: 50,
length_scale: 1.0,
signal_var: 1.0,
noise_var: 0.01,
novelty_threshold: 0.1,
}
}
}
/// Online sparse Gaussian process regressor with a bounded inducing set.
pub struct StreamingGp {
    /// Inducing-point locations, one row per point (m x d).
    inducing_x: Array2<f64>,
    /// Posterior mean over the inducing values (length m).
    m_u: Array1<f64>,
    /// Posterior covariance over the inducing values (m x m).
    s_u: Array2<f64>,
    /// Input dimensionality.
    d: usize,
    /// Kernel and budget hyperparameters.
    config: StreamingGpConfig,
}
impl StreamingGp {
    /// Creates an empty streaming GP over `d`-dimensional inputs.
    pub fn new(d: usize, config: StreamingGpConfig) -> Self {
        Self {
            inducing_x: Array2::zeros((0, d)),
            m_u: Array1::zeros(0),
            s_u: Array2::zeros((0, 0)),
            d,
            config,
        }
    }

    /// Number of inducing points currently held.
    pub fn n_inducing(&self) -> usize {
        self.inducing_x.nrows()
    }

    /// Maps a squared Euclidean distance through the RBF kernel:
    /// `signal_var * exp(-sq_dist / (2 * length_scale^2))`.
    fn rbf(&self, sq_dist: f64) -> f64 {
        let two_l2 = 2.0 * self.config.length_scale.powi(2);
        self.config.signal_var * (-sq_dist / two_l2).exp()
    }

    /// Squared-exponential (RBF) kernel between two points.
    pub fn kernel(&self, x: &Array1<f64>, y: &Array1<f64>) -> f64 {
        let sq: f64 = x
            .iter()
            .zip(y.iter())
            .map(|(&xi, &yi)| (xi - yi).powi(2))
            .sum();
        self.rbf(sq)
    }

    /// Kernel vector `k(x, z_i)` for every row `z_i` of `z`.
    pub fn kernel_vec(&self, x: &Array1<f64>, z: &Array2<f64>) -> Array1<f64> {
        let m = z.nrows();
        let mut kv = Array1::<f64>::zeros(m);
        for i in 0..m {
            // Iterate the row view directly; avoids a per-row allocation
            // (the original called `.to_owned()` on each row).
            let sq: f64 = x
                .iter()
                .zip(z.row(i).iter())
                .map(|(&a, &b)| (a - b).powi(2))
                .sum();
            kv[i] = self.rbf(sq);
        }
        kv
    }

    /// Symmetric kernel (Gram) matrix over the rows of `z`.
    pub fn kernel_matrix(&self, z: &Array2<f64>) -> Array2<f64> {
        let m = z.nrows();
        let mut km = Array2::<f64>::zeros((m, m));
        for i in 0..m {
            // Only the upper triangle is computed; symmetry fills the rest.
            for j in i..m {
                let sq: f64 = z
                    .row(i)
                    .iter()
                    .zip(z.row(j).iter())
                    .map(|(&a, &b)| (a - b).powi(2))
                    .sum();
                let k = self.rbf(sq);
                km[[i, j]] = k;
                km[[j, i]] = k;
            }
        }
        km
    }

    /// Incorporates one observation `(x_new, y_new)` via a rank-one,
    /// Kalman-style update of the inducing-point posterior. The point may
    /// first be added to the inducing set if it is sufficiently novel.
    pub fn update(&mut self, x_new: &Array1<f64>, y_new: f64) {
        self.maybe_add_inducing(x_new);
        let m = self.n_inducing();
        if m == 0 {
            return;
        }
        // Borrow the field directly: `kernel_vec`/`kernel_matrix` take
        // `&self`, so no clone of the inducing matrix is needed (the
        // original cloned the full m x d matrix twice per call).
        let k_star = self.kernel_vec(x_new, &self.inducing_x);
        let k_xx = self.kernel(x_new, x_new);
        let k_zz = self.kernel_matrix(&self.inducing_x);
        // alpha = K_zz^{-1} k_star
        let alpha = cholesky_solve(&k_zz, &k_star);
        let kstar_alpha: f64 = k_star.iter().zip(alpha.iter()).map(|(&a, &b)| a * b).sum();
        // Predictive variance at x_new, floored to keep the gain finite.
        let pred_var = (k_xx - kstar_alpha + self.config.noise_var).max(1e-10);
        // NOTE(review): the mean here is k_star . m_u (m_u treated as a
        // weight vector), while predict() uses k_star . K_zz^{-1} m_u —
        // confirm the intended parameterization is consistent.
        let m_pred: f64 = k_star
            .iter()
            .zip(self.m_u.iter())
            .map(|(&a, &b)| a * b)
            .sum();
        let residual = y_new - m_pred;
        // Gain: kappa = S_u k_star / pred_var.
        let s_u_k: Array1<f64> = mat_vec_mul(&self.s_u, &k_star);
        let kappa: Array1<f64> = s_u_k.mapv(|v| v / pred_var);
        // Mean update: m_u += kappa * residual.
        for i in 0..m {
            self.m_u[i] += kappa[i] * residual;
        }
        // Covariance update: S_u -= kappa (S_u k_star)^T.
        for i in 0..m {
            for j in 0..m {
                self.s_u[[i, j]] -= kappa[i] * s_u_k[j];
            }
        }
        // Numerical guard: clamp any negative diagonal entries to zero.
        for i in 0..m {
            if self.s_u[[i, i]] < 0.0 {
                self.s_u[[i, i]] = 0.0;
            }
        }
    }

    /// Predictive mean and variance at `x_star`.
    ///
    /// With no inducing points the GP prior is returned:
    /// `(0.0, signal_var)`.
    pub fn predict(&self, x_star: &Array1<f64>) -> (f64, f64) {
        let m = self.n_inducing();
        if m == 0 {
            return (0.0, self.config.signal_var);
        }
        let k_star = self.kernel_vec(x_star, &self.inducing_x);
        let k_xx = self.kernel(x_star, x_star);
        let k_zz = self.kernel_matrix(&self.inducing_x);
        // Mean: k_star . K_zz^{-1} m_u.
        let alpha_m = cholesky_solve(&k_zz, &self.m_u);
        let mean: f64 = k_star
            .iter()
            .zip(alpha_m.iter())
            .map(|(&a, &b)| a * b)
            .sum();
        // Variance: k_xx - k_star . K_zz^{-1} k_star
        //           + (K_zz^{-1} k_star)^T S_u (K_zz^{-1} k_star) + noise.
        let alpha_k = cholesky_solve(&k_zz, &k_star);
        let kstar_alpha: f64 = k_star
            .iter()
            .zip(alpha_k.iter())
            .map(|(&a, &b)| a * b)
            .sum();
        let s_u_ak = mat_vec_mul(&self.s_u, &alpha_k);
        let su_term: f64 = alpha_k
            .iter()
            .zip(s_u_ak.iter())
            .map(|(&a, &b)| a * b)
            .sum();
        let var = (k_xx - kstar_alpha + su_term + self.config.noise_var).max(0.0);
        (mean, var)
    }

    /// Adds `x` to the inducing set when its noise-free predictive variance
    /// exceeds the novelty threshold and the budget is not exhausted.
    fn maybe_add_inducing(&mut self, x: &Array1<f64>) {
        let (_mean, var) = self.predict(x);
        // Strip the observation-noise floor to get the GP's own uncertainty.
        let gp_var = (var - self.config.noise_var).max(0.0);
        if gp_var <= self.config.novelty_threshold {
            return;
        }
        if self.n_inducing() >= self.config.max_inducing {
            return;
        }
        self.append_inducing(x);
    }

    /// Appends `x` as a new inducing point, growing `inducing_x`, `m_u`,
    /// and `s_u`. The new point starts with zero mean and prior variance;
    /// its cross-covariances with existing points are initialized to zero
    /// (an approximation refined by subsequent updates).
    fn append_inducing(&mut self, x: &Array1<f64>) {
        let m = self.n_inducing();
        let new_m = m + 1;
        let mut new_ix = Array2::<f64>::zeros((new_m, self.d));
        for i in 0..m {
            for j in 0..self.d {
                new_ix[[i, j]] = self.inducing_x[[i, j]];
            }
        }
        for j in 0..self.d {
            new_ix[[m, j]] = x[j];
        }
        self.inducing_x = new_ix;
        let mut new_mu = Array1::<f64>::zeros(new_m);
        for i in 0..m {
            new_mu[i] = self.m_u[i];
        }
        self.m_u = new_mu;
        let prior_var = self.kernel(x, x);
        let mut new_su = Array2::<f64>::zeros((new_m, new_m));
        for i in 0..m {
            for j in 0..m {
                new_su[[i, j]] = self.s_u[[i, j]];
            }
        }
        new_su[[m, m]] = prior_var;
        self.s_u = new_su;
    }
}
/// Dense matrix-vector product: returns `m * v` as a new vector.
fn mat_vec_mul(m: &Array2<f64>, v: &Array1<f64>) -> Array1<f64> {
    let rows = m.nrows();
    let cols = m.ncols();
    let mut out = Array1::<f64>::zeros(rows);
    for r in 0..rows {
        // Row r dotted with v.
        out[r] = (0..cols).map(|c| m[[r, c]] * v[c]).sum();
    }
    out
}
/// Solves `a * x = b` for symmetric positive-definite `a` via Cholesky
/// factorization (`a = L L^T`) followed by forward/back substitution.
///
/// Degenerate inputs are handled leniently rather than erroring:
/// - an empty system returns an empty vector; a 1x1 system divides by a
///   floored diagonal;
/// - a small diagonal jitter is added before factorizing;
/// - if factorization hits a non-positive pivot it stops early, leaving the
///   remaining rows of `L` zero; the substitution passes then treat zero
///   pivots as contributing 0 to the solution (best-effort, not exact).
fn cholesky_solve(a: &Array2<f64>, b: &Array1<f64>) -> Array1<f64> {
    let n = a.nrows();
    if n == 0 {
        return Array1::zeros(0);
    }
    if n == 1 {
        // Floor guards against a (near-)zero 1x1 matrix.
        let denom = a[[0, 0]].max(1e-12);
        return Array1::from_vec(vec![b[0] / denom]);
    }
    // Jitter improves conditioning of nearly-singular kernel matrices.
    let jitter = 1e-8;
    let mut l = Array2::<f64>::zeros((n, n));
    let mut a_jit = a.clone();
    for i in 0..n {
        a_jit[[i, i]] += jitter;
    }
    // Cholesky-Banachiewicz factorization, row by row.
    'outer: for i in 0..n {
        for j in 0..=i {
            let mut s = a_jit[[i, j]];
            for p in 0..j {
                s -= l[[i, p]] * l[[j, p]];
            }
            if i == j {
                if s <= 0.0 {
                    // Not positive definite; abandon the rest of L.
                    break 'outer;
                }
                l[[i, j]] = s.sqrt();
            } else {
                l[[i, j]] = s / l[[j, j]];
            }
        }
    }
    // Forward substitution: L y = b (zero pivots map to y[i] = 0).
    let mut y = Array1::<f64>::zeros(n);
    for i in 0..n {
        let mut s = b[i];
        for j in 0..i {
            s -= l[[i, j]] * y[j];
        }
        let diag = l[[i, i]];
        y[i] = if diag.abs() > 1e-15 { s / diag } else { 0.0 };
    }
    // Back substitution: L^T x = y (note l[[j, i]] indexes the transpose).
    let mut x = Array1::<f64>::zeros(n);
    for i in (0..n).rev() {
        let mut s = y[i];
        for j in (i + 1)..n {
            s -= l[[j, i]] * x[j];
        }
        let diag = l[[i, i]];
        x[i] = if diag.abs() > 1e-15 { s / diag } else { 0.0 };
    }
    x
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared small configuration used by most tests.
    fn cfg_small() -> StreamingGpConfig {
        StreamingGpConfig {
            max_inducing: 10,
            length_scale: 1.0,
            signal_var: 1.0,
            noise_var: 0.01,
            novelty_threshold: 0.1,
        }
    }

    /// Convenience: a 1-D input vector.
    fn vec1(x: f64) -> Array1<f64> {
        Array1::from_vec(vec![x])
    }

    #[test]
    fn test_default_config() {
        let cfg = StreamingGpConfig::default();
        assert_eq!(cfg.max_inducing, 50);
        assert!((cfg.length_scale - 1.0).abs() < 1e-10);
        assert!((cfg.signal_var - 1.0).abs() < 1e-10);
        assert!((cfg.noise_var - 0.01).abs() < 1e-10);
        assert!((cfg.novelty_threshold - 0.1).abs() < 1e-10);
    }

    // With no data, predict() should return the prior (0, signal_var).
    #[test]
    fn test_empty_gp_predict_returns_signal_var() {
        let gp = StreamingGp::new(1, cfg_small());
        let (mean, var) = gp.predict(&vec1(0.0));
        assert!((mean - 0.0).abs() < 1e-10);
        assert!(
            (var - 1.0).abs() < 1e-10,
            "expected signal_var=1.0, got {var}"
        );
    }

    #[test]
    fn test_single_observation_mean_moves() {
        let mut gp = StreamingGp::new(1, cfg_small());
        gp.update(&vec1(0.0), 5.0);
        let (mean, _var) = gp.predict(&vec1(0.0));
        // Either the mean shifted toward the observation, or the point was
        // not admitted to the (empty) inducing set at all.
        assert!(
            mean.abs() > 0.01 || gp.n_inducing() == 0,
            "mean should move toward observation"
        );
    }

    // Well-separated points (spacing 3 with length_scale 1) are novel.
    #[test]
    fn test_inducing_set_grows() {
        let mut gp = StreamingGp::new(1, cfg_small());
        for i in 0..5 {
            gp.update(&vec1(i as f64 * 3.0), 1.0);
        }
        assert!(gp.n_inducing() > 0, "inducing set should grow");
    }

    // Zero novelty threshold admits everything, so only the budget caps growth.
    #[test]
    fn test_inducing_budget_respected() {
        let cfg = StreamingGpConfig {
            max_inducing: 3,
            novelty_threshold: 0.0,
            ..Default::default()
        };
        let mut gp = StreamingGp::new(1, cfg);
        for i in 0..10 {
            gp.update(&vec1(i as f64 * 10.0), 1.0);
        }
        assert!(
            gp.n_inducing() <= 3,
            "inducing set exceeded budget: {}",
            gp.n_inducing()
        );
    }

    #[test]
    fn test_kernel_symmetry() {
        let gp = StreamingGp::new(2, cfg_small());
        let x = Array1::from_vec(vec![1.0, 2.0]);
        let y = Array1::from_vec(vec![3.0, 1.0]);
        let kxy = gp.kernel(&x, &y);
        let kyx = gp.kernel(&y, &x);
        assert!((kxy - kyx).abs() < 1e-12, "kernel not symmetric");
    }

    // k(x, x) = signal_var for the RBF kernel.
    #[test]
    fn test_kernel_self_equals_signal_var() {
        let gp = StreamingGp::new(1, cfg_small());
        let x = vec1(2.0);
        assert!((gp.kernel(&x, &x) - 1.0).abs() < 1e-10);
    }

    #[test]
    fn test_prediction_variance_decreases_with_data() {
        let cfg = StreamingGpConfig {
            max_inducing: 20,
            novelty_threshold: 0.0,
            ..Default::default()
        };
        let mut gp = StreamingGp::new(1, cfg);
        let test_x = vec1(0.5);
        let (_m0, v0) = gp.predict(&test_x);
        gp.update(&vec1(0.5), 1.0);
        let (_m1, v1) = gp.predict(&test_x);
        assert!(
            v1 <= v0 + 1e-9,
            "variance should not increase after adding nearby point: v0={v0}, v1={v1}"
        );
    }

    // A high novelty threshold keeps an already-seen point out of the set.
    #[test]
    fn test_novelty_threshold_prevents_duplicates() {
        let cfg = StreamingGpConfig {
            max_inducing: 20,
            novelty_threshold: 0.5,
            ..Default::default()
        };
        let mut gp = StreamingGp::new(1, cfg);
        let x = vec1(0.0);
        gp.update(&x, 1.0);
        let n_after_first = gp.n_inducing();
        gp.update(&x, 1.0);
        let n_after_second = gp.n_inducing();
        assert_eq!(
            n_after_first, n_after_second,
            "duplicate point should not be added (high novelty threshold)"
        );
    }

    #[test]
    fn test_predict_at_training_point_approaches_observation() {
        let cfg = StreamingGpConfig {
            max_inducing: 5,
            noise_var: 1e-4,
            novelty_threshold: 100.0,
            signal_var: 2.0,
            ..Default::default()
        };
        let mut gp = StreamingGp::new(1, cfg);
        let x = vec1(0.0);
        let y = 3.7_f64;
        gp.update(&x, y);
        let (mean, _var) = gp.predict(&x);
        // Only checks finiteness; the exact approach rate depends on the
        // update parameterization.
        assert!(mean.is_finite(), "mean should be finite, got {mean}");
    }
}