1#![forbid(unsafe_code)]
2
3use std::collections::VecDeque;
61use std::sync::atomic::{AtomicU64, Ordering};
62use web_time::{Duration, Instant};
63
64const W_MIN: f64 = 1e-12;
66
67const MU_0_MIN: f64 = 1e-6;
69
70const MU_0_MAX: f64 = 1.0 - 1e-6;
72
73static EPROCESS_REJECTIONS_TOTAL: AtomicU64 = AtomicU64::new(0);
78
79#[must_use]
81pub fn eprocess_rejections_total() -> u64 {
82 EPROCESS_REJECTIONS_TOTAL.load(Ordering::Relaxed)
83}
84
85#[derive(Debug, Clone)]
87pub struct ThrottleConfig {
88 pub alpha: f64,
91
92 pub mu_0: f64,
96
97 pub initial_lambda: f64,
101
102 pub grapa_eta: f64,
105
106 pub hard_deadline_ms: u64,
109
110 pub min_observations_between: u64,
113
114 pub rate_window_size: usize,
116
117 pub enable_logging: bool,
119}
120
121impl Default for ThrottleConfig {
122 fn default() -> Self {
123 Self {
124 alpha: 0.05,
125 mu_0: 0.1,
126 initial_lambda: 0.5,
127 grapa_eta: 0.1,
128 hard_deadline_ms: 500,
129 min_observations_between: 8,
130 rate_window_size: 64,
131 enable_logging: false,
132 }
133 }
134}
135
136#[derive(Debug, Clone, Copy, PartialEq)]
138pub struct ThrottleDecision {
139 pub should_recompute: bool,
141 pub wealth: f64,
143 pub lambda: f64,
145 pub empirical_rate: f64,
147 pub forced_by_deadline: bool,
149 pub observations_since_recompute: u64,
151}
152
153#[derive(Debug, Clone)]
155pub struct ThrottleLog {
156 pub timestamp: Instant,
158 pub observation_idx: u64,
160 pub matched: bool,
162 pub wealth_before: f64,
164 pub wealth_after: f64,
166 pub lambda: f64,
168 pub empirical_rate: f64,
170 pub action: &'static str,
172 pub time_since_recompute_ms: f64,
174}
175
176impl ThrottleDecision {
177 #[must_use]
179 pub fn to_jsonl(&self) -> String {
180 format!(
181 r#"{{"schema":"eprocess-throttle-v1","should_recompute":{},"wealth":{:.6},"lambda":{:.6},"empirical_rate":{:.6},"forced_by_deadline":{},"obs_since_recompute":{}}}"#,
182 self.should_recompute,
183 self.wealth,
184 self.lambda,
185 self.empirical_rate,
186 self.forced_by_deadline,
187 self.observations_since_recompute,
188 )
189 }
190}
191
192impl ThrottleLog {
193 #[must_use]
195 pub fn to_jsonl(&self) -> String {
196 format!(
197 r#"{{"schema":"eprocess-log-v1","obs_idx":{},"matched":{},"wealth_before":{:.6},"wealth_after":{:.6},"lambda":{:.6},"empirical_rate":{:.6},"action":"{}","time_since_recompute_ms":{:.3}}}"#,
198 self.observation_idx,
199 self.matched,
200 self.wealth_before,
201 self.wealth_after,
202 self.lambda,
203 self.empirical_rate,
204 self.action,
205 self.time_since_recompute_ms,
206 )
207 }
208}
209
210#[derive(Debug, Clone)]
212pub struct ThrottleStats {
213 pub total_observations: u64,
215 pub total_recomputes: u64,
217 pub forced_recomputes: u64,
219 pub eprocess_recomputes: u64,
221 pub current_wealth: f64,
223 pub current_lambda: f64,
225 pub empirical_rate: f64,
227 pub avg_observations_between_recomputes: f64,
229}
230
231#[derive(Debug)]
235pub struct EProcessThrottle {
236 config: ThrottleConfig,
237
238 wealth: f64,
240
241 lambda: f64,
243
244 mu_0: f64,
246
247 lambda_max: f64,
249
250 threshold: f64,
252
253 recent_matches: VecDeque<bool>,
255
256 observation_count: u64,
258
259 observations_since_recompute: u64,
261
262 last_recompute: Instant,
264
265 total_recomputes: u64,
267
268 forced_recomputes: u64,
270
271 eprocess_recomputes: u64,
273
274 cumulative_obs_at_recompute: u64,
276
277 logs: Vec<ThrottleLog>,
279}
280
281impl EProcessThrottle {
282 pub fn new(config: ThrottleConfig) -> Self {
284 Self::new_at(config, Instant::now())
285 }
286
287 pub fn new_at(config: ThrottleConfig, now: Instant) -> Self {
289 let mu_0 = config.mu_0.clamp(MU_0_MIN, MU_0_MAX);
290 let lambda_max = 1.0 / mu_0 - 1e-6;
291 let lambda = config.initial_lambda.clamp(1e-6, lambda_max);
292 let threshold = 1.0 / config.alpha.max(1e-12);
293
294 Self {
295 config,
296 wealth: 1.0,
297 lambda,
298 mu_0,
299 lambda_max,
300 threshold,
301 recent_matches: VecDeque::new(),
302 observation_count: 0,
303 observations_since_recompute: 0,
304 last_recompute: now,
305 total_recomputes: 0,
306 forced_recomputes: 0,
307 eprocess_recomputes: 0,
308 cumulative_obs_at_recompute: 0,
309 logs: Vec::new(),
310 }
311 }
312
313 pub fn observe(&mut self, matched: bool) -> ThrottleDecision {
318 self.observe_at(matched, Instant::now())
319 }
320
321 pub fn observe_at(&mut self, matched: bool, now: Instant) -> ThrottleDecision {
323 self.observation_count += 1;
324 self.observations_since_recompute += 1;
325
326 self.recent_matches.push_back(matched);
328 while self.recent_matches.len() > self.config.rate_window_size {
329 self.recent_matches.pop_front();
330 }
331
332 let empirical_rate = self.empirical_match_rate();
333
334 let x_t = if matched { 1.0 } else { 0.0 };
336 let wealth_before = self.wealth;
337 let multiplier = 1.0 + self.lambda * (x_t - self.mu_0);
338 self.wealth = (self.wealth * multiplier).max(W_MIN);
339
340 let denominator = 1.0 + self.lambda * (x_t - self.mu_0);
343 if denominator.abs() > 1e-12 {
344 let grad = (x_t - self.mu_0) / denominator;
345 self.lambda = (self.lambda + self.config.grapa_eta * grad).clamp(1e-6, self.lambda_max);
346 }
347
348 let time_since_recompute = now.saturating_duration_since(self.last_recompute);
350 let hard_deadline_exceeded =
351 time_since_recompute >= Duration::from_millis(self.config.hard_deadline_ms);
352 let min_obs_met = self.observations_since_recompute >= self.config.min_observations_between;
353 let wealth_exceeded = self.wealth >= self.threshold;
354
355 let eprocess_triggered = wealth_exceeded && min_obs_met;
356 let should_recompute = hard_deadline_exceeded || eprocess_triggered;
357 let forced_by_deadline = hard_deadline_exceeded && !eprocess_triggered;
358
359 let action = if should_recompute {
360 if forced_by_deadline {
361 "recompute_forced"
362 } else {
363 "recompute_eprocess"
364 }
365 } else {
366 "observe"
367 };
368
369 let rejected = eprocess_triggered;
371 let _span = tracing::debug_span!(
372 "eprocess.update",
373 test_id = "throttle",
374 wealth_current = %self.wealth,
375 wealth_threshold = %self.threshold,
376 observation_count = self.observation_count,
377 rejected = rejected,
378 )
379 .entered();
380
381 tracing::debug!(
382 target: "ftui.eprocess",
383 wealth_before = %wealth_before,
384 wealth_after = %self.wealth,
385 lambda = %self.lambda,
386 empirical_rate = %empirical_rate,
387 matched = matched,
388 eprocess_wealth = %self.wealth,
389 observation_count = self.observation_count,
390 action = %action,
391 "wealth update"
392 );
393
394 if rejected {
395 EPROCESS_REJECTIONS_TOTAL.fetch_add(1, Ordering::Relaxed);
396 tracing::info!(
397 target: "ftui.eprocess",
398 wealth = %self.wealth,
399 threshold = %self.threshold,
400 observation_count = self.observation_count,
401 observations_since_recompute = self.observations_since_recompute,
402 "e-process rejection: significant finding"
403 );
404 }
405
406 if forced_by_deadline && should_recompute {
407 tracing::info!(
408 target: "ftui.eprocess",
409 deadline_ms = self.config.hard_deadline_ms,
410 observation_count = self.observation_count,
411 "hard deadline forced recompute"
412 );
413 }
414
415 self.log_decision(
416 now,
417 matched,
418 wealth_before,
419 self.wealth,
420 action,
421 time_since_recompute,
422 );
423
424 if should_recompute {
425 self.trigger_recompute(now, forced_by_deadline);
426 }
427
428 ThrottleDecision {
429 should_recompute,
430 wealth: self.wealth,
431 lambda: self.lambda,
432 empirical_rate,
433 forced_by_deadline: should_recompute && forced_by_deadline,
434 observations_since_recompute: self.observations_since_recompute,
435 }
436 }
437
438 pub fn reset(&mut self) {
441 self.reset_at(Instant::now());
442 }
443
444 pub fn reset_at(&mut self, now: Instant) {
446 self.wealth = 1.0;
447 self.observations_since_recompute = 0;
448 self.last_recompute = now;
449 self.recent_matches.clear();
450 }
453
454 pub fn set_mu_0(&mut self, mu_0: f64) {
459 self.mu_0 = mu_0.clamp(MU_0_MIN, MU_0_MAX);
460 self.lambda_max = 1.0 / self.mu_0 - 1e-6;
461 self.lambda = self.lambda.clamp(1e-6, self.lambda_max);
462 self.reset();
463 }
464
465 #[inline]
467 pub fn wealth(&self) -> f64 {
468 self.wealth
469 }
470
471 #[inline]
473 pub fn lambda(&self) -> f64 {
474 self.lambda
475 }
476
477 pub fn empirical_match_rate(&self) -> f64 {
479 if self.recent_matches.is_empty() {
480 return 0.0;
481 }
482 let matches = self.recent_matches.iter().filter(|&&m| m).count();
483 matches as f64 / self.recent_matches.len() as f64
484 }
485
486 #[inline]
488 pub fn threshold(&self) -> f64 {
489 self.threshold
490 }
491
492 #[inline]
494 pub fn observation_count(&self) -> u64 {
495 self.observation_count
496 }
497
498 pub fn stats(&self) -> ThrottleStats {
500 let avg_obs = if self.total_recomputes > 0 {
501 self.cumulative_obs_at_recompute as f64 / self.total_recomputes as f64
502 } else {
503 0.0
504 };
505
506 ThrottleStats {
507 total_observations: self.observation_count,
508 total_recomputes: self.total_recomputes,
509 forced_recomputes: self.forced_recomputes,
510 eprocess_recomputes: self.eprocess_recomputes,
511 current_wealth: self.wealth,
512 current_lambda: self.lambda,
513 empirical_rate: self.empirical_match_rate(),
514 avg_observations_between_recomputes: avg_obs,
515 }
516 }
517
518 pub fn logs(&self) -> &[ThrottleLog] {
520 &self.logs
521 }
522
523 pub fn clear_logs(&mut self) {
525 self.logs.clear();
526 }
527
528 fn trigger_recompute(&mut self, now: Instant, forced: bool) {
531 self.total_recomputes += 1;
532 self.cumulative_obs_at_recompute += self.observations_since_recompute;
533 if forced {
534 self.forced_recomputes += 1;
535 } else {
536 self.eprocess_recomputes += 1;
537 }
538 self.wealth = 1.0;
539 self.observations_since_recompute = 0;
540 self.last_recompute = now;
541 }
542
543 fn log_decision(
544 &mut self,
545 now: Instant,
546 matched: bool,
547 wealth_before: f64,
548 wealth_after: f64,
549 action: &'static str,
550 time_since_recompute: Duration,
551 ) {
552 if !self.config.enable_logging {
553 return;
554 }
555
556 self.logs.push(ThrottleLog {
557 timestamp: now,
558 observation_idx: self.observation_count,
559 matched,
560 wealth_before,
561 wealth_after,
562 lambda: self.lambda,
563 empirical_rate: self.empirical_match_rate(),
564 action,
565 time_since_recompute_ms: time_since_recompute.as_secs_f64() * 1000.0,
566 });
567 }
568}
569
570#[cfg(test)]
571mod tests {
572 use super::*;
573 use std::collections::HashMap;
574 use std::sync::{Arc, Mutex};
575 use tracing_subscriber::layer::SubscriberExt;
576 use tracing_subscriber::registry::LookupSpan;
577
578 fn test_config() -> ThrottleConfig {
579 ThrottleConfig {
580 alpha: 0.05,
581 mu_0: 0.1,
582 initial_lambda: 0.5,
583 grapa_eta: 0.1,
584 hard_deadline_ms: 500,
585 min_observations_between: 4,
586 rate_window_size: 32,
587 enable_logging: true,
588 }
589 }
590
591 #[test]
596 fn initial_state() {
597 let t = EProcessThrottle::new(test_config());
598 assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
599 assert_eq!(t.observation_count(), 0);
600 assert!(t.lambda() > 0.0);
601 assert!((t.threshold() - 20.0).abs() < 0.01); }
603
604 #[test]
605 fn mu_0_clamped_to_valid_range() {
606 let mut cfg = test_config();
607 cfg.mu_0 = 0.0;
608 let t = EProcessThrottle::new(cfg.clone());
609 assert!(t.mu_0 >= MU_0_MIN);
610
611 cfg.mu_0 = 1.0;
612 let t = EProcessThrottle::new(cfg.clone());
613 assert!(t.mu_0 <= MU_0_MAX);
614
615 cfg.mu_0 = -5.0;
616 let t = EProcessThrottle::new(cfg);
617 assert!(t.mu_0 >= MU_0_MIN);
618 }
619
620 #[test]
625 fn no_match_decreases_wealth() {
626 let base = Instant::now();
627 let mut t = EProcessThrottle::new_at(test_config(), base);
628 let d = t.observe_at(false, base + Duration::from_millis(1));
629 assert!(
630 d.wealth < 1.0,
631 "No-match should decrease wealth: {}",
632 d.wealth
633 );
634 }
635
636 #[test]
637 fn match_increases_wealth() {
638 let base = Instant::now();
639 let mut t = EProcessThrottle::new_at(test_config(), base);
640 let d = t.observe_at(true, base + Duration::from_millis(1));
641 assert!(d.wealth > 1.0, "Match should increase wealth: {}", d.wealth);
642 }
643
644 #[test]
645 fn wealth_stays_positive() {
646 let base = Instant::now();
647 let mut t = EProcessThrottle::new_at(test_config(), base);
648 for i in 1..=1000 {
650 let d = t.observe_at(false, base + Duration::from_millis(i));
651 assert!(d.wealth > 0.0, "Wealth must stay positive at obs {}", i);
652 }
653 }
654
655 #[test]
656 fn wealth_floor_prevents_zero_lock() {
657 let base = Instant::now();
658 let mut cfg = test_config();
659 cfg.hard_deadline_ms = u64::MAX; cfg.initial_lambda = 0.99; let mut t = EProcessThrottle::new_at(cfg, base);
662
663 for i in 1..=500 {
664 t.observe_at(false, base + Duration::from_millis(i));
665 }
666 assert!(t.wealth() >= W_MIN, "Wealth should be at floor, not zero");
667
668 let before = t.wealth();
670 t.observe_at(true, base + Duration::from_millis(501));
671 assert!(
672 t.wealth() > before,
673 "Match should grow wealth even from floor"
674 );
675 }
676
677 #[test]
682 fn burst_of_matches_triggers_recompute() {
683 let base = Instant::now();
684 let mut cfg = test_config();
685 cfg.min_observations_between = 1; let mut t = EProcessThrottle::new_at(cfg, base);
687
688 let mut triggered = false;
689 for i in 1..=100 {
690 let d = t.observe_at(true, base + Duration::from_millis(i));
691 if d.should_recompute && !d.forced_by_deadline {
692 triggered = true;
693 break;
694 }
695 }
696 assert!(
697 triggered,
698 "Burst of matches should trigger e-process recompute"
699 );
700 }
701
702 #[test]
703 fn no_matches_does_not_trigger_eprocess() {
704 let base = Instant::now();
705 let mut cfg = test_config();
706 cfg.hard_deadline_ms = u64::MAX;
707 let mut t = EProcessThrottle::new_at(cfg, base);
708
709 for i in 1..=200 {
710 let d = t.observe_at(false, base + Duration::from_millis(i));
711 assert!(
712 !d.should_recompute,
713 "No-match stream should never trigger e-process recompute at obs {}",
714 i
715 );
716 }
717 }
718
719 #[test]
720 fn hard_deadline_forces_recompute() {
721 let base = Instant::now();
722 let mut cfg = test_config();
723 cfg.hard_deadline_ms = 100;
724 cfg.min_observations_between = 1;
725 let mut t = EProcessThrottle::new_at(cfg, base);
726
727 let d = t.observe_at(false, base + Duration::from_millis(150));
729 assert!(d.should_recompute, "Should trigger on deadline");
730 assert!(d.forced_by_deadline, "Should be forced by deadline");
731 }
732
733 #[test]
734 fn min_observations_between_prevents_rapid_fire() {
735 let base = Instant::now();
736 let mut cfg = test_config();
737 cfg.min_observations_between = 10;
738 cfg.hard_deadline_ms = u64::MAX;
739 cfg.alpha = 0.5; let mut t = EProcessThrottle::new_at(cfg, base);
741
742 let mut first_trigger = None;
743 for i in 1..=100 {
744 let d = t.observe_at(true, base + Duration::from_millis(i));
745 if d.should_recompute {
746 first_trigger = Some(i);
747 break;
748 }
749 }
750
751 assert!(
752 first_trigger.unwrap_or(0) >= 10,
753 "First trigger should be at obs >= 10, was {:?}",
754 first_trigger
755 );
756 }
757
758 #[test]
759 fn reset_clears_wealth_and_counter() {
760 let base = Instant::now();
761 let mut t = EProcessThrottle::new_at(test_config(), base);
762
763 for i in 1..=10 {
764 t.observe_at(true, base + Duration::from_millis(i));
765 }
766 assert!(t.wealth() > 1.0);
767 assert!(t.observations_since_recompute > 0);
768
769 t.reset_at(base + Duration::from_millis(20));
770 assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
771 assert_eq!(t.observations_since_recompute, 0);
772 }
773
774 #[test]
779 fn lambda_adapts_to_high_match_rate() {
780 let base = Instant::now();
781 let mut cfg = test_config();
782 cfg.hard_deadline_ms = u64::MAX;
783 cfg.min_observations_between = u64::MAX;
784 let mut t = EProcessThrottle::new_at(cfg, base);
785
786 let initial_lambda = t.lambda();
787
788 for i in 1..=50 {
790 t.observe_at(true, base + Duration::from_millis(i));
791 }
792
793 assert!(
794 t.lambda() > initial_lambda,
795 "Lambda should increase with frequent matches: {} vs {}",
796 t.lambda(),
797 initial_lambda
798 );
799 }
800
801 #[test]
802 fn lambda_adapts_to_low_match_rate() {
803 let base = Instant::now();
804 let mut cfg = test_config();
805 cfg.hard_deadline_ms = u64::MAX;
806 cfg.min_observations_between = u64::MAX;
807 cfg.initial_lambda = 0.8;
808 let mut t = EProcessThrottle::new_at(cfg, base);
809
810 let initial_lambda = t.lambda();
811
812 for i in 1..=50 {
814 t.observe_at(false, base + Duration::from_millis(i));
815 }
816
817 assert!(
818 t.lambda() < initial_lambda,
819 "Lambda should decrease with few matches: {} vs {}",
820 t.lambda(),
821 initial_lambda
822 );
823 }
824
825 #[test]
826 fn lambda_stays_bounded() {
827 let base = Instant::now();
828 let mut cfg = test_config();
829 cfg.hard_deadline_ms = u64::MAX;
830 cfg.min_observations_between = u64::MAX;
831 cfg.grapa_eta = 1.0; let mut t = EProcessThrottle::new_at(cfg, base);
833
834 for i in 1..=200 {
835 let matched = i % 2 == 0;
836 t.observe_at(matched, base + Duration::from_millis(i as u64));
837 }
838
839 assert!(t.lambda() > 0.0, "Lambda must be positive");
840 assert!(
841 t.lambda() <= t.lambda_max,
842 "Lambda must not exceed 1/(1-mu_0): {} vs {}",
843 t.lambda(),
844 t.lambda_max
845 );
846 }
847
848 #[test]
853 fn empirical_rate_tracks_window() {
854 let base = Instant::now();
855 let mut cfg = test_config();
856 cfg.rate_window_size = 10;
857 cfg.hard_deadline_ms = u64::MAX;
858 cfg.min_observations_between = u64::MAX;
859 let mut t = EProcessThrottle::new_at(cfg, base);
860
861 for i in 1..=10 {
863 t.observe_at(true, base + Duration::from_millis(i));
864 }
865 assert!((t.empirical_match_rate() - 1.0).abs() < f64::EPSILON);
866
867 for i in 11..=20 {
869 t.observe_at(false, base + Duration::from_millis(i));
870 }
871 assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
872 }
873
874 #[test]
875 fn empirical_rate_zero_when_empty() {
876 let t = EProcessThrottle::new(test_config());
877 assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
878 }
879
880 #[test]
885 fn stats_reflect_state() {
886 let base = Instant::now();
887 let mut cfg = test_config();
888 cfg.min_observations_between = 1;
889 let mut t = EProcessThrottle::new_at(cfg, base);
890
891 let mut recomputed = false;
893 for i in 1..=50 {
894 let d = t.observe_at(true, base + Duration::from_millis(i));
895 if d.should_recompute {
896 recomputed = true;
897 }
898 }
899
900 let stats = t.stats();
901 assert_eq!(stats.total_observations, 50);
902 if recomputed {
903 assert!(stats.total_recomputes > 0);
904 assert!(stats.avg_observations_between_recomputes > 0.0);
905 }
906 }
907
908 #[test]
909 fn logging_captures_decisions() {
910 let base = Instant::now();
911 let mut cfg = test_config();
912 cfg.enable_logging = true;
913 let mut t = EProcessThrottle::new_at(cfg, base);
914
915 t.observe_at(true, base + Duration::from_millis(1));
916 t.observe_at(false, base + Duration::from_millis(2));
917
918 assert_eq!(t.logs().len(), 2);
919 assert!(t.logs()[0].matched);
920 assert!(!t.logs()[1].matched);
921
922 t.clear_logs();
923 assert!(t.logs().is_empty());
924 }
925
926 #[test]
927 fn logging_disabled_by_default() {
928 let base = Instant::now();
929 let mut cfg = test_config();
930 cfg.enable_logging = false;
931 let mut t = EProcessThrottle::new_at(cfg, base);
932
933 t.observe_at(true, base + Duration::from_millis(1));
934 assert!(t.logs().is_empty());
935 }
936
937 #[test]
942 fn set_mu_0_resets_eprocess() {
943 let base = Instant::now();
944 let mut t = EProcessThrottle::new_at(test_config(), base);
945
946 for i in 1..=10 {
947 t.observe_at(true, base + Duration::from_millis(i));
948 }
949 assert!(t.wealth() > 1.0);
950
951 t.set_mu_0(0.5);
952 assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
953 }
954
955 #[test]
960 fn deterministic_behavior() {
961 let base = Instant::now();
962 let cfg = test_config();
963
964 let run = |cfg: &ThrottleConfig| {
965 let mut t = EProcessThrottle::new_at(cfg.clone(), base);
966 let mut decisions = Vec::new();
967 for i in 1..=30 {
968 let matched = i % 3 == 0;
969 let d = t.observe_at(matched, base + Duration::from_millis(i));
970 decisions.push((d.should_recompute, d.forced_by_deadline));
971 }
972 (decisions, t.wealth(), t.lambda())
973 };
974
975 let (d1, w1, l1) = run(&cfg);
976 let (d2, w2, l2) = run(&cfg);
977
978 assert_eq!(d1, d2, "Decisions must be deterministic");
979 assert!((w1 - w2).abs() < 1e-10, "Wealth must be deterministic");
980 assert!((l1 - l2).abs() < 1e-10, "Lambda must be deterministic");
981 }
982
983 #[test]
988 fn property_supermartingale_under_null() {
989 let base = Instant::now();
993 let mut cfg = test_config();
994 cfg.hard_deadline_ms = u64::MAX;
995 cfg.min_observations_between = u64::MAX;
996 cfg.mu_0 = 0.2;
997 cfg.grapa_eta = 0.0; let n_trials = 200;
1000 let n_obs = 100;
1001 let mut total_wealth = 0.0;
1002
1003 let mut rng_state: u64 = 42;
1005 let lcg_next = |state: &mut u64| -> f64 {
1006 *state = state
1007 .wrapping_mul(6364136223846793005)
1008 .wrapping_add(1442695040888963407);
1009 (*state >> 33) as f64 / (1u64 << 31) as f64
1010 };
1011
1012 for trial in 0..n_trials {
1013 let mut t = EProcessThrottle::new_at(cfg.clone(), base);
1014 for i in 1..=n_obs {
1015 let matched = lcg_next(&mut rng_state) < cfg.mu_0;
1016 t.observe_at(
1017 matched,
1018 base + Duration::from_millis(i as u64 + trial * 1000),
1019 );
1020 }
1021 total_wealth += t.wealth();
1022 }
1023
1024 let avg_wealth = total_wealth / n_trials as f64;
1025 assert!(
1027 avg_wealth < 2.0,
1028 "Average wealth under H₀ should be near 1.0, got {}",
1029 avg_wealth
1030 );
1031 }
1032
1033 #[test]
1038 fn property_type_i_control() {
1039 let base = Instant::now();
1042 let mut cfg = test_config();
1043 cfg.hard_deadline_ms = u64::MAX;
1044 cfg.min_observations_between = 1;
1045 cfg.alpha = 0.05;
1046 cfg.mu_0 = 0.1;
1047 cfg.grapa_eta = 0.0; let n_trials = 500;
1050 let n_obs = 200;
1051 let mut false_triggers = 0u64;
1052
1053 let mut rng_state: u64 = 123;
1054 let lcg_next = |state: &mut u64| -> f64 {
1055 *state = state
1056 .wrapping_mul(6364136223846793005)
1057 .wrapping_add(1442695040888963407);
1058 (*state >> 33) as f64 / (1u64 << 31) as f64
1059 };
1060
1061 for trial in 0..n_trials {
1062 let mut t = EProcessThrottle::new_at(cfg.clone(), base);
1063 let mut triggered = false;
1064 for i in 1..=n_obs {
1065 let matched = lcg_next(&mut rng_state) < cfg.mu_0;
1066 let d = t.observe_at(
1067 matched,
1068 base + Duration::from_millis(i as u64 + trial * 1000),
1069 );
1070 if d.should_recompute {
1071 triggered = true;
1072 break;
1073 }
1074 }
1075 if triggered {
1076 false_triggers += 1;
1077 }
1078 }
1079
1080 let false_trigger_rate = false_triggers as f64 / n_trials as f64;
1081 assert!(
1083 false_trigger_rate < cfg.alpha * 3.0,
1084 "False trigger rate {} exceeds 3×α = {}",
1085 false_trigger_rate,
1086 cfg.alpha * 3.0
1087 );
1088 }
1089
1090 #[test]
1095 fn single_observation() {
1096 let base = Instant::now();
1097 let cfg = test_config();
1098 let mut t = EProcessThrottle::new_at(cfg, base);
1099 let d = t.observe_at(true, base + Duration::from_millis(1));
1100 assert_eq!(t.observation_count(), 1);
1101 assert!(!d.should_recompute || d.forced_by_deadline);
1103 }
1104
1105 #[test]
1106 fn alternating_match_pattern() {
1107 let base = Instant::now();
1108 let mut cfg = test_config();
1109 cfg.hard_deadline_ms = u64::MAX;
1110 cfg.min_observations_between = u64::MAX;
1111 let mut t = EProcessThrottle::new_at(cfg, base);
1112
1113 for i in 1..=100 {
1115 t.observe_at(i % 2 == 0, base + Duration::from_millis(i as u64));
1116 }
1117
1118 assert!(
1120 t.wealth() > 1.0,
1121 "50% match rate vs 10% null should grow wealth: {}",
1122 t.wealth()
1123 );
1124 }
1125
1126 #[test]
1127 fn recompute_resets_wealth() {
1128 let base = Instant::now();
1129 let mut cfg = test_config();
1130 cfg.min_observations_between = 1;
1131 let mut t = EProcessThrottle::new_at(cfg, base);
1132
1133 let mut triggered = false;
1135 for i in 1..=100 {
1136 let d = t.observe_at(true, base + Duration::from_millis(i));
1137 if d.should_recompute && !d.forced_by_deadline {
1138 assert!(
1140 (t.wealth() - 1.0).abs() < f64::EPSILON,
1141 "Wealth should reset to 1.0 after recompute, got {}",
1142 t.wealth()
1143 );
1144 triggered = true;
1145 break;
1146 }
1147 }
1148 assert!(
1149 triggered,
1150 "Should have triggered at least one e-process recompute"
1151 );
1152 }
1153
1154 #[test]
1155 fn config_default_values() {
1156 let cfg = ThrottleConfig::default();
1157 assert!((cfg.alpha - 0.05).abs() < f64::EPSILON);
1158 assert!((cfg.mu_0 - 0.1).abs() < f64::EPSILON);
1159 assert!((cfg.initial_lambda - 0.5).abs() < f64::EPSILON);
1160 assert!((cfg.grapa_eta - 0.1).abs() < f64::EPSILON);
1161 assert_eq!(cfg.hard_deadline_ms, 500);
1162 assert_eq!(cfg.min_observations_between, 8);
1163 assert_eq!(cfg.rate_window_size, 64);
1164 assert!(!cfg.enable_logging);
1165 }
1166
1167 #[test]
1168 fn throttle_decision_fields() {
1169 let base = Instant::now();
1170 let mut cfg = test_config();
1171 cfg.hard_deadline_ms = u64::MAX;
1172 let mut t = EProcessThrottle::new_at(cfg, base);
1173 let d = t.observe_at(true, base + Duration::from_millis(1));
1174
1175 assert!(!d.should_recompute);
1176 assert!(!d.forced_by_deadline);
1177 assert!(d.wealth > 1.0);
1178 assert!(d.lambda > 0.0);
1179 assert!((d.empirical_rate - 1.0).abs() < f64::EPSILON);
1180 assert_eq!(d.observations_since_recompute, 1);
1181 }
1182
1183 #[test]
1184 fn stats_no_recomputes_avg_is_zero() {
1185 let base = Instant::now();
1186 let mut cfg = test_config();
1187 cfg.hard_deadline_ms = u64::MAX;
1188 cfg.min_observations_between = u64::MAX;
1189 let mut t = EProcessThrottle::new_at(cfg, base);
1190
1191 t.observe_at(false, base + Duration::from_millis(1));
1192 let stats = t.stats();
1193 assert_eq!(stats.total_recomputes, 0);
1194 assert!((stats.avg_observations_between_recomputes - 0.0).abs() < f64::EPSILON);
1195 }
1196
1197 #[test]
1198 fn set_mu_0_clamps_extreme_values() {
1199 let base = Instant::now();
1200 let mut t = EProcessThrottle::new_at(test_config(), base);
1201
1202 t.set_mu_0(0.0);
1203 assert!(t.mu_0 >= MU_0_MIN);
1204
1205 t.set_mu_0(2.0);
1206 assert!(t.mu_0 <= MU_0_MAX);
1207 }
1208
1209 #[test]
1210 fn reset_preserves_lambda() {
1211 let base = Instant::now();
1212 let mut cfg = test_config();
1213 cfg.hard_deadline_ms = u64::MAX;
1214 cfg.min_observations_between = u64::MAX;
1215 let mut t = EProcessThrottle::new_at(cfg, base);
1216
1217 for i in 1..=20 {
1218 t.observe_at(true, base + Duration::from_millis(i));
1219 }
1220 let lambda_before = t.lambda();
1221 t.reset_at(base + Duration::from_millis(30));
1222 assert!(
1223 (t.lambda() - lambda_before).abs() < f64::EPSILON,
1224 "Lambda should be preserved across reset"
1225 );
1226 }
1227
1228 #[test]
1229 fn logging_records_match_status_and_action() {
1230 let base = Instant::now();
1231 let mut cfg = test_config();
1232 cfg.enable_logging = true;
1233 cfg.hard_deadline_ms = u64::MAX;
1234 cfg.min_observations_between = u64::MAX;
1235 let mut t = EProcessThrottle::new_at(cfg, base);
1236
1237 t.observe_at(true, base + Duration::from_millis(1));
1238 let log = &t.logs()[0];
1239 assert!(log.matched);
1240 assert_eq!(log.observation_idx, 1);
1241 assert_eq!(log.action, "observe");
1242 assert!(log.wealth_after > log.wealth_before);
1243 }
1244
1245 #[test]
1246 fn consecutive_recomputes_tracked() {
1247 let base = Instant::now();
1248 let mut cfg = test_config();
1249 cfg.min_observations_between = 1;
1250 cfg.alpha = 0.5; let mut t = EProcessThrottle::new_at(cfg, base);
1252
1253 let mut recompute_count = 0;
1254 for i in 1..=200 {
1255 let d = t.observe_at(true, base + Duration::from_millis(i));
1256 if d.should_recompute {
1257 recompute_count += 1;
1258 }
1259 }
1260
1261 let stats = t.stats();
1262 assert_eq!(stats.total_recomputes, recompute_count as u64);
1263 assert!(
1264 stats.total_recomputes >= 2,
1265 "Should have multiple recomputes"
1266 );
1267 }
1268
1269 #[derive(Debug, Clone)]
1274 #[allow(dead_code)]
1275 struct CapturedSpan {
1276 name: String,
1277 target: String,
1278 level: tracing::Level,
1279 fields: HashMap<String, String>,
1280 }
1281
1282 #[derive(Debug, Clone)]
1283 #[allow(dead_code)]
1284 struct CapturedEvent {
1285 level: tracing::Level,
1286 target: String,
1287 message: String,
1288 fields: HashMap<String, String>,
1289 }
1290
1291 struct SpanCapture {
1292 spans: Arc<Mutex<Vec<CapturedSpan>>>,
1293 events: Arc<Mutex<Vec<CapturedEvent>>>,
1294 }
1295
1296 impl SpanCapture {
1297 fn new() -> (Self, CaptureHandle) {
1298 let spans = Arc::new(Mutex::new(Vec::new()));
1299 let events = Arc::new(Mutex::new(Vec::new()));
1300
1301 let handle = CaptureHandle {
1302 spans: spans.clone(),
1303 events: events.clone(),
1304 };
1305
1306 (Self { spans, events }, handle)
1307 }
1308 }
1309
1310 struct CaptureHandle {
1311 spans: Arc<Mutex<Vec<CapturedSpan>>>,
1312 events: Arc<Mutex<Vec<CapturedEvent>>>,
1313 }
1314
1315 impl CaptureHandle {
1316 fn spans(&self) -> Vec<CapturedSpan> {
1317 self.spans.lock().unwrap().clone()
1318 }
1319
1320 fn events(&self) -> Vec<CapturedEvent> {
1321 self.events.lock().unwrap().clone()
1322 }
1323 }
1324
1325 struct FieldVisitor(Vec<(String, String)>);
1326
1327 impl tracing::field::Visit for FieldVisitor {
1328 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1329 self.0
1330 .push((field.name().to_string(), format!("{value:?}")));
1331 }
1332
1333 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1334 self.0.push((field.name().to_string(), value.to_string()));
1335 }
1336
1337 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1338 self.0.push((field.name().to_string(), value.to_string()));
1339 }
1340
1341 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1342 self.0.push((field.name().to_string(), value.to_string()));
1343 }
1344
1345 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1346 self.0.push((field.name().to_string(), value.to_string()));
1347 }
1348
1349 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1350 self.0.push((field.name().to_string(), value.to_string()));
1351 }
1352 }
1353
1354 impl<S> tracing_subscriber::Layer<S> for SpanCapture
1355 where
1356 S: tracing::Subscriber + for<'a> LookupSpan<'a>,
1357 {
1358 fn on_new_span(
1359 &self,
1360 attrs: &tracing::span::Attributes<'_>,
1361 _id: &tracing::span::Id,
1362 _ctx: tracing_subscriber::layer::Context<'_, S>,
1363 ) {
1364 let mut visitor = FieldVisitor(Vec::new());
1365 attrs.record(&mut visitor);
1366
1367 let mut fields: HashMap<String, String> = visitor.0.into_iter().collect();
1368 for field in attrs.metadata().fields() {
1369 fields.entry(field.name().to_string()).or_default();
1370 }
1371
1372 self.spans.lock().unwrap().push(CapturedSpan {
1373 name: attrs.metadata().name().to_string(),
1374 target: attrs.metadata().target().to_string(),
1375 level: *attrs.metadata().level(),
1376 fields,
1377 });
1378 }
1379
1380 fn on_event(
1381 &self,
1382 event: &tracing::Event<'_>,
1383 _ctx: tracing_subscriber::layer::Context<'_, S>,
1384 ) {
1385 let mut visitor = FieldVisitor(Vec::new());
1386 event.record(&mut visitor);
1387
1388 let fields: HashMap<String, String> = visitor.0.clone().into_iter().collect();
1389 let message = visitor
1390 .0
1391 .iter()
1392 .find(|(k, _)| k == "message")
1393 .map(|(_, v)| v.clone())
1394 .unwrap_or_default();
1395
1396 self.events.lock().unwrap().push(CapturedEvent {
1397 level: *event.metadata().level(),
1398 target: event.metadata().target().to_string(),
1399 message,
1400 fields,
1401 });
1402 }
1403 }
1404
1405 fn with_captured_tracing<F>(f: F) -> CaptureHandle
1406 where
1407 F: FnOnce(),
1408 {
1409 let (layer, handle) = SpanCapture::new();
1410 let subscriber = tracing_subscriber::registry().with(layer);
1411 tracing::subscriber::with_default(subscriber, f);
1412 handle
1413 }
1414
1415 #[test]
1420 fn span_eprocess_update_has_required_fields() {
1421 let handle = with_captured_tracing(|| {
1422 let base = Instant::now();
1423 let mut t = EProcessThrottle::new_at(test_config(), base);
1424 t.observe_at(true, base + Duration::from_millis(1));
1425 });
1426
1427 let spans = handle.spans();
1428 let ep_spans: Vec<_> = spans
1429 .iter()
1430 .filter(|s| s.name == "eprocess.update")
1431 .collect();
1432 assert!(
1433 !ep_spans.is_empty(),
1434 "expected at least one eprocess.update span"
1435 );
1436
1437 let span = &ep_spans[0];
1438 assert!(span.fields.contains_key("test_id"), "missing test_id field");
1439 assert!(
1440 span.fields.contains_key("wealth_current"),
1441 "missing wealth_current"
1442 );
1443 assert!(
1444 span.fields.contains_key("wealth_threshold"),
1445 "missing wealth_threshold"
1446 );
1447 assert!(
1448 span.fields.contains_key("observation_count"),
1449 "missing observation_count"
1450 );
1451 assert!(
1452 span.fields.contains_key("rejected"),
1453 "missing rejected field"
1454 );
1455 }
1456
1457 #[test]
1458 fn span_rejected_field_true_on_eprocess_trigger() {
1459 let handle = with_captured_tracing(|| {
1460 let base = Instant::now();
1461 let mut cfg = test_config();
1462 cfg.min_observations_between = 1;
1463 let mut t = EProcessThrottle::new_at(cfg, base);
1464
1465 for i in 1..=100 {
1466 let d = t.observe_at(true, base + Duration::from_millis(i));
1467 if d.should_recompute && !d.forced_by_deadline {
1468 break;
1469 }
1470 }
1471 });
1472
1473 let spans = handle.spans();
1474 let ep_spans: Vec<_> = spans
1475 .iter()
1476 .filter(|s| s.name == "eprocess.update")
1477 .collect();
1478
1479 let rejected_spans: Vec<_> = ep_spans
1481 .iter()
1482 .filter(|s| s.fields.get("rejected").is_some_and(|v| v == "true"))
1483 .collect();
1484 assert!(
1485 !rejected_spans.is_empty(),
1486 "expected at least one span with rejected=true"
1487 );
1488 }
1489
1490 #[test]
1495 fn debug_log_wealth_update() {
1496 let handle = with_captured_tracing(|| {
1497 let base = Instant::now();
1498 let mut t = EProcessThrottle::new_at(test_config(), base);
1499 t.observe_at(true, base + Duration::from_millis(1));
1500 });
1501
1502 let events = handle.events();
1503 let debug_events: Vec<_> = events
1504 .iter()
1505 .filter(|e| {
1506 e.level == tracing::Level::DEBUG
1507 && e.target == "ftui.eprocess"
1508 && e.fields.contains_key("wealth_before")
1509 })
1510 .collect();
1511
1512 assert!(
1513 !debug_events.is_empty(),
1514 "expected at least one DEBUG wealth update event"
1515 );
1516
1517 let evt = &debug_events[0];
1518 assert!(
1519 evt.fields.contains_key("wealth_after"),
1520 "missing wealth_after"
1521 );
1522 assert!(evt.fields.contains_key("lambda"), "missing lambda");
1523 assert!(
1524 evt.fields.contains_key("eprocess_wealth"),
1525 "missing eprocess_wealth gauge"
1526 );
1527 }
1528
1529 #[test]
1534 fn info_log_on_eprocess_rejection() {
1535 let handle = with_captured_tracing(|| {
1536 let base = Instant::now();
1537 let mut cfg = test_config();
1538 cfg.min_observations_between = 1;
1539 let mut t = EProcessThrottle::new_at(cfg, base);
1540
1541 for i in 1..=100 {
1542 let d = t.observe_at(true, base + Duration::from_millis(i));
1543 if d.should_recompute && !d.forced_by_deadline {
1544 break;
1545 }
1546 }
1547 });
1548
1549 let events = handle.events();
1550 let info_events: Vec<_> = events
1551 .iter()
1552 .filter(|e| {
1553 e.level == tracing::Level::INFO
1554 && e.target == "ftui.eprocess"
1555 && e.fields.contains_key("wealth")
1556 && e.fields.contains_key("threshold")
1557 })
1558 .collect();
1559
1560 assert!(
1561 !info_events.is_empty(),
1562 "expected INFO log on e-process rejection"
1563 );
1564 }
1565
1566 #[test]
1567 fn info_log_on_deadline_forced_recompute() {
1568 let handle = with_captured_tracing(|| {
1569 let base = Instant::now();
1570 let mut cfg = test_config();
1571 cfg.hard_deadline_ms = 100;
1572 cfg.min_observations_between = 1;
1573 let mut t = EProcessThrottle::new_at(cfg, base);
1574
1575 t.observe_at(false, base + Duration::from_millis(150));
1577 });
1578
1579 let events = handle.events();
1580 let deadline_events: Vec<_> = events
1581 .iter()
1582 .filter(|e| {
1583 e.level == tracing::Level::INFO
1584 && e.target == "ftui.eprocess"
1585 && e.fields.contains_key("deadline_ms")
1586 })
1587 .collect();
1588
1589 assert!(
1590 !deadline_events.is_empty(),
1591 "expected INFO log on deadline forced recompute"
1592 );
1593 }
1594
1595 #[test]
1600 fn counter_accessor_is_callable() {
1601 let total = eprocess_rejections_total();
1602 let _ = total.checked_add(0).expect("counter overflow");
1603 }
1604
1605 #[test]
1606 fn counter_increments_on_rejection() {
1607 let before = eprocess_rejections_total();
1608
1609 let base = Instant::now();
1610 let mut cfg = test_config();
1611 cfg.min_observations_between = 1;
1612 let mut t = EProcessThrottle::new_at(cfg, base);
1613
1614 for i in 1..=100 {
1615 let d = t.observe_at(true, base + Duration::from_millis(i));
1616 if d.should_recompute && !d.forced_by_deadline {
1617 break;
1618 }
1619 }
1620
1621 let after = eprocess_rejections_total();
1622 assert!(
1623 after > before,
1624 "counter should increment on rejection: before={before}, after={after}"
1625 );
1626 }
1627
1628 #[test]
1629 fn debug_events_per_observation() {
1630 let handle = with_captured_tracing(|| {
1631 let base = Instant::now();
1632 let mut cfg = test_config();
1633 cfg.hard_deadline_ms = u64::MAX;
1634 cfg.min_observations_between = u64::MAX;
1635 let mut t = EProcessThrottle::new_at(cfg, base);
1636
1637 for i in 1..=5 {
1638 t.observe_at(i % 2 == 0, base + Duration::from_millis(i));
1639 }
1640 });
1641
1642 let events = handle.events();
1643 let debug_events: Vec<_> = events
1644 .iter()
1645 .filter(|e| {
1646 e.level == tracing::Level::DEBUG
1647 && e.target == "ftui.eprocess"
1648 && e.fields.contains_key("wealth_before")
1649 })
1650 .collect();
1651
1652 assert_eq!(
1653 debug_events.len(),
1654 5,
1655 "expected one DEBUG wealth event per observation"
1656 );
1657 }
1658}