Skip to main content

ruvector_consciousness/
streaming.rs

1//! Online/streaming Φ estimation for time-series data.
2//!
3//! Computes Φ over a sliding window of observations, maintaining
4//! an empirical TPM that is updated incrementally. Designed for:
5//! - Neural data (EEG/fMRI time series)
6//! - Real-time BCI applications
7//! - Long-running consciousness monitoring
8//!
9//! Key features:
10//! - Exponential forgetting factor for non-stationarity
11//! - Change-point detection in Φ trajectory
12//! - EWMA smoothing for noise reduction
13
14use crate::traits::PhiEngine;
15use crate::types::{ComputeBudget, StreamingPhiResult, TransitionMatrix};
16
17
18// ---------------------------------------------------------------------------
19// Streaming Φ estimator
20// ---------------------------------------------------------------------------
21
22/// Online Φ estimator with empirical TPM and sliding window.
23pub struct StreamingPhiEstimator {
24    /// Number of states in the system.
25    n: usize,
26    /// Transition count matrix (row i, col j = count of i→j transitions).
27    counts: Vec<f64>,
28    /// Cached normalized TPM (invalidated on each observe).
29    cached_tpm: Option<TransitionMatrix>,
30    /// Exponential forgetting factor (0 < λ ≤ 1). 1.0 = no forgetting.
31    forgetting_factor: f64,
32    /// Minimum observations before computing Φ.
33    min_observations: usize,
34    /// Total transitions observed.
35    total_transitions: usize,
36    /// Previous state (for tracking transitions).
37    prev_state: Option<usize>,
38    /// EWMA smoothing factor for Φ (0 < α ≤ 1).
39    ewma_alpha: f64,
40    /// Running EWMA of Φ.
41    phi_ewma: f64,
42    /// Running variance (Welford's online algorithm).
43    phi_m2: f64,
44    phi_mean: f64,
45    /// History of recent Φ values (ring buffer).
46    history: Vec<f64>,
47    history_idx: usize,
48    max_history: usize,
49    /// Change-point detection: CUSUM parameters.
50    cusum_pos: f64,
51    cusum_neg: f64,
52    cusum_threshold: f64,
53}
54
55impl StreamingPhiEstimator {
56    /// Create a new streaming estimator for a system with `n` states.
57    pub fn new(n: usize) -> Self {
58        Self {
59            n,
60            counts: vec![0.0; n * n],
61            cached_tpm: None,
62            forgetting_factor: 0.99,
63            min_observations: n * 2,
64            total_transitions: 0,
65            prev_state: None,
66            ewma_alpha: 0.1,
67            phi_ewma: 0.0,
68            phi_m2: 0.0,
69            phi_mean: 0.0,
70            history: Vec::with_capacity(1000),
71            history_idx: 0,
72            max_history: 1000,
73            cusum_pos: 0.0,
74            cusum_neg: 0.0,
75            cusum_threshold: 3.0,
76        }
77    }
78
79    /// Configure forgetting factor (0 < λ ≤ 1). Lower = faster forgetting.
80    pub fn with_forgetting_factor(mut self, lambda: f64) -> Self {
81        assert!(lambda > 0.0 && lambda <= 1.0);
82        self.forgetting_factor = lambda;
83        self
84    }
85
86    /// Configure EWMA smoothing factor (0 < α ≤ 1). Higher = more responsive.
87    pub fn with_ewma_alpha(mut self, alpha: f64) -> Self {
88        assert!(alpha > 0.0 && alpha <= 1.0);
89        self.ewma_alpha = alpha;
90        self
91    }
92
93    /// Configure change-point detection threshold.
94    pub fn with_cusum_threshold(mut self, threshold: f64) -> Self {
95        self.cusum_threshold = threshold;
96        self
97    }
98
99    /// Observe a new state in the time series.
100    ///
101    /// Updates the empirical TPM and returns updated Φ estimate
102    /// if enough data has been accumulated.
103    pub fn observe<E: PhiEngine>(
104        &mut self,
105        state: usize,
106        engine: &E,
107        budget: &ComputeBudget,
108    ) -> Option<StreamingPhiResult> {
109        assert!(state < self.n, "state {} out of range for n={}", state, self.n);
110
111        // Record transition.
112        if let Some(prev) = self.prev_state {
113            // Apply forgetting factor to all counts.
114            if self.forgetting_factor < 1.0 {
115                for c in &mut self.counts {
116                    *c *= self.forgetting_factor;
117                }
118            }
119            // Increment transition count and invalidate cached TPM.
120            self.counts[prev * self.n + state] += 1.0;
121            self.total_transitions += 1;
122            self.cached_tpm = None;
123        }
124        self.prev_state = Some(state);
125
126        // Don't compute until we have enough data.
127        if self.total_transitions < self.min_observations {
128            return None;
129        }
130
131        // Build empirical TPM from counts (lazy: only when cache invalidated).
132        if self.cached_tpm.is_none() {
133            self.cached_tpm = Some(self.build_tpm_inner());
134        }
135        let tpm = self.cached_tpm.as_ref().unwrap().clone();
136
137        // Compute Φ.
138        let phi_result = engine.compute_phi(&tpm, Some(state), budget).ok()?;
139        let phi = phi_result.phi;
140
141        // Update EWMA.
142        if self.history.is_empty() {
143            self.phi_ewma = phi;
144            self.phi_mean = phi;
145        } else {
146            self.phi_ewma = self.ewma_alpha * phi + (1.0 - self.ewma_alpha) * self.phi_ewma;
147        }
148
149        // Update variance (Welford's).
150        let count = self.history.len() as f64 + 1.0;
151        let delta = phi - self.phi_mean;
152        self.phi_mean += delta / count;
153        let delta2 = phi - self.phi_mean;
154        self.phi_m2 += delta * delta2;
155
156        let variance = if count > 1.0 {
157            self.phi_m2 / (count - 1.0)
158        } else {
159            0.0
160        };
161
162        // Change-point detection (CUSUM).
163        let change_detected = self.update_cusum(phi);
164
165        // Update history (ring buffer).
166        if self.history.len() < self.max_history {
167            self.history.push(phi);
168            self.history_idx = self.history.len();
169        } else {
170            self.history[self.history_idx % self.max_history] = phi;
171            self.history_idx += 1;
172        }
173
174        Some(StreamingPhiResult {
175            phi,
176            time_steps: self.total_transitions,
177            phi_ewma: self.phi_ewma,
178            phi_variance: variance,
179            change_detected,
180            history: self.history.clone(),
181        })
182    }
183
184    /// Build a normalized TPM from transition counts.
185    fn build_tpm_inner(&self) -> TransitionMatrix {
186        let n = self.n;
187        let mut data = vec![0.0f64; n * n];
188
189        for i in 0..n {
190            let mut row_sum = 0.0;
191            for j in 0..n {
192                row_sum += self.counts[i * n + j];
193            }
194            if row_sum > 0.0 {
195                let inv = 1.0 / row_sum;
196                for j in 0..n {
197                    data[i * n + j] = self.counts[i * n + j] * inv;
198                }
199            } else {
200                // No transitions from state i: use uniform.
201                let uniform = 1.0 / n as f64;
202                for j in 0..n {
203                    data[i * n + j] = uniform;
204                }
205            }
206        }
207
208        TransitionMatrix::new(n, data)
209    }
210
211    /// CUSUM change-point detection.
212    /// Returns true if a change point is detected.
213    fn update_cusum(&mut self, phi: f64) -> bool {
214        let deviation = phi - self.phi_mean;
215        self.cusum_pos = (self.cusum_pos + deviation).max(0.0);
216        self.cusum_neg = (self.cusum_neg - deviation).max(0.0);
217
218        let detected = self.cusum_pos > self.cusum_threshold
219            || self.cusum_neg > self.cusum_threshold;
220
221        if detected {
222            // Reset after detection.
223            self.cusum_pos = 0.0;
224            self.cusum_neg = 0.0;
225        }
226
227        detected
228    }
229
230    /// Current number of observed transitions.
231    pub fn num_transitions(&self) -> usize {
232        self.total_transitions
233    }
234
235    /// Reset all state.
236    pub fn reset(&mut self) {
237        self.counts.fill(0.0);
238        self.cached_tpm = None;
239        self.total_transitions = 0;
240        self.prev_state = None;
241        self.phi_ewma = 0.0;
242        self.phi_m2 = 0.0;
243        self.phi_mean = 0.0;
244        self.history.clear();
245        self.history_idx = 0;
246        self.cusum_pos = 0.0;
247        self.cusum_neg = 0.0;
248    }
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use crate::phi::SpectralPhiEngine;

    /// Three passes over a 4-state cycle must yield at least one estimate,
    /// and every estimate must be non-negative with a positive step count.
    #[test]
    fn streaming_accumulates_data() {
        let engine = SpectralPhiEngine::default();
        let budget = ComputeBudget::fast();
        let mut estimator = StreamingPhiEstimator::new(4);

        // Feed the sequence 0, 1, 2, 3 three times over and count estimates.
        let produced = (0..4)
            .cycle()
            .take(12)
            .filter_map(|state| estimator.observe(state, &engine, &budget))
            .inspect(|result| {
                assert!(result.phi >= 0.0);
                assert!(result.time_steps > 0);
            })
            .count();

        assert!(produced > 0, "should produce result after enough observations");
    }

    /// A long run with custom EWMA and no forgetting records transitions.
    #[test]
    fn streaming_ewma_smooths() {
        let engine = SpectralPhiEngine::default();
        let budget = ComputeBudget::fast();
        let mut estimator = StreamingPhiEstimator::new(4)
            .with_ewma_alpha(0.5)
            .with_forgetting_factor(1.0);

        // Fifty passes over the 4-state cycle.
        for state in (0..4).cycle().take(50 * 4) {
            estimator.observe(state, &engine, &budget);
        }

        assert!(estimator.num_transitions() > 0);
    }

    /// `reset` brings the transition counter back to zero.
    #[test]
    fn streaming_reset_clears() {
        let engine = SpectralPhiEngine::default();
        let budget = ComputeBudget::fast();
        let mut estimator = StreamingPhiEstimator::new(4);

        (0..4).for_each(|state| {
            estimator.observe(state, &engine, &budget);
        });
        assert!(estimator.num_transitions() > 0);

        estimator.reset();
        assert_eq!(estimator.num_transitions(), 0);
    }
}