1use crate::budget::PlanBudget;
31use crate::complexity::{Complexity, ComplexityClass};
32use crate::contrastive::{contrastive_solve_on_change_sublinear_auto, AnomalyRow};
33use crate::error::Result;
34use crate::matrix::Matrix;
35use crate::types::Precision;
36use crate::SparseDelta;
37use alloc::vec::Vec;
38use core::time::Duration;
39
40#[derive(Debug, Clone)]
44pub struct ProcessedEvent {
45 pub event_idx: usize,
47 pub anomalies: Vec<AnomalyRow>,
49 pub latency: Duration,
51 pub status: EventStatus,
53}
54
/// Path an event took through [`EventStreamIter`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventStatus {
    /// The solver ran and returned successfully.
    Solved,
    /// The skip gate judged the delta to be below the configured threshold; the
    /// solver was not run and no budget was consumed.
    Skipped,
    /// The budget refused to pay for the solve. This is the last event the
    /// iterator yields.
    BudgetRefused,
    /// The solver returned an error. The error value itself is not retained.
    Errored,
}
69
/// Zero-sized marker type that carries the complexity declaration for the
/// event-stream operation.
#[derive(Debug, Clone, Copy, Default)]
pub struct EventStreamOp;
74
75impl Complexity for EventStreamOp {
76 const CLASS: ComplexityClass = ComplexityClass::SubLinear;
77 const DETAIL: &'static str =
78 "Per-event class is the auto-tuned SubLinear orchestrator's. The iterator wrapper \
79 adds O(1) gate probe + optional O(1) budget consume per event; aggregate cost is \
80 O(events · per_event_class).";
81}
82
83#[derive(Debug, Clone)]
85pub struct EventStreamConfig {
86 pub tolerance: Precision,
88 pub k: usize,
90 pub skip_threshold: Option<Precision>,
94 pub cached_coherence: Option<Precision>,
97 pub cached_min_diag: Option<Precision>,
99}
100
101impl Default for EventStreamConfig {
102 fn default() -> Self {
103 Self {
104 tolerance: 1e-8,
105 k: 3,
106 skip_threshold: None,
107 cached_coherence: None,
108 cached_min_diag: None,
109 }
110 }
111}
112
113pub fn event_stream_iter<'a, I>(
141 matrix: &'a dyn Matrix,
142 prev_solution: &'a [Precision],
143 events: I,
144 config: &'a EventStreamConfig,
145 budget: Option<&'a mut PlanBudget>,
146) -> EventStreamIter<'a, I>
147where
148 I: Iterator<Item = (SparseDelta, Vec<Precision>)>,
149{
150 EventStreamIter {
151 matrix,
152 prev_solution,
153 events,
154 config,
155 budget,
156 idx: 0,
157 ended: false,
158 }
159}
160
161pub struct EventStreamIter<'a, I>
163where
164 I: Iterator<Item = (SparseDelta, Vec<Precision>)>,
165{
166 matrix: &'a dyn Matrix,
167 prev_solution: &'a [Precision],
168 events: I,
169 config: &'a EventStreamConfig,
170 budget: Option<&'a mut PlanBudget>,
171 idx: usize,
172 ended: bool,
173}
174
175impl<'a, I> Iterator for EventStreamIter<'a, I>
176where
177 I: Iterator<Item = (SparseDelta, Vec<Precision>)>,
178{
179 type Item = ProcessedEvent;
180
181 fn next(&mut self) -> Option<Self::Item> {
182 if self.ended {
183 return None;
184 }
185 let (delta, b_new) = self.events.next()?;
186 let idx = self.idx;
187 self.idx += 1;
188
189 let start = std_time_instant_now();
190
191 if let (Some(skip), Some(coh), Some(md)) = (
193 self.config.skip_threshold,
194 self.config.cached_coherence,
195 self.config.cached_min_diag,
196 ) {
197 if crate::coherence::delta_below_solve_threshold(coh, md, &delta.values, skip) {
198 let latency = std_time_instant_elapsed(start);
199 return Some(ProcessedEvent {
200 event_idx: idx,
201 anomalies: Vec::new(),
202 latency,
203 status: EventStatus::Skipped,
204 });
205 }
206 }
207
208 if let Some(budget) = self.budget.as_deref_mut() {
210 if budget.try_consume(ComplexityClass::SubLinear).is_err() {
211 self.ended = true;
212 let latency = std_time_instant_elapsed(start);
213 return Some(ProcessedEvent {
214 event_idx: idx,
215 anomalies: Vec::new(),
216 latency,
217 status: EventStatus::BudgetRefused,
218 });
219 }
220 }
221
222 let result: Result<Vec<AnomalyRow>> = contrastive_solve_on_change_sublinear_auto(
224 self.matrix,
225 self.prev_solution,
226 &b_new,
227 &delta,
228 self.config.tolerance,
229 self.config.k,
230 );
231 let latency = std_time_instant_elapsed(start);
232 match result {
233 Ok(anomalies) => Some(ProcessedEvent {
234 event_idx: idx,
235 anomalies,
236 latency,
237 status: EventStatus::Solved,
238 }),
239 Err(_e) => Some(ProcessedEvent {
240 event_idx: idx,
241 anomalies: Vec::new(),
242 latency,
243 status: EventStatus::Errored,
244 }),
245 }
246 }
247}
248
/// Captures the start-of-event timestamp (`std` builds).
#[cfg(feature = "std")]
fn std_time_instant_now() -> std::time::Instant {
    std::time::Instant::now()
}

/// Time elapsed since `start` (`std` builds).
#[cfg(feature = "std")]
fn std_time_instant_elapsed(start: std::time::Instant) -> Duration {
    start.elapsed()
}

/// Stand-in for the timestamp when the `std` feature is off: there is no clock
/// to read, so the "start token" is just `()`.
#[cfg(not(feature = "std"))]
fn std_time_instant_now() {}

/// Stand-in for the elapsed time when the `std` feature is off: always
/// `Duration::ZERO`.
#[cfg(not(feature = "std"))]
fn std_time_instant_elapsed(_start: ()) -> Duration {
    Duration::ZERO
}
271
#[cfg(test)]
mod tests {
    use super::*;
    use crate::matrix::SparseMatrix;
    use crate::solver::{neumann::NeumannSolver, SolverAlgorithm, SolverOptions};

    /// Builds an `n x n` ring matrix: 10 on the diagonal, +0.5 towards the next
    /// index and -0.5 towards the previous one (both wrapping around).
    fn build_strong_ring(n: usize) -> SparseMatrix {
        let mut triplets = Vec::with_capacity(3 * n);
        for i in 0..n {
            triplets.push((i, i, 10.0_f64));
            triplets.push((i, (i + 1) % n, 0.5));
            triplets.push((i, (i + n - 1) % n, -0.5));
        }
        SparseMatrix::from_triplets(triplets, n, n).unwrap()
    }

    /// Baseline right-hand side shared by all tests: `[1.0, 2.0, ..., n]`.
    fn baseline_rhs(n: usize) -> Vec<f64> {
        (0..n).map(|i| i as f64 + 1.0).collect()
    }

    /// Solves `m * x = b` once to obtain the "previous solution" for the stream.
    fn warmup_solve(m: &SparseMatrix, b: &[f64]) -> Vec<f64> {
        let solver = NeumannSolver::new(64, 1e-12);
        let opts = SolverOptions::default();
        solver.solve(m, b, &opts).unwrap().solution
    }

    /// Builds one event: a single-entry delta at `idx` plus the baseline
    /// right-hand side with that delta applied.
    fn build_event(n: usize, idx: usize, dv: f64) -> (SparseDelta, Vec<f64>) {
        let delta = SparseDelta::new(vec![idx], vec![dv]).unwrap();
        let mut b = baseline_rhs(n);
        delta.apply_to(&mut b).unwrap();
        (delta, b)
    }

    #[test]
    fn op_class_is_sublinear() {
        // Checked at compile time; the test body only has to build.
        const _: () = assert!(matches!(
            <EventStreamOp as Complexity>::CLASS,
            ComplexityClass::SubLinear
        ));
    }

    #[test]
    fn empty_input_yields_empty_output() {
        let n = 16;
        let m = build_strong_ring(n);
        let prev = warmup_solve(&m, &baseline_rhs(n));
        let cfg = EventStreamConfig::default();
        let events: Vec<(SparseDelta, Vec<f64>)> = Vec::new();
        let out: Vec<_> = event_stream_iter(&m, &prev, events.into_iter(), &cfg, None).collect();
        assert!(out.is_empty());
    }

    #[test]
    fn yields_one_processed_event_per_input() {
        let n = 16;
        let m = build_strong_ring(n);
        let prev = warmup_solve(&m, &baseline_rhs(n));
        let cfg = EventStreamConfig {
            tolerance: 1e-8,
            k: 2,
            ..Default::default()
        };
        let events = vec![
            build_event(n, 3, 0.5),
            build_event(n, 7, -0.3),
            build_event(n, 11, 1.0),
        ];
        let out: Vec<_> = event_stream_iter(&m, &prev, events.into_iter(), &cfg, None).collect();
        assert_eq!(out.len(), 3);
        for (i, e) in out.iter().enumerate() {
            assert_eq!(e.event_idx, i);
            assert_eq!(e.status, EventStatus::Solved);
            assert!(!e.anomalies.is_empty());
        }
    }

    #[test]
    fn skip_gate_short_circuits_tiny_delta() {
        let n = 16;
        let m = build_strong_ring(n);
        let prev = warmup_solve(&m, &baseline_rhs(n));
        let coh = crate::coherence::coherence_score(&m);
        // Smallest non-zero diagonal magnitude.
        let min_diag = (0..n)
            .map(|i| Matrix::get(&m, i, i).unwrap_or(0.0).abs())
            .filter(|x| *x > 0.0)
            .fold(f64::INFINITY, f64::min);
        let cfg = EventStreamConfig {
            tolerance: 1e-8,
            k: 2,
            skip_threshold: Some(1e-6),
            cached_coherence: Some(coh),
            cached_min_diag: Some(min_diag),
        };
        let events = vec![build_event(n, 3, 1e-12)];
        let out: Vec<_> = event_stream_iter(&m, &prev, events.into_iter(), &cfg, None).collect();
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].status, EventStatus::Skipped);
        assert!(out[0].anomalies.is_empty());
    }

    #[test]
    fn budget_terminates_stream_when_exhausted() {
        let n = 16;
        let m = build_strong_ring(n);
        let prev = warmup_solve(&m, &baseline_rhs(n));
        let cfg = EventStreamConfig::default();
        // Four events against a budget of two: the third is refused and the
        // fourth must never be yielded. With only three events the refusal would
        // coincide with the end of the input and termination would go untested.
        let events = vec![
            build_event(n, 3, 0.5),
            build_event(n, 7, -0.3),
            build_event(n, 11, 1.0),
            build_event(n, 13, 0.25),
        ];
        let mut budget = PlanBudget::new(ComplexityClass::SubLinear, 2);
        let out: Vec<_> =
            event_stream_iter(&m, &prev, events.into_iter(), &cfg, Some(&mut budget)).collect();
        assert_eq!(out.len(), 3);
        assert_eq!(out[0].status, EventStatus::Solved);
        assert_eq!(out[1].status, EventStatus::Solved);
        assert_eq!(out[2].status, EventStatus::BudgetRefused);
        assert_eq!(budget.remaining_ops(), 0);
    }
}