Skip to main content

ftui_runtime/
eprocess_throttle.rs

1#![forbid(unsafe_code)]
2
3//! Anytime-valid throttle using e-process (test martingale) control.
4//!
5//! This module provides an adaptive recompute throttle for streaming workloads
6//! (e.g., live log search). It uses a wealth-based betting strategy to decide
7//! when accumulated evidence warrants a full recomputation, while providing
8//! anytime-valid statistical guarantees.
9//!
10//! # Mathematical Model
11//!
12//! The throttle maintains a wealth process `W_t`:
13//!
14//! ```text
15//! W_0 = 1
16//! W_t = W_{t-1} × (1 + λ_t × (X_t − μ₀))
17//! ```
18//!
19//! where:
20//! - `X_t ∈ {0, 1}`: whether observation `t` is evidence for recompute
21//!   (e.g., a log line matched the active search/filter query)
22//! - `μ₀`: null hypothesis match rate — the "normal" baseline match frequency
23//! - `λ_t ∈ (0, 1/μ₀)`: betting fraction (adaptive via GRAPA)
24//!
25//! When `W_t ≥ 1/α` (the e-value threshold), we reject H₀ ("results are
26//! still fresh") and trigger recompute. After triggering, `W` resets to 1.
27//!
28//! # Key Invariants
29//!
30//! 1. **Supermartingale**: `E[W_t | W_{t-1}] ≤ W_{t-1}` under H₀
31//! 2. **Anytime-valid Type I control**: `P(∃t: W_t ≥ 1/α) ≤ α` under H₀
32//! 3. **Non-negative wealth**: `W_t ≥ 0` always
33//! 4. **Bounded latency**: hard deadline forces recompute regardless of `W_t`
34//!
35//! # Failure Modes
36//!
37//! | Condition | Behavior | Rationale |
38//! |-----------|----------|-----------|
39//! | `μ₀ = 0` | Clamp to `μ₀ = ε` (1e-6) | Division by zero guard |
40//! | `μ₀ ≥ 1` | Clamp to `1 − ε` | Degenerate: everything matches |
41//! | `W_t` underflow | Clamp to `W_MIN` (1e-12) | Prevents permanent zero-lock |
42//! | Hard deadline exceeded | Force recompute | Bounded worst-case latency |
43//! | No observations | No change to `W_t` | Idle is not evidence |
44//!
45//! # Usage
46//!
47//! ```ignore
48//! use ftui_runtime::eprocess_throttle::{EProcessThrottle, ThrottleConfig};
49//!
50//! let mut throttle = EProcessThrottle::new(ThrottleConfig::default());
51//!
52//! // On each log line push:
53//! let matched = line.contains(&query);
54//! let decision = throttle.observe(matched);
55//! if decision.should_recompute {
56//!     recompute_search_results();
57//! }
58//! ```
59
60use std::collections::VecDeque;
61use std::sync::atomic::{AtomicU64, Ordering};
62use web_time::{Duration, Instant};
63
/// Minimum wealth floor to prevent permanent zero-lock after adverse bets.
///
/// Wealth is updated multiplicatively, so a value of exactly zero could never
/// grow again; every wealth update is floored at this constant instead.
const W_MIN: f64 = 1e-12;

/// Minimum mu_0 to prevent division by zero.
///
/// The upper bound on the betting fraction is derived from `1 / mu_0`, so
/// `mu_0` must stay strictly positive.
const MU_0_MIN: f64 = 1e-6;

/// Maximum mu_0 to prevent degenerate all-match scenarios.
const MU_0_MAX: f64 = 1.0 - 1e-6;
72
73// ---------------------------------------------------------------------------
74// Monotonic counters (exported for observability dashboards / tests)
75// ---------------------------------------------------------------------------
76
// Process-wide counter shared by every throttle instance; only ever incremented.
static EPROCESS_REJECTIONS_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Total e-process rejections (monotonic counter for metrics export).
///
/// The value is global to the process: it aggregates rejections from all
/// [`EProcessThrottle`] instances and is never reset. Deadline-forced
/// recomputes are not counted. The load uses relaxed ordering, so the value
/// is a best-effort snapshot suitable for dashboards.
#[must_use]
pub fn eprocess_rejections_total() -> u64 {
    EPROCESS_REJECTIONS_TOTAL.load(Ordering::Relaxed)
}
84
/// Configuration for the e-process throttle.
///
/// Values are not validated eagerly; the throttle constructor clamps
/// `alpha`, `mu_0`, and `initial_lambda` into their usable ranges.
#[derive(Debug, Clone)]
pub struct ThrottleConfig {
    /// Significance level `α`. Recompute triggers when `W_t ≥ 1/α`.
    /// Lower α → more conservative (fewer recomputes). Default: 0.05.
    ///
    /// Floored at `1e-12` when the threshold `1/α` is derived.
    pub alpha: f64,

    /// Prior null hypothesis match rate `μ₀`. The expected fraction of
    /// observations that are matches under "normal" conditions.
    /// Clamped into `[1e-6, 1 − 1e-6]` at construction.
    /// Default: 0.1 (10% of log lines match).
    pub mu_0: f64,

    /// Initial betting fraction. Adaptive GRAPA updates this, but this
    /// sets the starting value. Clamped at construction into
    /// `[1e-6, 1/μ₀ − 1e-6]`; the upper bound keeps the wealth multiplier
    /// `1 + λ(X − μ₀)` positive when an observation is a non-match (`X = 0`).
    /// Default: 0.5.
    pub initial_lambda: f64,

    /// GRAPA learning rate for adaptive lambda. Higher → faster adaptation
    /// but noisier. Default: 0.1.
    pub grapa_eta: f64,

    /// Hard deadline: force recompute if this many milliseconds pass since
    /// last recompute, regardless of wealth. Default: 500ms.
    pub hard_deadline_ms: u64,

    /// Minimum observations between recomputes. Prevents rapid-fire
    /// recomputes when every line matches. Only gates e-process triggers;
    /// the hard deadline ignores it. Default: 8.
    pub min_observations_between: u64,

    /// Window size for empirical match rate estimation. Default: 64.
    pub rate_window_size: usize,

    /// Enable JSONL-compatible decision logging. Default: false.
    pub enable_logging: bool,
}
120
121impl Default for ThrottleConfig {
122    fn default() -> Self {
123        Self {
124            alpha: 0.05,
125            mu_0: 0.1,
126            initial_lambda: 0.5,
127            grapa_eta: 0.1,
128            hard_deadline_ms: 500,
129            min_observations_between: 8,
130            rate_window_size: 64,
131            enable_logging: false,
132        }
133    }
134}
135
/// Decision returned by the throttle on each observation.
///
/// The snapshot is taken after the observation has been fully processed, so
/// when `should_recompute` is true the per-cycle fields already reflect the
/// reset that the recompute performs.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ThrottleDecision {
    /// Whether to trigger recomputation now.
    pub should_recompute: bool,
    /// Current wealth (e-value). When `≥ 1/α`, triggers recompute.
    /// Reads `1.0` on a decision that triggered a recompute, because wealth
    /// is reset before the decision is returned.
    pub wealth: f64,
    /// Current adaptive betting fraction (after this observation's update).
    pub lambda: f64,
    /// Empirical match rate over the sliding window.
    pub empirical_rate: f64,
    /// Whether the decision was forced by hard deadline (and the e-process
    /// did not trigger on the same observation).
    pub forced_by_deadline: bool,
    /// Observations since last recompute. Zero on a decision that triggered
    /// a recompute.
    pub observations_since_recompute: u64,
}
152
/// Decision log entry for observability.
///
/// One entry is recorded per observation when logging is enabled.
#[derive(Debug, Clone)]
pub struct ThrottleLog {
    /// Timestamp of the observation.
    pub timestamp: Instant,
    /// Observation index (total count, 1-based: the first observation is 1).
    pub observation_idx: u64,
    /// Whether this observation was a match (X_t = 1).
    pub matched: bool,
    /// Wealth before this observation.
    pub wealth_before: f64,
    /// Wealth after this observation (before any recompute reset).
    pub wealth_after: f64,
    /// Betting fraction after this observation's adaptive update, i.e. the
    /// value that will be used for the next bet.
    pub lambda: f64,
    /// Empirical match rate.
    pub empirical_rate: f64,
    /// Action taken: `"observe"`, `"recompute_eprocess"`, or
    /// `"recompute_forced"`.
    pub action: &'static str,
    /// Time since last recompute (ms).
    pub time_since_recompute_ms: f64,
}
175
176impl ThrottleDecision {
177    /// Format this decision as a JSONL line for structured logging.
178    #[must_use]
179    pub fn to_jsonl(&self) -> String {
180        format!(
181            r#"{{"schema":"eprocess-throttle-v1","should_recompute":{},"wealth":{:.6},"lambda":{:.6},"empirical_rate":{:.6},"forced_by_deadline":{},"obs_since_recompute":{}}}"#,
182            self.should_recompute,
183            self.wealth,
184            self.lambda,
185            self.empirical_rate,
186            self.forced_by_deadline,
187            self.observations_since_recompute,
188        )
189    }
190}
191
192impl ThrottleLog {
193    /// Format this log entry as a JSONL line for structured logging.
194    #[must_use]
195    pub fn to_jsonl(&self) -> String {
196        format!(
197            r#"{{"schema":"eprocess-log-v1","obs_idx":{},"matched":{},"wealth_before":{:.6},"wealth_after":{:.6},"lambda":{:.6},"empirical_rate":{:.6},"action":"{}","time_since_recompute_ms":{:.3}}}"#,
198            self.observation_idx,
199            self.matched,
200            self.wealth_before,
201            self.wealth_after,
202            self.lambda,
203            self.empirical_rate,
204            self.action,
205            self.time_since_recompute_ms,
206        )
207    }
208}
209
/// Aggregate statistics for the throttle.
///
/// A point-in-time snapshot; the counters cover the whole lifetime of the
/// throttle instance.
#[derive(Debug, Clone)]
pub struct ThrottleStats {
    /// Total observations processed.
    pub total_observations: u64,
    /// Total recomputes triggered (`forced_recomputes + eprocess_recomputes`).
    pub total_recomputes: u64,
    /// Recomputes forced by hard deadline.
    pub forced_recomputes: u64,
    /// Recomputes triggered by e-process threshold.
    pub eprocess_recomputes: u64,
    /// Current wealth.
    pub current_wealth: f64,
    /// Current lambda.
    pub current_lambda: f64,
    /// Current empirical match rate.
    pub empirical_rate: f64,
    /// Average observations between recomputes (0 if no recomputes yet).
    pub avg_observations_between_recomputes: f64,
}
230
/// Anytime-valid recompute throttle using e-process (test martingale) control.
///
/// See module-level docs for the mathematical model and guarantees.
#[derive(Debug)]
pub struct EProcessThrottle {
    /// Configuration as supplied by the caller (unclamped values included).
    config: ThrottleConfig,

    /// Current wealth W_t. Starts at 1, resets on recompute.
    wealth: f64,

    /// Current adaptive betting fraction λ_t.
    lambda: f64,

    /// Clamped mu_0 for safe arithmetic.
    mu_0: f64,

    /// Maximum lambda: `1 / μ₀` minus small epsilon. Staying below `1 / μ₀`
    /// keeps the non-match multiplier `1 − λ·μ₀` strictly positive.
    lambda_max: f64,

    /// E-value threshold: `1 / α`.
    threshold: f64,

    /// Sliding window of recent observations for empirical rate.
    recent_matches: VecDeque<bool>,

    /// Total observation count.
    observation_count: u64,

    /// Observations since last recompute (or creation).
    observations_since_recompute: u64,

    /// Timestamp of last recompute (or creation).
    last_recompute: Instant,

    /// Total recomputes.
    total_recomputes: u64,

    /// Recomputes forced by deadline.
    forced_recomputes: u64,

    /// Recomputes triggered by e-process.
    eprocess_recomputes: u64,

    /// Sum of observations_since_recompute at each recompute (for averaging).
    cumulative_obs_at_recompute: u64,

    /// Decision logs (if logging enabled).
    logs: Vec<ThrottleLog>,
}
280
281impl EProcessThrottle {
282    /// Create a new throttle with the given configuration.
283    pub fn new(config: ThrottleConfig) -> Self {
284        Self::new_at(config, Instant::now())
285    }
286
287    /// Create a new throttle at a specific time (for deterministic testing).
288    pub fn new_at(config: ThrottleConfig, now: Instant) -> Self {
289        let mu_0 = config.mu_0.clamp(MU_0_MIN, MU_0_MAX);
290        let lambda_max = 1.0 / mu_0 - 1e-6;
291        let lambda = config.initial_lambda.clamp(1e-6, lambda_max);
292        let threshold = 1.0 / config.alpha.max(1e-12);
293
294        Self {
295            config,
296            wealth: 1.0,
297            lambda,
298            mu_0,
299            lambda_max,
300            threshold,
301            recent_matches: VecDeque::new(),
302            observation_count: 0,
303            observations_since_recompute: 0,
304            last_recompute: now,
305            total_recomputes: 0,
306            forced_recomputes: 0,
307            eprocess_recomputes: 0,
308            cumulative_obs_at_recompute: 0,
309            logs: Vec::new(),
310        }
311    }
312
313    /// Observe a single event. `matched` indicates whether this observation
314    /// is evidence for recomputation (e.g., the log line matched the query).
315    ///
316    /// Returns a [`ThrottleDecision`] indicating whether to recompute.
317    pub fn observe(&mut self, matched: bool) -> ThrottleDecision {
318        self.observe_at(matched, Instant::now())
319    }
320
321    /// Observe at a specific time (for deterministic testing).
322    pub fn observe_at(&mut self, matched: bool, now: Instant) -> ThrottleDecision {
323        self.observation_count += 1;
324        self.observations_since_recompute += 1;
325
326        // Update sliding window
327        self.recent_matches.push_back(matched);
328        while self.recent_matches.len() > self.config.rate_window_size {
329            self.recent_matches.pop_front();
330        }
331
332        let empirical_rate = self.empirical_match_rate();
333
334        // Wealth update: W_t = W_{t-1} × (1 + λ × (X_t − μ₀))
335        let x_t = if matched { 1.0 } else { 0.0 };
336        let wealth_before = self.wealth;
337        let multiplier = 1.0 + self.lambda * (x_t - self.mu_0);
338        self.wealth = (self.wealth * multiplier).max(W_MIN);
339
340        // GRAPA adaptive lambda update
341        // Gradient of log-wealth w.r.t. lambda: (X_t - μ₀) / (1 + λ(X_t - μ₀))
342        let denominator = 1.0 + self.lambda * (x_t - self.mu_0);
343        if denominator.abs() > 1e-12 {
344            let grad = (x_t - self.mu_0) / denominator;
345            self.lambda = (self.lambda + self.config.grapa_eta * grad).clamp(1e-6, self.lambda_max);
346        }
347
348        // Check recompute conditions
349        let time_since_recompute = now.saturating_duration_since(self.last_recompute);
350        let hard_deadline_exceeded =
351            time_since_recompute >= Duration::from_millis(self.config.hard_deadline_ms);
352        let min_obs_met = self.observations_since_recompute >= self.config.min_observations_between;
353        let wealth_exceeded = self.wealth >= self.threshold;
354
355        let eprocess_triggered = wealth_exceeded && min_obs_met;
356        let should_recompute = hard_deadline_exceeded || eprocess_triggered;
357        let forced_by_deadline = hard_deadline_exceeded && !eprocess_triggered;
358
359        let action = if should_recompute {
360            if forced_by_deadline {
361                "recompute_forced"
362            } else {
363                "recompute_eprocess"
364            }
365        } else {
366            "observe"
367        };
368
369        // --- Tracing observability (bd-37a.5) ---
370        let rejected = eprocess_triggered;
371        let _span = tracing::debug_span!(
372            "eprocess.update",
373            test_id = "throttle",
374            wealth_current = %self.wealth,
375            wealth_threshold = %self.threshold,
376            observation_count = self.observation_count,
377            rejected = rejected,
378        )
379        .entered();
380
381        tracing::debug!(
382            target: "ftui.eprocess",
383            wealth_before = %wealth_before,
384            wealth_after = %self.wealth,
385            lambda = %self.lambda,
386            empirical_rate = %empirical_rate,
387            matched = matched,
388            eprocess_wealth = %self.wealth,
389            observation_count = self.observation_count,
390            action = %action,
391            "wealth update"
392        );
393
394        if rejected {
395            EPROCESS_REJECTIONS_TOTAL.fetch_add(1, Ordering::Relaxed);
396            tracing::info!(
397                target: "ftui.eprocess",
398                wealth = %self.wealth,
399                threshold = %self.threshold,
400                observation_count = self.observation_count,
401                observations_since_recompute = self.observations_since_recompute,
402                "e-process rejection: significant finding"
403            );
404        }
405
406        if forced_by_deadline && should_recompute {
407            tracing::info!(
408                target: "ftui.eprocess",
409                deadline_ms = self.config.hard_deadline_ms,
410                observation_count = self.observation_count,
411                "hard deadline forced recompute"
412            );
413        }
414
415        self.log_decision(
416            now,
417            matched,
418            wealth_before,
419            self.wealth,
420            action,
421            time_since_recompute,
422        );
423
424        if should_recompute {
425            self.trigger_recompute(now, forced_by_deadline);
426        }
427
428        ThrottleDecision {
429            should_recompute,
430            wealth: self.wealth,
431            lambda: self.lambda,
432            empirical_rate,
433            forced_by_deadline: should_recompute && forced_by_deadline,
434            observations_since_recompute: self.observations_since_recompute,
435        }
436    }
437
438    /// Manually trigger a recompute (e.g., when the query changes).
439    /// Resets the e-process state.
440    pub fn reset(&mut self) {
441        self.reset_at(Instant::now());
442    }
443
444    /// Reset at a specific time (for testing).
445    pub fn reset_at(&mut self, now: Instant) {
446        self.wealth = 1.0;
447        self.observations_since_recompute = 0;
448        self.last_recompute = now;
449        self.recent_matches.clear();
450        // Lambda keeps its adapted value — intentional, since the match rate
451        // character of the data likely hasn't changed.
452    }
453
454    /// Update the null hypothesis match rate μ₀.
455    ///
456    /// Call this when the baseline match rate changes (e.g., new query with
457    /// different selectivity). Resets the e-process.
458    pub fn set_mu_0(&mut self, mu_0: f64) {
459        self.mu_0 = mu_0.clamp(MU_0_MIN, MU_0_MAX);
460        self.lambda_max = 1.0 / self.mu_0 - 1e-6;
461        self.lambda = self.lambda.clamp(1e-6, self.lambda_max);
462        self.reset();
463    }
464
465    /// Current wealth (e-value).
466    #[inline]
467    pub fn wealth(&self) -> f64 {
468        self.wealth
469    }
470
471    /// Current adaptive lambda.
472    #[inline]
473    pub fn lambda(&self) -> f64 {
474        self.lambda
475    }
476
477    /// Empirical match rate over the sliding window.
478    pub fn empirical_match_rate(&self) -> f64 {
479        if self.recent_matches.is_empty() {
480            return 0.0;
481        }
482        let matches = self.recent_matches.iter().filter(|&&m| m).count();
483        matches as f64 / self.recent_matches.len() as f64
484    }
485
486    /// E-value threshold (1/α).
487    #[inline]
488    pub fn threshold(&self) -> f64 {
489        self.threshold
490    }
491
492    /// Total observation count.
493    #[inline]
494    pub fn observation_count(&self) -> u64 {
495        self.observation_count
496    }
497
498    /// Get aggregate statistics.
499    pub fn stats(&self) -> ThrottleStats {
500        let avg_obs = if self.total_recomputes > 0 {
501            self.cumulative_obs_at_recompute as f64 / self.total_recomputes as f64
502        } else {
503            0.0
504        };
505
506        ThrottleStats {
507            total_observations: self.observation_count,
508            total_recomputes: self.total_recomputes,
509            forced_recomputes: self.forced_recomputes,
510            eprocess_recomputes: self.eprocess_recomputes,
511            current_wealth: self.wealth,
512            current_lambda: self.lambda,
513            empirical_rate: self.empirical_match_rate(),
514            avg_observations_between_recomputes: avg_obs,
515        }
516    }
517
518    /// Get decision logs (if logging enabled).
519    pub fn logs(&self) -> &[ThrottleLog] {
520        &self.logs
521    }
522
523    /// Clear decision logs.
524    pub fn clear_logs(&mut self) {
525        self.logs.clear();
526    }
527
528    // --- Internal ---
529
530    fn trigger_recompute(&mut self, now: Instant, forced: bool) {
531        self.total_recomputes += 1;
532        self.cumulative_obs_at_recompute += self.observations_since_recompute;
533        if forced {
534            self.forced_recomputes += 1;
535        } else {
536            self.eprocess_recomputes += 1;
537        }
538        self.wealth = 1.0;
539        self.observations_since_recompute = 0;
540        self.last_recompute = now;
541    }
542
543    fn log_decision(
544        &mut self,
545        now: Instant,
546        matched: bool,
547        wealth_before: f64,
548        wealth_after: f64,
549        action: &'static str,
550        time_since_recompute: Duration,
551    ) {
552        if !self.config.enable_logging {
553            return;
554        }
555
556        self.logs.push(ThrottleLog {
557            timestamp: now,
558            observation_idx: self.observation_count,
559            matched,
560            wealth_before,
561            wealth_after,
562            lambda: self.lambda,
563            empirical_rate: self.empirical_match_rate(),
564            action,
565            time_since_recompute_ms: time_since_recompute.as_secs_f64() * 1000.0,
566        });
567    }
568}
569
570#[cfg(test)]
571mod tests {
572    use super::*;
573    use std::collections::HashMap;
574    use std::sync::{Arc, Mutex};
575    use tracing_subscriber::layer::SubscriberExt;
576    use tracing_subscriber::registry::LookupSpan;
577
578    fn test_config() -> ThrottleConfig {
579        ThrottleConfig {
580            alpha: 0.05,
581            mu_0: 0.1,
582            initial_lambda: 0.5,
583            grapa_eta: 0.1,
584            hard_deadline_ms: 500,
585            min_observations_between: 4,
586            rate_window_size: 32,
587            enable_logging: true,
588        }
589    }
590
591    // ---------------------------------------------------------------
592    // Basic construction and invariants
593    // ---------------------------------------------------------------
594
595    #[test]
596    fn initial_state() {
597        let t = EProcessThrottle::new(test_config());
598        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
599        assert_eq!(t.observation_count(), 0);
600        assert!(t.lambda() > 0.0);
601        assert!((t.threshold() - 20.0).abs() < 0.01); // 1/0.05 = 20
602    }
603
604    #[test]
605    fn mu_0_clamped_to_valid_range() {
606        let mut cfg = test_config();
607        cfg.mu_0 = 0.0;
608        let t = EProcessThrottle::new(cfg.clone());
609        assert!(t.mu_0 >= MU_0_MIN);
610
611        cfg.mu_0 = 1.0;
612        let t = EProcessThrottle::new(cfg.clone());
613        assert!(t.mu_0 <= MU_0_MAX);
614
615        cfg.mu_0 = -5.0;
616        let t = EProcessThrottle::new(cfg);
617        assert!(t.mu_0 >= MU_0_MIN);
618    }
619
620    // ---------------------------------------------------------------
621    // Wealth dynamics
622    // ---------------------------------------------------------------
623
624    #[test]
625    fn no_match_decreases_wealth() {
626        let base = Instant::now();
627        let mut t = EProcessThrottle::new_at(test_config(), base);
628        let d = t.observe_at(false, base + Duration::from_millis(1));
629        assert!(
630            d.wealth < 1.0,
631            "No-match should decrease wealth: {}",
632            d.wealth
633        );
634    }
635
636    #[test]
637    fn match_increases_wealth() {
638        let base = Instant::now();
639        let mut t = EProcessThrottle::new_at(test_config(), base);
640        let d = t.observe_at(true, base + Duration::from_millis(1));
641        assert!(d.wealth > 1.0, "Match should increase wealth: {}", d.wealth);
642    }
643
644    #[test]
645    fn wealth_stays_positive() {
646        let base = Instant::now();
647        let mut t = EProcessThrottle::new_at(test_config(), base);
648        // 1000 non-matches in a row — wealth should never reach zero
649        for i in 1..=1000 {
650            let d = t.observe_at(false, base + Duration::from_millis(i));
651            assert!(d.wealth > 0.0, "Wealth must stay positive at obs {}", i);
652        }
653    }
654
655    #[test]
656    fn wealth_floor_prevents_zero_lock() {
657        let base = Instant::now();
658        let mut cfg = test_config();
659        cfg.hard_deadline_ms = u64::MAX; // disable deadline
660        cfg.initial_lambda = 0.99; // aggressive betting
661        let mut t = EProcessThrottle::new_at(cfg, base);
662
663        for i in 1..=500 {
664            t.observe_at(false, base + Duration::from_millis(i));
665        }
666        assert!(t.wealth() >= W_MIN, "Wealth should be at floor, not zero");
667
668        // A match should still be able to grow wealth from the floor
669        let before = t.wealth();
670        t.observe_at(true, base + Duration::from_millis(501));
671        assert!(
672            t.wealth() > before,
673            "Match should grow wealth even from floor"
674        );
675    }
676
677    // ---------------------------------------------------------------
678    // Recompute triggering
679    // ---------------------------------------------------------------
680
681    #[test]
682    fn burst_of_matches_triggers_recompute() {
683        let base = Instant::now();
684        let mut cfg = test_config();
685        cfg.min_observations_between = 1; // allow fast trigger
686        let mut t = EProcessThrottle::new_at(cfg, base);
687
688        let mut triggered = false;
689        for i in 1..=100 {
690            let d = t.observe_at(true, base + Duration::from_millis(i));
691            if d.should_recompute && !d.forced_by_deadline {
692                triggered = true;
693                break;
694            }
695        }
696        assert!(
697            triggered,
698            "Burst of matches should trigger e-process recompute"
699        );
700    }
701
702    #[test]
703    fn no_matches_does_not_trigger_eprocess() {
704        let base = Instant::now();
705        let mut cfg = test_config();
706        cfg.hard_deadline_ms = u64::MAX;
707        let mut t = EProcessThrottle::new_at(cfg, base);
708
709        for i in 1..=200 {
710            let d = t.observe_at(false, base + Duration::from_millis(i));
711            assert!(
712                !d.should_recompute,
713                "No-match stream should never trigger e-process recompute at obs {}",
714                i
715            );
716        }
717    }
718
719    #[test]
720    fn hard_deadline_forces_recompute() {
721        let base = Instant::now();
722        let mut cfg = test_config();
723        cfg.hard_deadline_ms = 100;
724        cfg.min_observations_between = 1;
725        let mut t = EProcessThrottle::new_at(cfg, base);
726
727        // Only non-matches, but exceed deadline
728        let d = t.observe_at(false, base + Duration::from_millis(150));
729        assert!(d.should_recompute, "Should trigger on deadline");
730        assert!(d.forced_by_deadline, "Should be forced by deadline");
731    }
732
733    #[test]
734    fn min_observations_between_prevents_rapid_fire() {
735        let base = Instant::now();
736        let mut cfg = test_config();
737        cfg.min_observations_between = 10;
738        cfg.hard_deadline_ms = u64::MAX;
739        cfg.alpha = 0.5; // very permissive to trigger early
740        let mut t = EProcessThrottle::new_at(cfg, base);
741
742        let mut first_trigger = None;
743        for i in 1..=100 {
744            let d = t.observe_at(true, base + Duration::from_millis(i));
745            if d.should_recompute {
746                first_trigger = Some(i);
747                break;
748            }
749        }
750
751        assert!(
752            first_trigger.unwrap_or(0) >= 10,
753            "First trigger should be at obs >= 10, was {:?}",
754            first_trigger
755        );
756    }
757
758    #[test]
759    fn reset_clears_wealth_and_counter() {
760        let base = Instant::now();
761        let mut t = EProcessThrottle::new_at(test_config(), base);
762
763        for i in 1..=10 {
764            t.observe_at(true, base + Duration::from_millis(i));
765        }
766        assert!(t.wealth() > 1.0);
767        assert!(t.observations_since_recompute > 0);
768
769        t.reset_at(base + Duration::from_millis(20));
770        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
771        assert_eq!(t.observations_since_recompute, 0);
772    }
773
774    // ---------------------------------------------------------------
775    // Adaptive lambda (GRAPA)
776    // ---------------------------------------------------------------
777
778    #[test]
779    fn lambda_adapts_to_high_match_rate() {
780        let base = Instant::now();
781        let mut cfg = test_config();
782        cfg.hard_deadline_ms = u64::MAX;
783        cfg.min_observations_between = u64::MAX;
784        let mut t = EProcessThrottle::new_at(cfg, base);
785
786        let initial_lambda = t.lambda();
787
788        // Many matches should increase lambda (bet more aggressively)
789        for i in 1..=50 {
790            t.observe_at(true, base + Duration::from_millis(i));
791        }
792
793        assert!(
794            t.lambda() > initial_lambda,
795            "Lambda should increase with frequent matches: {} vs {}",
796            t.lambda(),
797            initial_lambda
798        );
799    }
800
801    #[test]
802    fn lambda_adapts_to_low_match_rate() {
803        let base = Instant::now();
804        let mut cfg = test_config();
805        cfg.hard_deadline_ms = u64::MAX;
806        cfg.min_observations_between = u64::MAX;
807        cfg.initial_lambda = 0.8;
808        let mut t = EProcessThrottle::new_at(cfg, base);
809
810        let initial_lambda = t.lambda();
811
812        // Many non-matches should decrease lambda (bet more conservatively)
813        for i in 1..=50 {
814            t.observe_at(false, base + Duration::from_millis(i));
815        }
816
817        assert!(
818            t.lambda() < initial_lambda,
819            "Lambda should decrease with few matches: {} vs {}",
820            t.lambda(),
821            initial_lambda
822        );
823    }
824
825    #[test]
826    fn lambda_stays_bounded() {
827        let base = Instant::now();
828        let mut cfg = test_config();
829        cfg.hard_deadline_ms = u64::MAX;
830        cfg.min_observations_between = u64::MAX;
831        cfg.grapa_eta = 1.0; // aggressive learning
832        let mut t = EProcessThrottle::new_at(cfg, base);
833
834        for i in 1..=200 {
835            let matched = i % 2 == 0;
836            t.observe_at(matched, base + Duration::from_millis(i as u64));
837        }
838
839        assert!(t.lambda() > 0.0, "Lambda must be positive");
840        assert!(
841            t.lambda() <= t.lambda_max,
842            "Lambda must not exceed 1/(1-mu_0): {} vs {}",
843            t.lambda(),
844            t.lambda_max
845        );
846    }
847
848    // ---------------------------------------------------------------
849    // Empirical match rate
850    // ---------------------------------------------------------------
851
852    #[test]
853    fn empirical_rate_tracks_window() {
854        let base = Instant::now();
855        let mut cfg = test_config();
856        cfg.rate_window_size = 10;
857        cfg.hard_deadline_ms = u64::MAX;
858        cfg.min_observations_between = u64::MAX;
859        let mut t = EProcessThrottle::new_at(cfg, base);
860
861        // 10 matches
862        for i in 1..=10 {
863            t.observe_at(true, base + Duration::from_millis(i));
864        }
865        assert!((t.empirical_match_rate() - 1.0).abs() < f64::EPSILON);
866
867        // 10 non-matches (window slides)
868        for i in 11..=20 {
869            t.observe_at(false, base + Duration::from_millis(i));
870        }
871        assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
872    }
873
874    #[test]
875    fn empirical_rate_zero_when_empty() {
876        let t = EProcessThrottle::new(test_config());
877        assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
878    }
879
880    // ---------------------------------------------------------------
881    // Stats and logging
882    // ---------------------------------------------------------------
883
884    #[test]
885    fn stats_reflect_state() {
886        let base = Instant::now();
887        let mut cfg = test_config();
888        cfg.min_observations_between = 1;
889        let mut t = EProcessThrottle::new_at(cfg, base);
890
891        // Drive past a recompute
892        let mut recomputed = false;
893        for i in 1..=50 {
894            let d = t.observe_at(true, base + Duration::from_millis(i));
895            if d.should_recompute {
896                recomputed = true;
897            }
898        }
899
900        let stats = t.stats();
901        assert_eq!(stats.total_observations, 50);
902        if recomputed {
903            assert!(stats.total_recomputes > 0);
904            assert!(stats.avg_observations_between_recomputes > 0.0);
905        }
906    }
907
908    #[test]
909    fn logging_captures_decisions() {
910        let base = Instant::now();
911        let mut cfg = test_config();
912        cfg.enable_logging = true;
913        let mut t = EProcessThrottle::new_at(cfg, base);
914
915        t.observe_at(true, base + Duration::from_millis(1));
916        t.observe_at(false, base + Duration::from_millis(2));
917
918        assert_eq!(t.logs().len(), 2);
919        assert!(t.logs()[0].matched);
920        assert!(!t.logs()[1].matched);
921
922        t.clear_logs();
923        assert!(t.logs().is_empty());
924    }
925
926    #[test]
927    fn logging_disabled_by_default() {
928        let base = Instant::now();
929        let mut cfg = test_config();
930        cfg.enable_logging = false;
931        let mut t = EProcessThrottle::new_at(cfg, base);
932
933        t.observe_at(true, base + Duration::from_millis(1));
934        assert!(t.logs().is_empty());
935    }
936
937    // ---------------------------------------------------------------
938    // set_mu_0
939    // ---------------------------------------------------------------
940
941    #[test]
942    fn set_mu_0_resets_eprocess() {
943        let base = Instant::now();
944        let mut t = EProcessThrottle::new_at(test_config(), base);
945
946        for i in 1..=10 {
947            t.observe_at(true, base + Duration::from_millis(i));
948        }
949        assert!(t.wealth() > 1.0);
950
951        t.set_mu_0(0.5);
952        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
953    }
954
955    // ---------------------------------------------------------------
956    // Determinism
957    // ---------------------------------------------------------------
958
959    #[test]
960    fn deterministic_behavior() {
961        let base = Instant::now();
962        let cfg = test_config();
963
964        let run = |cfg: &ThrottleConfig| {
965            let mut t = EProcessThrottle::new_at(cfg.clone(), base);
966            let mut decisions = Vec::new();
967            for i in 1..=30 {
968                let matched = i % 3 == 0;
969                let d = t.observe_at(matched, base + Duration::from_millis(i));
970                decisions.push((d.should_recompute, d.forced_by_deadline));
971            }
972            (decisions, t.wealth(), t.lambda())
973        };
974
975        let (d1, w1, l1) = run(&cfg);
976        let (d2, w2, l2) = run(&cfg);
977
978        assert_eq!(d1, d2, "Decisions must be deterministic");
979        assert!((w1 - w2).abs() < 1e-10, "Wealth must be deterministic");
980        assert!((l1 - l2).abs() < 1e-10, "Lambda must be deterministic");
981    }
982
983    // ---------------------------------------------------------------
984    // Supermartingale property (Monte Carlo)
985    // ---------------------------------------------------------------
986
987    #[test]
988    fn property_supermartingale_under_null() {
989        // Under H₀ (match rate = μ₀), the expected wealth should not grow.
990        // We verify empirically by running many trials and checking the
991        // average final wealth ≤ initial wealth (with statistical slack).
992        let base = Instant::now();
993        let mut cfg = test_config();
994        cfg.hard_deadline_ms = u64::MAX;
995        cfg.min_observations_between = u64::MAX;
996        cfg.mu_0 = 0.2;
997        cfg.grapa_eta = 0.0; // fix lambda to test pure martingale property
998
999        let n_trials = 200;
1000        let n_obs = 100;
1001        let mut total_wealth = 0.0;
1002
1003        // Simple LCG for deterministic pseudo-random
1004        let mut rng_state: u64 = 42;
1005        let lcg_next = |state: &mut u64| -> f64 {
1006            *state = state
1007                .wrapping_mul(6364136223846793005)
1008                .wrapping_add(1442695040888963407);
1009            (*state >> 33) as f64 / (1u64 << 31) as f64
1010        };
1011
1012        for trial in 0..n_trials {
1013            let mut t = EProcessThrottle::new_at(cfg.clone(), base);
1014            for i in 1..=n_obs {
1015                let matched = lcg_next(&mut rng_state) < cfg.mu_0;
1016                t.observe_at(
1017                    matched,
1018                    base + Duration::from_millis(i as u64 + trial * 1000),
1019                );
1020            }
1021            total_wealth += t.wealth();
1022        }
1023
1024        let avg_wealth = total_wealth / n_trials as f64;
1025        // Under H₀ with fixed lambda, E[W_t] ≤ 1. Allow statistical slack.
1026        assert!(
1027            avg_wealth < 2.0,
1028            "Average wealth under H₀ should be near 1.0, got {}",
1029            avg_wealth
1030        );
1031    }
1032
1033    // ---------------------------------------------------------------
1034    // Anytime-valid Type I control
1035    // ---------------------------------------------------------------
1036
1037    #[test]
1038    fn property_type_i_control() {
1039        // Under H₀, the probability of ever triggering should be ≤ α.
1040        // We test with many trials.
1041        let base = Instant::now();
1042        let mut cfg = test_config();
1043        cfg.hard_deadline_ms = u64::MAX;
1044        cfg.min_observations_between = 1;
1045        cfg.alpha = 0.05;
1046        cfg.mu_0 = 0.1;
1047        cfg.grapa_eta = 0.0; // fixed lambda for clean test
1048
1049        let n_trials = 500;
1050        let n_obs = 200;
1051        let mut false_triggers = 0u64;
1052
1053        let mut rng_state: u64 = 123;
1054        let lcg_next = |state: &mut u64| -> f64 {
1055            *state = state
1056                .wrapping_mul(6364136223846793005)
1057                .wrapping_add(1442695040888963407);
1058            (*state >> 33) as f64 / (1u64 << 31) as f64
1059        };
1060
1061        for trial in 0..n_trials {
1062            let mut t = EProcessThrottle::new_at(cfg.clone(), base);
1063            let mut triggered = false;
1064            for i in 1..=n_obs {
1065                let matched = lcg_next(&mut rng_state) < cfg.mu_0;
1066                let d = t.observe_at(
1067                    matched,
1068                    base + Duration::from_millis(i as u64 + trial * 1000),
1069                );
1070                if d.should_recompute {
1071                    triggered = true;
1072                    break;
1073                }
1074            }
1075            if triggered {
1076                false_triggers += 1;
1077            }
1078        }
1079
1080        let false_trigger_rate = false_triggers as f64 / n_trials as f64;
1081        // Allow 3× slack for finite-sample variance
1082        assert!(
1083            false_trigger_rate < cfg.alpha * 3.0,
1084            "False trigger rate {} exceeds 3×α = {}",
1085            false_trigger_rate,
1086            cfg.alpha * 3.0
1087        );
1088    }
1089
1090    // ---------------------------------------------------------------
1091    // Edge cases
1092    // ---------------------------------------------------------------
1093
1094    #[test]
1095    fn single_observation() {
1096        let base = Instant::now();
1097        let cfg = test_config();
1098        let mut t = EProcessThrottle::new_at(cfg, base);
1099        let d = t.observe_at(true, base + Duration::from_millis(1));
1100        assert_eq!(t.observation_count(), 1);
1101        // Should not trigger with just 1 obs (min_observations_between = 4)
1102        assert!(!d.should_recompute || d.forced_by_deadline);
1103    }
1104
1105    #[test]
1106    fn alternating_match_pattern() {
1107        let base = Instant::now();
1108        let mut cfg = test_config();
1109        cfg.hard_deadline_ms = u64::MAX;
1110        cfg.min_observations_between = u64::MAX;
1111        let mut t = EProcessThrottle::new_at(cfg, base);
1112
1113        // Alternating: match rate = 0.5, much higher than μ₀ = 0.1
1114        for i in 1..=100 {
1115            t.observe_at(i % 2 == 0, base + Duration::from_millis(i as u64));
1116        }
1117
1118        // With 50% match rate vs 10% null, wealth should grow significantly
1119        assert!(
1120            t.wealth() > 1.0,
1121            "50% match rate vs 10% null should grow wealth: {}",
1122            t.wealth()
1123        );
1124    }
1125
1126    #[test]
1127    fn recompute_resets_wealth() {
1128        let base = Instant::now();
1129        let mut cfg = test_config();
1130        cfg.min_observations_between = 1;
1131        let mut t = EProcessThrottle::new_at(cfg, base);
1132
1133        // Drive to recompute
1134        let mut triggered = false;
1135        for i in 1..=100 {
1136            let d = t.observe_at(true, base + Duration::from_millis(i));
1137            if d.should_recompute && !d.forced_by_deadline {
1138                // Wealth should be reset to 1.0 after recompute
1139                assert!(
1140                    (t.wealth() - 1.0).abs() < f64::EPSILON,
1141                    "Wealth should reset to 1.0 after recompute, got {}",
1142                    t.wealth()
1143                );
1144                triggered = true;
1145                break;
1146            }
1147        }
1148        assert!(
1149            triggered,
1150            "Should have triggered at least one e-process recompute"
1151        );
1152    }
1153
1154    #[test]
1155    fn config_default_values() {
1156        let cfg = ThrottleConfig::default();
1157        assert!((cfg.alpha - 0.05).abs() < f64::EPSILON);
1158        assert!((cfg.mu_0 - 0.1).abs() < f64::EPSILON);
1159        assert!((cfg.initial_lambda - 0.5).abs() < f64::EPSILON);
1160        assert!((cfg.grapa_eta - 0.1).abs() < f64::EPSILON);
1161        assert_eq!(cfg.hard_deadline_ms, 500);
1162        assert_eq!(cfg.min_observations_between, 8);
1163        assert_eq!(cfg.rate_window_size, 64);
1164        assert!(!cfg.enable_logging);
1165    }
1166
1167    #[test]
1168    fn throttle_decision_fields() {
1169        let base = Instant::now();
1170        let mut cfg = test_config();
1171        cfg.hard_deadline_ms = u64::MAX;
1172        let mut t = EProcessThrottle::new_at(cfg, base);
1173        let d = t.observe_at(true, base + Duration::from_millis(1));
1174
1175        assert!(!d.should_recompute);
1176        assert!(!d.forced_by_deadline);
1177        assert!(d.wealth > 1.0);
1178        assert!(d.lambda > 0.0);
1179        assert!((d.empirical_rate - 1.0).abs() < f64::EPSILON);
1180        assert_eq!(d.observations_since_recompute, 1);
1181    }
1182
1183    #[test]
1184    fn stats_no_recomputes_avg_is_zero() {
1185        let base = Instant::now();
1186        let mut cfg = test_config();
1187        cfg.hard_deadline_ms = u64::MAX;
1188        cfg.min_observations_between = u64::MAX;
1189        let mut t = EProcessThrottle::new_at(cfg, base);
1190
1191        t.observe_at(false, base + Duration::from_millis(1));
1192        let stats = t.stats();
1193        assert_eq!(stats.total_recomputes, 0);
1194        assert!((stats.avg_observations_between_recomputes - 0.0).abs() < f64::EPSILON);
1195    }
1196
1197    #[test]
1198    fn set_mu_0_clamps_extreme_values() {
1199        let base = Instant::now();
1200        let mut t = EProcessThrottle::new_at(test_config(), base);
1201
1202        t.set_mu_0(0.0);
1203        assert!(t.mu_0 >= MU_0_MIN);
1204
1205        t.set_mu_0(2.0);
1206        assert!(t.mu_0 <= MU_0_MAX);
1207    }
1208
1209    #[test]
1210    fn reset_preserves_lambda() {
1211        let base = Instant::now();
1212        let mut cfg = test_config();
1213        cfg.hard_deadline_ms = u64::MAX;
1214        cfg.min_observations_between = u64::MAX;
1215        let mut t = EProcessThrottle::new_at(cfg, base);
1216
1217        for i in 1..=20 {
1218            t.observe_at(true, base + Duration::from_millis(i));
1219        }
1220        let lambda_before = t.lambda();
1221        t.reset_at(base + Duration::from_millis(30));
1222        assert!(
1223            (t.lambda() - lambda_before).abs() < f64::EPSILON,
1224            "Lambda should be preserved across reset"
1225        );
1226    }
1227
1228    #[test]
1229    fn logging_records_match_status_and_action() {
1230        let base = Instant::now();
1231        let mut cfg = test_config();
1232        cfg.enable_logging = true;
1233        cfg.hard_deadline_ms = u64::MAX;
1234        cfg.min_observations_between = u64::MAX;
1235        let mut t = EProcessThrottle::new_at(cfg, base);
1236
1237        t.observe_at(true, base + Duration::from_millis(1));
1238        let log = &t.logs()[0];
1239        assert!(log.matched);
1240        assert_eq!(log.observation_idx, 1);
1241        assert_eq!(log.action, "observe");
1242        assert!(log.wealth_after > log.wealth_before);
1243    }
1244
1245    #[test]
1246    fn consecutive_recomputes_tracked() {
1247        let base = Instant::now();
1248        let mut cfg = test_config();
1249        cfg.min_observations_between = 1;
1250        cfg.alpha = 0.5; // permissive
1251        let mut t = EProcessThrottle::new_at(cfg, base);
1252
1253        let mut recompute_count = 0;
1254        for i in 1..=200 {
1255            let d = t.observe_at(true, base + Duration::from_millis(i));
1256            if d.should_recompute {
1257                recompute_count += 1;
1258            }
1259        }
1260
1261        let stats = t.stats();
1262        assert_eq!(stats.total_recomputes, recompute_count as u64);
1263        assert!(
1264            stats.total_recomputes >= 2,
1265            "Should have multiple recomputes"
1266        );
1267    }
1268
1269    // =========================================================================
1270    // Tracing capture infrastructure (bd-37a.5)
1271    // =========================================================================
1272
1273    #[derive(Debug, Clone)]
1274    #[allow(dead_code)]
1275    struct CapturedSpan {
1276        name: String,
1277        target: String,
1278        level: tracing::Level,
1279        fields: HashMap<String, String>,
1280    }
1281
1282    #[derive(Debug, Clone)]
1283    #[allow(dead_code)]
1284    struct CapturedEvent {
1285        level: tracing::Level,
1286        target: String,
1287        message: String,
1288        fields: HashMap<String, String>,
1289    }
1290
1291    struct SpanCapture {
1292        spans: Arc<Mutex<Vec<CapturedSpan>>>,
1293        events: Arc<Mutex<Vec<CapturedEvent>>>,
1294    }
1295
1296    impl SpanCapture {
1297        fn new() -> (Self, CaptureHandle) {
1298            let spans = Arc::new(Mutex::new(Vec::new()));
1299            let events = Arc::new(Mutex::new(Vec::new()));
1300
1301            let handle = CaptureHandle {
1302                spans: spans.clone(),
1303                events: events.clone(),
1304            };
1305
1306            (Self { spans, events }, handle)
1307        }
1308    }
1309
1310    struct CaptureHandle {
1311        spans: Arc<Mutex<Vec<CapturedSpan>>>,
1312        events: Arc<Mutex<Vec<CapturedEvent>>>,
1313    }
1314
1315    impl CaptureHandle {
1316        fn spans(&self) -> Vec<CapturedSpan> {
1317            self.spans.lock().unwrap().clone()
1318        }
1319
1320        fn events(&self) -> Vec<CapturedEvent> {
1321            self.events.lock().unwrap().clone()
1322        }
1323    }
1324
1325    struct FieldVisitor(Vec<(String, String)>);
1326
1327    impl tracing::field::Visit for FieldVisitor {
1328        fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1329            self.0
1330                .push((field.name().to_string(), format!("{value:?}")));
1331        }
1332
1333        fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1334            self.0.push((field.name().to_string(), value.to_string()));
1335        }
1336
1337        fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1338            self.0.push((field.name().to_string(), value.to_string()));
1339        }
1340
1341        fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1342            self.0.push((field.name().to_string(), value.to_string()));
1343        }
1344
1345        fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1346            self.0.push((field.name().to_string(), value.to_string()));
1347        }
1348
1349        fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1350            self.0.push((field.name().to_string(), value.to_string()));
1351        }
1352    }
1353
1354    impl<S> tracing_subscriber::Layer<S> for SpanCapture
1355    where
1356        S: tracing::Subscriber + for<'a> LookupSpan<'a>,
1357    {
1358        fn on_new_span(
1359            &self,
1360            attrs: &tracing::span::Attributes<'_>,
1361            _id: &tracing::span::Id,
1362            _ctx: tracing_subscriber::layer::Context<'_, S>,
1363        ) {
1364            let mut visitor = FieldVisitor(Vec::new());
1365            attrs.record(&mut visitor);
1366
1367            let mut fields: HashMap<String, String> = visitor.0.into_iter().collect();
1368            for field in attrs.metadata().fields() {
1369                fields.entry(field.name().to_string()).or_default();
1370            }
1371
1372            self.spans.lock().unwrap().push(CapturedSpan {
1373                name: attrs.metadata().name().to_string(),
1374                target: attrs.metadata().target().to_string(),
1375                level: *attrs.metadata().level(),
1376                fields,
1377            });
1378        }
1379
1380        fn on_event(
1381            &self,
1382            event: &tracing::Event<'_>,
1383            _ctx: tracing_subscriber::layer::Context<'_, S>,
1384        ) {
1385            let mut visitor = FieldVisitor(Vec::new());
1386            event.record(&mut visitor);
1387
1388            let fields: HashMap<String, String> = visitor.0.clone().into_iter().collect();
1389            let message = visitor
1390                .0
1391                .iter()
1392                .find(|(k, _)| k == "message")
1393                .map(|(_, v)| v.clone())
1394                .unwrap_or_default();
1395
1396            self.events.lock().unwrap().push(CapturedEvent {
1397                level: *event.metadata().level(),
1398                target: event.metadata().target().to_string(),
1399                message,
1400                fields,
1401            });
1402        }
1403    }
1404
1405    fn with_captured_tracing<F>(f: F) -> CaptureHandle
1406    where
1407        F: FnOnce(),
1408    {
1409        let (layer, handle) = SpanCapture::new();
1410        let subscriber = tracing_subscriber::registry().with(layer);
1411        tracing::subscriber::with_default(subscriber, f);
1412        handle
1413    }
1414
1415    // =========================================================================
1416    // Tracing span field assertions
1417    // =========================================================================
1418
1419    #[test]
1420    fn span_eprocess_update_has_required_fields() {
1421        let handle = with_captured_tracing(|| {
1422            let base = Instant::now();
1423            let mut t = EProcessThrottle::new_at(test_config(), base);
1424            t.observe_at(true, base + Duration::from_millis(1));
1425        });
1426
1427        let spans = handle.spans();
1428        let ep_spans: Vec<_> = spans
1429            .iter()
1430            .filter(|s| s.name == "eprocess.update")
1431            .collect();
1432        assert!(
1433            !ep_spans.is_empty(),
1434            "expected at least one eprocess.update span"
1435        );
1436
1437        let span = &ep_spans[0];
1438        assert!(span.fields.contains_key("test_id"), "missing test_id field");
1439        assert!(
1440            span.fields.contains_key("wealth_current"),
1441            "missing wealth_current"
1442        );
1443        assert!(
1444            span.fields.contains_key("wealth_threshold"),
1445            "missing wealth_threshold"
1446        );
1447        assert!(
1448            span.fields.contains_key("observation_count"),
1449            "missing observation_count"
1450        );
1451        assert!(
1452            span.fields.contains_key("rejected"),
1453            "missing rejected field"
1454        );
1455    }
1456
1457    #[test]
1458    fn span_rejected_field_true_on_eprocess_trigger() {
1459        let handle = with_captured_tracing(|| {
1460            let base = Instant::now();
1461            let mut cfg = test_config();
1462            cfg.min_observations_between = 1;
1463            let mut t = EProcessThrottle::new_at(cfg, base);
1464
1465            for i in 1..=100 {
1466                let d = t.observe_at(true, base + Duration::from_millis(i));
1467                if d.should_recompute && !d.forced_by_deadline {
1468                    break;
1469                }
1470            }
1471        });
1472
1473        let spans = handle.spans();
1474        let ep_spans: Vec<_> = spans
1475            .iter()
1476            .filter(|s| s.name == "eprocess.update")
1477            .collect();
1478
1479        // At least one span should have rejected=true
1480        let rejected_spans: Vec<_> = ep_spans
1481            .iter()
1482            .filter(|s| s.fields.get("rejected").is_some_and(|v| v == "true"))
1483            .collect();
1484        assert!(
1485            !rejected_spans.is_empty(),
1486            "expected at least one span with rejected=true"
1487        );
1488    }
1489
1490    // =========================================================================
1491    // DEBUG log assertions
1492    // =========================================================================
1493
1494    #[test]
1495    fn debug_log_wealth_update() {
1496        let handle = with_captured_tracing(|| {
1497            let base = Instant::now();
1498            let mut t = EProcessThrottle::new_at(test_config(), base);
1499            t.observe_at(true, base + Duration::from_millis(1));
1500        });
1501
1502        let events = handle.events();
1503        let debug_events: Vec<_> = events
1504            .iter()
1505            .filter(|e| {
1506                e.level == tracing::Level::DEBUG
1507                    && e.target == "ftui.eprocess"
1508                    && e.fields.contains_key("wealth_before")
1509            })
1510            .collect();
1511
1512        assert!(
1513            !debug_events.is_empty(),
1514            "expected at least one DEBUG wealth update event"
1515        );
1516
1517        let evt = &debug_events[0];
1518        assert!(
1519            evt.fields.contains_key("wealth_after"),
1520            "missing wealth_after"
1521        );
1522        assert!(evt.fields.contains_key("lambda"), "missing lambda");
1523        assert!(
1524            evt.fields.contains_key("eprocess_wealth"),
1525            "missing eprocess_wealth gauge"
1526        );
1527    }
1528
1529    // =========================================================================
1530    // INFO log on rejection
1531    // =========================================================================
1532
1533    #[test]
1534    fn info_log_on_eprocess_rejection() {
1535        let handle = with_captured_tracing(|| {
1536            let base = Instant::now();
1537            let mut cfg = test_config();
1538            cfg.min_observations_between = 1;
1539            let mut t = EProcessThrottle::new_at(cfg, base);
1540
1541            for i in 1..=100 {
1542                let d = t.observe_at(true, base + Duration::from_millis(i));
1543                if d.should_recompute && !d.forced_by_deadline {
1544                    break;
1545                }
1546            }
1547        });
1548
1549        let events = handle.events();
1550        let info_events: Vec<_> = events
1551            .iter()
1552            .filter(|e| {
1553                e.level == tracing::Level::INFO
1554                    && e.target == "ftui.eprocess"
1555                    && e.fields.contains_key("wealth")
1556                    && e.fields.contains_key("threshold")
1557            })
1558            .collect();
1559
1560        assert!(
1561            !info_events.is_empty(),
1562            "expected INFO log on e-process rejection"
1563        );
1564    }
1565
1566    #[test]
1567    fn info_log_on_deadline_forced_recompute() {
1568        let handle = with_captured_tracing(|| {
1569            let base = Instant::now();
1570            let mut cfg = test_config();
1571            cfg.hard_deadline_ms = 100;
1572            cfg.min_observations_between = 1;
1573            let mut t = EProcessThrottle::new_at(cfg, base);
1574
1575            // Only non-matches, exceed deadline
1576            t.observe_at(false, base + Duration::from_millis(150));
1577        });
1578
1579        let events = handle.events();
1580        let deadline_events: Vec<_> = events
1581            .iter()
1582            .filter(|e| {
1583                e.level == tracing::Level::INFO
1584                    && e.target == "ftui.eprocess"
1585                    && e.fields.contains_key("deadline_ms")
1586            })
1587            .collect();
1588
1589        assert!(
1590            !deadline_events.is_empty(),
1591            "expected INFO log on deadline forced recompute"
1592        );
1593    }
1594
1595    // =========================================================================
1596    // Counter verification
1597    // =========================================================================
1598
1599    #[test]
1600    fn counter_accessor_is_callable() {
1601        let total = eprocess_rejections_total();
1602        let _ = total.checked_add(0).expect("counter overflow");
1603    }
1604
1605    #[test]
1606    fn counter_increments_on_rejection() {
1607        let before = eprocess_rejections_total();
1608
1609        let base = Instant::now();
1610        let mut cfg = test_config();
1611        cfg.min_observations_between = 1;
1612        let mut t = EProcessThrottle::new_at(cfg, base);
1613
1614        for i in 1..=100 {
1615            let d = t.observe_at(true, base + Duration::from_millis(i));
1616            if d.should_recompute && !d.forced_by_deadline {
1617                break;
1618            }
1619        }
1620
1621        let after = eprocess_rejections_total();
1622        assert!(
1623            after > before,
1624            "counter should increment on rejection: before={before}, after={after}"
1625        );
1626    }
1627
1628    #[test]
1629    fn debug_events_per_observation() {
1630        let handle = with_captured_tracing(|| {
1631            let base = Instant::now();
1632            let mut cfg = test_config();
1633            cfg.hard_deadline_ms = u64::MAX;
1634            cfg.min_observations_between = u64::MAX;
1635            let mut t = EProcessThrottle::new_at(cfg, base);
1636
1637            for i in 1..=5 {
1638                t.observe_at(i % 2 == 0, base + Duration::from_millis(i));
1639            }
1640        });
1641
1642        let events = handle.events();
1643        let debug_events: Vec<_> = events
1644            .iter()
1645            .filter(|e| {
1646                e.level == tracing::Level::DEBUG
1647                    && e.target == "ftui.eprocess"
1648                    && e.fields.contains_key("wealth_before")
1649            })
1650            .collect();
1651
1652        assert_eq!(
1653            debug_events.len(),
1654            5,
1655            "expected one DEBUG wealth event per observation"
1656        );
1657    }
1658}