#![forbid(unsafe_code)]
use std::collections::VecDeque;
// Numerical guard rails and default parameters for the e-process.

/// Floor for every sigma estimate; keeps the e-increment exponent finite
/// when the variance window is degenerate (all-equal samples give variance 0).
const SIGMA_MIN: f64 = 1e-9;
/// Clamp floor for e-values so a long run of tiny increments cannot underflow to 0.
const E_MIN: f64 = 1e-100;
/// Clamp ceiling for e-values so a long run of large increments cannot overflow to infinity.
const E_MAX: f64 = 1e100;
/// Default significance level; the decision threshold defaults to `1.0 / alpha`
/// (see `FlakeConfig::threshold`).
const DEFAULT_ALPHA: f64 = 0.05;
/// Default tilt parameter `lambda` of the exponential e-increment in `observe`.
const DEFAULT_LAMBDA: f64 = 0.5;
/// Tunable parameters for a [`FlakeDetector`].
#[derive(Debug, Clone)]
pub struct FlakeConfig {
    /// Significance level; the default decision threshold is `1.0 / alpha`.
    pub alpha: f64,
    /// Tilt parameter of the exponential e-increment.
    pub lambda: f64,
    /// Baseline standard deviation, used while the variance window holds
    /// fewer than two samples, or always when `variance_window == 0`.
    pub sigma: f64,
    /// Number of recent residuals kept for the rolling sigma estimate;
    /// `0` disables estimation and pins sigma to the configured baseline.
    pub variance_window: usize,
    /// Observations required before a flaky decision may fail a run.
    pub min_observations: usize,
    /// When true, every observation appends an entry to the evidence log.
    pub enable_logging: bool,
    /// Explicit threshold override; `None` falls back to `1.0 / alpha`.
    pub threshold: Option<f64>,
}
impl Default for FlakeConfig {
fn default() -> Self {
Self {
alpha: DEFAULT_ALPHA,
lambda: DEFAULT_LAMBDA,
sigma: 1.0,
variance_window: 50,
min_observations: 3,
enable_logging: false,
threshold: None,
}
}
}
impl FlakeConfig {
    /// Builds a config with `alpha` clamped into `[1e-10, 0.5]`;
    /// everything else takes its default value.
    #[must_use]
    pub fn new(alpha: f64) -> Self {
        Self {
            alpha: alpha.clamp(1e-10, 0.5),
            ..Self::default()
        }
    }

    /// Sets lambda, clamped into `[0.01, 2.0]`.
    #[must_use]
    pub fn with_lambda(self, lambda: f64) -> Self {
        Self {
            lambda: lambda.clamp(0.01, 2.0),
            ..self
        }
    }

    /// Sets the baseline sigma, floored at `SIGMA_MIN`.
    #[must_use]
    pub fn with_sigma(self, sigma: f64) -> Self {
        Self {
            sigma: sigma.max(SIGMA_MIN),
            ..self
        }
    }

    /// Sets the rolling variance window size (`0` disables estimation).
    #[must_use]
    pub fn with_variance_window(self, window: usize) -> Self {
        Self {
            variance_window: window,
            ..self
        }
    }

    /// Sets the warm-up minimum, floored at 1.
    #[must_use]
    pub fn with_min_observations(self, min: usize) -> Self {
        Self {
            min_observations: min.max(1),
            ..self
        }
    }

    /// Enables or disables evidence logging.
    #[must_use]
    pub fn with_logging(self, enabled: bool) -> Self {
        Self {
            enable_logging: enabled,
            ..self
        }
    }

    /// Effective decision threshold: the explicit override when set,
    /// otherwise `1.0 / alpha`.
    #[must_use]
    pub fn threshold(&self) -> f64 {
        match self.threshold {
            Some(t) => t,
            None => 1.0 / self.alpha,
        }
    }
}
/// Outcome of feeding one (or a batch of) residual(s) into a [`FlakeDetector`].
#[derive(Debug, Clone, PartialEq)]
pub struct FlakeDecision {
    /// Whether the cumulative e-value currently exceeds the threshold.
    pub is_flaky: bool,
    /// Cumulative e-value after this observation.
    pub e_value: f64,
    /// Threshold the e-value is compared against (`FlakeConfig::threshold`).
    pub threshold: f64,
    /// Number of (non-NaN) observations processed so far.
    pub observation_count: usize,
    /// Square of the sigma used for this observation's e-increment.
    pub variance_estimate: f64,
    /// Whether `observation_count >= min_observations`.
    pub warmed_up: bool,
}
impl FlakeDecision {
    /// True only when the e-value has crossed the threshold *and* the
    /// warm-up minimum has been met; either condition alone is not enough.
    #[must_use]
    pub fn should_fail(&self) -> bool {
        self.warmed_up && self.is_flaky
    }
}
/// One per-observation record, captured when `FlakeConfig::enable_logging` is set.
#[derive(Debug, Clone)]
pub struct EvidenceLog {
    /// 1-based index of the observation within the detector's lifetime.
    pub observation_idx: usize,
    /// Residual value that was observed.
    pub residual: f64,
    /// Multiplicative e-increment contributed by this observation.
    pub e_increment: f64,
    /// Cumulative e-value after applying the increment.
    pub e_cumulative: f64,
    /// Variance (sigma squared) used for this observation.
    pub variance: f64,
    /// Whether this observation produced a fail-worthy decision
    /// (over threshold and warmed up).
    pub decision: bool,
}
impl EvidenceLog {
    /// Renders this entry as a single JSON object on one line,
    /// with all floats formatted to six decimal places.
    #[must_use]
    pub fn to_jsonl(&self) -> String {
        let Self {
            observation_idx,
            residual,
            e_increment,
            e_cumulative,
            variance,
            decision,
        } = self;
        format!(
            r#"{{"idx":{observation_idx},"residual":{residual:.6},"e_inc":{e_increment:.6},"e_cum":{e_cumulative:.6},"var":{variance:.6},"decision":{decision}}}"#
        )
    }
}
/// Sequential flakiness detector driven by a multiplicative e-process.
#[derive(Debug, Clone)]
pub struct FlakeDetector {
    // Parameters; immutable for the detector's lifetime.
    config: FlakeConfig,
    // Running product of e-increments, clamped to [E_MIN, E_MAX]; starts at 1.0.
    e_cumulative: f64,
    // Count of non-NaN residuals observed.
    observation_count: usize,
    // Rolling window of recent residuals for the sigma estimate.
    variance_window: VecDeque<f64>,
    // Per-observation records; populated only when logging is enabled.
    evidence_log: Vec<EvidenceLog>,
    // 1-based observation index of the first fail-worthy decision, if any.
    first_flaky_at: Option<usize>,
    // Largest cumulative e-value seen so far; starts at 1.0.
    max_e_value: f64,
}
impl FlakeDetector {
#[must_use]
pub fn new(config: FlakeConfig) -> Self {
let capacity = if config.variance_window > 0 {
config.variance_window
} else {
1
};
Self {
config,
e_cumulative: 1.0, observation_count: 0,
variance_window: VecDeque::with_capacity(capacity),
evidence_log: Vec::new(),
first_flaky_at: None,
max_e_value: 1.0,
}
}
pub fn observe(&mut self, residual: f64) -> FlakeDecision {
if residual.is_nan() {
return FlakeDecision {
is_flaky: false,
e_value: self.e_cumulative,
threshold: self.config.threshold(),
observation_count: self.observation_count,
variance_estimate: self.current_sigma().powi(2),
warmed_up: self.observation_count >= self.config.min_observations,
};
}
self.observation_count += 1;
self.update_variance(residual);
let sigma = self.current_sigma();
let lambda = self.config.lambda;
let exponent = lambda * residual - (lambda * lambda * sigma * sigma) / 2.0;
let e_increment = exponent.exp().clamp(E_MIN, E_MAX);
self.e_cumulative = (self.e_cumulative * e_increment).clamp(E_MIN, E_MAX);
let threshold = self.config.threshold();
let is_flaky = self.e_cumulative > threshold;
let warmed_up = self.observation_count >= self.config.min_observations;
let decision = is_flaky && warmed_up;
if decision && self.first_flaky_at.is_none() {
self.first_flaky_at = Some(self.observation_count);
}
self.max_e_value = self.max_e_value.max(self.e_cumulative);
if self.config.enable_logging {
self.evidence_log.push(EvidenceLog {
observation_idx: self.observation_count,
residual,
e_increment,
e_cumulative: self.e_cumulative,
variance: sigma * sigma,
decision,
});
}
FlakeDecision {
is_flaky,
e_value: self.e_cumulative,
threshold,
observation_count: self.observation_count,
variance_estimate: sigma * sigma,
warmed_up,
}
}
pub fn observe_batch(&mut self, residuals: &[f64]) -> FlakeDecision {
let mut decision = FlakeDecision {
is_flaky: false,
e_value: self.e_cumulative,
threshold: self.config.threshold(),
observation_count: self.observation_count,
variance_estimate: self.current_sigma().powi(2),
warmed_up: false,
};
for &r in residuals {
decision = self.observe(r);
if decision.should_fail() {
break; }
}
decision
}
pub fn reset(&mut self) {
self.e_cumulative = 1.0;
self.observation_count = 0;
self.variance_window.clear();
self.evidence_log.clear();
self.first_flaky_at = None;
self.max_e_value = 1.0;
}
#[must_use]
pub fn e_value(&self) -> f64 {
self.e_cumulative
}
#[must_use]
pub fn observation_count(&self) -> usize {
self.observation_count
}
#[must_use]
pub fn is_warmed_up(&self) -> bool {
self.observation_count >= self.config.min_observations
}
#[must_use]
pub fn evidence_log(&self) -> &[EvidenceLog] {
&self.evidence_log
}
#[must_use]
pub fn evidence_to_jsonl(&self) -> String {
self.evidence_log
.iter()
.map(|e| e.to_jsonl())
.collect::<Vec<_>>()
.join("\n")
}
#[must_use]
pub fn current_sigma(&self) -> f64 {
if self.config.variance_window == 0 || self.variance_window.len() < 2 {
return self.config.sigma.max(SIGMA_MIN);
}
let n = self.variance_window.len() as f64;
let mean = self.variance_window.iter().sum::<f64>() / n;
let variance = self
.variance_window
.iter()
.map(|&x| {
let diff = x - mean;
diff * diff
})
.sum::<f64>()
/ (n - 1.0);
variance.sqrt().max(SIGMA_MIN)
}
fn update_variance(&mut self, residual: f64) {
if self.config.variance_window == 0 {
return;
}
if self.variance_window.len() >= self.config.variance_window {
self.variance_window.pop_front();
}
self.variance_window.push_back(residual);
}
#[must_use]
pub fn config(&self) -> &FlakeConfig {
&self.config
}
}
impl Default for FlakeDetector {
fn default() -> Self {
Self::new(FlakeConfig::default())
}
}
/// Point-in-time roll-up of a detector's lifetime statistics.
#[derive(Debug, Clone)]
pub struct FlakeSummary {
    /// Total non-NaN observations processed.
    pub total_observations: usize,
    /// Cumulative e-value at the time of the summary.
    pub final_e_value: f64,
    /// Whether the final e-value exceeds the threshold.
    pub is_flaky: bool,
    /// 1-based observation index of the first fail-worthy decision, if any.
    pub first_flaky_at: Option<usize>,
    /// Largest cumulative e-value ever reached.
    pub max_e_value: f64,
    /// Threshold in effect (`FlakeConfig::threshold`).
    pub threshold: f64,
}
impl FlakeDetector {
    /// Builds a snapshot of the detector's lifetime statistics.
    #[must_use]
    pub fn summary(&self) -> FlakeSummary {
        // Compute the effective threshold once and reuse it.
        let threshold = self.config.threshold();
        FlakeSummary {
            total_observations: self.observation_count,
            final_e_value: self.e_cumulative,
            is_flaky: self.e_cumulative > threshold,
            first_flaky_at: self.first_flaky_at,
            max_e_value: self.max_e_value,
            threshold,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // A sustained 3.0 drift should eventually cross the threshold; if it does,
    // the decision must be internally consistent (e_value above threshold).
    #[test]
    fn unit_eprocess_threshold() {
        let config = FlakeConfig::new(0.05).with_min_observations(1);
        let mut detector = FlakeDetector::new(config);
        for _ in 0..20 {
            let decision = detector.observe(3.0);
            if decision.should_fail() {
                assert!(decision.e_value > decision.threshold);
                return;
            }
        }
        let decision = detector.observe(0.0);
        assert!(
            decision.e_value > decision.threshold || !decision.is_flaky,
            "Should either have triggered or not be flaky"
        );
    }

    // The e-value is a product of positive increments and must stay positive
    // for any mix of signs and magnitudes.
    #[test]
    fn unit_eprocess_nonnegative() {
        let mut detector = FlakeDetector::default();
        let residuals = [-5.0, -2.0, 0.0, 2.0, 5.0, -10.0, 10.0];
        for r in residuals {
            let decision = detector.observe(r);
            assert!(
                decision.e_value > 0.0,
                "E-value must be positive, got {}",
                decision.e_value
            );
        }
    }

    // Small oscillating residuals should not accumulate enough evidence
    // to fail, regardless of when observation stops.
    #[test]
    fn unit_optional_stopping() {
        let config = FlakeConfig::new(0.05)
            .with_lambda(0.3)
            .with_min_observations(1)
            .with_logging(true);
        let mut detector = FlakeDetector::new(config);
        let stable_residuals: Vec<f64> = (0..100).map(|i| (i as f64 * 0.1).sin() * 0.1).collect();
        let decision = detector.observe_batch(&stable_residuals);
        assert!(
            decision.e_value <= decision.threshold * 2.0 || !decision.should_fail(),
            "Stable run should rarely trigger flakiness"
        );
    }

    // All-zero residuals must never produce a fail-worthy decision.
    #[test]
    fn unit_stable_run_no_false_positives() {
        let config = FlakeConfig::new(0.05)
            .with_sigma(1.0)
            .with_lambda(0.5)
            .with_min_observations(3);
        let mut detector = FlakeDetector::new(config);
        for _ in 0..50 {
            let decision = detector.observe(0.0);
            assert!(
                !decision.should_fail(),
                "Zero residuals should never trigger flakiness"
            );
        }
    }

    // After a quiet warm-up, a sustained 5.0 spike must be detected
    // within 20 observations.
    #[test]
    fn unit_spike_detection() {
        let config = FlakeConfig::new(0.05)
            .with_sigma(1.0)
            .with_lambda(0.5)
            .with_min_observations(3)
            .with_logging(true);
        let mut detector = FlakeDetector::new(config);
        for _ in 0..5 {
            detector.observe(0.1);
        }
        let mut detected = false;
        for _ in 0..20 {
            let decision = detector.observe(5.0);
            if decision.should_fail() {
                detected = true;
                break;
            }
        }
        assert!(detected, "Should detect sustained spike");
    }

    // reset() must restore the count to 0 and the e-value to 1.0.
    #[test]
    fn unit_reset() {
        let mut detector = FlakeDetector::default();
        detector.observe(1.0);
        detector.observe(2.0);
        assert_eq!(detector.observation_count(), 2);
        detector.reset();
        assert_eq!(detector.observation_count(), 0);
        assert!((detector.e_value() - 1.0).abs() < 1e-10);
    }

    // Constant residuals give a (near-)zero sample sigma; the estimate
    // should settle either near zero or near the true spread.
    #[test]
    fn unit_variance_estimation() {
        let config = FlakeConfig::default().with_variance_window(10);
        let mut detector = FlakeDetector::new(config);
        for _ in 0..20 {
            detector.observe(1.0);
        }
        let sigma = detector.current_sigma();
        assert!(
            sigma < 0.1 || (sigma - 1.0).abs() < 0.5,
            "Variance should converge"
        );
    }

    // With logging enabled, each observation appends one JSONL-serializable
    // entry with a 1-based index.
    #[test]
    fn unit_evidence_log() {
        let config = FlakeConfig::default()
            .with_logging(true)
            .with_min_observations(1);
        let mut detector = FlakeDetector::new(config);
        detector.observe(0.5);
        detector.observe(1.0);
        detector.observe(-0.5);
        assert_eq!(detector.evidence_log().len(), 3);
        let jsonl = detector.evidence_to_jsonl();
        assert!(jsonl.contains("\"idx\":1"));
        assert!(jsonl.contains("\"idx\":2"));
        assert!(jsonl.contains("\"idx\":3"));
    }

    // summary() reflects the observation count and positive e-value/threshold.
    #[test]
    fn unit_summary() {
        let config = FlakeConfig::default()
            .with_logging(true)
            .with_min_observations(1);
        let mut detector = FlakeDetector::new(config);
        for _ in 0..10 {
            detector.observe(0.1);
        }
        let summary = detector.summary();
        assert_eq!(summary.total_observations, 10);
        assert!(summary.final_e_value > 0.0);
        assert!(summary.threshold > 0.0);
    }

    // observe_batch consumes the whole slice when nothing triggers.
    #[test]
    fn unit_batch_observe() {
        let config = FlakeConfig::default().with_min_observations(1);
        let mut detector = FlakeDetector::new(config);
        let residuals = vec![0.1, 0.2, 0.3, 0.4, 0.5];
        let decision = detector.observe_batch(&residuals);
        assert_eq!(decision.observation_count, 5);
    }

    // Builder methods set every field, and threshold() = 1/alpha by default.
    #[test]
    fn unit_config_builder() {
        let config = FlakeConfig::new(0.01)
            .with_lambda(0.3)
            .with_sigma(2.0)
            .with_variance_window(100)
            .with_min_observations(5)
            .with_logging(true);
        assert!((config.alpha - 0.01).abs() < 1e-10);
        assert!((config.lambda - 0.3).abs() < 1e-10);
        assert!((config.sigma - 2.0).abs() < 1e-10);
        assert_eq!(config.variance_window, 100);
        assert_eq!(config.min_observations, 5);
        assert!(config.enable_logging);
        assert!((config.threshold() - 100.0).abs() < 1e-10);
    }

    // Extreme residuals in either direction must keep the clamped e-value
    // finite and positive.
    #[test]
    fn unit_numerical_stability() {
        let mut detector = FlakeDetector::default();
        for _ in 0..10 {
            let decision = detector.observe(1000.0);
            assert!(decision.e_value.is_finite());
            assert!(decision.e_value > 0.0);
        }
        detector.reset();
        for _ in 0..10 {
            let decision = detector.observe(-1000.0);
            assert!(decision.e_value.is_finite());
            assert!(decision.e_value > 0.0);
        }
    }

    // Default config matches the documented constants.
    #[test]
    fn config_default_values() {
        let config = FlakeConfig::default();
        assert!((config.alpha - DEFAULT_ALPHA).abs() < f64::EPSILON);
        assert!((config.lambda - DEFAULT_LAMBDA).abs() < f64::EPSILON);
        assert!((config.sigma - 1.0).abs() < f64::EPSILON);
        assert_eq!(config.variance_window, 50);
        assert_eq!(config.min_observations, 3);
        assert!(!config.enable_logging);
        assert!(config.threshold.is_none());
    }

    // threshold() = 1/alpha when no override is set.
    #[test]
    fn config_threshold_computed_from_alpha() {
        let config = FlakeConfig::new(0.05);
        assert!((config.threshold() - 20.0).abs() < 1e-10);
    }

    // An explicit threshold override takes precedence over 1/alpha.
    #[test]
    fn config_threshold_override() {
        let mut config = FlakeConfig::new(0.05);
        config.threshold = Some(42.0);
        assert!((config.threshold() - 42.0).abs() < f64::EPSILON);
    }

    // new() clamps alpha at the low end (no division by zero in threshold()).
    #[test]
    fn config_new_clamps_alpha_low() {
        let config = FlakeConfig::new(0.0);
        assert!(config.alpha >= 1e-10);
    }

    // new() clamps alpha at the high end (0.5).
    #[test]
    fn config_new_clamps_alpha_high() {
        let config = FlakeConfig::new(1.0);
        assert!(config.alpha <= 0.5);
    }

    // with_lambda clamps to the [0.01, 2.0] range — low side.
    #[test]
    fn config_with_lambda_clamps_low() {
        let config = FlakeConfig::default().with_lambda(0.0);
        assert!(config.lambda >= 0.01);
    }

    // with_lambda clamps to the [0.01, 2.0] range — high side.
    #[test]
    fn config_with_lambda_clamps_high() {
        let config = FlakeConfig::default().with_lambda(100.0);
        assert!(config.lambda <= 2.0);
    }

    // with_sigma floors at SIGMA_MIN.
    #[test]
    fn config_with_sigma_clamps_to_min() {
        let config = FlakeConfig::default().with_sigma(0.0);
        assert!(config.sigma >= SIGMA_MIN);
    }

    // with_min_observations floors at one.
    #[test]
    fn config_with_min_observations_clamps_to_one() {
        let config = FlakeConfig::default().with_min_observations(0);
        assert!(config.min_observations >= 1);
    }

    // should_fail() is the conjunction of is_flaky and warmed_up.
    #[test]
    fn decision_should_fail_requires_both_flaky_and_warmed_up() {
        let d1 = FlakeDecision {
            is_flaky: true,
            warmed_up: false,
            e_value: 100.0,
            threshold: 20.0,
            observation_count: 1,
            variance_estimate: 1.0,
        };
        assert!(!d1.should_fail());
        let d2 = FlakeDecision {
            is_flaky: false,
            warmed_up: true,
            e_value: 1.0,
            threshold: 20.0,
            observation_count: 5,
            variance_estimate: 1.0,
        };
        assert!(!d2.should_fail());
        let d3 = FlakeDecision {
            is_flaky: true,
            warmed_up: true,
            e_value: 100.0,
            threshold: 20.0,
            observation_count: 5,
            variance_estimate: 1.0,
        };
        assert!(d3.should_fail());
    }

    // to_jsonl emits every expected key and a bare boolean decision.
    #[test]
    fn evidence_log_to_jsonl_format() {
        let log = EvidenceLog {
            observation_idx: 3,
            residual: 1.5,
            e_increment: 2.1,
            e_cumulative: 4.2,
            variance: 0.9,
            decision: true,
        };
        let jsonl = log.to_jsonl();
        assert!(jsonl.contains("\"idx\":3"));
        assert!(jsonl.contains("\"residual\":"));
        assert!(jsonl.contains("\"e_inc\":"));
        assert!(jsonl.contains("\"e_cum\":"));
        assert!(jsonl.contains("\"var\":"));
        assert!(jsonl.contains("\"decision\":true"));
    }

    // A false decision serializes as the JSON literal `false`.
    #[test]
    fn evidence_log_to_jsonl_false_decision() {
        let log = EvidenceLog {
            observation_idx: 1,
            residual: 0.0,
            e_increment: 1.0,
            e_cumulative: 1.0,
            variance: 1.0,
            decision: false,
        };
        let jsonl = log.to_jsonl();
        assert!(jsonl.contains("\"decision\":false"));
    }

    // A fresh detector: no observations, e-value 1.0, not warmed up, empty log.
    #[test]
    fn detector_default_initial_state() {
        let detector = FlakeDetector::default();
        assert_eq!(detector.observation_count(), 0);
        assert!((detector.e_value() - 1.0).abs() < f64::EPSILON);
        assert!(!detector.is_warmed_up());
        assert!(detector.evidence_log().is_empty());
    }

    // config() exposes the configuration the detector was built with.
    #[test]
    fn detector_config_accessor() {
        let config = FlakeConfig::new(0.01).with_lambda(0.3);
        let detector = FlakeDetector::new(config);
        assert!((detector.config().alpha - 0.01).abs() < 1e-10);
        assert!((detector.config().lambda - 0.3).abs() < 1e-10);
    }

    // is_warmed_up flips exactly at min_observations.
    #[test]
    fn detector_is_warmed_up_after_min_observations() {
        let config = FlakeConfig::default().with_min_observations(3);
        let mut detector = FlakeDetector::new(config);
        assert!(!detector.is_warmed_up());
        detector.observe(0.0);
        detector.observe(0.0);
        assert!(!detector.is_warmed_up());
        detector.observe(0.0);
        assert!(detector.is_warmed_up());
    }

    // With the window disabled, sigma stays pinned to the configured baseline
    // no matter what residuals arrive.
    #[test]
    fn fixed_sigma_when_variance_window_zero() {
        let config = FlakeConfig::default()
            .with_sigma(3.0)
            .with_variance_window(0);
        let mut detector = FlakeDetector::new(config);
        detector.observe(10.0);
        detector.observe(20.0);
        assert!((detector.current_sigma() - 3.0).abs() < f64::EPSILON);
    }

    // summary() of an untouched detector reports the initial state.
    #[test]
    fn summary_empty_detector() {
        let detector = FlakeDetector::new(FlakeConfig::default().with_logging(true));
        let summary = detector.summary();
        assert_eq!(summary.total_observations, 0);
        assert!((summary.final_e_value - 1.0).abs() < f64::EPSILON);
        assert!(!summary.is_flaky);
        assert!(summary.first_flaky_at.is_none());
        assert!((summary.max_e_value - 1.0).abs() < f64::EPSILON);
    }

    // If the run ends flaky, first_flaky_at must have been recorded.
    #[test]
    fn summary_first_flaky_at_recorded() {
        let config = FlakeConfig::new(0.05)
            .with_min_observations(1)
            .with_logging(true);
        let mut detector = FlakeDetector::new(config);
        for _ in 0..50 {
            detector.observe(5.0);
        }
        let summary = detector.summary();
        if summary.is_flaky {
            assert!(
                summary.first_flaky_at.is_some(),
                "should record first flaky index"
            );
            assert!(summary.first_flaky_at.unwrap() > 0);
        }
    }

    // The detector is deterministic: identical inputs give identical state.
    #[test]
    fn deterministic_same_inputs() {
        let config = FlakeConfig::new(0.05).with_lambda(0.5).with_sigma(1.0);
        let residuals = [0.1, -0.2, 0.5, -0.1, 3.0, 0.0, -1.0, 2.0];
        let mut d1 = FlakeDetector::new(config.clone());
        let mut d2 = FlakeDetector::new(config);
        for &r in &residuals {
            d1.observe(r);
            d2.observe(r);
        }
        assert!((d1.e_value() - d2.e_value()).abs() < 1e-10);
        assert_eq!(d1.observation_count(), d2.observation_count());
    }

    // observe_batch breaks out of the loop on the first fail-worthy decision.
    #[test]
    fn batch_early_stops_on_flaky() {
        let config = FlakeConfig::new(0.05)
            .with_min_observations(1)
            .with_lambda(0.5);
        let mut detector = FlakeDetector::new(config);
        let mut residuals = vec![10.0; 20];
        residuals.extend(vec![0.0; 80]);
        let decision = detector.observe_batch(&residuals);
        if decision.should_fail() {
            assert!(
                decision.observation_count < 100,
                "should stop early, count={}",
                decision.observation_count
            );
        }
    }

    // With a fixed sigma, constant positive residuals give e-increments > 1,
    // so the cumulative e-value is monotone non-decreasing.
    #[test]
    fn e_value_increases_under_consistent_positive_residuals() {
        let config = FlakeConfig::default()
            .with_variance_window(0)
            .with_sigma(1.0);
        let mut detector = FlakeDetector::new(config);
        let mut prev_e = 1.0;
        for _ in 0..5 {
            let decision = detector.observe(2.0);
            assert!(
                decision.e_value >= prev_e,
                "e-value should increase: prev={prev_e}, cur={}",
                decision.e_value
            );
            prev_e = decision.e_value;
        }
    }

    // With logging disabled (the default), nothing is recorded or serialized.
    #[test]
    fn no_evidence_log_when_disabled() {
        let config = FlakeConfig::default();
        let mut detector = FlakeDetector::new(config);
        detector.observe(1.0);
        detector.observe(2.0);
        assert!(detector.evidence_log().is_empty());
        assert!(detector.evidence_to_jsonl().is_empty());
    }

    // first_flaky_at and max_e_value are tracked even when logging is off.
    #[test]
    fn summary_tracks_metrics_when_logging_disabled() {
        let config = FlakeConfig::new(0.05).with_min_observations(1);
        let mut detector = FlakeDetector::new(config);
        detector.observe(5.0);
        detector.observe(5.0);
        detector.observe(-1.0);
        let summary = detector.summary();
        assert_eq!(summary.first_flaky_at, Some(2));
        assert!(summary.max_e_value > summary.threshold);
        assert!(summary.max_e_value + f64::EPSILON >= summary.final_e_value);
    }

    // reset() also discards any recorded evidence entries.
    #[test]
    fn reset_clears_evidence_log() {
        let config = FlakeConfig::default().with_logging(true);
        let mut detector = FlakeDetector::new(config);
        detector.observe(1.0);
        assert_eq!(detector.evidence_log().len(), 1);
        detector.reset();
        assert!(detector.evidence_log().is_empty());
    }
}