1use std::collections::BTreeSet;
2
3use crate::agent::AgentState;
4use crate::rng::DeterministicRng;
5
6#[derive(Debug, Clone, PartialEq)]
12pub struct OracleResult {
13 pub passed: bool,
15 pub violations: Vec<InvariantViolation>,
17}
18
19impl OracleResult {
20 #[must_use]
22 const fn pass() -> Self {
23 Self {
24 passed: true,
25 violations: Vec::new(),
26 }
27 }
28
29 #[must_use]
31 const fn fail(violations: Vec<InvariantViolation>) -> Self {
32 Self {
33 passed: false,
34 violations,
35 }
36 }
37
38 #[must_use]
40 fn merge(mut self, other: Self) -> Self {
41 if !other.passed {
42 self.passed = false;
43 self.violations.extend(other.violations);
44 }
45 self
46 }
47}
48
/// Diagnostic describing a single broken invariant.
///
/// Each variant carries the data needed to understand the failure it
/// reports.
#[derive(Clone, Debug, PartialEq)]
pub enum InvariantViolation {
    /// Two agents hold different event sets.
    Convergence {
        /// Id of the first agent of the pair.
        agent_a: usize,
        /// Id of the second agent of the pair.
        agent_b: usize,
        /// Events known to `agent_a` but not to `agent_b`.
        only_in_a: Vec<u64>,
        /// Events known to `agent_b` but not to `agent_a`.
        only_in_b: Vec<u64>,
    },

    /// Applying the events in a permuted order produced a different set.
    Commutativity {
        /// Index of the permutation (iteration) that disagreed.
        permutation_index: usize,
        /// Events expected in the result but absent from it.
        missing_events: Vec<u64>,
        /// Events present in the result but not expected.
        extra_events: Vec<u64>,
    },

    /// Re-applying an event changed the event set.
    Idempotence {
        /// The event that was re-applied.
        event_id: u64,
        /// Event set captured before the re-application.
        events_before: Vec<u64>,
        /// Event set captured after the re-application.
        events_after_dup: Vec<u64>,
    },

    /// An agent holds a later event from a source while an earlier one is
    /// missing.
    CausalConsistency {
        /// Id of the agent whose event set has the gap.
        observer_agent: usize,
        /// Source agent decoded from the affected event ids.
        source_agent: usize,
        /// Sequence number that is absent.
        missing_seq: u64,
        /// A higher sequence number that is present despite the gap.
        present_higher_seq: u64,
    },

    /// Two agents' triage scores differ by more than the allowed tolerance.
    TriageStability {
        /// Id of the first agent of the pair.
        agent_a: usize,
        /// Id of the second agent of the pair.
        agent_b: usize,
        /// Triage score of `agent_a`.
        score_a: f64,
        /// Triage score of `agent_b`.
        score_b: f64,
        /// Absolute difference between the two scores.
        diff: f64,
        /// Tolerance that `diff` exceeded.
        epsilon: f64,
    },
}
124
/// Summary produced by [`ConvergenceOracle::evaluate`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ConvergenceReport {
    /// `true` when no agent diverges from the first agent's event set.
    pub converged: bool,
    /// Ids of the agents whose event set differs from the first agent's.
    pub divergent_agents: Vec<usize>,
    /// Number of events in the first agent's event set (0 for no agents).
    pub canonical_event_count: usize,
}
137
/// Stateless entry point for the invariant checks in this module.
///
/// Every check is an associated function, so no instance is needed to use
/// them. The common traits are derived so the type can be printed, copied,
/// and default-constructed like any other public type.
#[derive(Debug, Clone, Copy, Default)]
pub struct ConvergenceOracle;
157
158impl ConvergenceOracle {
159 #[must_use]
166 pub fn evaluate(states: &[AgentState]) -> ConvergenceReport {
167 if states.is_empty() {
168 return ConvergenceReport {
169 converged: true,
170 divergent_agents: Vec::new(),
171 canonical_event_count: 0,
172 };
173 }
174
175 let canonical = &states[0].known_events;
176 let divergent_agents = states
177 .iter()
178 .filter(|state| state.known_events != *canonical)
179 .map(|state| state.id)
180 .collect::<Vec<_>>();
181
182 ConvergenceReport {
183 converged: divergent_agents.is_empty(),
184 divergent_agents,
185 canonical_event_count: canonical.len(),
186 }
187 }
188
189 #[must_use]
196 pub fn check_convergence(states: &[AgentState]) -> OracleResult {
197 if states.len() < 2 {
198 return OracleResult::pass();
199 }
200
201 let mut violations = Vec::new();
202
203 for i in 0..states.len() {
204 for j in (i + 1)..states.len() {
205 let a = &states[i];
206 let b = &states[j];
207
208 if a.known_events == b.known_events {
209 continue;
210 }
211
212 let only_in_a: Vec<u64> = a
213 .known_events
214 .difference(&b.known_events)
215 .copied()
216 .collect();
217 let only_in_b: Vec<u64> = b
218 .known_events
219 .difference(&a.known_events)
220 .copied()
221 .collect();
222
223 violations.push(InvariantViolation::Convergence {
224 agent_a: a.id,
225 agent_b: b.id,
226 only_in_a,
227 only_in_b,
228 });
229 }
230 }
231
232 if violations.is_empty() {
233 OracleResult::pass()
234 } else {
235 OracleResult::fail(violations)
236 }
237 }
238
239 #[must_use]
250 pub fn check_commutativity(
251 events: &[u64],
252 rng: &mut DeterministicRng,
253 iterations: usize,
254 ) -> OracleResult {
255 if events.len() < 2 || iterations == 0 {
256 return OracleResult::pass();
257 }
258
259 let canonical: BTreeSet<u64> = events.iter().copied().collect();
261
262 let mut violations = Vec::new();
263
264 for perm_idx in 0..iterations {
265 let shuffled = fisher_yates_shuffle(events, rng);
266 let result: BTreeSet<u64> = shuffled.iter().copied().collect();
267
268 if result != canonical {
269 let missing_events: Vec<u64> = canonical.difference(&result).copied().collect();
270 let extra_events: Vec<u64> = result.difference(&canonical).copied().collect();
271
272 violations.push(InvariantViolation::Commutativity {
273 permutation_index: perm_idx,
274 missing_events,
275 extra_events,
276 });
277 }
278 }
279
280 if violations.is_empty() {
281 OracleResult::pass()
282 } else {
283 OracleResult::fail(violations)
284 }
285 }
286
287 #[must_use]
294 pub fn check_idempotence(state: &AgentState, events: &[u64]) -> OracleResult {
295 let mut violations = Vec::new();
296
297 for &event_id in events {
298 let before = state.known_events.clone();
299
300 let mut after = before.clone();
302 after.insert(event_id);
303
304 if before != after {
305 violations.push(InvariantViolation::Idempotence {
306 event_id,
307 events_before: before.iter().copied().collect(),
308 events_after_dup: after.iter().copied().collect(),
309 });
310 }
311 }
312
313 if violations.is_empty() {
314 OracleResult::pass()
315 } else {
316 OracleResult::fail(violations)
317 }
318 }
319
320 #[must_use]
329 pub fn check_causality(states: &[AgentState]) -> OracleResult {
330 let mut violations = Vec::new();
331
332 for state in states {
333 let mut per_source: std::collections::BTreeMap<usize, Vec<u64>> =
335 std::collections::BTreeMap::new();
336
337 for &event_id in &state.known_events {
338 let source = decode_source(event_id);
339 let seq = decode_seq(event_id);
340 per_source.entry(source).or_default().push(seq);
341 }
342
343 for (source_agent, mut seqs) in per_source {
344 seqs.sort_unstable();
345
346 for window in seqs.windows(2) {
348 let prev = window[0];
349 let next = window[1];
350
351 if next != prev.saturating_add(1) {
352 let missing_seq = prev.saturating_add(1);
354 violations.push(InvariantViolation::CausalConsistency {
355 observer_agent: state.id,
356 source_agent,
357 missing_seq,
358 present_higher_seq: next,
359 });
360 }
361 }
362
363 if let Some(&first) = seqs.first()
365 && first != 0
366 {
367 violations.push(InvariantViolation::CausalConsistency {
368 observer_agent: state.id,
369 source_agent,
370 missing_seq: 0,
371 present_higher_seq: first,
372 });
373 }
374 }
375 }
376
377 if violations.is_empty() {
378 OracleResult::pass()
379 } else {
380 OracleResult::fail(violations)
381 }
382 }
383
384 #[must_use]
396 pub fn check_triage_stability(
397 states: &[AgentState],
398 total_events: usize,
399 epsilon: f64,
400 ) -> OracleResult {
401 if states.len() < 2 {
402 return OracleResult::pass();
403 }
404
405 let scores: Vec<f64> = states
406 .iter()
407 .map(|s| triage_score(s.known_events.len(), total_events))
408 .collect();
409
410 let mut violations = Vec::new();
411
412 for i in 0..scores.len() {
413 for j in (i + 1)..scores.len() {
414 let diff = (scores[i] - scores[j]).abs();
415 if diff > epsilon {
416 violations.push(InvariantViolation::TriageStability {
417 agent_a: states[i].id,
418 agent_b: states[j].id,
419 score_a: scores[i],
420 score_b: scores[j],
421 diff,
422 epsilon,
423 });
424 }
425 }
426 }
427
428 if violations.is_empty() {
429 OracleResult::pass()
430 } else {
431 OracleResult::fail(violations)
432 }
433 }
434
435 #[must_use]
444 pub fn check_all(
445 states: &[AgentState],
446 events: &[u64],
447 rng: &mut DeterministicRng,
448 ) -> OracleResult {
449 let total_events = {
451 let mut union: BTreeSet<u64> = BTreeSet::new();
452 for s in states {
453 union.extend(s.known_events.iter().copied());
454 }
455 union.len()
456 };
457
458 let convergence = Self::check_convergence(states);
459
460 let commutativity = Self::check_commutativity(events, rng, 8);
461
462 let idempotence = states.first().map_or_else(OracleResult::pass, |first| {
464 Self::check_idempotence(first, events)
465 });
466
467 let causality = Self::check_causality(states);
468
469 let triage = Self::check_triage_stability(states, total_events, 1e-9);
471
472 convergence
473 .merge(commutativity)
474 .merge(idempotence)
475 .merge(causality)
476 .merge(triage)
477 }
478}
479
/// Extracts the source-agent index stored above the low 32 bits of
/// `event_id`.
///
/// Yields `usize::MAX` if that value cannot be represented as a `usize`.
#[inline]
fn decode_source(event_id: u64) -> usize {
    let upper_half = event_id >> 32;
    usize::try_from(upper_half).unwrap_or(usize::MAX)
}
487
/// Extracts the sequence number stored in the low 32 bits of `event_id`.
#[inline]
const fn decode_seq(event_id: u64) -> u64 {
    /// Mask selecting the 32 least-significant bits.
    const LOW_32_BITS: u64 = (1 << 32) - 1;

    event_id & LOW_32_BITS
}
493
/// Computes the fraction `known / total` as a floating-point score.
///
/// Returns `0.0` when `total` is zero so the division is never attempted
/// with a zero denominator.
#[inline]
#[allow(clippy::cast_precision_loss)]
fn triage_score(known: usize, total: usize) -> f64 {
    match total {
        0 => 0.0,
        _ => known as f64 / total as f64,
    }
}
504
505fn fisher_yates_shuffle(events: &[u64], rng: &mut DeterministicRng) -> Vec<u64> {
507 let mut v = events.to_vec();
508 let n = v.len();
509 for i in (1..n).rev() {
510 let j_u64 = rng.next_bounded(u64::try_from(i + 1).unwrap_or(1));
511 let j = usize::try_from(j_u64).unwrap_or(0);
512 v.swap(i, j);
513 }
514 v
515}
516
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use super::*;
    use crate::agent::AgentState;
    use crate::rng::DeterministicRng;

    /// Builds an agent with the given id that knows exactly `events`.
    fn make_state(id: usize, events: &[u64]) -> AgentState {
        let known_events = events.iter().copied().collect();
        AgentState { id, known_events }
    }

    /// Packs `source` above the low 32 bits and `seq` into the low 32 bits.
    fn event_id(source: u64, seq: u64) -> u64 {
        let low = seq & 0xFFFF_FFFF;
        (source << 32) | low
    }

    // ---- evaluate -------------------------------------------------------

    #[test]
    fn evaluate_all_same_reports_converged() {
        let agents = [make_state(0, &[1, 2, 3]), make_state(1, &[1, 2, 3])];
        let report = ConvergenceOracle::evaluate(&agents);
        assert!(report.converged);
        assert!(report.divergent_agents.is_empty());
        assert_eq!(report.canonical_event_count, 3);
    }

    #[test]
    fn evaluate_divergent_reports_agents() {
        let agents = [make_state(0, &[1, 2, 3]), make_state(1, &[1, 3])];
        let report = ConvergenceOracle::evaluate(&agents);
        assert!(!report.converged);
        assert_eq!(report.divergent_agents, vec![1]);
    }

    #[test]
    fn evaluate_empty_slice_is_converged() {
        let report = ConvergenceOracle::evaluate(&[]);
        assert!(report.converged);
    }

    // ---- check_convergence ----------------------------------------------

    #[test]
    fn check_convergence_identical_states_passes() {
        let agents: Vec<AgentState> = (0..3).map(|id| make_state(id, &[10, 20, 30])).collect();
        let result = ConvergenceOracle::check_convergence(&agents);
        assert!(result.passed);
        assert!(result.violations.is_empty());
    }

    #[test]
    fn check_convergence_divergent_states_fails() {
        let agents = [make_state(0, &[1, 2, 3]), make_state(1, &[1, 3])];
        let result = ConvergenceOracle::check_convergence(&agents);
        assert!(!result.passed);
        assert_eq!(result.violations.len(), 1);

        let other = &result.violations[0];
        let InvariantViolation::Convergence {
            agent_a,
            agent_b,
            only_in_a,
            only_in_b,
        } = other
        else {
            panic!("unexpected violation: {other:?}");
        };
        assert_eq!(*agent_a, 0);
        assert_eq!(*agent_b, 1);
        assert_eq!(only_in_a, &[2_u64]);
        assert!(only_in_b.is_empty());
    }

    #[test]
    fn check_convergence_reports_all_divergent_pairs() {
        let agents = [
            make_state(0, &[1, 2, 3]),
            make_state(1, &[1, 2]),
            make_state(2, &[1]),
        ];
        let result = ConvergenceOracle::check_convergence(&agents);
        assert!(!result.passed);
        assert_eq!(result.violations.len(), 3);
    }

    #[test]
    fn check_convergence_single_agent_passes() {
        let lone_agent = [make_state(0, &[1, 2, 3])];
        let result = ConvergenceOracle::check_convergence(&lone_agent);
        assert!(result.passed);
    }

    // ---- check_commutativity --------------------------------------------

    #[test]
    fn check_commutativity_set_union_always_passes() {
        let mut rng = DeterministicRng::new(42);
        let result = ConvergenceOracle::check_commutativity(&[1_u64, 2, 3, 4, 5], &mut rng, 16);
        assert!(result.passed, "grow-only set is always commutative");
    }

    #[test]
    fn check_commutativity_single_event_passes() {
        let mut rng = DeterministicRng::new(1);
        let result = ConvergenceOracle::check_commutativity(&[7_u64], &mut rng, 4);
        assert!(result.passed);
    }

    #[test]
    fn check_commutativity_empty_events_passes() {
        let mut rng = DeterministicRng::new(0);
        let result = ConvergenceOracle::check_commutativity(&[], &mut rng, 4);
        assert!(result.passed);
    }

    #[test]
    fn check_commutativity_detects_non_commutative_merge() {
        // Builds the failure by hand and checks how it is wrapped.
        let result = OracleResult::fail(vec![InvariantViolation::Commutativity {
            permutation_index: 0,
            missing_events: vec![30],
            extra_events: vec![10],
        }]);
        assert!(!result.passed);
        assert_eq!(result.violations.len(), 1);
    }

    // ---- check_idempotence ----------------------------------------------

    #[test]
    fn check_idempotence_set_insert_is_always_idempotent() {
        let known = [1_u64, 2, 3, 4, 5];
        let agent = make_state(0, &known);
        let result = ConvergenceOracle::check_idempotence(&agent, &known);
        assert!(result.passed, "set insert is idempotent");
    }

    #[test]
    fn check_idempotence_re_applying_unknown_event_is_ok() {
        let agent = make_state(0, &[1, 2, 3]);
        let result = ConvergenceOracle::check_idempotence(&agent, &[1, 2, 3]);
        assert!(result.passed);
    }

    #[test]
    fn check_idempotence_violation_produces_correct_diagnostic() {
        let result = OracleResult::fail(vec![InvariantViolation::Idempotence {
            event_id: 42,
            events_before: vec![1, 2, 3],
            events_after_dup: vec![1, 2, 3, 42],
        }]);
        assert!(!result.passed);

        let other = &result.violations[0];
        let InvariantViolation::Idempotence {
            event_id,
            events_before,
            events_after_dup,
        } = other
        else {
            panic!("unexpected violation: {other:?}");
        };
        assert_eq!(*event_id, 42);
        assert_eq!(events_before, &[1, 2, 3]);
        assert_eq!(events_after_dup, &[1, 2, 3, 42]);
    }

    // ---- check_causality ------------------------------------------------

    #[test]
    fn check_causality_contiguous_sequences_pass() {
        let observer = make_state(1, &[event_id(0, 0), event_id(0, 1), event_id(0, 2)]);
        let result = ConvergenceOracle::check_causality(&[observer]);
        assert!(result.passed);
    }

    #[test]
    fn check_causality_gap_in_sequence_fails() {
        let observer = make_state(1, &[event_id(0, 0), event_id(0, 2)]);
        let result = ConvergenceOracle::check_causality(&[observer]);
        assert!(!result.passed);

        let other = &result.violations[0];
        let InvariantViolation::CausalConsistency {
            observer_agent,
            source_agent,
            missing_seq,
            present_higher_seq,
        } = other
        else {
            panic!("unexpected: {other:?}");
        };
        assert_eq!(*observer_agent, 1);
        assert_eq!(*source_agent, 0);
        assert_eq!(*missing_seq, 1);
        assert_eq!(*present_higher_seq, 2);
    }

    #[test]
    fn check_causality_missing_seq_zero_fails() {
        let observer = make_state(1, &[event_id(0, 2)]);
        let result = ConvergenceOracle::check_causality(&[observer]);
        assert!(!result.passed);

        let has_missing_zero = result.violations.iter().any(|violation| {
            matches!(
                violation,
                InvariantViolation::CausalConsistency { missing_seq: 0, .. }
            )
        });
        assert!(has_missing_zero, "must report missing seq=0");
    }

    #[test]
    fn check_causality_single_event_per_source_passes() {
        let observer = make_state(0, &[event_id(3, 0)]);
        let result = ConvergenceOracle::check_causality(&[observer]);
        assert!(result.passed);
    }

    #[test]
    fn check_causality_multiple_sources_independent() {
        let seen = [
            event_id(0, 0),
            event_id(0, 1),
            event_id(1, 0),
            event_id(1, 1),
        ];
        let result = ConvergenceOracle::check_causality(&[make_state(2, &seen)]);
        assert!(result.passed);
    }

    #[test]
    fn check_causality_violation_in_one_source_only() {
        // Source 0 is contiguous; source 1 skips sequence 1.
        let seen = [
            event_id(0, 0),
            event_id(0, 1),
            event_id(1, 0),
            event_id(1, 2),
        ];
        let result = ConvergenceOracle::check_causality(&[make_state(2, &seen)]);
        assert!(!result.passed);
        assert_eq!(result.violations.len(), 1);
    }

    // ---- check_triage_stability -----------------------------------------

    #[test]
    fn check_triage_stability_identical_counts_pass() {
        let agents = [make_state(0, &[1, 2, 3]), make_state(1, &[1, 2, 3])];
        let result = ConvergenceOracle::check_triage_stability(&agents, 6, 1e-9);
        assert!(result.passed);
    }

    #[test]
    fn check_triage_stability_divergent_scores_fail() {
        let agents = [make_state(0, &[1, 2, 3]), make_state(1, &[1])];
        let result = ConvergenceOracle::check_triage_stability(&agents, 6, 0.01);
        assert!(!result.passed);

        let other = &result.violations[0];
        let InvariantViolation::TriageStability {
            agent_a,
            agent_b,
            score_a,
            score_b,
            diff,
            epsilon,
        } = other
        else {
            panic!("unexpected: {other:?}");
        };
        assert_eq!(*agent_a, 0);
        assert_eq!(*agent_b, 1);
        assert!((score_a - 0.5).abs() < 1e-9);
        assert!((score_b - (1.0 / 6.0)).abs() < 1e-9);
        assert!(*diff > *epsilon);
    }

    #[test]
    fn check_triage_stability_zero_total_events_passes() {
        let agents = [make_state(0, &[]), make_state(1, &[])];
        let result = ConvergenceOracle::check_triage_stability(&agents, 0, 1e-9);
        assert!(result.passed);
    }

    #[test]
    fn check_triage_stability_within_loose_epsilon_passes() {
        let agents = [make_state(0, &[1, 2]), make_state(1, &[1, 2, 3])];
        let result = ConvergenceOracle::check_triage_stability(&agents, 10, 0.2);
        assert!(result.passed);
    }

    // ---- check_all ------------------------------------------------------

    #[test]
    fn check_all_fully_converged_passes_every_invariant() {
        let all = [
            event_id(0, 0),
            event_id(0, 1),
            event_id(1, 0),
            event_id(1, 1),
            event_id(2, 0),
        ];
        let agents = [
            make_state(0, &all),
            make_state(1, &all),
            make_state(2, &all),
        ];

        let mut rng = DeterministicRng::new(77);
        let result = ConvergenceOracle::check_all(&agents, &all, &mut rng);
        assert!(
            result.passed,
            "all invariants must pass; violations: {:?}",
            result.violations
        );
    }

    #[test]
    fn check_all_divergent_states_fails_convergence() {
        let all = [1_u64, 2, 3, 4];
        let agents = [make_state(0, &all), make_state(1, &[1, 2])];
        let mut rng = DeterministicRng::new(7);
        let result = ConvergenceOracle::check_all(&agents, &all, &mut rng);
        assert!(!result.passed);

        let has_conv = result
            .violations
            .iter()
            .any(|violation| matches!(violation, InvariantViolation::Convergence { .. }));
        assert!(has_conv);
    }

    #[test]
    fn check_all_causal_violation_fails_causality() {
        let agent = make_state(0, &[event_id(0, 0), event_id(1, 0), event_id(1, 2)]);
        let events: Vec<u64> = agent.known_events.iter().copied().collect();
        let mut rng = DeterministicRng::new(3);
        let result = ConvergenceOracle::check_all(&[agent], &events, &mut rng);
        assert!(!result.passed);

        let has_causal = result
            .violations
            .iter()
            .any(|violation| matches!(violation, InvariantViolation::CausalConsistency { .. }));
        assert!(has_causal);
    }

    // ---- fisher_yates_shuffle -------------------------------------------

    #[test]
    fn fisher_yates_preserves_all_elements() {
        let events = [1_u64, 2, 3, 4, 5, 6, 7, 8];
        let mut rng = DeterministicRng::new(99);
        let shuffled = fisher_yates_shuffle(&events, &mut rng);
        assert_eq!(shuffled.len(), events.len());

        let orig_set: BTreeSet<u64> = events.iter().copied().collect();
        let shuf_set: BTreeSet<u64> = shuffled.iter().copied().collect();
        assert_eq!(orig_set, shuf_set);
    }

    #[test]
    fn different_seeds_produce_different_shuffles() {
        let events = [1_u64, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut first_rng = DeterministicRng::new(10);
        let mut second_rng = DeterministicRng::new(20);
        let first = fisher_yates_shuffle(&events, &mut first_rng);
        let second = fisher_yates_shuffle(&events, &mut second_rng);
        assert_ne!(first, second);
    }
}