Skip to main content

ftui_runtime/
eprocess_throttle.rs

#![forbid(unsafe_code)]

//! Anytime-valid throttle using e-process (test martingale) control.
//!
//! This module provides an adaptive recompute throttle for streaming workloads
//! (e.g., live log search). It uses a wealth-based betting strategy to decide
//! when accumulated evidence warrants a full recomputation, while providing
//! anytime-valid statistical guarantees.
//!
//! # Mathematical Model
//!
//! The throttle maintains a wealth process `W_t`:
//!
//! ```text
//! W_0 = 1
//! W_t = W_{t-1} × (1 + λ_t × (X_t − μ₀))
//! ```
//!
//! where:
//! - `X_t ∈ {0, 1}`: whether observation `t` is evidence for recompute
//!   (e.g., a log line matched the active search/filter query)
//! - `μ₀`: null hypothesis match rate — the "normal" baseline match frequency
//! - `λ_t ∈ (0, 1/μ₀)`: betting fraction (adaptive via GRAPA)
//!
//! When `W_t ≥ 1/α` (the e-value threshold), we reject H₀ ("results are
//! still fresh") and trigger recompute. After triggering, `W` resets to 1.
//!
//! # Key Invariants
//!
//! 1. **Supermartingale**: `E[W_t | W_{t-1}] ≤ W_{t-1}` under H₀
//! 2. **Anytime-valid Type I control**: `P(∃t: W_t ≥ 1/α) ≤ α` under H₀
//! 3. **Non-negative wealth**: `W_t ≥ 0` always
//! 4. **Bounded latency**: hard deadline forces recompute regardless of `W_t`
//!
//! # Failure Modes
//!
//! | Condition | Behavior | Rationale |
//! |-----------|----------|-----------|
//! | `μ₀ = 0` | Clamp to `μ₀ = ε` (1e-6) | Division by zero guard |
//! | `μ₀ ≥ 1` | Clamp to `1 − ε` | Degenerate: everything matches |
//! | `W_t` underflow | Clamp to `W_MIN` (1e-12) | Prevents permanent zero-lock |
//! | Hard deadline exceeded | Force recompute | Bounded worst-case latency |
//! | No observations | No change to `W_t` | Idle is not evidence |
//!
//! # Usage
//!
//! ```ignore
//! use ftui_runtime::eprocess_throttle::{EProcessThrottle, ThrottleConfig};
//!
//! let mut throttle = EProcessThrottle::new(ThrottleConfig::default());
//!
//! // On each log line push:
//! let matched = line.contains(&query);
//! let decision = throttle.observe(matched);
//! if decision.should_recompute {
//!     recompute_search_results();
//! }
//! ```
59
use std::collections::VecDeque;
use web_time::{Duration, Instant};
62
/// Minimum wealth floor to prevent permanent zero-lock after adverse bets.
///
/// Wealth is multiplied on every observation, so without a floor a long run
/// of losing bets could underflow to exactly zero and never recover.
const W_MIN: f64 = 1e-12;

/// Minimum `mu_0`; keeps `1 / mu_0` (the upper bound on lambda) finite.
const MU_0_MIN: f64 = 1e-6;

/// Maximum `mu_0`; keeps the null rate strictly below the degenerate
/// "everything matches" case.
const MU_0_MAX: f64 = 1.0 - 1e-6;
71
/// Configuration for the e-process throttle.
#[derive(Debug, Clone)]
pub struct ThrottleConfig {
    /// Significance level `α`. Recompute triggers when `W_t ≥ 1/α`.
    /// Lower α → more conservative (fewer recomputes). Default: 0.05.
    pub alpha: f64,

    /// Prior null hypothesis match rate `μ₀`. The expected fraction of
    /// observations that are matches under "normal" conditions.
    /// Clamped into `[1e-6, 1 − 1e-6]` when the throttle is built.
    /// Default: 0.1 (10% of log lines match).
    pub mu_0: f64,

    /// Initial betting fraction. Adaptive GRAPA updates this, but this
    /// sets the starting value. Must be in `(0, 1/μ₀)`: that is the range
    /// for which the wealth multiplier on a non-match, `1 − λ·μ₀`, stays
    /// positive. Out-of-range values are clamped when the throttle is built.
    /// Default: 0.5.
    pub initial_lambda: f64,

    /// GRAPA learning rate for adaptive lambda. Higher → faster adaptation
    /// but noisier. Default: 0.1.
    pub grapa_eta: f64,

    /// Hard deadline: force recompute if this many milliseconds pass since
    /// last recompute, regardless of wealth. Default: 500ms.
    pub hard_deadline_ms: u64,

    /// Minimum observations between recomputes. Prevents rapid-fire
    /// recomputes when every line matches. Only gates e-process triggers;
    /// the hard deadline is not subject to it. Default: 8.
    pub min_observations_between: u64,

    /// Window size for empirical match rate estimation. Default: 64.
    pub rate_window_size: usize,

    /// Enable JSONL-compatible decision logging. Default: false.
    pub enable_logging: bool,
}
107
108impl Default for ThrottleConfig {
109    fn default() -> Self {
110        Self {
111            alpha: 0.05,
112            mu_0: 0.1,
113            initial_lambda: 0.5,
114            grapa_eta: 0.1,
115            hard_deadline_ms: 500,
116            min_observations_between: 8,
117            rate_window_size: 64,
118            enable_logging: false,
119        }
120    }
121}
122
/// Decision returned by the throttle on each observation.
///
/// The state fields are sampled after the observation has been fully
/// processed. When `should_recompute` is true the throttle has already reset
/// itself, so `wealth` reads `1.0` and `observations_since_recompute` reads `0`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ThrottleDecision {
    /// Whether to trigger recomputation now.
    pub should_recompute: bool,
    /// Current wealth (e-value) after this observation and any reset.
    pub wealth: f64,
    /// Current adaptive betting fraction (already updated by this observation).
    pub lambda: f64,
    /// Empirical match rate over the sliding window.
    pub empirical_rate: f64,
    /// Whether the recompute was forced by the hard deadline rather than
    /// triggered by the e-process. Always false when `should_recompute` is false.
    pub forced_by_deadline: bool,
    /// Observations since last recompute.
    pub observations_since_recompute: u64,
}
139
/// Decision log entry for observability.
///
/// One entry is recorded per observation while logging is enabled.
#[derive(Debug, Clone)]
pub struct ThrottleLog {
    /// Timestamp of the observation.
    pub timestamp: Instant,
    /// Observation index (total count, 1-based: the first observation is 1).
    pub observation_idx: u64,
    /// Whether this observation was a match (X_t = 1).
    pub matched: bool,
    /// Wealth before this observation.
    pub wealth_before: f64,
    /// Wealth after this observation (before any reset caused by a recompute).
    pub wealth_after: f64,
    /// Betting fraction recorded for this observation.
    pub lambda: f64,
    /// Empirical match rate over the sliding window.
    pub empirical_rate: f64,
    /// Action taken: `"observe"`, `"recompute_eprocess"` or `"recompute_forced"`.
    pub action: &'static str,
    /// Time since last recompute (ms).
    pub time_since_recompute_ms: f64,
}
162
163impl ThrottleDecision {
164    /// Format this decision as a JSONL line for structured logging.
165    #[must_use]
166    pub fn to_jsonl(&self) -> String {
167        format!(
168            r#"{{"schema":"eprocess-throttle-v1","should_recompute":{},"wealth":{:.6},"lambda":{:.6},"empirical_rate":{:.6},"forced_by_deadline":{},"obs_since_recompute":{}}}"#,
169            self.should_recompute,
170            self.wealth,
171            self.lambda,
172            self.empirical_rate,
173            self.forced_by_deadline,
174            self.observations_since_recompute,
175        )
176    }
177}
178
179impl ThrottleLog {
180    /// Format this log entry as a JSONL line for structured logging.
181    #[must_use]
182    pub fn to_jsonl(&self) -> String {
183        format!(
184            r#"{{"schema":"eprocess-log-v1","obs_idx":{},"matched":{},"wealth_before":{:.6},"wealth_after":{:.6},"lambda":{:.6},"empirical_rate":{:.6},"action":"{}","time_since_recompute_ms":{:.3}}}"#,
185            self.observation_idx,
186            self.matched,
187            self.wealth_before,
188            self.wealth_after,
189            self.lambda,
190            self.empirical_rate,
191            self.action,
192            self.time_since_recompute_ms,
193        )
194    }
195}
196
/// Aggregate statistics for the throttle.
///
/// A point-in-time snapshot: counters cover the throttle's whole lifetime,
/// while wealth, lambda and the empirical rate are the current values.
#[derive(Debug, Clone)]
pub struct ThrottleStats {
    /// Total observations processed.
    pub total_observations: u64,
    /// Total recomputes triggered.
    pub total_recomputes: u64,
    /// Recomputes forced by hard deadline.
    pub forced_recomputes: u64,
    /// Recomputes triggered by e-process threshold.
    pub eprocess_recomputes: u64,
    /// Current wealth.
    pub current_wealth: f64,
    /// Current lambda.
    pub current_lambda: f64,
    /// Current empirical match rate.
    pub empirical_rate: f64,
    /// Average observations between recomputes (0 if no recomputes yet).
    pub avg_observations_between_recomputes: f64,
}
217
218/// Anytime-valid recompute throttle using e-process (test martingale) control.
219///
220/// See module-level docs for the mathematical model and guarantees.
221#[derive(Debug)]
222pub struct EProcessThrottle {
223    config: ThrottleConfig,
224
225    /// Current wealth W_t. Starts at 1, resets on recompute.
226    wealth: f64,
227
228    /// Current adaptive betting fraction λ_t.
229    lambda: f64,
230
231    /// Clamped mu_0 for safe arithmetic.
232    mu_0: f64,
233
234    /// Maximum lambda: `1 / (1 − μ₀)` minus small epsilon.
235    lambda_max: f64,
236
237    /// E-value threshold: `1 / α`.
238    threshold: f64,
239
240    /// Sliding window of recent observations for empirical rate.
241    recent_matches: VecDeque<bool>,
242
243    /// Total observation count.
244    observation_count: u64,
245
246    /// Observations since last recompute (or creation).
247    observations_since_recompute: u64,
248
249    /// Timestamp of last recompute (or creation).
250    last_recompute: Instant,
251
252    /// Total recomputes.
253    total_recomputes: u64,
254
255    /// Recomputes forced by deadline.
256    forced_recomputes: u64,
257
258    /// Recomputes triggered by e-process.
259    eprocess_recomputes: u64,
260
261    /// Sum of observations_since_recompute at each recompute (for averaging).
262    cumulative_obs_at_recompute: u64,
263
264    /// Decision logs (if logging enabled).
265    logs: Vec<ThrottleLog>,
266}
267
268impl EProcessThrottle {
269    /// Create a new throttle with the given configuration.
270    pub fn new(config: ThrottleConfig) -> Self {
271        Self::new_at(config, Instant::now())
272    }
273
274    /// Create a new throttle at a specific time (for deterministic testing).
275    pub fn new_at(config: ThrottleConfig, now: Instant) -> Self {
276        let mu_0 = config.mu_0.clamp(MU_0_MIN, MU_0_MAX);
277        let lambda_max = 1.0 / mu_0 - 1e-6;
278        let lambda = config.initial_lambda.clamp(1e-6, lambda_max);
279        let threshold = 1.0 / config.alpha.max(1e-12);
280
281        Self {
282            config,
283            wealth: 1.0,
284            lambda,
285            mu_0,
286            lambda_max,
287            threshold,
288            recent_matches: VecDeque::new(),
289            observation_count: 0,
290            observations_since_recompute: 0,
291            last_recompute: now,
292            total_recomputes: 0,
293            forced_recomputes: 0,
294            eprocess_recomputes: 0,
295            cumulative_obs_at_recompute: 0,
296            logs: Vec::new(),
297        }
298    }
299
300    /// Observe a single event. `matched` indicates whether this observation
301    /// is evidence for recomputation (e.g., the log line matched the query).
302    ///
303    /// Returns a [`ThrottleDecision`] indicating whether to recompute.
304    pub fn observe(&mut self, matched: bool) -> ThrottleDecision {
305        self.observe_at(matched, Instant::now())
306    }
307
308    /// Observe at a specific time (for deterministic testing).
309    pub fn observe_at(&mut self, matched: bool, now: Instant) -> ThrottleDecision {
310        self.observation_count += 1;
311        self.observations_since_recompute += 1;
312
313        // Update sliding window
314        self.recent_matches.push_back(matched);
315        while self.recent_matches.len() > self.config.rate_window_size {
316            self.recent_matches.pop_front();
317        }
318
319        let empirical_rate = self.empirical_match_rate();
320
321        // Wealth update: W_t = W_{t-1} × (1 + λ × (X_t − μ₀))
322        let x_t = if matched { 1.0 } else { 0.0 };
323        let wealth_before = self.wealth;
324        let multiplier = 1.0 + self.lambda * (x_t - self.mu_0);
325        self.wealth = (self.wealth * multiplier).max(W_MIN);
326
327        // GRAPA adaptive lambda update
328        // Gradient of log-wealth w.r.t. lambda: (X_t - μ₀) / (1 + λ(X_t - μ₀))
329        let denominator = 1.0 + self.lambda * (x_t - self.mu_0);
330        if denominator.abs() > 1e-12 {
331            let grad = (x_t - self.mu_0) / denominator;
332            self.lambda = (self.lambda + self.config.grapa_eta * grad).clamp(1e-6, self.lambda_max);
333        }
334
335        // Check recompute conditions
336        let time_since_recompute = now.duration_since(self.last_recompute);
337        let hard_deadline_exceeded =
338            time_since_recompute >= Duration::from_millis(self.config.hard_deadline_ms);
339        let min_obs_met = self.observations_since_recompute >= self.config.min_observations_between;
340        let wealth_exceeded = self.wealth >= self.threshold;
341
342        let should_recompute = hard_deadline_exceeded || (wealth_exceeded && min_obs_met);
343        let forced_by_deadline = hard_deadline_exceeded && !wealth_exceeded;
344
345        let action = if should_recompute {
346            if forced_by_deadline {
347                "recompute_forced"
348            } else {
349                "recompute_eprocess"
350            }
351        } else {
352            "observe"
353        };
354
355        self.log_decision(
356            now,
357            matched,
358            wealth_before,
359            self.wealth,
360            action,
361            time_since_recompute,
362        );
363
364        if should_recompute {
365            self.trigger_recompute(now, forced_by_deadline);
366        }
367
368        ThrottleDecision {
369            should_recompute,
370            wealth: self.wealth,
371            lambda: self.lambda,
372            empirical_rate,
373            forced_by_deadline: should_recompute && forced_by_deadline,
374            observations_since_recompute: self.observations_since_recompute,
375        }
376    }
377
378    /// Manually trigger a recompute (e.g., when the query changes).
379    /// Resets the e-process state.
380    pub fn reset(&mut self) {
381        self.reset_at(Instant::now());
382    }
383
384    /// Reset at a specific time (for testing).
385    pub fn reset_at(&mut self, now: Instant) {
386        self.wealth = 1.0;
387        self.observations_since_recompute = 0;
388        self.last_recompute = now;
389        self.recent_matches.clear();
390        // Lambda keeps its adapted value — intentional, since the match rate
391        // character of the data likely hasn't changed.
392    }
393
394    /// Update the null hypothesis match rate μ₀.
395    ///
396    /// Call this when the baseline match rate changes (e.g., new query with
397    /// different selectivity). Resets the e-process.
398    pub fn set_mu_0(&mut self, mu_0: f64) {
399        self.mu_0 = mu_0.clamp(MU_0_MIN, MU_0_MAX);
400        self.lambda_max = 1.0 / self.mu_0 - 1e-6;
401        self.lambda = self.lambda.clamp(1e-6, self.lambda_max);
402        self.reset();
403    }
404
405    /// Current wealth (e-value).
406    #[inline]
407    pub fn wealth(&self) -> f64 {
408        self.wealth
409    }
410
411    /// Current adaptive lambda.
412    #[inline]
413    pub fn lambda(&self) -> f64 {
414        self.lambda
415    }
416
417    /// Empirical match rate over the sliding window.
418    pub fn empirical_match_rate(&self) -> f64 {
419        if self.recent_matches.is_empty() {
420            return 0.0;
421        }
422        let matches = self.recent_matches.iter().filter(|&&m| m).count();
423        matches as f64 / self.recent_matches.len() as f64
424    }
425
426    /// E-value threshold (1/α).
427    #[inline]
428    pub fn threshold(&self) -> f64 {
429        self.threshold
430    }
431
432    /// Total observation count.
433    #[inline]
434    pub fn observation_count(&self) -> u64 {
435        self.observation_count
436    }
437
438    /// Get aggregate statistics.
439    pub fn stats(&self) -> ThrottleStats {
440        let avg_obs = if self.total_recomputes > 0 {
441            self.cumulative_obs_at_recompute as f64 / self.total_recomputes as f64
442        } else {
443            0.0
444        };
445
446        ThrottleStats {
447            total_observations: self.observation_count,
448            total_recomputes: self.total_recomputes,
449            forced_recomputes: self.forced_recomputes,
450            eprocess_recomputes: self.eprocess_recomputes,
451            current_wealth: self.wealth,
452            current_lambda: self.lambda,
453            empirical_rate: self.empirical_match_rate(),
454            avg_observations_between_recomputes: avg_obs,
455        }
456    }
457
458    /// Get decision logs (if logging enabled).
459    pub fn logs(&self) -> &[ThrottleLog] {
460        &self.logs
461    }
462
463    /// Clear decision logs.
464    pub fn clear_logs(&mut self) {
465        self.logs.clear();
466    }
467
468    // --- Internal ---
469
470    fn trigger_recompute(&mut self, now: Instant, forced: bool) {
471        self.total_recomputes += 1;
472        self.cumulative_obs_at_recompute += self.observations_since_recompute;
473        if forced {
474            self.forced_recomputes += 1;
475        } else {
476            self.eprocess_recomputes += 1;
477        }
478        self.wealth = 1.0;
479        self.observations_since_recompute = 0;
480        self.last_recompute = now;
481    }
482
483    fn log_decision(
484        &mut self,
485        now: Instant,
486        matched: bool,
487        wealth_before: f64,
488        wealth_after: f64,
489        action: &'static str,
490        time_since_recompute: Duration,
491    ) {
492        if !self.config.enable_logging {
493            return;
494        }
495
496        self.logs.push(ThrottleLog {
497            timestamp: now,
498            observation_idx: self.observation_count,
499            matched,
500            wealth_before,
501            wealth_after,
502            lambda: self.lambda,
503            empirical_rate: self.empirical_match_rate(),
504            action,
505            time_since_recompute_ms: time_since_recompute.as_secs_f64() * 1000.0,
506        });
507    }
508}
509
510#[cfg(test)]
511mod tests {
512    use super::*;
513
514    fn test_config() -> ThrottleConfig {
515        ThrottleConfig {
516            alpha: 0.05,
517            mu_0: 0.1,
518            initial_lambda: 0.5,
519            grapa_eta: 0.1,
520            hard_deadline_ms: 500,
521            min_observations_between: 4,
522            rate_window_size: 32,
523            enable_logging: true,
524        }
525    }
526
527    // ---------------------------------------------------------------
528    // Basic construction and invariants
529    // ---------------------------------------------------------------
530
531    #[test]
532    fn initial_state() {
533        let t = EProcessThrottle::new(test_config());
534        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
535        assert_eq!(t.observation_count(), 0);
536        assert!(t.lambda() > 0.0);
537        assert!((t.threshold() - 20.0).abs() < 0.01); // 1/0.05 = 20
538    }
539
540    #[test]
541    fn mu_0_clamped_to_valid_range() {
542        let mut cfg = test_config();
543        cfg.mu_0 = 0.0;
544        let t = EProcessThrottle::new(cfg.clone());
545        assert!(t.mu_0 >= MU_0_MIN);
546
547        cfg.mu_0 = 1.0;
548        let t = EProcessThrottle::new(cfg.clone());
549        assert!(t.mu_0 <= MU_0_MAX);
550
551        cfg.mu_0 = -5.0;
552        let t = EProcessThrottle::new(cfg);
553        assert!(t.mu_0 >= MU_0_MIN);
554    }
555
556    // ---------------------------------------------------------------
557    // Wealth dynamics
558    // ---------------------------------------------------------------
559
560    #[test]
561    fn no_match_decreases_wealth() {
562        let base = Instant::now();
563        let mut t = EProcessThrottle::new_at(test_config(), base);
564        let d = t.observe_at(false, base + Duration::from_millis(1));
565        assert!(
566            d.wealth < 1.0,
567            "No-match should decrease wealth: {}",
568            d.wealth
569        );
570    }
571
572    #[test]
573    fn match_increases_wealth() {
574        let base = Instant::now();
575        let mut t = EProcessThrottle::new_at(test_config(), base);
576        let d = t.observe_at(true, base + Duration::from_millis(1));
577        assert!(d.wealth > 1.0, "Match should increase wealth: {}", d.wealth);
578    }
579
580    #[test]
581    fn wealth_stays_positive() {
582        let base = Instant::now();
583        let mut t = EProcessThrottle::new_at(test_config(), base);
584        // 1000 non-matches in a row — wealth should never reach zero
585        for i in 1..=1000 {
586            let d = t.observe_at(false, base + Duration::from_millis(i));
587            assert!(d.wealth > 0.0, "Wealth must stay positive at obs {}", i);
588        }
589    }
590
591    #[test]
592    fn wealth_floor_prevents_zero_lock() {
593        let base = Instant::now();
594        let mut cfg = test_config();
595        cfg.hard_deadline_ms = u64::MAX; // disable deadline
596        cfg.initial_lambda = 0.99; // aggressive betting
597        let mut t = EProcessThrottle::new_at(cfg, base);
598
599        for i in 1..=500 {
600            t.observe_at(false, base + Duration::from_millis(i));
601        }
602        assert!(t.wealth() >= W_MIN, "Wealth should be at floor, not zero");
603
604        // A match should still be able to grow wealth from the floor
605        let before = t.wealth();
606        t.observe_at(true, base + Duration::from_millis(501));
607        assert!(
608            t.wealth() > before,
609            "Match should grow wealth even from floor"
610        );
611    }
612
613    // ---------------------------------------------------------------
614    // Recompute triggering
615    // ---------------------------------------------------------------
616
617    #[test]
618    fn burst_of_matches_triggers_recompute() {
619        let base = Instant::now();
620        let mut cfg = test_config();
621        cfg.min_observations_between = 1; // allow fast trigger
622        let mut t = EProcessThrottle::new_at(cfg, base);
623
624        let mut triggered = false;
625        for i in 1..=100 {
626            let d = t.observe_at(true, base + Duration::from_millis(i));
627            if d.should_recompute && !d.forced_by_deadline {
628                triggered = true;
629                break;
630            }
631        }
632        assert!(
633            triggered,
634            "Burst of matches should trigger e-process recompute"
635        );
636    }
637
638    #[test]
639    fn no_matches_does_not_trigger_eprocess() {
640        let base = Instant::now();
641        let mut cfg = test_config();
642        cfg.hard_deadline_ms = u64::MAX;
643        let mut t = EProcessThrottle::new_at(cfg, base);
644
645        for i in 1..=200 {
646            let d = t.observe_at(false, base + Duration::from_millis(i));
647            assert!(
648                !d.should_recompute,
649                "No-match stream should never trigger e-process recompute at obs {}",
650                i
651            );
652        }
653    }
654
655    #[test]
656    fn hard_deadline_forces_recompute() {
657        let base = Instant::now();
658        let mut cfg = test_config();
659        cfg.hard_deadline_ms = 100;
660        cfg.min_observations_between = 1;
661        let mut t = EProcessThrottle::new_at(cfg, base);
662
663        // Only non-matches, but exceed deadline
664        let d = t.observe_at(false, base + Duration::from_millis(150));
665        assert!(d.should_recompute, "Should trigger on deadline");
666        assert!(d.forced_by_deadline, "Should be forced by deadline");
667    }
668
669    #[test]
670    fn min_observations_between_prevents_rapid_fire() {
671        let base = Instant::now();
672        let mut cfg = test_config();
673        cfg.min_observations_between = 10;
674        cfg.hard_deadline_ms = u64::MAX;
675        cfg.alpha = 0.5; // very permissive to trigger early
676        let mut t = EProcessThrottle::new_at(cfg, base);
677
678        let mut first_trigger = None;
679        for i in 1..=100 {
680            let d = t.observe_at(true, base + Duration::from_millis(i));
681            if d.should_recompute {
682                first_trigger = Some(i);
683                break;
684            }
685        }
686
687        assert!(
688            first_trigger.unwrap_or(0) >= 10,
689            "First trigger should be at obs >= 10, was {:?}",
690            first_trigger
691        );
692    }
693
694    #[test]
695    fn reset_clears_wealth_and_counter() {
696        let base = Instant::now();
697        let mut t = EProcessThrottle::new_at(test_config(), base);
698
699        for i in 1..=10 {
700            t.observe_at(true, base + Duration::from_millis(i));
701        }
702        assert!(t.wealth() > 1.0);
703        assert!(t.observations_since_recompute > 0);
704
705        t.reset_at(base + Duration::from_millis(20));
706        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
707        assert_eq!(t.observations_since_recompute, 0);
708    }
709
710    // ---------------------------------------------------------------
711    // Adaptive lambda (GRAPA)
712    // ---------------------------------------------------------------
713
714    #[test]
715    fn lambda_adapts_to_high_match_rate() {
716        let base = Instant::now();
717        let mut cfg = test_config();
718        cfg.hard_deadline_ms = u64::MAX;
719        cfg.min_observations_between = u64::MAX;
720        let mut t = EProcessThrottle::new_at(cfg, base);
721
722        let initial_lambda = t.lambda();
723
724        // Many matches should increase lambda (bet more aggressively)
725        for i in 1..=50 {
726            t.observe_at(true, base + Duration::from_millis(i));
727        }
728
729        assert!(
730            t.lambda() > initial_lambda,
731            "Lambda should increase with frequent matches: {} vs {}",
732            t.lambda(),
733            initial_lambda
734        );
735    }
736
737    #[test]
738    fn lambda_adapts_to_low_match_rate() {
739        let base = Instant::now();
740        let mut cfg = test_config();
741        cfg.hard_deadline_ms = u64::MAX;
742        cfg.min_observations_between = u64::MAX;
743        cfg.initial_lambda = 0.8;
744        let mut t = EProcessThrottle::new_at(cfg, base);
745
746        let initial_lambda = t.lambda();
747
748        // Many non-matches should decrease lambda (bet more conservatively)
749        for i in 1..=50 {
750            t.observe_at(false, base + Duration::from_millis(i));
751        }
752
753        assert!(
754            t.lambda() < initial_lambda,
755            "Lambda should decrease with few matches: {} vs {}",
756            t.lambda(),
757            initial_lambda
758        );
759    }
760
761    #[test]
762    fn lambda_stays_bounded() {
763        let base = Instant::now();
764        let mut cfg = test_config();
765        cfg.hard_deadline_ms = u64::MAX;
766        cfg.min_observations_between = u64::MAX;
767        cfg.grapa_eta = 1.0; // aggressive learning
768        let mut t = EProcessThrottle::new_at(cfg, base);
769
770        for i in 1..=200 {
771            let matched = i % 2 == 0;
772            t.observe_at(matched, base + Duration::from_millis(i as u64));
773        }
774
775        assert!(t.lambda() > 0.0, "Lambda must be positive");
776        assert!(
777            t.lambda() <= t.lambda_max,
778            "Lambda must not exceed 1/(1-mu_0): {} vs {}",
779            t.lambda(),
780            t.lambda_max
781        );
782    }
783
784    // ---------------------------------------------------------------
785    // Empirical match rate
786    // ---------------------------------------------------------------
787
788    #[test]
789    fn empirical_rate_tracks_window() {
790        let base = Instant::now();
791        let mut cfg = test_config();
792        cfg.rate_window_size = 10;
793        cfg.hard_deadline_ms = u64::MAX;
794        cfg.min_observations_between = u64::MAX;
795        let mut t = EProcessThrottle::new_at(cfg, base);
796
797        // 10 matches
798        for i in 1..=10 {
799            t.observe_at(true, base + Duration::from_millis(i));
800        }
801        assert!((t.empirical_match_rate() - 1.0).abs() < f64::EPSILON);
802
803        // 10 non-matches (window slides)
804        for i in 11..=20 {
805            t.observe_at(false, base + Duration::from_millis(i));
806        }
807        assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
808    }
809
810    #[test]
811    fn empirical_rate_zero_when_empty() {
812        let t = EProcessThrottle::new(test_config());
813        assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
814    }
815
816    // ---------------------------------------------------------------
817    // Stats and logging
818    // ---------------------------------------------------------------
819
820    #[test]
821    fn stats_reflect_state() {
822        let base = Instant::now();
823        let mut cfg = test_config();
824        cfg.min_observations_between = 1;
825        let mut t = EProcessThrottle::new_at(cfg, base);
826
827        // Drive past a recompute
828        let mut recomputed = false;
829        for i in 1..=50 {
830            let d = t.observe_at(true, base + Duration::from_millis(i));
831            if d.should_recompute {
832                recomputed = true;
833            }
834        }
835
836        let stats = t.stats();
837        assert_eq!(stats.total_observations, 50);
838        if recomputed {
839            assert!(stats.total_recomputes > 0);
840            assert!(stats.avg_observations_between_recomputes > 0.0);
841        }
842    }
843
844    #[test]
845    fn logging_captures_decisions() {
846        let base = Instant::now();
847        let mut cfg = test_config();
848        cfg.enable_logging = true;
849        let mut t = EProcessThrottle::new_at(cfg, base);
850
851        t.observe_at(true, base + Duration::from_millis(1));
852        t.observe_at(false, base + Duration::from_millis(2));
853
854        assert_eq!(t.logs().len(), 2);
855        assert!(t.logs()[0].matched);
856        assert!(!t.logs()[1].matched);
857
858        t.clear_logs();
859        assert!(t.logs().is_empty());
860    }
861
862    #[test]
863    fn logging_disabled_by_default() {
864        let base = Instant::now();
865        let mut cfg = test_config();
866        cfg.enable_logging = false;
867        let mut t = EProcessThrottle::new_at(cfg, base);
868
869        t.observe_at(true, base + Duration::from_millis(1));
870        assert!(t.logs().is_empty());
871    }
872
873    // ---------------------------------------------------------------
874    // set_mu_0
875    // ---------------------------------------------------------------
876
877    #[test]
878    fn set_mu_0_resets_eprocess() {
879        let base = Instant::now();
880        let mut t = EProcessThrottle::new_at(test_config(), base);
881
882        for i in 1..=10 {
883            t.observe_at(true, base + Duration::from_millis(i));
884        }
885        assert!(t.wealth() > 1.0);
886
887        t.set_mu_0(0.5);
888        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
889    }
890
891    // ---------------------------------------------------------------
892    // Determinism
893    // ---------------------------------------------------------------
894
895    #[test]
896    fn deterministic_behavior() {
897        let base = Instant::now();
898        let cfg = test_config();
899
900        let run = |cfg: &ThrottleConfig| {
901            let mut t = EProcessThrottle::new_at(cfg.clone(), base);
902            let mut decisions = Vec::new();
903            for i in 1..=30 {
904                let matched = i % 3 == 0;
905                let d = t.observe_at(matched, base + Duration::from_millis(i));
906                decisions.push((d.should_recompute, d.forced_by_deadline));
907            }
908            (decisions, t.wealth(), t.lambda())
909        };
910
911        let (d1, w1, l1) = run(&cfg);
912        let (d2, w2, l2) = run(&cfg);
913
914        assert_eq!(d1, d2, "Decisions must be deterministic");
915        assert!((w1 - w2).abs() < 1e-10, "Wealth must be deterministic");
916        assert!((l1 - l2).abs() < 1e-10, "Lambda must be deterministic");
917    }
918
919    // ---------------------------------------------------------------
920    // Supermartingale property (Monte Carlo)
921    // ---------------------------------------------------------------
922
923    #[test]
924    fn property_supermartingale_under_null() {
925        // Under H₀ (match rate = μ₀), the expected wealth should not grow.
926        // We verify empirically by running many trials and checking the
927        // average final wealth ≤ initial wealth (with statistical slack).
928        let base = Instant::now();
929        let mut cfg = test_config();
930        cfg.hard_deadline_ms = u64::MAX;
931        cfg.min_observations_between = u64::MAX;
932        cfg.mu_0 = 0.2;
933        cfg.grapa_eta = 0.0; // fix lambda to test pure martingale property
934
935        let n_trials = 200;
936        let n_obs = 100;
937        let mut total_wealth = 0.0;
938
939        // Simple LCG for deterministic pseudo-random
940        let mut rng_state: u64 = 42;
941        let lcg_next = |state: &mut u64| -> f64 {
942            *state = state
943                .wrapping_mul(6364136223846793005)
944                .wrapping_add(1442695040888963407);
945            (*state >> 33) as f64 / (1u64 << 31) as f64
946        };
947
948        for trial in 0..n_trials {
949            let mut t = EProcessThrottle::new_at(cfg.clone(), base);
950            for i in 1..=n_obs {
951                let matched = lcg_next(&mut rng_state) < cfg.mu_0;
952                t.observe_at(
953                    matched,
954                    base + Duration::from_millis(i as u64 + trial * 1000),
955                );
956            }
957            total_wealth += t.wealth();
958        }
959
960        let avg_wealth = total_wealth / n_trials as f64;
961        // Under H₀ with fixed lambda, E[W_t] ≤ 1. Allow statistical slack.
962        assert!(
963            avg_wealth < 2.0,
964            "Average wealth under H₀ should be near 1.0, got {}",
965            avg_wealth
966        );
967    }
968
969    // ---------------------------------------------------------------
970    // Anytime-valid Type I control
971    // ---------------------------------------------------------------
972
973    #[test]
974    fn property_type_i_control() {
975        // Under H₀, the probability of ever triggering should be ≤ α.
976        // We test with many trials.
977        let base = Instant::now();
978        let mut cfg = test_config();
979        cfg.hard_deadline_ms = u64::MAX;
980        cfg.min_observations_between = 1;
981        cfg.alpha = 0.05;
982        cfg.mu_0 = 0.1;
983        cfg.grapa_eta = 0.0; // fixed lambda for clean test
984
985        let n_trials = 500;
986        let n_obs = 200;
987        let mut false_triggers = 0u64;
988
989        let mut rng_state: u64 = 123;
990        let lcg_next = |state: &mut u64| -> f64 {
991            *state = state
992                .wrapping_mul(6364136223846793005)
993                .wrapping_add(1442695040888963407);
994            (*state >> 33) as f64 / (1u64 << 31) as f64
995        };
996
997        for trial in 0..n_trials {
998            let mut t = EProcessThrottle::new_at(cfg.clone(), base);
999            let mut triggered = false;
1000            for i in 1..=n_obs {
1001                let matched = lcg_next(&mut rng_state) < cfg.mu_0;
1002                let d = t.observe_at(
1003                    matched,
1004                    base + Duration::from_millis(i as u64 + trial * 1000),
1005                );
1006                if d.should_recompute {
1007                    triggered = true;
1008                    break;
1009                }
1010            }
1011            if triggered {
1012                false_triggers += 1;
1013            }
1014        }
1015
1016        let false_trigger_rate = false_triggers as f64 / n_trials as f64;
1017        // Allow 3× slack for finite-sample variance
1018        assert!(
1019            false_trigger_rate < cfg.alpha * 3.0,
1020            "False trigger rate {} exceeds 3×α = {}",
1021            false_trigger_rate,
1022            cfg.alpha * 3.0
1023        );
1024    }
1025
1026    // ---------------------------------------------------------------
1027    // Edge cases
1028    // ---------------------------------------------------------------
1029
1030    #[test]
1031    fn single_observation() {
1032        let base = Instant::now();
1033        let cfg = test_config();
1034        let mut t = EProcessThrottle::new_at(cfg, base);
1035        let d = t.observe_at(true, base + Duration::from_millis(1));
1036        assert_eq!(t.observation_count(), 1);
1037        // Should not trigger with just 1 obs (min_observations_between = 4)
1038        assert!(!d.should_recompute || d.forced_by_deadline);
1039    }
1040
1041    #[test]
1042    fn alternating_match_pattern() {
1043        let base = Instant::now();
1044        let mut cfg = test_config();
1045        cfg.hard_deadline_ms = u64::MAX;
1046        cfg.min_observations_between = u64::MAX;
1047        let mut t = EProcessThrottle::new_at(cfg, base);
1048
1049        // Alternating: match rate = 0.5, much higher than μ₀ = 0.1
1050        for i in 1..=100 {
1051            t.observe_at(i % 2 == 0, base + Duration::from_millis(i as u64));
1052        }
1053
1054        // With 50% match rate vs 10% null, wealth should grow significantly
1055        assert!(
1056            t.wealth() > 1.0,
1057            "50% match rate vs 10% null should grow wealth: {}",
1058            t.wealth()
1059        );
1060    }
1061
1062    #[test]
1063    fn recompute_resets_wealth() {
1064        let base = Instant::now();
1065        let mut cfg = test_config();
1066        cfg.min_observations_between = 1;
1067        let mut t = EProcessThrottle::new_at(cfg, base);
1068
1069        // Drive to recompute
1070        let mut triggered = false;
1071        for i in 1..=100 {
1072            let d = t.observe_at(true, base + Duration::from_millis(i));
1073            if d.should_recompute && !d.forced_by_deadline {
1074                // Wealth should be reset to 1.0 after recompute
1075                assert!(
1076                    (t.wealth() - 1.0).abs() < f64::EPSILON,
1077                    "Wealth should reset to 1.0 after recompute, got {}",
1078                    t.wealth()
1079                );
1080                triggered = true;
1081                break;
1082            }
1083        }
1084        assert!(
1085            triggered,
1086            "Should have triggered at least one e-process recompute"
1087        );
1088    }
1089
1090    #[test]
1091    fn config_default_values() {
1092        let cfg = ThrottleConfig::default();
1093        assert!((cfg.alpha - 0.05).abs() < f64::EPSILON);
1094        assert!((cfg.mu_0 - 0.1).abs() < f64::EPSILON);
1095        assert!((cfg.initial_lambda - 0.5).abs() < f64::EPSILON);
1096        assert!((cfg.grapa_eta - 0.1).abs() < f64::EPSILON);
1097        assert_eq!(cfg.hard_deadline_ms, 500);
1098        assert_eq!(cfg.min_observations_between, 8);
1099        assert_eq!(cfg.rate_window_size, 64);
1100        assert!(!cfg.enable_logging);
1101    }
1102
1103    #[test]
1104    fn throttle_decision_fields() {
1105        let base = Instant::now();
1106        let mut cfg = test_config();
1107        cfg.hard_deadline_ms = u64::MAX;
1108        let mut t = EProcessThrottle::new_at(cfg, base);
1109        let d = t.observe_at(true, base + Duration::from_millis(1));
1110
1111        assert!(!d.should_recompute);
1112        assert!(!d.forced_by_deadline);
1113        assert!(d.wealth > 1.0);
1114        assert!(d.lambda > 0.0);
1115        assert!((d.empirical_rate - 1.0).abs() < f64::EPSILON);
1116        assert_eq!(d.observations_since_recompute, 1);
1117    }
1118
1119    #[test]
1120    fn stats_no_recomputes_avg_is_zero() {
1121        let base = Instant::now();
1122        let mut cfg = test_config();
1123        cfg.hard_deadline_ms = u64::MAX;
1124        cfg.min_observations_between = u64::MAX;
1125        let mut t = EProcessThrottle::new_at(cfg, base);
1126
1127        t.observe_at(false, base + Duration::from_millis(1));
1128        let stats = t.stats();
1129        assert_eq!(stats.total_recomputes, 0);
1130        assert!((stats.avg_observations_between_recomputes - 0.0).abs() < f64::EPSILON);
1131    }
1132
1133    #[test]
1134    fn set_mu_0_clamps_extreme_values() {
1135        let base = Instant::now();
1136        let mut t = EProcessThrottle::new_at(test_config(), base);
1137
1138        t.set_mu_0(0.0);
1139        assert!(t.mu_0 >= MU_0_MIN);
1140
1141        t.set_mu_0(2.0);
1142        assert!(t.mu_0 <= MU_0_MAX);
1143    }
1144
1145    #[test]
1146    fn reset_preserves_lambda() {
1147        let base = Instant::now();
1148        let mut cfg = test_config();
1149        cfg.hard_deadline_ms = u64::MAX;
1150        cfg.min_observations_between = u64::MAX;
1151        let mut t = EProcessThrottle::new_at(cfg, base);
1152
1153        for i in 1..=20 {
1154            t.observe_at(true, base + Duration::from_millis(i));
1155        }
1156        let lambda_before = t.lambda();
1157        t.reset_at(base + Duration::from_millis(30));
1158        assert!(
1159            (t.lambda() - lambda_before).abs() < f64::EPSILON,
1160            "Lambda should be preserved across reset"
1161        );
1162    }
1163
1164    #[test]
1165    fn logging_records_match_status_and_action() {
1166        let base = Instant::now();
1167        let mut cfg = test_config();
1168        cfg.enable_logging = true;
1169        cfg.hard_deadline_ms = u64::MAX;
1170        cfg.min_observations_between = u64::MAX;
1171        let mut t = EProcessThrottle::new_at(cfg, base);
1172
1173        t.observe_at(true, base + Duration::from_millis(1));
1174        let log = &t.logs()[0];
1175        assert!(log.matched);
1176        assert_eq!(log.observation_idx, 1);
1177        assert_eq!(log.action, "observe");
1178        assert!(log.wealth_after > log.wealth_before);
1179    }
1180
1181    #[test]
1182    fn consecutive_recomputes_tracked() {
1183        let base = Instant::now();
1184        let mut cfg = test_config();
1185        cfg.min_observations_between = 1;
1186        cfg.alpha = 0.5; // permissive
1187        let mut t = EProcessThrottle::new_at(cfg, base);
1188
1189        let mut recompute_count = 0;
1190        for i in 1..=200 {
1191            let d = t.observe_at(true, base + Duration::from_millis(i));
1192            if d.should_recompute {
1193                recompute_count += 1;
1194            }
1195        }
1196
1197        let stats = t.stats();
1198        assert_eq!(stats.total_recomputes, recompute_count as u64);
1199        assert!(
1200            stats.total_recomputes >= 2,
1201            "Should have multiple recomputes"
1202        );
1203    }
1204}