Skip to main content

sublinear_solver/
stream.rs

1//! Streaming event iterator over the SubLinear orchestrator.
2//!
3//! ADR-001's primitives (closure, single-entry Neumann, contrastive
4//! orchestrator, coherence gate, witness, budget) compose into a
5//! per-event call pattern. Real callers — RuView agents watching
6//! sensor / log / metric streams, Cognitum reflex loops over a
7//! learning system, Ruflo inner-loop planners — process an *iterator*
8//! of events, not a single one.
9//!
//! This module lifts the single-event call into a stdlib `Iterator`
//! adapter: feed it an event stream, get back an iterator of
//! [`ProcessedEvent`]s (event index, top-k `Vec<AnomalyRow>`, latency,
//! and status). Composes natively with `.filter()`, `.take()`,
//! `.collect()`, etc.
14//!
//! Each yielded event:
//!
//!   1. Probes the [`crate::coherence::delta_below_solve_threshold`]
//!      skip gate first (when configured). If the gate trips, yields
//!      an empty `Vec` and a near-zero latency — the "no event, no
//!      work" path.
//!   2. Otherwise, if the caller supplied a [`crate::PlanBudget`],
//!      consumes a slot from it; if the budget refuses, the iterator
//!      yields one final `BudgetRefused` event and ends.
//!   3. Otherwise calls
//!      [`crate::contrastive_solve_on_change_sublinear_auto`] to
//!      compute the SubLinear top-k anomalies for the event.
26//!
27//! The whole pipeline stays SubLinear per event, with the gate +
28//! budget short-circuits letting callers cap end-to-end cost.
29
30use crate::budget::PlanBudget;
31use crate::complexity::{Complexity, ComplexityClass};
32use crate::contrastive::{contrastive_solve_on_change_sublinear_auto, AnomalyRow};
33use crate::error::Result;
34use crate::matrix::Matrix;
35use crate::types::Precision;
36use crate::SparseDelta;
37use alloc::vec::Vec;
38use core::time::Duration;
39
40/// One processed event: the original index in the input stream,
41/// the top-k anomalies (empty if the gate skipped or budget refused),
42/// the wall-time spent on this event, and a status flag.
43#[derive(Debug, Clone)]
44pub struct ProcessedEvent {
45    /// Position of this event in the input iterator.
46    pub event_idx: usize,
47    /// Top-k anomalies. Empty if `status != Solved`.
48    pub anomalies: Vec<AnomalyRow>,
49    /// Wall-time spent on this event.
50    pub latency: Duration,
51    /// What happened.
52    pub status: EventStatus,
53}
54
/// Per-event outcome reported alongside each [`ProcessedEvent`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EventStatus {
    /// The SubLinear orchestrator ran to completion and produced a
    /// top-k result.
    Solved,
    /// The coherence gate judged the delta too small to be worth a
    /// solve; the recorded `latency` covers the gate probe only.
    Skipped,
    /// The caller-supplied [`PlanBudget`] rejected this event's
    /// op-class. This is the last item the iterator yields.
    BudgetRefused,
    /// The orchestrator returned an error; the recorded `latency` is
    /// the time spent up to the failure.
    Errored,
}
69
/// Zero-sized marker type standing for the [`event_stream_iter`]
/// operation. Its per-event complexity class is the orchestrator's:
/// SubLinear on a strict-DD matrix.
#[derive(Clone, Copy, Debug, Default)]
pub struct EventStreamOp;
74
75impl Complexity for EventStreamOp {
76    const CLASS: ComplexityClass = ComplexityClass::SubLinear;
77    const DETAIL: &'static str =
78        "Per-event class is the auto-tuned SubLinear orchestrator's. The iterator wrapper \
79         adds O(1) gate probe + optional O(1) budget consume per event; aggregate cost is \
80         O(events · per_event_class).";
81}
82
83/// Configuration for the event-stream iterator.
84#[derive(Debug, Clone)]
85pub struct EventStreamConfig {
86    /// Audit tolerance passed to the auto-tuned orchestrator.
87    pub tolerance: Precision,
88    /// Top-k to extract per event.
89    pub k: usize,
90    /// Optional skip-gate tolerance. If `Some`, the coherence-gated
91    /// `delta_below_solve_threshold` check runs first and the
92    /// orchestrator is skipped when the delta is below threshold.
93    pub skip_threshold: Option<Precision>,
94    /// Cached coherence + min-diag for the skip gate. Required iff
95    /// `skip_threshold.is_some()`.
96    pub cached_coherence: Option<Precision>,
97    /// Cached min |diag(A)|. Required iff `skip_threshold.is_some()`.
98    pub cached_min_diag: Option<Precision>,
99}
100
101impl Default for EventStreamConfig {
102    fn default() -> Self {
103        Self {
104            tolerance: 1e-8,
105            k: 3,
106            skip_threshold: None,
107            cached_coherence: None,
108            cached_min_diag: None,
109        }
110    }
111}
112
113/// Process an iterator of `(SparseDelta, b_new)` events through the
114/// SubLinear orchestrator, yielding [`ProcessedEvent`]s. The optional
115/// `budget` argument tracks cumulative consumption across the stream;
116/// when it refuses, the iterator yields one final `BudgetRefused`
117/// event and ends.
118///
119/// `matrix` and `prev_solution` are borrowed for the lifetime of the
120/// iterator — the same baseline state applies to every event in the
121/// stream (matching the "stable baseline + sparse events" idiom RuView
122/// agents use).
123///
124/// # Examples
125///
126/// ```rust,no_run
127/// # use sublinear_solver::{Matrix, SparseDelta, PlanBudget};
128/// # use sublinear_solver::complexity::ComplexityClass;
129/// # use sublinear_solver::stream::{event_stream_iter, EventStreamConfig};
130/// # fn demo<M: Matrix>(matrix: &M, prev: &[f64],
131/// #                  events: Vec<(SparseDelta, Vec<f64>)>) {
132/// let mut budget = PlanBudget::new(ComplexityClass::SubLinear, 100);
133/// let cfg = EventStreamConfig { tolerance: 1e-8, k: 3, ..Default::default() };
134///
135/// for e in event_stream_iter(matrix, prev, events.into_iter(), &cfg, Some(&mut budget)) {
136///     println!("event {}: {} anomalies in {:?}", e.event_idx, e.anomalies.len(), e.latency);
137/// }
138/// # }
139/// ```
140pub fn event_stream_iter<'a, I>(
141    matrix: &'a dyn Matrix,
142    prev_solution: &'a [Precision],
143    events: I,
144    config: &'a EventStreamConfig,
145    budget: Option<&'a mut PlanBudget>,
146) -> EventStreamIter<'a, I>
147where
148    I: Iterator<Item = (SparseDelta, Vec<Precision>)>,
149{
150    EventStreamIter {
151        matrix,
152        prev_solution,
153        events,
154        config,
155        budget,
156        idx: 0,
157        ended: false,
158    }
159}
160
161/// Iterator returned by [`event_stream_iter`].
162pub struct EventStreamIter<'a, I>
163where
164    I: Iterator<Item = (SparseDelta, Vec<Precision>)>,
165{
166    matrix: &'a dyn Matrix,
167    prev_solution: &'a [Precision],
168    events: I,
169    config: &'a EventStreamConfig,
170    budget: Option<&'a mut PlanBudget>,
171    idx: usize,
172    ended: bool,
173}
174
175impl<'a, I> Iterator for EventStreamIter<'a, I>
176where
177    I: Iterator<Item = (SparseDelta, Vec<Precision>)>,
178{
179    type Item = ProcessedEvent;
180
181    fn next(&mut self) -> Option<Self::Item> {
182        if self.ended {
183            return None;
184        }
185        let (delta, b_new) = self.events.next()?;
186        let idx = self.idx;
187        self.idx += 1;
188
189        let start = std_time_instant_now();
190
191        // (1) Skip gate (optional).
192        if let (Some(skip), Some(coh), Some(md)) = (
193            self.config.skip_threshold,
194            self.config.cached_coherence,
195            self.config.cached_min_diag,
196        ) {
197            if crate::coherence::delta_below_solve_threshold(coh, md, &delta.values, skip) {
198                let latency = std_time_instant_elapsed(start);
199                return Some(ProcessedEvent {
200                    event_idx: idx,
201                    anomalies: Vec::new(),
202                    latency,
203                    status: EventStatus::Skipped,
204                });
205            }
206        }
207
208        // (2) Budget gate (optional).
209        if let Some(budget) = self.budget.as_deref_mut() {
210            if budget.try_consume(ComplexityClass::SubLinear).is_err() {
211                self.ended = true;
212                let latency = std_time_instant_elapsed(start);
213                return Some(ProcessedEvent {
214                    event_idx: idx,
215                    anomalies: Vec::new(),
216                    latency,
217                    status: EventStatus::BudgetRefused,
218                });
219            }
220        }
221
222        // (3) SubLinear orchestrator.
223        let result: Result<Vec<AnomalyRow>> = contrastive_solve_on_change_sublinear_auto(
224            self.matrix,
225            self.prev_solution,
226            &b_new,
227            &delta,
228            self.config.tolerance,
229            self.config.k,
230        );
231        let latency = std_time_instant_elapsed(start);
232        match result {
233            Ok(anomalies) => Some(ProcessedEvent {
234                event_idx: idx,
235                anomalies,
236                latency,
237                status: EventStatus::Solved,
238            }),
239            Err(_e) => Some(ProcessedEvent {
240                event_idx: idx,
241                anomalies: Vec::new(),
242                latency,
243                status: EventStatus::Errored,
244            }),
245        }
246    }
247}
248
// Wall-clock helpers. Gated to keep `no_std` builds compiling (the
// iterator still works without timing — it just records Duration::ZERO).
//
// `wasm32-unknown-unknown` is routed to the no-op stand-ins even when
// `std` is enabled: `std::time::Instant::now()` panics on that target
// because the platform exposes no clock.

/// Captures the start timestamp for one event.
#[cfg(all(
    feature = "std",
    not(all(target_arch = "wasm32", target_os = "unknown"))
))]
fn std_time_instant_now() -> std::time::Instant {
    std::time::Instant::now()
}

/// Returns the wall-time elapsed since `start`.
#[cfg(all(
    feature = "std",
    not(all(target_arch = "wasm32", target_os = "unknown"))
))]
fn std_time_instant_elapsed(start: std::time::Instant) -> Duration {
    start.elapsed()
}

/// Clock-less stand-in: there is no timestamp to capture.
#[cfg(not(all(
    feature = "std",
    not(all(target_arch = "wasm32", target_os = "unknown"))
)))]
fn std_time_instant_now() {
    // No-op stand-in.
}

/// Clock-less stand-in: always reports zero elapsed time.
#[cfg(not(all(
    feature = "std",
    not(all(target_arch = "wasm32", target_os = "unknown"))
)))]
fn std_time_instant_elapsed(_start: ()) -> Duration {
    Duration::ZERO
}
271
#[cfg(test)]
mod tests {
    use super::*;
    use crate::matrix::SparseMatrix;
    use crate::solver::{neumann::NeumannSolver, SolverAlgorithm, SolverOptions};

    /// Baseline right-hand side `[1.0, 2.0, …, n]` used by every fixture.
    fn baseline_rhs(n: usize) -> Vec<f64> {
        (0..n).map(|i| i as f64 + 1.0).collect()
    }

    /// n×n ring matrix: 10.0 on the diagonal, +0.5 to the next node and
    /// -0.5 to the previous node (indices wrap around).
    fn build_strong_ring(n: usize) -> SparseMatrix {
        let triplets: Vec<(usize, usize, f64)> = (0..n)
            .flat_map(|row| {
                [
                    (row, row, 10.0_f64),
                    (row, (row + 1) % n, 0.5),
                    (row, (row + n - 1) % n, -0.5),
                ]
            })
            .collect();
        SparseMatrix::from_triplets(triplets, n, n).unwrap()
    }

    /// Solves the baseline system once so tests have a `prev_solution`.
    fn warmup_solve(m: &SparseMatrix, b: &[f64]) -> Vec<f64> {
        let opts = SolverOptions::default();
        NeumannSolver::new(64, 1e-12)
            .solve(m, b, &opts)
            .unwrap()
            .solution
    }

    /// Ring matrix of size `n` plus its warm baseline solution.
    fn ring_with_baseline(n: usize) -> (SparseMatrix, Vec<f64>) {
        let ring = build_strong_ring(n);
        let baseline = warmup_solve(&ring, &baseline_rhs(n));
        (ring, baseline)
    }

    /// Single-entry event: perturb position `idx` of the baseline RHS by `dv`.
    fn build_event(n: usize, idx: usize, dv: f64) -> (SparseDelta, Vec<f64>) {
        let mut rhs = baseline_rhs(n);
        let delta = SparseDelta::new(vec![idx], vec![dv]).unwrap();
        delta.apply_to(&mut rhs).unwrap();
        (delta, rhs)
    }

    /// The three-event stream shared by the multi-event tests.
    fn three_events(n: usize) -> Vec<(SparseDelta, Vec<f64>)> {
        vec![
            build_event(n, 3, 0.5),
            build_event(n, 7, -0.3),
            build_event(n, 11, 1.0),
        ]
    }

    #[test]
    fn op_class_is_sublinear() {
        // Checked at compile time; the test body itself has nothing to run.
        const _: () = assert!(matches!(
            <EventStreamOp as Complexity>::CLASS,
            ComplexityClass::SubLinear
        ));
    }

    #[test]
    fn empty_input_yields_empty_output() {
        let (ring, baseline) = ring_with_baseline(16);
        let cfg = EventStreamConfig::default();
        let no_events = core::iter::empty::<(SparseDelta, Vec<f64>)>();
        let processed: Vec<ProcessedEvent> =
            event_stream_iter(&ring, &baseline, no_events, &cfg, None).collect();
        assert!(processed.is_empty());
    }

    #[test]
    fn yields_one_processed_event_per_input() {
        let n = 16;
        let (ring, baseline) = ring_with_baseline(n);
        let cfg = EventStreamConfig {
            tolerance: 1e-8,
            k: 2,
            ..Default::default()
        };
        let processed: Vec<ProcessedEvent> =
            event_stream_iter(&ring, &baseline, three_events(n).into_iter(), &cfg, None)
                .collect();
        assert_eq!(processed.len(), 3);
        for (expected_idx, event) in processed.iter().enumerate() {
            assert_eq!(event.event_idx, expected_idx);
            assert_eq!(event.status, EventStatus::Solved);
            assert!(!event.anomalies.is_empty());
        }
    }

    #[test]
    fn skip_gate_short_circuits_tiny_delta() {
        // Cache coherence + min_diag so the gate can run.
        let n = 16;
        let (ring, baseline) = ring_with_baseline(n);
        let coherence = crate::coherence::coherence_score(&ring);
        let smallest_diag = (0..n)
            .map(|i| Matrix::get(&ring, i, i).unwrap_or(0.0).abs())
            .filter(|d| *d > 0.0)
            .fold(f64::INFINITY, f64::min);
        let cfg = EventStreamConfig {
            tolerance: 1e-8,
            k: 2,
            skip_threshold: Some(1e-6),
            cached_coherence: Some(coherence),
            cached_min_diag: Some(smallest_diag),
        };
        // Tiny delta → gate should skip.
        let tiny = vec![build_event(n, 3, 1e-12)];
        let processed: Vec<ProcessedEvent> =
            event_stream_iter(&ring, &baseline, tiny.into_iter(), &cfg, None).collect();
        assert_eq!(processed.len(), 1);
        assert_eq!(processed[0].status, EventStatus::Skipped);
        assert!(processed[0].anomalies.is_empty());
    }

    #[test]
    fn budget_terminates_stream_when_exhausted() {
        let n = 16;
        let (ring, baseline) = ring_with_baseline(n);
        let cfg = EventStreamConfig::default();
        let mut budget = PlanBudget::new(ComplexityClass::SubLinear, 2);
        let processed: Vec<ProcessedEvent> = event_stream_iter(
            &ring,
            &baseline,
            three_events(n).into_iter(),
            &cfg,
            Some(&mut budget),
        )
        .collect();
        // Two Solved + one BudgetRefused, then iterator ends (3 total).
        let statuses: Vec<EventStatus> = processed.iter().map(|e| e.status).collect();
        assert_eq!(statuses.len(), 3);
        assert_eq!(statuses[0], EventStatus::Solved);
        assert_eq!(statuses[1], EventStatus::Solved);
        assert_eq!(statuses[2], EventStatus::BudgetRefused);
        assert_eq!(budget.remaining_ops(), 0);
    }
}