use std::collections::VecDeque;
use crate::error::{Result, TimeSeriesError};
/// Numerically stable running mean/variance accumulator (Welford's algorithm).
#[derive(Debug, Clone)]
pub struct OnlineMean {
    count: u64,
    mean: f64,
    m2: f64,
}

impl OnlineMean {
    /// Creates an empty accumulator.
    pub fn new() -> Self {
        Self {
            count: 0,
            mean: 0.0,
            m2: 0.0,
        }
    }

    /// Folds one observation into the running statistics.
    pub fn update(&mut self, x: f64) {
        self.count += 1;
        // Welford's recurrence: the deviations from the mean before and after
        // the update multiply to give the increment of the sum of squared
        // deviations (m2), which keeps the computation numerically stable.
        let dev_before = x - self.mean;
        self.mean += dev_before / self.count as f64;
        let dev_after = x - self.mean;
        self.m2 += dev_before * dev_after;
    }

    /// Current running mean (0.0 before any data).
    pub fn mean(&self) -> f64 {
        self.mean
    }

    /// Unbiased sample variance (n - 1 denominator); 0.0 with fewer than two points.
    pub fn variance(&self) -> f64 {
        match self.count {
            0 | 1 => 0.0,
            n => self.m2 / (n - 1) as f64,
        }
    }

    /// Sample standard deviation.
    pub fn std(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Biased population variance (n denominator); 0.0 when empty.
    pub fn population_variance(&self) -> f64 {
        match self.count {
            0 => 0.0,
            n => self.m2 / n as f64,
        }
    }

    /// Number of observations folded in so far.
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Combines two accumulators as if their input streams had been
    /// concatenated (Chan et al.'s parallel variance update).
    pub fn merge(&self, other: &OnlineMean) -> OnlineMean {
        let total = self.count + other.count;
        if total == 0 {
            return OnlineMean::new();
        }
        let gap = other.mean - self.mean;
        OnlineMean {
            count: total,
            mean: (self.mean * self.count as f64 + other.mean * other.count as f64)
                / total as f64,
            m2: self.m2
                + other.m2
                + gap * gap * (self.count as f64 * other.count as f64) / total as f64,
        }
    }
}

impl Default for OnlineMean {
    fn default() -> Self {
        OnlineMean::new()
    }
}
/// Recursive least squares (RLS) estimator for the linear model y ≈ coeffs · x,
/// with exponential forgetting.
#[derive(Debug, Clone)]
pub struct OnlineLinearRegression {
// Number of features expected in every input vector.
dim: usize,
// Current coefficient estimates (length `dim`).
coeffs: Vec<f64>,
// RLS P matrix (inverse of the regularized, forgetting-weighted information
// matrix), stored row-major as a flat `dim * dim` buffer.
p_inv: Vec<f64>,
// Forgetting factor in (0, 1]; 1.0 weights all history equally.
forgetting_factor: f64,
// Number of samples absorbed through `update`.
n_updates: u64,
}
impl OnlineLinearRegression {
    /// Builds an RLS estimator over `dim` features.
    ///
    /// # Errors
    /// Rejects `dim == 0`, a forgetting factor outside (0, 1], and a negative
    /// regularization strength.
    pub fn new(dim: usize, forgetting_factor: f64, regularization: f64) -> Result<Self> {
        if dim == 0 {
            return Err(TimeSeriesError::InvalidParameter {
                name: "dim".to_string(),
                message: "Dimension must be at least 1".to_string(),
            });
        }
        if !(f64::EPSILON..=1.0).contains(&forgetting_factor) {
            return Err(TimeSeriesError::InvalidParameter {
                name: "forgetting_factor".to_string(),
                message: "Forgetting factor must be in (0, 1]".to_string(),
            });
        }
        if regularization < 0.0 {
            return Err(TimeSeriesError::InvalidParameter {
                name: "regularization".to_string(),
                message: "Regularization must be non-negative".to_string(),
            });
        }
        // P starts as a scaled identity: 1/lambda for explicit regularization,
        // otherwise a large diffuse prior.
        let diag = if regularization > 0.0 {
            1.0 / regularization
        } else {
            1e6
        };
        let mut p_inv = vec![0.0; dim * dim];
        for row in 0..dim {
            p_inv[row * dim + row] = diag;
        }
        Ok(Self {
            dim,
            coeffs: vec![0.0; dim],
            p_inv,
            forgetting_factor,
            n_updates: 0,
        })
    }

    /// Absorbs one (x, y) observation via the standard RLS recursion.
    ///
    /// # Errors
    /// Fails when `x` does not have exactly `dim` entries.
    pub fn update(&mut self, x: &[f64], y: f64) -> Result<()> {
        if x.len() != self.dim {
            return Err(TimeSeriesError::DimensionMismatch {
                expected: self.dim,
                actual: x.len(),
            });
        }
        // g = P * x
        let g: Vec<f64> = self
            .p_inv
            .chunks(self.dim)
            .map(|row| row.iter().zip(x).fold(0.0, |acc, (p, xi)| acc + p * xi))
            .collect();
        // denom = lambda + x' P x
        let denom = x
            .iter()
            .zip(&g)
            .fold(self.forgetting_factor, |acc, (xi, gi)| acc + xi * gi);
        // Kalman gain k = g / denom.
        let gain: Vec<f64> = g.iter().map(|&gi| gi / denom).collect();
        // Correct the coefficients by the a-priori prediction error.
        let err = y - self.predict(x)?;
        for (coef, ki) in self.coeffs.iter_mut().zip(&gain) {
            *coef += ki * err;
        }
        // P <- (P - k g') / lambda  (x' P equals g' because P is symmetric).
        for row in 0..self.dim {
            for col in 0..self.dim {
                let idx = row * self.dim + col;
                self.p_inv[idx] =
                    (self.p_inv[idx] - gain[row] * g[col]) / self.forgetting_factor;
            }
        }
        self.n_updates += 1;
        Ok(())
    }

    /// Dot product of the current coefficients with `x`.
    ///
    /// # Errors
    /// Fails when `x` does not have exactly `dim` entries.
    pub fn predict(&self, x: &[f64]) -> Result<f64> {
        if x.len() != self.dim {
            return Err(TimeSeriesError::DimensionMismatch {
                expected: self.dim,
                actual: x.len(),
            });
        }
        Ok(self.coeffs.iter().zip(x).map(|(b, xi)| b * xi).sum())
    }

    /// Current coefficient estimates.
    pub fn coefficients(&self) -> &[f64] {
        &self.coeffs
    }

    /// Number of samples absorbed so far.
    pub fn n_updates(&self) -> u64 {
        self.n_updates
    }

    /// Flattened row-major view of the RLS P matrix.
    pub fn p_inv(&self) -> &[f64] {
        &self.p_inv
    }
}
/// Streaming ARIMA(p, d, q) model whose AR/MA coefficients are fitted with
/// recursive least squares over lagged differences and innovations.
#[derive(Debug, Clone)]
pub struct OnlineARIMA {
// Autoregressive order: number of lags of the differenced series used.
p: usize,
// Differencing order.
d: usize,
// Moving-average order: number of lagged innovations used.
q: usize,
// RLS estimator over the p AR + q MA features (dummy 1-dim when p + q == 0).
rls: OnlineLinearRegression,
// Recent raw observations (bounded buffer; used for (un)differencing).
raw_history: VecDeque<f64>,
// Recent differenced values (bounded buffer; AR feature source).
diff_history: VecDeque<f64>,
// Recent one-step innovations (bounded buffer; MA feature source).
innovations: VecDeque<f64>,
// Set once enough observations have accumulated to trust the features.
ready: bool,
}
impl OnlineARIMA {
    /// Creates a streaming ARIMA(p, d, q) model.
    ///
    /// `forgetting_factor` and `regularization` are forwarded to the internal
    /// RLS estimator. When `p + q == 0` a dummy 1-dimensional estimator is
    /// constructed so creation still succeeds; it is never consulted.
    ///
    /// # Errors
    /// Propagates parameter validation errors from the RLS constructor.
    pub fn new(
        p: usize,
        d: usize,
        q: usize,
        forgetting_factor: f64,
        regularization: f64,
    ) -> Result<Self> {
        let dim = p + q;
        let rls = OnlineLinearRegression::new(dim.max(1), forgetting_factor, regularization)?;
        Ok(Self {
            p,
            d,
            q,
            rls,
            raw_history: VecDeque::new(),
            diff_history: VecDeque::new(),
            innovations: VecDeque::new(),
            ready: false,
        })
    }

    /// Binomial coefficient C(n, k) as f64; n and k are tiny (the differencing
    /// order), so incremental multiplication is exact.
    fn binomial(n: usize, k: usize) -> f64 {
        let mut c = 1.0;
        for j in 0..k {
            c = c * (n - j) as f64 / (j + 1) as f64;
        }
        c
    }

    /// d-th order difference of `new_value` given `hist` = [..., x_{t-2}, x_{t-1}]:
    /// Δ^d x_t = Σ_{k=0}^{d} (-1)^k C(d, k) x_{t-k}.
    /// Lags not yet present in `hist` are treated as zero (warm-up).
    ///
    /// BUG FIX: the previous implementation subtracted each of the last d raw
    /// values (x − Σ x_{t-k}) while `undifference` used alternating signs, so
    /// for d >= 2 the two transforms were not inverses of each other. Both now
    /// use the binomial form of the difference operator; behavior for
    /// d ∈ {0, 1} is unchanged.
    fn diff_with(d: usize, hist: &VecDeque<f64>, new_value: f64) -> f64 {
        let len = hist.len();
        let mut val = new_value;
        for k in 1..=d {
            if k > len {
                break;
            }
            let sign = if k % 2 == 1 { -1.0 } else { 1.0 };
            val += sign * Self::binomial(d, k) * hist[len - k];
        }
        val
    }

    /// Exact inverse of `diff_with`: recovers x_t from Δ^d x_t and the same
    /// lags: x_t = Δ^d x_t - Σ_{k=1}^{d} (-1)^k C(d, k) x_{t-k}.
    fn undiff_with(d: usize, hist: &VecDeque<f64>, diff: f64) -> f64 {
        let len = hist.len();
        let mut val = diff;
        for k in 1..=d {
            if k > len {
                break;
            }
            let sign = if k % 2 == 1 { 1.0 } else { -1.0 };
            val += sign * Self::binomial(d, k) * hist[len - k];
        }
        val
    }

    /// Differences a new raw observation against the stored raw history.
    fn difference_value(&self, new_raw: f64) -> f64 {
        Self::diff_with(self.d, &self.raw_history, new_raw)
    }

    /// Maps a forecast on the differenced scale back to the original scale.
    fn undifference(&self, diff_forecast: f64) -> f64 {
        Self::undiff_with(self.d, &self.raw_history, diff_forecast)
    }

    /// Assembles the regression features: p lags of the differenced series
    /// followed by q lagged innovations (missing lags default to 0.0).
    /// Returns `None` when the model has no AR or MA terms.
    fn build_features(&self) -> Option<Vec<f64>> {
        let dim = self.p + self.q;
        if dim == 0 {
            return None;
        }
        let mut feats = vec![0.0; dim];
        let diff_len = self.diff_history.len();
        for i in 0..self.p {
            if i < diff_len {
                feats[i] = self.diff_history[diff_len - 1 - i];
            }
        }
        let innov_len = self.innovations.len();
        for i in 0..self.q {
            if i < innov_len {
                feats[self.p + i] = self.innovations[innov_len - 1 - i];
            }
        }
        Some(feats)
    }

    /// Absorbs one raw observation: differences it, updates the RLS model once
    /// warmed up, and records the resulting innovation.
    ///
    /// # Errors
    /// Propagates RLS errors (these indicate an internal dimension bug).
    pub fn update(&mut self, raw_value: f64) -> Result<()> {
        let diff_val = self.difference_value(raw_value);
        if let Some(feats) = self.build_features() {
            if self.ready {
                self.rls.update(&feats, diff_val)?;
                // A-posteriori innovation: residual after the coefficient update.
                let predicted = self.rls.predict(&feats)?;
                self.push_innovation(diff_val - predicted);
            } else {
                // Before warm-up the model effectively predicts zero.
                self.push_innovation(diff_val);
            }
        } else {
            self.push_innovation(diff_val);
        }
        self.raw_history.push_back(raw_value);
        if self.raw_history.len() > self.d.max(1) + self.p + 10 {
            self.raw_history.pop_front();
        }
        self.diff_history.push_back(diff_val);
        if self.diff_history.len() > self.p + 10 {
            self.diff_history.pop_front();
        }
        // Require a few extra samples beyond the largest lag before trusting
        // the features for fitting and forecasting.
        let needed = self.p.max(self.q) + self.d + 2;
        if !self.ready && self.raw_history.len() >= needed {
            self.ready = true;
        }
        Ok(())
    }

    /// Appends an innovation, keeping the buffer bounded.
    fn push_innovation(&mut self, innov: f64) {
        self.innovations.push_back(innov);
        if self.innovations.len() > self.q + 10 {
            self.innovations.pop_front();
        }
    }

    /// One-step-ahead forecast on the differenced scale (0.0 before warm-up or
    /// when the model has no AR/MA terms).
    ///
    /// # Errors
    /// Propagates RLS errors.
    pub fn predict_diff(&self) -> Result<f64> {
        if !self.ready {
            return Ok(0.0);
        }
        let dim = self.p + self.q;
        if dim == 0 {
            return Ok(0.0);
        }
        let feats = self.build_features().unwrap_or_else(|| vec![0.0; dim]);
        self.rls.predict(&feats)
    }

    /// One-step-ahead forecast on the original (undifferenced) scale.
    ///
    /// # Errors
    /// Propagates RLS errors.
    pub fn predict_level(&self) -> Result<f64> {
        let diff_pred = self.predict_diff()?;
        Ok(self.undifference(diff_pred))
    }

    /// Multi-step forecast on the original scale; future innovations are
    /// assumed to be zero. Returns an empty vector for `steps == 0`.
    ///
    /// # Errors
    /// Propagates RLS errors.
    pub fn forecast(&self, steps: usize) -> Result<Vec<f64>> {
        if steps == 0 {
            return Ok(Vec::new());
        }
        let dim = self.p + self.q;
        let mut forecasts = Vec::with_capacity(steps);
        // Simulate forward on copies so the model state is untouched.
        let mut raw_hist = self.raw_history.clone();
        let mut diff_hist = self.diff_history.clone();
        let mut innov_hist = self.innovations.clone();
        for _ in 0..steps {
            let diff_pred = if dim > 0 {
                let mut feats = vec![0.0; dim];
                let dl = diff_hist.len();
                for i in 0..self.p {
                    if i < dl {
                        feats[i] = diff_hist[dl - 1 - i];
                    }
                }
                let il = innov_hist.len();
                for i in 0..self.q {
                    if i < il {
                        feats[self.p + i] = innov_hist[il - 1 - i];
                    }
                }
                self.rls.predict(&feats)?
            } else {
                0.0
            };
            // Undifference against the simulated raw history, then roll the
            // state forward exactly as `update` would (with zero innovations).
            let level_pred = Self::undiff_with(self.d, &raw_hist, diff_pred);
            let diff_new = Self::diff_with(self.d, &raw_hist, level_pred);
            forecasts.push(level_pred);
            raw_hist.push_back(level_pred);
            if raw_hist.len() > self.d.max(1) + self.p + 10 {
                raw_hist.pop_front();
            }
            diff_hist.push_back(diff_new);
            if diff_hist.len() > self.p + 10 {
                diff_hist.pop_front();
            }
            innov_hist.push_back(0.0);
            if innov_hist.len() > self.q + 10 {
                innov_hist.pop_front();
            }
        }
        Ok(forecasts)
    }

    /// True once enough observations have been seen to fit and forecast.
    pub fn is_ready(&self) -> bool {
        self.ready
    }

    /// Number of raw observations currently retained (the history buffer is
    /// bounded, so this is not a lifetime total).
    pub fn n_observations(&self) -> usize {
        self.raw_history.len()
    }
}
/// Simplified ADWIN drift detector: maintains an adaptive window and drops its
/// oldest portion whenever two sub-windows disagree beyond a Hoeffding bound.
#[derive(Debug, Clone)]
pub struct ADWINDetector {
    delta: f64,
    window: VecDeque<f64>,
    total: f64,
    last_detected: bool,
}

impl ADWINDetector {
    /// Creates a detector with confidence parameter `delta` (clamped away
    /// from zero so the bound is always defined).
    pub fn new(delta: f64) -> Self {
        Self {
            delta: delta.max(f64::EPSILON),
            window: VecDeque::new(),
            total: 0.0,
            last_detected: false,
        }
    }

    /// Adds an observation; returns true when a distribution change was
    /// detected (in which case the stale prefix of the window was discarded).
    pub fn update(&mut self, x: f64) -> bool {
        self.window.push_back(x);
        self.total += x;
        self.last_detected = self.test_and_compress();
        self.last_detected
    }

    /// Scans every split point; at the first split whose sub-window means
    /// differ by more than the Hoeffding cut threshold, drops the left part.
    fn test_and_compress(&mut self) -> bool {
        let n = self.window.len();
        if n < 2 {
            return false;
        }
        let ln_factor = (4.0 * (n as f64).powi(2) / self.delta).ln();
        let mut left_sum = 0.0;
        for split in 0..(n - 1) {
            left_sum += self.window[split];
            let left_n = (split + 1) as f64;
            let right_n = (n - split - 1) as f64;
            let gap = (left_sum / left_n - (self.total - left_sum) / right_n).abs();
            // Harmonic mean of the sub-window sizes drives the bound.
            let m_star = 1.0 / (1.0 / left_n + 1.0 / right_n);
            let eps_cut = (ln_factor / (2.0 * m_star)).sqrt();
            if gap > eps_cut {
                // Change found: everything up to and including the split is stale.
                for dropped in self.window.drain(..=split) {
                    self.total -= dropped;
                }
                return true;
            }
        }
        false
    }

    /// Whether the most recent `update` call reported a change.
    pub fn is_change_detected(&self) -> bool {
        self.last_detected
    }

    /// Current number of retained observations.
    pub fn window_len(&self) -> usize {
        self.window.len()
    }

    /// Mean of the retained window, or `None` when empty.
    pub fn window_mean(&self) -> Option<f64> {
        match self.window.len() {
            0 => None,
            len => Some(self.total / len as f64),
        }
    }

    /// Clears all state.
    pub fn reset(&mut self) {
        self.window.clear();
        self.total = 0.0;
        self.last_detected = false;
    }

    /// Read-only view of the retained window.
    pub fn window(&self) -> &VecDeque<f64> {
        &self.window
    }
}
/// Page-Hinkley change detector over a slowly adapting (EWMA) mean estimate.
#[derive(Debug, Clone)]
pub struct PageHinkley {
    threshold: f64,
    delta: f64,
    alpha: f64,
    detect_increase: bool,
    mean_est: f64,
    ph_stat: f64,
    extreme_stat: f64,
    n: u64,
    last_detected: bool,
}

impl PageHinkley {
    /// `threshold` is the alarm level, `delta` the tolerated drift magnitude,
    /// `alpha` the EWMA rate for the mean estimate (clamped into (0, 1]), and
    /// `detect_increase` selects which direction of change to watch for.
    pub fn new(threshold: f64, delta: f64, alpha: f64, detect_increase: bool) -> Self {
        Self {
            threshold,
            delta,
            alpha: alpha.clamp(f64::EPSILON, 1.0),
            detect_increase,
            mean_est: 0.0,
            ph_stat: 0.0,
            extreme_stat: 0.0,
            n: 0,
            last_detected: false,
        }
    }

    /// Feeds one observation; returns true while the cumulative deviation
    /// above its running minimum exceeds the threshold.
    pub fn update(&mut self, x: f64) -> bool {
        self.n += 1;
        self.mean_est = if self.n == 1 {
            // Seed the mean estimate with the first observation.
            x
        } else {
            (1.0 - self.alpha) * self.mean_est + self.alpha * x
        };
        // Orient the deviation so the watched direction accumulates positively;
        // this collapses the increase/decrease cases into one recursion.
        let dev = x - self.mean_est;
        let oriented = if self.detect_increase { dev } else { -dev };
        self.ph_stat += oriented - self.delta;
        if self.ph_stat < self.extreme_stat {
            self.extreme_stat = self.ph_stat;
        }
        self.last_detected = self.ph_stat - self.extreme_stat > self.threshold;
        self.last_detected
    }

    /// Whether the most recent `update` raised an alarm.
    pub fn is_change_detected(&self) -> bool {
        self.last_detected
    }

    /// Raw cumulative Page-Hinkley statistic.
    pub fn statistic(&self) -> f64 {
        self.ph_stat
    }

    /// Current EWMA estimate of the stream mean.
    pub fn mean_estimate(&self) -> f64 {
        self.mean_est
    }

    /// Restores the initial state (parameters are kept).
    pub fn reset(&mut self) {
        self.mean_est = 0.0;
        self.ph_stat = 0.0;
        self.extreme_stat = 0.0;
        self.n = 0;
        self.last_detected = false;
    }
}
/// Streaming quantile estimator using the P² algorithm (Jain & Chlamtac,
/// 1985): five markers track the target quantile in O(1) memory.
#[derive(Debug, Clone)]
pub struct OnlineQuantile {
// Target quantile, strictly between 0 and 1.
p: f64,
// Marker heights: estimated values at the five tracked positions.
q: [f64; 5],
// Actual marker positions (1-based ranks within the stream seen so far).
n: [i64; 5],
// Desired (ideal) marker positions for the current count.
n_desired: [f64; 5],
// Per-observation increments applied to the desired positions.
dn: [f64; 5],
// Total number of observations processed.
count: u64,
// Buffers the first five observations until the markers can be seeded.
init_buffer: Vec<f64>,
}
impl OnlineQuantile {
/// Creates an estimator for quantile `p`.
///
/// # Errors
/// Returns `InvalidParameter` unless `0 < p < 1`.
pub fn new(p: f64) -> Result<Self> {
if p <= 0.0 || p >= 1.0 {
return Err(TimeSeriesError::InvalidParameter {
name: "p".to_string(),
message: "Quantile must be strictly between 0 and 1".to_string(),
});
}
// Desired-position increments for markers at quantiles 0, p/2, p, (1+p)/2, 1.
let dn = [0.0, p / 2.0, p, (1.0 + p) / 2.0, 1.0];
Ok(Self {
p,
q: [0.0; 5],
n: [1, 2, 3, 4, 5],
n_desired: [1.0, 1.0 + 2.0 * p, 1.0 + 4.0 * p, 3.0 + 2.0 * p, 5.0],
dn,
count: 0,
init_buffer: Vec::with_capacity(5),
})
}
/// Feeds one observation into the estimator.
pub fn update(&mut self, x: f64) {
self.count += 1;
if self.count <= 5 {
// Seeding phase: collect five values, then sort them into the markers.
self.init_buffer.push(x);
if self.count == 5 {
self.init_buffer.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
for i in 0..5 {
self.q[i] = self.init_buffer[i];
self.n[i] = (i + 1) as i64;
}
self.n_desired = [
1.0,
1.0 + 2.0 * self.p,
1.0 + 4.0 * self.p,
3.0 + 2.0 * self.p,
5.0,
];
}
return;
}
// Find the cell k with q[k] <= x < q[k+1], extending the extreme markers
// when x falls outside the current range.
let k = if x < self.q[0] {
self.q[0] = x;
0i64
} else if x >= self.q[4] {
self.q[4] = x;
3
} else {
let mut ki = 3i64;
for i in 0..4 {
if x < self.q[i + 1] {
ki = i as i64;
break;
}
}
ki
};
// Every marker above the insertion cell shifts one rank to the right.
for i in (k + 1) as usize..5 {
self.n[i] += 1;
}
for i in 0..5 {
self.n_desired[i] += self.dn[i];
}
// Move the three interior markers toward their desired positions: prefer
// the piecewise-parabolic (P²) estimate, falling back to linear
// interpolation whenever it would violate marker ordering.
for i in 1..4 {
let d = self.n_desired[i] - self.n[i] as f64;
if (d >= 1.0 && (self.n[i + 1] - self.n[i]) > 1)
|| (d <= -1.0 && (self.n[i - 1] - self.n[i]) < -1)
{
let sign = if d >= 0.0 { 1.0 } else { -1.0 };
let q_new = self.parabolic(i, sign);
if self.q[i - 1] < q_new && q_new < self.q[i + 1] {
self.q[i] = q_new;
} else {
self.q[i] = self.linear(i, sign);
}
self.n[i] += sign as i64;
}
}
}
// Piecewise-parabolic (P²) prediction of marker i's height after shifting
// it by d (= ±1) positions.
fn parabolic(&self, i: usize, d: f64) -> f64 {
let qi = self.q[i];
let qi_1 = self.q[i - 1];
let qi1 = self.q[i + 1];
let ni = self.n[i] as f64;
let ni_1 = self.n[i - 1] as f64;
let ni1 = self.n[i + 1] as f64;
qi + d / (ni1 - ni_1)
* ((ni - ni_1 + d) * (qi1 - qi) / (ni1 - ni)
+ (ni1 - ni - d) * (qi - qi_1) / (ni - ni_1))
}
// Linear fallback used when the parabolic estimate is not monotone.
fn linear(&self, i: usize, d: f64) -> f64 {
let j = if d > 0.0 { i + 1 } else { i - 1 };
self.q[i] + d * (self.q[j] - self.q[i]) / (self.n[j] - self.n[i]) as f64
}
/// Current estimate of the tracked quantile. With fewer than five
/// observations this falls back to an exact order statistic of the buffered
/// values (0.0 when no data has been seen).
pub fn quantile(&self) -> f64 {
if self.count < 5 {
if self.init_buffer.is_empty() {
return 0.0;
}
let mut sorted = self.init_buffer.clone();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let idx = ((sorted.len() as f64 * self.p).ceil() as usize).saturating_sub(1);
sorted[idx.min(sorted.len() - 1)]
} else {
// The middle marker tracks the p-quantile.
self.q[2]
}
}
/// Number of observations processed.
pub fn count(&self) -> u64 {
self.count
}
/// The target quantile this estimator tracks.
pub fn p(&self) -> f64 {
self.p
}
/// Current heights of the five P² markers.
pub fn markers(&self) -> [f64; 5] {
self.q
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_online_mean_welford() {
        let data = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0];
        let mut tracker = OnlineMean::new();
        for &x in &data {
            tracker.update(x);
        }
        assert!((tracker.mean() - 5.0).abs() < 1e-10, "mean mismatch");
        // Sum of squared deviations is 32 over 8 points -> sample variance 32/7.
        let sample_var = 32.0 / 7.0;
        assert!(
            (tracker.variance() - sample_var).abs() < 1e-9,
            "variance mismatch: got {}",
            tracker.variance()
        );
        assert_eq!(tracker.count(), 8);
    }

    #[test]
    fn test_online_mean_merge() {
        let mut a = OnlineMean::new();
        let mut b = OnlineMean::new();
        for &x in &[1.0f64, 2.0, 3.0] {
            a.update(x);
        }
        for &x in &[4.0f64, 5.0, 6.0] {
            b.update(x);
        }
        // Merging must reproduce the statistics of the concatenated stream.
        let merged = a.merge(&b);
        assert!((merged.mean() - 3.5).abs() < 1e-10);
        assert_eq!(merged.count(), 6);
    }

    #[test]
    fn test_online_linear_regression_simple() {
        let mut model = OnlineLinearRegression::new(2, 1.0, 1e-6).expect("failed to create model");
        // Fit y = 2x + 1 using an explicit intercept feature.
        for i in 1..=50i64 {
            let x = i as f64;
            let y = 2.0 * x + 1.0;
            model.update(&[x, 1.0], y).expect("update failed");
        }
        let pred = model.predict(&[10.0, 1.0]).expect("predict failed");
        assert!((pred - 21.0).abs() < 0.5, "prediction error too large: {}", pred);
    }

    #[test]
    fn test_online_linear_regression_dim_check() {
        let mut model = OnlineLinearRegression::new(3, 1.0, 1e-4).expect("failed to create model");
        // Both update and predict must reject vectors of the wrong length.
        assert!(model.update(&[1.0, 2.0], 5.0).is_err());
        assert!(model.predict(&[1.0]).is_err());
    }

    #[test]
    fn test_online_arima_basic() {
        let mut model = OnlineARIMA::new(2, 1, 1, 0.99, 1e-4).expect("failed to create model");
        // A linear trend becomes stationary after first differencing.
        for t in 0..60 {
            model.update(t as f64).expect("update failed");
        }
        assert!(model.is_ready());
        let fc = model.forecast(5).expect("forecast failed");
        assert_eq!(fc.len(), 5);
        for i in 1..fc.len() {
            assert!(fc[i] >= fc[i - 1] - 2.0, "forecast should be roughly increasing");
        }
    }

    #[test]
    fn test_adwin_stable_data() {
        // A constant stream yields exactly equal sub-window means at every
        // split, so ADWIN must never signal drift.
        let mut d = ADWINDetector::new(0.002);
        let mut any_drift = false;
        for _ in 0..200 {
            if d.update(5.0) {
                any_drift = true;
            }
        }
        assert!(!any_drift, "ADWIN must not flag drift on constant data");
    }

    #[test]
    fn test_adwin_detects_drift() {
        let mut d = ADWINDetector::new(0.01);
        for _ in 0..100 {
            d.update(0.0);
        }
        let mut detected = false;
        for _ in 0..50 {
            if d.update(100.0) {
                detected = true;
                break;
            }
        }
        assert!(detected, "ADWIN should detect a large step change");
    }

    #[test]
    fn test_page_hinkley_detects_increase() {
        let mut ph = PageHinkley::new(50.0, 0.01, 0.005, true);
        for _ in 0..100 {
            ph.update(0.0);
        }
        let mut detected = false;
        for _ in 0..80 {
            if ph.update(5.0) {
                detected = true;
                break;
            }
        }
        assert!(detected, "PH should detect mean increase");
    }

    #[test]
    fn test_page_hinkley_no_false_alarm_stable() {
        let mut ph = PageHinkley::new(200.0, 0.0, 0.001, true);
        let mut alarm_count = 0u32;
        for _ in 0..500 {
            if ph.update(1.0) {
                alarm_count += 1;
            }
        }
        assert!(
            alarm_count < 3,
            "PH should not alarm on stable data (got {alarm_count})"
        );
    }

    #[test]
    fn test_online_quantile_median() {
        let mut oq = OnlineQuantile::new(0.5).expect("failed to create oq");
        for x in 1..=1000 {
            oq.update(x as f64);
        }
        let est = oq.quantile();
        assert!(
            (est - 500.0).abs() < 20.0,
            "median estimate {est} should be close to 500"
        );
    }

    #[test]
    fn test_online_quantile_percentile_95() {
        let mut oq = OnlineQuantile::new(0.95).expect("failed to create oq");
        for x in 0..=1000 {
            oq.update(x as f64);
        }
        let est = oq.quantile();
        assert!(
            (est - 950.0).abs() < 30.0,
            "95th percentile estimate {est} should be near 950"
        );
    }

    #[test]
    fn test_online_quantile_invalid_p() {
        assert!(OnlineQuantile::new(0.0).is_err());
        assert!(OnlineQuantile::new(1.0).is_err());
        assert!(OnlineQuantile::new(-0.1).is_err());
    }

    #[test]
    fn test_online_mean_single_value() {
        let mut m = OnlineMean::new();
        m.update(42.0);
        assert_eq!(m.mean(), 42.0);
        // With a single point the sample variance is defined as 0.
        assert_eq!(m.variance(), 0.0);
        assert_eq!(m.count(), 1);
    }
}