1#![forbid(unsafe_code)]
2
3use std::collections::VecDeque;
61use web_time::{Duration, Instant};
62
/// Floor applied to the wealth process after every update, so a long run of
/// non-matches can shrink wealth but never pin it at exactly zero (from which
/// a multiplicative process could not recover).
const W_MIN: f64 = 1e-12;

/// Smallest accepted null-hypothesis match rate; keeps `1 / mu_0` finite.
const MU_0_MIN: f64 = 1e-6;

/// Largest accepted null-hypothesis match rate (strictly below 1).
const MU_0_MAX: f64 = 1.0 - 1e-6;
71
/// Tuning parameters for [`EProcessThrottle`].
#[derive(Debug, Clone)]
pub struct ThrottleConfig {
    /// Significance level. The wealth threshold that triggers a recompute is
    /// `1 / alpha` (with `alpha` floored at `1e-12`).
    pub alpha: f64,

    /// Match rate assumed under the null hypothesis. Clamped to
    /// `[MU_0_MIN, MU_0_MAX]` when the throttle is constructed.
    pub mu_0: f64,

    /// Starting bet size. Clamped to `[1e-6, 1 / mu_0 - 1e-6]` when the
    /// throttle is constructed.
    pub initial_lambda: f64,

    /// Step size of the per-observation gradient update applied to the bet
    /// size; `0.0` keeps the bet fixed.
    pub grapa_eta: f64,

    /// Maximum time, in milliseconds, since the last recompute before one is
    /// requested regardless of wealth.
    pub hard_deadline_ms: u64,

    /// Minimum number of observations since the last recompute before the
    /// wealth threshold is allowed to trigger another one.
    pub min_observations_between: u64,

    /// Number of most recent observations kept for the empirical match rate.
    pub rate_window_size: usize,

    /// When `true`, every observation appends a [`ThrottleLog`] entry.
    pub enable_logging: bool,
}
107
108impl Default for ThrottleConfig {
109 fn default() -> Self {
110 Self {
111 alpha: 0.05,
112 mu_0: 0.1,
113 initial_lambda: 0.5,
114 grapa_eta: 0.1,
115 hard_deadline_ms: 500,
116 min_observations_between: 8,
117 rate_window_size: 64,
118 enable_logging: false,
119 }
120 }
121}
122
/// Outcome of feeding one observation to the throttle.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ThrottleDecision {
    /// Whether the caller should recompute now.
    pub should_recompute: bool,
    /// Throttle wealth at the time the decision is returned. When a recompute
    /// fires this is the post-reset value (`1.0`), not the value that crossed
    /// the threshold.
    pub wealth: f64,
    /// Bet size after this observation's update.
    pub lambda: f64,
    /// Fraction of matches in the recent-observation window.
    pub empirical_rate: f64,
    /// `true` when the recompute is attributed to the hard deadline rather
    /// than to the wealth threshold; always `false` when `should_recompute`
    /// is `false`.
    pub forced_by_deadline: bool,
    /// Observations since the last recompute at the time the decision is
    /// returned (`0` when this observation triggered one).
    pub observations_since_recompute: u64,
}
139
/// One per-observation record, collected when logging is enabled.
#[derive(Debug, Clone)]
pub struct ThrottleLog {
    /// Instant passed to the observation that produced this entry.
    pub timestamp: Instant,
    /// 1-based index of the observation over the throttle's lifetime.
    pub observation_idx: u64,
    /// Whether the observation was a match.
    pub matched: bool,
    /// Wealth before this observation was applied.
    pub wealth_before: f64,
    /// Wealth after this observation was applied (before any reset).
    pub wealth_after: f64,
    /// Bet size after this observation's update.
    pub lambda: f64,
    /// Fraction of matches in the recent-observation window.
    pub empirical_rate: f64,
    /// `"observe"`, `"recompute_eprocess"` or `"recompute_forced"`.
    pub action: &'static str,
    /// Milliseconds elapsed since the last recompute.
    pub time_since_recompute_ms: f64,
}
162
163impl ThrottleDecision {
164 #[must_use]
166 pub fn to_jsonl(&self) -> String {
167 format!(
168 r#"{{"schema":"eprocess-throttle-v1","should_recompute":{},"wealth":{:.6},"lambda":{:.6},"empirical_rate":{:.6},"forced_by_deadline":{},"obs_since_recompute":{}}}"#,
169 self.should_recompute,
170 self.wealth,
171 self.lambda,
172 self.empirical_rate,
173 self.forced_by_deadline,
174 self.observations_since_recompute,
175 )
176 }
177}
178
179impl ThrottleLog {
180 #[must_use]
182 pub fn to_jsonl(&self) -> String {
183 format!(
184 r#"{{"schema":"eprocess-log-v1","obs_idx":{},"matched":{},"wealth_before":{:.6},"wealth_after":{:.6},"lambda":{:.6},"empirical_rate":{:.6},"action":"{}","time_since_recompute_ms":{:.3}}}"#,
185 self.observation_idx,
186 self.matched,
187 self.wealth_before,
188 self.wealth_after,
189 self.lambda,
190 self.empirical_rate,
191 self.action,
192 self.time_since_recompute_ms,
193 )
194 }
195}
196
/// Snapshot of the throttle's lifetime counters and current state.
#[derive(Debug, Clone)]
pub struct ThrottleStats {
    /// Observations seen since construction.
    pub total_observations: u64,
    /// Recomputes triggered since construction.
    pub total_recomputes: u64,
    /// Recomputes attributed to the hard deadline.
    pub forced_recomputes: u64,
    /// Recomputes attributed to the wealth threshold.
    pub eprocess_recomputes: u64,
    /// Wealth at the time of the snapshot.
    pub current_wealth: f64,
    /// Bet size at the time of the snapshot.
    pub current_lambda: f64,
    /// Fraction of matches in the recent-observation window.
    pub empirical_rate: f64,
    /// Mean number of observations preceding each recompute, or `0.0` when no
    /// recompute has happened yet.
    pub avg_observations_between_recomputes: f64,
}
217
218#[derive(Debug)]
222pub struct EProcessThrottle {
223 config: ThrottleConfig,
224
225 wealth: f64,
227
228 lambda: f64,
230
231 mu_0: f64,
233
234 lambda_max: f64,
236
237 threshold: f64,
239
240 recent_matches: VecDeque<bool>,
242
243 observation_count: u64,
245
246 observations_since_recompute: u64,
248
249 last_recompute: Instant,
251
252 total_recomputes: u64,
254
255 forced_recomputes: u64,
257
258 eprocess_recomputes: u64,
260
261 cumulative_obs_at_recompute: u64,
263
264 logs: Vec<ThrottleLog>,
266}
267
268impl EProcessThrottle {
269 pub fn new(config: ThrottleConfig) -> Self {
271 Self::new_at(config, Instant::now())
272 }
273
274 pub fn new_at(config: ThrottleConfig, now: Instant) -> Self {
276 let mu_0 = config.mu_0.clamp(MU_0_MIN, MU_0_MAX);
277 let lambda_max = 1.0 / mu_0 - 1e-6;
278 let lambda = config.initial_lambda.clamp(1e-6, lambda_max);
279 let threshold = 1.0 / config.alpha.max(1e-12);
280
281 Self {
282 config,
283 wealth: 1.0,
284 lambda,
285 mu_0,
286 lambda_max,
287 threshold,
288 recent_matches: VecDeque::new(),
289 observation_count: 0,
290 observations_since_recompute: 0,
291 last_recompute: now,
292 total_recomputes: 0,
293 forced_recomputes: 0,
294 eprocess_recomputes: 0,
295 cumulative_obs_at_recompute: 0,
296 logs: Vec::new(),
297 }
298 }
299
300 pub fn observe(&mut self, matched: bool) -> ThrottleDecision {
305 self.observe_at(matched, Instant::now())
306 }
307
308 pub fn observe_at(&mut self, matched: bool, now: Instant) -> ThrottleDecision {
310 self.observation_count += 1;
311 self.observations_since_recompute += 1;
312
313 self.recent_matches.push_back(matched);
315 while self.recent_matches.len() > self.config.rate_window_size {
316 self.recent_matches.pop_front();
317 }
318
319 let empirical_rate = self.empirical_match_rate();
320
321 let x_t = if matched { 1.0 } else { 0.0 };
323 let wealth_before = self.wealth;
324 let multiplier = 1.0 + self.lambda * (x_t - self.mu_0);
325 self.wealth = (self.wealth * multiplier).max(W_MIN);
326
327 let denominator = 1.0 + self.lambda * (x_t - self.mu_0);
330 if denominator.abs() > 1e-12 {
331 let grad = (x_t - self.mu_0) / denominator;
332 self.lambda = (self.lambda + self.config.grapa_eta * grad).clamp(1e-6, self.lambda_max);
333 }
334
335 let time_since_recompute = now.duration_since(self.last_recompute);
337 let hard_deadline_exceeded =
338 time_since_recompute >= Duration::from_millis(self.config.hard_deadline_ms);
339 let min_obs_met = self.observations_since_recompute >= self.config.min_observations_between;
340 let wealth_exceeded = self.wealth >= self.threshold;
341
342 let should_recompute = hard_deadline_exceeded || (wealth_exceeded && min_obs_met);
343 let forced_by_deadline = hard_deadline_exceeded && !wealth_exceeded;
344
345 let action = if should_recompute {
346 if forced_by_deadline {
347 "recompute_forced"
348 } else {
349 "recompute_eprocess"
350 }
351 } else {
352 "observe"
353 };
354
355 self.log_decision(
356 now,
357 matched,
358 wealth_before,
359 self.wealth,
360 action,
361 time_since_recompute,
362 );
363
364 if should_recompute {
365 self.trigger_recompute(now, forced_by_deadline);
366 }
367
368 ThrottleDecision {
369 should_recompute,
370 wealth: self.wealth,
371 lambda: self.lambda,
372 empirical_rate,
373 forced_by_deadline: should_recompute && forced_by_deadline,
374 observations_since_recompute: self.observations_since_recompute,
375 }
376 }
377
378 pub fn reset(&mut self) {
381 self.reset_at(Instant::now());
382 }
383
384 pub fn reset_at(&mut self, now: Instant) {
386 self.wealth = 1.0;
387 self.observations_since_recompute = 0;
388 self.last_recompute = now;
389 self.recent_matches.clear();
390 }
393
394 pub fn set_mu_0(&mut self, mu_0: f64) {
399 self.mu_0 = mu_0.clamp(MU_0_MIN, MU_0_MAX);
400 self.lambda_max = 1.0 / self.mu_0 - 1e-6;
401 self.lambda = self.lambda.clamp(1e-6, self.lambda_max);
402 self.reset();
403 }
404
405 #[inline]
407 pub fn wealth(&self) -> f64 {
408 self.wealth
409 }
410
411 #[inline]
413 pub fn lambda(&self) -> f64 {
414 self.lambda
415 }
416
417 pub fn empirical_match_rate(&self) -> f64 {
419 if self.recent_matches.is_empty() {
420 return 0.0;
421 }
422 let matches = self.recent_matches.iter().filter(|&&m| m).count();
423 matches as f64 / self.recent_matches.len() as f64
424 }
425
426 #[inline]
428 pub fn threshold(&self) -> f64 {
429 self.threshold
430 }
431
432 #[inline]
434 pub fn observation_count(&self) -> u64 {
435 self.observation_count
436 }
437
438 pub fn stats(&self) -> ThrottleStats {
440 let avg_obs = if self.total_recomputes > 0 {
441 self.cumulative_obs_at_recompute as f64 / self.total_recomputes as f64
442 } else {
443 0.0
444 };
445
446 ThrottleStats {
447 total_observations: self.observation_count,
448 total_recomputes: self.total_recomputes,
449 forced_recomputes: self.forced_recomputes,
450 eprocess_recomputes: self.eprocess_recomputes,
451 current_wealth: self.wealth,
452 current_lambda: self.lambda,
453 empirical_rate: self.empirical_match_rate(),
454 avg_observations_between_recomputes: avg_obs,
455 }
456 }
457
458 pub fn logs(&self) -> &[ThrottleLog] {
460 &self.logs
461 }
462
463 pub fn clear_logs(&mut self) {
465 self.logs.clear();
466 }
467
468 fn trigger_recompute(&mut self, now: Instant, forced: bool) {
471 self.total_recomputes += 1;
472 self.cumulative_obs_at_recompute += self.observations_since_recompute;
473 if forced {
474 self.forced_recomputes += 1;
475 } else {
476 self.eprocess_recomputes += 1;
477 }
478 self.wealth = 1.0;
479 self.observations_since_recompute = 0;
480 self.last_recompute = now;
481 }
482
483 fn log_decision(
484 &mut self,
485 now: Instant,
486 matched: bool,
487 wealth_before: f64,
488 wealth_after: f64,
489 action: &'static str,
490 time_since_recompute: Duration,
491 ) {
492 if !self.config.enable_logging {
493 return;
494 }
495
496 self.logs.push(ThrottleLog {
497 timestamp: now,
498 observation_idx: self.observation_count,
499 matched,
500 wealth_before,
501 wealth_after,
502 lambda: self.lambda,
503 empirical_rate: self.empirical_match_rate(),
504 action,
505 time_since_recompute_ms: time_since_recompute.as_secs_f64() * 1000.0,
506 });
507 }
508}
509
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline configuration shared by the tests: logging enabled, a 500 ms
    /// deadline and at least four observations between wealth triggers.
    fn test_config() -> ThrottleConfig {
        ThrottleConfig {
            alpha: 0.05,
            mu_0: 0.1,
            initial_lambda: 0.5,
            grapa_eta: 0.1,
            hard_deadline_ms: 500,
            min_observations_between: 4,
            rate_window_size: 32,
            enable_logging: true,
        }
    }

    /// `test_config()` with both recompute triggers made unreachable, so the
    /// wealth and bet-size dynamics can be observed without resets.
    fn never_recompute_config() -> ThrottleConfig {
        ThrottleConfig {
            hard_deadline_ms: u64::MAX,
            min_observations_between: u64::MAX,
            ..test_config()
        }
    }

    /// `base` shifted forward by `ms` milliseconds.
    fn at(base: Instant, ms: u64) -> Instant {
        base + Duration::from_millis(ms)
    }

    /// Deterministic LCG step returning a value in `[0, 1)`.
    fn lcg_next(state: &mut u64) -> f64 {
        *state = state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        (*state >> 33) as f64 / (1u64 << 31) as f64
    }

    #[test]
    fn initial_state() {
        let t = EProcessThrottle::new(test_config());
        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
        assert_eq!(t.observation_count(), 0);
        assert!(t.lambda() > 0.0);
        // alpha = 0.05 gives a threshold of 1 / 0.05 = 20.
        assert!((t.threshold() - 20.0).abs() < 0.01);
    }

    #[test]
    fn mu_0_clamped_to_valid_range() {
        for raw in [0.0, 1.0, -5.0] {
            let cfg = ThrottleConfig {
                mu_0: raw,
                ..test_config()
            };
            let t = EProcessThrottle::new(cfg);
            assert!(t.mu_0 >= MU_0_MIN);
            assert!(t.mu_0 <= MU_0_MAX);
        }
    }

    #[test]
    fn no_match_decreases_wealth() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(test_config(), base);
        let d = t.observe_at(false, at(base, 1));
        assert!(
            d.wealth < 1.0,
            "No-match should decrease wealth: {}",
            d.wealth
        );
    }

    #[test]
    fn match_increases_wealth() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(test_config(), base);
        let d = t.observe_at(true, at(base, 1));
        assert!(d.wealth > 1.0, "Match should increase wealth: {}", d.wealth);
    }

    #[test]
    fn wealth_stays_positive() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(test_config(), base);
        for i in 1..=1000u64 {
            let d = t.observe_at(false, at(base, i));
            assert!(d.wealth > 0.0, "Wealth must stay positive at obs {}", i);
        }
    }

    #[test]
    fn wealth_floor_prevents_zero_lock() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            hard_deadline_ms: u64::MAX,
            initial_lambda: 0.99,
            ..test_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        for i in 1..=500u64 {
            t.observe_at(false, at(base, i));
        }
        assert!(t.wealth() >= W_MIN, "Wealth should be at floor, not zero");

        let before = t.wealth();
        t.observe_at(true, at(base, 501));
        assert!(
            t.wealth() > before,
            "Match should grow wealth even from floor"
        );
    }

    #[test]
    fn burst_of_matches_triggers_recompute() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            min_observations_between: 1,
            ..test_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        let triggered = (1..=100u64).any(|i| {
            let d = t.observe_at(true, at(base, i));
            d.should_recompute && !d.forced_by_deadline
        });
        assert!(
            triggered,
            "Burst of matches should trigger e-process recompute"
        );
    }

    #[test]
    fn no_matches_does_not_trigger_eprocess() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            hard_deadline_ms: u64::MAX,
            ..test_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        for i in 1..=200u64 {
            let d = t.observe_at(false, at(base, i));
            assert!(
                !d.should_recompute,
                "No-match stream should never trigger e-process recompute at obs {}",
                i
            );
        }
    }

    #[test]
    fn hard_deadline_forces_recompute() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            hard_deadline_ms: 100,
            min_observations_between: 1,
            ..test_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        // 150 ms since construction is past the 100 ms deadline.
        let d = t.observe_at(false, at(base, 150));
        assert!(d.should_recompute, "Should trigger on deadline");
        assert!(d.forced_by_deadline, "Should be forced by deadline");
    }

    #[test]
    fn min_observations_between_prevents_rapid_fire() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            min_observations_between: 10,
            hard_deadline_ms: u64::MAX,
            // Threshold of 2 is crossed well before ten observations.
            alpha: 0.5,
            ..test_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        let first_trigger = (1..=100u64).find(|&i| t.observe_at(true, at(base, i)).should_recompute);

        assert!(
            first_trigger.unwrap_or(0) >= 10,
            "First trigger should be at obs >= 10, was {:?}",
            first_trigger
        );
    }

    #[test]
    fn reset_clears_wealth_and_counter() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(test_config(), base);

        for i in 1..=10u64 {
            t.observe_at(true, at(base, i));
        }
        assert!(t.wealth() > 1.0);
        assert!(t.observations_since_recompute > 0);

        t.reset_at(at(base, 20));
        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
        assert_eq!(t.observations_since_recompute, 0);
    }

    #[test]
    fn lambda_adapts_to_high_match_rate() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(never_recompute_config(), base);

        let initial_lambda = t.lambda();

        for i in 1..=50u64 {
            t.observe_at(true, at(base, i));
        }

        assert!(
            t.lambda() > initial_lambda,
            "Lambda should increase with frequent matches: {} vs {}",
            t.lambda(),
            initial_lambda
        );
    }

    #[test]
    fn lambda_adapts_to_low_match_rate() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            initial_lambda: 0.8,
            ..never_recompute_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        let initial_lambda = t.lambda();

        for i in 1..=50u64 {
            t.observe_at(false, at(base, i));
        }

        assert!(
            t.lambda() < initial_lambda,
            "Lambda should decrease with few matches: {} vs {}",
            t.lambda(),
            initial_lambda
        );
    }

    #[test]
    fn lambda_stays_bounded() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            // Aggressive step size to stress the clamp.
            grapa_eta: 1.0,
            ..never_recompute_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        for i in 1..=200u64 {
            t.observe_at(i % 2 == 0, at(base, i));
        }

        assert!(t.lambda() > 0.0, "Lambda must be positive");
        assert!(
            t.lambda() <= t.lambda_max,
            "Lambda must not exceed 1/mu_0 - 1e-6: {} vs {}",
            t.lambda(),
            t.lambda_max
        );
    }

    #[test]
    fn empirical_rate_tracks_window() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            rate_window_size: 10,
            ..never_recompute_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        // Fill the window with matches.
        for i in 1..=10u64 {
            t.observe_at(true, at(base, i));
        }
        assert!((t.empirical_match_rate() - 1.0).abs() < f64::EPSILON);

        // Ten non-matches push every match out of the window.
        for i in 11..=20u64 {
            t.observe_at(false, at(base, i));
        }
        assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn empirical_rate_zero_when_empty() {
        let t = EProcessThrottle::new(test_config());
        assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn stats_reflect_state() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            min_observations_between: 1,
            ..test_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        let mut recomputed = false;
        for i in 1..=50u64 {
            if t.observe_at(true, at(base, i)).should_recompute {
                recomputed = true;
            }
        }

        let stats = t.stats();
        assert_eq!(stats.total_observations, 50);
        if recomputed {
            assert!(stats.total_recomputes > 0);
            assert!(stats.avg_observations_between_recomputes > 0.0);
        }
    }

    #[test]
    fn logging_captures_decisions() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            enable_logging: true,
            ..test_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        t.observe_at(true, at(base, 1));
        t.observe_at(false, at(base, 2));

        assert_eq!(t.logs().len(), 2);
        assert!(t.logs()[0].matched);
        assert!(!t.logs()[1].matched);

        t.clear_logs();
        assert!(t.logs().is_empty());
    }

    #[test]
    fn logging_disabled_by_default() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            enable_logging: false,
            ..test_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        t.observe_at(true, at(base, 1));
        assert!(t.logs().is_empty());
    }

    #[test]
    fn set_mu_0_resets_eprocess() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(test_config(), base);

        for i in 1..=10u64 {
            t.observe_at(true, at(base, i));
        }
        assert!(t.wealth() > 1.0);

        t.set_mu_0(0.5);
        assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn deterministic_behavior() {
        let base = Instant::now();
        let cfg = test_config();

        let run = |cfg: &ThrottleConfig| {
            let mut t = EProcessThrottle::new_at(cfg.clone(), base);
            let decisions: Vec<(bool, bool)> = (1..=30u64)
                .map(|i| {
                    let d = t.observe_at(i % 3 == 0, at(base, i));
                    (d.should_recompute, d.forced_by_deadline)
                })
                .collect();
            (decisions, t.wealth(), t.lambda())
        };

        let (d1, w1, l1) = run(&cfg);
        let (d2, w2, l2) = run(&cfg);

        assert_eq!(d1, d2, "Decisions must be deterministic");
        assert!((w1 - w2).abs() < 1e-10, "Wealth must be deterministic");
        assert!((l1 - l2).abs() < 1e-10, "Lambda must be deterministic");
    }

    #[test]
    fn property_supermartingale_under_null() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            mu_0: 0.2,
            // Fixed bet size: no adaptation while checking the average.
            grapa_eta: 0.0,
            ..never_recompute_config()
        };
        let n_trials: u64 = 200;
        let n_obs: u64 = 100;
        let mut total_wealth = 0.0;
        let mut rng_state: u64 = 42;

        for trial in 0..n_trials {
            let mut t = EProcessThrottle::new_at(cfg.clone(), base);
            for i in 1..=n_obs {
                // Matches drawn at exactly the null rate.
                let matched = lcg_next(&mut rng_state) < cfg.mu_0;
                t.observe_at(matched, at(base, i + trial * 1000));
            }
            total_wealth += t.wealth();
        }

        let avg_wealth = total_wealth / n_trials as f64;
        assert!(
            avg_wealth < 2.0,
            "Average wealth under H₀ should be near 1.0, got {}",
            avg_wealth
        );
    }

    #[test]
    fn property_type_i_control() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            hard_deadline_ms: u64::MAX,
            min_observations_between: 1,
            alpha: 0.05,
            mu_0: 0.1,
            // Fixed bet size: no adaptation while counting false triggers.
            grapa_eta: 0.0,
            ..test_config()
        };
        let n_trials: u64 = 500;
        let n_obs: u64 = 200;
        let mut false_triggers = 0u64;
        let mut rng_state: u64 = 123;

        for trial in 0..n_trials {
            let mut t = EProcessThrottle::new_at(cfg.clone(), base);
            let mut triggered = false;
            for i in 1..=n_obs {
                let matched = lcg_next(&mut rng_state) < cfg.mu_0;
                let d = t.observe_at(matched, at(base, i + trial * 1000));
                if d.should_recompute {
                    triggered = true;
                    break;
                }
            }
            if triggered {
                false_triggers += 1;
            }
        }

        let false_trigger_rate = false_triggers as f64 / n_trials as f64;
        assert!(
            false_trigger_rate < cfg.alpha * 3.0,
            "False trigger rate {} exceeds 3×α = {}",
            false_trigger_rate,
            cfg.alpha * 3.0
        );
    }

    #[test]
    fn single_observation() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(test_config(), base);
        let d = t.observe_at(true, at(base, 1));
        assert_eq!(t.observation_count(), 1);
        assert!(!d.should_recompute || d.forced_by_deadline);
    }

    #[test]
    fn alternating_match_pattern() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(never_recompute_config(), base);

        for i in 1..=100u64 {
            t.observe_at(i % 2 == 0, at(base, i));
        }

        assert!(
            t.wealth() > 1.0,
            "50% match rate vs 10% null should grow wealth: {}",
            t.wealth()
        );
    }

    #[test]
    fn recompute_resets_wealth() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            min_observations_between: 1,
            ..test_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        let mut triggered = false;
        for i in 1..=100u64 {
            let d = t.observe_at(true, at(base, i));
            if d.should_recompute && !d.forced_by_deadline {
                assert!(
                    (t.wealth() - 1.0).abs() < f64::EPSILON,
                    "Wealth should reset to 1.0 after recompute, got {}",
                    t.wealth()
                );
                triggered = true;
                break;
            }
        }
        assert!(
            triggered,
            "Should have triggered at least one e-process recompute"
        );
    }

    #[test]
    fn config_default_values() {
        let cfg = ThrottleConfig::default();
        assert!((cfg.alpha - 0.05).abs() < f64::EPSILON);
        assert!((cfg.mu_0 - 0.1).abs() < f64::EPSILON);
        assert!((cfg.initial_lambda - 0.5).abs() < f64::EPSILON);
        assert!((cfg.grapa_eta - 0.1).abs() < f64::EPSILON);
        assert_eq!(cfg.hard_deadline_ms, 500);
        assert_eq!(cfg.min_observations_between, 8);
        assert_eq!(cfg.rate_window_size, 64);
        assert!(!cfg.enable_logging);
    }

    #[test]
    fn throttle_decision_fields() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            hard_deadline_ms: u64::MAX,
            ..test_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);
        let d = t.observe_at(true, at(base, 1));

        assert!(!d.should_recompute);
        assert!(!d.forced_by_deadline);
        assert!(d.wealth > 1.0);
        assert!(d.lambda > 0.0);
        assert!((d.empirical_rate - 1.0).abs() < f64::EPSILON);
        assert_eq!(d.observations_since_recompute, 1);
    }

    #[test]
    fn stats_no_recomputes_avg_is_zero() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(never_recompute_config(), base);

        t.observe_at(false, at(base, 1));
        let stats = t.stats();
        assert_eq!(stats.total_recomputes, 0);
        assert!((stats.avg_observations_between_recomputes - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn set_mu_0_clamps_extreme_values() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(test_config(), base);

        t.set_mu_0(0.0);
        assert!(t.mu_0 >= MU_0_MIN);

        t.set_mu_0(2.0);
        assert!(t.mu_0 <= MU_0_MAX);
    }

    #[test]
    fn reset_preserves_lambda() {
        let base = Instant::now();
        let mut t = EProcessThrottle::new_at(never_recompute_config(), base);

        for i in 1..=20u64 {
            t.observe_at(true, at(base, i));
        }
        let lambda_before = t.lambda();
        t.reset_at(at(base, 30));
        assert!(
            (t.lambda() - lambda_before).abs() < f64::EPSILON,
            "Lambda should be preserved across reset"
        );
    }

    #[test]
    fn logging_records_match_status_and_action() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            enable_logging: true,
            ..never_recompute_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        t.observe_at(true, at(base, 1));
        let log = &t.logs()[0];
        assert!(log.matched);
        assert_eq!(log.observation_idx, 1);
        assert_eq!(log.action, "observe");
        assert!(log.wealth_after > log.wealth_before);
    }

    #[test]
    fn consecutive_recomputes_tracked() {
        let base = Instant::now();
        let cfg = ThrottleConfig {
            min_observations_between: 1,
            // Low threshold so the wealth trigger fires repeatedly.
            alpha: 0.5,
            ..test_config()
        };
        let mut t = EProcessThrottle::new_at(cfg, base);

        let mut recompute_count = 0u64;
        for i in 1..=200u64 {
            if t.observe_at(true, at(base, i)).should_recompute {
                recompute_count += 1;
            }
        }

        let stats = t.stats();
        assert_eq!(stats.total_recomputes, recompute_count);
        assert!(
            stats.total_recomputes >= 2,
            "Should have multiple recomputes"
        );
    }
}