#![forbid(unsafe_code)]
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use web_time::{Duration, Instant};
/// Floor applied to the e-process wealth so it can never reach exactly zero
/// (a zero wealth could never multiply back up, locking the throttle).
const W_MIN: f64 = 1e-12;
/// Lower clamp for the null match rate `mu_0`; keeps it strictly inside (0, 1).
const MU_0_MIN: f64 = 1e-6;
/// Upper clamp for the null match rate `mu_0`; keeps it strictly inside (0, 1).
const MU_0_MAX: f64 = 1.0 - 1e-6;
/// Process-wide count of e-process rejections (wealth crossing the threshold).
/// A monotone metrics counter only, so relaxed ordering suffices.
static EPROCESS_REJECTIONS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Returns the process-wide number of e-process rejections recorded so far.
///
/// Reads the global counter with `Ordering::Relaxed`; the value is a
/// monotonically increasing metric, not a synchronization point.
#[must_use]
pub fn eprocess_rejections_total() -> u64 {
    EPROCESS_REJECTIONS_TOTAL.load(Ordering::Relaxed)
}
/// Tuning knobs for [`EProcessThrottle`].
#[derive(Debug, Clone)]
pub struct ThrottleConfig {
    /// Significance level; the rejection threshold is `1 / alpha`.
    pub alpha: f64,
    /// Null-hypothesis match rate (clamped into (0, 1) at construction).
    pub mu_0: f64,
    /// Starting betting fraction (clamped below `1 / mu_0`).
    pub initial_lambda: f64,
    /// Learning rate for the GRAPA lambda update.
    pub grapa_eta: f64,
    /// Wall-clock bound: a recompute is forced once this much time elapses.
    pub hard_deadline_ms: u64,
    /// Minimum observations between evidence-driven recomputes.
    pub min_observations_between: u64,
    /// Size of the sliding window used for the empirical match rate.
    pub rate_window_size: usize,
    /// When true, every decision is appended to an in-memory log buffer.
    pub enable_logging: bool,
}

impl Default for ThrottleConfig {
    /// Conservative defaults: 5% level, 10% null rate, 500 ms deadline.
    fn default() -> Self {
        ThrottleConfig {
            enable_logging: false,
            rate_window_size: 64,
            min_observations_between: 8,
            hard_deadline_ms: 500,
            grapa_eta: 0.1,
            initial_lambda: 0.5,
            mu_0: 0.1,
            alpha: 0.05,
        }
    }
}
/// Outcome of a single observation fed to the throttle.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ThrottleDecision {
    /// True when the caller should recompute now (evidence or deadline).
    pub should_recompute: bool,
    /// E-process wealth after this observation (1.0 right after a recompute).
    pub wealth: f64,
    /// Current betting fraction.
    pub lambda: f64,
    /// Match rate over the sliding window.
    pub empirical_rate: f64,
    /// True when only the hard deadline fired, without e-process evidence.
    pub forced_by_deadline: bool,
    /// Observations since the last recompute (0 right after one).
    pub observations_since_recompute: u64,
}

/// One structured log entry per observation (recorded when logging is on).
#[derive(Debug, Clone)]
pub struct ThrottleLog {
    pub timestamp: Instant,
    pub observation_idx: u64,
    pub matched: bool,
    pub wealth_before: f64,
    pub wealth_after: f64,
    pub lambda: f64,
    pub empirical_rate: f64,
    pub action: &'static str,
    pub time_since_recompute_ms: f64,
}

impl ThrottleDecision {
    /// Renders this decision as one `eprocess-throttle-v1` JSONL record.
    #[must_use]
    pub fn to_jsonl(&self) -> String {
        // All fields are Copy, so destructuring a shared borrow is free.
        let Self {
            should_recompute,
            wealth,
            lambda,
            empirical_rate,
            forced_by_deadline,
            observations_since_recompute,
        } = *self;
        format!(
            r#"{{"schema":"eprocess-throttle-v1","should_recompute":{should_recompute},"wealth":{wealth:.6},"lambda":{lambda:.6},"empirical_rate":{empirical_rate:.6},"forced_by_deadline":{forced_by_deadline},"obs_since_recompute":{observations_since_recompute}}}"#
        )
    }
}

impl ThrottleLog {
    /// Renders this entry as one `eprocess-log-v1` JSONL record
    /// (the timestamp is intentionally omitted from the output).
    #[must_use]
    pub fn to_jsonl(&self) -> String {
        // Every serialized field is Copy; the non-Copy timestamp is skipped.
        let Self {
            timestamp: _,
            observation_idx,
            matched,
            wealth_before,
            wealth_after,
            lambda,
            empirical_rate,
            action,
            time_since_recompute_ms,
        } = *self;
        format!(
            r#"{{"schema":"eprocess-log-v1","obs_idx":{observation_idx},"matched":{matched},"wealth_before":{wealth_before:.6},"wealth_after":{wealth_after:.6},"lambda":{lambda:.6},"empirical_rate":{empirical_rate:.6},"action":"{action}","time_since_recompute_ms":{time_since_recompute_ms:.3}}}"#
        )
    }
}
/// Lifetime counters and current gauges, snapshotted by [`EProcessThrottle::stats`].
#[derive(Debug, Clone)]
pub struct ThrottleStats {
    /// Observations seen since construction.
    pub total_observations: u64,
    /// All recomputes (forced + e-process).
    pub total_recomputes: u64,
    /// Recomputes triggered by the hard deadline alone.
    pub forced_recomputes: u64,
    /// Recomputes triggered by the wealth crossing the threshold.
    pub eprocess_recomputes: u64,
    /// Current e-process wealth.
    pub current_wealth: f64,
    /// Current betting fraction.
    pub current_lambda: f64,
    /// Match rate over the sliding window.
    pub empirical_rate: f64,
    /// Mean observations per recompute; 0.0 before any recompute.
    pub avg_observations_between_recomputes: f64,
}
/// Anytime-valid throttle driven by an e-process ("testing by betting").
///
/// Wealth is multiplied by `1 + lambda * (x - mu_0)` per observation; it
/// grows when matches exceed the null rate `mu_0`, and crossing `threshold`
/// (= 1/alpha) triggers a recompute. A hard wall-clock deadline bounds the
/// time between recomputes regardless of the statistical evidence.
#[derive(Debug)]
pub struct EProcessThrottle {
    config: ThrottleConfig,
    /// Current e-process wealth; starts at 1.0, floored at `W_MIN`.
    wealth: f64,
    /// Betting fraction, adapted online via a GRAPA-style gradient step.
    lambda: f64,
    /// Null match rate, clamped into [`MU_0_MIN`, `MU_0_MAX`].
    mu_0: f64,
    /// Upper clamp for `lambda` (`1/mu_0 - eps`), keeps the multiplier > 0.
    lambda_max: f64,
    /// Rejection boundary, `1 / alpha`.
    threshold: f64,
    /// Sliding window of recent outcomes for the empirical-rate gauge.
    recent_matches: VecDeque<bool>,
    /// Lifetime observation counter.
    observation_count: u64,
    /// Observations since the last recompute (spacing guard input).
    observations_since_recompute: u64,
    /// Epoch for the hard-deadline timer.
    last_recompute: Instant,
    total_recomputes: u64,
    forced_recomputes: u64,
    eprocess_recomputes: u64,
    /// Sum of `observations_since_recompute` at each recompute (for averages).
    cumulative_obs_at_recompute: u64,
    /// Decision log, populated only when `config.enable_logging` is set.
    logs: Vec<ThrottleLog>,
}
impl EProcessThrottle {
    /// Builds a throttle from `config`, using `Instant::now()` as the epoch.
    pub fn new(config: ThrottleConfig) -> Self {
        Self::new_at(config, Instant::now())
    }

    /// Builds a throttle whose deadline clock starts at `now`
    /// (injectable timestamp for deterministic tests).
    ///
    /// `mu_0` is clamped strictly inside (0, 1); `lambda` is clamped into
    /// `[1e-6, 1/mu_0 - eps]` so the wealth multiplier stays positive; the
    /// rejection threshold is `1 / alpha`.
    pub fn new_at(config: ThrottleConfig, now: Instant) -> Self {
        // Keep mu_0 strictly inside (0, 1) so 1/mu_0 below is finite.
        let mu_0 = config.mu_0.clamp(MU_0_MIN, MU_0_MAX);
        // On a non-match the multiplier is 1 - lambda * mu_0, so lambda must
        // stay strictly below 1/mu_0; back off by a small epsilon.
        let lambda_max = 1.0 / mu_0 - 1e-6;
        let lambda = config.initial_lambda.clamp(1e-6, lambda_max);
        // alpha is floored to avoid division by zero for degenerate configs.
        let threshold = 1.0 / config.alpha.max(1e-12);
        Self {
            config,
            wealth: 1.0,
            lambda,
            mu_0,
            lambda_max,
            threshold,
            recent_matches: VecDeque::new(),
            observation_count: 0,
            observations_since_recompute: 0,
            last_recompute: now,
            total_recomputes: 0,
            forced_recomputes: 0,
            eprocess_recomputes: 0,
            cumulative_obs_at_recompute: 0,
            logs: Vec::new(),
        }
    }

    /// Records one observation timestamped `Instant::now()`; see [`Self::observe_at`].
    pub fn observe(&mut self, matched: bool) -> ThrottleDecision {
        self.observe_at(matched, Instant::now())
    }

    /// Records one observation at time `now` and decides whether to recompute.
    ///
    /// Pipeline: update the sliding match window, multiply the wealth by
    /// `1 + lambda * (x - mu_0)`, take a GRAPA gradient step on `lambda`,
    /// then recompute if either the wealth crossed the threshold (with the
    /// minimum-observation spacing satisfied) or the hard deadline elapsed.
    /// Emits a tracing span plus debug/info events and bumps the global
    /// rejection counter as side effects.
    pub fn observe_at(&mut self, matched: bool, now: Instant) -> ThrottleDecision {
        self.observation_count += 1;
        self.observations_since_recompute += 1;
        // Sliding window of recent outcomes for the empirical-rate gauge.
        self.recent_matches.push_back(matched);
        while self.recent_matches.len() > self.config.rate_window_size {
            self.recent_matches.pop_front();
        }
        let empirical_rate = self.empirical_match_rate();
        let x_t = if matched { 1.0 } else { 0.0 };
        let wealth_before = self.wealth;
        // Betting update: wealth grows iff x_t exceeds the null mean mu_0.
        let multiplier = 1.0 + self.lambda * (x_t - self.mu_0);
        // Floor keeps wealth strictly positive so it can always recover.
        self.wealth = (self.wealth * multiplier).max(W_MIN);
        // GRAPA step: d/d(lambda) log(1 + lambda*(x - mu_0)) = (x - mu_0)/denominator.
        let denominator = 1.0 + self.lambda * (x_t - self.mu_0);
        if denominator.abs() > 1e-12 {
            let grad = (x_t - self.mu_0) / denominator;
            self.lambda = (self.lambda + self.config.grapa_eta * grad).clamp(1e-6, self.lambda_max);
        }
        // Saturating: a `now` before `last_recompute` yields a zero duration.
        let time_since_recompute = now.saturating_duration_since(self.last_recompute);
        let hard_deadline_exceeded =
            time_since_recompute >= Duration::from_millis(self.config.hard_deadline_ms);
        // Spacing guard rate-limits evidence-driven triggers; the hard
        // deadline below is deliberately NOT subject to it.
        let min_obs_met = self.observations_since_recompute >= self.config.min_observations_between;
        let wealth_exceeded = self.wealth >= self.threshold;
        let eprocess_triggered = wealth_exceeded && min_obs_met;
        let should_recompute = hard_deadline_exceeded || eprocess_triggered;
        // "Forced" only when the deadline alone (not the evidence) fired.
        let forced_by_deadline = hard_deadline_exceeded && !eprocess_triggered;
        let action = if should_recompute {
            if forced_by_deadline {
                "recompute_forced"
            } else {
                "recompute_eprocess"
            }
        } else {
            "observe"
        };
        let rejected = eprocess_triggered;
        // Span wraps the events below so they carry the update context.
        let _span = tracing::debug_span!(
            "eprocess.update",
            test_id = "throttle",
            wealth_current = %self.wealth,
            wealth_threshold = %self.threshold,
            observation_count = self.observation_count,
            rejected = rejected,
        )
        .entered();
        // Per-observation DEBUG event; eprocess_wealth doubles as a gauge.
        tracing::debug!(
            target: "ftui.eprocess",
            wealth_before = %wealth_before,
            wealth_after = %self.wealth,
            lambda = %self.lambda,
            empirical_rate = %empirical_rate,
            matched = matched,
            eprocess_wealth = %self.wealth,
            observation_count = self.observation_count,
            action = %action,
            "wealth update"
        );
        if rejected {
            // Global metrics counter; relaxed ordering is fine for a counter.
            EPROCESS_REJECTIONS_TOTAL.fetch_add(1, Ordering::Relaxed);
            tracing::info!(
                target: "ftui.eprocess",
                wealth = %self.wealth,
                threshold = %self.threshold,
                observation_count = self.observation_count,
                observations_since_recompute = self.observations_since_recompute,
                "e-process rejection: significant finding"
            );
        }
        if forced_by_deadline && should_recompute {
            tracing::info!(
                target: "ftui.eprocess",
                deadline_ms = self.config.hard_deadline_ms,
                observation_count = self.observation_count,
                "hard deadline forced recompute"
            );
        }
        // Log BEFORE trigger_recompute so the entry records pre-reset wealth.
        self.log_decision(
            now,
            matched,
            wealth_before,
            self.wealth,
            action,
            time_since_recompute,
        );
        if should_recompute {
            self.trigger_recompute(now, forced_by_deadline);
        }
        ThrottleDecision {
            should_recompute,
            // After a recompute these report the reset state (wealth 1.0,
            // spacing counter 0), not the pre-trigger values.
            wealth: self.wealth,
            lambda: self.lambda,
            empirical_rate,
            forced_by_deadline: should_recompute && forced_by_deadline,
            observations_since_recompute: self.observations_since_recompute,
        }
    }

    /// Restarts the e-process at `Instant::now()`; see [`Self::reset_at`].
    pub fn reset(&mut self) {
        self.reset_at(Instant::now());
    }

    /// Restarts the e-process: unit wealth, empty rate window, spacing
    /// counter zeroed, deadline clock rebased to `now`. Lambda and the
    /// lifetime counters are intentionally preserved.
    pub fn reset_at(&mut self, now: Instant) {
        self.wealth = 1.0;
        self.observations_since_recompute = 0;
        self.last_recompute = now;
        self.recent_matches.clear();
    }

    /// Replaces the null match rate and restarts the e-process (a new null
    /// invalidates the accumulated evidence).
    ///
    /// NOTE(review): this calls `reset()`, which rebases the deadline clock
    /// to `Instant::now()` rather than an injectable timestamp.
    pub fn set_mu_0(&mut self, mu_0: f64) {
        self.mu_0 = mu_0.clamp(MU_0_MIN, MU_0_MAX);
        self.lambda_max = 1.0 / self.mu_0 - 1e-6;
        // Re-clamp lambda in case the new mu_0 shrank the admissible range.
        self.lambda = self.lambda.clamp(1e-6, self.lambda_max);
        self.reset();
    }

    /// Current e-process wealth.
    #[inline]
    pub fn wealth(&self) -> f64 {
        self.wealth
    }

    /// Current betting fraction.
    #[inline]
    pub fn lambda(&self) -> f64 {
        self.lambda
    }

    /// Fraction of matches in the sliding window; 0.0 when the window is empty.
    pub fn empirical_match_rate(&self) -> f64 {
        if self.recent_matches.is_empty() {
            return 0.0;
        }
        let matches = self.recent_matches.iter().filter(|&&m| m).count();
        matches as f64 / self.recent_matches.len() as f64
    }

    /// Rejection boundary, `1 / alpha`.
    #[inline]
    pub fn threshold(&self) -> f64 {
        self.threshold
    }

    /// Total observations seen over the throttle's lifetime.
    #[inline]
    pub fn observation_count(&self) -> u64 {
        self.observation_count
    }

    /// Snapshot of the lifetime counters and current gauges.
    pub fn stats(&self) -> ThrottleStats {
        // Guard the average against division by zero before any recompute.
        let avg_obs = if self.total_recomputes > 0 {
            self.cumulative_obs_at_recompute as f64 / self.total_recomputes as f64
        } else {
            0.0
        };
        ThrottleStats {
            total_observations: self.observation_count,
            total_recomputes: self.total_recomputes,
            forced_recomputes: self.forced_recomputes,
            eprocess_recomputes: self.eprocess_recomputes,
            current_wealth: self.wealth,
            current_lambda: self.lambda,
            empirical_rate: self.empirical_match_rate(),
            avg_observations_between_recomputes: avg_obs,
        }
    }

    /// Structured decision log; empty unless `config.enable_logging` is set.
    pub fn logs(&self) -> &[ThrottleLog] {
        &self.logs
    }

    /// Drops all buffered log entries.
    pub fn clear_logs(&mut self) {
        self.logs.clear();
    }

    /// Books a recompute: bumps the counters, then restarts the wealth and
    /// spacing/deadline state for the next segment (lambda is kept so the
    /// adaptation persists across recomputes).
    fn trigger_recompute(&mut self, now: Instant, forced: bool) {
        self.total_recomputes += 1;
        self.cumulative_obs_at_recompute += self.observations_since_recompute;
        if forced {
            self.forced_recomputes += 1;
        } else {
            self.eprocess_recomputes += 1;
        }
        self.wealth = 1.0;
        self.observations_since_recompute = 0;
        self.last_recompute = now;
    }

    /// Appends a [`ThrottleLog`] entry when logging is enabled; no-op otherwise.
    fn log_decision(
        &mut self,
        now: Instant,
        matched: bool,
        wealth_before: f64,
        wealth_after: f64,
        action: &'static str,
        time_since_recompute: Duration,
    ) {
        if !self.config.enable_logging {
            return;
        }
        self.logs.push(ThrottleLog {
            timestamp: now,
            observation_idx: self.observation_count,
            matched,
            wealth_before,
            wealth_after,
            lambda: self.lambda,
            empirical_rate: self.empirical_match_rate(),
            action,
            time_since_recompute_ms: time_since_recompute.as_secs_f64() * 1000.0,
        });
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::LookupSpan;
    /// Baseline config shared by the unit tests (logging on, small spacing).
    fn test_config() -> ThrottleConfig {
        ThrottleConfig {
            alpha: 0.05,
            mu_0: 0.1,
            initial_lambda: 0.5,
            grapa_eta: 0.1,
            hard_deadline_ms: 500,
            min_observations_between: 4,
            rate_window_size: 32,
            enable_logging: true,
        }
    }

    // Fresh throttle: unit wealth, zero observations, threshold = 1/0.05 = 20.
    #[test]
    fn initial_state() {
        let t = EProcessThrottle::new(test_config());
        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
        assert_eq!(t.observation_count(), 0);
        assert!(t.lambda() > 0.0);
        assert!((t.threshold() - 20.0).abs() < 0.01);
    }

    // Constructor clamps mu_0 into [MU_0_MIN, MU_0_MAX] for any input.
    #[test]
    fn mu_0_clamped_to_valid_range() {
        let mut cfg = test_config();
        cfg.mu_0 = 0.0;
        let t = EProcessThrottle::new(cfg.clone());
        assert!(t.mu_0 >= MU_0_MIN);
        cfg.mu_0 = 1.0;
        let t = EProcessThrottle::new(cfg.clone());
        assert!(t.mu_0 <= MU_0_MAX);
        cfg.mu_0 = -5.0;
        let t = EProcessThrottle::new(cfg);
        assert!(t.mu_0 >= MU_0_MIN);
    }

    // A non-match multiplies wealth by 1 - lambda * mu_0 < 1.
    #[test]
    fn no_match_decreases_wealth() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(test_config(), base);
        let d = t.observe_at(false, base + Duration::from_millis(1));
        assert!(
            d.wealth < 1.0,
            "No-match should decrease wealth: {}",
            d.wealth
        );
    }

    // A match multiplies wealth by 1 + lambda * (1 - mu_0) > 1.
    #[test]
    fn match_increases_wealth() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(test_config(), base);
        let d = t.observe_at(true, base + Duration::from_millis(1));
        assert!(d.wealth > 1.0, "Match should increase wealth: {}", d.wealth);
    }

    // The W_MIN floor keeps wealth > 0 under an arbitrarily long losing streak.
    #[test]
    fn wealth_stays_positive() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(test_config(), base);
        for i in 1..=1000 {
            let d = t.observe_at(false, base + Duration::from_millis(i));
            assert!(d.wealth > 0.0, "Wealth must stay positive at obs {}", i);
        }
    }
#[test]
fn wealth_floor_prevents_zero_lock() {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = u64::MAX; cfg.initial_lambda = 0.99; let mut t = EProcessThrottle::new_at(cfg, base);
for i in 1..=500 {
t.observe_at(false, base + Duration::from_millis(i));
}
assert!(t.wealth() >= W_MIN, "Wealth should be at floor, not zero");
let before = t.wealth();
t.observe_at(true, base + Duration::from_millis(501));
assert!(
t.wealth() > before,
"Match should grow wealth even from floor"
);
}
#[test]
fn burst_of_matches_triggers_recompute() {
let base = Instant::now();
let mut cfg = test_config();
cfg.min_observations_between = 1; let mut t = EProcessThrottle::new_at(cfg, base);
let mut triggered = false;
for i in 1..=100 {
let d = t.observe_at(true, base + Duration::from_millis(i));
if d.should_recompute && !d.forced_by_deadline {
triggered = true;
break;
}
}
assert!(
triggered,
"Burst of matches should trigger e-process recompute"
);
}
#[test]
fn no_matches_does_not_trigger_eprocess() {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = u64::MAX;
let mut t = EProcessThrottle::new_at(cfg, base);
for i in 1..=200 {
let d = t.observe_at(false, base + Duration::from_millis(i));
assert!(
!d.should_recompute,
"No-match stream should never trigger e-process recompute at obs {}",
i
);
}
}
#[test]
fn hard_deadline_forces_recompute() {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = 100;
cfg.min_observations_between = 1;
let mut t = EProcessThrottle::new_at(cfg, base);
let d = t.observe_at(false, base + Duration::from_millis(150));
assert!(d.should_recompute, "Should trigger on deadline");
assert!(d.forced_by_deadline, "Should be forced by deadline");
}
#[test]
fn min_observations_between_prevents_rapid_fire() {
let base = Instant::now();
let mut cfg = test_config();
cfg.min_observations_between = 10;
cfg.hard_deadline_ms = u64::MAX;
cfg.alpha = 0.5; let mut t = EProcessThrottle::new_at(cfg, base);
let mut first_trigger = None;
for i in 1..=100 {
let d = t.observe_at(true, base + Duration::from_millis(i));
if d.should_recompute {
first_trigger = Some(i);
break;
}
}
assert!(
first_trigger.unwrap_or(0) >= 10,
"First trigger should be at obs >= 10, was {:?}",
first_trigger
);
}
#[test]
fn reset_clears_wealth_and_counter() {
let base = Instant::now();
let mut t = EProcessThrottle::new_at(test_config(), base);
for i in 1..=10 {
t.observe_at(true, base + Duration::from_millis(i));
}
assert!(t.wealth() > 1.0);
assert!(t.observations_since_recompute > 0);
t.reset_at(base + Duration::from_millis(20));
assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
assert_eq!(t.observations_since_recompute, 0);
}
#[test]
fn lambda_adapts_to_high_match_rate() {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = u64::MAX;
cfg.min_observations_between = u64::MAX;
let mut t = EProcessThrottle::new_at(cfg, base);
let initial_lambda = t.lambda();
for i in 1..=50 {
t.observe_at(true, base + Duration::from_millis(i));
}
assert!(
t.lambda() > initial_lambda,
"Lambda should increase with frequent matches: {} vs {}",
t.lambda(),
initial_lambda
);
}
#[test]
fn lambda_adapts_to_low_match_rate() {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = u64::MAX;
cfg.min_observations_between = u64::MAX;
cfg.initial_lambda = 0.8;
let mut t = EProcessThrottle::new_at(cfg, base);
let initial_lambda = t.lambda();
for i in 1..=50 {
t.observe_at(false, base + Duration::from_millis(i));
}
assert!(
t.lambda() < initial_lambda,
"Lambda should decrease with few matches: {} vs {}",
t.lambda(),
initial_lambda
);
}
#[test]
fn lambda_stays_bounded() {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = u64::MAX;
cfg.min_observations_between = u64::MAX;
cfg.grapa_eta = 1.0; let mut t = EProcessThrottle::new_at(cfg, base);
for i in 1..=200 {
let matched = i % 2 == 0;
t.observe_at(matched, base + Duration::from_millis(i as u64));
}
assert!(t.lambda() > 0.0, "Lambda must be positive");
assert!(
t.lambda() <= t.lambda_max,
"Lambda must not exceed 1/(1-mu_0): {} vs {}",
t.lambda(),
t.lambda_max
);
}
#[test]
fn empirical_rate_tracks_window() {
let base = Instant::now();
let mut cfg = test_config();
cfg.rate_window_size = 10;
cfg.hard_deadline_ms = u64::MAX;
cfg.min_observations_between = u64::MAX;
let mut t = EProcessThrottle::new_at(cfg, base);
for i in 1..=10 {
t.observe_at(true, base + Duration::from_millis(i));
}
assert!((t.empirical_match_rate() - 1.0).abs() < f64::EPSILON);
for i in 11..=20 {
t.observe_at(false, base + Duration::from_millis(i));
}
assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
}
#[test]
fn empirical_rate_zero_when_empty() {
let t = EProcessThrottle::new(test_config());
assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
}
#[test]
fn stats_reflect_state() {
let base = Instant::now();
let mut cfg = test_config();
cfg.min_observations_between = 1;
let mut t = EProcessThrottle::new_at(cfg, base);
let mut recomputed = false;
for i in 1..=50 {
let d = t.observe_at(true, base + Duration::from_millis(i));
if d.should_recompute {
recomputed = true;
}
}
let stats = t.stats();
assert_eq!(stats.total_observations, 50);
if recomputed {
assert!(stats.total_recomputes > 0);
assert!(stats.avg_observations_between_recomputes > 0.0);
}
}
#[test]
fn logging_captures_decisions() {
let base = Instant::now();
let mut cfg = test_config();
cfg.enable_logging = true;
let mut t = EProcessThrottle::new_at(cfg, base);
t.observe_at(true, base + Duration::from_millis(1));
t.observe_at(false, base + Duration::from_millis(2));
assert_eq!(t.logs().len(), 2);
assert!(t.logs()[0].matched);
assert!(!t.logs()[1].matched);
t.clear_logs();
assert!(t.logs().is_empty());
}
#[test]
fn logging_disabled_by_default() {
let base = Instant::now();
let mut cfg = test_config();
cfg.enable_logging = false;
let mut t = EProcessThrottle::new_at(cfg, base);
t.observe_at(true, base + Duration::from_millis(1));
assert!(t.logs().is_empty());
}
#[test]
fn set_mu_0_resets_eprocess() {
let base = Instant::now();
let mut t = EProcessThrottle::new_at(test_config(), base);
for i in 1..=10 {
t.observe_at(true, base + Duration::from_millis(i));
}
assert!(t.wealth() > 1.0);
t.set_mu_0(0.5);
assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
}
#[test]
fn deterministic_behavior() {
let base = Instant::now();
let cfg = test_config();
let run = |cfg: &ThrottleConfig| {
let mut t = EProcessThrottle::new_at(cfg.clone(), base);
let mut decisions = Vec::new();
for i in 1..=30 {
let matched = i % 3 == 0;
let d = t.observe_at(matched, base + Duration::from_millis(i));
decisions.push((d.should_recompute, d.forced_by_deadline));
}
(decisions, t.wealth(), t.lambda())
};
let (d1, w1, l1) = run(&cfg);
let (d2, w2, l2) = run(&cfg);
assert_eq!(d1, d2, "Decisions must be deterministic");
assert!((w1 - w2).abs() < 1e-10, "Wealth must be deterministic");
assert!((l1 - l2).abs() < 1e-10, "Lambda must be deterministic");
}
#[test]
fn property_supermartingale_under_null() {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = u64::MAX;
cfg.min_observations_between = u64::MAX;
cfg.mu_0 = 0.2;
cfg.grapa_eta = 0.0;
let n_trials = 200;
let n_obs = 100;
let mut total_wealth = 0.0;
let mut rng_state: u64 = 42;
let lcg_next = |state: &mut u64| -> f64 {
*state = state
.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407);
(*state >> 33) as f64 / (1u64 << 31) as f64
};
for trial in 0..n_trials {
let mut t = EProcessThrottle::new_at(cfg.clone(), base);
for i in 1..=n_obs {
let matched = lcg_next(&mut rng_state) < cfg.mu_0;
t.observe_at(
matched,
base + Duration::from_millis(i as u64 + trial * 1000),
);
}
total_wealth += t.wealth();
}
let avg_wealth = total_wealth / n_trials as f64;
assert!(
avg_wealth < 2.0,
"Average wealth under Hâ‚€ should be near 1.0, got {}",
avg_wealth
);
}
#[test]
fn property_type_i_control() {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = u64::MAX;
cfg.min_observations_between = 1;
cfg.alpha = 0.05;
cfg.mu_0 = 0.1;
cfg.grapa_eta = 0.0;
let n_trials = 500;
let n_obs = 200;
let mut false_triggers = 0u64;
let mut rng_state: u64 = 123;
let lcg_next = |state: &mut u64| -> f64 {
*state = state
.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407);
(*state >> 33) as f64 / (1u64 << 31) as f64
};
for trial in 0..n_trials {
let mut t = EProcessThrottle::new_at(cfg.clone(), base);
let mut triggered = false;
for i in 1..=n_obs {
let matched = lcg_next(&mut rng_state) < cfg.mu_0;
let d = t.observe_at(
matched,
base + Duration::from_millis(i as u64 + trial * 1000),
);
if d.should_recompute {
triggered = true;
break;
}
}
if triggered {
false_triggers += 1;
}
}
let false_trigger_rate = false_triggers as f64 / n_trials as f64;
assert!(
false_trigger_rate < cfg.alpha * 3.0,
"False trigger rate {} exceeds 3×α = {}",
false_trigger_rate,
cfg.alpha * 3.0
);
}
#[test]
fn single_observation() {
let base = Instant::now();
let cfg = test_config();
let mut t = EProcessThrottle::new_at(cfg, base);
let d = t.observe_at(true, base + Duration::from_millis(1));
assert_eq!(t.observation_count(), 1);
assert!(!d.should_recompute || d.forced_by_deadline);
}
#[test]
fn alternating_match_pattern() {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = u64::MAX;
cfg.min_observations_between = u64::MAX;
let mut t = EProcessThrottle::new_at(cfg, base);
for i in 1..=100 {
t.observe_at(i % 2 == 0, base + Duration::from_millis(i as u64));
}
assert!(
t.wealth() > 1.0,
"50% match rate vs 10% null should grow wealth: {}",
t.wealth()
);
}
#[test]
fn recompute_resets_wealth() {
let base = Instant::now();
let mut cfg = test_config();
cfg.min_observations_between = 1;
let mut t = EProcessThrottle::new_at(cfg, base);
let mut triggered = false;
for i in 1..=100 {
let d = t.observe_at(true, base + Duration::from_millis(i));
if d.should_recompute && !d.forced_by_deadline {
assert!(
(t.wealth() - 1.0).abs() < f64::EPSILON,
"Wealth should reset to 1.0 after recompute, got {}",
t.wealth()
);
triggered = true;
break;
}
}
assert!(
triggered,
"Should have triggered at least one e-process recompute"
);
}
#[test]
fn config_default_values() {
let cfg = ThrottleConfig::default();
assert!((cfg.alpha - 0.05).abs() < f64::EPSILON);
assert!((cfg.mu_0 - 0.1).abs() < f64::EPSILON);
assert!((cfg.initial_lambda - 0.5).abs() < f64::EPSILON);
assert!((cfg.grapa_eta - 0.1).abs() < f64::EPSILON);
assert_eq!(cfg.hard_deadline_ms, 500);
assert_eq!(cfg.min_observations_between, 8);
assert_eq!(cfg.rate_window_size, 64);
assert!(!cfg.enable_logging);
}
#[test]
fn throttle_decision_fields() {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = u64::MAX;
let mut t = EProcessThrottle::new_at(cfg, base);
let d = t.observe_at(true, base + Duration::from_millis(1));
assert!(!d.should_recompute);
assert!(!d.forced_by_deadline);
assert!(d.wealth > 1.0);
assert!(d.lambda > 0.0);
assert!((d.empirical_rate - 1.0).abs() < f64::EPSILON);
assert_eq!(d.observations_since_recompute, 1);
}
#[test]
fn stats_no_recomputes_avg_is_zero() {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = u64::MAX;
cfg.min_observations_between = u64::MAX;
let mut t = EProcessThrottle::new_at(cfg, base);
t.observe_at(false, base + Duration::from_millis(1));
let stats = t.stats();
assert_eq!(stats.total_recomputes, 0);
assert!((stats.avg_observations_between_recomputes - 0.0).abs() < f64::EPSILON);
}
#[test]
fn set_mu_0_clamps_extreme_values() {
let base = Instant::now();
let mut t = EProcessThrottle::new_at(test_config(), base);
t.set_mu_0(0.0);
assert!(t.mu_0 >= MU_0_MIN);
t.set_mu_0(2.0);
assert!(t.mu_0 <= MU_0_MAX);
}
#[test]
fn reset_preserves_lambda() {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = u64::MAX;
cfg.min_observations_between = u64::MAX;
let mut t = EProcessThrottle::new_at(cfg, base);
for i in 1..=20 {
t.observe_at(true, base + Duration::from_millis(i));
}
let lambda_before = t.lambda();
t.reset_at(base + Duration::from_millis(30));
assert!(
(t.lambda() - lambda_before).abs() < f64::EPSILON,
"Lambda should be preserved across reset"
);
}
#[test]
fn logging_records_match_status_and_action() {
let base = Instant::now();
let mut cfg = test_config();
cfg.enable_logging = true;
cfg.hard_deadline_ms = u64::MAX;
cfg.min_observations_between = u64::MAX;
let mut t = EProcessThrottle::new_at(cfg, base);
t.observe_at(true, base + Duration::from_millis(1));
let log = &t.logs()[0];
assert!(log.matched);
assert_eq!(log.observation_idx, 1);
assert_eq!(log.action, "observe");
assert!(log.wealth_after > log.wealth_before);
}
#[test]
fn consecutive_recomputes_tracked() {
let base = Instant::now();
let mut cfg = test_config();
cfg.min_observations_between = 1;
cfg.alpha = 0.5; let mut t = EProcessThrottle::new_at(cfg, base);
let mut recompute_count = 0;
for i in 1..=200 {
let d = t.observe_at(true, base + Duration::from_millis(i));
if d.should_recompute {
recompute_count += 1;
}
}
let stats = t.stats();
assert_eq!(stats.total_recomputes, recompute_count as u64);
assert!(
stats.total_recomputes >= 2,
"Should have multiple recomputes"
);
}
    /// Span snapshot captured by [`SpanCapture`] for assertions in tests.
    #[derive(Debug, Clone)]
    #[allow(dead_code)]
    struct CapturedSpan {
        name: String,
        target: String,
        level: tracing::Level,
        fields: HashMap<String, String>,
    }

    /// Event snapshot captured by [`SpanCapture`] for assertions in tests.
    #[derive(Debug, Clone)]
    #[allow(dead_code)]
    struct CapturedEvent {
        level: tracing::Level,
        target: String,
        message: String,
        fields: HashMap<String, String>,
    }

    /// `tracing_subscriber` layer that records every span and event it sees.
    struct SpanCapture {
        spans: Arc<Mutex<Vec<CapturedSpan>>>,
        events: Arc<Mutex<Vec<CapturedEvent>>>,
    }

    impl SpanCapture {
        /// Builds the layer plus a read handle sharing the same buffers.
        fn new() -> (Self, CaptureHandle) {
            let spans = Arc::new(Mutex::new(Vec::new()));
            let events = Arc::new(Mutex::new(Vec::new()));
            let handle = CaptureHandle {
                spans: spans.clone(),
                events: events.clone(),
            };
            (Self { spans, events }, handle)
        }
    }

    /// Read side of the capture buffers, usable after the layer is installed.
    struct CaptureHandle {
        spans: Arc<Mutex<Vec<CapturedSpan>>>,
        events: Arc<Mutex<Vec<CapturedEvent>>>,
    }

    impl CaptureHandle {
        // Both accessors clone out of the mutex so callers can assert freely.
        fn spans(&self) -> Vec<CapturedSpan> {
            self.spans.lock().unwrap().clone()
        }
        fn events(&self) -> Vec<CapturedEvent> {
            self.events.lock().unwrap().clone()
        }
    }

    /// Visitor that stringifies every recorded field into (name, value) pairs.
    struct FieldVisitor(Vec<(String, String)>);

    impl tracing::field::Visit for FieldVisitor {
        // Fallback for any type without a dedicated record_* method.
        fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
            self.0
                .push((field.name().to_string(), format!("{value:?}")));
        }
        fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
            self.0.push((field.name().to_string(), value.to_string()));
        }
        fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
            self.0.push((field.name().to_string(), value.to_string()));
        }
        fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
            self.0.push((field.name().to_string(), value.to_string()));
        }
        fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
            self.0.push((field.name().to_string(), value.to_string()));
        }
        fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
            self.0.push((field.name().to_string(), value.to_string()));
        }
    }
    impl<S> tracing_subscriber::Layer<S> for SpanCapture
    where
        S: tracing::Subscriber + for<'a> LookupSpan<'a>,
    {
        /// Records each new span's metadata and fields.
        fn on_new_span(
            &self,
            attrs: &tracing::span::Attributes<'_>,
            _id: &tracing::span::Id,
            _ctx: tracing_subscriber::layer::Context<'_, S>,
        ) {
            let mut visitor = FieldVisitor(Vec::new());
            attrs.record(&mut visitor);
            let mut fields: HashMap<String, String> = visitor.0.into_iter().collect();
            // Fields declared on the span but not recorded yet still appear
            // (with an empty value) so tests can assert on their presence.
            for field in attrs.metadata().fields() {
                fields.entry(field.name().to_string()).or_default();
            }
            self.spans.lock().unwrap().push(CapturedSpan {
                name: attrs.metadata().name().to_string(),
                target: attrs.metadata().target().to_string(),
                level: *attrs.metadata().level(),
                fields,
            });
        }
        /// Records each event, extracting the implicit `message` field.
        fn on_event(
            &self,
            event: &tracing::Event<'_>,
            _ctx: tracing_subscriber::layer::Context<'_, S>,
        ) {
            let mut visitor = FieldVisitor(Vec::new());
            event.record(&mut visitor);
            let fields: HashMap<String, String> = visitor.0.clone().into_iter().collect();
            let message = visitor
                .0
                .iter()
                .find(|(k, _)| k == "message")
                .map(|(_, v)| v.clone())
                .unwrap_or_default();
            self.events.lock().unwrap().push(CapturedEvent {
                level: *event.metadata().level(),
                target: event.metadata().target().to_string(),
                message,
                fields,
            });
        }
    }

    /// Runs `f` with the capture layer installed as the thread-default
    /// subscriber and returns the handle for inspecting what was recorded.
    fn with_captured_tracing<F>(f: F) -> CaptureHandle
    where
        F: FnOnce(),
    {
        let (layer, handle) = SpanCapture::new();
        let subscriber = tracing_subscriber::registry().with(layer);
        tracing::subscriber::with_default(subscriber, f);
        handle
    }
#[test]
fn span_eprocess_update_has_required_fields() {
let handle = with_captured_tracing(|| {
let base = Instant::now();
let mut t = EProcessThrottle::new_at(test_config(), base);
t.observe_at(true, base + Duration::from_millis(1));
});
let spans = handle.spans();
let ep_spans: Vec<_> = spans
.iter()
.filter(|s| s.name == "eprocess.update")
.collect();
assert!(
!ep_spans.is_empty(),
"expected at least one eprocess.update span"
);
let span = &ep_spans[0];
assert!(span.fields.contains_key("test_id"), "missing test_id field");
assert!(
span.fields.contains_key("wealth_current"),
"missing wealth_current"
);
assert!(
span.fields.contains_key("wealth_threshold"),
"missing wealth_threshold"
);
assert!(
span.fields.contains_key("observation_count"),
"missing observation_count"
);
assert!(
span.fields.contains_key("rejected"),
"missing rejected field"
);
}
#[test]
fn span_rejected_field_true_on_eprocess_trigger() {
let handle = with_captured_tracing(|| {
let base = Instant::now();
let mut cfg = test_config();
cfg.min_observations_between = 1;
let mut t = EProcessThrottle::new_at(cfg, base);
for i in 1..=100 {
let d = t.observe_at(true, base + Duration::from_millis(i));
if d.should_recompute && !d.forced_by_deadline {
break;
}
}
});
let spans = handle.spans();
let ep_spans: Vec<_> = spans
.iter()
.filter(|s| s.name == "eprocess.update")
.collect();
let rejected_spans: Vec<_> = ep_spans
.iter()
.filter(|s| s.fields.get("rejected").is_some_and(|v| v == "true"))
.collect();
assert!(
!rejected_spans.is_empty(),
"expected at least one span with rejected=true"
);
}
#[test]
fn debug_log_wealth_update() {
let handle = with_captured_tracing(|| {
let base = Instant::now();
let mut t = EProcessThrottle::new_at(test_config(), base);
t.observe_at(true, base + Duration::from_millis(1));
});
let events = handle.events();
let debug_events: Vec<_> = events
.iter()
.filter(|e| {
e.level == tracing::Level::DEBUG
&& e.target == "ftui.eprocess"
&& e.fields.contains_key("wealth_before")
})
.collect();
assert!(
!debug_events.is_empty(),
"expected at least one DEBUG wealth update event"
);
let evt = &debug_events[0];
assert!(
evt.fields.contains_key("wealth_after"),
"missing wealth_after"
);
assert!(evt.fields.contains_key("lambda"), "missing lambda");
assert!(
evt.fields.contains_key("eprocess_wealth"),
"missing eprocess_wealth gauge"
);
}
#[test]
fn info_log_on_eprocess_rejection() {
let handle = with_captured_tracing(|| {
let base = Instant::now();
let mut cfg = test_config();
cfg.min_observations_between = 1;
let mut t = EProcessThrottle::new_at(cfg, base);
for i in 1..=100 {
let d = t.observe_at(true, base + Duration::from_millis(i));
if d.should_recompute && !d.forced_by_deadline {
break;
}
}
});
let events = handle.events();
let info_events: Vec<_> = events
.iter()
.filter(|e| {
e.level == tracing::Level::INFO
&& e.target == "ftui.eprocess"
&& e.fields.contains_key("wealth")
&& e.fields.contains_key("threshold")
})
.collect();
assert!(
!info_events.is_empty(),
"expected INFO log on e-process rejection"
);
}
#[test]
fn info_log_on_deadline_forced_recompute() {
let handle = with_captured_tracing(|| {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = 100;
cfg.min_observations_between = 1;
let mut t = EProcessThrottle::new_at(cfg, base);
t.observe_at(false, base + Duration::from_millis(150));
});
let events = handle.events();
let deadline_events: Vec<_> = events
.iter()
.filter(|e| {
e.level == tracing::Level::INFO
&& e.target == "ftui.eprocess"
&& e.fields.contains_key("deadline_ms")
})
.collect();
assert!(
!deadline_events.is_empty(),
"expected INFO log on deadline forced recompute"
);
}
#[test]
fn counter_accessor_is_callable() {
let total = eprocess_rejections_total();
let _ = total.checked_add(0).expect("counter overflow");
}
#[test]
fn counter_increments_on_rejection() {
let before = eprocess_rejections_total();
let base = Instant::now();
let mut cfg = test_config();
cfg.min_observations_between = 1;
let mut t = EProcessThrottle::new_at(cfg, base);
for i in 1..=100 {
let d = t.observe_at(true, base + Duration::from_millis(i));
if d.should_recompute && !d.forced_by_deadline {
break;
}
}
let after = eprocess_rejections_total();
assert!(
after > before,
"counter should increment on rejection: before={before}, after={after}"
);
}
#[test]
fn debug_events_per_observation() {
let handle = with_captured_tracing(|| {
let base = Instant::now();
let mut cfg = test_config();
cfg.hard_deadline_ms = u64::MAX;
cfg.min_observations_between = u64::MAX;
let mut t = EProcessThrottle::new_at(cfg, base);
for i in 1..=5 {
t.observe_at(i % 2 == 0, base + Duration::from_millis(i));
}
});
let events = handle.events();
let debug_events: Vec<_> = events
.iter()
.filter(|e| {
e.level == tracing::Level::DEBUG
&& e.target == "ftui.eprocess"
&& e.fields.contains_key("wealth_before")
})
.collect();
assert_eq!(
debug_events.len(),
5,
"expected one DEBUG wealth event per observation"
);
}
}