1use std::cmp::Ordering;
8use std::collections::{HashMap, HashSet, VecDeque};
9use std::time::{Duration, Instant};
10
11use serde::{Deserialize, Serialize};
12
13use crate::ant_protocol::XorName;
14use crate::replication::config::REPAIR_HINT_MIN_AGE;
15use saorsa_core::identity::PeerId;
16
17#[derive(Debug, Clone, PartialEq, Eq)]
27pub enum VerificationState {
28 OfferReceived,
30 PendingVerify,
32 QuorumVerified,
35 PaidListVerified,
38 QueuedForFetch,
40 Fetching,
42 Stored,
44 FetchRetryable,
46 FetchAbandoned,
48 QuorumFailed,
51 QuorumInconclusive,
53 QuorumAbandoned,
55 Idle,
58}
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
66pub enum HintPipeline {
67 Replica,
69 PaidOnly,
72}
73
74#[derive(Debug, Clone)]
83pub struct VerificationEntry {
84 pub state: VerificationState,
86 pub pipeline: HintPipeline,
88 pub verified_sources: Vec<PeerId>,
91 pub tried_sources: HashSet<PeerId>,
93 pub created_at: Instant,
95 pub hint_sender: PeerId,
97}
98
99#[derive(Debug, Clone)]
109pub struct FetchCandidate {
110 pub key: XorName,
112 pub distance: XorName,
114 pub sources: Vec<PeerId>,
116}
117
118impl Eq for FetchCandidate {}
119
120impl PartialEq for FetchCandidate {
121 fn eq(&self, other: &Self) -> bool {
122 self.distance == other.distance && self.key == other.key
123 }
124}
125
126impl Ord for FetchCandidate {
127 fn cmp(&self, other: &Self) -> Ordering {
128 other
131 .distance
132 .cmp(&self.distance)
133 .then_with(|| self.key.cmp(&other.key))
134 }
135}
136
137impl PartialOrd for FetchCandidate {
138 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
139 Some(self.cmp(other))
140 }
141}
142
143#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
149pub enum PresenceEvidence {
150 Present,
152 Absent,
154 Unresolved,
156}
157
158#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
160pub enum PaidListEvidence {
161 Confirmed,
163 NotFound,
165 Unresolved,
167}
168
169#[derive(Debug, Clone)]
172pub struct KeyVerificationEvidence {
173 pub presence: HashMap<PeerId, PresenceEvidence>,
175 pub paid_list: HashMap<PeerId, PaidListEvidence>,
177}
178
179#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
186pub struct AuditFailureSummary {
187 pub challenged_keys: usize,
189 pub failed_keys: usize,
191 pub absent_keys: usize,
193 pub digest_mismatch_keys: usize,
196}
197
198#[derive(Debug, Clone)]
200pub enum FailureEvidence {
201 ReplicationFailure {
203 peer: PeerId,
205 key: XorName,
207 },
208 AuditFailure {
210 challenge_id: u64,
212 challenged_peer: PeerId,
214 confirmed_failed_keys: Vec<XorName>,
216 summary: AuditFailureSummary,
218 reason: AuditFailureReason,
220 },
221 BootstrapClaimAbuse {
223 peer: PeerId,
225 first_seen: Instant,
227 },
228}
229
230#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
232pub enum AuditFailureReason {
233 Timeout,
235 MalformedResponse,
237 DigestMismatch,
239 KeyAbsent,
241 Rejected,
243}
244
245#[derive(Debug, Clone)]
251pub struct PeerSyncRecord {
252 pub last_sync: Option<Instant>,
254 pub cycles_since_sync: u32,
257}
258
259impl PeerSyncRecord {
260 #[must_use]
263 pub fn has_repair_opportunity(&self) -> bool {
264 self.last_sync.is_some() && self.cycles_since_sync >= 1
265 }
266}
267
268#[derive(Debug, Clone)]
274struct RepairProof {
275 hinted_at_epoch: u64,
277 hinted_at: Instant,
279}
280
281#[derive(Debug, Clone)]
284struct RepairProofEntry {
285 close_peers: HashSet<PeerId>,
287 peer_proofs: HashMap<PeerId, RepairProof>,
289}
290
291impl RepairProofEntry {
292 fn new(close_peers: HashSet<PeerId>) -> Self {
293 Self {
294 close_peers,
295 peer_proofs: HashMap::new(),
296 }
297 }
298}
299
300#[derive(Debug, Clone, Default)]
306pub struct RepairProofs {
307 proofs_by_key: HashMap<XorName, RepairProofEntry>,
309}
310
311impl RepairProofs {
312 #[must_use]
314 pub fn new() -> Self {
315 Self::default()
316 }
317
318 pub fn record_replica_hint_sent(
326 &mut self,
327 peer: PeerId,
328 key: XorName,
329 current_close_peers: &HashSet<PeerId>,
330 hinted_at_epoch: u64,
331 ) -> bool {
332 self.insert_replica_hint_sent(
333 peer,
334 key,
335 current_close_peers,
336 hinted_at_epoch,
337 Instant::now(),
338 )
339 }
340
341 #[cfg(any(test, feature = "test-utils"))]
348 pub fn record_replica_hint_sent_at(
349 &mut self,
350 peer: PeerId,
351 key: XorName,
352 current_close_peers: &HashSet<PeerId>,
353 hinted_at_epoch: u64,
354 hinted_at: Instant,
355 ) -> bool {
356 self.insert_replica_hint_sent(peer, key, current_close_peers, hinted_at_epoch, hinted_at)
357 }
358
359 fn insert_replica_hint_sent(
360 &mut self,
361 peer: PeerId,
362 key: XorName,
363 current_close_peers: &HashSet<PeerId>,
364 hinted_at_epoch: u64,
365 hinted_at: Instant,
366 ) -> bool {
367 self.reconcile_key_close_group(&key, current_close_peers);
368
369 if !current_close_peers.contains(&peer) {
370 return false;
371 }
372
373 let entry = self
374 .proofs_by_key
375 .entry(key)
376 .or_insert_with(|| RepairProofEntry::new(current_close_peers.clone()));
377
378 if entry.peer_proofs.contains_key(&peer) {
379 return false;
380 }
381
382 entry.peer_proofs.insert(
383 peer,
384 RepairProof {
385 hinted_at_epoch,
386 hinted_at,
387 },
388 );
389 true
390 }
391
392 pub fn has_mature_replica_hint(
399 &mut self,
400 peer: &PeerId,
401 key: &XorName,
402 current_close_peers: &HashSet<PeerId>,
403 current_epoch: u64,
404 now: Instant,
405 ) -> bool {
406 self.reconcile_key_close_group(key, current_close_peers);
407
408 self.proofs_by_key
409 .get(key)
410 .and_then(|entry| entry.peer_proofs.get(peer))
411 .is_some_and(|proof| {
412 proof.hinted_at_epoch < current_epoch
413 && now.saturating_duration_since(proof.hinted_at) >= REPAIR_HINT_MIN_AGE
414 })
415 }
416
417 pub fn remove_key(&mut self, key: &XorName) {
419 self.proofs_by_key.remove(key);
420 }
421
422 pub fn remove_peer(&mut self, peer: &PeerId) {
424 self.proofs_by_key.retain(|_, entry| {
425 entry.peer_proofs.remove(peer);
426 !entry.peer_proofs.is_empty()
427 });
428 }
429
430 fn reconcile_key_close_group(&mut self, key: &XorName, current_close_peers: &HashSet<PeerId>) {
431 let should_remove = if let Some(entry) = self.proofs_by_key.get_mut(key) {
432 if entry.close_peers == *current_close_peers {
433 return;
434 }
435
436 entry.close_peers.clone_from(current_close_peers);
437 entry
438 .peer_proofs
439 .retain(|peer, _| current_close_peers.contains(peer));
440 entry.peer_proofs.is_empty()
441 } else {
442 false
443 };
444
445 if should_remove {
446 self.proofs_by_key.remove(key);
447 }
448 }
449}
450
451#[derive(Debug, Clone, Copy, PartialEq, Eq)]
457pub enum BootstrapClaimObservation {
458 WithinGrace {
460 first_seen: Instant,
462 },
463 PastGrace {
465 first_seen: Instant,
467 },
468 Repeated {
470 first_seen: Instant,
472 },
473}
474
475#[derive(Debug)]
481pub struct NeighborSyncState {
482 pub priority_order: VecDeque<PeerId>,
489 pub order: Vec<PeerId>,
491 pub cursor: usize,
493 pub last_sync_times: HashMap<PeerId, Instant>,
495 pub bootstrap_claims: HashMap<PeerId, Instant>,
501 pub bootstrap_claim_history: HashMap<PeerId, Instant>,
509 pub prune_cursor: usize,
512}
513
514impl NeighborSyncState {
515 #[must_use]
517 pub fn new_cycle(close_neighbors: Vec<PeerId>) -> Self {
518 Self {
519 priority_order: VecDeque::new(),
520 order: close_neighbors,
521 cursor: 0,
522 last_sync_times: HashMap::new(),
523 bootstrap_claims: HashMap::new(),
524 bootstrap_claim_history: HashMap::new(),
525 prune_cursor: 0,
526 }
527 }
528
529 #[must_use]
536 pub fn observe_bootstrap_claim(
537 &mut self,
538 peer: PeerId,
539 now: Instant,
540 grace_period: Duration,
541 ) -> BootstrapClaimObservation {
542 if let Some(first_seen) = self.bootstrap_claims.get(&peer).copied() {
543 if now.duration_since(first_seen) > grace_period {
544 BootstrapClaimObservation::PastGrace { first_seen }
545 } else {
546 BootstrapClaimObservation::WithinGrace { first_seen }
547 }
548 } else if let Some(first_seen) = self.bootstrap_claim_history.get(&peer).copied() {
549 BootstrapClaimObservation::Repeated { first_seen }
550 } else {
551 self.bootstrap_claims.insert(peer, now);
552 self.bootstrap_claim_history.insert(peer, now);
553 BootstrapClaimObservation::WithinGrace { first_seen: now }
554 }
555 }
556
557 pub fn clear_active_bootstrap_claim(&mut self, peer: &PeerId) -> bool {
559 self.bootstrap_claims.remove(peer).is_some()
560 }
561
562 pub fn queue_priority_peers<I>(&mut self, peers: I) -> usize
567 where
568 I: IntoIterator<Item = PeerId>,
569 {
570 let mut queued = 0;
571 for peer in peers {
572 if self.priority_order.contains(&peer) {
573 continue;
574 }
575 self.priority_order.push_back(peer);
576 queued += 1;
577 }
578 queued
579 }
580
581 pub fn retain_sync_peers(&mut self, close_peers: &HashSet<PeerId>) -> usize {
586 let old_priority_len = self.priority_order.len();
587 self.priority_order
588 .retain(|peer| close_peers.contains(peer));
589
590 let old_order_len = self.order.len();
591 let old_cursor = self.cursor;
592 let mut retained_before_cursor = 0;
593 let mut retained_order = Vec::with_capacity(old_order_len);
594 for (idx, peer) in self.order.drain(..).enumerate() {
595 if close_peers.contains(&peer) {
596 if idx < old_cursor {
597 retained_before_cursor += 1;
598 }
599 retained_order.push(peer);
600 }
601 }
602
603 self.order = retained_order;
604 self.cursor = retained_before_cursor;
605
606 (old_priority_len - self.priority_order.len()) + (old_order_len - self.order.len())
607 }
608
609 pub fn remove_peer(&mut self, peer: &PeerId) -> bool {
611 let old_priority_len = self.priority_order.len();
612 self.priority_order.retain(|queued| queued != peer);
613
614 let old_order_len = self.order.len();
615 if let Some(pos) = self.order.iter().position(|queued| queued == peer) {
616 self.order.remove(pos);
617 if pos < self.cursor {
618 self.cursor = self.cursor.saturating_sub(1);
619 }
620 }
621
622 old_priority_len != self.priority_order.len() || old_order_len != self.order.len()
623 }
624
625 #[must_use]
627 pub fn is_cycle_complete(&self) -> bool {
628 self.priority_order.is_empty() && self.cursor >= self.order.len()
629 }
630}
631
632#[derive(Debug)]
638pub struct BootstrapState {
639 pub drained: bool,
641 pub pending_peer_requests: usize,
643 pub pending_keys: HashSet<XorName>,
646 pub capacity_rejected_sources: HashSet<PeerId>,
657}
658
659impl BootstrapState {
660 #[must_use]
662 pub fn new() -> Self {
663 Self {
664 drained: false,
665 pending_peer_requests: 0,
666 pending_keys: HashSet::new(),
667 capacity_rejected_sources: HashSet::new(),
668 }
669 }
670
671 #[must_use]
678 pub fn is_drained(&self) -> bool {
679 self.drained
680 }
681
682 pub fn remove_key(&mut self, key: &XorName) {
688 self.pending_keys.remove(key);
689 }
690}
691
692impl Default for BootstrapState {
693 fn default() -> Self {
694 Self::new()
695 }
696}
697
698#[cfg(test)]
703mod tests {
704 use std::collections::BinaryHeap;
705
706 use super::*;
707
708 fn peer_id_from_byte(b: u8) -> PeerId {
710 let mut bytes = [0u8; 32];
711 bytes[0] = b;
712 PeerId::from_bytes(bytes)
713 }
714
715 fn mature_hint_times() -> (Instant, Instant) {
716 let hinted_at = Instant::now();
717 let now = hinted_at
718 .checked_add(REPAIR_HINT_MIN_AGE)
719 .unwrap_or(hinted_at);
720 (hinted_at, now)
721 }
722
723 #[test]
726 fn fetch_candidate_nearest_key_has_highest_priority() {
727 let near = FetchCandidate {
728 key: [1u8; 32],
729 distance: [
730 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
731 0, 0, 0, 0,
732 ],
733 sources: vec![peer_id_from_byte(1)],
734 };
735
736 let far = FetchCandidate {
737 key: [2u8; 32],
738 distance: [
739 0xFF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
740 0, 0, 0, 0, 0,
741 ],
742 sources: vec![peer_id_from_byte(2)],
743 };
744
745 assert!(near > far, "nearer candidate should compare greater");
748
749 let mut heap = BinaryHeap::new();
750 heap.push(far.clone());
751 heap.push(near.clone());
752
753 assert_eq!(heap.len(), 2, "heap should contain both candidates");
754
755 let first = heap.pop();
756 assert!(first.is_some(), "first pop should succeed");
757 assert_eq!(
758 first.map(|c| c.key),
759 Some(near.key),
760 "nearest key should pop first"
761 );
762
763 let second = heap.pop();
764 assert!(second.is_some(), "second pop should succeed");
765 assert_eq!(
766 second.map(|c| c.key),
767 Some(far.key),
768 "farthest key should pop second"
769 );
770 }
771
772 #[test]
773 fn fetch_candidate_same_distance_and_key_is_equal() {
774 let a = FetchCandidate {
775 key: [1u8; 32],
776 distance: [5u8; 32],
777 sources: vec![],
778 };
779
780 let b = FetchCandidate {
781 key: [1u8; 32],
782 distance: [5u8; 32],
783 sources: vec![],
784 };
785
786 assert_eq!(
787 a.cmp(&b),
788 Ordering::Equal,
789 "same distance + same key should yield Equal"
790 );
791 assert_eq!(a, b, "PartialEq must agree with Ord");
792 }
793
794 #[test]
795 fn fetch_candidate_same_distance_different_key_is_deterministic() {
796 let a = FetchCandidate {
797 key: [1u8; 32],
798 distance: [5u8; 32],
799 sources: vec![],
800 };
801
802 let b = FetchCandidate {
803 key: [2u8; 32],
804 distance: [5u8; 32],
805 sources: vec![],
806 };
807
808 assert_ne!(
809 a.cmp(&b),
810 Ordering::Equal,
811 "same distance + different key must not be Equal"
812 );
813 assert_ne!(a, b, "PartialEq must agree with Ord");
814 }
815
816 #[test]
819 fn peer_sync_record_no_sync_yet() {
820 let record = PeerSyncRecord {
821 last_sync: None,
822 cycles_since_sync: 0,
823 };
824 assert!(
825 !record.has_repair_opportunity(),
826 "never-synced peer has no repair opportunity"
827 );
828 }
829
830 #[test]
831 fn peer_sync_record_synced_but_no_cycle() {
832 let record = PeerSyncRecord {
833 last_sync: Some(Instant::now()),
834 cycles_since_sync: 0,
835 };
836 assert!(
837 !record.has_repair_opportunity(),
838 "synced peer with zero subsequent cycles has no repair opportunity"
839 );
840 }
841
842 #[test]
843 fn peer_sync_record_synced_with_cycle() {
844 let record = PeerSyncRecord {
845 last_sync: Some(Instant::now()),
846 cycles_since_sync: 1,
847 };
848 assert!(
849 record.has_repair_opportunity(),
850 "synced peer with >= 1 cycle should have repair opportunity"
851 );
852 }
853
854 #[test]
855 fn peer_sync_record_no_sync_many_cycles() {
856 let record = PeerSyncRecord {
857 last_sync: None,
858 cycles_since_sync: 10,
859 };
860 assert!(
861 !record.has_repair_opportunity(),
862 "never-synced peer has no repair opportunity regardless of cycle count"
863 );
864 }
865
866 #[test]
869 fn repair_proofs_record_sent_hint_for_close_peer() {
870 const HINT_EPOCH: u64 = 7;
871 const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
872
873 let key = [0xA1; 32];
874 let peer = peer_id_from_byte(1);
875 let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
876 let mut proofs = RepairProofs::new();
877 let (hinted_at, now) = mature_hint_times();
878
879 assert!(proofs.record_replica_hint_sent_at(peer, key, &close_peers, HINT_EPOCH, hinted_at,));
880
881 assert!(
882 proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, now),
883 "old sent hint should make key auditable for that peer"
884 );
885 }
886
887 #[test]
888 fn repair_proofs_reject_peer_outside_current_close_group() {
889 const HINT_EPOCH: u64 = 7;
890 const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
891
892 let key = [0xA2; 32];
893 let peer = peer_id_from_byte(1);
894 let close_peers = HashSet::from([peer_id_from_byte(2), peer_id_from_byte(3)]);
895 let mut proofs = RepairProofs::new();
896 let (hinted_at, now) = mature_hint_times();
897
898 assert!(!proofs.record_replica_hint_sent_at(
899 peer,
900 key,
901 &close_peers,
902 HINT_EPOCH,
903 hinted_at,
904 ));
905
906 assert!(
907 !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, now),
908 "peers outside current close group must not get repair proof"
909 );
910 }
911
912 #[test]
913 fn repair_proofs_require_later_epoch() {
914 const HINT_EPOCH: u64 = 7;
915 const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
916
917 let key = [0xA3; 32];
918 let peer = peer_id_from_byte(1);
919 let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
920 let mut proofs = RepairProofs::new();
921 let (hinted_at, now) = mature_hint_times();
922
923 assert!(proofs.record_replica_hint_sent_at(peer, key, &close_peers, HINT_EPOCH, hinted_at,));
924
925 assert!(
926 !proofs.has_mature_replica_hint(&peer, &key, &close_peers, HINT_EPOCH, now),
927 "same-cycle proof should not be audit-eligible"
928 );
929 assert!(
930 proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, now),
931 "old proof should mature after a later local sync-cycle epoch"
932 );
933 }
934
935 #[test]
936 fn repair_proofs_require_min_hint_age() {
937 const HINT_EPOCH: u64 = 7;
938 const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
939
940 let key = [0xA8; 32];
941 let peer = peer_id_from_byte(1);
942 let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
943 let mut proofs = RepairProofs::new();
944 let hinted_at = Instant::now();
945
946 assert!(proofs.record_replica_hint_sent_at(peer, key, &close_peers, HINT_EPOCH, hinted_at));
947
948 assert!(
949 !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, hinted_at),
950 "fresh repair hints should not be audit-eligible"
951 );
952 assert!(
953 proofs.has_mature_replica_hint(
954 &peer,
955 &key,
956 &close_peers,
957 CURRENT_EPOCH,
958 hinted_at
959 .checked_add(REPAIR_HINT_MIN_AGE)
960 .unwrap_or(hinted_at),
961 ),
962 "repair hints should mature once they are at least the minimum age"
963 );
964 }
965
966 #[test]
967 fn repair_proofs_repeated_hint_does_not_reset_maturity() {
968 const HINT_EPOCH: u64 = 7;
969 const REPEATED_HINT_EPOCH: u64 = HINT_EPOCH + 1;
970
971 let key = [0xA5; 32];
972 let peer = peer_id_from_byte(1);
973 let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
974 let mut proofs = RepairProofs::new();
975 let (hinted_at, now) = mature_hint_times();
976
977 assert!(proofs.record_replica_hint_sent_at(peer, key, &close_peers, HINT_EPOCH, hinted_at,));
978 assert!(
979 !proofs.record_replica_hint_sent_at(peer, key, &close_peers, REPEATED_HINT_EPOCH, now),
980 "duplicate hint in the same close group should keep existing proof"
981 );
982 assert!(
983 proofs.has_mature_replica_hint(&peer, &key, &close_peers, REPEATED_HINT_EPOCH, now),
984 "duplicate hint must not reset an already mature proof"
985 );
986 }
987
988 #[test]
989 fn repair_proofs_retain_stable_peers_on_close_group_change() {
990 const HINT_EPOCH: u64 = 7;
991 const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
992
993 let key = [0xA7; 32];
994 let stable_peer = peer_id_from_byte(1);
995 let departing_peer = peer_id_from_byte(2);
996 let retained_peer = peer_id_from_byte(3);
997 let new_peer = peer_id_from_byte(4);
998 let old_group = HashSet::from([stable_peer, departing_peer, retained_peer]);
999 let changed_group = HashSet::from([stable_peer, retained_peer, new_peer]);
1000 let mut proofs = RepairProofs::new();
1001 let (hinted_at, now) = mature_hint_times();
1002
1003 assert!(proofs.record_replica_hint_sent_at(
1004 stable_peer,
1005 key,
1006 &old_group,
1007 HINT_EPOCH,
1008 hinted_at,
1009 ));
1010 assert!(proofs.record_replica_hint_sent_at(
1011 departing_peer,
1012 key,
1013 &old_group,
1014 HINT_EPOCH,
1015 hinted_at,
1016 ));
1017
1018 assert!(
1019 proofs.has_mature_replica_hint(&stable_peer, &key, &changed_group, CURRENT_EPOCH, now),
1020 "stable peers should keep mature repair proofs across unrelated close-group churn"
1021 );
1022 assert!(
1023 !proofs.has_mature_replica_hint(
1024 &departing_peer,
1025 &key,
1026 &changed_group,
1027 CURRENT_EPOCH,
1028 now,
1029 ),
1030 "peers that left the close group should lose repair proofs"
1031 );
1032 assert!(
1033 !proofs.has_mature_replica_hint(&new_peer, &key, &changed_group, CURRENT_EPOCH, now),
1034 "new close-group peers need their own repair hint before auditing"
1035 );
1036 }
1037
1038 #[test]
1039 fn repair_proofs_evicted_peer_reentry_requires_fresh_hint() {
1040 const FIRST_HINT_EPOCH: u64 = 7;
1041 const SECOND_HINT_EPOCH: u64 = FIRST_HINT_EPOCH + 1;
1042 const CURRENT_EPOCH: u64 = SECOND_HINT_EPOCH + 1;
1043
1044 let key = [0xA3; 32];
1045 let returning_peer = peer_id_from_byte(1);
1046 let new_peer = peer_id_from_byte(4);
1047 let old_group = HashSet::from([returning_peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
1048 let changed_group = HashSet::from([new_peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
1049 let mut proofs = RepairProofs::new();
1050 let (hinted_at, now) = mature_hint_times();
1051
1052 assert!(proofs.record_replica_hint_sent_at(
1053 returning_peer,
1054 key,
1055 &old_group,
1056 FIRST_HINT_EPOCH,
1057 hinted_at,
1058 ));
1059
1060 assert!(
1061 !proofs.has_mature_replica_hint(
1062 &new_peer,
1063 &key,
1064 &changed_group,
1065 SECOND_HINT_EPOCH,
1066 now
1067 ),
1068 "new close-group peer should not inherit another peer's repair proof"
1069 );
1070 assert!(
1071 !proofs.has_mature_replica_hint(&returning_peer, &key, &old_group, CURRENT_EPOCH, now),
1072 "a peer that re-enters must receive a fresh repair hint"
1073 );
1074
1075 assert!(proofs.record_replica_hint_sent_at(
1076 returning_peer,
1077 key,
1078 &old_group,
1079 SECOND_HINT_EPOCH,
1080 hinted_at,
1081 ));
1082 assert!(
1083 proofs.has_mature_replica_hint(&returning_peer, &key, &old_group, CURRENT_EPOCH, now),
1084 "fresh repair hint after re-entry should be eligible once mature"
1085 );
1086 }
1087
1088 #[test]
1089 fn repair_proofs_remove_peer_requires_fresh_hint_after_reentry() {
1090 const FIRST_HINT_EPOCH: u64 = 7;
1091 const SECOND_HINT_EPOCH: u64 = FIRST_HINT_EPOCH + 1;
1092 const CURRENT_EPOCH: u64 = SECOND_HINT_EPOCH + 1;
1093
1094 let key = [0xA6; 32];
1095 let peer = peer_id_from_byte(1);
1096 let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
1097 let mut proofs = RepairProofs::new();
1098 let (hinted_at, now) = mature_hint_times();
1099
1100 assert!(proofs.record_replica_hint_sent_at(
1101 peer,
1102 key,
1103 &close_peers,
1104 FIRST_HINT_EPOCH,
1105 hinted_at,
1106 ));
1107 proofs.remove_peer(&peer);
1108
1109 assert!(
1110 !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, now),
1111 "routing-table removal should clear proof even if peer re-enters same close group"
1112 );
1113
1114 assert!(proofs.record_replica_hint_sent_at(
1115 peer,
1116 key,
1117 &close_peers,
1118 SECOND_HINT_EPOCH,
1119 hinted_at,
1120 ));
1121 assert!(
1122 proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, now),
1123 "fresh hint after re-entry should become eligible after a later epoch"
1124 );
1125 }
1126
1127 #[test]
1128 fn repair_proofs_remove_key_clears_all_peer_entries() {
1129 const HINT_EPOCH: u64 = 7;
1130 const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
1131
1132 let key = [0xA4; 32];
1133 let peer = peer_id_from_byte(1);
1134 let close_peers = HashSet::from([peer]);
1135 let mut proofs = RepairProofs::new();
1136 let (hinted_at, now) = mature_hint_times();
1137
1138 assert!(proofs.record_replica_hint_sent_at(peer, key, &close_peers, HINT_EPOCH, hinted_at,));
1139 proofs.remove_key(&key);
1140
1141 assert!(
1142 !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, now),
1143 "deleted local key should not retain repair proof entries"
1144 );
1145 }
1146
1147 #[test]
1150 fn neighbor_sync_empty_cycle_is_immediately_complete() {
1151 let state = NeighborSyncState::new_cycle(vec![]);
1152 assert!(
1153 state.is_cycle_complete(),
1154 "empty neighbor list means cycle is complete"
1155 );
1156 }
1157
1158 #[test]
1159 fn neighbor_sync_new_cycle_not_complete() {
1160 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1161 let state = NeighborSyncState::new_cycle(peers);
1162 assert!(
1163 !state.is_cycle_complete(),
1164 "fresh cycle with peers should not be complete"
1165 );
1166 }
1167
1168 #[test]
1169 fn neighbor_sync_cycle_completes_when_cursor_reaches_end() {
1170 let peers = vec![
1171 peer_id_from_byte(1),
1172 peer_id_from_byte(2),
1173 peer_id_from_byte(3),
1174 ];
1175 let mut state = NeighborSyncState::new_cycle(peers);
1176
1177 state.cursor = 2;
1179 assert!(
1180 !state.is_cycle_complete(),
1181 "cursor at len-1 should not be complete"
1182 );
1183
1184 state.cursor = 3;
1185 assert!(
1186 state.is_cycle_complete(),
1187 "cursor at len should be complete"
1188 );
1189 }
1190
1191 #[test]
1192 fn neighbor_sync_cursor_past_end_is_still_complete() {
1193 let peers = vec![peer_id_from_byte(1)];
1194 let mut state = NeighborSyncState::new_cycle(peers);
1195 state.cursor = 5;
1196 assert!(
1197 state.is_cycle_complete(),
1198 "cursor past end should still report complete"
1199 );
1200 }
1201
1202 #[test]
1203 fn neighbor_sync_priority_queue_blocks_cycle_completion() {
1204 let peer = peer_id_from_byte(2);
1205 let mut state = NeighborSyncState::new_cycle(Vec::new());
1206
1207 assert!(state.is_cycle_complete());
1208 assert_eq!(state.queue_priority_peers([peer]), 1);
1209 assert!(
1210 !state.is_cycle_complete(),
1211 "pending priority peers must sync before the cycle completes"
1212 );
1213 }
1214
1215 #[test]
1216 fn neighbor_sync_priority_queue_deduplicates_peers() {
1217 let peer = peer_id_from_byte(3);
1218 let mut state = NeighborSyncState::new_cycle(Vec::new());
1219
1220 assert_eq!(state.queue_priority_peers([peer, peer]), 1);
1221 assert_eq!(state.priority_order.len(), 1);
1222 }
1223
1224 #[test]
1225 fn neighbor_sync_remove_peer_clears_order_and_priority_queue() {
1226 let peer = peer_id_from_byte(4);
1227 let retained = peer_id_from_byte(5);
1228 let mut state = NeighborSyncState::new_cycle(vec![peer, retained]);
1229 assert_eq!(state.queue_priority_peers([peer]), 1);
1230 state.cursor = 1;
1231
1232 assert!(state.remove_peer(&peer));
1233
1234 assert!(!state.order.contains(&peer));
1235 assert!(!state.priority_order.contains(&peer));
1236 assert_eq!(state.order, vec![retained]);
1237 assert_eq!(state.cursor, 0);
1238 }
1239
1240 #[test]
1241 fn neighbor_sync_retain_sync_peers_prunes_only_departed_peers() {
1242 let already_scanned = peer_id_from_byte(1);
1243 let stable_scanned = peer_id_from_byte(2);
1244 let departed_priority = peer_id_from_byte(3);
1245 let stable_unscanned = peer_id_from_byte(4);
1246 let stable_priority = peer_id_from_byte(5);
1247 let mut state = NeighborSyncState::new_cycle(vec![
1248 already_scanned,
1249 stable_scanned,
1250 departed_priority,
1251 stable_unscanned,
1252 ]);
1253 assert_eq!(
1254 state.queue_priority_peers([departed_priority, stable_priority]),
1255 2
1256 );
1257 state.cursor = 2;
1258 let close_peers = HashSet::from([stable_scanned, stable_unscanned, stable_priority]);
1259
1260 let removed = state.retain_sync_peers(&close_peers);
1261
1262 assert_eq!(removed, 3);
1263 assert_eq!(state.order, vec![stable_scanned, stable_unscanned]);
1264 assert_eq!(
1265 state.priority_order.iter().copied().collect::<Vec<_>>(),
1266 vec![stable_priority]
1267 );
1268 assert_eq!(
1269 state.cursor, 1,
1270 "stable peer scanned before churn must not be selected again"
1271 );
1272 }
1273
1274 #[test]
1275 fn bootstrap_claim_history_prevents_second_grace_window() {
1276 let peer = peer_id_from_byte(9);
1277 let mut state = NeighborSyncState::new_cycle(vec![peer]);
1278 let first_seen = Instant::now();
1279 let grace = Duration::from_secs(60);
1280
1281 assert_eq!(
1282 state.observe_bootstrap_claim(peer, first_seen, grace),
1283 BootstrapClaimObservation::WithinGrace { first_seen }
1284 );
1285 assert!(state.clear_active_bootstrap_claim(&peer));
1286 assert!(!state.bootstrap_claims.contains_key(&peer));
1287 assert!(state.bootstrap_claim_history.contains_key(&peer));
1288
1289 assert_eq!(
1290 state.observe_bootstrap_claim(peer, first_seen + Duration::from_secs(1), grace),
1291 BootstrapClaimObservation::Repeated { first_seen }
1292 );
1293 assert!(
1294 !state.bootstrap_claims.contains_key(&peer),
1295 "repeated claims must not recreate an active grace window"
1296 );
1297 assert_eq!(
1298 state.observe_bootstrap_claim(peer, first_seen + Duration::from_secs(2), grace),
1299 BootstrapClaimObservation::Repeated { first_seen }
1300 );
1301 }
1302
1303 #[test]
1304 fn bootstrap_claim_active_window_reports_past_grace() {
1305 let peer = peer_id_from_byte(10);
1306 let mut state = NeighborSyncState::new_cycle(vec![peer]);
1307 let first_seen = Instant::now();
1308 let grace = Duration::from_secs(60);
1309
1310 let _ = state.observe_bootstrap_claim(peer, first_seen, grace);
1311
1312 assert_eq!(
1313 state.observe_bootstrap_claim(peer, first_seen + grace + Duration::from_secs(1), grace),
1314 BootstrapClaimObservation::PastGrace { first_seen }
1315 );
1316 }
1317
1318 #[test]
1321 fn bootstrap_state_initial_not_drained() {
1322 let state = BootstrapState::new();
1325 assert!(
1326 !state.is_drained(),
1327 "initial state must not be drained before bootstrap begins"
1328 );
1329 }
1330
1331 #[test]
1332 fn bootstrap_state_pending_requests_block_drain() {
1333 let mut state = BootstrapState::new();
1334 state.pending_peer_requests = 3;
1335 assert!(
1336 !state.is_drained(),
1337 "pending peer requests should block drain"
1338 );
1339 }
1340
1341 #[test]
1342 fn bootstrap_state_pending_keys_block_drain() {
1343 let mut state = BootstrapState::new();
1344 state.pending_keys.insert([42u8; 32]);
1345 assert!(!state.is_drained(), "pending keys should block drain");
1346 }
1347
1348 #[test]
1349 fn bootstrap_state_explicit_drained_overrides() {
1350 let mut state = BootstrapState::new();
1351 state.pending_peer_requests = 5;
1352 state.pending_keys.insert([99u8; 32]);
1353 state.drained = true;
1354 assert!(
1355 state.is_drained(),
1356 "explicit drained flag should override pending counts"
1357 );
1358 }
1359
1360 #[test]
1361 fn bootstrap_state_requires_explicit_drain() {
1362 let mut state = BootstrapState::new();
1363 state.pending_peer_requests = 2;
1364 state.pending_keys.insert([1u8; 32]);
1365
1366 state.pending_peer_requests = 0;
1368 state.pending_keys.clear();
1369
1370 assert!(
1371 !state.is_drained(),
1372 "clearing counters alone must not drain — requires check_bootstrap_drained"
1373 );
1374
1375 state.drained = true;
1377 assert!(state.is_drained(), "explicit flag should drain");
1378 }
1379
1380 #[test]
1381 fn bootstrap_state_default_matches_new() {
1382 let from_new = BootstrapState::new();
1383 let from_default = BootstrapState::default();
1384
1385 assert_eq!(from_new.drained, from_default.drained);
1386 assert_eq!(
1387 from_new.pending_peer_requests,
1388 from_default.pending_peer_requests
1389 );
1390 assert_eq!(from_new.pending_keys, from_default.pending_keys);
1391 }
1392
1393 #[test]
1398 fn bootstrap_drain_requires_empty_pending_keys() {
1399 let key_a: XorName = [0xA0; 32];
1400 let key_b: XorName = [0xB0; 32];
1401 let key_c: XorName = [0xC0; 32];
1402
1403 let mut state = BootstrapState::new();
1404 state.pending_peer_requests = 0; state.pending_keys = std::iter::once(key_a)
1406 .chain(std::iter::once(key_b))
1407 .chain(std::iter::once(key_c))
1408 .collect();
1409
1410 assert!(
1411 !state.is_drained(),
1412 "should NOT be drained while pending_keys still has entries"
1413 );
1414
1415 state.pending_keys.remove(&key_a);
1417 assert!(!state.is_drained(), "still not drained with 2 pending keys");
1418
1419 state.pending_keys.remove(&key_b);
1420 assert!(!state.is_drained(), "still not drained with 1 pending key");
1421
1422 state.pending_keys.remove(&key_c);
1423 assert!(
1424 !state.is_drained(),
1425 "removing all keys is necessary but not sufficient — needs explicit drain"
1426 );
1427
1428 state.drained = true;
1430 assert!(state.is_drained(), "explicit drain flag should finalize");
1431 }
1432
1433 #[test]
1436 fn verification_state_terminal_variants() {
1437 let terminal_states = [
1438 VerificationState::QuorumAbandoned,
1439 VerificationState::FetchAbandoned,
1440 VerificationState::Stored,
1441 VerificationState::Idle,
1442 ];
1443
1444 for (i, a) in terminal_states.iter().enumerate() {
1446 for (j, b) in terminal_states.iter().enumerate() {
1447 if i != j {
1448 assert_ne!(
1449 a, b,
1450 "terminal states at indices {i} and {j} must be distinct"
1451 );
1452 }
1453 }
1454 }
1455
1456 let non_terminal_states = [
1458 VerificationState::OfferReceived,
1459 VerificationState::PendingVerify,
1460 VerificationState::QuorumVerified,
1461 VerificationState::PaidListVerified,
1462 VerificationState::QueuedForFetch,
1463 VerificationState::Fetching,
1464 VerificationState::FetchRetryable,
1465 VerificationState::QuorumFailed,
1466 VerificationState::QuorumInconclusive,
1467 ];
1468
1469 for terminal in &terminal_states {
1470 for non_terminal in &non_terminal_states {
1471 assert_ne!(
1472 terminal, non_terminal,
1473 "terminal state {terminal:?} must not equal non-terminal state {non_terminal:?}"
1474 );
1475 }
1476 }
1477 }
1478
1479 #[test]
1482 fn repair_opportunity_requires_both_sync_and_cycle() {
1483 let synced_no_cycle = PeerSyncRecord {
1485 last_sync: Some(
1486 Instant::now()
1487 .checked_sub(std::time::Duration::from_secs(2))
1488 .unwrap_or_else(Instant::now),
1489 ),
1490 cycles_since_sync: 0,
1491 };
1492 assert!(
1493 !synced_no_cycle.has_repair_opportunity(),
1494 "synced with zero subsequent cycles should NOT have repair opportunity"
1495 );
1496
1497 let never_synced = PeerSyncRecord {
1499 last_sync: None,
1500 cycles_since_sync: 5,
1501 };
1502 assert!(
1503 !never_synced.has_repair_opportunity(),
1504 "never-synced peer should NOT have repair opportunity regardless of cycles"
1505 );
1506
1507 let ready = PeerSyncRecord {
1509 last_sync: Some(
1510 Instant::now()
1511 .checked_sub(std::time::Duration::from_secs(5))
1512 .unwrap_or_else(Instant::now),
1513 ),
1514 cycles_since_sync: 1,
1515 };
1516 assert!(
1517 ready.has_repair_opportunity(),
1518 "synced peer with >= 1 cycle SHOULD have repair opportunity"
1519 );
1520 }
1521}