1use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7
8use crate::logging::{debug, info, warn};
9use rand::seq::SliceRandom;
10use rand::Rng;
11
12use crate::ant_protocol::XorName;
13use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
14use crate::replication::protocol::{
15 compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
16 ReplicationMessageBody, ABSENT_KEY_DIGEST,
17};
18use crate::replication::types::{
19 AuditFailureReason, AuditFailureSummary, FailureEvidence, PeerSyncRecord, RepairProofs,
20};
21use crate::storage::LmdbStorage;
22use saorsa_core::identity::PeerId;
23use saorsa_core::P2PNode;
24use tokio::sync::RwLock;
25
/// Outcome of a single audit tick.
#[derive(Debug)]
pub enum AuditTickResult {
    /// The challenged peer answered with digests and no key was recorded as failed.
    Passed {
        /// Peer that was challenged.
        challenged_peer: PeerId,
        /// Number of keys the audit reports as checked.
        keys_checked: usize,
    },
    /// At least one failure against the challenged peer was confirmed.
    Failed {
        /// Evidence describing the confirmed failure.
        evidence: FailureEvidence,
    },
    /// The challenged peer replied that it is still bootstrapping.
    BootstrapClaim {
        /// Peer that made the bootstrapping claim.
        peer: PeerId,
    },
    /// No verdict was produced this tick (for example: this node is
    /// bootstrapping, there is no eligible peer or key to audit, or every
    /// candidate failure was discarded during responsibility confirmation).
    Idle,
    /// NOTE(review): not constructed in the visible part of this file;
    /// presumably signals that too few local keys exist to audit — confirm
    /// against callers.
    InsufficientKeys,
}
55
56#[allow(clippy::implicit_hasher)]
68pub async fn audit_tick(
69 p2p_node: &Arc<P2PNode>,
70 storage: &Arc<LmdbStorage>,
71 config: &ReplicationConfig,
72 sync_history: &HashMap<PeerId, PeerSyncRecord>,
73 is_bootstrapping: bool,
74) -> AuditTickResult {
75 let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
76 audit_tick_with_repair_proofs(
77 p2p_node,
78 storage,
79 config,
80 sync_history,
81 &repair_proofs,
82 0,
83 is_bootstrapping,
84 )
85 .await
86}
87
88#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
95pub async fn audit_tick_with_repair_proofs(
96 p2p_node: &Arc<P2PNode>,
97 storage: &Arc<LmdbStorage>,
98 config: &ReplicationConfig,
99 sync_history: &HashMap<PeerId, PeerSyncRecord>,
100 repair_proofs: &Arc<RwLock<RepairProofs>>,
101 current_sync_epoch: u64,
102 is_bootstrapping: bool,
103) -> AuditTickResult {
104 if is_bootstrapping {
106 return AuditTickResult::Idle;
107 }
108
109 let dht = p2p_node.dht_manager();
110
111 let eligible_peers = eligible_audit_peers(sync_history);
115
116 if eligible_peers.is_empty() {
117 return AuditTickResult::Idle;
118 }
119
120 let (challenged_peer, nonce, challenge_id) = {
121 let mut rng = rand::thread_rng();
122 let selected = match eligible_peers.choose(&mut rng) {
123 Some(p) => *p,
124 None => return AuditTickResult::Idle,
125 };
126 let n: [u8; 32] = rng.gen();
127 let c: u64 = rng.gen();
128 (selected, n, c)
129 };
130
131 let all_keys = match storage.all_keys().await {
134 Ok(keys) => keys,
135 Err(e) => {
136 warn!("Audit: failed to read local keys: {e}");
137 return AuditTickResult::Idle;
138 }
139 };
140
141 if all_keys.is_empty() {
142 return AuditTickResult::Idle;
143 }
144
145 let sample_count = ReplicationConfig::audit_sample_count(all_keys.len());
146 let sampled_keys: Vec<XorName> = {
147 let mut rng = rand::thread_rng();
148 all_keys
149 .choose_multiple(&mut rng, sample_count)
150 .copied()
151 .collect()
152 };
153
154 let mut sampled_key_groups = Vec::new();
158 for key in &sampled_keys {
159 let closest = dht
160 .find_closest_nodes_local_with_self(key, config.close_group_size)
161 .await;
162 let close_peers: HashSet<PeerId> = closest.iter().map(|node| node.peer_id).collect();
163 if close_peers.contains(&challenged_peer) {
164 sampled_key_groups.push((*key, close_peers));
165 }
166 }
167
168 let peer_keys = {
169 let mut proofs = repair_proofs.write().await;
170 mature_audit_keys_for_peer(
171 &challenged_peer,
172 sampled_key_groups,
173 &mut proofs,
174 current_sync_epoch,
175 )
176 };
177
178 if peer_keys.is_empty() {
179 return AuditTickResult::Idle;
180 }
181
182 let challenge = AuditChallenge {
188 challenge_id,
189 nonce,
190 challenged_peer_id: *challenged_peer.as_bytes(),
191 keys: peer_keys.clone(),
192 };
193
194 let msg = ReplicationMessage {
195 request_id: challenge_id,
196 body: ReplicationMessageBody::AuditChallenge(challenge),
197 };
198
199 let encoded = match msg.encode() {
200 Ok(data) => data,
201 Err(e) => {
202 warn!("Audit: failed to encode challenge: {e}");
203 return AuditTickResult::Idle;
204 }
205 };
206
207 let response = match p2p_node
208 .send_request(
209 &challenged_peer,
210 REPLICATION_PROTOCOL_ID,
211 encoded,
212 config.audit_response_timeout(peer_keys.len()),
213 )
214 .await
215 {
216 Ok(resp) => resp,
217 Err(e) => {
218 debug!("Audit: challenge to {challenged_peer} failed: {e}");
219 return handle_audit_timeout(
221 &challenged_peer,
222 challenge_id,
223 &peer_keys,
224 p2p_node,
225 config,
226 )
227 .await;
228 }
229 };
230
231 let resp_msg = match ReplicationMessage::decode(&response.data) {
233 Ok(m) => m,
234 Err(e) => {
235 warn!("Audit: failed to decode response from {challenged_peer}: {e}");
236 return handle_audit_failure(
237 &challenged_peer,
238 challenge_id,
239 &peer_keys,
240 AuditFailureReason::MalformedResponse,
241 p2p_node,
242 config,
243 )
244 .await;
245 }
246 };
247
248 match resp_msg.body {
249 ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
250 challenge_id: resp_id,
251 }) => {
252 if resp_id != challenge_id {
253 warn!("Audit: challenge ID mismatch on Bootstrapping from {challenged_peer}");
254 return handle_audit_failure(
255 &challenged_peer,
256 challenge_id,
257 &peer_keys,
258 AuditFailureReason::MalformedResponse,
259 p2p_node,
260 config,
261 )
262 .await;
263 }
264 AuditTickResult::BootstrapClaim {
266 peer: challenged_peer,
267 }
268 }
269 ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
270 challenge_id: resp_id,
271 digests,
272 }) => {
273 if resp_id != challenge_id {
274 warn!("Audit: challenge ID mismatch from {challenged_peer}");
275 return handle_audit_failure(
276 &challenged_peer,
277 challenge_id,
278 &peer_keys,
279 AuditFailureReason::MalformedResponse,
280 p2p_node,
281 config,
282 )
283 .await;
284 }
285 verify_digests(
286 &challenged_peer,
287 challenge_id,
288 &nonce,
289 &peer_keys,
290 &digests,
291 storage,
292 p2p_node,
293 config,
294 )
295 .await
296 }
297 ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
298 challenge_id: resp_id,
299 reason,
300 }) => {
301 if resp_id != challenge_id {
302 warn!("Audit: challenge ID mismatch on Rejected from {challenged_peer}");
303 return handle_audit_failure(
304 &challenged_peer,
305 challenge_id,
306 &peer_keys,
307 AuditFailureReason::MalformedResponse,
308 p2p_node,
309 config,
310 )
311 .await;
312 }
313 warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
314 handle_audit_failure(
315 &challenged_peer,
316 challenge_id,
317 &peer_keys,
318 AuditFailureReason::Rejected,
319 p2p_node,
320 config,
321 )
322 .await
323 }
324 _ => {
325 warn!("Audit: unexpected response type from {challenged_peer}");
326 handle_audit_failure(
327 &challenged_peer,
328 challenge_id,
329 &peer_keys,
330 AuditFailureReason::MalformedResponse,
331 p2p_node,
332 config,
333 )
334 .await
335 }
336 }
337}
338
339fn eligible_audit_peers(sync_history: &HashMap<PeerId, PeerSyncRecord>) -> Vec<PeerId> {
340 sync_history
341 .iter()
342 .filter(|(_, record)| record.has_repair_opportunity())
343 .map(|(peer, _)| *peer)
344 .collect()
345}
346
347fn mature_audit_keys_for_peer(
348 challenged_peer: &PeerId,
349 sampled_key_groups: Vec<(XorName, HashSet<PeerId>)>,
350 repair_proofs: &mut RepairProofs,
351 current_sync_epoch: u64,
352) -> Vec<XorName> {
353 sampled_key_groups
354 .into_iter()
355 .filter_map(|(key, close_peers)| {
356 repair_proofs
357 .has_mature_replica_hint(challenged_peer, &key, &close_peers, current_sync_epoch)
358 .then_some(key)
359 })
360 .collect()
361}
362
/// How a single challenged key failed an audit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AuditKeyFailureKind {
    /// The peer answered with the absent-key sentinel digest.
    Absent,
    /// The peer's digest differs from the digest computed over local data.
    DigestMismatch,
    /// The failure was not attributed to a specific per-key cause (used when
    /// the whole challenge failed, e.g. timeout or malformed response).
    Unclassified,
}
369
/// A challenged key together with the way it failed the audit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct AuditKeyFailure {
    /// Key that failed.
    key: XorName,
    /// Classification of the failure for this key.
    kind: AuditKeyFailureKind,
}
375
376impl AuditKeyFailure {
377 fn absent(key: XorName) -> Self {
378 Self {
379 key,
380 kind: AuditKeyFailureKind::Absent,
381 }
382 }
383
384 fn digest_mismatch(key: XorName) -> Self {
385 Self {
386 key,
387 kind: AuditKeyFailureKind::DigestMismatch,
388 }
389 }
390
391 fn unclassified(key: XorName) -> Self {
392 Self {
393 key,
394 kind: AuditKeyFailureKind::Unclassified,
395 }
396 }
397}
398
399fn build_audit_failure_summary(
400 challenged_key_count: usize,
401 confirmed_failures: &[AuditKeyFailure],
402) -> AuditFailureSummary {
403 let mut summary = AuditFailureSummary {
404 challenged_keys: challenged_key_count,
405 failed_keys: confirmed_failures.len(),
406 ..AuditFailureSummary::default()
407 };
408
409 for failure in confirmed_failures {
410 match failure.kind {
411 AuditKeyFailureKind::Absent => summary.absent_keys += 1,
412 AuditKeyFailureKind::DigestMismatch => summary.digest_mismatch_keys += 1,
413 AuditKeyFailureKind::Unclassified => {}
414 }
415 }
416
417 summary
418}
419
420fn audit_digest_failure_reason(confirmed_failures: &[AuditKeyFailure]) -> AuditFailureReason {
421 if confirmed_failures
422 .iter()
423 .all(|failure| failure.kind == AuditKeyFailureKind::Absent)
424 {
425 AuditFailureReason::KeyAbsent
426 } else {
427 AuditFailureReason::DigestMismatch
428 }
429}
430
431#[allow(clippy::too_many_arguments)]
437async fn verify_digests(
438 challenged_peer: &PeerId,
439 challenge_id: u64,
440 nonce: &[u8; 32],
441 keys: &[XorName],
442 digests: &[[u8; 32]],
443 storage: &Arc<LmdbStorage>,
444 p2p_node: &Arc<P2PNode>,
445 config: &ReplicationConfig,
446) -> AuditTickResult {
447 if digests.len() != keys.len() {
449 warn!(
450 "Audit: malformed response from {challenged_peer}: {} digests for {} keys",
451 digests.len(),
452 keys.len()
453 );
454 return handle_audit_failure(
455 challenged_peer,
456 challenge_id,
457 keys,
458 AuditFailureReason::MalformedResponse,
459 p2p_node,
460 config,
461 )
462 .await;
463 }
464
465 let challenged_peer_bytes = challenged_peer.as_bytes();
466 let mut failed_keys = Vec::new();
467
468 for (i, key) in keys.iter().enumerate() {
469 let received_digest = &digests[i];
470
471 if *received_digest == ABSENT_KEY_DIGEST {
473 failed_keys.push(AuditKeyFailure::absent(*key));
474 continue;
475 }
476
477 let local_bytes = match storage.get_raw(key).await {
479 Ok(Some(bytes)) => bytes,
480 Ok(None) => {
481 warn!(
483 "Audit: local key {} disappeared during audit",
484 hex::encode(key)
485 );
486 continue;
487 }
488 Err(e) => {
489 warn!("Audit: failed to read local key {}: {e}", hex::encode(key));
490 continue;
491 }
492 };
493
494 let expected = compute_audit_digest(nonce, challenged_peer_bytes, key, &local_bytes);
495 if *received_digest != expected {
496 failed_keys.push(AuditKeyFailure::digest_mismatch(*key));
497 }
498 }
499
500 if failed_keys.is_empty() {
501 info!(
502 "Audit: peer {challenged_peer} passed (all {} keys verified)",
503 keys.len()
504 );
505 return AuditTickResult::Passed {
506 challenged_peer: *challenged_peer,
507 keys_checked: keys.len(),
508 };
509 }
510
511 handle_classified_audit_failure(
513 challenged_peer,
514 challenge_id,
515 &failed_keys,
516 AuditFailureReason::DigestMismatch,
517 keys.len(),
518 p2p_node,
519 config,
520 )
521 .await
522}
523
524async fn handle_audit_failure(
530 challenged_peer: &PeerId,
531 challenge_id: u64,
532 failed_keys: &[XorName],
533 reason: AuditFailureReason,
534 p2p_node: &Arc<P2PNode>,
535 config: &ReplicationConfig,
536) -> AuditTickResult {
537 let failures = failed_keys
538 .iter()
539 .copied()
540 .map(AuditKeyFailure::unclassified)
541 .collect::<Vec<_>>();
542 handle_classified_audit_failure(
543 challenged_peer,
544 challenge_id,
545 &failures,
546 reason,
547 failed_keys.len(),
548 p2p_node,
549 config,
550 )
551 .await
552}
553
554async fn handle_classified_audit_failure(
555 challenged_peer: &PeerId,
556 challenge_id: u64,
557 failed_keys: &[AuditKeyFailure],
558 reason: AuditFailureReason,
559 challenged_key_count: usize,
560 p2p_node: &Arc<P2PNode>,
561 config: &ReplicationConfig,
562) -> AuditTickResult {
563 let dht = p2p_node.dht_manager();
564 let mut confirmed_failures = Vec::new();
565
566 for failure in failed_keys {
568 let closest = dht
569 .find_closest_nodes_local_with_self(&failure.key, config.close_group_size)
570 .await;
571 if closest.iter().any(|n| n.peer_id == *challenged_peer) {
572 confirmed_failures.push(*failure);
573 } else {
574 debug!(
575 "Audit: peer {challenged_peer} not responsible for {} (removed from failure set)",
576 hex::encode(failure.key)
577 );
578 }
579 }
580
581 if confirmed_failures.is_empty() {
586 info!("Audit: all failures for {challenged_peer} cleared by responsibility confirmation");
587 return AuditTickResult::Idle;
588 }
589
590 let summary = build_audit_failure_summary(challenged_key_count, &confirmed_failures);
591 let reason = if reason == AuditFailureReason::DigestMismatch {
592 audit_digest_failure_reason(&confirmed_failures)
593 } else {
594 reason
595 };
596 let confirmed_failed_keys = confirmed_failures
597 .iter()
598 .map(|failure| failure.key)
599 .collect();
600
601 let evidence = FailureEvidence::AuditFailure {
603 challenge_id,
604 challenged_peer: *challenged_peer,
605 confirmed_failed_keys,
606 summary,
607 reason,
608 };
609
610 AuditTickResult::Failed { evidence }
611}
612
613async fn handle_audit_timeout(
615 challenged_peer: &PeerId,
616 challenge_id: u64,
617 keys: &[XorName],
618 p2p_node: &Arc<P2PNode>,
619 config: &ReplicationConfig,
620) -> AuditTickResult {
621 handle_audit_failure(
622 challenged_peer,
623 challenge_id,
624 keys,
625 AuditFailureReason::Timeout,
626 p2p_node,
627 config,
628 )
629 .await
630}
631
632pub async fn handle_audit_challenge(
643 challenge: &AuditChallenge,
644 storage: &LmdbStorage,
645 self_peer_id: &PeerId,
646 is_bootstrapping: bool,
647 stored_chunks: usize,
648) -> AuditResponse {
649 if is_bootstrapping {
650 return AuditResponse::Bootstrapping {
651 challenge_id: challenge.challenge_id,
652 };
653 }
654
655 if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
656 warn!(
657 "Audit challenge targeted wrong peer: expected {}, got {}",
658 hex::encode(self_peer_id.as_bytes()),
659 hex::encode(challenge.challenged_peer_id),
660 );
661 return AuditResponse::Rejected {
662 challenge_id: challenge.challenge_id,
663 reason: "challenged_peer_id does not match this node".to_string(),
664 };
665 }
666
667 let max_keys = ReplicationConfig::max_incoming_audit_keys(stored_chunks);
668 if challenge.keys.len() > max_keys {
669 warn!(
670 "Audit challenge rejected: {} keys exceeds dynamic limit of {max_keys} \
671 (stored_chunks={stored_chunks})",
672 challenge.keys.len(),
673 );
674 return AuditResponse::Rejected {
675 challenge_id: challenge.challenge_id,
676 reason: format!(
677 "challenge contains {} keys, limit is {max_keys}",
678 challenge.keys.len()
679 ),
680 };
681 }
682
683 let mut digests = Vec::with_capacity(challenge.keys.len());
684
685 for key in &challenge.keys {
686 match storage.get_raw(key).await {
687 Ok(Some(data)) => {
688 let digest = compute_audit_digest(
689 &challenge.nonce,
690 &challenge.challenged_peer_id,
691 key,
692 &data,
693 );
694 digests.push(digest);
695 }
696 Ok(None) => {
697 digests.push(ABSENT_KEY_DIGEST);
698 }
699 Err(e) => {
700 warn!(
701 "Audit responder: failed to read key {}: {e}",
702 hex::encode(key)
703 );
704 digests.push(ABSENT_KEY_DIGEST);
705 }
706 }
707 }
708
709 AuditResponse::Digests {
710 challenge_id: challenge.challenge_id,
711 digests,
712 }
713}
714
715#[cfg(test)]
720#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
721mod tests {
722 use super::*;
723 use crate::replication::protocol::compute_audit_digest;
724 use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
725 use crate::storage::LmdbStorageConfig;
726 use std::time::Instant;
727 use tempfile::TempDir;
728
    /// Stored-chunk count handed to `handle_audit_challenge` by the tests.
    /// NOTE(review): presumably chosen large so the dynamic key limit never
    /// rejects a test challenge — confirm against
    /// `ReplicationConfig::max_incoming_audit_keys`.
    const TEST_STORED_CHUNKS: usize = 1_000_000;
732
733 async fn create_test_storage() -> (LmdbStorage, TempDir) {
735 let temp_dir = TempDir::new().expect("create temp dir");
736 let config = LmdbStorageConfig {
737 root_dir: temp_dir.path().to_path_buf(),
738 verify_on_read: false,
739 max_map_size: 0,
740 disk_reserve: 0,
741 };
742 let storage = LmdbStorage::new(config).await.expect("create storage");
743 (storage, temp_dir)
744 }
745
    /// Builds an `AuditChallenge` from its parts; `peer_id` becomes the
    /// `challenged_peer_id` field.
    fn make_challenge(
        challenge_id: u64,
        nonce: [u8; 32],
        peer_id: [u8; 32],
        keys: Vec<XorName>,
    ) -> AuditChallenge {
        AuditChallenge {
            challenge_id,
            nonce,
            challenged_peer_id: peer_id,
            keys,
        }
    }
760
    /// Builds a `PeerId` from raw bytes via `PeerId::from_bytes`.
    fn peer_id_from_bytes(bytes: [u8; 32]) -> PeerId {
        PeerId::from_bytes(bytes)
    }
765
766 #[tokio::test]
769 async fn handle_challenge_present_keys_returns_correct_digests() {
770 let (storage, _temp) = create_test_storage().await;
771
772 let content_a = b"chunk alpha";
774 let addr_a = LmdbStorage::compute_address(content_a);
775 storage.put(&addr_a, content_a).await.expect("put a");
776
777 let content_b = b"chunk beta";
778 let addr_b = LmdbStorage::compute_address(content_b);
779 storage.put(&addr_b, content_b).await.expect("put b");
780
781 let nonce = [0xAA; 32];
782 let peer_id = [0xBB; 32];
783 let challenge = make_challenge(42, nonce, peer_id, vec![addr_a, addr_b]);
784 let self_id = peer_id_from_bytes(peer_id);
785
786 let response =
787 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
788
789 match response {
790 AuditResponse::Digests {
791 challenge_id,
792 digests,
793 } => {
794 assert_eq!(challenge_id, 42);
795 assert_eq!(digests.len(), 2);
796
797 let expected_a = compute_audit_digest(&nonce, &peer_id, &addr_a, content_a);
798 let expected_b = compute_audit_digest(&nonce, &peer_id, &addr_b, content_b);
799 assert_eq!(digests[0], expected_a);
800 assert_eq!(digests[1], expected_b);
801 }
802 AuditResponse::Bootstrapping { .. } => {
803 panic!("expected Digests, got Bootstrapping");
804 }
805 AuditResponse::Rejected { .. } => {
806 panic!("Unexpected Rejected response");
807 }
808 }
809 }
810
811 #[tokio::test]
814 async fn handle_challenge_absent_keys_returns_sentinel() {
815 let (storage, _temp) = create_test_storage().await;
816
817 let absent_key = [0xFF; 32];
818 let nonce = [0x11; 32];
819 let peer_id = [0x22; 32];
820 let challenge = make_challenge(99, nonce, peer_id, vec![absent_key]);
821 let self_id = peer_id_from_bytes(peer_id);
822
823 let response =
824 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
825
826 match response {
827 AuditResponse::Digests {
828 challenge_id,
829 digests,
830 } => {
831 assert_eq!(challenge_id, 99);
832 assert_eq!(digests.len(), 1);
833 assert_eq!(
834 digests[0], ABSENT_KEY_DIGEST,
835 "absent key should produce sentinel digest"
836 );
837 }
838 AuditResponse::Bootstrapping { .. } => {
839 panic!("expected Digests, got Bootstrapping");
840 }
841 AuditResponse::Rejected { .. } => {
842 panic!("Unexpected Rejected response");
843 }
844 }
845 }
846
847 #[tokio::test]
850 async fn handle_challenge_mixed_present_and_absent() {
851 let (storage, _temp) = create_test_storage().await;
852
853 let content = b"present chunk";
854 let addr_present = LmdbStorage::compute_address(content);
855 storage.put(&addr_present, content).await.expect("put");
856
857 let addr_absent = [0xDE; 32];
858 let nonce = [0x33; 32];
859 let peer_id = [0x44; 32];
860 let challenge = make_challenge(7, nonce, peer_id, vec![addr_present, addr_absent]);
861 let self_id = peer_id_from_bytes(peer_id);
862
863 let response =
864 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
865
866 match response {
867 AuditResponse::Digests { digests, .. } => {
868 assert_eq!(digests.len(), 2);
869
870 let expected_present =
871 compute_audit_digest(&nonce, &peer_id, &addr_present, content);
872 assert_eq!(digests[0], expected_present);
873 assert_eq!(
874 digests[1], ABSENT_KEY_DIGEST,
875 "absent key should be sentinel"
876 );
877 }
878 AuditResponse::Bootstrapping { .. } => {
879 panic!("expected Digests, got Bootstrapping");
880 }
881 AuditResponse::Rejected { .. } => {
882 panic!("Unexpected Rejected response");
883 }
884 }
885 }
886
887 #[tokio::test]
890 async fn handle_challenge_bootstrapping_returns_bootstrapping_response() {
891 let (storage, _temp) = create_test_storage().await;
892
893 let challenge = make_challenge(55, [0x00; 32], [0x01; 32], vec![[0x02; 32]]);
894 let self_id = peer_id_from_bytes([0x01; 32]);
895
896 let response =
897 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
898
899 match response {
900 AuditResponse::Bootstrapping { challenge_id } => {
901 assert_eq!(challenge_id, 55);
902 }
903 AuditResponse::Digests { .. } => {
904 panic!("expected Bootstrapping, got Digests");
905 }
906 AuditResponse::Rejected { .. } => {
907 panic!("Unexpected Rejected response");
908 }
909 }
910 }
911
912 #[tokio::test]
915 async fn handle_challenge_empty_keys_returns_empty_digests() {
916 let (storage, _temp) = create_test_storage().await;
917
918 let challenge = make_challenge(100, [0x10; 32], [0x20; 32], vec![]);
919 let self_id = peer_id_from_bytes([0x20; 32]);
920
921 let response =
922 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
923
924 match response {
925 AuditResponse::Digests {
926 challenge_id,
927 digests,
928 } => {
929 assert_eq!(challenge_id, 100);
930 assert!(
931 digests.is_empty(),
932 "empty key list should yield empty digests"
933 );
934 }
935 AuditResponse::Bootstrapping { .. } => {
936 panic!("expected Digests, got Bootstrapping");
937 }
938 AuditResponse::Rejected { .. } => {
939 panic!("Unexpected Rejected response");
940 }
941 }
942 }
943
944 #[test]
947 fn digest_verification_matching() {
948 let nonce = [0x01; 32];
949 let peer_id = [0x02; 32];
950 let key: XorName = [0x03; 32];
951 let data = b"correct data";
952
953 let expected = compute_audit_digest(&nonce, &peer_id, &key, data);
954 let recomputed = compute_audit_digest(&nonce, &peer_id, &key, data);
955
956 assert_eq!(
957 expected, recomputed,
958 "same inputs must produce identical digests"
959 );
960 assert_ne!(
961 expected, ABSENT_KEY_DIGEST,
962 "real digest must not be sentinel"
963 );
964 }
965
966 #[test]
969 fn digest_verification_mismatching_data() {
970 let nonce = [0x01; 32];
971 let peer_id = [0x02; 32];
972 let key: XorName = [0x03; 32];
973
974 let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data version A");
975 let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data version B");
976
977 assert_ne!(
978 digest_a, digest_b,
979 "different data must produce different digests"
980 );
981 }
982
983 #[test]
984 fn digest_verification_mismatching_nonce() {
985 let peer_id = [0x02; 32];
986 let key: XorName = [0x03; 32];
987 let data = b"same data";
988
989 let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, data);
990 let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, data);
991
992 assert_ne!(
993 digest_a, digest_b,
994 "different nonces must produce different digests"
995 );
996 }
997
998 #[test]
999 fn digest_verification_mismatching_peer() {
1000 let nonce = [0x01; 32];
1001 let key: XorName = [0x03; 32];
1002 let data = b"same data";
1003
1004 let digest_a = compute_audit_digest(&nonce, &[0x02; 32], &key, data);
1005 let digest_b = compute_audit_digest(&nonce, &[0xFE; 32], &key, data);
1006
1007 assert_ne!(
1008 digest_a, digest_b,
1009 "different peers must produce different digests"
1010 );
1011 }
1012
1013 #[test]
1014 fn digest_verification_mismatching_key() {
1015 let nonce = [0x01; 32];
1016 let peer_id = [0x02; 32];
1017 let data = b"same data";
1018
1019 let digest_a = compute_audit_digest(&nonce, &peer_id, &[0x03; 32], data);
1020 let digest_b = compute_audit_digest(&nonce, &peer_id, &[0xFC; 32], data);
1021
1022 assert_ne!(
1023 digest_a, digest_b,
1024 "different keys must produce different digests"
1025 );
1026 }
1027
1028 #[test]
1031 fn absent_sentinel_is_all_zeros() {
1032 assert_eq!(ABSENT_KEY_DIGEST, [0u8; 32], "sentinel must be all zeros");
1033 }
1034
1035 #[tokio::test]
1038 async fn bootstrapping_skips_digest_computation() {
1039 let (storage, _temp) = create_test_storage().await;
1040
1041 let content = b"stored but bootstrapping";
1042 let addr = LmdbStorage::compute_address(content);
1043 storage.put(&addr, content).await.expect("put");
1044
1045 let challenge = make_challenge(200, [0xCC; 32], [0xDD; 32], vec![addr]);
1046 let self_id = peer_id_from_bytes([0xDD; 32]);
1047
1048 let response =
1049 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1050
1051 assert!(
1052 matches!(response, AuditResponse::Bootstrapping { challenge_id: 200 }),
1053 "bootstrapping node must not compute digests"
1054 );
1055 }
1056
1057 #[tokio::test]
1060 async fn scenario_19_partial_failure_mixed_responsibility() {
1061 let (storage, _temp) = create_test_storage().await;
1068 let nonce = [0x42u8; 32];
1069 let peer_id = [0xAA; 32];
1070
1071 let content_k1 = b"key one data";
1073 let addr_k1 = LmdbStorage::compute_address(content_k1);
1074 storage.put(&addr_k1, content_k1).await.unwrap();
1075
1076 let content_k2 = b"key two data";
1077 let addr_k2 = LmdbStorage::compute_address(content_k2);
1078 storage.put(&addr_k2, content_k2).await.unwrap();
1079
1080 let addr_k3 = [0xFF; 32]; let challenge = AuditChallenge {
1083 challenge_id: 100,
1084 nonce,
1085 challenged_peer_id: peer_id,
1086 keys: vec![addr_k1, addr_k2, addr_k3],
1087 };
1088 let self_id = peer_id_from_bytes(peer_id);
1089
1090 let response =
1091 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1092
1093 match response {
1094 AuditResponse::Digests { digests, .. } => {
1095 assert_eq!(digests.len(), 3);
1096
1097 let expected_k1 = compute_audit_digest(&nonce, &peer_id, &addr_k1, content_k1);
1099 assert_eq!(digests[0], expected_k1);
1100
1101 let expected_k2 = compute_audit_digest(&nonce, &peer_id, &addr_k2, content_k2);
1103 assert_eq!(digests[1], expected_k2);
1104
1105 assert_eq!(digests[2], ABSENT_KEY_DIGEST);
1107 }
1108 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests response"),
1109 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1110 }
1111 }
1112
1113 #[tokio::test]
1116 async fn scenario_54_all_digests_pass() {
1117 let (storage, _temp) = create_test_storage().await;
1120 let nonce = [0x10; 32];
1121 let peer_id = [0x20; 32];
1122
1123 let c1 = b"chunk alpha";
1124 let c2 = b"chunk beta";
1125 let c3 = b"chunk gamma";
1126 let a1 = LmdbStorage::compute_address(c1);
1127 let a2 = LmdbStorage::compute_address(c2);
1128 let a3 = LmdbStorage::compute_address(c3);
1129 storage.put(&a1, c1).await.unwrap();
1130 storage.put(&a2, c2).await.unwrap();
1131 storage.put(&a3, c3).await.unwrap();
1132
1133 let challenge = AuditChallenge {
1134 challenge_id: 200,
1135 nonce,
1136 challenged_peer_id: peer_id,
1137 keys: vec![a1, a2, a3],
1138 };
1139 let self_id = peer_id_from_bytes(peer_id);
1140
1141 let response =
1142 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1143 match response {
1144 AuditResponse::Digests { digests, .. } => {
1145 assert_eq!(digests.len(), 3);
1146 for (i, (addr, content)) in [(a1, &c1[..]), (a2, &c2[..]), (a3, &c3[..])]
1147 .iter()
1148 .enumerate()
1149 {
1150 let expected = compute_audit_digest(&nonce, &peer_id, addr, content);
1151 assert_eq!(digests[i], expected, "Key {i} digest should match");
1152 }
1153 }
1154 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1155 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1156 }
1157 }
1158
1159 #[tokio::test]
1172 async fn scenario_55_no_confirmed_responsibility_no_evidence() {
1173 let (storage, _temp) = create_test_storage().await;
1174 let nonce = [0x55; 32];
1175 let peer_id = [0x55; 32];
1176
1177 let c1 = b"scenario 55 key one";
1179 let c2 = b"scenario 55 key two";
1180 let k1 = LmdbStorage::compute_address(c1);
1181 let k2 = LmdbStorage::compute_address(c2);
1182 storage.put(&k1, c1).await.expect("put k1");
1183 storage.put(&k2, c2).await.expect("put k2");
1184
1185 let expected_d1 = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1187 let expected_d2 = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1188
1189 let wrong_d1 = compute_audit_digest(&nonce, &peer_id, &k1, b"corrupted k1");
1191 let wrong_d2 = compute_audit_digest(&nonce, &peer_id, &k2, b"corrupted k2");
1192 assert_ne!(wrong_d1, expected_d1, "K1 digest should mismatch");
1193 assert_ne!(wrong_d2, expected_d2, "K2 digest should mismatch");
1194
1195 let keys = [k1, k2];
1197 let expected = [expected_d1, expected_d2];
1198 let received = [wrong_d1, wrong_d2];
1199
1200 let mut failed_keys = Vec::new();
1201 for i in 0..keys.len() {
1202 if received[i] != expected[i] {
1203 failed_keys.push(keys[i]);
1204 }
1205 }
1206 assert_eq!(
1207 failed_keys.len(),
1208 2,
1209 "Both keys should be identified as digest mismatches"
1210 );
1211
1212 let confirmed_responsible_keys: Vec<XorName> = Vec::new();
1215 let confirmed_failures: Vec<XorName> = failed_keys
1216 .into_iter()
1217 .filter(|k| confirmed_responsible_keys.contains(k))
1218 .collect();
1219
1220 assert!(
1222 confirmed_failures.is_empty(),
1223 "With no confirmed responsibility, failure set must be empty — \
1224 no AuditFailure evidence should be emitted"
1225 );
1226
1227 let peer = PeerId::from_bytes(peer_id);
1230 let evidence = FailureEvidence::AuditFailure {
1231 challenge_id: 5500,
1232 challenged_peer: peer,
1233 confirmed_failed_keys: confirmed_failures,
1234 summary: AuditFailureSummary::default(),
1235 reason: AuditFailureReason::DigestMismatch,
1236 };
1237 if let FailureEvidence::AuditFailure {
1238 confirmed_failed_keys,
1239 ..
1240 } = evidence
1241 {
1242 assert!(
1243 confirmed_failed_keys.is_empty(),
1244 "Evidence with empty failure set should not trigger a trust penalty"
1245 );
1246 }
1247 }
1248
1249 #[test]
1252 fn scenario_56_repair_opportunity_filters_never_synced() {
1253 let never_synced = PeerSyncRecord {
1257 last_sync: None,
1258 cycles_since_sync: 5,
1259 };
1260 assert!(!never_synced.has_repair_opportunity());
1261
1262 let synced_no_cycle = PeerSyncRecord {
1263 last_sync: Some(Instant::now()),
1264 cycles_since_sync: 0,
1265 };
1266 assert!(!synced_no_cycle.has_repair_opportunity());
1267
1268 let synced_with_cycle = PeerSyncRecord {
1269 last_sync: Some(Instant::now()),
1270 cycles_since_sync: 1,
1271 };
1272 assert!(synced_with_cycle.has_repair_opportunity());
1273 }
1274
1275 #[test]
1276 fn expired_bootstrap_claim_does_not_remove_peer_from_audit_eligibility() {
1277 let peer = peer_id_from_bytes([0x57; 32]);
1278 let mut sync_history = HashMap::new();
1279 sync_history.insert(
1280 peer,
1281 PeerSyncRecord {
1282 last_sync: Some(Instant::now()),
1283 cycles_since_sync: 1,
1284 },
1285 );
1286
1287 let mut bootstrap_claims = HashMap::new();
1288 let first_seen = Instant::now()
1289 .checked_sub(
1290 crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD
1291 + std::time::Duration::from_secs(1),
1292 )
1293 .unwrap_or_else(Instant::now);
1294 bootstrap_claims.insert(peer, first_seen);
1295
1296 let eligible = eligible_audit_peers(&sync_history);
1297
1298 assert!(bootstrap_claims.contains_key(&peer));
1299 assert!(
1300 eligible.contains(&peer),
1301 "continued bootstrap claims must remain auditable so past-grace abuse can be observed"
1302 );
1303 }
1304
1305 #[test]
1306 fn audit_failure_summary_counts_confirmed_absent_and_mismatch_keys() {
1307 let absent_key = [0xA1; 32];
1308 let mismatch_key = [0xB2; 32];
1309 let confirmed = vec![
1310 AuditKeyFailure::absent(absent_key),
1311 AuditKeyFailure::digest_mismatch(mismatch_key),
1312 ];
1313
1314 let summary = build_audit_failure_summary(5, &confirmed);
1315
1316 assert_eq!(summary.challenged_keys, 5);
1317 assert_eq!(summary.failed_keys, 2);
1318 assert_eq!(summary.absent_keys, 1);
1319 assert_eq!(summary.digest_mismatch_keys, 1);
1320 }
1321
#[test]
/// An unclassified failure counts towards `failed_keys` but must not be
/// attributed to either the absent or the digest-mismatch counter.
fn audit_failure_summary_leaves_unclassified_rejections_out_of_absent_mismatch_counts() {
    let failures = vec![AuditKeyFailure::unclassified([0xC3; 32])];

    let summary = build_audit_failure_summary(3, &failures);

    assert_eq!(summary.challenged_keys, 3);
    assert_eq!(summary.failed_keys, 1);
    // Neither specific bucket may absorb the unclassified failure.
    assert_eq!(summary.absent_keys, 0);
    assert_eq!(summary.digest_mismatch_keys, 0);
}
1334
#[test]
/// When the only confirmed failure is an absent key, the reported reason must
/// be `KeyAbsent`.
fn audit_digest_failure_reason_is_key_absent_when_all_confirmed_failures_are_absent() {
    let only_absent = vec![AuditKeyFailure::absent([0xD4; 32])];

    let reason = audit_digest_failure_reason(&only_absent);

    assert_eq!(reason, AuditFailureReason::KeyAbsent);
}
1344
#[test]
/// A mix of absent and digest-mismatch failures must be reported as
/// `DigestMismatch`.
fn audit_digest_failure_reason_is_digest_mismatch_for_mixed_failures() {
    let mixed = vec![
        AuditKeyFailure::absent([0xD5; 32]),
        AuditKeyFailure::digest_mismatch([0xE6; 32]),
    ];

    let reason = audit_digest_failure_reason(&mixed);

    assert_eq!(reason, AuditFailureReason::DigestMismatch);
}
1357
#[test]
/// Exercises `mature_audit_keys_for_peer` with five sampled keys:
///
/// * mature: hint recorded one epoch ago, close group unchanged;
/// * same-epoch: hint recorded in the current epoch;
/// * missing-proof: no hint was ever recorded;
/// * stable-churn: hint recorded one epoch ago, close group changed but still
///   contains the challenged peer;
/// * evicted: hint recorded one epoch ago, but the sampled close group no
///   longer contains the challenged peer.
///
/// Only the mature and stable-churn keys are expected back, in that order.
fn audit_key_filter_retains_stable_proofs_and_rejects_evicted_peers() {
    const HINT_EPOCH: u64 = 7;
    const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
    const CHALLENGED_PEER_BYTE: u8 = 0xA1;
    const OTHER_PEER_BYTE: u8 = 0xA2;
    const NEW_PEER_BYTE: u8 = 0xA3;
    const MATURE_KEY_BYTE: u8 = 0xB1;
    const SAME_EPOCH_KEY_BYTE: u8 = 0xB2;
    const MISSING_PROOF_KEY_BYTE: u8 = 0xB3;
    const STABLE_CHURN_KEY_BYTE: u8 = 0xB4;
    const EVICTED_KEY_BYTE: u8 = 0xB5;
    const XOR_NAME_LEN: usize = 32;

    // Small builders so every fixture is derived from a single fill byte.
    let peer_from = |byte: u8| peer_id_from_bytes([byte; XOR_NAME_LEN]);
    let key_from = |byte: u8| [byte; XOR_NAME_LEN];

    let challenged_peer = peer_from(CHALLENGED_PEER_BYTE);
    let other_peer = peer_from(OTHER_PEER_BYTE);
    let new_peer = peer_from(NEW_PEER_BYTE);

    let mature_key = key_from(MATURE_KEY_BYTE);
    let same_epoch_key = key_from(SAME_EPOCH_KEY_BYTE);
    let missing_proof_key = key_from(MISSING_PROOF_KEY_BYTE);
    let stable_churn_key = key_from(STABLE_CHURN_KEY_BYTE);
    let evicted_key = key_from(EVICTED_KEY_BYTE);

    let close_group = HashSet::from([challenged_peer, other_peer]);
    let changed_close_group = HashSet::from([challenged_peer, new_peer]);
    let evicted_close_group = HashSet::from([other_peer, new_peer]);

    // Record hints for every key except `missing_proof_key`; all hints are
    // recorded against the original close group.
    let mut repair_proofs = RepairProofs::new();
    let hints = [
        (mature_key, HINT_EPOCH),
        (same_epoch_key, CURRENT_EPOCH),
        (stable_churn_key, HINT_EPOCH),
        (evicted_key, HINT_EPOCH),
    ];
    for (key, hint_epoch) in hints {
        assert!(repair_proofs.record_replica_hint_sent(
            challenged_peer,
            key,
            &close_group,
            hint_epoch,
        ));
    }

    // Close groups as observed at sampling time.
    let sampled_key_groups = vec![
        (mature_key, close_group.clone()),
        (same_epoch_key, close_group.clone()),
        (missing_proof_key, close_group.clone()),
        (stable_churn_key, changed_close_group),
        (evicted_key, evicted_close_group),
    ];

    let peer_keys = mature_audit_keys_for_peer(
        &challenged_peer,
        sampled_key_groups,
        &mut repair_proofs,
        CURRENT_EPOCH,
    );

    assert_eq!(
        peer_keys,
        vec![mature_key, stable_churn_key],
        "mature proofs for stable close-group peers should become audit keys, while same-epoch, missing, and evicted-peer proofs should not"
    );
}
1430
1431 #[tokio::test]
1434 async fn audit_response_must_match_key_count() {
1435 let (storage, _temp) = create_test_storage().await;
1441 let nonce = [0x50; 32];
1442 let peer_id = [0x60; 32];
1443
1444 let content = b"single chunk";
1446 let addr = LmdbStorage::compute_address(content);
1447 storage.put(&addr, content).await.unwrap();
1448
1449 let absent_keys: Vec<XorName> = (1..=4u8).map(|i| [i; 32]).collect();
1451 let mut keys = vec![addr];
1452 keys.extend_from_slice(&absent_keys);
1453
1454 let key_count = keys.len();
1455 let challenge = make_challenge(300, nonce, peer_id, keys);
1456 let self_id = peer_id_from_bytes(peer_id);
1457
1458 let response =
1459 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1460 match response {
1461 AuditResponse::Digests { digests, .. } => {
1462 assert_eq!(
1463 digests.len(),
1464 key_count,
1465 "must produce exactly one digest per challenged key"
1466 );
1467 }
1468 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1469 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1470 }
1471 }
1472
#[test]
/// With nonce, peer and key held fixed, changing only the record bytes must
/// change the audit digest.
fn audit_digest_uses_full_record_bytes() {
    let nonce = [1u8; 32];
    let peer = [2u8; 32];
    let key = [3u8; 32];

    // Only the record payload varies between the two digests.
    let digest_of = |record: &[u8]| compute_audit_digest(&nonce, &peer, &key, record);
    let first = digest_of(b"data version 1");
    let second = digest_of(b"data version 2");

    assert_ne!(
        first, second,
        "Different record bytes must produce different digests"
    );
}
1489
1490 #[tokio::test]
1501 async fn scenario_29_audit_start_gate_during_bootstrap() {
1502 let (storage, _temp) = create_test_storage().await;
1503
1504 let content = b"should not be audited during bootstrap";
1506 let addr = LmdbStorage::compute_address(content);
1507 storage.put(&addr, content).await.expect("put");
1508
1509 let challenge = make_challenge(2900, [0x29; 32], [0x29; 32], vec![addr]);
1510 let self_id = peer_id_from_bytes([0x29; 32]);
1511
1512 let response =
1514 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1515 assert!(
1516 matches!(
1517 response,
1518 AuditResponse::Bootstrapping { challenge_id: 2900 }
1519 ),
1520 "bootstrapping node must not compute digests — audit start gate"
1521 );
1522
1523 let response =
1525 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1526 assert!(
1527 matches!(response, AuditResponse::Digests { .. }),
1528 "drained node should compute digests normally"
1529 );
1530 }
1531
#[test]
/// Scenario 30: checks the two inputs to audit peer selection that can be
/// tested without a network — the sample count derived from the number of
/// stored keys, and the `has_repair_opportunity` filter on sync records.
fn scenario_30_audit_peer_selection_from_sampled_keys() {
    // (total keys, expected sample count, failure message)
    let sample_count_cases = [
        (100, 10, "sample count should scale with sqrt(total_keys)"),
        (3, 1, "sqrt(3) = 1"),
        (10_000, 100, "sqrt(10000) = 100"),
    ];
    for (total_keys, expected, why) in sample_count_cases {
        assert_eq!(
            ReplicationConfig::audit_sample_count(total_keys),
            expected,
            "{why}"
        );
    }

    // Never synced: filtered out regardless of the cycle counter.
    let never_synced = PeerSyncRecord {
        cycles_since_sync: 10,
        last_sync: None,
    };
    assert!(!never_synced.has_repair_opportunity());

    // Synced, but no cycle recorded since: filtered out.
    let synced_this_cycle = PeerSyncRecord {
        cycles_since_sync: 0,
        last_sync: Some(Instant::now()),
    };
    assert!(!synced_this_cycle.has_repair_opportunity());

    // Synced with cycles recorded since: passes the filter.
    let had_time_to_repair = PeerSyncRecord {
        cycles_since_sync: 2,
        last_sync: Some(Instant::now()),
    };
    assert!(had_time_to_repair.has_repair_opportunity());
}
1580
1581 #[tokio::test]
1590 async fn scenario_32_dynamic_challenge_size() {
1591 let (storage, _temp) = create_test_storage().await;
1592
1593 let mut addrs = Vec::new();
1595 for i in 0u8..5 {
1596 let content = format!("dynamic challenge key {i}");
1597 let addr = LmdbStorage::compute_address(content.as_bytes());
1598 storage.put(&addr, content.as_bytes()).await.expect("put");
1599 addrs.push(addr);
1600 }
1601
1602 let nonce = [0x32; 32];
1603 let peer_id = [0x32; 32];
1604 let self_id = peer_id_from_bytes(peer_id);
1605
1606 let challenge1 = make_challenge(3201, nonce, peer_id, vec![addrs[0]]);
1608 let resp1 =
1609 handle_audit_challenge(&challenge1, &storage, &self_id, false, TEST_STORED_CHUNKS)
1610 .await;
1611 if let AuditResponse::Digests { digests, .. } = resp1 {
1612 assert_eq!(digests.len(), 1, "|PeerKeySet| = 1 → 1 digest");
1613 }
1614
1615 let challenge3 = make_challenge(3203, nonce, peer_id, addrs[0..3].to_vec());
1617 let resp3 =
1618 handle_audit_challenge(&challenge3, &storage, &self_id, false, TEST_STORED_CHUNKS)
1619 .await;
1620 if let AuditResponse::Digests { digests, .. } = resp3 {
1621 assert_eq!(digests.len(), 3, "|PeerKeySet| = 3 → 3 digests");
1622 }
1623
1624 let challenge5 = make_challenge(3205, nonce, peer_id, addrs.clone());
1626 let resp5 =
1627 handle_audit_challenge(&challenge5, &storage, &self_id, false, TEST_STORED_CHUNKS)
1628 .await;
1629 if let AuditResponse::Digests { digests, .. } = resp5 {
1630 assert_eq!(digests.len(), 5, "|PeerKeySet| = 5 → 5 digests");
1631 }
1632
1633 let challenge0 = make_challenge(3200, nonce, peer_id, vec![]);
1635 let resp0 =
1636 handle_audit_challenge(&challenge0, &storage, &self_id, false, TEST_STORED_CHUNKS)
1637 .await;
1638 if let AuditResponse::Digests { digests, .. } = resp0 {
1639 assert!(digests.is_empty(), "|PeerKeySet| = 0 → 0 digests (idle)");
1640 }
1641 }
1642
1643 #[tokio::test]
1649 async fn scenario_47_bootstrap_claim_grace_period_audit() {
1650 let (storage, _temp) = create_test_storage().await;
1651
1652 let content = b"bootstrap grace test";
1654 let addr = LmdbStorage::compute_address(content);
1655 storage.put(&addr, content).await.expect("put");
1656
1657 let challenge = make_challenge(4700, [0x47; 32], [0x47; 32], vec![addr]);
1658 let self_id = peer_id_from_bytes([0x47; 32]);
1659
1660 let response =
1662 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1663 let challenge_id = match response {
1664 AuditResponse::Bootstrapping { challenge_id } => challenge_id,
1665 AuditResponse::Digests { .. } => {
1666 panic!("Expected Bootstrapping response during grace period")
1667 }
1668 AuditResponse::Rejected { .. } => {
1669 panic!("Unexpected Rejected response")
1670 }
1671 };
1672 assert_eq!(challenge_id, 4700);
1673
1674 let peer = PeerId::from_bytes([0x47; 32]);
1676 let mut state = NeighborSyncState::new_cycle(vec![peer]);
1677 let now = Instant::now();
1678 let observed = state.observe_bootstrap_claim(
1679 peer,
1680 now,
1681 crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD,
1682 );
1683
1684 assert_eq!(
1685 observed,
1686 BootstrapClaimObservation::WithinGrace { first_seen: now }
1687 );
1688 assert!(
1689 state.bootstrap_claims.contains_key(&peer),
1690 "BootstrapClaimFirstSeen should be recorded after grace-period claim"
1691 );
1692 assert!(
1693 state.bootstrap_claim_history.contains_key(&peer),
1694 "Bootstrap claim history should remember that the grace window was used"
1695 );
1696 }
1697
1698 #[tokio::test]
1709 async fn scenario_53_partial_failure_mixed_responsibility() {
1710 let (storage, _temp) = create_test_storage().await;
1711 let nonce = [0x53; 32];
1712 let peer_id = [0x53; 32];
1713
1714 let c1 = b"scenario 53 key one";
1716 let c2 = b"scenario 53 key two";
1717 let c3 = b"scenario 53 key three";
1718 let k1 = LmdbStorage::compute_address(c1);
1719 let k2 = LmdbStorage::compute_address(c2);
1720 let k3 = LmdbStorage::compute_address(c3);
1721 storage.put(&k1, c1).await.expect("put k1");
1722 storage.put(&k2, c2).await.expect("put k2");
1723 storage.put(&k3, c3).await.expect("put k3");
1724
1725 let d1_expected = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1727 let d2_expected = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1728 let d3_expected = compute_audit_digest(&nonce, &peer_id, &k3, c3);
1729
1730 let d2_wrong = compute_audit_digest(&nonce, &peer_id, &k2, b"tampered k2");
1732 let d3_wrong = compute_audit_digest(&nonce, &peer_id, &k3, b"tampered k3");
1733
1734 assert_eq!(d1_expected, d1_expected, "K1 should match");
1735 assert_ne!(d2_wrong, d2_expected, "K2 should mismatch");
1736 assert_ne!(d3_wrong, d3_expected, "K3 should mismatch");
1737
1738 let digests = [d1_expected, d2_wrong, d3_wrong];
1740 let keys = [k1, k2, k3];
1741 let contents: [&[u8]; 3] = [c1, c2, c3];
1742
1743 let mut failed_keys = Vec::new();
1744 for (i, key) in keys.iter().enumerate() {
1745 if digests[i] == ABSENT_KEY_DIGEST {
1746 failed_keys.push(*key);
1747 continue;
1748 }
1749 let expected = compute_audit_digest(&nonce, &peer_id, key, contents[i]);
1750 if digests[i] != expected {
1751 failed_keys.push(*key);
1752 }
1753 }
1754
1755 assert_eq!(failed_keys.len(), 2, "K2 and K3 should be in failure set");
1756 assert!(failed_keys.contains(&k2));
1757 assert!(failed_keys.contains(&k3));
1758 assert!(!failed_keys.contains(&k1), "K1 passed digest check");
1759
1760 let responsible_for_k2 = true;
1763 let responsible_for_k3 = false;
1764 let mut confirmed = Vec::new();
1765 for key in &failed_keys {
1766 let is_responsible = if *key == k2 {
1767 responsible_for_k2
1768 } else {
1769 responsible_for_k3
1770 };
1771 if is_responsible {
1772 confirmed.push(*key);
1773 }
1774 }
1775
1776 assert_eq!(confirmed, vec![k2], "Only K2 should be in confirmed set");
1777
1778 let challenged_peer = PeerId::from_bytes(peer_id);
1780 let evidence = FailureEvidence::AuditFailure {
1781 challenge_id: 5300,
1782 challenged_peer,
1783 confirmed_failed_keys: confirmed,
1784 summary: AuditFailureSummary::default(),
1785 reason: AuditFailureReason::DigestMismatch,
1786 };
1787
1788 match evidence {
1789 FailureEvidence::AuditFailure {
1790 confirmed_failed_keys,
1791 ..
1792 } => {
1793 assert_eq!(
1794 confirmed_failed_keys.len(),
1795 1,
1796 "Only K2 should generate evidence"
1797 );
1798 assert_eq!(confirmed_failed_keys[0], k2);
1799 }
1800 _ => panic!("Expected AuditFailure evidence"),
1801 }
1802 }
1803}