1#![forbid(unsafe_code)]
2
3use std::collections::VecDeque;
61use std::time::{Duration, Instant};
62
63const W_MIN: f64 = 1e-12;
65
66const MU_0_MIN: f64 = 1e-6;
68
69const MU_0_MAX: f64 = 1.0 - 1e-6;
71
72#[derive(Debug, Clone)]
74pub struct ThrottleConfig {
75 pub alpha: f64,
78
79 pub mu_0: f64,
83
84 pub initial_lambda: f64,
88
89 pub grapa_eta: f64,
92
93 pub hard_deadline_ms: u64,
96
97 pub min_observations_between: u64,
100
101 pub rate_window_size: usize,
103
104 pub enable_logging: bool,
106}
107
108impl Default for ThrottleConfig {
109 fn default() -> Self {
110 Self {
111 alpha: 0.05,
112 mu_0: 0.1,
113 initial_lambda: 0.5,
114 grapa_eta: 0.1,
115 hard_deadline_ms: 500,
116 min_observations_between: 8,
117 rate_window_size: 64,
118 enable_logging: false,
119 }
120 }
121}
122
123#[derive(Debug, Clone, Copy, PartialEq)]
125pub struct ThrottleDecision {
126 pub should_recompute: bool,
128 pub wealth: f64,
130 pub lambda: f64,
132 pub empirical_rate: f64,
134 pub forced_by_deadline: bool,
136 pub observations_since_recompute: u64,
138}
139
140#[derive(Debug, Clone)]
142pub struct ThrottleLog {
143 pub timestamp: Instant,
145 pub observation_idx: u64,
147 pub matched: bool,
149 pub wealth_before: f64,
151 pub wealth_after: f64,
153 pub lambda: f64,
155 pub empirical_rate: f64,
157 pub action: &'static str,
159 pub time_since_recompute_ms: f64,
161}
162
163#[derive(Debug, Clone)]
165pub struct ThrottleStats {
166 pub total_observations: u64,
168 pub total_recomputes: u64,
170 pub forced_recomputes: u64,
172 pub eprocess_recomputes: u64,
174 pub current_wealth: f64,
176 pub current_lambda: f64,
178 pub empirical_rate: f64,
180 pub avg_observations_between_recomputes: f64,
182}
183
184#[derive(Debug)]
188pub struct EProcessThrottle {
189 config: ThrottleConfig,
190
191 wealth: f64,
193
194 lambda: f64,
196
197 mu_0: f64,
199
200 lambda_max: f64,
202
203 threshold: f64,
205
206 recent_matches: VecDeque<bool>,
208
209 observation_count: u64,
211
212 observations_since_recompute: u64,
214
215 last_recompute: Instant,
217
218 total_recomputes: u64,
220
221 forced_recomputes: u64,
223
224 eprocess_recomputes: u64,
226
227 cumulative_obs_at_recompute: u64,
229
230 logs: Vec<ThrottleLog>,
232}
233
234impl EProcessThrottle {
235 pub fn new(config: ThrottleConfig) -> Self {
237 Self::new_at(config, Instant::now())
238 }
239
240 pub fn new_at(config: ThrottleConfig, now: Instant) -> Self {
242 let mu_0 = config.mu_0.clamp(MU_0_MIN, MU_0_MAX);
243 let lambda_max = 1.0 / mu_0 - 1e-6;
244 let lambda = config.initial_lambda.clamp(1e-6, lambda_max);
245 let threshold = 1.0 / config.alpha.max(1e-12);
246
247 Self {
248 config,
249 wealth: 1.0,
250 lambda,
251 mu_0,
252 lambda_max,
253 threshold,
254 recent_matches: VecDeque::new(),
255 observation_count: 0,
256 observations_since_recompute: 0,
257 last_recompute: now,
258 total_recomputes: 0,
259 forced_recomputes: 0,
260 eprocess_recomputes: 0,
261 cumulative_obs_at_recompute: 0,
262 logs: Vec::new(),
263 }
264 }
265
266 pub fn observe(&mut self, matched: bool) -> ThrottleDecision {
271 self.observe_at(matched, Instant::now())
272 }
273
274 pub fn observe_at(&mut self, matched: bool, now: Instant) -> ThrottleDecision {
276 self.observation_count += 1;
277 self.observations_since_recompute += 1;
278
279 self.recent_matches.push_back(matched);
281 while self.recent_matches.len() > self.config.rate_window_size {
282 self.recent_matches.pop_front();
283 }
284
285 let empirical_rate = self.empirical_match_rate();
286
287 let x_t = if matched { 1.0 } else { 0.0 };
289 let wealth_before = self.wealth;
290 let multiplier = 1.0 + self.lambda * (x_t - self.mu_0);
291 self.wealth = (self.wealth * multiplier).max(W_MIN);
292
293 let denominator = 1.0 + self.lambda * (x_t - self.mu_0);
296 if denominator.abs() > 1e-12 {
297 let grad = (x_t - self.mu_0) / denominator;
298 self.lambda = (self.lambda + self.config.grapa_eta * grad).clamp(1e-6, self.lambda_max);
299 }
300
301 let time_since_recompute = now.duration_since(self.last_recompute);
303 let hard_deadline_exceeded =
304 time_since_recompute >= Duration::from_millis(self.config.hard_deadline_ms);
305 let min_obs_met = self.observations_since_recompute >= self.config.min_observations_between;
306 let wealth_exceeded = self.wealth >= self.threshold;
307
308 let should_recompute = hard_deadline_exceeded || (wealth_exceeded && min_obs_met);
309 let forced_by_deadline = hard_deadline_exceeded && !wealth_exceeded;
310
311 let action = if should_recompute {
312 if forced_by_deadline {
313 "recompute_forced"
314 } else {
315 "recompute_eprocess"
316 }
317 } else {
318 "observe"
319 };
320
321 self.log_decision(
322 now,
323 matched,
324 wealth_before,
325 self.wealth,
326 action,
327 time_since_recompute,
328 );
329
330 if should_recompute {
331 self.trigger_recompute(now, forced_by_deadline);
332 }
333
334 ThrottleDecision {
335 should_recompute,
336 wealth: self.wealth,
337 lambda: self.lambda,
338 empirical_rate,
339 forced_by_deadline: should_recompute && forced_by_deadline,
340 observations_since_recompute: self.observations_since_recompute,
341 }
342 }
343
344 pub fn reset(&mut self) {
347 self.reset_at(Instant::now());
348 }
349
350 pub fn reset_at(&mut self, now: Instant) {
352 self.wealth = 1.0;
353 self.observations_since_recompute = 0;
354 self.last_recompute = now;
355 self.recent_matches.clear();
356 }
359
360 pub fn set_mu_0(&mut self, mu_0: f64) {
365 self.mu_0 = mu_0.clamp(MU_0_MIN, MU_0_MAX);
366 self.lambda_max = 1.0 / self.mu_0 - 1e-6;
367 self.lambda = self.lambda.clamp(1e-6, self.lambda_max);
368 self.reset();
369 }
370
371 #[inline]
373 pub fn wealth(&self) -> f64 {
374 self.wealth
375 }
376
377 #[inline]
379 pub fn lambda(&self) -> f64 {
380 self.lambda
381 }
382
383 pub fn empirical_match_rate(&self) -> f64 {
385 if self.recent_matches.is_empty() {
386 return 0.0;
387 }
388 let matches = self.recent_matches.iter().filter(|&&m| m).count();
389 matches as f64 / self.recent_matches.len() as f64
390 }
391
392 #[inline]
394 pub fn threshold(&self) -> f64 {
395 self.threshold
396 }
397
398 #[inline]
400 pub fn observation_count(&self) -> u64 {
401 self.observation_count
402 }
403
404 pub fn stats(&self) -> ThrottleStats {
406 let avg_obs = if self.total_recomputes > 0 {
407 self.cumulative_obs_at_recompute as f64 / self.total_recomputes as f64
408 } else {
409 0.0
410 };
411
412 ThrottleStats {
413 total_observations: self.observation_count,
414 total_recomputes: self.total_recomputes,
415 forced_recomputes: self.forced_recomputes,
416 eprocess_recomputes: self.eprocess_recomputes,
417 current_wealth: self.wealth,
418 current_lambda: self.lambda,
419 empirical_rate: self.empirical_match_rate(),
420 avg_observations_between_recomputes: avg_obs,
421 }
422 }
423
424 pub fn logs(&self) -> &[ThrottleLog] {
426 &self.logs
427 }
428
429 pub fn clear_logs(&mut self) {
431 self.logs.clear();
432 }
433
434 fn trigger_recompute(&mut self, now: Instant, forced: bool) {
437 self.total_recomputes += 1;
438 self.cumulative_obs_at_recompute += self.observations_since_recompute;
439 if forced {
440 self.forced_recomputes += 1;
441 } else {
442 self.eprocess_recomputes += 1;
443 }
444 self.wealth = 1.0;
445 self.observations_since_recompute = 0;
446 self.last_recompute = now;
447 }
448
449 fn log_decision(
450 &mut self,
451 now: Instant,
452 matched: bool,
453 wealth_before: f64,
454 wealth_after: f64,
455 action: &'static str,
456 time_since_recompute: Duration,
457 ) {
458 if !self.config.enable_logging {
459 return;
460 }
461
462 self.logs.push(ThrottleLog {
463 timestamp: now,
464 observation_idx: self.observation_count,
465 matched,
466 wealth_before,
467 wealth_after,
468 lambda: self.lambda,
469 empirical_rate: self.empirical_match_rate(),
470 action,
471 time_since_recompute_ms: time_since_recompute.as_secs_f64() * 1000.0,
472 });
473 }
474}
475
476#[cfg(test)]
477mod tests {
478 use super::*;
479
480 fn test_config() -> ThrottleConfig {
481 ThrottleConfig {
482 alpha: 0.05,
483 mu_0: 0.1,
484 initial_lambda: 0.5,
485 grapa_eta: 0.1,
486 hard_deadline_ms: 500,
487 min_observations_between: 4,
488 rate_window_size: 32,
489 enable_logging: true,
490 }
491 }
492
493 #[test]
498 fn initial_state() {
499 let t = EProcessThrottle::new(test_config());
500 assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
501 assert_eq!(t.observation_count(), 0);
502 assert!(t.lambda() > 0.0);
503 assert!((t.threshold() - 20.0).abs() < 0.01); }
505
506 #[test]
507 fn mu_0_clamped_to_valid_range() {
508 let mut cfg = test_config();
509 cfg.mu_0 = 0.0;
510 let t = EProcessThrottle::new(cfg.clone());
511 assert!(t.mu_0 >= MU_0_MIN);
512
513 cfg.mu_0 = 1.0;
514 let t = EProcessThrottle::new(cfg.clone());
515 assert!(t.mu_0 <= MU_0_MAX);
516
517 cfg.mu_0 = -5.0;
518 let t = EProcessThrottle::new(cfg);
519 assert!(t.mu_0 >= MU_0_MIN);
520 }
521
522 #[test]
527 fn no_match_decreases_wealth() {
528 let base = Instant::now();
529 let mut t = EProcessThrottle::new_at(test_config(), base);
530 let d = t.observe_at(false, base + Duration::from_millis(1));
531 assert!(
532 d.wealth < 1.0,
533 "No-match should decrease wealth: {}",
534 d.wealth
535 );
536 }
537
538 #[test]
539 fn match_increases_wealth() {
540 let base = Instant::now();
541 let mut t = EProcessThrottle::new_at(test_config(), base);
542 let d = t.observe_at(true, base + Duration::from_millis(1));
543 assert!(d.wealth > 1.0, "Match should increase wealth: {}", d.wealth);
544 }
545
546 #[test]
547 fn wealth_stays_positive() {
548 let base = Instant::now();
549 let mut t = EProcessThrottle::new_at(test_config(), base);
550 for i in 1..=1000 {
552 let d = t.observe_at(false, base + Duration::from_millis(i));
553 assert!(d.wealth > 0.0, "Wealth must stay positive at obs {}", i);
554 }
555 }
556
557 #[test]
558 fn wealth_floor_prevents_zero_lock() {
559 let base = Instant::now();
560 let mut cfg = test_config();
561 cfg.hard_deadline_ms = u64::MAX; cfg.initial_lambda = 0.99; let mut t = EProcessThrottle::new_at(cfg, base);
564
565 for i in 1..=500 {
566 t.observe_at(false, base + Duration::from_millis(i));
567 }
568 assert!(t.wealth() >= W_MIN, "Wealth should be at floor, not zero");
569
570 let before = t.wealth();
572 t.observe_at(true, base + Duration::from_millis(501));
573 assert!(
574 t.wealth() > before,
575 "Match should grow wealth even from floor"
576 );
577 }
578
579 #[test]
584 fn burst_of_matches_triggers_recompute() {
585 let base = Instant::now();
586 let mut cfg = test_config();
587 cfg.min_observations_between = 1; let mut t = EProcessThrottle::new_at(cfg, base);
589
590 let mut triggered = false;
591 for i in 1..=100 {
592 let d = t.observe_at(true, base + Duration::from_millis(i));
593 if d.should_recompute && !d.forced_by_deadline {
594 triggered = true;
595 break;
596 }
597 }
598 assert!(
599 triggered,
600 "Burst of matches should trigger e-process recompute"
601 );
602 }
603
604 #[test]
605 fn no_matches_does_not_trigger_eprocess() {
606 let base = Instant::now();
607 let mut cfg = test_config();
608 cfg.hard_deadline_ms = u64::MAX;
609 let mut t = EProcessThrottle::new_at(cfg, base);
610
611 for i in 1..=200 {
612 let d = t.observe_at(false, base + Duration::from_millis(i));
613 assert!(
614 !d.should_recompute,
615 "No-match stream should never trigger e-process recompute at obs {}",
616 i
617 );
618 }
619 }
620
621 #[test]
622 fn hard_deadline_forces_recompute() {
623 let base = Instant::now();
624 let mut cfg = test_config();
625 cfg.hard_deadline_ms = 100;
626 cfg.min_observations_between = 1;
627 let mut t = EProcessThrottle::new_at(cfg, base);
628
629 let d = t.observe_at(false, base + Duration::from_millis(150));
631 assert!(d.should_recompute, "Should trigger on deadline");
632 assert!(d.forced_by_deadline, "Should be forced by deadline");
633 }
634
635 #[test]
636 fn min_observations_between_prevents_rapid_fire() {
637 let base = Instant::now();
638 let mut cfg = test_config();
639 cfg.min_observations_between = 10;
640 cfg.hard_deadline_ms = u64::MAX;
641 cfg.alpha = 0.5; let mut t = EProcessThrottle::new_at(cfg, base);
643
644 let mut first_trigger = None;
645 for i in 1..=100 {
646 let d = t.observe_at(true, base + Duration::from_millis(i));
647 if d.should_recompute {
648 first_trigger = Some(i);
649 break;
650 }
651 }
652
653 assert!(
654 first_trigger.unwrap_or(0) >= 10,
655 "First trigger should be at obs >= 10, was {:?}",
656 first_trigger
657 );
658 }
659
660 #[test]
661 fn reset_clears_wealth_and_counter() {
662 let base = Instant::now();
663 let mut t = EProcessThrottle::new_at(test_config(), base);
664
665 for i in 1..=10 {
666 t.observe_at(true, base + Duration::from_millis(i));
667 }
668 assert!(t.wealth() > 1.0);
669 assert!(t.observations_since_recompute > 0);
670
671 t.reset_at(base + Duration::from_millis(20));
672 assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
673 assert_eq!(t.observations_since_recompute, 0);
674 }
675
676 #[test]
681 fn lambda_adapts_to_high_match_rate() {
682 let base = Instant::now();
683 let mut cfg = test_config();
684 cfg.hard_deadline_ms = u64::MAX;
685 cfg.min_observations_between = u64::MAX;
686 let mut t = EProcessThrottle::new_at(cfg, base);
687
688 let initial_lambda = t.lambda();
689
690 for i in 1..=50 {
692 t.observe_at(true, base + Duration::from_millis(i));
693 }
694
695 assert!(
696 t.lambda() > initial_lambda,
697 "Lambda should increase with frequent matches: {} vs {}",
698 t.lambda(),
699 initial_lambda
700 );
701 }
702
703 #[test]
704 fn lambda_adapts_to_low_match_rate() {
705 let base = Instant::now();
706 let mut cfg = test_config();
707 cfg.hard_deadline_ms = u64::MAX;
708 cfg.min_observations_between = u64::MAX;
709 cfg.initial_lambda = 0.8;
710 let mut t = EProcessThrottle::new_at(cfg, base);
711
712 let initial_lambda = t.lambda();
713
714 for i in 1..=50 {
716 t.observe_at(false, base + Duration::from_millis(i));
717 }
718
719 assert!(
720 t.lambda() < initial_lambda,
721 "Lambda should decrease with few matches: {} vs {}",
722 t.lambda(),
723 initial_lambda
724 );
725 }
726
727 #[test]
728 fn lambda_stays_bounded() {
729 let base = Instant::now();
730 let mut cfg = test_config();
731 cfg.hard_deadline_ms = u64::MAX;
732 cfg.min_observations_between = u64::MAX;
733 cfg.grapa_eta = 1.0; let mut t = EProcessThrottle::new_at(cfg, base);
735
736 for i in 1..=200 {
737 let matched = i % 2 == 0;
738 t.observe_at(matched, base + Duration::from_millis(i as u64));
739 }
740
741 assert!(t.lambda() > 0.0, "Lambda must be positive");
742 assert!(
743 t.lambda() <= t.lambda_max,
744 "Lambda must not exceed 1/(1-mu_0): {} vs {}",
745 t.lambda(),
746 t.lambda_max
747 );
748 }
749
750 #[test]
755 fn empirical_rate_tracks_window() {
756 let base = Instant::now();
757 let mut cfg = test_config();
758 cfg.rate_window_size = 10;
759 cfg.hard_deadline_ms = u64::MAX;
760 cfg.min_observations_between = u64::MAX;
761 let mut t = EProcessThrottle::new_at(cfg, base);
762
763 for i in 1..=10 {
765 t.observe_at(true, base + Duration::from_millis(i));
766 }
767 assert!((t.empirical_match_rate() - 1.0).abs() < f64::EPSILON);
768
769 for i in 11..=20 {
771 t.observe_at(false, base + Duration::from_millis(i));
772 }
773 assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
774 }
775
776 #[test]
777 fn empirical_rate_zero_when_empty() {
778 let t = EProcessThrottle::new(test_config());
779 assert!((t.empirical_match_rate() - 0.0).abs() < f64::EPSILON);
780 }
781
782 #[test]
787 fn stats_reflect_state() {
788 let base = Instant::now();
789 let mut cfg = test_config();
790 cfg.min_observations_between = 1;
791 let mut t = EProcessThrottle::new_at(cfg, base);
792
793 let mut recomputed = false;
795 for i in 1..=50 {
796 let d = t.observe_at(true, base + Duration::from_millis(i));
797 if d.should_recompute {
798 recomputed = true;
799 }
800 }
801
802 let stats = t.stats();
803 assert_eq!(stats.total_observations, 50);
804 if recomputed {
805 assert!(stats.total_recomputes > 0);
806 assert!(stats.avg_observations_between_recomputes > 0.0);
807 }
808 }
809
810 #[test]
811 fn logging_captures_decisions() {
812 let base = Instant::now();
813 let mut cfg = test_config();
814 cfg.enable_logging = true;
815 let mut t = EProcessThrottle::new_at(cfg, base);
816
817 t.observe_at(true, base + Duration::from_millis(1));
818 t.observe_at(false, base + Duration::from_millis(2));
819
820 assert_eq!(t.logs().len(), 2);
821 assert!(t.logs()[0].matched);
822 assert!(!t.logs()[1].matched);
823
824 t.clear_logs();
825 assert!(t.logs().is_empty());
826 }
827
828 #[test]
829 fn logging_disabled_by_default() {
830 let base = Instant::now();
831 let mut cfg = test_config();
832 cfg.enable_logging = false;
833 let mut t = EProcessThrottle::new_at(cfg, base);
834
835 t.observe_at(true, base + Duration::from_millis(1));
836 assert!(t.logs().is_empty());
837 }
838
839 #[test]
844 fn set_mu_0_resets_eprocess() {
845 let base = Instant::now();
846 let mut t = EProcessThrottle::new_at(test_config(), base);
847
848 for i in 1..=10 {
849 t.observe_at(true, base + Duration::from_millis(i));
850 }
851 assert!(t.wealth() > 1.0);
852
853 t.set_mu_0(0.5);
854 assert!((t.wealth() - 1.0).abs() < f64::EPSILON);
855 }
856
857 #[test]
862 fn deterministic_behavior() {
863 let base = Instant::now();
864 let cfg = test_config();
865
866 let run = |cfg: &ThrottleConfig| {
867 let mut t = EProcessThrottle::new_at(cfg.clone(), base);
868 let mut decisions = Vec::new();
869 for i in 1..=30 {
870 let matched = i % 3 == 0;
871 let d = t.observe_at(matched, base + Duration::from_millis(i));
872 decisions.push((d.should_recompute, d.forced_by_deadline));
873 }
874 (decisions, t.wealth(), t.lambda())
875 };
876
877 let (d1, w1, l1) = run(&cfg);
878 let (d2, w2, l2) = run(&cfg);
879
880 assert_eq!(d1, d2, "Decisions must be deterministic");
881 assert!((w1 - w2).abs() < 1e-10, "Wealth must be deterministic");
882 assert!((l1 - l2).abs() < 1e-10, "Lambda must be deterministic");
883 }
884
885 #[test]
890 fn property_supermartingale_under_null() {
891 let base = Instant::now();
895 let mut cfg = test_config();
896 cfg.hard_deadline_ms = u64::MAX;
897 cfg.min_observations_between = u64::MAX;
898 cfg.mu_0 = 0.2;
899 cfg.grapa_eta = 0.0; let n_trials = 200;
902 let n_obs = 100;
903 let mut total_wealth = 0.0;
904
905 let mut rng_state: u64 = 42;
907 let lcg_next = |state: &mut u64| -> f64 {
908 *state = state
909 .wrapping_mul(6364136223846793005)
910 .wrapping_add(1442695040888963407);
911 (*state >> 33) as f64 / (1u64 << 31) as f64
912 };
913
914 for trial in 0..n_trials {
915 let mut t = EProcessThrottle::new_at(cfg.clone(), base);
916 for i in 1..=n_obs {
917 let matched = lcg_next(&mut rng_state) < cfg.mu_0;
918 t.observe_at(
919 matched,
920 base + Duration::from_millis(i as u64 + trial * 1000),
921 );
922 }
923 total_wealth += t.wealth();
924 }
925
926 let avg_wealth = total_wealth / n_trials as f64;
927 assert!(
929 avg_wealth < 2.0,
930 "Average wealth under H₀ should be near 1.0, got {}",
931 avg_wealth
932 );
933 }
934
935 #[test]
940 fn property_type_i_control() {
941 let base = Instant::now();
944 let mut cfg = test_config();
945 cfg.hard_deadline_ms = u64::MAX;
946 cfg.min_observations_between = 1;
947 cfg.alpha = 0.05;
948 cfg.mu_0 = 0.1;
949 cfg.grapa_eta = 0.0; let n_trials = 500;
952 let n_obs = 200;
953 let mut false_triggers = 0u64;
954
955 let mut rng_state: u64 = 123;
956 let lcg_next = |state: &mut u64| -> f64 {
957 *state = state
958 .wrapping_mul(6364136223846793005)
959 .wrapping_add(1442695040888963407);
960 (*state >> 33) as f64 / (1u64 << 31) as f64
961 };
962
963 for trial in 0..n_trials {
964 let mut t = EProcessThrottle::new_at(cfg.clone(), base);
965 let mut triggered = false;
966 for i in 1..=n_obs {
967 let matched = lcg_next(&mut rng_state) < cfg.mu_0;
968 let d = t.observe_at(
969 matched,
970 base + Duration::from_millis(i as u64 + trial * 1000),
971 );
972 if d.should_recompute {
973 triggered = true;
974 break;
975 }
976 }
977 if triggered {
978 false_triggers += 1;
979 }
980 }
981
982 let false_trigger_rate = false_triggers as f64 / n_trials as f64;
983 assert!(
985 false_trigger_rate < cfg.alpha * 3.0,
986 "False trigger rate {} exceeds 3×α = {}",
987 false_trigger_rate,
988 cfg.alpha * 3.0
989 );
990 }
991
992 #[test]
997 fn single_observation() {
998 let base = Instant::now();
999 let cfg = test_config();
1000 let mut t = EProcessThrottle::new_at(cfg, base);
1001 let d = t.observe_at(true, base + Duration::from_millis(1));
1002 assert_eq!(t.observation_count(), 1);
1003 assert!(!d.should_recompute || d.forced_by_deadline);
1005 }
1006
1007 #[test]
1008 fn alternating_match_pattern() {
1009 let base = Instant::now();
1010 let mut cfg = test_config();
1011 cfg.hard_deadline_ms = u64::MAX;
1012 cfg.min_observations_between = u64::MAX;
1013 let mut t = EProcessThrottle::new_at(cfg, base);
1014
1015 for i in 1..=100 {
1017 t.observe_at(i % 2 == 0, base + Duration::from_millis(i as u64));
1018 }
1019
1020 assert!(
1022 t.wealth() > 1.0,
1023 "50% match rate vs 10% null should grow wealth: {}",
1024 t.wealth()
1025 );
1026 }
1027
1028 #[test]
1029 fn recompute_resets_wealth() {
1030 let base = Instant::now();
1031 let mut cfg = test_config();
1032 cfg.min_observations_between = 1;
1033 let mut t = EProcessThrottle::new_at(cfg, base);
1034
1035 let mut triggered = false;
1037 for i in 1..=100 {
1038 let d = t.observe_at(true, base + Duration::from_millis(i));
1039 if d.should_recompute && !d.forced_by_deadline {
1040 assert!(
1042 (t.wealth() - 1.0).abs() < f64::EPSILON,
1043 "Wealth should reset to 1.0 after recompute, got {}",
1044 t.wealth()
1045 );
1046 triggered = true;
1047 break;
1048 }
1049 }
1050 assert!(
1051 triggered,
1052 "Should have triggered at least one e-process recompute"
1053 );
1054 }
1055
1056 #[test]
1057 fn consecutive_recomputes_tracked() {
1058 let base = Instant::now();
1059 let mut cfg = test_config();
1060 cfg.min_observations_between = 1;
1061 cfg.alpha = 0.5; let mut t = EProcessThrottle::new_at(cfg, base);
1063
1064 let mut recompute_count = 0;
1065 for i in 1..=200 {
1066 let d = t.observe_at(true, base + Duration::from_millis(i));
1067 if d.should_recompute {
1068 recompute_count += 1;
1069 }
1070 }
1071
1072 let stats = t.stats();
1073 assert_eq!(stats.total_recomputes, recompute_count as u64);
1074 assert!(
1075 stats.total_recomputes >= 2,
1076 "Should have multiple recomputes"
1077 );
1078 }
1079}