1#![forbid(unsafe_code)]
2
3use std::time::Duration;
53
/// Default lower bound applied to validator costs: one microsecond.
const C_MIN: Duration = Duration::from_micros(1);

/// Default weight given to the newest duration sample when a validator's
/// cost moving average is updated.
const DEFAULT_GAMMA: f64 = 0.3;

/// Tunable parameters of a [`ValidationPipeline`].
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Initial `alpha` given to every newly registered validator.
    /// `alpha` grows by one for each observed failure.
    pub prior_alpha: f64,

    /// Initial `beta` given to every newly registered validator.
    /// `beta` grows by one for each observed pass.
    pub prior_beta: f64,

    /// Weight of the newest duration sample in the cost moving average;
    /// the previous average keeps weight `1 - gamma`.
    pub gamma: f64,

    /// Lower bound applied to validator costs wherever they are stored or
    /// used as a divisor.
    pub c_min: Duration,
}

impl Default for PipelineConfig {
    /// Returns a configuration with both priors set to `1.0`, the default
    /// moving-average weight, and the default cost floor.
    fn default() -> Self {
        let (prior_alpha, prior_beta) = (1.0, 1.0);
        PipelineConfig {
            prior_alpha,
            prior_beta,
            gamma: DEFAULT_GAMMA,
            c_min: C_MIN,
        }
    }
}
89
/// Running statistics kept for a single validator.
#[derive(Debug, Clone)]
pub struct ValidatorStats {
    /// Identifier of the validator; the pipeline assigns the registration index.
    pub id: usize,
    /// Human-readable name supplied at registration.
    pub name: String,
    /// Failure pseudo-count: the prior plus one per observed failure.
    pub alpha: f64,
    /// Pass pseudo-count: the prior plus one per observed pass.
    pub beta: f64,
    /// Exponentially weighted moving average of observed run durations.
    pub cost_ema: Duration,
    /// Number of outcomes recorded for this validator.
    pub observations: u64,
    /// Number of recorded outcomes that were failures.
    pub failures: u64,
}

impl ValidatorStats {
    /// Estimated probability that this validator fails: `alpha / (alpha + beta)`.
    #[inline]
    pub fn failure_prob(&self) -> f64 {
        let total = self.alpha + self.beta;
        self.alpha / total
    }

    /// Failure probability per second of cost; higher means "run earlier".
    ///
    /// The cost is raised to `c_min` when the stored average is smaller, which
    /// keeps the divisor away from zero for any non-zero `c_min`.
    #[inline]
    pub fn score(&self, c_min: Duration) -> f64 {
        let floored = if self.cost_ema < c_min {
            c_min
        } else {
            self.cost_ema
        };
        let seconds = floored.as_secs_f64();
        self.failure_prob() / seconds
    }

    /// Variance of the failure estimate,
    /// `alpha * beta / ((alpha + beta)^2 * (alpha + beta + 1))`,
    /// i.e. the variance of a Beta(`alpha`, `beta`) distribution.
    #[inline]
    pub fn variance(&self) -> f64 {
        let (a, b) = (self.alpha, self.beta);
        let n = a + b;
        let denominator = n * n * (n + 1.0);
        (a * b) / denominator
    }

    /// Full width of a +/- 1.96 standard-deviation interval around the
    /// failure estimate (1.96 is the usual z-value for a ~95% normal interval).
    #[inline]
    pub fn confidence_width(&self) -> f64 {
        let std_dev = self.variance().sqrt();
        2.0 * 1.96 * std_dev
    }
}
136
/// One row of the ordering ledger: the inputs and result of scoring a single
/// validator, as produced by `ValidationPipeline::compute_ordering`.
#[derive(Debug, Clone)]
pub struct LedgerEntry {
    /// Id of the validator this row describes.
    pub id: usize,
    /// Name of the validator (cloned from its stats).
    pub name: String,
    /// Estimated failure probability at the time the ordering was computed.
    pub p: Duration_doc_placeholder,
    /// Cost moving average at the time the ordering was computed.
    pub c: Duration,
    /// Score used for sorting (failure probability divided by floored cost in seconds).
    pub score: f64,
    /// Zero-based position of the validator in the computed ordering.
    pub rank: usize,
}
153
/// Result of running one validator once.
#[derive(Debug, Clone)]
pub struct ValidationOutcome {
    /// Id of the validator that ran.
    pub id: usize,
    /// Whether the validator passed.
    pub passed: bool,
    /// How long the run took, as reported by the caller.
    pub duration: Duration,
}
164
/// Everything recorded about a single pipeline run, as produced by
/// `ValidationPipeline::run`.
#[derive(Debug, Clone)]
pub struct PipelineResult {
    /// `true` when no executed validator failed (also `true` for an empty pipeline).
    pub all_passed: bool,
    /// Outcomes of the validators that actually ran, in execution order.
    pub outcomes: Vec<ValidationOutcome>,
    /// Sum of the durations of all executed validators.
    pub total_cost: Duration,
    /// The full ordering computed for this run, including validators that were skipped.
    pub ordering: Vec<usize>,
    /// Scoring ledger that accompanies `ordering`.
    pub ledger: Vec<LedgerEntry>,
    /// Number of validators in `ordering` that did not run because of an earlier failure.
    pub skipped: usize,
}
181
/// A set of registered validators together with the statistics used to
/// decide the order in which they run.
#[derive(Debug, Clone)]
pub struct ValidationPipeline {
    /// Parameters for priors, the cost moving average, and the cost floor.
    config: PipelineConfig,
    /// Per-validator statistics; a validator's id is its index in this vector.
    validators: Vec<ValidatorStats>,
    /// Number of batches folded in through `update_batch`.
    total_runs: u64,
}
190
191impl ValidationPipeline {
192 pub fn new() -> Self {
194 Self {
195 config: PipelineConfig::default(),
196 validators: Vec::new(),
197 total_runs: 0,
198 }
199 }
200
201 pub fn with_config(config: PipelineConfig) -> Self {
203 Self {
204 config,
205 validators: Vec::new(),
206 total_runs: 0,
207 }
208 }
209
210 pub fn register(&mut self, name: impl Into<String>, initial_cost: Duration) -> usize {
213 let id = self.validators.len();
214 self.validators.push(ValidatorStats {
215 id,
216 name: name.into(),
217 alpha: self.config.prior_alpha,
218 beta: self.config.prior_beta,
219 cost_ema: initial_cost.max(self.config.c_min),
220 observations: 0,
221 failures: 0,
222 });
223 id
224 }
225
226 pub fn compute_ordering(&self) -> (Vec<usize>, Vec<LedgerEntry>) {
229 if self.validators.is_empty() {
230 return (Vec::new(), Vec::new());
231 }
232
233 let mut scored: Vec<(usize, f64)> = self
235 .validators
236 .iter()
237 .map(|v| (v.id, v.score(self.config.c_min)))
238 .collect();
239
240 scored.sort_by(|a, b| {
242 b.1.partial_cmp(&a.1)
243 .unwrap_or(std::cmp::Ordering::Equal)
244 .then_with(|| a.0.cmp(&b.0))
245 });
246
247 let ordering: Vec<usize> = scored.iter().map(|(id, _)| *id).collect();
248
249 let ledger: Vec<LedgerEntry> = scored
250 .iter()
251 .enumerate()
252 .map(|(rank, (id, score))| {
253 let v = &self.validators[*id];
254 LedgerEntry {
255 id: *id,
256 name: v.name.clone(),
257 p: v.failure_prob(),
258 c: v.cost_ema,
259 score: *score,
260 rank,
261 }
262 })
263 .collect();
264
265 (ordering, ledger)
266 }
267
268 pub fn expected_cost(&self, ordering: &[usize]) -> f64 {
274 let mut survival = 1.0; let mut total = 0.0;
276
277 for &id in ordering {
278 let v = &self.validators[id];
279 let c = v.cost_ema.max(self.config.c_min).as_secs_f64();
280 total += c * survival;
281 survival *= 1.0 - v.failure_prob();
282 }
283
284 total
285 }
286
287 pub fn update(&mut self, outcome: &ValidationOutcome) {
289 if let Some(v) = self.validators.get_mut(outcome.id) {
290 v.observations += 1;
291 if outcome.passed {
292 v.beta += 1.0;
293 } else {
294 v.alpha += 1.0;
295 v.failures += 1;
296 }
297 let gamma = self.config.gamma;
299 let old_ns = v.cost_ema.as_nanos() as f64;
300 let new_ns = outcome.duration.as_nanos() as f64;
301 let updated_ns = gamma * new_ns + (1.0 - gamma) * old_ns;
302 v.cost_ema =
303 Duration::from_nanos(updated_ns.max(self.config.c_min.as_nanos() as f64) as u64);
304 }
305 }
306
307 pub fn update_batch(&mut self, result: &PipelineResult) {
309 self.total_runs += 1;
310 for outcome in &result.outcomes {
311 self.update(outcome);
312 }
313 }
314
315 pub fn run<F>(&self, mut validate: F) -> PipelineResult
320 where
321 F: FnMut(usize) -> (bool, Duration),
322 {
323 let (ordering, ledger) = self.compute_ordering();
324 let total_validators = ordering.len();
325 let mut outcomes = Vec::with_capacity(total_validators);
326 let mut total_cost = Duration::ZERO;
327 let mut all_passed = true;
328
329 for &id in &ordering {
330 let (passed, duration) = validate(id);
331 total_cost += duration;
332 outcomes.push(ValidationOutcome {
333 id,
334 passed,
335 duration,
336 });
337 if !passed {
338 all_passed = false;
339 break; }
341 }
342
343 let skipped = total_validators - outcomes.len();
344
345 PipelineResult {
346 all_passed,
347 outcomes,
348 total_cost,
349 ordering,
350 ledger,
351 skipped,
352 }
353 }
354
355 pub fn stats(&self, id: usize) -> Option<&ValidatorStats> {
357 self.validators.get(id)
358 }
359
360 pub fn all_stats(&self) -> &[ValidatorStats] {
362 &self.validators
363 }
364
365 pub fn total_runs(&self) -> u64 {
367 self.total_runs
368 }
369
370 pub fn validator_count(&self) -> usize {
372 self.validators.len()
373 }
374
375 pub fn summary(&self) -> PipelineSummary {
377 let (ordering, ledger) = self.compute_ordering();
378 let expected = self.expected_cost(&ordering);
379 let natural: Vec<usize> = (0..self.validators.len()).collect();
381 let natural_cost = self.expected_cost(&natural);
382 let improvement = if natural_cost > 0.0 {
383 1.0 - expected / natural_cost
384 } else {
385 0.0
386 };
387
388 PipelineSummary {
389 validator_count: self.validators.len(),
390 total_runs: self.total_runs,
391 optimal_ordering: ordering,
392 expected_cost_secs: expected,
393 natural_cost_secs: natural_cost,
394 improvement_fraction: improvement,
395 ledger,
396 }
397 }
398}
399
400impl Default for ValidationPipeline {
401 fn default() -> Self {
402 Self::new()
403 }
404}
405
/// Snapshot comparing the computed ordering with registration order, as
/// produced by `ValidationPipeline::summary`.
#[derive(Debug, Clone)]
pub struct PipelineSummary {
    /// Number of registered validators.
    pub validator_count: usize,
    /// Number of batches recorded so far.
    pub total_runs: u64,
    /// Validator ids in the computed execution order.
    pub optimal_ordering: Vec<usize>,
    /// Expected cost, in seconds, of `optimal_ordering`.
    pub expected_cost_secs: f64,
    /// Expected cost, in seconds, of running validators in registration order.
    pub natural_cost_secs: f64,
    /// `1 - expected / natural`, or `0.0` when the natural cost is not positive.
    pub improvement_fraction: f64,
    /// Scoring ledger that accompanies `optimal_ordering`.
    pub ledger: Vec<LedgerEntry>,
}
417
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a duration given in milliseconds.
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    /// Records `times` identical outcomes for validator `id`.
    fn feed(
        pipeline: &mut ValidationPipeline,
        id: usize,
        passed: bool,
        duration: Duration,
        times: usize,
    ) {
        for _ in 0..times {
            pipeline.update(&ValidationOutcome {
                id,
                passed,
                duration,
            });
        }
    }

    /// Current failure estimate of validator `id`.
    fn prob(pipeline: &ValidationPipeline, id: usize) -> f64 {
        pipeline.stats(id).unwrap().failure_prob()
    }

    /// Running a pipeline with no validators succeeds without calling the closure.
    #[test]
    fn empty_pipeline_returns_success() {
        let result = ValidationPipeline::new().run(|_| unreachable!());
        assert!(result.all_passed);
        assert!(result.outcomes.is_empty());
        assert_eq!(result.total_cost, Duration::ZERO);
        assert_eq!(result.skipped, 0);
    }

    /// A single passing validator yields one outcome and nothing skipped.
    #[test]
    fn single_validator_pass() {
        let mut pipeline = ValidationPipeline::new();
        pipeline.register("check_a", ms(10));

        let result = pipeline.run(|_| (true, ms(8)));

        assert!(result.all_passed);
        assert_eq!(result.outcomes.len(), 1);
        assert_eq!(result.skipped, 0);
    }

    /// A single failing validator is reported as a failed run.
    #[test]
    fn single_validator_fail() {
        let mut pipeline = ValidationPipeline::new();
        pipeline.register("check_a", ms(10));

        let result = pipeline.run(|_| (false, ms(5)));

        assert!(!result.all_passed);
        assert_eq!(result.outcomes.len(), 1);
        assert!(!result.outcomes[0].passed);
    }

    /// A cheap validator known to fail runs first and stops the run.
    #[test]
    fn early_exit_on_failure() {
        let mut pipeline = ValidationPipeline::new();
        pipeline.register("cheap_fail", ms(1));
        pipeline.register("expensive", ms(100));
        pipeline.register("also_expensive", ms(50));

        // Teach the pipeline that validator 0 fails almost always.
        feed(&mut pipeline, 0, false, ms(1), 10);

        let mut visited = Vec::new();
        let result = pipeline.run(|id| {
            visited.push(id);
            match id {
                0 => (false, ms(1)),
                _ => (true, ms(50)),
            }
        });

        assert!(!result.all_passed);
        assert_eq!(visited.len(), 1);
        assert_eq!(visited[0], 0);
        assert_eq!(result.skipped, 2);
    }

    /// `expected_cost` matches the closed form for two validators in both orders.
    #[test]
    fn unit_expected_cost_formula() {
        let mut pipeline = ValidationPipeline::new();
        let a = pipeline.register("A", ms(10));
        let b = pipeline.register("B", ms(100));

        // A: 7 failures + 1 pass on a (1, 1) prior -> p = 0.8.
        feed(&mut pipeline, a, false, ms(10), 7);
        feed(&mut pipeline, a, true, ms(10), 1);
        // B: 1 failure + 7 passes on a (1, 1) prior -> p = 0.2.
        feed(&mut pipeline, b, false, ms(100), 1);
        feed(&mut pipeline, b, true, ms(100), 7);

        let p_a = prob(&pipeline, a);
        let p_b = prob(&pipeline, b);
        assert!((p_a - 0.8).abs() < 1e-10);
        assert!((p_b - 0.2).abs() < 1e-10);

        let c_a = pipeline.stats(a).unwrap().cost_ema.as_secs_f64();
        let c_b = pipeline.stats(b).unwrap().cost_ema.as_secs_f64();

        let cost_ab = pipeline.expected_cost(&[a, b]);
        let expected_ab = c_a + (1.0 - p_a) * c_b;
        assert!((cost_ab - expected_ab).abs() < 1e-9);

        let cost_ba = pipeline.expected_cost(&[b, a]);
        let expected_ba = c_b + (1.0 - p_b) * c_a;
        assert!((cost_ba - expected_ba).abs() < 1e-9);

        assert!(cost_ab < cost_ba);
    }

    /// Failures and passes move the posterior mean as expected.
    #[test]
    fn unit_posterior_update() {
        let mut pipeline = ValidationPipeline::new();
        let id = pipeline.register("v", ms(5));

        // Prior (1, 1).
        assert!((prob(&pipeline, id) - 0.5).abs() < 1e-10);

        // Three failures -> (4, 1).
        feed(&mut pipeline, id, false, ms(5), 3);
        assert!((prob(&pipeline, id) - 0.8).abs() < 1e-10);

        // Four passes -> (4, 5).
        feed(&mut pipeline, id, true, ms(5), 4);
        assert!((prob(&pipeline, id) - 4.0 / 9.0).abs() < 1e-10);
    }

    /// Ordering follows failure probability per unit cost, highest first.
    #[test]
    fn optimal_ordering_sorts_by_score() {
        let mut pipeline = ValidationPipeline::new();
        let a = pipeline.register("A_cheap_reliable", ms(1));
        let b = pipeline.register("B_expensive_flaky", ms(100));
        let c = pipeline.register("C_cheap_flaky", ms(1));

        feed(&mut pipeline, b, false, ms(100), 8);
        feed(&mut pipeline, c, false, ms(1), 8);
        feed(&mut pipeline, a, true, ms(1), 8);

        let (ordering, _ledger) = pipeline.compute_ordering();
        assert_eq!(ordering[0], c);
        assert_eq!(ordering[1], a);
        assert_eq!(ordering[2], b);
    }

    /// With `gamma = 0.5` the cost average moves halfway towards each new sample.
    #[test]
    fn cost_ema_updates() {
        let config = PipelineConfig {
            gamma: 0.5,
            ..Default::default()
        };
        let mut pipeline = ValidationPipeline::with_config(config);
        let id = pipeline.register("v", ms(10));

        // 10 ms -> 15 ms.
        feed(&mut pipeline, id, true, ms(20), 1);
        let cost = pipeline.stats(id).unwrap().cost_ema;
        assert!((cost.as_millis() as i64 - 15).abs() <= 1);

        // 15 ms -> 22.5 ms.
        feed(&mut pipeline, id, true, ms(30), 1);
        let cost = pipeline.stats(id).unwrap().cost_ema;
        assert!((cost.as_millis() as i64 - 22).abs() <= 1);
    }

    /// Registering with zero cost still stores at least the cost floor.
    #[test]
    fn cost_floor_prevents_zero() {
        let mut pipeline = ValidationPipeline::new();
        let id = pipeline.register("v", Duration::ZERO);
        let cost = pipeline.stats(id).unwrap().cost_ema;
        assert!(cost >= C_MIN);
    }

    /// The ledger has one row per validator and the ranks are 0..n.
    #[test]
    fn ledger_records_all_validators() {
        let mut pipeline = ValidationPipeline::new();
        pipeline.register("a", ms(5));
        pipeline.register("b", ms(10));
        pipeline.register("c", ms(15));

        let (_, ledger) = pipeline.compute_ordering();
        assert_eq!(ledger.len(), 3);

        let mut ranks: Vec<usize> = ledger.iter().map(|entry| entry.rank).collect();
        ranks.sort_unstable();
        assert_eq!(ranks, vec![0, 1, 2]);
    }

    /// Replaying the same history twice gives the same ordering and cost.
    #[test]
    fn deterministic_under_same_history() {
        let replay = || {
            let mut p = ValidationPipeline::new();
            p.register("x", ms(10));
            p.register("y", ms(20));
            p.register("z", ms(5));

            // (validator id, passed, duration in ms)
            let history = [
                (0, false, 10),
                (1, true, 20),
                (2, false, 5),
                (0, true, 12),
                (1, false, 18),
                (2, true, 6),
                (0, false, 9),
                (1, true, 22),
                (2, false, 4),
            ];
            for (id, passed, millis) in history {
                feed(&mut p, id, passed, ms(millis), 1);
            }

            let (ordering, _) = p.compute_ordering();
            let cost = p.expected_cost(&ordering);
            (ordering, cost)
        };

        let (o1, c1) = replay();
        let (o2, c2) = replay();
        assert_eq!(o1, o2);
        assert!((c1 - c2).abs() < 1e-15);
    }

    /// Putting the cheap, flaky validator first beats registration order.
    #[test]
    fn summary_shows_improvement() {
        let mut pipeline = ValidationPipeline::new();
        pipeline.register("expensive_reliable", ms(100));
        pipeline.register("cheap_flaky", ms(1));

        feed(&mut pipeline, 1, false, ms(1), 20);
        feed(&mut pipeline, 0, true, ms(100), 20);

        let summary = pipeline.summary();
        assert_eq!(summary.optimal_ordering[0], 1);
        assert!(summary.improvement_fraction > 0.0);
    }

    /// The posterior variance shrinks as observations accumulate.
    #[test]
    fn variance_decreases_with_observations() {
        let mut pipeline = ValidationPipeline::new();
        let id = pipeline.register("v", ms(5));

        let var_0 = pipeline.stats(id).unwrap().variance();

        feed(&mut pipeline, id, false, ms(5), 10);
        let var_10 = pipeline.stats(id).unwrap().variance();

        feed(&mut pipeline, id, false, ms(5), 90);
        let var_100 = pipeline.stats(id).unwrap().variance();

        assert!(var_10 < var_0);
        assert!(var_100 < var_10);
    }

    /// The confidence interval narrows after many observations.
    #[test]
    fn confidence_width_contracts() {
        let mut pipeline = ValidationPipeline::new();
        let id = pipeline.register("v", ms(5));

        let w0 = pipeline.stats(id).unwrap().confidence_width();
        feed(&mut pipeline, id, true, ms(5), 50);
        let w50 = pipeline.stats(id).unwrap().confidence_width();

        assert!(w50 < w0, "CI should narrow: w0={w0}, w50={w50}");
    }

    /// Each `update_batch` call counts as one run.
    #[test]
    fn update_batch_increments_total_runs() {
        let mut pipeline = ValidationPipeline::new();
        pipeline.register("v", ms(5));
        assert_eq!(pipeline.total_runs(), 0);

        let only_outcome = ValidationOutcome {
            id: 0,
            passed: true,
            duration: ms(4),
        };
        let result = PipelineResult {
            all_passed: true,
            outcomes: vec![only_outcome],
            total_cost: ms(4),
            ordering: vec![0],
            ledger: Vec::new(),
            skipped: 0,
        };

        pipeline.update_batch(&result);
        assert_eq!(pipeline.total_runs(), 1);
    }

    /// For three validators the computed ordering is as cheap as the best of
    /// all six permutations.
    #[test]
    fn expected_cost_matches_brute_force_n3() {
        let mut pipeline = ValidationPipeline::new();
        pipeline.register("a", ms(10));
        pipeline.register("b", ms(20));
        pipeline.register("c", ms(5));

        feed(&mut pipeline, 0, false, ms(10), 3);
        feed(&mut pipeline, 1, false, ms(20), 1);
        feed(&mut pipeline, 1, true, ms(20), 3);
        feed(&mut pipeline, 2, false, ms(5), 2);
        feed(&mut pipeline, 2, true, ms(5), 1);

        let perms: &[&[usize]] = &[
            &[0, 1, 2],
            &[0, 2, 1],
            &[1, 0, 2],
            &[1, 2, 0],
            &[2, 0, 1],
            &[2, 1, 0],
        ];
        let mut best_cost = f64::MAX;
        let mut best_perm = &[0usize, 1, 2][..];
        for perm in perms {
            let candidate = pipeline.expected_cost(perm);
            if candidate < best_cost {
                best_cost = candidate;
                best_perm = perm;
            }
        }

        let (optimal, _) = pipeline.compute_ordering();
        let optimal_cost = pipeline.expected_cost(&optimal);

        assert!(
            (optimal_cost - best_cost).abs() < 1e-12,
            "optimal={optimal_cost}, brute_force={best_cost}, best_perm={best_perm:?}, our={optimal:?}"
        );
    }

    /// Computing the ordering for 100 validators 1000 times stays within a
    /// wall-clock budget. The result depends on the machine and build profile.
    #[test]
    fn perf_ordering_overhead() {
        let mut pipeline = ValidationPipeline::new();
        for i in 0..100 {
            pipeline.register(format!("v{i}"), Duration::from_micros(100 + i as u64 * 10));
        }
        for i in 0..100 {
            let duration = Duration::from_micros(100 + i as u64 * 10);
            feed(&mut pipeline, i, i % 3 != 0, duration, 5);
        }

        let start = std::time::Instant::now();
        for _ in 0..1000 {
            let _ = pipeline.compute_ordering();
        }
        let elapsed = start.elapsed();

        assert!(
            elapsed < Duration::from_millis(100),
            "ordering overhead too high: {elapsed:?} for 1000 iterations"
        );
    }
}