use std::fmt::Write as _;
use serde::{Deserialize, Serialize};
use super::OracleReport;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct EValue {
pub value: f64,
pub time: usize,
}
impl EValue {
#[must_use]
pub fn rejects_at(&self, alpha: f64) -> bool {
debug_assert!(alpha > 0.0 && alpha <= 1.0);
self.value >= 1.0 / alpha
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EProcessConfig {
pub p0: f64,
pub lambda: f64,
pub alpha: f64,
pub max_evalue: f64,
}
impl Default for EProcessConfig {
fn default() -> Self {
Self {
p0: 0.001,
lambda: 0.5,
alpha: 0.05,
max_evalue: 1e15,
}
}
}
impl EProcessConfig {
pub fn validate(&self) -> Result<(), String> {
if !self.p0.is_finite() {
return Err(format!("p0 must be finite, got {}", self.p0));
}
if !self.lambda.is_finite() {
return Err(format!("lambda must be finite, got {}", self.lambda));
}
if !self.alpha.is_finite() {
return Err(format!("alpha must be finite, got {}", self.alpha));
}
if self.p0 <= 0.0 || self.p0 >= 1.0 {
return Err(format!("p0 must be in (0, 1), got {}", self.p0));
}
let lambda_min = -1.0 / (1.0 - self.p0);
let lambda_max = 1.0 / self.p0;
if self.lambda <= lambda_min || self.lambda >= lambda_max {
return Err(format!(
"lambda must be in ({:.4}, {:.4}), got {}",
lambda_min, lambda_max, self.lambda
));
}
if self.alpha <= 0.0 || self.alpha > 1.0 {
return Err(format!("alpha must be in (0, 1], got {}", self.alpha));
}
if !self.max_evalue.is_finite() || self.max_evalue < 1.0 {
return Err(format!(
"max_evalue must be finite and >= 1.0, got {}",
self.max_evalue
));
}
let threshold = 1.0 / self.alpha;
if self.max_evalue < threshold {
return Err(format!(
"max_evalue ({}) must be >= threshold 1/alpha ({:.1}), otherwise rejection is impossible",
self.max_evalue, threshold
));
}
Ok(())
}
#[must_use]
pub fn threshold(&self) -> f64 {
1.0 / self.alpha
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EProcess {
pub invariant: String,
pub config: EProcessConfig,
pub current: f64,
pub observations: usize,
pub violations_observed: usize,
pub rejected: bool,
pub rejection_time: Option<usize>,
pub history: Vec<EValue>,
record_history: bool,
}
impl EProcess {
#[must_use]
pub fn new(invariant: &str, config: EProcessConfig) -> Self {
assert!(
config.validate().is_ok(),
"EProcessConfig validation failed: {}",
config
.validate()
.expect_err("expected e-process config validation to fail")
);
Self {
invariant: invariant.to_owned(),
config,
current: 1.0,
observations: 0,
violations_observed: 0,
rejected: false,
rejection_time: None,
history: Vec::new(),
record_history: true,
}
}
#[must_use]
pub fn new_without_history(invariant: &str, config: EProcessConfig) -> Self {
let mut ep = Self::new(invariant, config);
ep.record_history = false;
ep
}
pub fn observe(&mut self, violated: bool) {
let x = if violated { 1.0 } else { 0.0 };
let factor = self.config.lambda.mul_add(x - self.config.p0, 1.0);
let factor = factor.max(1e-15);
let product = self.current * factor;
self.current = if product.is_finite() {
product.min(self.config.max_evalue)
} else {
self.config.max_evalue
};
self.observations += 1;
if violated {
self.violations_observed += 1;
}
if self.record_history {
self.history.push(EValue {
value: self.current,
time: self.observations - 1,
});
}
if !self.rejected && self.current >= self.config.threshold() {
self.rejected = true;
self.rejection_time = Some(self.observations - 1);
}
}
#[must_use]
pub fn e_value(&self) -> f64 {
self.current
}
#[must_use]
pub fn log10_e_value(&self) -> f64 {
self.current.max(1e-300).log10()
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn empirical_rate(&self) -> f64 {
if self.observations == 0 {
0.0
} else {
self.violations_observed as f64 / self.observations as f64
}
}
pub fn reset(&mut self) {
self.current = 1.0;
self.observations = 0;
self.violations_observed = 0;
self.rejected = false;
self.rejection_time = None;
self.history.clear();
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitorResult {
pub invariant: String,
pub e_value: f64,
pub log10_e_value: f64,
pub rejected: bool,
pub rejection_time: Option<usize>,
pub observations: usize,
pub violations_observed: usize,
pub empirical_rate: f64,
}
impl MonitorResult {
fn from_eprocess(ep: &EProcess) -> Self {
Self {
invariant: ep.invariant.clone(),
e_value: ep.current,
log10_e_value: ep.log10_e_value(),
rejected: ep.rejected,
rejection_time: ep.rejection_time,
observations: ep.observations,
violations_observed: ep.violations_observed,
empirical_rate: ep.empirical_rate(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EProcessMonitor {
processes: Vec<EProcess>,
config: EProcessConfig,
}
impl EProcessMonitor {
#[must_use]
pub fn standard() -> Self {
Self::standard_with_config(EProcessConfig::default())
}
#[must_use]
pub fn standard_with_config(config: EProcessConfig) -> Self {
let invariants = ["task_leak", "obligation_leak", "quiescence"];
Self::new(&invariants, config)
}
#[must_use]
pub fn new(invariants: &[&str], config: EProcessConfig) -> Self {
let processes = invariants
.iter()
.map(|inv| EProcess::new(inv, config.clone()))
.collect();
Self { processes, config }
}
#[must_use]
pub fn all_invariants() -> Self {
Self::all_invariants_with_config(EProcessConfig::default())
}
#[must_use]
pub fn all_invariants_with_config(config: EProcessConfig) -> Self {
let invariants = [
"task_leak",
"obligation_leak",
"quiescence",
"loser_drain",
"finalizer",
"region_tree",
"region_leak",
"ambient_authority",
"deadline_monotone",
"cancellation_protocol",
"cancel_correctness",
"cancel_debt",
"cancel_signal_ordering",
"runtime_epoch",
"channel_atomicity",
"waker_dedup",
"actor_leak",
"supervision",
"mailbox",
"rref_access",
"reply_linearity",
"registry_lease",
"down_order",
"supervisor_quiescence",
];
Self::new(&invariants, config)
}
pub fn observe_report(&mut self, report: &OracleReport) {
for ep in &mut self.processes {
if let Some(entry) = report.entries.iter().find(|e| e.invariant == ep.invariant) {
ep.observe(!entry.passed);
}
}
}
pub fn observe(&mut self, invariant: &str, violated: bool) -> bool {
self.processes
.iter_mut()
.find(|ep| ep.invariant == invariant)
.is_some_and(|ep| {
ep.observe(violated);
true
})
}
#[must_use]
pub fn any_rejected(&self) -> bool {
self.processes.iter().any(|ep| ep.rejected)
}
#[must_use]
pub fn rejected_invariants(&self) -> Vec<&str> {
self.processes
.iter()
.filter(|ep| ep.rejected)
.map(|ep| ep.invariant.as_str())
.collect()
}
#[must_use]
pub fn process(&self, invariant: &str) -> Option<&EProcess> {
self.processes.iter().find(|ep| ep.invariant == invariant)
}
#[must_use]
pub fn results(&self) -> Vec<MonitorResult> {
self.processes
.iter()
.map(MonitorResult::from_eprocess)
.collect()
}
#[must_use]
pub fn config(&self) -> &EProcessConfig {
&self.config
}
pub fn reset(&mut self) {
for ep in &mut self.processes {
ep.reset();
}
}
#[must_use]
pub fn to_text(&self) -> String {
let mut out = String::new();
let _ = writeln!(&mut out, "E-Process Monitor (α = {})", self.config.alpha);
let _ = writeln!(
&mut out,
" Rejection threshold: {:.1}",
self.config.threshold()
);
let _ = writeln!(&mut out, " Invariants: {}", self.processes.len());
let rejected = self.rejected_invariants();
let _ = writeln!(&mut out, " Rejected: {}", rejected.len());
let _ = writeln!(&mut out);
for ep in &self.processes {
let status = if ep.rejected {
"REJECTED"
} else {
"monitoring"
};
let _ = writeln!(
&mut out,
" [{status}] {inv}: e={e:.4}, log₁₀(e)={log:.4}, n={n}, violations={v}",
inv = ep.invariant,
e = ep.current,
log = ep.log10_e_value(),
n = ep.observations,
v = ep.violations_observed,
);
if let Some(t) = ep.rejection_time {
let _ = writeln!(&mut out, " → rejected at observation {t}");
}
}
out
}
#[must_use]
pub fn to_json(&self) -> serde_json::Value {
serde_json::to_value(self).unwrap_or_default()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::lab::oracle::{OracleEntryReport, OracleReport, OracleStats};
fn make_clean_report(invariants: &[&str]) -> OracleReport {
let entries = invariants
.iter()
.map(|inv| OracleEntryReport {
invariant: inv.to_string(),
passed: true,
violation: None,
stats: OracleStats {
entities_tracked: 5,
events_recorded: 10,
},
})
.collect::<Vec<_>>();
let total = entries.len();
OracleReport {
entries,
total,
passed: total,
failed: 0,
check_time_nanos: 0,
}
}
fn make_violation_report(invariants: &[&str], violated: &[&str]) -> OracleReport {
let entries = invariants
.iter()
.map(|inv| {
let is_violated = violated.contains(inv);
OracleEntryReport {
invariant: inv.to_string(),
passed: !is_violated,
violation: if is_violated {
Some("test violation".into())
} else {
None
},
stats: OracleStats {
entities_tracked: 5,
events_recorded: 10,
},
}
})
.collect::<Vec<_>>();
let total = entries.len();
let failed = entries.iter().filter(|e| !e.passed).count();
OracleReport {
entries,
total,
passed: total - failed,
failed,
check_time_nanos: 0,
}
}
#[test]
fn config_default_valid() {
assert!(EProcessConfig::default().validate().is_ok());
}
#[test]
fn config_threshold() {
let config = EProcessConfig::default();
assert!((config.threshold() - 20.0).abs() < 1e-10);
}
#[test]
fn config_invalid_p0() {
let c = EProcessConfig {
p0: 0.0,
..EProcessConfig::default()
};
assert!(c.validate().is_err());
let c = EProcessConfig {
p0: 1.0,
..EProcessConfig::default()
};
assert!(c.validate().is_err());
let c = EProcessConfig {
p0: -0.1,
..EProcessConfig::default()
};
assert!(c.validate().is_err());
}
#[test]
fn config_invalid_lambda() {
let c = EProcessConfig {
lambda: -2000.0,
..EProcessConfig::default()
};
assert!(c.validate().is_err());
let c = EProcessConfig {
lambda: 2000.0,
..EProcessConfig::default()
};
assert!(c.validate().is_err());
}
#[test]
fn config_invalid_alpha() {
let c = EProcessConfig {
alpha: 0.0,
..EProcessConfig::default()
};
assert!(c.validate().is_err());
}
#[test]
fn config_invalid_max_evalue() {
let c = EProcessConfig {
max_evalue: 0.0,
..EProcessConfig::default()
};
assert!(c.validate().is_err());
let c = EProcessConfig {
max_evalue: -1.0,
..EProcessConfig::default()
};
assert!(c.validate().is_err());
let c = EProcessConfig {
max_evalue: f64::NAN,
..EProcessConfig::default()
};
assert!(c.validate().is_err());
let c = EProcessConfig {
max_evalue: f64::INFINITY,
..EProcessConfig::default()
};
assert!(c.validate().is_err());
}
#[test]
fn config_max_evalue_below_threshold() {
let c = EProcessConfig {
alpha: 0.05,
max_evalue: 10.0, ..EProcessConfig::default()
};
assert!(c.validate().is_err());
}
#[test]
fn config_lambda_bounds_correct_for_large_p0() {
let c = EProcessConfig {
p0: 0.6,
lambda: 1.5,
alpha: 0.05,
max_evalue: 1e15,
};
assert!(c.validate().is_ok());
let c = EProcessConfig {
p0: 0.6,
lambda: 1.7,
alpha: 0.05,
max_evalue: 1e15,
};
assert!(
c.validate().is_err(),
"lambda=1.7 with p0=0.6 should be rejected (negative factor)"
);
}
#[test]
fn evalue_rejects() {
let ev = EValue {
value: 25.0,
time: 0,
};
assert!(ev.rejects_at(0.05)); assert!(!ev.rejects_at(0.01)); }
#[test]
fn eprocess_starts_at_one() {
let ep = EProcess::new("test", EProcessConfig::default());
assert!((ep.current - 1.0).abs() < 1e-10);
assert_eq!(ep.observations, 0);
assert!(!ep.rejected);
}
#[test]
fn eprocess_clean_observations_decrease() {
let mut ep = EProcess::new("test", EProcessConfig::default());
for _ in 0..10 {
ep.observe(false);
}
assert!(
ep.current < 1.0,
"clean observations should decrease e-value, got {}",
ep.current
);
assert_eq!(ep.observations, 10);
assert_eq!(ep.violations_observed, 0);
assert!(!ep.rejected);
}
#[test]
fn eprocess_violations_increase() {
let mut ep = EProcess::new("test", EProcessConfig::default());
ep.observe(true);
assert!(
ep.current > 1.0,
"violation should increase e-value, got {}",
ep.current
);
assert_eq!(ep.violations_observed, 1);
}
#[test]
fn eprocess_many_violations_reject() {
let mut ep = EProcess::new("test", EProcessConfig::default());
for _ in 0..20 {
ep.observe(true);
}
assert!(ep.rejected, "repeated violations should cause rejection");
assert!(ep.rejection_time.is_some());
}
#[test]
fn eprocess_rejection_is_sticky() {
let mut ep = EProcess::new("test", EProcessConfig::default());
for _ in 0..20 {
ep.observe(true);
}
let rejection_time = ep.rejection_time;
assert!(ep.rejected);
for _ in 0..100 {
ep.observe(false);
}
assert!(ep.rejected, "rejection should be sticky");
assert_eq!(
ep.rejection_time, rejection_time,
"rejection time should not change"
);
}
#[test]
fn eprocess_history_recorded() {
let mut ep = EProcess::new("test", EProcessConfig::default());
ep.observe(false);
ep.observe(true);
ep.observe(false);
assert_eq!(ep.history.len(), 3);
assert_eq!(ep.history[0].time, 0);
assert_eq!(ep.history[1].time, 1);
assert_eq!(ep.history[2].time, 2);
}
#[test]
fn eprocess_no_history() {
let mut ep = EProcess::new_without_history("test", EProcessConfig::default());
ep.observe(false);
ep.observe(true);
assert!(ep.history.is_empty());
}
#[test]
fn eprocess_reset() {
let mut ep = EProcess::new("test", EProcessConfig::default());
ep.observe(true);
ep.observe(true);
assert!(ep.current > 1.0);
ep.reset();
assert!((ep.current - 1.0).abs() < 1e-10);
assert_eq!(ep.observations, 0);
assert_eq!(ep.violations_observed, 0);
assert!(!ep.rejected);
assert!(ep.history.is_empty());
}
#[test]
fn eprocess_empirical_rate() {
let mut ep = EProcess::new("test", EProcessConfig::default());
ep.observe(false);
ep.observe(true);
ep.observe(false);
ep.observe(true);
assert!((ep.empirical_rate() - 0.5).abs() < 1e-10);
}
#[test]
fn eprocess_empirical_rate_zero_observations() {
let ep = EProcess::new("test", EProcessConfig::default());
assert!((ep.empirical_rate()).abs() < 1e-10);
}
#[test]
fn eprocess_log10_evalue() {
let mut ep = EProcess::new("test", EProcessConfig::default());
ep.current = 100.0;
assert!((ep.log10_e_value() - 2.0).abs() < 1e-10);
}
#[test]
fn eprocess_evalue_capped() {
let mut ep = EProcess::new("test", EProcessConfig::default());
for _ in 0..1000 {
ep.observe(true);
}
assert!(
ep.current <= ep.config.max_evalue,
"e-value should be capped"
);
assert!(ep.current.is_finite());
}
#[test]
fn monitor_standard_has_three_invariants() {
let monitor = EProcessMonitor::standard();
assert_eq!(monitor.processes.len(), 3);
assert!(monitor.process("task_leak").is_some());
assert!(monitor.process("obligation_leak").is_some());
assert!(monitor.process("quiescence").is_some());
}
#[test]
fn monitor_all_invariants_has_spork_invariants_too() {
let monitor = EProcessMonitor::all_invariants();
assert_eq!(monitor.processes.len(), 24);
assert!(monitor.process("reply_linearity").is_some());
assert!(monitor.process("registry_lease").is_some());
assert!(monitor.process("down_order").is_some());
assert!(monitor.process("supervisor_quiescence").is_some());
}
#[test]
fn monitor_observe_report_clean() {
let mut monitor = EProcessMonitor::standard();
let report = make_clean_report(&["task_leak", "obligation_leak", "quiescence"]);
for _ in 0..10 {
monitor.observe_report(&report);
}
assert!(!monitor.any_rejected());
assert!(monitor.rejected_invariants().is_empty());
for ep in &monitor.processes {
assert!(
ep.current < 1.0,
"clean reports should decrease e-value for '{}'",
ep.invariant
);
}
}
#[test]
fn monitor_observe_report_violation() {
let mut monitor = EProcessMonitor::standard();
let invariants = ["task_leak", "obligation_leak", "quiescence"];
for _ in 0..20 {
let report = make_violation_report(&invariants, &["task_leak"]);
monitor.observe_report(&report);
}
assert!(monitor.any_rejected());
let rejected = monitor.rejected_invariants();
assert!(rejected.contains(&"task_leak"));
assert!(!rejected.contains(&"obligation_leak"));
assert!(!rejected.contains(&"quiescence"));
}
#[test]
fn monitor_observe_raw() {
let mut monitor = EProcessMonitor::standard();
assert!(monitor.observe("task_leak", true));
assert!(monitor.observe("task_leak", false));
assert!(!monitor.observe("nonexistent", true));
let ep = monitor.process("task_leak").unwrap();
assert_eq!(ep.observations, 2);
assert_eq!(ep.violations_observed, 1);
}
#[test]
fn monitor_results() {
let mut monitor = EProcessMonitor::standard();
let report = make_clean_report(&["task_leak", "obligation_leak", "quiescence"]);
monitor.observe_report(&report);
let results = monitor.results();
assert_eq!(results.len(), 3);
for r in &results {
assert_eq!(r.observations, 1);
assert!(!r.rejected);
}
}
#[test]
fn monitor_reset() {
let mut monitor = EProcessMonitor::standard();
monitor.observe("task_leak", true);
monitor.observe("task_leak", true);
monitor.reset();
for ep in &monitor.processes {
assert!((ep.current - 1.0).abs() < 1e-10);
assert_eq!(ep.observations, 0);
}
}
#[test]
fn monitor_to_text() {
let mut monitor = EProcessMonitor::standard();
let report = make_clean_report(&["task_leak", "obligation_leak", "quiescence"]);
monitor.observe_report(&report);
let text = monitor.to_text();
assert!(text.contains("E-Process Monitor"));
assert!(text.contains("task_leak"));
assert!(text.contains("monitoring"));
}
#[test]
fn monitor_to_json() {
let monitor = EProcessMonitor::standard();
let json = monitor.to_json();
assert!(json["processes"].is_array());
assert!(json["config"].is_object());
}
#[test]
fn monitor_json_roundtrip() {
let mut monitor = EProcessMonitor::standard();
let report = make_clean_report(&["task_leak", "obligation_leak", "quiescence"]);
monitor.observe_report(&report);
monitor.observe_report(&report);
let json_str = serde_json::to_string(&monitor).unwrap();
let deserialized: EProcessMonitor = serde_json::from_str(&json_str).unwrap();
assert_eq!(deserialized.processes.len(), monitor.processes.len());
for (orig, deser) in monitor.processes.iter().zip(deserialized.processes.iter()) {
assert_eq!(orig.invariant, deser.invariant);
assert!((orig.current - deser.current).abs() < 1e-10);
assert_eq!(orig.observations, deser.observations);
}
}
#[test]
fn eprocess_martingale_under_null() {
let n_trials: u32 = 1000;
let n_obs: u32 = 50;
let config = EProcessConfig::default();
let p0 = config.p0;
let mut sum_final_e = 0.0;
let mut rng_state: u64 = 42;
for _ in 0..n_trials {
let mut ep = EProcess::new_without_history("test", config.clone());
for _ in 0..n_obs {
rng_state = rng_state
.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1);
let u = f64::from((rng_state >> 33) as u32) / f64::from(1_u32 << 31);
let violated = u < p0;
ep.observe(violated);
}
sum_final_e += ep.current;
}
let avg_e = sum_final_e / f64::from(n_trials);
assert!(
avg_e < 2.0,
"average e-value under H0 should be ≤ 1 (got {avg_e:.4})"
);
}
#[test]
fn eprocess_detects_elevated_rate() {
let config = EProcessConfig::default();
let mut ep = EProcess::new("test", config);
for i in 0..100 {
let violated = i % 10 == 0;
ep.observe(violated);
}
assert!(
ep.rejected,
"elevated violation rate (10%) should be detected within 100 observations"
);
}
#[test]
fn early_stopping_valid() {
let n_trials: u32 = 10_000;
let n_obs: u32 = 100;
let config = EProcessConfig {
alpha: 0.05,
..EProcessConfig::default()
};
let p0 = config.p0;
let mut false_rejections: u32 = 0;
let mut rng_state: u64 = 999;
for _ in 0..n_trials {
let mut ep = EProcess::new_without_history("test", config.clone());
for _ in 0..n_obs {
rng_state = rng_state
.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1);
let u = f64::from((rng_state >> 33) as u32) / f64::from(1_u32 << 31);
let violated = u < p0;
ep.observe(violated);
}
if ep.rejected {
false_rejections += 1;
}
}
let fpr = f64::from(false_rejections) / f64::from(n_trials);
assert!(
fpr < 0.10,
"false positive rate under optional stopping should be ≤ α, got {fpr:.4}"
);
}
#[test]
fn validate_rejects_nan_p0() {
let config = EProcessConfig {
p0: f64::NAN,
..EProcessConfig::default()
};
assert!(config.validate().is_err());
}
#[test]
fn validate_rejects_nan_lambda() {
let config = EProcessConfig {
lambda: f64::NAN,
..EProcessConfig::default()
};
assert!(config.validate().is_err());
}
#[test]
fn validate_rejects_nan_alpha() {
let config = EProcessConfig {
alpha: f64::NAN,
..EProcessConfig::default()
};
assert!(config.validate().is_err());
}
#[test]
fn validate_rejects_inf_p0() {
let config = EProcessConfig {
p0: f64::INFINITY,
..EProcessConfig::default()
};
assert!(config.validate().is_err());
}
#[test]
fn validate_rejects_neg_inf_lambda() {
let config = EProcessConfig {
lambda: f64::NEG_INFINITY,
..EProcessConfig::default()
};
assert!(config.validate().is_err());
}
#[test]
#[should_panic(expected = "EProcessConfig validation failed")]
fn constructor_panics_on_nan_config() {
let config = EProcessConfig {
p0: f64::NAN,
..EProcessConfig::default()
};
let _ep = EProcess::new("test", config);
}
#[test]
fn monitor_with_oracle_suite() {
let mut suite = crate::lab::oracle::OracleSuite::new();
let report = suite.report(crate::types::Time::ZERO);
let mut monitor = EProcessMonitor::all_invariants();
for _ in 0..10 {
monitor.observe_report(&report);
}
assert!(
!monitor.any_rejected(),
"clean suite should not trigger rejection"
);
for r in monitor.results() {
assert!(!r.rejected);
assert!(
r.e_value < 1.0,
"clean: e-value should be < 1 for {}",
r.invariant
);
}
}
}