// ruvector_consciousness/streaming.rs
use crate::traits::PhiEngine;
use crate::types::{ComputeBudget, StreamingPhiResult, TransitionMatrix};
16
17
18pub struct StreamingPhiEstimator {
24 n: usize,
26 counts: Vec<f64>,
28 cached_tpm: Option<TransitionMatrix>,
30 forgetting_factor: f64,
32 min_observations: usize,
34 total_transitions: usize,
36 prev_state: Option<usize>,
38 ewma_alpha: f64,
40 phi_ewma: f64,
42 phi_m2: f64,
44 phi_mean: f64,
45 history: Vec<f64>,
47 history_idx: usize,
48 max_history: usize,
49 cusum_pos: f64,
51 cusum_neg: f64,
52 cusum_threshold: f64,
53}
54
55impl StreamingPhiEstimator {
56 pub fn new(n: usize) -> Self {
58 Self {
59 n,
60 counts: vec![0.0; n * n],
61 cached_tpm: None,
62 forgetting_factor: 0.99,
63 min_observations: n * 2,
64 total_transitions: 0,
65 prev_state: None,
66 ewma_alpha: 0.1,
67 phi_ewma: 0.0,
68 phi_m2: 0.0,
69 phi_mean: 0.0,
70 history: Vec::with_capacity(1000),
71 history_idx: 0,
72 max_history: 1000,
73 cusum_pos: 0.0,
74 cusum_neg: 0.0,
75 cusum_threshold: 3.0,
76 }
77 }
78
79 pub fn with_forgetting_factor(mut self, lambda: f64) -> Self {
81 assert!(lambda > 0.0 && lambda <= 1.0);
82 self.forgetting_factor = lambda;
83 self
84 }
85
86 pub fn with_ewma_alpha(mut self, alpha: f64) -> Self {
88 assert!(alpha > 0.0 && alpha <= 1.0);
89 self.ewma_alpha = alpha;
90 self
91 }
92
93 pub fn with_cusum_threshold(mut self, threshold: f64) -> Self {
95 self.cusum_threshold = threshold;
96 self
97 }
98
99 pub fn observe<E: PhiEngine>(
104 &mut self,
105 state: usize,
106 engine: &E,
107 budget: &ComputeBudget,
108 ) -> Option<StreamingPhiResult> {
109 assert!(state < self.n, "state {} out of range for n={}", state, self.n);
110
111 if let Some(prev) = self.prev_state {
113 if self.forgetting_factor < 1.0 {
115 for c in &mut self.counts {
116 *c *= self.forgetting_factor;
117 }
118 }
119 self.counts[prev * self.n + state] += 1.0;
121 self.total_transitions += 1;
122 self.cached_tpm = None;
123 }
124 self.prev_state = Some(state);
125
126 if self.total_transitions < self.min_observations {
128 return None;
129 }
130
131 if self.cached_tpm.is_none() {
133 self.cached_tpm = Some(self.build_tpm_inner());
134 }
135 let tpm = self.cached_tpm.as_ref().unwrap().clone();
136
137 let phi_result = engine.compute_phi(&tpm, Some(state), budget).ok()?;
139 let phi = phi_result.phi;
140
141 if self.history.is_empty() {
143 self.phi_ewma = phi;
144 self.phi_mean = phi;
145 } else {
146 self.phi_ewma = self.ewma_alpha * phi + (1.0 - self.ewma_alpha) * self.phi_ewma;
147 }
148
149 let count = self.history.len() as f64 + 1.0;
151 let delta = phi - self.phi_mean;
152 self.phi_mean += delta / count;
153 let delta2 = phi - self.phi_mean;
154 self.phi_m2 += delta * delta2;
155
156 let variance = if count > 1.0 {
157 self.phi_m2 / (count - 1.0)
158 } else {
159 0.0
160 };
161
162 let change_detected = self.update_cusum(phi);
164
165 if self.history.len() < self.max_history {
167 self.history.push(phi);
168 self.history_idx = self.history.len();
169 } else {
170 self.history[self.history_idx % self.max_history] = phi;
171 self.history_idx += 1;
172 }
173
174 Some(StreamingPhiResult {
175 phi,
176 time_steps: self.total_transitions,
177 phi_ewma: self.phi_ewma,
178 phi_variance: variance,
179 change_detected,
180 history: self.history.clone(),
181 })
182 }
183
184 fn build_tpm_inner(&self) -> TransitionMatrix {
186 let n = self.n;
187 let mut data = vec![0.0f64; n * n];
188
189 for i in 0..n {
190 let mut row_sum = 0.0;
191 for j in 0..n {
192 row_sum += self.counts[i * n + j];
193 }
194 if row_sum > 0.0 {
195 let inv = 1.0 / row_sum;
196 for j in 0..n {
197 data[i * n + j] = self.counts[i * n + j] * inv;
198 }
199 } else {
200 let uniform = 1.0 / n as f64;
202 for j in 0..n {
203 data[i * n + j] = uniform;
204 }
205 }
206 }
207
208 TransitionMatrix::new(n, data)
209 }
210
211 fn update_cusum(&mut self, phi: f64) -> bool {
214 let deviation = phi - self.phi_mean;
215 self.cusum_pos = (self.cusum_pos + deviation).max(0.0);
216 self.cusum_neg = (self.cusum_neg - deviation).max(0.0);
217
218 let detected = self.cusum_pos > self.cusum_threshold
219 || self.cusum_neg > self.cusum_threshold;
220
221 if detected {
222 self.cusum_pos = 0.0;
224 self.cusum_neg = 0.0;
225 }
226
227 detected
228 }
229
230 pub fn num_transitions(&self) -> usize {
232 self.total_transitions
233 }
234
235 pub fn reset(&mut self) {
237 self.counts.fill(0.0);
238 self.cached_tpm = None;
239 self.total_transitions = 0;
240 self.prev_state = None;
241 self.phi_ewma = 0.0;
242 self.phi_m2 = 0.0;
243 self.phi_mean = 0.0;
244 self.history.clear();
245 self.history_idx = 0;
246 self.cusum_pos = 0.0;
247 self.cusum_neg = 0.0;
248 }
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use crate::phi::SpectralPhiEngine;

    /// After the warm-up period the estimator must start returning results.
    #[test]
    fn streaming_accumulates_data() {
        let mut estimator = StreamingPhiEstimator::new(4);
        let engine = SpectralPhiEngine::default();
        let budget = ComputeBudget::fast();

        // Three full cycles through the four states: 11 transitions, which is
        // more than the default warm-up of 2 * n = 8.
        let states = [0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3];
        let mut got_result = false;
        for &s in &states {
            if let Some(result) = estimator.observe(s, &engine, &budget) {
                assert!(result.phi >= 0.0);
                assert!(result.time_steps > 0);
                got_result = true;
            }
        }
        assert!(got_result, "should produce result after enough observations");
        // The first observation has no predecessor, so it adds no transition.
        assert_eq!(estimator.num_transitions(), states.len() - 1);
    }

    /// Long runs with custom smoothing and no forgetting keep counting
    /// transitions.
    #[test]
    fn streaming_ewma_smooths() {
        let mut estimator = StreamingPhiEstimator::new(4)
            .with_ewma_alpha(0.5)
            .with_forgetting_factor(1.0);
        let engine = SpectralPhiEngine::default();
        let budget = ComputeBudget::fast();

        let cycles = 50;
        for _ in 0..cycles {
            for s in 0..4 {
                estimator.observe(s, &engine, &budget);
            }
        }

        // Every observation except the very first records one transition.
        assert_eq!(estimator.num_transitions(), cycles * 4 - 1);
    }

    /// `reset` must drop both the transition count and the remembered
    /// previous state.
    #[test]
    fn streaming_reset_clears() {
        let mut estimator = StreamingPhiEstimator::new(4);
        let engine = SpectralPhiEngine::default();
        let budget = ComputeBudget::fast();

        for s in 0..4 {
            estimator.observe(s, &engine, &budget);
        }
        assert!(estimator.num_transitions() > 0);

        estimator.reset();
        assert_eq!(estimator.num_transitions(), 0);

        // With no previous state left, the next observation records no
        // transition and the estimator is back in its warm-up phase.
        assert!(estimator.observe(0, &engine, &budget).is_none());
        assert_eq!(estimator.num_transitions(), 0);
    }
}
307}