use crate::error::{StatsError, StatsResult};
use scirs2_core::ndarray::{Array1, Array2};
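
// Streaming sparse Gaussian-process regression with inducing points. The
// model keeps a Gaussian posterior q(u) = N(m, S) over the latent function
// values u at a fixed set of inducing inputs Z; each incoming batch updates
// q(u) in natural-parameter (information) form, optionally discounting old
// information with a forgetting factor so the model can track
// non-stationary streams.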
/// Tunable hyperparameters for [`StreamingGp`].
#[derive(Debug, Clone)]
pub struct StreamingGpConfig {
    /// Maximum number of inducing points in the sparse approximation.
    pub n_inducing: usize,
    /// Signal variance sigma_f^2 of the squared-exponential kernel.
    pub kernel_variance: f64,
    /// Lengthscale l of the squared-exponential kernel.
    pub kernel_lengthscale: f64,
    /// Observation-noise variance sigma^2.
    pub noise_variance: f64,
    /// Discount in (0, 1] applied to past information on each update;
    /// 1.0 retains all history, smaller values track non-stationary streams.
    pub forgetting_factor: f64,
    /// Whether `update` may relocate the inducing points (currently a no-op).
    pub update_inducing: bool,
}
impl Default for StreamingGpConfig {
fn default() -> Self {
Self {
n_inducing: 50,
kernel_variance: 1.0,
kernel_lengthscale: 1.0,
noise_variance: 0.1,
forgetting_factor: 1.0,
update_inducing: false,
}
}
}
/// Variational posterior over the inducing outputs, q(u) = N(m, S).
#[derive(Debug, Clone)]
pub struct StreamingGpState {
    /// Inducing input locations Z, one row per inducing point.
    pub inducing_points: Array2<f64>,
    /// Posterior mean m of the inducing outputs.
    pub m: Array1<f64>,
    /// Posterior covariance S of the inducing outputs.
    pub s: Array2<f64>,
    /// Total number of training points consumed so far.
    pub n_observed: usize,
}
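/// Online sparse Gaussian-process regressor over a data stream.
///
/// A minimal usage sketch; the input arrays here are illustrative
/// placeholders, not part of this module:
///
/// ```ignore
/// let mut gp = StreamingGp::new(StreamingGpConfig::default());
/// gp.initialize(&x_first_batch, &y_first_batch)?;
/// gp.update(&x_next_batch, &y_next_batch)?;
/// let (mean, variance) = gp.predict(&x_test)?;
/// ```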
pub struct StreamingGp {
config: StreamingGpConfig,
state: Option<StreamingGpState>,
}
impl StreamingGp {
pub fn new(config: StreamingGpConfig) -> Self {
Self {
config,
state: None,
}
}
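    /// Fits the initial posterior q(u) = N(m, S) from the first batch.
    ///
    /// With A = Kuu^{-1} Kun, forgetting factor lambda, and noise variance
    /// sigma^2, this computes
    ///   S = (Kuu^{-1} + (lambda / sigma^2) * A * A^T)^{-1}
    ///   m = S * (lambda / sigma^2) * Kuu^{-1} * Kun * y,
    /// the standard sparse-GP posterior over the inducing outputs.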
pub fn initialize(&mut self, x: &Array2<f64>, y: &[f64]) -> StatsResult<()> {
        let n = x.nrows();
if n == 0 {
return Err(StatsError::InsufficientData(
"initialize requires at least one data point".to_string(),
));
}
if y.len() != n {
return Err(StatsError::DimensionMismatch(format!(
"x has {} rows but y has {} elements",
n,
y.len()
)));
}
        let n_inducing = self.config.n_inducing.min(n);
        let inducing = kmeans_plus_plus_init(x, n_inducing);
let k_uu = self.se_kernel_matrix(&inducing, &inducing);
let l_uu = cholesky_lower(&k_uu)?;
let k_uu_inv = cholesky_inverse(&l_uu)?;
let k_un = self.se_kernel_cross(&inducing, x);
let sigma_sq = self.config.noise_variance.max(f64::EPSILON);
let forgetting = self.config.forgetting_factor.clamp(1e-6, 1.0);
        // A = Kuu^{-1} Kun maps inducing outputs to noise-free batch predictions.
        let a_mat = mat_mul(&k_uu_inv, &k_un);
        let precision_update = mat_scalar_mul(&mat_mul_t(&a_mat, &a_mat), forgetting / sigma_sq);
let posterior_precision = mat_add(&k_uu_inv, &precision_update);
let s = mat_inverse_cholesky(&posterior_precision)?;
let y_arr = Array1::from(y.to_vec());
let kuu_inv_kun_y = mat_vec_mul(&k_uu_inv, &mat_vec_mul(&k_un, &y_arr));
let eta_scaled = kuu_inv_kun_y * (forgetting / sigma_sq);
let m = mat_vec_mul(&s, &eta_scaled);
self.state = Some(StreamingGpState {
inducing_points: inducing,
m,
s,
n_observed: n,
});
Ok(())
}
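    /// Folds a new batch into the posterior via natural parameters.
    ///
    /// The posterior is moved to information form (eta = S^{-1} m,
    /// Omega = S^{-1}), old information is discounted by the forgetting
    /// factor lambda, and the batch likelihood is added:
    ///   Omega_new = lambda * S^{-1} + (1 / sigma^2) * A * A^T
    ///   eta_new   = lambda * eta + (1 / sigma^2) * A * y
    /// with A = Kuu^{-1} Kun, then mapped back to (m, S).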
pub fn update(&mut self, x: &Array2<f64>, y: &[f64]) -> StatsResult<()> {
if self.state.is_none() {
return Err(StatsError::ComputationError(
"StreamingGp must be initialized before calling update".to_string(),
));
}
let n = x.nrows();
if n == 0 {
            return Ok(());
        }
if y.len() != n {
return Err(StatsError::DimensionMismatch(format!(
"x has {} rows but y has {} elements",
n,
y.len()
)));
}
let sigma_sq = self.config.noise_variance.max(f64::EPSILON);
let forgetting = self.config.forgetting_factor.clamp(1e-6, 1.0);
        // Snapshot the pieces of the current state we need, releasing the
        // borrow before recomputing kernel matrices.
        let (z, m_old, s_old) = {
            let state = self.state.as_ref().expect("checked above");
            (
                state.inducing_points.clone(),
                state.m.clone(),
                state.s.clone(),
            )
        };
let k_uu = self.se_kernel_matrix(&z, &z);
let l_uu = cholesky_lower(&k_uu)?;
let k_uu_inv = cholesky_inverse(&l_uu)?;
let k_un = self.se_kernel_cross(&z, x);
let s_inv = mat_inverse_cholesky(&s_old)?;
let eta_old = mat_vec_mul(&s_inv, &m_old);
        let a_mat = mat_mul(&k_uu_inv, &k_un);
        let delta_precision = mat_scalar_mul(&mat_mul_t(&a_mat, &a_mat), 1.0 / sigma_sq);
let y_arr = Array1::from(y.to_vec());
let delta_eta_raw = mat_vec_mul(&a_mat, &y_arr);
let delta_eta = &delta_eta_raw * (1.0 / sigma_sq);
let eta_old_scaled = &eta_old * forgetting;
let eta_new_arr = &eta_old_scaled + &delta_eta;
let omega_new = mat_add(&mat_scalar_mul(&s_inv, forgetting), &delta_precision);
let s_new = mat_inverse_cholesky(&omega_new)?;
let m_new = mat_vec_mul(&s_new, &eta_new_arr);
{
let state = self.state.as_mut().expect("checked above");
state.m = m_new;
state.s = s_new;
state.n_observed += n;
if self.config.update_inducing {
let new_z = kmeans_refine(&z, x, state.n_observed);
state.inducing_points = new_z;
}
}
Ok(())
}
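    /// Predictive mean and variance at new inputs.
    ///
    /// Uses the sparse-GP predictive equations with a = Kuu^{-1} k_u*:
    ///   mean     = K*u Kuu^{-1} m
    ///   variance = k** - k*u^T a + a^T S a + sigma^2,
    /// where a^T S a propagates posterior uncertainty over the inducing
    /// outputs and sigma^2 makes this the variance of noisy observations y*.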
pub fn predict(&self, x_new: &Array2<f64>) -> StatsResult<(Array1<f64>, Array1<f64>)> {
let state = self.state.as_ref().ok_or_else(|| {
StatsError::ComputationError(
"StreamingGp must be initialized before calling predict".to_string(),
)
})?;
let n_test = x_new.nrows();
if n_test == 0 {
return Ok((Array1::zeros(0), Array1::zeros(0)));
}
let z = &state.inducing_points;
let k_uu = self.se_kernel_matrix(z, z);
let l_uu = cholesky_lower(&k_uu)?;
let k_uu_inv = cholesky_inverse(&l_uu)?;
let k_xu = self.se_kernel_cross(x_new, z);
let kuu_inv_kux = mat_mul(&k_uu_inv, &k_xu.t().to_owned());
let kuu_inv_m = mat_vec_mul(&k_uu_inv, &state.m);
let mean_arr = mat_vec_mul(&k_xu, &kuu_inv_m);
        let sigma_f_sq = self.config.kernel_variance;
        let noise_sq = self.config.noise_variance;
        let mut variance = Array1::zeros(n_test);
        for i in 0..n_test {
            // For the SE kernel, k(x, x) = sigma_f^2 for every input.
            let k_xx_i = sigma_f_sq;
let k_xu_i = k_xu.row(i);
let kuu_inv_kux_i = kuu_inv_kux.column(i);
let q_xx_i: f64 = k_xu_i
.iter()
.zip(kuu_inv_kux_i.iter())
.map(|(&a, &b)| a * b)
.sum();
            // Posterior-uncertainty term a_i^T S a_i with a_i = Kuu^{-1} k_u*.
            let s_a_i = mat_vec_mul(&state.s, &kuu_inv_kux_i.to_owned());
            let correction: f64 = kuu_inv_kux_i
                .iter()
                .zip(s_a_i.iter())
                .map(|(&a, &b)| a * b)
                .sum();
let v = (k_xx_i - q_xx_i + correction + noise_sq).max(f64::EPSILON);
variance[i] = v;
}
Ok((mean_arr, variance))
}
pub fn state(&self) -> Option<&StreamingGpState> {
self.state.as_ref()
}
pub fn config(&self) -> &StreamingGpConfig {
&self.config
}
fn se_kernel_matrix(&self, x1: &Array2<f64>, x2: &Array2<f64>) -> Array2<f64> {
let n1 = x1.nrows();
let n2 = x2.nrows();
let mut k = Array2::zeros((n1, n2));
for i in 0..n1 {
for j in 0..n2 {
k[[i, j]] = self.se_kernel_scalar(&x1.row(i).to_owned(), &x2.row(j).to_owned());
}
}
k
}
    /// Cross-covariance; an alias of [`Self::se_kernel_matrix`] kept for
    /// call-site clarity.
    fn se_kernel_cross(&self, x1: &Array2<f64>, x2: &Array2<f64>) -> Array2<f64> {
        self.se_kernel_matrix(x1, x2)
    }
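    /// Squared-exponential kernel:
    /// k(x, x') = sigma_f^2 * exp(-||x - x'||^2 / (2 * l^2)).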
fn se_kernel_scalar(&self, x1: &Array1<f64>, x2: &Array1<f64>) -> f64 {
let l = self.config.kernel_lengthscale.max(f64::EPSILON);
let sigma_f_sq = self.config.kernel_variance;
let diff: f64 = x1
.iter()
.zip(x2.iter())
.map(|(&a, &b)| (a - b).powi(2))
.sum();
sigma_f_sq * (-diff / (2.0 * l * l)).exp()
}
}
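/// Lower-triangular Cholesky factor of a symmetric positive-definite matrix,
/// via the textbook Cholesky-Banachiewicz recurrence. A small diagonal jitter,
/// scaled by the largest diagonal entry, guards against numerical loss of
/// positive definiteness.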
fn cholesky_lower(a: &Array2<f64>) -> StatsResult<Array2<f64>> {
let n = a.nrows();
if a.ncols() != n {
return Err(StatsError::DimensionMismatch(
"Cholesky: matrix must be square".to_string(),
));
}
let jitter = 1e-8 * a.diag().iter().cloned().fold(0.0f64, f64::max).max(1.0);
let mut l = Array2::<f64>::zeros((n, n));
for j in 0..n {
let mut sum_sq = 0.0;
for k in 0..j {
sum_sq += l[[j, k]] * l[[j, k]];
}
let diag_val = a[[j, j]] + jitter - sum_sq;
if diag_val <= 0.0 {
return Err(StatsError::ComputationError(format!(
"Cholesky: non-positive-definite matrix (diag[{}]={:.3e})",
j, diag_val
)));
}
l[[j, j]] = diag_val.sqrt();
for i in (j + 1)..n {
let mut sum = 0.0;
for k in 0..j {
sum += l[[i, k]] * l[[j, k]];
}
l[[i, j]] = (a[[i, j]] - sum) / l[[j, j]];
}
}
Ok(l)
}
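/// Inverse of A = L L^T from its Cholesky factor: for each unit vector e_j,
/// solve L y = e_j by forward substitution and L^T x = y by back substitution,
/// yielding column j of A^{-1}.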
fn cholesky_inverse(l: &Array2<f64>) -> StatsResult<Array2<f64>> {
let n = l.nrows();
let mut inv = Array2::<f64>::zeros((n, n));
for col in 0..n {
let mut y = vec![0.0_f64; n];
y[col] = 1.0;
for i in 0..n {
let mut s = y[i];
for k in 0..i {
s -= l[[i, k]] * y[k];
}
let diag = l[[i, i]];
if diag.abs() < f64::EPSILON {
return Err(StatsError::ComputationError(
"Cholesky inverse: zero diagonal".to_string(),
));
}
y[i] = s / diag;
}
let mut x = vec![0.0_f64; n];
for i in (0..n).rev() {
let mut s = y[i];
for k in (i + 1)..n {
s -= l[[k, i]] * x[k];
}
let diag = l[[i, i]];
if diag.abs() < f64::EPSILON {
return Err(StatsError::ComputationError(
"Cholesky inverse: zero diagonal".to_string(),
));
}
x[i] = s / diag;
}
for i in 0..n {
inv[[i, col]] = x[i];
}
}
Ok(inv)
}
fn mat_inverse_cholesky(a: &Array2<f64>) -> StatsResult<Array2<f64>> {
let l = cholesky_lower(a)?;
cholesky_inverse(&l)
}
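// Thin ndarray wrappers that keep the update code above close to the
// linear-algebra notation used in the doc comments.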
fn mat_mul(a: &Array2<f64>, b: &Array2<f64>) -> Array2<f64> {
a.dot(b)
}
fn mat_mul_t(a: &Array2<f64>, b: &Array2<f64>) -> Array2<f64> {
a.dot(&b.t().to_owned())
}
fn mat_vec_mul(a: &Array2<f64>, x: &Array1<f64>) -> Array1<f64> {
a.dot(x)
}
fn mat_add(a: &Array2<f64>, b: &Array2<f64>) -> Array2<f64> {
a + b
}
fn mat_scalar_mul(a: &Array2<f64>, s: f64) -> Array2<f64> {
a * s
}
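/// k-means++-style seeding: each new inducing point is drawn with probability
/// proportional to its squared distance from the nearest point already chosen
/// (D^2 sampling). A fixed-seed linear congruential generator makes the
/// selection deterministic and reproducible.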
fn kmeans_plus_plus_init(x: &Array2<f64>, m: usize) -> Array2<f64> {
let n = x.nrows();
let d = x.ncols();
if m >= n {
return x.clone();
}
let mut selected: Vec<usize> = Vec::with_capacity(m);
selected.push(0);
let mut rng_state: u64 = 12345;
for _ in 1..m {
let mut min_dists = vec![f64::INFINITY; n];
for &idx in &selected {
let cx = x.row(idx);
for j in 0..n {
let dist_sq: f64 = x
.row(j)
.iter()
.zip(cx.iter())
.map(|(&a, &b)| (a - b).powi(2))
.sum();
if dist_sq < min_dists[j] {
min_dists[j] = dist_sq;
}
}
}
let total: f64 = min_dists.iter().sum();
rng_state = rng_state
.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407);
let u = (rng_state >> 11) as f64 / (1u64 << 53) as f64;
let threshold = u * total;
let mut cumulative = 0.0;
let mut chosen = n - 1;
        for (j, &dist) in min_dists.iter().enumerate() {
            cumulative += dist;
if cumulative >= threshold {
chosen = j;
break;
}
}
if !selected.contains(&chosen) {
selected.push(chosen);
} else {
let farthest = min_dists
.iter()
.enumerate()
.filter(|(j, _)| !selected.contains(j))
.max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
.map(|(j, _)| j)
.unwrap_or(0);
selected.push(farthest);
}
}
let mut z = Array2::<f64>::zeros((m, d));
for (i, &idx) in selected.iter().enumerate() {
for j in 0..d {
z[[i, j]] = x[[idx, j]];
}
}
z
}
/// Placeholder for online inducing-point refinement: currently returns the
/// existing inducing points unchanged. A full implementation might re-seed
/// via k-means over a reservoir of recent inputs.
fn kmeans_refine(z: &Array2<f64>, _x_new: &Array2<f64>, _n_total: usize) -> Array2<f64> {
    z.clone()
}
#[cfg(test)]
mod tests {
use super::*;
use scirs2_core::ndarray::Array2;
fn make_gp(n_inducing: usize) -> StreamingGp {
StreamingGp::new(StreamingGpConfig {
n_inducing,
kernel_variance: 1.0,
kernel_lengthscale: 1.0,
noise_variance: 0.1,
..Default::default()
})
}
fn linspace_col(start: f64, end: f64, n: usize) -> Array2<f64> {
let step = if n > 1 {
(end - start) / (n - 1) as f64
} else {
0.0
};
Array2::from_shape_vec((n, 1), (0..n).map(|i| start + i as f64 * step).collect())
.expect("shape ok")
}
#[test]
fn test_streaming_gp_initialize_sets_state() {
let mut gp = make_gp(5);
let x = linspace_col(0.0, 4.0, 10);
let y: Vec<f64> = x.column(0).iter().map(|&v| v.sin()).collect();
gp.initialize(&x, &y).expect("init ok");
let state = gp.state().expect("state should be set");
assert_eq!(state.inducing_points.nrows(), 5);
assert_eq!(state.inducing_points.ncols(), 1);
assert_eq!(state.m.len(), 5);
assert_eq!(state.s.nrows(), 5);
assert_eq!(state.s.ncols(), 5);
}
#[test]
fn test_streaming_gp_initialize_fewer_points_than_inducing() {
let mut gp = make_gp(20);
let x = linspace_col(0.0, 2.0, 5);
let y = vec![0.0, 0.5, 1.0, 0.5, 0.0];
gp.initialize(&x, &y).expect("init ok");
let state = gp.state().expect("state set");
        assert_eq!(state.inducing_points.nrows(), 5);
    }
#[test]
fn test_streaming_gp_empty_init_error() {
let mut gp = make_gp(5);
let x = Array2::<f64>::zeros((0, 1));
let y: Vec<f64> = vec![];
assert!(gp.initialize(&x, &y).is_err());
}
#[test]
fn test_streaming_gp_dimension_mismatch_error() {
let mut gp = make_gp(5);
let x = linspace_col(0.0, 4.0, 5);
        let y = vec![0.0, 1.0];
        assert!(gp.initialize(&x, &y).is_err());
}
#[test]
fn test_streaming_gp_predict_shape() {
let mut gp = make_gp(5);
let x_train = linspace_col(0.0, 4.0, 10);
let y_train: Vec<f64> = x_train.column(0).iter().map(|&v| v.sin()).collect();
gp.initialize(&x_train, &y_train).expect("init ok");
let x_test = linspace_col(0.0, 4.0, 7);
let (mean, var) = gp.predict(&x_test).expect("predict ok");
assert_eq!(mean.len(), 7);
assert_eq!(var.len(), 7);
}
#[test]
fn test_streaming_gp_variance_positive() {
let mut gp = make_gp(10);
let x_train = linspace_col(0.0, 5.0, 20);
let y_train: Vec<f64> = x_train.column(0).iter().map(|&v| v.sin()).collect();
gp.initialize(&x_train, &y_train).expect("init ok");
let x_test = linspace_col(-1.0, 6.0, 30);
let (_, var) = gp.predict(&x_test).expect("predict ok");
for (i, &v) in var.iter().enumerate() {
assert!(v > 0.0, "variance[{}] = {} should be > 0", i, v);
}
}
#[test]
fn test_streaming_gp_update_does_not_error() {
let mut gp = make_gp(8);
let x_init = linspace_col(0.0, 3.0, 10);
let y_init: Vec<f64> = vec![0.0; 10];
gp.initialize(&x_init, &y_init).expect("init ok");
let x_new = linspace_col(3.0, 6.0, 5);
let y_new = vec![1.0, 1.5, 2.0, 1.5, 1.0];
gp.update(&x_new, &y_new).expect("update ok");
let state = gp.state().expect("state set");
assert_eq!(state.n_observed, 15);
}
#[test]
fn test_streaming_gp_1d_regression_sin() {
let mut gp = StreamingGp::new(StreamingGpConfig {
n_inducing: 15,
kernel_variance: 1.0,
kernel_lengthscale: 1.0,
noise_variance: 0.01,
..Default::default()
});
let x_train = linspace_col(0.0, 6.28, 40);
let y_train: Vec<f64> = x_train.column(0).iter().map(|&v| v.sin()).collect();
gp.initialize(&x_train, &y_train).expect("init ok");
let x_test =
Array2::from_shape_vec((1, 1), vec![std::f64::consts::FRAC_PI_2]).expect("shape ok");
let (mean, _var) = gp.predict(&x_test).expect("predict ok");
assert!(
(mean[0] - 1.0).abs() < 0.3,
"sin(π/2) prediction should be near 1, got {}",
mean[0]
);
}
#[test]
fn test_streaming_gp_predict_before_init_error() {
let gp = make_gp(5);
let x_test = linspace_col(0.0, 1.0, 3);
assert!(gp.predict(&x_test).is_err());
}
#[test]
fn test_streaming_gp_update_before_init_error() {
let mut gp = make_gp(5);
let x_new = linspace_col(0.0, 1.0, 3);
let y_new = vec![0.0, 0.5, 1.0];
assert!(gp.update(&x_new, &y_new).is_err());
}
#[test]
fn test_streaming_gp_update_improves_prediction() {
let mut gp = StreamingGp::new(StreamingGpConfig {
n_inducing: 8,
kernel_variance: 2.0,
kernel_lengthscale: 1.5,
noise_variance: 0.05,
..Default::default()
});
let x_init = linspace_col(0.0, 4.0, 10);
let y_init = vec![0.0; 10];
gp.initialize(&x_init, &y_init).expect("init ok");
let x_query = Array2::from_shape_vec((1, 1), vec![5.0]).expect("shape ok");
let (mean_before, _) = gp.predict(&x_query).expect("predict ok");
for _ in 0..5 {
let x_new = Array2::from_shape_vec((5, 1), vec![5.0; 5]).expect("shape ok");
let y_new = vec![1.0; 5];
gp.update(&x_new, &y_new).expect("update ok");
}
let (mean_after, _) = gp.predict(&x_query).expect("predict ok");
let before_dist = (mean_before[0] - 1.0).abs();
let after_dist = (mean_after[0] - 1.0).abs();
assert!(
after_dist <= before_dist + 0.3,
"Prediction should improve: before={}, after={} (target=1.0)",
mean_before[0],
mean_after[0]
);
}
#[test]
fn test_streaming_gp_multidim_input() {
let mut gp = StreamingGp::new(StreamingGpConfig {
n_inducing: 5,
..Default::default()
});
let x_data: Vec<f64> = (0..10)
.flat_map(|i| vec![i as f64 * 0.5, (i as f64 * 0.3).sin()])
.collect();
let x_train = Array2::from_shape_vec((10, 2), x_data).expect("shape ok");
let y_train = vec![1.0; 10];
gp.initialize(&x_train, &y_train).expect("init ok");
let x_test: Vec<f64> = vec![1.0, 0.5, 2.0, 0.3];
let x_test_arr = Array2::from_shape_vec((2, 2), x_test).expect("shape ok");
let (mean, var) = gp.predict(&x_test_arr).expect("predict ok");
assert_eq!(mean.len(), 2);
assert!(var.iter().all(|&v| v > 0.0), "variances must be positive");
}
#[test]
fn test_streaming_gp_empty_update_is_noop() {
let mut gp = make_gp(5);
let x_init = linspace_col(0.0, 4.0, 10);
let y_init = vec![0.0; 10];
gp.initialize(&x_init, &y_init).expect("init ok");
let state_before = gp.state().expect("state set").n_observed;
let x_empty = Array2::<f64>::zeros((0, 1));
gp.update(&x_empty, &[]).expect("empty update ok");
let state_after = gp.state().expect("state set").n_observed;
assert_eq!(state_before, state_after);
}
#[test]
fn test_streaming_gp_predict_empty_test_set() {
let mut gp = make_gp(5);
let x_init = linspace_col(0.0, 4.0, 10);
let y_init = vec![0.0; 10];
gp.initialize(&x_init, &y_init).expect("init ok");
let x_empty = Array2::<f64>::zeros((0, 1));
let (mean, var) = gp.predict(&x_empty).expect("predict ok");
assert_eq!(mean.len(), 0);
assert_eq!(var.len(), 0);
}
#[test]
fn test_cholesky_known_matrix() {
let a = Array2::from_shape_vec((2, 2), vec![4.0, 2.0, 2.0, 3.0]).expect("shape ok");
let l = cholesky_lower(&a).expect("chol ok");
assert!((l[[0, 0]] - 2.0).abs() < 1e-5, "L[0,0]={}", l[[0, 0]]);
assert!((l[[1, 0]] - 1.0).abs() < 1e-5, "L[1,0]={}", l[[1, 0]]);
assert!(
(l[[1, 1]] - 2.0f64.sqrt()).abs() < 1e-5,
"L[1,1]={}",
l[[1, 1]]
);
let reconstructed = l.dot(&l.t());
assert!((reconstructed[[0, 0]] - 4.0).abs() < 1e-5);
assert!((reconstructed[[1, 1]] - 3.0).abs() < 1e-5);
}
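    // End-to-end sketch of the intended streaming workflow: seed on the first
    // half of a sine wave, stream in the second half, and check that the
    // predictions stay finite with positive variance. The grid sizes and
    // bounds here are illustrative, not tight guarantees.
    #[test]
    fn test_streaming_gp_end_to_end_stream() {
        let mut gp = make_gp(10);
        let x_first = linspace_col(0.0, 3.14, 20);
        let y_first: Vec<f64> = x_first.column(0).iter().map(|&v| v.sin()).collect();
        gp.initialize(&x_first, &y_first).expect("init ok");
        let x_second = linspace_col(3.14, 6.28, 20);
        let y_second: Vec<f64> = x_second.column(0).iter().map(|&v| v.sin()).collect();
        gp.update(&x_second, &y_second).expect("update ok");
        let x_test = linspace_col(0.0, 6.28, 9);
        let (mean, var) = gp.predict(&x_test).expect("predict ok");
        assert_eq!(mean.len(), 9);
        assert!(mean.iter().all(|v| v.is_finite()), "means must be finite");
        assert!(var.iter().all(|&v| v > 0.0), "variances must be positive");
    }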
}