use crate::lab::conformal::{HealthThresholdCalibrator, HealthThresholdConfig, ThresholdMode};
use crate::lab::oracle::eprocess::{EProcess, EProcessConfig};
use crate::raptorq::decoder::DecodeStats;
use serde::Serialize;
use serde_json::to_string as json_string;
use std::collections::BTreeMap;
/// Schema identifier stamped into every report and rendered log line.
pub const G8_SCHEMA_VERSION: &str = "raptorq-g8-anytime-regression-v1";
/// Replay reference stamped into reports/log lines for trace correlation.
pub const G8_REPLAY_REF: &str = "replay:rq-track-g-regression-v1";
/// Minimum calibration observations per metric before a threshold activates.
const MIN_CALIBRATION_SAMPLES: usize = 10;
/// Miscoverage rate handed to the upper-tail conformal calibrator.
const CONFORMAL_ALPHA: f64 = 0.05;
/// `p0` supplied to each metric's `EProcessConfig`.
const EPROCESS_P0: f64 = 0.10;
/// `lambda` supplied to each metric's `EProcessConfig`.
const EPROCESS_LAMBDA: f64 = 0.5;
/// `alpha` supplied to each metric's `EProcessConfig`.
const EPROCESS_ALPHA: f64 = 0.05;
/// Decoder statistics under regression surveillance.
/// Must stay in sync with `RegressionMonitor::extract_metrics`.
const TRACKED_METRICS: &[&str] = &[
    "gauss_ops",
    "dense_core_rows",
    "dense_core_cols",
    "inactivated",
    "pivots_selected",
    "peel_frontier_peak",
];
/// Verdict for a single metric (and, escalated, for a whole report).
///
/// Variant order is load-bearing: verdicts are escalated by comparing
/// `u8` discriminants (`Accept < Calibrating < Warning < Regressed`),
/// so do not reorder variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum RegressionVerdict {
    /// Observation conforms to its calibrated threshold.
    Accept,
    /// Metric has not yet accumulated enough calibration samples.
    Calibrating,
    /// Single observation exceeded its conformal threshold.
    Warning,
    /// E-process has accumulated enough evidence to reject the null.
    Regressed,
}
impl RegressionVerdict {
    /// Stable lowercase label for structured log output.
    #[must_use]
    #[inline]
    pub const fn label(self) -> &'static str {
        // Arms listed in declaration (severity) order.
        match self {
            Self::Accept => "accept",
            Self::Calibrating => "calibrating",
            Self::Warning => "warning",
            Self::Regressed => "regressed",
        }
    }
}
/// Per-metric outcome of a single `RegressionMonitor::check` call.
#[derive(Debug, Clone, Serialize)]
pub struct MetricRegressionResult {
    /// Tracked metric name (one of `TRACKED_METRICS`).
    pub metric: String,
    /// Observed value for this check.
    pub value: f64,
    /// Calibrated upper threshold; `None` while still calibrating.
    pub threshold: Option<f64>,
    /// Current e-process value (1.0 when no evidence has accumulated).
    pub e_value: f64,
    /// Number of calibration samples backing the threshold (0 if none).
    pub calibration_n: usize,
    /// Whether this observation was non-conforming against the threshold.
    pub exceeds_threshold: bool,
    /// Verdict derived from threshold state and e-process evidence.
    pub verdict: RegressionVerdict,
}
/// Full report produced by one `RegressionMonitor::check` call.
#[derive(Debug, Clone, Serialize)]
pub struct RegressionReport {
    /// Always `G8_SCHEMA_VERSION`.
    pub schema_version: &'static str,
    /// Always `G8_REPLAY_REF`.
    pub replay_ref: &'static str,
    /// One entry per tracked metric, in `TRACKED_METRICS` order.
    pub metrics: Vec<MetricRegressionResult>,
    /// Worst per-metric verdict (by discriminant ordering).
    pub overall_verdict: RegressionVerdict,
    /// Monitor-lifetime observation count (calibrate + check).
    pub total_observations: usize,
    /// Number of metrics with verdict `Regressed` in this report.
    pub regressed_count: usize,
    /// Number of metrics with verdict `Warning` in this report.
    pub warning_count: usize,
    /// Regime covariate from `DecodeStats` (policy mode, hard-regime
    /// branch, or the literal "hard-regime"), if any.
    pub regime_state: Option<String>,
}
/// Anytime-valid regression monitor over decoder statistics: a conformal
/// calibrator learns per-metric upper thresholds, and per-metric
/// e-processes accumulate evidence from threshold exceedances.
pub struct RegressionMonitor {
    // Learns and serves per-metric upper thresholds.
    calibrator: HealthThresholdCalibrator,
    // One e-process per tracked metric, keyed by metric name.
    e_processes: BTreeMap<String, EProcess>,
    // Lifetime count of observations fed via calibrate() or check().
    total_observations: usize,
    // Latched true once every tracked metric is calibrated.
    calibration_complete: bool,
}
impl RegressionMonitor {
    /// Constructs a monitor with an empty upper-tail conformal calibrator
    /// and one fresh e-process per entry in `TRACKED_METRICS`.
    #[must_use]
    #[inline]
    pub fn new() -> Self {
        // Thresholds activate only after MIN_CALIBRATION_SAMPLES
        // calibration observations per metric.
        let config = HealthThresholdConfig::new(CONFORMAL_ALPHA, ThresholdMode::Upper)
            .min_samples(MIN_CALIBRATION_SAMPLES);
        let calibrator = HealthThresholdCalibrator::new(config);
        let eprocess_config = EProcessConfig {
            p0: EPROCESS_P0,
            lambda: EPROCESS_LAMBDA,
            alpha: EPROCESS_ALPHA,
            ..Default::default()
        };
        let mut e_processes = BTreeMap::new();
        for &metric in TRACKED_METRICS {
            e_processes.insert(
                metric.to_string(),
                EProcess::new_without_history(metric, eprocess_config.clone()),
            );
        }
        Self {
            calibrator,
            e_processes,
            total_observations: 0,
            calibration_complete: false,
        }
    }

    /// Feeds one observation into calibration only.
    ///
    /// Updates every tracked metric's calibrator and the observation
    /// counter; e-processes are untouched, so calibration data never
    /// contributes regression evidence.
    pub fn calibrate(&mut self, stats: &DecodeStats) {
        let values = Self::extract_metrics(stats);
        for (metric, value) in &values {
            self.calibrator.calibrate(metric, *value);
        }
        self.total_observations += 1;
        if !self.calibration_complete {
            self.calibration_complete = TRACKED_METRICS
                .iter()
                .all(|m| self.calibrator.is_metric_calibrated(m));
        }
    }

    /// Checks one observation against calibrated thresholds and returns a
    /// per-metric report with an escalated overall verdict.
    ///
    /// Metrics without an active threshold are routed back into
    /// calibration (verdict `Calibrating`) and do NOT update their
    /// e-process, so pre-activation checks cannot dilute evidence.
    #[must_use]
    pub fn check(&mut self, stats: &DecodeStats) -> RegressionReport {
        let values = Self::extract_metrics(stats);
        let mut results = Vec::with_capacity(TRACKED_METRICS.len());
        let mut overall_verdict = RegressionVerdict::Accept;
        let mut regressed_count = 0usize;
        let mut warning_count = 0usize;
        self.total_observations += 1;
        for (metric, value) in &values {
            // Only consult the tracker once this metric's threshold is
            // live; otherwise keep calibrating with this value.
            let check_result = if self.calibrator.threshold(metric).is_some() {
                self.calibrator.check_and_track(metric, *value)
            } else {
                self.calibrator.calibrate(metric, *value);
                None
            };
            let (threshold, exceeds_threshold, calibration_n) =
                check_result.as_ref().map_or((None, false, 0), |cr| {
                    (Some(cr.threshold), !cr.conforming, cr.calibration_n)
                });
            // Accumulate anytime-valid evidence only for live thresholds.
            if threshold.is_some()
                && let Some(ep) = self.e_processes.get_mut(metric.as_str())
            {
                ep.observe(exceeds_threshold);
            }
            // Neutral e-value of 1.0 when the metric has no e-process.
            let e_value = self
                .e_processes
                .get(metric.as_str())
                .map_or(1.0, EProcess::e_value);
            let e_rejected = self
                .e_processes
                .get(metric.as_str())
                .is_some_and(|ep| ep.rejected);
            // Verdict resolution: missing threshold dominates, then
            // e-process rejection, then a single threshold exceedance.
            let verdict = if threshold.is_none() {
                RegressionVerdict::Calibrating
            } else if e_rejected {
                RegressionVerdict::Regressed
            } else if exceeds_threshold {
                RegressionVerdict::Warning
            } else {
                RegressionVerdict::Accept
            };
            match verdict {
                RegressionVerdict::Regressed => regressed_count += 1,
                RegressionVerdict::Warning => warning_count += 1,
                _ => {}
            }
            // Escalate via the enum's discriminant ordering
            // (Accept < Calibrating < Warning < Regressed).
            if (verdict as u8) > (overall_verdict as u8) {
                overall_verdict = verdict;
            }
            results.push(MetricRegressionResult {
                metric: metric.clone(),
                value: *value,
                threshold,
                e_value,
                calibration_n,
                exceeds_threshold,
                verdict,
            });
        }
        // Check-only warmup can also flip the public calibration flag.
        if !self.calibration_complete {
            self.calibration_complete = TRACKED_METRICS
                .iter()
                .all(|metric| self.calibrator.is_metric_calibrated(metric));
        }
        // Regime covariate priority: policy_mode, then hard_regime_branch,
        // then the bare hard_regime_activated flag.
        let regime_state = stats
            .policy_mode
            .map(ToString::to_string)
            .or_else(|| stats.hard_regime_branch.map(ToString::to_string))
            .or_else(|| {
                stats
                    .hard_regime_activated
                    .then(|| "hard-regime".to_string())
            });
        RegressionReport {
            schema_version: G8_SCHEMA_VERSION,
            replay_ref: G8_REPLAY_REF,
            metrics: results,
            overall_verdict,
            total_observations: self.total_observations,
            regressed_count,
            warning_count,
            regime_state,
        }
    }

    /// True once every tracked metric has met its calibration minimum.
    #[must_use]
    pub fn is_calibrated(&self) -> bool {
        self.calibration_complete
    }

    /// Lifetime observation count across `calibrate` and `check`.
    #[must_use]
    pub fn total_observations(&self) -> usize {
        self.total_observations
    }

    /// Current e-value for `metric`, or `None` if it is not tracked.
    #[must_use]
    pub fn e_value(&self, metric: &str) -> Option<f64> {
        self.e_processes.get(metric).map(EProcess::e_value)
    }

    /// Calibrated upper threshold for `metric`, if already available.
    #[must_use]
    pub fn threshold(&self, metric: &str) -> Option<f64> {
        self.calibrator.threshold(metric)
    }

    /// True if any metric's e-process has rejected the null.
    #[must_use]
    pub fn any_regressed(&self) -> bool {
        self.e_processes.values().any(|ep| ep.rejected)
    }

    /// Names of all metrics whose e-process currently rejects.
    #[must_use]
    pub fn regressed_metrics(&self) -> Vec<String> {
        self.e_processes
            .iter()
            .filter(|(_, ep)| ep.rejected)
            .map(|(name, _)| name.clone())
            .collect()
    }

    /// Resets every e-process, clearing accumulated regression evidence.
    /// Calibration state is preserved (only e-processes are touched).
    pub fn reset_evidence(&mut self) {
        for ep in self.e_processes.values_mut() {
            ep.reset();
        }
    }

    /// Projects `DecodeStats` into the tracked (name, value) pairs.
    /// Must stay in sync with `TRACKED_METRICS`.
    #[allow(clippy::cast_precision_loss)]
    fn extract_metrics(stats: &DecodeStats) -> Vec<(String, f64)> {
        vec![
            ("gauss_ops".to_string(), stats.gauss_ops as f64),
            ("dense_core_rows".to_string(), stats.dense_core_rows as f64),
            ("dense_core_cols".to_string(), stats.dense_core_cols as f64),
            ("inactivated".to_string(), stats.inactivated as f64),
            ("pivots_selected".to_string(), stats.pivots_selected as f64),
            (
                "peel_frontier_peak".to_string(),
                stats.peel_frontier_peak as f64,
            ),
        ]
    }
}
impl Default for RegressionMonitor {
fn default() -> Self {
Self::new()
}
}
/// Renders one self-contained JSON log line per metric in `report`.
///
/// Each line embeds the report-level schema/replay/regime fields so
/// lines can be shipped and parsed independently. String fields are
/// JSON-escaped via `serde_json`; a missing regime state renders as
/// `"unknown"`, and a missing threshold renders as a JSON `null`.
#[must_use]
pub fn regression_log_lines(report: &RegressionReport) -> Vec<String> {
    // Report-level fields are identical on every line; serialize them once
    // instead of once per metric inside the closure.
    let schema_version =
        json_string(report.schema_version).expect("schema version should serialize");
    let replay_ref = json_string(report.replay_ref).expect("replay ref should serialize");
    let regime_state = json_string(report.regime_state.as_deref().unwrap_or("unknown"))
        .expect("regime state should serialize");
    report
        .metrics
        .iter()
        .map(|result| {
            let metric = json_string(&result.metric).expect("metric should serialize");
            let verdict = json_string(result.verdict.label()).expect("verdict should serialize");
            format!(
                "{{\"schema_version\":{},\"replay_ref\":{},\
                 \"metric\":{},\"value\":{:.3},\"threshold\":{},\
                 \"e_value\":{:.6},\"exceeds_threshold\":{},\
                 \"verdict\":{},\"calibration_n\":{},\
                 \"total_observations\":{},\"regime_state\":{}}}",
                schema_version,
                replay_ref,
                metric,
                result.value,
                result
                    .threshold
                    .map_or_else(|| "null".to_string(), |t| format!("{t:.3}")),
                result.e_value,
                result.exceeds_threshold,
                verdict,
                result.calibration_n,
                report.total_observations,
                regime_state,
            )
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Deterministic `DecodeStats` fixture derived from two knobs; the
    /// remaining fields come from `DecodeStats::default()`.
    fn make_baseline_stats(gauss_ops: usize, inactivated: usize) -> DecodeStats {
        DecodeStats {
            gauss_ops,
            inactivated,
            dense_core_rows: gauss_ops / 2,
            dense_core_cols: gauss_ops / 3,
            pivots_selected: inactivated,
            peel_frontier_peak: 4,
            policy_mode: Some("stable"),
            ..Default::default()
        }
    }

    // A check before any calibration must report Calibrating and must not
    // move any e-value away from its neutral 1.0.
    #[test]
    fn monitor_starts_in_calibrating_state() {
        let mut monitor = RegressionMonitor::new();
        let stats = make_baseline_stats(10, 3);
        let report = monitor.check(&stats);
        assert_eq!(
            report.overall_verdict,
            RegressionVerdict::Calibrating,
            "should be calibrating before min samples"
        );
        for metric in TRACKED_METRICS {
            assert!(
                (monitor.e_value(metric).expect("tracked metric") - 1.0).abs() < f64::EPSILON,
                "pre-calibration check should not mutate e-value for {metric}"
            );
        }
    }

    // Warming up exclusively through check() must still complete
    // calibration and expose thresholds for every metric.
    #[test]
    fn check_only_warmup_marks_monitor_calibrated() {
        let mut monitor = RegressionMonitor::new();
        for i in 0..(MIN_CALIBRATION_SAMPLES + 5) {
            let stats = make_baseline_stats(10 + i % 3, 3 + i % 2);
            let _ = monitor.check(&stats);
        }
        assert!(
            monitor.is_calibrated(),
            "check-only warmup should flip the public calibration state once all metrics are calibrated"
        );
        assert!(
            TRACKED_METRICS
                .iter()
                .all(|metric| monitor.threshold(metric).is_some()),
            "every tracked metric should expose a threshold after check-only warmup"
        );
    }

    // One metric in the report => exactly one rendered log line, carrying
    // the metric, verdict, and regime fields.
    #[test]
    fn regression_log_lines_render_one_line_per_metric() {
        let report = RegressionReport {
            schema_version: G8_SCHEMA_VERSION,
            replay_ref: G8_REPLAY_REF,
            metrics: vec![MetricRegressionResult {
                metric: "gauss_ops".to_string(),
                value: 12.0,
                threshold: Some(15.0),
                e_value: 1.25,
                calibration_n: 10,
                exceeds_threshold: false,
                verdict: RegressionVerdict::Accept,
            }],
            overall_verdict: RegressionVerdict::Accept,
            total_observations: 11,
            regressed_count: 0,
            warning_count: 0,
            regime_state: Some("stable".to_string()),
        };
        let lines = regression_log_lines(&report);
        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("\"metric\":\"gauss_ops\""));
        assert!(lines[0].contains("\"verdict\":\"accept\""));
        assert!(lines[0].contains("\"regime_state\":\"stable\""));
    }

    // Quotes, backslashes, and newlines in string fields must be escaped
    // so each rendered line stays parseable JSON.
    #[test]
    fn regression_log_lines_escape_string_fields() {
        let regime_state = "retuned\\fallback\"line\nbreak".to_string();
        let metric = "gauss\"ops\nburst".to_string();
        let report = RegressionReport {
            schema_version: G8_SCHEMA_VERSION,
            replay_ref: G8_REPLAY_REF,
            metrics: vec![MetricRegressionResult {
                metric: metric.clone(),
                value: 12.0,
                threshold: Some(15.0),
                e_value: 1.25,
                calibration_n: 10,
                exceeds_threshold: false,
                verdict: RegressionVerdict::Accept,
            }],
            overall_verdict: RegressionVerdict::Accept,
            total_observations: 11,
            regressed_count: 0,
            warning_count: 0,
            regime_state: Some(regime_state.clone()),
        };
        let lines = regression_log_lines(&report);
        let parsed: serde_json::Value =
            serde_json::from_str(&lines[0]).expect("rendered line must stay valid JSON");
        assert_eq!(parsed["metric"].as_str(), Some(metric.as_str()));
        assert_eq!(parsed["regime_state"].as_str(), Some(regime_state.as_str()));
    }

    // After sufficient calibrate() calls, a baseline-like observation is
    // accepted.
    #[test]
    fn monitor_transitions_to_accept_after_calibration() {
        let mut monitor = RegressionMonitor::new();
        for i in 0..MIN_CALIBRATION_SAMPLES + 5 {
            let stats = make_baseline_stats(10 + i % 3, 3);
            monitor.calibrate(&stats);
        }
        assert!(
            monitor.is_calibrated(),
            "should be calibrated after {} samples",
            MIN_CALIBRATION_SAMPLES + 5
        );
        let stats = make_baseline_stats(11, 3);
        let report = monitor.check(&stats);
        assert_eq!(
            report.overall_verdict,
            RegressionVerdict::Accept,
            "normal observation should be accepted"
        );
    }

    // A single extreme observation must escalate at least to Warning.
    #[test]
    fn monitor_detects_warning_on_threshold_exceedance() {
        let mut monitor = RegressionMonitor::new();
        for i in 0..(MIN_CALIBRATION_SAMPLES + 5) {
            let stats = make_baseline_stats(10 + i % 2, 3);
            monitor.calibrate(&stats);
        }
        let stats = make_baseline_stats(1000, 3);
        let report = monitor.check(&stats);
        assert!(
            matches!(
                report.overall_verdict,
                RegressionVerdict::Warning | RegressionVerdict::Regressed
            ),
            "large deviation should trigger warning or regression, got {:?}",
            report.overall_verdict
        );
    }

    // Sustained extreme observations must eventually accumulate enough
    // e-process evidence to flip to Regressed.
    #[test]
    fn monitor_accumulates_evidence_for_regression() {
        let mut monitor = RegressionMonitor::new();
        for i in 0..(MIN_CALIBRATION_SAMPLES + 5) {
            let stats = make_baseline_stats(10 + i % 2, 3);
            monitor.calibrate(&stats);
        }
        let mut any_regressed = false;
        for _ in 0..200 {
            let stats = make_baseline_stats(1000, 100);
            let report = monitor.check(&stats);
            if report.overall_verdict == RegressionVerdict::Regressed {
                any_regressed = true;
                break;
            }
        }
        assert!(
            any_regressed,
            "sustained large deviation should trigger regression detection"
        );
        assert!(monitor.any_regressed());
        assert!(!monitor.regressed_metrics().is_empty());
    }

    // Check-only warmup must leave the monitor in exactly the same
    // evidential state as calibrate-only warmup: same first live verdict,
    // identical e-values afterwards.
    #[test]
    fn pre_calibration_checks_do_not_dilute_later_regression_evidence() {
        let mut clean = RegressionMonitor::new();
        let mut with_prechecks = RegressionMonitor::new();
        for i in 0..MIN_CALIBRATION_SAMPLES {
            let stats = make_baseline_stats(10 + i % 2, 3);
            clean.calibrate(&stats);
            let report = with_prechecks.check(&stats);
            assert_eq!(
                report.overall_verdict,
                RegressionVerdict::Calibrating,
                "warmup observations fed via check() must remain calibration-only until activation"
            );
        }
        let anomaly = make_baseline_stats(1_000, 100);
        let clean_report = clean.check(&anomaly);
        let prechecked_report = with_prechecks.check(&anomaly);
        assert_eq!(
            clean_report.overall_verdict, prechecked_report.overall_verdict,
            "pre-calibration checks should not change the first live verdict"
        );
        for metric in TRACKED_METRICS {
            let clean_e = clean.e_value(metric).expect("tracked metric");
            let prechecked_e = with_prechecks.e_value(metric).expect("tracked metric");
            assert!(
                (clean_e - prechecked_e).abs() < f64::EPSILON,
                "pre-calibration checks should not dilute e-process evidence for {metric}: clean={clean_e}, prechecked={prechecked_e}"
            );
        }
    }

    // Metamorphic check at the threshold boundary: a value just inside
    // must be accepted, a value just outside must escalate, and crossing
    // the boundary must never lower overall severity.
    #[test]
    fn metamorphic_decode_tolerance_bound_is_monotone_at_threshold_edge() {
        // Fresh, identically-calibrated monitor for each probe.
        fn calibrated_monitor() -> RegressionMonitor {
            let mut monitor = RegressionMonitor::new();
            for i in 0..(MIN_CALIBRATION_SAMPLES + 5) {
                let stats = make_baseline_stats(10 + i % 3, 3);
                monitor.calibrate(&stats);
            }
            monitor
        }
        let threshold_probe = calibrated_monitor();
        let threshold = threshold_probe
            .threshold("gauss_ops")
            .expect("gauss_ops threshold should be calibrated");
        let tolerated_value = threshold.floor().max(10.0) as usize;
        let violating_value = threshold.ceil() as usize + 1;
        let mut tolerated_stats = make_baseline_stats(10, 3);
        tolerated_stats.gauss_ops = tolerated_value;
        let mut violating_stats = tolerated_stats.clone();
        violating_stats.gauss_ops = violating_value;
        let tolerated_report = calibrated_monitor().check(&tolerated_stats);
        let violating_report = calibrated_monitor().check(&violating_stats);
        let tolerated_metric = tolerated_report
            .metrics
            .iter()
            .find(|metric| metric.metric == "gauss_ops")
            .expect("gauss_ops metric missing from tolerated report");
        let violating_metric = violating_report
            .metrics
            .iter()
            .find(|metric| metric.metric == "gauss_ops")
            .expect("gauss_ops metric missing from violating report");
        assert!(
            !tolerated_metric.exceeds_threshold,
            "value {tolerated_value} should remain within threshold {threshold}"
        );
        assert_eq!(
            tolerated_metric.verdict,
            RegressionVerdict::Accept,
            "within-threshold observation should be accepted"
        );
        assert!(
            violating_metric.exceeds_threshold,
            "value {violating_value} should exceed threshold {threshold}"
        );
        assert!(
            matches!(
                violating_metric.verdict,
                RegressionVerdict::Warning | RegressionVerdict::Regressed
            ),
            "threshold violation should escalate verdict, got {:?}",
            violating_metric.verdict
        );
        assert!(
            (violating_report.overall_verdict as u8) >= (tolerated_report.overall_verdict as u8),
            "crossing the learned tolerance bound must not lower overall severity"
        );
    }

    // A long run of baseline-like observations must never reach Regressed
    // (false-alarm control).
    #[test]
    fn monitor_stable_workload_no_false_alarm() {
        let mut monitor = RegressionMonitor::new();
        for i in 0..(MIN_CALIBRATION_SAMPLES + 5) {
            let stats = make_baseline_stats(10 + i % 3, 3 + i % 2);
            monitor.calibrate(&stats);
        }
        for i in 0..100 {
            let stats = make_baseline_stats(10 + i % 3, 3 + i % 2);
            let report = monitor.check(&stats);
            assert_ne!(
                report.overall_verdict,
                RegressionVerdict::Regressed,
                "stable workload should not trigger false alarm at check {i}"
            );
        }
        assert!(
            !monitor.any_regressed(),
            "stable workload should have no regressions"
        );
    }

    // reset_evidence() must clear accumulated rejections so the next
    // normal observation is not reported as regressed.
    #[test]
    fn monitor_reset_evidence_clears_e_processes() {
        let mut monitor = RegressionMonitor::new();
        for i in 0..(MIN_CALIBRATION_SAMPLES + 5) {
            let stats = make_baseline_stats(10, 3 + i % 2);
            monitor.calibrate(&stats);
        }
        for _ in 0..200 {
            let stats = make_baseline_stats(1000, 100);
            let _ = monitor.check(&stats);
        }
        monitor.reset_evidence();
        assert!(
            !monitor.any_regressed(),
            "reset should clear regression state"
        );
        let stats = make_baseline_stats(11, 3);
        let report = monitor.check(&stats);
        assert_ne!(
            report.overall_verdict,
            RegressionVerdict::Regressed,
            "post-reset should not show regression"
        );
    }

    // Reports must carry the fixed schema/replay identifiers and one
    // entry per tracked metric.
    #[test]
    fn regression_report_schema_and_replay_ref() {
        let mut monitor = RegressionMonitor::new();
        for i in 0..(MIN_CALIBRATION_SAMPLES + 5) {
            let stats = make_baseline_stats(10 + i, 3);
            monitor.calibrate(&stats);
        }
        let stats = make_baseline_stats(10, 3);
        let report = monitor.check(&stats);
        assert_eq!(report.schema_version, G8_SCHEMA_VERSION);
        assert_eq!(report.replay_ref, G8_REPLAY_REF);
        assert_eq!(report.metrics.len(), TRACKED_METRICS.len());
    }

    // Guards the discriminant ordering that check() relies on when
    // escalating the overall verdict.
    #[test]
    fn verdict_ordering_is_correct() {
        assert!((RegressionVerdict::Accept as u8) < (RegressionVerdict::Calibrating as u8));
        assert!((RegressionVerdict::Calibrating as u8) < (RegressionVerdict::Warning as u8));
        assert!((RegressionVerdict::Warning as u8) < (RegressionVerdict::Regressed as u8));
    }

    // threshold() is None before calibration and Some after; e_value() is
    // available for tracked metrics.
    #[test]
    fn e_value_and_threshold_accessors() {
        let mut monitor = RegressionMonitor::new();
        assert!(monitor.threshold("gauss_ops").is_none());
        for i in 0..(MIN_CALIBRATION_SAMPLES + 5) {
            let stats = make_baseline_stats(10 + i, 3);
            monitor.calibrate(&stats);
        }
        assert!(monitor.threshold("gauss_ops").is_some());
        assert!(monitor.e_value("gauss_ops").is_some());
    }

    // policy_mode takes priority as the report's regime covariate.
    #[test]
    fn regime_state_covariate_captured() {
        let mut monitor = RegressionMonitor::new();
        for i in 0..(MIN_CALIBRATION_SAMPLES + 5) {
            let stats = make_baseline_stats(10 + i, 3);
            monitor.calibrate(&stats);
        }
        let mut stats = make_baseline_stats(10, 3);
        stats.policy_mode = Some("retuned");
        let report = monitor.check(&stats);
        assert_eq!(
            report.regime_state,
            Some("retuned".to_string()),
            "regime state should be captured from DecodeStats"
        );
    }

    // Two monitors fed the same observation sequence must produce
    // identical verdicts and e-values at every step.
    #[test]
    fn deterministic_replay_produces_identical_reports() {
        let observations: Vec<DecodeStats> = (0..50)
            .map(|i| make_baseline_stats(10 + i % 5, 3 + i % 3))
            .collect();
        let mut mon_a = RegressionMonitor::new();
        let mut mon_b = RegressionMonitor::new();
        for obs in observations.iter().take(MIN_CALIBRATION_SAMPLES + 5) {
            mon_a.calibrate(obs);
            mon_b.calibrate(obs);
        }
        for obs in observations.iter().skip(MIN_CALIBRATION_SAMPLES + 5) {
            let report_a = mon_a.check(obs);
            let report_b = mon_b.check(obs);
            assert_eq!(
                report_a.overall_verdict, report_b.overall_verdict,
                "deterministic replay violated"
            );
            for (ra, rb) in report_a.metrics.iter().zip(report_b.metrics.iter()) {
                assert_eq!(ra.metric, rb.metric);
                assert!(
                    (ra.e_value - rb.e_value).abs() < f64::EPSILON,
                    "expected {}, got {}",
                    rb.e_value,
                    ra.e_value
                );
                assert_eq!(ra.verdict, rb.verdict);
            }
        }
    }
}