Skip to main content

ftui_runtime/
eprocess_throttle.rs

1#![forbid(unsafe_code)]
2
3//! Anytime-valid throttle using e-process (test martingale) control.
4//!
5//! This module provides an adaptive recompute throttle for streaming workloads
6//! (e.g., live log search). It uses a wealth-based betting strategy to decide
7//! when accumulated evidence warrants a full recomputation, while providing
8//! anytime-valid statistical guarantees.
9//!
10//! # Mathematical Model
11//!
12//! The throttle maintains a wealth process `W_t`:
13//!
14//! ```text
15//! W_0 = 1
16//! W_t = W_{t-1} × (1 + λ_t × (X_t − μ₀))
17//! ```
18//!
19//! where:
20//! - `X_t ∈ {0, 1}`: whether observation `t` is evidence for recompute
21//!   (e.g., a log line matched the active search/filter query)
22//! - `μ₀`: null hypothesis match rate — the "normal" baseline match frequency
23//! - `λ_t ∈ (0, 1/μ₀)`: betting fraction (adaptive via GRAPA)
24//!
25//! When `W_t ≥ 1/α` (the e-value threshold), we reject H₀ ("results are
26//! still fresh") and trigger recompute. After triggering, `W` resets to 1.
27//!
28//! # Key Invariants
29//!
30//! 1. **Supermartingale**: `E[W_t | W_{t-1}] ≤ W_{t-1}` under H₀
31//! 2. **Anytime-valid Type I control**: `P(∃t: W_t ≥ 1/α) ≤ α` under H₀
32//! 3. **Non-negative wealth**: `W_t ≥ 0` always
33//! 4. **Bounded latency**: hard deadline forces recompute regardless of `W_t`
34//!
35//! # Failure Modes
36//!
37//! | Condition | Behavior | Rationale |
38//! |-----------|----------|-----------|
39//! | `μ₀ = 0` | Clamp to `μ₀ = ε` (1e-6) | Division by zero guard |
40//! | `μ₀ ≥ 1` | Clamp to `1 − ε` | Degenerate: everything matches |
41//! | `W_t` underflow | Clamp to `W_MIN` (1e-12) | Prevents permanent zero-lock |
42//! | Hard deadline exceeded | Force recompute | Bounded worst-case latency |
43//! | No observations | No change to `W_t` | Idle is not evidence |
44//!
45//! # Usage
46//!
47//! ```ignore
48//! use ftui_runtime::eprocess_throttle::{EProcessThrottle, ThrottleConfig};
49//!
50//! let mut throttle = EProcessThrottle::new(ThrottleConfig::default());
51//!
52//! // On each log line push:
53//! let matched = line.contains(&query);
54//! let decision = throttle.observe(matched);
55//! if decision.should_recompute {
56//!     recompute_search_results();
57//! }
58//! ```
59
60use std::collections::VecDeque;
61use std::time::{Duration, Instant};
62
63/// Minimum wealth floor to prevent permanent zero-lock after adverse bets.
64const W_MIN: f64 = 1e-12;
65
66/// Minimum mu_0 to prevent division by zero.
67const MU_0_MIN: f64 = 1e-6;
68
69/// Maximum mu_0 to prevent degenerate all-match scenarios.
70const MU_0_MAX: f64 = 1.0 - 1e-6;
71
72/// Configuration for the e-process throttle.
73#[derive(Debug, Clone)]
74pub struct ThrottleConfig {
75    /// Significance level `α`. Recompute triggers when `W_t ≥ 1/α`.
76    /// Lower α → more conservative (fewer recomputes). Default: 0.05.
77    pub alpha: f64,
78
79    /// Prior null hypothesis match rate `μ₀`. The expected fraction of
80    /// observations that are matches under "normal" conditions.
81    /// Default: 0.1 (10% of log lines match).
82    pub mu_0: f64,
83
84    /// Initial betting fraction. Adaptive GRAPA updates this, but this
85    /// sets the starting value. Must be in `(0, 1/(1 − μ₀))`.
86    /// Default: 0.5.
87    pub initial_lambda: f64,
88
89    /// GRAPA learning rate for adaptive lambda. Higher → faster adaptation
90    /// but noisier. Default: 0.1.
91    pub grapa_eta: f64,
92
93    /// Hard deadline: force recompute if this many milliseconds pass since
94    /// last recompute, regardless of wealth. Default: 500ms.
95    pub hard_deadline_ms: u64,
96
97    /// Minimum observations between recomputes. Prevents rapid-fire
98    /// recomputes when every line matches. Default: 8.
99    pub min_observations_between: u64,
100
101    /// Window size for empirical match rate estimation. Default: 64.
102    pub rate_window_size: usize,
103
104    /// Enable JSONL-compatible decision logging. Default: false.
105    pub enable_logging: bool,
106}
107
108impl Default for ThrottleConfig {
109    fn default() -> Self {
110        Self {
111            alpha: 0.05,
112            mu_0: 0.1,
113            initial_lambda: 0.5,
114            grapa_eta: 0.1,
115            hard_deadline_ms: 500,
116            min_observations_between: 8,
117            rate_window_size: 64,
118            enable_logging: false,
119        }
120    }
121}
122
123/// Decision returned by the throttle on each observation.
124#[derive(Debug, Clone, Copy, PartialEq)]
125pub struct ThrottleDecision {
126    /// Whether to trigger recomputation now.
127    pub should_recompute: bool,
128    /// Current wealth (e-value). When `≥ 1/α`, triggers recompute.
129    pub wealth: f64,
130    /// Current adaptive betting fraction.
131    pub lambda: f64,
132    /// Empirical match rate over the sliding window.
133    pub empirical_rate: f64,
134    /// Whether the decision was forced by hard deadline.
135    pub forced_by_deadline: bool,
136    /// Observations since last recompute.
137    pub observations_since_recompute: u64,
138}
139
140/// Decision log entry for observability.
141#[derive(Debug, Clone)]
142pub struct ThrottleLog {
143    /// Timestamp of the observation.
144    pub timestamp: Instant,
145    /// Observation index (total count).
146    pub observation_idx: u64,
147    /// Whether this observation was a match (X_t = 1).
148    pub matched: bool,
149    /// Wealth before this observation.
150    pub wealth_before: f64,
151    /// Wealth after this observation.
152    pub wealth_after: f64,
153    /// Betting fraction used.
154    pub lambda: f64,
155    /// Empirical match rate.
156    pub empirical_rate: f64,
157    /// Action taken.
158    pub action: &'static str,
159    /// Time since last recompute (ms).
160    pub time_since_recompute_ms: f64,
161}
162
163/// Aggregate statistics for the throttle.
164#[derive(Debug, Clone)]
165pub struct ThrottleStats {
166    /// Total observations processed.
167    pub total_observations: u64,
168    /// Total recomputes triggered.
169    pub total_recomputes: u64,
170    /// Recomputes forced by hard deadline.
171    pub forced_recomputes: u64,
172    /// Recomputes triggered by e-process threshold.
173    pub eprocess_recomputes: u64,
174    /// Current wealth.
175    pub current_wealth: f64,
176    /// Current lambda.
177    pub current_lambda: f64,
178    /// Current empirical match rate.
179    pub empirical_rate: f64,
180    /// Average observations between recomputes (0 if no recomputes yet).
181    pub avg_observations_between_recomputes: f64,
182}
183
184/// Anytime-valid recompute throttle using e-process (test martingale) control.
185///
186/// See module-level docs for the mathematical model and guarantees.
187#[derive(Debug)]
188pub struct EProcessThrottle {
189    config: ThrottleConfig,
190
191    /// Current wealth W_t. Starts at 1, resets on recompute.
192    wealth: f64,
193
194    /// Current adaptive betting fraction λ_t.
195    lambda: f64,
196
197    /// Clamped mu_0 for safe arithmetic.
198    mu_0: f64,
199
200    /// Maximum lambda: `1 / (1 − μ₀)` minus small epsilon.
201    lambda_max: f64,
202
203    /// E-value threshold: `1 / α`.
204    threshold: f64,
205
206    /// Sliding window of recent observations for empirical rate.
207    recent_matches: VecDeque<bool>,
208
209    /// Total observation count.
210    observation_count: u64,
211
212    /// Observations since last recompute (or creation).
213    observations_since_recompute: u64,
214
215    /// Timestamp of last recompute (or creation).
216    last_recompute: Instant,
217
218    /// Total recomputes.
219    total_recomputes: u64,
220
221    /// Recomputes forced by deadline.
222    forced_recomputes: u64,
223
224    /// Recomputes triggered by e-process.
225    eprocess_recomputes: u64,
226
227    /// Sum of observations_since_recompute at each recompute (for averaging).
228    cumulative_obs_at_recompute: u64,
229
230    /// Decision logs (if logging enabled).
231    logs: Vec<ThrottleLog>,
232}
233
234impl EProcessThrottle {
235    /// Create a new throttle with the given configuration.
236    pub fn new(config: ThrottleConfig) -> Self {
237        Self::new_at(config, Instant::now())
238    }
239
240    /// Create a new throttle at a specific time (for deterministic testing).
241    pub fn new_at(config: ThrottleConfig, now: Instant) -> Self {
242        let mu_0 = config.mu_0.clamp(MU_0_MIN, MU_0_MAX);
243        let lambda_max = 1.0 / mu_0 - 1e-6;
244        let lambda = config.initial_lambda.clamp(1e-6, lambda_max);
245        let threshold = 1.0 / config.alpha.max(1e-12);
246
247        Self {
248            config,
249            wealth: 1.0,
250            lambda,
251            mu_0,
252            lambda_max,
253            threshold,
254            recent_matches: VecDeque::new(),
255            observation_count: 0,
256            observations_since_recompute: 0,
257            last_recompute: now,
258            total_recomputes: 0,
259            forced_recomputes: 0,
260            eprocess_recomputes: 0,
261            cumulative_obs_at_recompute: 0,
262            logs: Vec::new(),
263        }
264    }
265
266    /// Observe a single event. `matched` indicates whether this observation
267    /// is evidence for recomputation (e.g., the log line matched the query).
268    ///
269    /// Returns a [`ThrottleDecision`] indicating whether to recompute.
270    pub fn observe(&mut self, matched: bool) -> ThrottleDecision {
271        self.observe_at(matched, Instant::now())
272    }
273
274    /// Observe at a specific time (for deterministic testing).
275    pub fn observe_at(&mut self, matched: bool, now: Instant) -> ThrottleDecision {
276        self.observation_count += 1;
277        self.observations_since_recompute += 1;
278
279        // Update sliding window
280        self.recent_matches.push_back(matched);
281        while self.recent_matches.len() > self.config.rate_window_size {
282            self.recent_matches.pop_front();
283        }
284
285        let empirical_rate = self.empirical_match_rate();
286
287        // Wealth update: W_t = W_{t-1} × (1 + λ × (X_t − μ₀))
288        let x_t = if matched { 1.0 } else { 0.0 };
289        let wealth_before = self.wealth;
290        let multiplier = 1.0 + self.lambda * (x_t - self.mu_0);
291        self.wealth = (self.wealth * multiplier).max(W_MIN);
292
293        // GRAPA adaptive lambda update
294        // Gradient of log-wealth w.r.t. lambda: (X_t - μ₀) / (1 + λ(X_t - μ₀))
295        let denominator = 1.0 + self.lambda * (x_t - self.mu_0);
296        if denominator.abs() > 1e-12 {
297            let grad = (x_t - self.mu_0) / denominator;
298            self.lambda = (self.lambda + self.config.grapa_eta * grad).clamp(1e-6, self.lambda_max);
299        }
300
301        // Check recompute conditions
302        let time_since_recompute = now.duration_since(self.last_recompute);
303        let hard_deadline_exceeded =
304            time_since_recompute >= Duration::from_millis(self.config.hard_deadline_ms);
305        let min_obs_met = self.observations_since_recompute >= self.config.min_observations_between;
306        let wealth_exceeded = self.wealth >= self.threshold;
307
308        let should_recompute = hard_deadline_exceeded || (wealth_exceeded && min_obs_met);
309        let forced_by_deadline = hard_deadline_exceeded && !wealth_exceeded;
310
311        let action = if should_recompute {
312            if forced_by_deadline {
313                "recompute_forced"
314            } else {
315                "recompute_eprocess"
316            }
317        } else {
318            "observe"
319        };
320
321        self.log_decision(
322            now,
323            matched,
324            wealth_before,
325            self.wealth,
326            action,
327            time_since_recompute,
328        );
329
330        if should_recompute {
331            self.trigger_recompute(now, forced_by_deadline);
332        }
333
334        ThrottleDecision {
335            should_recompute,
336            wealth: self.wealth,
337            lambda: self.lambda,
338            empirical_rate,
339            forced_by_deadline: should_recompute && forced_by_deadline,
340            observations_since_recompute: self.observations_since_recompute,
341        }
342    }
343
344    /// Manually trigger a recompute (e.g., when the query changes).
345    /// Resets the e-process state.
346    pub fn reset(&mut self) {
347        self.reset_at(Instant::now());
348    }
349
350    /// Reset at a specific time (for testing).
351    pub fn reset_at(&mut self, now: Instant) {
352        self.wealth = 1.0;
353        self.observations_since_recompute = 0;
354        self.last_recompute = now;
355        self.recent_matches.clear();
356        // Lambda keeps its adapted value — intentional, since the match rate
357        // character of the data likely hasn't changed.
358    }
359
360    /// Update the null hypothesis match rate μ₀.
361    ///
362    /// Call this when the baseline match rate changes (e.g., new query with
363    /// different selectivity). Resets the e-process.
364    pub fn set_mu_0(&mut self, mu_0: f64) {
365        self.mu_0 = mu_0.clamp(MU_0_MIN, MU_0_MAX);
366        self.lambda_max = 1.0 / self.mu_0 - 1e-6;
367        self.lambda = self.lambda.clamp(1e-6, self.lambda_max);
368        self.reset();
369    }
370
371    /// Current wealth (e-value).
372    #[inline]
373    pub fn wealth(&self) -> f64 {
374        self.wealth
375    }
376
377    /// Current adaptive lambda.
378    #[inline]
379    pub fn lambda(&self) -> f64 {
380        self.lambda
381    }
382
383    /// Empirical match rate over the sliding window.
384    pub fn empirical_match_rate(&self) -> f64 {
385        if self.recent_matches.is_empty() {
386            return 0.0;
387        }
388        let matches = self.recent_matches.iter().filter(|&&m| m).count();
389        matches as f64 / self.recent_matches.len() as f64
390    }
391
392    /// E-value threshold (1/α).
393    #[inline]
394    pub fn threshold(&self) -> f64 {
395        self.threshold
396    }
397
398    /// Total observation count.
399    #[inline]
400    pub fn observation_count(&self) -> u64 {
401        self.observation_count
402    }
403
404    /// Get aggregate statistics.
405    pub fn stats(&self) -> ThrottleStats {
406        let avg_obs = if self.total_recomputes > 0 {
407            self.cumulative_obs_at_recompute as f64 / self.total_recomputes as f64
408        } else {
409            0.0
410        };
411
412        ThrottleStats {
413            total_observations: self.observation_count,
414            total_recomputes: self.total_recomputes,
415            forced_recomputes: self.forced_recomputes,
416            eprocess_recomputes: self.eprocess_recomputes,
417            current_wealth: self.wealth,
418            current_lambda: self.lambda,
419            empirical_rate: self.empirical_match_rate(),
420            avg_observations_between_recomputes: avg_obs,
421        }
422    }
423
424    /// Get decision logs (if logging enabled).
425    pub fn logs(&self) -> &[ThrottleLog] {
426        &self.logs
427    }
428
429    /// Clear decision logs.
430    pub fn clear_logs(&mut self) {
431        self.logs.clear();
432    }
433
434    // --- Internal ---
435
436    fn trigger_recompute(&mut self, now: Instant, forced: bool) {
437        self.total_recomputes += 1;
438        self.cumulative_obs_at_recompute += self.observations_since_recompute;
439        if forced {
440            self.forced_recomputes += 1;
441        } else {
442            self.eprocess_recomputes += 1;
443        }
444        self.wealth = 1.0;
445        self.observations_since_recompute = 0;
446        self.last_recompute = now;
447    }
448
449    fn log_decision(
450        &mut self,
451        now: Instant,
452        matched: bool,
453        wealth_before: f64,
454        wealth_after: f64,
455        action: &'static str,
456        time_since_recompute: Duration,
457    ) {
458        if !self.config.enable_logging {
459            return;
460        }
461
462        self.logs.push(ThrottleLog {
463            timestamp: now,
464            observation_idx: self.observation_count,
465            matched,
466            wealth_before,
467            wealth_after,
468            lambda: self.lambda,
469            empirical_rate: self.empirical_match_rate(),
470            action,
471            time_since_recompute_ms: time_since_recompute.as_secs_f64() * 1000.0,
472        });
473    }
474}
475
476#[cfg(test)]
477mod tests {
478    use super::*;
479
480    fn test_config() -> ThrottleConfig {
481        ThrottleConfig {
482            alpha: 0.05,
483            mu_0: 0.1,
484            initial_lambda: 0.5,
485            grapa_eta: 0.1,
486            hard_deadline_ms: 500,
487            min_observations_between: 4,
488            rate_window_size: 32,
489            enable_logging: true,
490        }
491    }
492
493    // ---------------------------------------------------------------
494    // Basic construction and invariants
495    // ---------------------------------------------------------------
496
497    #[test]
498    fn initial_state() {
499        let t = EProcessThrottle::new(test_config());
500        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
501        assert_eq!(t.observation_count(), 0);
502        assert!(t.lambda() > 0.0);
503        assert!((t.threshold() - 20.0).abs() < 0.01); // 1/0.05 = 20
504    }
505
506    #[test]
507    fn mu_0_clamped_to_valid_range() {
508        let mut cfg = test_config();
509        cfg.mu_0 = 0.0;
510        let t = EProcessThrottle::new(cfg.clone());
511        assert!(t.mu_0 >= MU_0_MIN);
512
513        cfg.mu_0 = 1.0;
514        let t = EProcessThrottle::new(cfg.clone());
515        assert!(t.mu_0 <= MU_0_MAX);
516
517        cfg.mu_0 = -5.0;
518        let t = EProcessThrottle::new(cfg);
519        assert!(t.mu_0 >= MU_0_MIN);
520    }
521
522    // ---------------------------------------------------------------
523    // Wealth dynamics
524    // ---------------------------------------------------------------
525
526    #[test]
527    fn no_match_decreases_wealth() {
528        let base = Instant::now();
529        let mut t = EProcessThrottle::new_at(test_config(), base);
530        let d = t.observe_at(false, base + Duration::from_millis(1));
531        assert!(
532            d.wealth < 1.0,
533            "No-match should decrease wealth: {}",
534            d.wealth
535        );
536    }
537
538    #[test]
539    fn match_increases_wealth() {
540        let base = Instant::now();
541        let mut t = EProcessThrottle::new_at(test_config(), base);
542        let d = t.observe_at(true, base + Duration::from_millis(1));
543        assert!(d.wealth > 1.0, "Match should increase wealth: {}", d.wealth);
544    }
545
546    #[test]
547    fn wealth_stays_positive() {
548        let base = Instant::now();
549        let mut t = EProcessThrottle::new_at(test_config(), base);
550        // 1000 non-matches in a row — wealth should never reach zero
551        for i in 1..=1000 {
552            let d = t.observe_at(false, base + Duration::from_millis(i));
553            assert!(d.wealth > 0.0, "Wealth must stay positive at obs {}", i);
554        }
555    }
556
557    #[test]
558    fn wealth_floor_prevents_zero_lock() {
559        let base = Instant::now();
560        let mut cfg = test_config();
561        cfg.hard_deadline_ms = u64::MAX; // disable deadline
562        cfg.initial_lambda = 0.99; // aggressive betting
563        let mut t = EProcessThrottle::new_at(cfg, base);
564
565        for i in 1..=500 {
566            t.observe_at(false, base + Duration::from_millis(i));
567        }
568        assert!(t.wealth() >= W_MIN, "Wealth should be at floor, not zero");
569
570        // A match should still be able to grow wealth from the floor
571        let before = t.wealth();
572        t.observe_at(true, base + Duration::from_millis(501));
573        assert!(
574            t.wealth() > before,
575            "Match should grow wealth even from floor"
576        );
577    }
578
579    // ---------------------------------------------------------------
580    // Recompute triggering
581    // ---------------------------------------------------------------
582
583    #[test]
584    fn burst_of_matches_triggers_recompute() {
585        let base = Instant::now();
586        let mut cfg = test_config();
587        cfg.min_observations_between = 1; // allow fast trigger
588        let mut t = EProcessThrottle::new_at(cfg, base);
589
590        let mut triggered = false;
591        for i in 1..=100 {
592            let d = t.observe_at(true, base + Duration::from_millis(i));
593            if d.should_recompute && !d.forced_by_deadline {
594                triggered = true;
595                break;
596            }
597        }
598        assert!(
599            triggered,
600            "Burst of matches should trigger e-process recompute"
601        );
602    }
603
604    #[test]
605    fn no_matches_does_not_trigger_eprocess() {
606        let base = Instant::now();
607        let mut cfg = test_config();
608        cfg.hard_deadline_ms = u64::MAX;
609        let mut t = EProcessThrottle::new_at(cfg, base);
610
611        for i in 1..=200 {
612            let d = t.observe_at(false, base + Duration::from_millis(i));
613            assert!(
614                !d.should_recompute,
615                "No-match stream should never trigger e-process recompute at obs {}",
616                i
617            );
618        }
619    }
620
621    #[test]
622    fn hard_deadline_forces_recompute() {
623        let base = Instant::now();
624        let mut cfg = test_config();
625        cfg.hard_deadline_ms = 100;
626        cfg.min_observations_between = 1;
627        let mut t = EProcessThrottle::new_at(cfg, base);
628
629        // Only non-matches, but exceed deadline
630        let d = t.observe_at(false, base + Duration::from_millis(150));
631        assert!(d.should_recompute, "Should trigger on deadline");
632        assert!(d.forced_by_deadline, "Should be forced by deadline");
633    }
634
635    #[test]
636    fn min_observations_between_prevents_rapid_fire() {
637        let base = Instant::now();
638        let mut cfg = test_config();
639        cfg.min_observations_between = 10;
640        cfg.hard_deadline_ms = u64::MAX;
641        cfg.alpha = 0.5; // very permissive to trigger early
642        let mut t = EProcessThrottle::new_at(cfg, base);
643
644        let mut first_trigger = None;
645        for i in 1..=100 {
646            let d = t.observe_at(true, base + Duration::from_millis(i));
647            if d.should_recompute {
648                first_trigger = Some(i);
649                break;
650            }
651        }
652
653        assert!(
654            first_trigger.unwrap_or(0) >= 10,
655            "First trigger should be at obs >= 10, was {:?}",
656            first_trigger
657        );
658    }
659
660    #[test]
661    fn reset_clears_wealth_and_counter() {
662        let base = Instant::now();
663        let mut t = EProcessThrottle::new_at(test_config(), base);
664
665        for i in 1..=10 {
666            t.observe_at(true, base + Duration::from_millis(i));
667        }
668        assert!(t.wealth() > 1.0);
669        assert!(t.observations_since_recompute > 0);
670
671        t.reset_at(base + Duration::from_millis(20));
672        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
673        assert_eq!(t.observations_since_recompute, 0);
674    }
675
676    // ---------------------------------------------------------------
677    // Adaptive lambda (GRAPA)
678    // ---------------------------------------------------------------
679
680    #[test]
681    fn lambda_adapts_to_high_match_rate() {
682        let base = Instant::now();
683        let mut cfg = test_config();
684        cfg.hard_deadline_ms = u64::MAX;
685        cfg.min_observations_between = u64::MAX;
686        let mut t = EProcessThrottle::new_at(cfg, base);
687
688        let initial_lambda = t.lambda();
689
690        // Many matches should increase lambda (bet more aggressively)
691        for i in 1..=50 {
692            t.observe_at(true, base + Duration::from_millis(i));
693        }
694
695        assert!(
696            t.lambda() > initial_lambda,
697            "Lambda should increase with frequent matches: {} vs {}",
698            t.lambda(),
699            initial_lambda
700        );
701    }
702
703    #[test]
704    fn lambda_adapts_to_low_match_rate() {
705        let base = Instant::now();
706        let mut cfg = test_config();
707        cfg.hard_deadline_ms = u64::MAX;
708        cfg.min_observations_between = u64::MAX;
709        cfg.initial_lambda = 0.8;
710        let mut t = EProcessThrottle::new_at(cfg, base);
711
712        let initial_lambda = t.lambda();
713
714        // Many non-matches should decrease lambda (bet more conservatively)
715        for i in 1..=50 {
716            t.observe_at(false, base + Duration::from_millis(i));
717        }
718
719        assert!(
720            t.lambda() < initial_lambda,
721            "Lambda should decrease with few matches: {} vs {}",
722            t.lambda(),
723            initial_lambda
724        );
725    }
726
727    #[test]
728    fn lambda_stays_bounded() {
729        let base = Instant::now();
730        let mut cfg = test_config();
731        cfg.hard_deadline_ms = u64::MAX;
732        cfg.min_observations_between = u64::MAX;
733        cfg.grapa_eta = 1.0; // aggressive learning
734        let mut t = EProcessThrottle::new_at(cfg, base);
735
736        for i in 1..=200 {
737            let matched = i % 2 == 0;
738            t.observe_at(matched, base + Duration::from_millis(i as u64));
739        }
740
741        assert!(t.lambda() > 0.0, "Lambda must be positive");
742        assert!(
743            t.lambda() <= t.lambda_max,
744            "Lambda must not exceed 1/(1-mu_0): {} vs {}",
745            t.lambda(),
746            t.lambda_max
747        );
748    }
749
750    // ---------------------------------------------------------------
751    // Empirical match rate
752    // ---------------------------------------------------------------
753
754    #[test]
755    fn empirical_rate_tracks_window() {
756        let base = Instant::now();
757        let mut cfg = test_config();
758        cfg.rate_window_size = 10;
759        cfg.hard_deadline_ms = u64::MAX;
760        cfg.min_observations_between = u64::MAX;
761        let mut t = EProcessThrottle::new_at(cfg, base);
762
763        // 10 matches
764        for i in 1..=10 {
765            t.observe_at(true, base + Duration::from_millis(i));
766        }
767        assert!((t.empirical_match_rate() - 1.0).abs() < f64::EPSILON);
768
769        // 10 non-matches (window slides)
770        for i in 11..=20 {
771            t.observe_at(false, base + Duration::from_millis(i));
772        }
773        assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
774    }
775
776    #[test]
777    fn empirical_rate_zero_when_empty() {
778        let t = EProcessThrottle::new(test_config());
779        assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
780    }
781
782    // ---------------------------------------------------------------
783    // Stats and logging
784    // ---------------------------------------------------------------
785
786    #[test]
787    fn stats_reflect_state() {
788        let base = Instant::now();
789        let mut cfg = test_config();
790        cfg.min_observations_between = 1;
791        let mut t = EProcessThrottle::new_at(cfg, base);
792
793        // Drive past a recompute
794        let mut recomputed = false;
795        for i in 1..=50 {
796            let d = t.observe_at(true, base + Duration::from_millis(i));
797            if d.should_recompute {
798                recomputed = true;
799            }
800        }
801
802        let stats = t.stats();
803        assert_eq!(stats.total_observations, 50);
804        if recomputed {
805            assert!(stats.total_recomputes > 0);
806            assert!(stats.avg_observations_between_recomputes > 0.0);
807        }
808    }
809
810    #[test]
811    fn logging_captures_decisions() {
812        let base = Instant::now();
813        let mut cfg = test_config();
814        cfg.enable_logging = true;
815        let mut t = EProcessThrottle::new_at(cfg, base);
816
817        t.observe_at(true, base + Duration::from_millis(1));
818        t.observe_at(false, base + Duration::from_millis(2));
819
820        assert_eq!(t.logs().len(), 2);
821        assert!(t.logs()[0].matched);
822        assert!(!t.logs()[1].matched);
823
824        t.clear_logs();
825        assert!(t.logs().is_empty());
826    }
827
828    #[test]
829    fn logging_disabled_by_default() {
830        let base = Instant::now();
831        let mut cfg = test_config();
832        cfg.enable_logging = false;
833        let mut t = EProcessThrottle::new_at(cfg, base);
834
835        t.observe_at(true, base + Duration::from_millis(1));
836        assert!(t.logs().is_empty());
837    }
838
839    // ---------------------------------------------------------------
840    // set_mu_0
841    // ---------------------------------------------------------------
842
843    #[test]
844    fn set_mu_0_resets_eprocess() {
845        let base = Instant::now();
846        let mut t = EProcessThrottle::new_at(test_config(), base);
847
848        for i in 1..=10 {
849            t.observe_at(true, base + Duration::from_millis(i));
850        }
851        assert!(t.wealth() > 1.0);
852
853        t.set_mu_0(0.5);
854        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
855    }
856
857    // ---------------------------------------------------------------
858    // Determinism
859    // ---------------------------------------------------------------
860
861    #[test]
862    fn deterministic_behavior() {
863        let base = Instant::now();
864        let cfg = test_config();
865
866        let run = |cfg: &ThrottleConfig| {
867            let mut t = EProcessThrottle::new_at(cfg.clone(), base);
868            let mut decisions = Vec::new();
869            for i in 1..=30 {
870                let matched = i % 3 == 0;
871                let d = t.observe_at(matched, base + Duration::from_millis(i));
872                decisions.push((d.should_recompute, d.forced_by_deadline));
873            }
874            (decisions, t.wealth(), t.lambda())
875        };
876
877        let (d1, w1, l1) = run(&cfg);
878        let (d2, w2, l2) = run(&cfg);
879
880        assert_eq!(d1, d2, "Decisions must be deterministic");
881        assert!((w1 - w2).abs() < 1e-10, "Wealth must be deterministic");
882        assert!((l1 - l2).abs() < 1e-10, "Lambda must be deterministic");
883    }
884
885    // ---------------------------------------------------------------
886    // Supermartingale property (Monte Carlo)
887    // ---------------------------------------------------------------
888
889    #[test]
890    fn property_supermartingale_under_null() {
891        // Under H₀ (match rate = μ₀), the expected wealth should not grow.
892        // We verify empirically by running many trials and checking the
893        // average final wealth ≤ initial wealth (with statistical slack).
894        let base = Instant::now();
895        let mut cfg = test_config();
896        cfg.hard_deadline_ms = u64::MAX;
897        cfg.min_observations_between = u64::MAX;
898        cfg.mu_0 = 0.2;
899        cfg.grapa_eta = 0.0; // fix lambda to test pure martingale property
900
901        let n_trials = 200;
902        let n_obs = 100;
903        let mut total_wealth = 0.0;
904
905        // Simple LCG for deterministic pseudo-random
906        let mut rng_state: u64 = 42;
907        let lcg_next = |state: &mut u64| -> f64 {
908            *state = state
909                .wrapping_mul(6364136223846793005)
910                .wrapping_add(1442695040888963407);
911            (*state >> 33) as f64 / (1u64 << 31) as f64
912        };
913
914        for trial in 0..n_trials {
915            let mut t = EProcessThrottle::new_at(cfg.clone(), base);
916            for i in 1..=n_obs {
917                let matched = lcg_next(&mut rng_state) < cfg.mu_0;
918                t.observe_at(
919                    matched,
920                    base + Duration::from_millis(i as u64 + trial * 1000),
921                );
922            }
923            total_wealth += t.wealth();
924        }
925
926        let avg_wealth = total_wealth / n_trials as f64;
927        // Under H₀ with fixed lambda, E[W_t] ≤ 1. Allow statistical slack.
928        assert!(
929            avg_wealth < 2.0,
930            "Average wealth under H₀ should be near 1.0, got {}",
931            avg_wealth
932        );
933    }
934
935    // ---------------------------------------------------------------
936    // Anytime-valid Type I control
937    // ---------------------------------------------------------------
938
939    #[test]
940    fn property_type_i_control() {
941        // Under H₀, the probability of ever triggering should be ≤ α.
942        // We test with many trials.
943        let base = Instant::now();
944        let mut cfg = test_config();
945        cfg.hard_deadline_ms = u64::MAX;
946        cfg.min_observations_between = 1;
947        cfg.alpha = 0.05;
948        cfg.mu_0 = 0.1;
949        cfg.grapa_eta = 0.0; // fixed lambda for clean test
950
951        let n_trials = 500;
952        let n_obs = 200;
953        let mut false_triggers = 0u64;
954
955        let mut rng_state: u64 = 123;
956        let lcg_next = |state: &mut u64| -> f64 {
957            *state = state
958                .wrapping_mul(6364136223846793005)
959                .wrapping_add(1442695040888963407);
960            (*state >> 33) as f64 / (1u64 << 31) as f64
961        };
962
963        for trial in 0..n_trials {
964            let mut t = EProcessThrottle::new_at(cfg.clone(), base);
965            let mut triggered = false;
966            for i in 1..=n_obs {
967                let matched = lcg_next(&mut rng_state) < cfg.mu_0;
968                let d = t.observe_at(
969                    matched,
970                    base + Duration::from_millis(i as u64 + trial * 1000),
971                );
972                if d.should_recompute {
973                    triggered = true;
974                    break;
975                }
976            }
977            if triggered {
978                false_triggers += 1;
979            }
980        }
981
982        let false_trigger_rate = false_triggers as f64 / n_trials as f64;
983        // Allow 3× slack for finite-sample variance
984        assert!(
985            false_trigger_rate < cfg.alpha * 3.0,
986            "False trigger rate {} exceeds 3×α = {}",
987            false_trigger_rate,
988            cfg.alpha * 3.0
989        );
990    }
991
992    // ---------------------------------------------------------------
993    // Edge cases
994    // ---------------------------------------------------------------
995
996    #[test]
997    fn single_observation() {
998        let base = Instant::now();
999        let cfg = test_config();
1000        let mut t = EProcessThrottle::new_at(cfg, base);
1001        let d = t.observe_at(true, base + Duration::from_millis(1));
1002        assert_eq!(t.observation_count(), 1);
1003        // Should not trigger with just 1 obs (min_observations_between = 4)
1004        assert!(!d.should_recompute || d.forced_by_deadline);
1005    }
1006
1007    #[test]
1008    fn alternating_match_pattern() {
1009        let base = Instant::now();
1010        let mut cfg = test_config();
1011        cfg.hard_deadline_ms = u64::MAX;
1012        cfg.min_observations_between = u64::MAX;
1013        let mut t = EProcessThrottle::new_at(cfg, base);
1014
1015        // Alternating: match rate = 0.5, much higher than μ₀ = 0.1
1016        for i in 1..=100 {
1017            t.observe_at(i % 2 == 0, base + Duration::from_millis(i as u64));
1018        }
1019
1020        // With 50% match rate vs 10% null, wealth should grow significantly
1021        assert!(
1022            t.wealth() > 1.0,
1023            "50% match rate vs 10% null should grow wealth: {}",
1024            t.wealth()
1025        );
1026    }
1027
1028    #[test]
1029    fn recompute_resets_wealth() {
1030        let base = Instant::now();
1031        let mut cfg = test_config();
1032        cfg.min_observations_between = 1;
1033        let mut t = EProcessThrottle::new_at(cfg, base);
1034
1035        // Drive to recompute
1036        let mut triggered = false;
1037        for i in 1..=100 {
1038            let d = t.observe_at(true, base + Duration::from_millis(i));
1039            if d.should_recompute && !d.forced_by_deadline {
1040                // Wealth should be reset to 1.0 after recompute
1041                assert!(
1042                    (t.wealth() - 1.0).abs() < f64::EPSILON,
1043                    "Wealth should reset to 1.0 after recompute, got {}",
1044                    t.wealth()
1045                );
1046                triggered = true;
1047                break;
1048            }
1049        }
1050        assert!(
1051            triggered,
1052            "Should have triggered at least one e-process recompute"
1053        );
1054    }
1055
1056    #[test]
1057    fn consecutive_recomputes_tracked() {
1058        let base = Instant::now();
1059        let mut cfg = test_config();
1060        cfg.min_observations_between = 1;
1061        cfg.alpha = 0.5; // permissive
1062        let mut t = EProcessThrottle::new_at(cfg, base);
1063
1064        let mut recompute_count = 0;
1065        for i in 1..=200 {
1066            let d = t.observe_at(true, base + Duration::from_millis(i));
1067            if d.should_recompute {
1068                recompute_count += 1;
1069            }
1070        }
1071
1072        let stats = t.stats();
1073        assert_eq!(stats.total_recomputes, recompute_count as u64);
1074        assert!(
1075            stats.total_recomputes >= 2,
1076            "Should have multiple recomputes"
1077        );
1078    }
1079}