use crate::budget::PlanBudget;
use crate::complexity::{Complexity, ComplexityClass};
use crate::contrastive::{contrastive_solve_on_change_sublinear_auto, AnomalyRow};
use crate::error::Result;
use crate::matrix::Matrix;
use crate::types::Precision;
use crate::SparseDelta;
use alloc::vec::Vec;
use core::time::Duration;
/// Outcome of pushing a single event through [`EventStreamIter`].
///
/// Exactly one `ProcessedEvent` is produced for every event pulled from the
/// wrapped iterator, whatever happened to it; inspect [`status`](Self::status)
/// to tell the cases apart.
#[derive(Clone, Debug)]
pub struct ProcessedEvent {
    /// Zero-based position of the event in the stream, counted in the order
    /// events were pulled from the wrapped iterator.
    pub event_idx: usize,
    /// Rows returned by the solve. Empty unless `status` is
    /// [`EventStatus::Solved`].
    pub anomalies: Vec<AnomalyRow>,
    /// Time spent handling this event, measured from just after the event was
    /// pulled. Always `Duration::ZERO` when built without the `std` feature.
    pub latency: Duration,
    /// How the event was handled.
    pub status: EventStatus,
}
/// How [`EventStreamIter`] disposed of one event.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EventStatus {
    /// The solve ran and returned `Ok`.
    Solved,
    /// The skip gate judged the delta too small to be worth solving; no solve
    /// was attempted and no budget was consumed.
    Skipped,
    /// The budget refused the operation. The stream yields this event and
    /// then terminates.
    BudgetRefused,
    /// The solve returned an error. The error value itself is discarded.
    Errored,
}
/// Zero-sized marker type that carries the [`Complexity`] declaration for the
/// event-stream operation.
#[derive(Clone, Copy, Debug, Default)]
pub struct EventStreamOp;
// Complexity declaration for the event-stream operation.
impl Complexity for EventStreamOp {
// Declared class of the operation: sub-linear.
const CLASS: ComplexityClass = ComplexityClass::SubLinear;
// Human-readable justification for `CLASS`. The trailing backslashes join the
// three source lines into one string; the newline and any leading whitespace
// on the following line are not part of the value.
const DETAIL: &'static str =
"Per-event class is the auto-tuned SubLinear orchestrator's. The iterator wrapper \
adds O(1) gate probe + optional O(1) budget consume per event; aggregate cost is \
O(events · per_event_class).";
}
/// Tuning knobs shared by every event processed by an [`EventStreamIter`].
///
/// The skip gate is active only when `skip_threshold`, `cached_coherence` and
/// `cached_min_diag` are **all** `Some`; if any of them is `None`, every
/// event goes on to the budget check and the solve.
#[derive(Clone, Debug)]
pub struct EventStreamConfig {
    /// Tolerance forwarded unchanged to the per-event solve.
    pub tolerance: Precision,
    /// `k` parameter forwarded unchanged to the per-event solve.
    pub k: usize,
    /// Threshold handed to the skip gate.
    pub skip_threshold: Option<Precision>,
    /// Precomputed coherence value handed to the skip gate.
    pub cached_coherence: Option<Precision>,
    /// Precomputed minimum-diagonal value handed to the skip gate.
    pub cached_min_diag: Option<Precision>,
}
impl Default for EventStreamConfig {
fn default() -> Self {
Self {
tolerance: 1e-8,
k: 3,
skip_threshold: None,
cached_coherence: None,
cached_min_diag: None,
}
}
}
/// Wraps `events` in a lazy [`EventStreamIter`].
///
/// Nothing is processed until the returned iterator is advanced. Each item of
/// `events` is a `(delta, b_new)` pair, and every pair pulled from it produces
/// exactly one [`ProcessedEvent`].
///
/// * `matrix` – system matrix passed to every solve.
/// * `prev_solution` – solution passed to every solve; it is borrowed
///   immutably and never updated by the stream.
/// * `events` – source of `(delta, b_new)` pairs.
/// * `config` – tolerance, `k`, and optional skip-gate inputs.
/// * `budget` – when `Some`, one sub-linear operation is consumed per event
///   that reaches the solve; the stream ends after the first refusal.
pub fn event_stream_iter<'a, I>(
    matrix: &'a dyn Matrix,
    prev_solution: &'a [Precision],
    events: I,
    config: &'a EventStreamConfig,
    budget: Option<&'a mut PlanBudget>,
) -> EventStreamIter<'a, I>
where
    I: Iterator<Item = (SparseDelta, Vec<Precision>)>,
{
    // A fresh stream has processed nothing and has not been terminated.
    let (idx, ended) = (0, false);
    EventStreamIter {
        matrix,
        prev_solution,
        events,
        config,
        budget,
        idx,
        ended,
    }
}
/// Lazy iterator that turns a stream of `(delta, b_new)` events into
/// [`ProcessedEvent`]s. Built by [`event_stream_iter`].
///
/// Like every iterator adapter it does no work until it is advanced, so
/// dropping it unconsumed silently processes nothing; `#[must_use]` makes the
/// compiler flag that mistake.
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct EventStreamIter<'a, I>
where
    I: Iterator<Item = (SparseDelta, Vec<Precision>)>,
{
    /// System matrix passed to every solve.
    matrix: &'a dyn Matrix,
    /// Solution passed to every solve; never updated by the stream.
    prev_solution: &'a [Precision],
    /// Source of `(delta, b_new)` pairs.
    events: I,
    /// Solver parameters and optional skip-gate inputs.
    config: &'a EventStreamConfig,
    /// Optional budget consumed once per event that reaches the solve.
    budget: Option<&'a mut PlanBudget>,
    /// Index assigned to the next event pulled from `events`.
    idx: usize,
    /// Set once the budget has refused an event; the stream then yields `None`.
    ended: bool,
}
impl<'a, I> Iterator for EventStreamIter<'a, I>
where
I: Iterator<Item = (SparseDelta, Vec<Precision>)>,
{
type Item = ProcessedEvent;
fn next(&mut self) -> Option<Self::Item> {
if self.ended {
return None;
}
let (delta, b_new) = self.events.next()?;
let idx = self.idx;
self.idx += 1;
let start = std_time_instant_now();
if let (Some(skip), Some(coh), Some(md)) = (
self.config.skip_threshold,
self.config.cached_coherence,
self.config.cached_min_diag,
) {
if crate::coherence::delta_below_solve_threshold(coh, md, &delta.values, skip) {
let latency = std_time_instant_elapsed(start);
return Some(ProcessedEvent {
event_idx: idx,
anomalies: Vec::new(),
latency,
status: EventStatus::Skipped,
});
}
}
if let Some(budget) = self.budget.as_deref_mut() {
if budget.try_consume(ComplexityClass::SubLinear).is_err() {
self.ended = true;
let latency = std_time_instant_elapsed(start);
return Some(ProcessedEvent {
event_idx: idx,
anomalies: Vec::new(),
latency,
status: EventStatus::BudgetRefused,
});
}
}
let result: Result<Vec<AnomalyRow>> = contrastive_solve_on_change_sublinear_auto(
self.matrix,
self.prev_solution,
&b_new,
&delta,
self.config.tolerance,
self.config.k,
);
let latency = std_time_instant_elapsed(start);
match result {
Ok(anomalies) => Some(ProcessedEvent {
event_idx: idx,
anomalies,
latency,
status: EventStatus::Solved,
}),
Err(_e) => Some(ProcessedEvent {
event_idx: idx,
anomalies: Vec::new(),
latency,
status: EventStatus::Errored,
}),
}
}
}
/// Captures the timestamp at which handling of an event begins (`std` builds).
#[cfg(feature = "std")]
fn std_time_instant_now() -> std::time::Instant {
    use std::time::Instant;
    Instant::now()
}

/// Returns the time that has passed since `since` (`std` builds).
#[cfg(feature = "std")]
fn std_time_instant_elapsed(since: std::time::Instant) -> Duration {
    std::time::Instant::now().duration_since(since)
}

/// Stand-in used without the `std` feature: no clock is available, so the
/// "timestamp" is just the unit value.
#[cfg(not(feature = "std"))]
fn std_time_instant_now() {}

/// Stand-in used without the `std` feature: always reports zero elapsed time.
#[cfg(not(feature = "std"))]
fn std_time_instant_elapsed(_since: ()) -> Duration {
    Duration::ZERO
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::matrix::SparseMatrix;
    use crate::solver::{neumann::NeumannSolver, SolverAlgorithm, SolverOptions};

    /// Dimension used by every fixture in this module.
    const RING_SIZE: usize = 16;

    /// Baseline right-hand side `b[i] = i + 1`.
    fn baseline_rhs(n: usize) -> Vec<f64> {
        (0..n).map(|i| i as f64 + 1.0).collect()
    }

    /// `n x n` ring matrix: 10 on the diagonal, +0.5 to the next node and
    /// -0.5 to the previous node (indices wrap around).
    fn build_strong_ring(n: usize) -> SparseMatrix {
        let triplets: Vec<_> = (0..n)
            .flat_map(|row| {
                [
                    (row, row, 10.0_f64),
                    (row, (row + 1) % n, 0.5),
                    (row, (row + n - 1) % n, -0.5),
                ]
            })
            .collect();
        SparseMatrix::from_triplets(triplets, n, n).unwrap()
    }

    /// Solves `m x = b` once to obtain the "previous solution" fed to the stream.
    fn warmup_solve(m: &SparseMatrix, b: &[f64]) -> Vec<f64> {
        let neumann = NeumannSolver::new(64, 1e-12);
        let options = SolverOptions::default();
        let solved = neumann.solve(m, b, &options).unwrap();
        solved.solution
    }

    /// Single-entry delta `dv` at `idx`, paired with the baseline RHS after
    /// the delta has been applied to it.
    fn build_event(n: usize, idx: usize, dv: f64) -> (SparseDelta, Vec<f64>) {
        let delta = SparseDelta::new(vec![idx], vec![dv]).unwrap();
        let mut b = baseline_rhs(n);
        delta.apply_to(&mut b).unwrap();
        (delta, b)
    }

    /// Ring matrix plus its warm-start solution for the baseline RHS.
    fn ring_and_warm_solution() -> (SparseMatrix, Vec<f64>) {
        let ring = build_strong_ring(RING_SIZE);
        let warm = warmup_solve(&ring, &baseline_rhs(RING_SIZE));
        (ring, warm)
    }

    /// The three non-trivial events shared by several tests.
    fn three_events() -> Vec<(SparseDelta, Vec<f64>)> {
        vec![
            build_event(RING_SIZE, 3, 0.5),
            build_event(RING_SIZE, 7, -0.3),
            build_event(RING_SIZE, 11, 1.0),
        ]
    }

    #[test]
    fn op_class_is_sublinear() {
        // Checked at compile time: the test body only has to build.
        const _: () = assert!(matches!(
            <EventStreamOp as Complexity>::CLASS,
            ComplexityClass::SubLinear
        ));
    }

    #[test]
    fn empty_input_yields_empty_output() {
        let (ring, warm) = ring_and_warm_solution();
        let config = EventStreamConfig::default();
        let no_events = Vec::<(SparseDelta, Vec<f64>)>::new();

        let produced = event_stream_iter(&ring, &warm, no_events.into_iter(), &config, None).count();

        assert_eq!(produced, 0);
    }

    #[test]
    fn yields_one_processed_event_per_input() {
        let (ring, warm) = ring_and_warm_solution();
        let config = EventStreamConfig {
            tolerance: 1e-8,
            k: 2,
            ..Default::default()
        };

        let processed: Vec<_> =
            event_stream_iter(&ring, &warm, three_events().into_iter(), &config, None).collect();

        assert_eq!(processed.len(), 3);
        for (position, event) in processed.iter().enumerate() {
            assert_eq!(event.event_idx, position);
            assert_eq!(event.status, EventStatus::Solved);
            assert!(!event.anomalies.is_empty());
        }
    }

    #[test]
    fn skip_gate_short_circuits_tiny_delta() {
        let (ring, warm) = ring_and_warm_solution();
        let coherence = crate::coherence::coherence_score(&ring);
        // Smallest non-zero |diagonal| entry of the ring.
        let min_diag = (0..RING_SIZE)
            .map(|i| Matrix::get(&ring, i, i).unwrap_or(0.0).abs())
            .filter(|d| *d > 0.0)
            .fold(f64::INFINITY, f64::min);
        let config = EventStreamConfig {
            tolerance: 1e-8,
            k: 2,
            skip_threshold: Some(1e-6),
            cached_coherence: Some(coherence),
            cached_min_diag: Some(min_diag),
        };
        let tiny = vec![build_event(RING_SIZE, 3, 1e-12)];

        let processed: Vec<_> =
            event_stream_iter(&ring, &warm, tiny.into_iter(), &config, None).collect();

        assert_eq!(processed.len(), 1);
        assert_eq!(processed[0].status, EventStatus::Skipped);
        assert!(processed[0].anomalies.is_empty());
    }

    #[test]
    fn budget_terminates_stream_when_exhausted() {
        let (ring, warm) = ring_and_warm_solution();
        let config = EventStreamConfig::default();
        // Room for two solves; the third event must be refused.
        let mut budget = PlanBudget::new(ComplexityClass::SubLinear, 2);

        let processed: Vec<_> = event_stream_iter(
            &ring,
            &warm,
            three_events().into_iter(),
            &config,
            Some(&mut budget),
        )
        .collect();

        let statuses: Vec<_> = processed.iter().map(|event| event.status).collect();
        assert_eq!(
            statuses,
            vec![
                EventStatus::Solved,
                EventStatus::Solved,
                EventStatus::BudgetRefused,
            ]
        );
        assert_eq!(budget.remaining_ops(), 0);
    }
}