use crate::traits::PhiEngine;
use crate::types::{ComputeBudget, StreamingPhiResult, TransitionMatrix};
pub struct StreamingPhiEstimator {
n: usize,
counts: Vec<f64>,
cached_tpm: Option<TransitionMatrix>,
forgetting_factor: f64,
min_observations: usize,
total_transitions: usize,
prev_state: Option<usize>,
ewma_alpha: f64,
phi_ewma: f64,
phi_m2: f64,
phi_mean: f64,
history: Vec<f64>,
history_idx: usize,
max_history: usize,
cusum_pos: f64,
cusum_neg: f64,
cusum_threshold: f64,
}
impl StreamingPhiEstimator {
pub fn new(n: usize) -> Self {
Self {
n,
counts: vec![0.0; n * n],
cached_tpm: None,
forgetting_factor: 0.99,
min_observations: n * 2,
total_transitions: 0,
prev_state: None,
ewma_alpha: 0.1,
phi_ewma: 0.0,
phi_m2: 0.0,
phi_mean: 0.0,
history: Vec::with_capacity(1000),
history_idx: 0,
max_history: 1000,
cusum_pos: 0.0,
cusum_neg: 0.0,
cusum_threshold: 3.0,
}
}
pub fn with_forgetting_factor(mut self, lambda: f64) -> Self {
assert!(lambda > 0.0 && lambda <= 1.0);
self.forgetting_factor = lambda;
self
}
pub fn with_ewma_alpha(mut self, alpha: f64) -> Self {
assert!(alpha > 0.0 && alpha <= 1.0);
self.ewma_alpha = alpha;
self
}
pub fn with_cusum_threshold(mut self, threshold: f64) -> Self {
self.cusum_threshold = threshold;
self
}
pub fn observe<E: PhiEngine>(
&mut self,
state: usize,
engine: &E,
budget: &ComputeBudget,
) -> Option<StreamingPhiResult> {
assert!(state < self.n, "state {} out of range for n={}", state, self.n);
if let Some(prev) = self.prev_state {
if self.forgetting_factor < 1.0 {
for c in &mut self.counts {
*c *= self.forgetting_factor;
}
}
self.counts[prev * self.n + state] += 1.0;
self.total_transitions += 1;
self.cached_tpm = None;
}
self.prev_state = Some(state);
if self.total_transitions < self.min_observations {
return None;
}
if self.cached_tpm.is_none() {
self.cached_tpm = Some(self.build_tpm_inner());
}
let tpm = self.cached_tpm.as_ref().unwrap().clone();
let phi_result = engine.compute_phi(&tpm, Some(state), budget).ok()?;
let phi = phi_result.phi;
if self.history.is_empty() {
self.phi_ewma = phi;
self.phi_mean = phi;
} else {
self.phi_ewma = self.ewma_alpha * phi + (1.0 - self.ewma_alpha) * self.phi_ewma;
}
let count = self.history.len() as f64 + 1.0;
let delta = phi - self.phi_mean;
self.phi_mean += delta / count;
let delta2 = phi - self.phi_mean;
self.phi_m2 += delta * delta2;
let variance = if count > 1.0 {
self.phi_m2 / (count - 1.0)
} else {
0.0
};
let change_detected = self.update_cusum(phi);
if self.history.len() < self.max_history {
self.history.push(phi);
self.history_idx = self.history.len();
} else {
self.history[self.history_idx % self.max_history] = phi;
self.history_idx += 1;
}
Some(StreamingPhiResult {
phi,
time_steps: self.total_transitions,
phi_ewma: self.phi_ewma,
phi_variance: variance,
change_detected,
history: self.history.clone(),
})
}
fn build_tpm_inner(&self) -> TransitionMatrix {
let n = self.n;
let mut data = vec![0.0f64; n * n];
for i in 0..n {
let mut row_sum = 0.0;
for j in 0..n {
row_sum += self.counts[i * n + j];
}
if row_sum > 0.0 {
let inv = 1.0 / row_sum;
for j in 0..n {
data[i * n + j] = self.counts[i * n + j] * inv;
}
} else {
let uniform = 1.0 / n as f64;
for j in 0..n {
data[i * n + j] = uniform;
}
}
}
TransitionMatrix::new(n, data)
}
fn update_cusum(&mut self, phi: f64) -> bool {
let deviation = phi - self.phi_mean;
self.cusum_pos = (self.cusum_pos + deviation).max(0.0);
self.cusum_neg = (self.cusum_neg - deviation).max(0.0);
let detected = self.cusum_pos > self.cusum_threshold
|| self.cusum_neg > self.cusum_threshold;
if detected {
self.cusum_pos = 0.0;
self.cusum_neg = 0.0;
}
detected
}
pub fn num_transitions(&self) -> usize {
self.total_transitions
}
pub fn reset(&mut self) {
self.counts.fill(0.0);
self.cached_tpm = None;
self.total_transitions = 0;
self.prev_state = None;
self.phi_ewma = 0.0;
self.phi_m2 = 0.0;
self.phi_mean = 0.0;
self.history.clear();
self.history_idx = 0;
self.cusum_pos = 0.0;
self.cusum_neg = 0.0;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::phi::SpectralPhiEngine;
#[test]
fn streaming_accumulates_data() {
let mut estimator = StreamingPhiEstimator::new(4);
let engine = SpectralPhiEngine::default();
let budget = ComputeBudget::fast();
let states = [0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3];
let mut got_result = false;
for &s in &states {
if let Some(result) = estimator.observe(s, &engine, &budget) {
assert!(result.phi >= 0.0);
assert!(result.time_steps > 0);
got_result = true;
}
}
assert!(got_result, "should produce result after enough observations");
}
#[test]
fn streaming_ewma_smooths() {
let mut estimator = StreamingPhiEstimator::new(4)
.with_ewma_alpha(0.5)
.with_forgetting_factor(1.0);
let engine = SpectralPhiEngine::default();
let budget = ComputeBudget::fast();
for _ in 0..50 {
for s in 0..4 {
estimator.observe(s, &engine, &budget);
}
}
assert!(estimator.num_transitions() > 0);
}
#[test]
fn streaming_reset_clears() {
let mut estimator = StreamingPhiEstimator::new(4);
let engine = SpectralPhiEngine::default();
let budget = ComputeBudget::fast();
for s in 0..4 {
estimator.observe(s, &engine, &budget);
}
assert!(estimator.num_transitions() > 0);
estimator.reset();
assert_eq!(estimator.num_transitions(), 0);
}
}