1use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::logging::{debug, info, warn};
10use rand::seq::SliceRandom;
11use rand::Rng;
12
13use crate::ant_protocol::XorName;
14use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
15use crate::replication::protocol::{
16 compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
17 ReplicationMessageBody, ABSENT_KEY_DIGEST,
18};
19use crate::replication::types::{
20 AuditFailureReason, AuditFailureSummary, FailureEvidence, PeerSyncRecord, RepairProofs,
21};
22use crate::storage::LmdbStorage;
23use saorsa_core::identity::PeerId;
24use saorsa_core::P2PNode;
25use tokio::sync::RwLock;
26
27#[derive(Debug)]
33pub enum AuditTickResult {
34 Passed {
36 challenged_peer: PeerId,
38 keys_checked: usize,
40 },
41 Failed {
43 evidence: FailureEvidence,
45 },
46 BootstrapClaim {
48 peer: PeerId,
50 },
51 Idle,
53 InsufficientKeys,
55}
56
57#[allow(clippy::implicit_hasher)]
69pub async fn audit_tick(
70 p2p_node: &Arc<P2PNode>,
71 storage: &Arc<LmdbStorage>,
72 config: &ReplicationConfig,
73 sync_history: &HashMap<PeerId, PeerSyncRecord>,
74 is_bootstrapping: bool,
75) -> AuditTickResult {
76 let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
77 audit_tick_with_repair_proofs(
78 p2p_node,
79 storage,
80 config,
81 sync_history,
82 &repair_proofs,
83 0,
84 is_bootstrapping,
85 )
86 .await
87}
88
89#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
96pub async fn audit_tick_with_repair_proofs(
97 p2p_node: &Arc<P2PNode>,
98 storage: &Arc<LmdbStorage>,
99 config: &ReplicationConfig,
100 sync_history: &HashMap<PeerId, PeerSyncRecord>,
101 repair_proofs: &Arc<RwLock<RepairProofs>>,
102 current_sync_epoch: u64,
103 is_bootstrapping: bool,
104) -> AuditTickResult {
105 if is_bootstrapping {
107 return AuditTickResult::Idle;
108 }
109
110 let dht = p2p_node.dht_manager();
111
112 let eligible_peers = eligible_audit_peers(sync_history);
116
117 if eligible_peers.is_empty() {
118 return AuditTickResult::Idle;
119 }
120
121 let (challenged_peer, nonce, challenge_id) = {
122 let mut rng = rand::thread_rng();
123 let selected = match eligible_peers.choose(&mut rng) {
124 Some(p) => *p,
125 None => return AuditTickResult::Idle,
126 };
127 let n: [u8; 32] = rng.gen();
128 let c: u64 = rng.gen();
129 (selected, n, c)
130 };
131
132 let all_keys = match storage.all_keys().await {
135 Ok(keys) => keys,
136 Err(e) => {
137 warn!("Audit: failed to read local keys: {e}");
138 return AuditTickResult::Idle;
139 }
140 };
141
142 if all_keys.is_empty() {
143 return AuditTickResult::Idle;
144 }
145
146 let sample_count = ReplicationConfig::audit_sample_count(all_keys.len());
147 let sampled_keys: Vec<XorName> = {
148 let mut rng = rand::thread_rng();
149 all_keys
150 .choose_multiple(&mut rng, sample_count)
151 .copied()
152 .collect()
153 };
154
155 let mut sampled_key_groups = Vec::new();
159 for key in &sampled_keys {
160 let closest = dht
161 .find_closest_nodes_local_with_self(key, config.close_group_size)
162 .await;
163 let close_peers: HashSet<PeerId> = closest.iter().map(|node| node.peer_id).collect();
164 if close_peers.contains(&challenged_peer) {
165 sampled_key_groups.push((*key, close_peers));
166 }
167 }
168
169 let peer_keys = {
170 let mut proofs = repair_proofs.write().await;
171 let now = Instant::now();
172 mature_audit_keys_for_peer(
173 &challenged_peer,
174 sampled_key_groups,
175 &mut proofs,
176 current_sync_epoch,
177 now,
178 )
179 };
180
181 if peer_keys.is_empty() {
182 return AuditTickResult::Idle;
183 }
184
185 let challenge = AuditChallenge {
191 challenge_id,
192 nonce,
193 challenged_peer_id: *challenged_peer.as_bytes(),
194 keys: peer_keys.clone(),
195 };
196
197 let msg = ReplicationMessage {
198 request_id: challenge_id,
199 body: ReplicationMessageBody::AuditChallenge(challenge),
200 };
201
202 let encoded = match msg.encode() {
203 Ok(data) => data,
204 Err(e) => {
205 warn!("Audit: failed to encode challenge: {e}");
206 return AuditTickResult::Idle;
207 }
208 };
209
210 let response = match p2p_node
211 .send_request(
212 &challenged_peer,
213 REPLICATION_PROTOCOL_ID,
214 encoded,
215 config.audit_response_timeout(peer_keys.len()),
216 )
217 .await
218 {
219 Ok(resp) => resp,
220 Err(e) => {
221 debug!("Audit: challenge to {challenged_peer} failed: {e}");
222 return handle_audit_timeout(
224 &challenged_peer,
225 challenge_id,
226 &peer_keys,
227 p2p_node,
228 config,
229 )
230 .await;
231 }
232 };
233
234 let resp_msg = match ReplicationMessage::decode(&response.data) {
236 Ok(m) => m,
237 Err(e) => {
238 warn!("Audit: failed to decode response from {challenged_peer}: {e}");
239 return handle_audit_failure(
240 &challenged_peer,
241 challenge_id,
242 &peer_keys,
243 AuditFailureReason::MalformedResponse,
244 p2p_node,
245 config,
246 )
247 .await;
248 }
249 };
250
251 match resp_msg.body {
252 ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
253 challenge_id: resp_id,
254 }) => {
255 if resp_id != challenge_id {
256 warn!("Audit: challenge ID mismatch on Bootstrapping from {challenged_peer}");
257 return handle_audit_failure(
258 &challenged_peer,
259 challenge_id,
260 &peer_keys,
261 AuditFailureReason::MalformedResponse,
262 p2p_node,
263 config,
264 )
265 .await;
266 }
267 AuditTickResult::BootstrapClaim {
269 peer: challenged_peer,
270 }
271 }
272 ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
273 challenge_id: resp_id,
274 digests,
275 }) => {
276 if resp_id != challenge_id {
277 warn!("Audit: challenge ID mismatch from {challenged_peer}");
278 return handle_audit_failure(
279 &challenged_peer,
280 challenge_id,
281 &peer_keys,
282 AuditFailureReason::MalformedResponse,
283 p2p_node,
284 config,
285 )
286 .await;
287 }
288 verify_digests(
289 &challenged_peer,
290 challenge_id,
291 &nonce,
292 &peer_keys,
293 &digests,
294 storage,
295 p2p_node,
296 config,
297 )
298 .await
299 }
300 ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
301 challenge_id: resp_id,
302 reason,
303 }) => {
304 if resp_id != challenge_id {
305 warn!("Audit: challenge ID mismatch on Rejected from {challenged_peer}");
306 return handle_audit_failure(
307 &challenged_peer,
308 challenge_id,
309 &peer_keys,
310 AuditFailureReason::MalformedResponse,
311 p2p_node,
312 config,
313 )
314 .await;
315 }
316 warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
317 handle_audit_failure(
318 &challenged_peer,
319 challenge_id,
320 &peer_keys,
321 AuditFailureReason::Rejected,
322 p2p_node,
323 config,
324 )
325 .await
326 }
327 _ => {
328 warn!("Audit: unexpected response type from {challenged_peer}");
329 handle_audit_failure(
330 &challenged_peer,
331 challenge_id,
332 &peer_keys,
333 AuditFailureReason::MalformedResponse,
334 p2p_node,
335 config,
336 )
337 .await
338 }
339 }
340}
341
342fn eligible_audit_peers(sync_history: &HashMap<PeerId, PeerSyncRecord>) -> Vec<PeerId> {
343 sync_history
344 .iter()
345 .filter(|(_, record)| record.has_repair_opportunity())
346 .map(|(peer, _)| *peer)
347 .collect()
348}
349
350fn mature_audit_keys_for_peer(
351 challenged_peer: &PeerId,
352 sampled_key_groups: Vec<(XorName, HashSet<PeerId>)>,
353 repair_proofs: &mut RepairProofs,
354 current_sync_epoch: u64,
355 now: Instant,
356) -> Vec<XorName> {
357 sampled_key_groups
358 .into_iter()
359 .filter_map(|(key, close_peers)| {
360 repair_proofs
361 .has_mature_replica_hint(
362 challenged_peer,
363 &key,
364 &close_peers,
365 current_sync_epoch,
366 now,
367 )
368 .then_some(key)
369 })
370 .collect()
371}
372
/// How a single challenged key failed the audit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AuditKeyFailureKind {
    /// The peer answered with the absent-key sentinel digest for this key.
    Absent,
    /// The peer's digest differs from the digest computed over the local copy.
    DigestMismatch,
    /// The failure applies to the challenge as a whole (e.g. timeout, rejection or
    /// malformed response), so it is not counted as absent or mismatching.
    Unclassified,
}
379
380#[derive(Debug, Clone, Copy, PartialEq, Eq)]
381struct AuditKeyFailure {
382 key: XorName,
383 kind: AuditKeyFailureKind,
384}
385
386impl AuditKeyFailure {
387 fn absent(key: XorName) -> Self {
388 Self {
389 key,
390 kind: AuditKeyFailureKind::Absent,
391 }
392 }
393
394 fn digest_mismatch(key: XorName) -> Self {
395 Self {
396 key,
397 kind: AuditKeyFailureKind::DigestMismatch,
398 }
399 }
400
401 fn unclassified(key: XorName) -> Self {
402 Self {
403 key,
404 kind: AuditKeyFailureKind::Unclassified,
405 }
406 }
407}
408
409fn build_audit_failure_summary(
410 challenged_key_count: usize,
411 confirmed_failures: &[AuditKeyFailure],
412) -> AuditFailureSummary {
413 let mut summary = AuditFailureSummary {
414 challenged_keys: challenged_key_count,
415 failed_keys: confirmed_failures.len(),
416 ..AuditFailureSummary::default()
417 };
418
419 for failure in confirmed_failures {
420 match failure.kind {
421 AuditKeyFailureKind::Absent => summary.absent_keys += 1,
422 AuditKeyFailureKind::DigestMismatch => summary.digest_mismatch_keys += 1,
423 AuditKeyFailureKind::Unclassified => {}
424 }
425 }
426
427 summary
428}
429
430fn audit_digest_failure_reason(confirmed_failures: &[AuditKeyFailure]) -> AuditFailureReason {
431 if confirmed_failures
432 .iter()
433 .all(|failure| failure.kind == AuditKeyFailureKind::Absent)
434 {
435 AuditFailureReason::KeyAbsent
436 } else {
437 AuditFailureReason::DigestMismatch
438 }
439}
440
441#[allow(clippy::too_many_arguments)]
447async fn verify_digests(
448 challenged_peer: &PeerId,
449 challenge_id: u64,
450 nonce: &[u8; 32],
451 keys: &[XorName],
452 digests: &[[u8; 32]],
453 storage: &Arc<LmdbStorage>,
454 p2p_node: &Arc<P2PNode>,
455 config: &ReplicationConfig,
456) -> AuditTickResult {
457 if digests.len() != keys.len() {
459 warn!(
460 "Audit: malformed response from {challenged_peer}: {} digests for {} keys",
461 digests.len(),
462 keys.len()
463 );
464 return handle_audit_failure(
465 challenged_peer,
466 challenge_id,
467 keys,
468 AuditFailureReason::MalformedResponse,
469 p2p_node,
470 config,
471 )
472 .await;
473 }
474
475 let challenged_peer_bytes = challenged_peer.as_bytes();
476 let mut failed_keys = Vec::new();
477
478 for (i, key) in keys.iter().enumerate() {
479 let received_digest = &digests[i];
480
481 if *received_digest == ABSENT_KEY_DIGEST {
483 failed_keys.push(AuditKeyFailure::absent(*key));
484 continue;
485 }
486
487 let local_bytes = match storage.get_raw(key).await {
489 Ok(Some(bytes)) => bytes,
490 Ok(None) => {
491 warn!(
493 "Audit: local key {} disappeared during audit",
494 hex::encode(key)
495 );
496 continue;
497 }
498 Err(e) => {
499 warn!("Audit: failed to read local key {}: {e}", hex::encode(key));
500 continue;
501 }
502 };
503
504 let expected = compute_audit_digest(nonce, challenged_peer_bytes, key, &local_bytes);
505 if *received_digest != expected {
506 failed_keys.push(AuditKeyFailure::digest_mismatch(*key));
507 }
508 }
509
510 if failed_keys.is_empty() {
511 info!(
512 "Audit: peer {challenged_peer} passed (all {} keys verified)",
513 keys.len()
514 );
515 return AuditTickResult::Passed {
516 challenged_peer: *challenged_peer,
517 keys_checked: keys.len(),
518 };
519 }
520
521 handle_classified_audit_failure(
523 challenged_peer,
524 challenge_id,
525 &failed_keys,
526 AuditFailureReason::DigestMismatch,
527 keys.len(),
528 p2p_node,
529 config,
530 )
531 .await
532}
533
534async fn handle_audit_failure(
540 challenged_peer: &PeerId,
541 challenge_id: u64,
542 failed_keys: &[XorName],
543 reason: AuditFailureReason,
544 p2p_node: &Arc<P2PNode>,
545 config: &ReplicationConfig,
546) -> AuditTickResult {
547 let failures = failed_keys
548 .iter()
549 .copied()
550 .map(AuditKeyFailure::unclassified)
551 .collect::<Vec<_>>();
552 handle_classified_audit_failure(
553 challenged_peer,
554 challenge_id,
555 &failures,
556 reason,
557 failed_keys.len(),
558 p2p_node,
559 config,
560 )
561 .await
562}
563
564async fn handle_classified_audit_failure(
565 challenged_peer: &PeerId,
566 challenge_id: u64,
567 failed_keys: &[AuditKeyFailure],
568 reason: AuditFailureReason,
569 challenged_key_count: usize,
570 p2p_node: &Arc<P2PNode>,
571 config: &ReplicationConfig,
572) -> AuditTickResult {
573 let dht = p2p_node.dht_manager();
574 let mut confirmed_failures = Vec::new();
575
576 for failure in failed_keys {
578 let closest = dht
579 .find_closest_nodes_local_with_self(&failure.key, config.close_group_size)
580 .await;
581 if closest.iter().any(|n| n.peer_id == *challenged_peer) {
582 confirmed_failures.push(*failure);
583 } else {
584 debug!(
585 "Audit: peer {challenged_peer} not responsible for {} (removed from failure set)",
586 hex::encode(failure.key)
587 );
588 }
589 }
590
591 if confirmed_failures.is_empty() {
596 info!("Audit: all failures for {challenged_peer} cleared by responsibility confirmation");
597 return AuditTickResult::Idle;
598 }
599
600 let summary = build_audit_failure_summary(challenged_key_count, &confirmed_failures);
601 let reason = if reason == AuditFailureReason::DigestMismatch {
602 audit_digest_failure_reason(&confirmed_failures)
603 } else {
604 reason
605 };
606 let confirmed_failed_keys = confirmed_failures
607 .iter()
608 .map(|failure| failure.key)
609 .collect();
610
611 let evidence = FailureEvidence::AuditFailure {
613 challenge_id,
614 challenged_peer: *challenged_peer,
615 confirmed_failed_keys,
616 summary,
617 reason,
618 };
619
620 AuditTickResult::Failed { evidence }
621}
622
623async fn handle_audit_timeout(
625 challenged_peer: &PeerId,
626 challenge_id: u64,
627 keys: &[XorName],
628 p2p_node: &Arc<P2PNode>,
629 config: &ReplicationConfig,
630) -> AuditTickResult {
631 handle_audit_failure(
632 challenged_peer,
633 challenge_id,
634 keys,
635 AuditFailureReason::Timeout,
636 p2p_node,
637 config,
638 )
639 .await
640}
641
642pub async fn handle_audit_challenge(
653 challenge: &AuditChallenge,
654 storage: &LmdbStorage,
655 self_peer_id: &PeerId,
656 is_bootstrapping: bool,
657 stored_chunks: usize,
658) -> AuditResponse {
659 if is_bootstrapping {
660 return AuditResponse::Bootstrapping {
661 challenge_id: challenge.challenge_id,
662 };
663 }
664
665 if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
666 warn!(
667 "Audit challenge targeted wrong peer: expected {}, got {}",
668 hex::encode(self_peer_id.as_bytes()),
669 hex::encode(challenge.challenged_peer_id),
670 );
671 return AuditResponse::Rejected {
672 challenge_id: challenge.challenge_id,
673 reason: "challenged_peer_id does not match this node".to_string(),
674 };
675 }
676
677 let max_keys = ReplicationConfig::max_incoming_audit_keys(stored_chunks);
678 if challenge.keys.len() > max_keys {
679 warn!(
680 "Audit challenge rejected: {} keys exceeds dynamic limit of {max_keys} \
681 (stored_chunks={stored_chunks})",
682 challenge.keys.len(),
683 );
684 return AuditResponse::Rejected {
685 challenge_id: challenge.challenge_id,
686 reason: format!(
687 "challenge contains {} keys, limit is {max_keys}",
688 challenge.keys.len()
689 ),
690 };
691 }
692
693 let mut digests = Vec::with_capacity(challenge.keys.len());
694
695 for key in &challenge.keys {
696 match storage.get_raw(key).await {
697 Ok(Some(data)) => {
698 let digest = compute_audit_digest(
699 &challenge.nonce,
700 &challenge.challenged_peer_id,
701 key,
702 &data,
703 );
704 digests.push(digest);
705 }
706 Ok(None) => {
707 digests.push(ABSENT_KEY_DIGEST);
708 }
709 Err(e) => {
710 warn!(
711 "Audit responder: failed to read key {}: {e}",
712 hex::encode(key)
713 );
714 digests.push(ABSENT_KEY_DIGEST);
715 }
716 }
717 }
718
719 AuditResponse::Digests {
720 challenge_id: challenge.challenge_id,
721 digests,
722 }
723}
724
725#[cfg(test)]
730#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
731mod tests {
732 use super::*;
733 use crate::replication::config::REPAIR_HINT_MIN_AGE;
734 use crate::replication::protocol::compute_audit_digest;
735 use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
736 use crate::storage::LmdbStorageConfig;
737 use std::time::Instant;
738 use tempfile::TempDir;
739
740 const TEST_STORED_CHUNKS: usize = 1_000_000;
743
744 async fn create_test_storage() -> (LmdbStorage, TempDir) {
746 let temp_dir = TempDir::new().expect("create temp dir");
747 let config = LmdbStorageConfig {
748 root_dir: temp_dir.path().to_path_buf(),
749 verify_on_read: false,
750 max_map_size: 0,
751 disk_reserve: 0,
752 };
753 let storage = LmdbStorage::new(config).await.expect("create storage");
754 (storage, temp_dir)
755 }
756
757 fn make_challenge(
759 challenge_id: u64,
760 nonce: [u8; 32],
761 peer_id: [u8; 32],
762 keys: Vec<XorName>,
763 ) -> AuditChallenge {
764 AuditChallenge {
765 challenge_id,
766 nonce,
767 challenged_peer_id: peer_id,
768 keys,
769 }
770 }
771
772 fn peer_id_from_bytes(bytes: [u8; 32]) -> PeerId {
774 PeerId::from_bytes(bytes)
775 }
776
777 #[tokio::test]
780 async fn handle_challenge_present_keys_returns_correct_digests() {
781 let (storage, _temp) = create_test_storage().await;
782
783 let content_a = b"chunk alpha";
785 let addr_a = LmdbStorage::compute_address(content_a);
786 storage.put(&addr_a, content_a).await.expect("put a");
787
788 let content_b = b"chunk beta";
789 let addr_b = LmdbStorage::compute_address(content_b);
790 storage.put(&addr_b, content_b).await.expect("put b");
791
792 let nonce = [0xAA; 32];
793 let peer_id = [0xBB; 32];
794 let challenge = make_challenge(42, nonce, peer_id, vec![addr_a, addr_b]);
795 let self_id = peer_id_from_bytes(peer_id);
796
797 let response =
798 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
799
800 match response {
801 AuditResponse::Digests {
802 challenge_id,
803 digests,
804 } => {
805 assert_eq!(challenge_id, 42);
806 assert_eq!(digests.len(), 2);
807
808 let expected_a = compute_audit_digest(&nonce, &peer_id, &addr_a, content_a);
809 let expected_b = compute_audit_digest(&nonce, &peer_id, &addr_b, content_b);
810 assert_eq!(digests[0], expected_a);
811 assert_eq!(digests[1], expected_b);
812 }
813 AuditResponse::Bootstrapping { .. } => {
814 panic!("expected Digests, got Bootstrapping");
815 }
816 AuditResponse::Rejected { .. } => {
817 panic!("Unexpected Rejected response");
818 }
819 }
820 }
821
822 #[tokio::test]
825 async fn handle_challenge_absent_keys_returns_sentinel() {
826 let (storage, _temp) = create_test_storage().await;
827
828 let absent_key = [0xFF; 32];
829 let nonce = [0x11; 32];
830 let peer_id = [0x22; 32];
831 let challenge = make_challenge(99, nonce, peer_id, vec![absent_key]);
832 let self_id = peer_id_from_bytes(peer_id);
833
834 let response =
835 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
836
837 match response {
838 AuditResponse::Digests {
839 challenge_id,
840 digests,
841 } => {
842 assert_eq!(challenge_id, 99);
843 assert_eq!(digests.len(), 1);
844 assert_eq!(
845 digests[0], ABSENT_KEY_DIGEST,
846 "absent key should produce sentinel digest"
847 );
848 }
849 AuditResponse::Bootstrapping { .. } => {
850 panic!("expected Digests, got Bootstrapping");
851 }
852 AuditResponse::Rejected { .. } => {
853 panic!("Unexpected Rejected response");
854 }
855 }
856 }
857
858 #[tokio::test]
861 async fn handle_challenge_mixed_present_and_absent() {
862 let (storage, _temp) = create_test_storage().await;
863
864 let content = b"present chunk";
865 let addr_present = LmdbStorage::compute_address(content);
866 storage.put(&addr_present, content).await.expect("put");
867
868 let addr_absent = [0xDE; 32];
869 let nonce = [0x33; 32];
870 let peer_id = [0x44; 32];
871 let challenge = make_challenge(7, nonce, peer_id, vec![addr_present, addr_absent]);
872 let self_id = peer_id_from_bytes(peer_id);
873
874 let response =
875 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
876
877 match response {
878 AuditResponse::Digests { digests, .. } => {
879 assert_eq!(digests.len(), 2);
880
881 let expected_present =
882 compute_audit_digest(&nonce, &peer_id, &addr_present, content);
883 assert_eq!(digests[0], expected_present);
884 assert_eq!(
885 digests[1], ABSENT_KEY_DIGEST,
886 "absent key should be sentinel"
887 );
888 }
889 AuditResponse::Bootstrapping { .. } => {
890 panic!("expected Digests, got Bootstrapping");
891 }
892 AuditResponse::Rejected { .. } => {
893 panic!("Unexpected Rejected response");
894 }
895 }
896 }
897
898 #[tokio::test]
901 async fn handle_challenge_bootstrapping_returns_bootstrapping_response() {
902 let (storage, _temp) = create_test_storage().await;
903
904 let challenge = make_challenge(55, [0x00; 32], [0x01; 32], vec![[0x02; 32]]);
905 let self_id = peer_id_from_bytes([0x01; 32]);
906
907 let response =
908 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
909
910 match response {
911 AuditResponse::Bootstrapping { challenge_id } => {
912 assert_eq!(challenge_id, 55);
913 }
914 AuditResponse::Digests { .. } => {
915 panic!("expected Bootstrapping, got Digests");
916 }
917 AuditResponse::Rejected { .. } => {
918 panic!("Unexpected Rejected response");
919 }
920 }
921 }
922
923 #[tokio::test]
926 async fn handle_challenge_empty_keys_returns_empty_digests() {
927 let (storage, _temp) = create_test_storage().await;
928
929 let challenge = make_challenge(100, [0x10; 32], [0x20; 32], vec![]);
930 let self_id = peer_id_from_bytes([0x20; 32]);
931
932 let response =
933 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
934
935 match response {
936 AuditResponse::Digests {
937 challenge_id,
938 digests,
939 } => {
940 assert_eq!(challenge_id, 100);
941 assert!(
942 digests.is_empty(),
943 "empty key list should yield empty digests"
944 );
945 }
946 AuditResponse::Bootstrapping { .. } => {
947 panic!("expected Digests, got Bootstrapping");
948 }
949 AuditResponse::Rejected { .. } => {
950 panic!("Unexpected Rejected response");
951 }
952 }
953 }
954
955 #[test]
958 fn digest_verification_matching() {
959 let nonce = [0x01; 32];
960 let peer_id = [0x02; 32];
961 let key: XorName = [0x03; 32];
962 let data = b"correct data";
963
964 let expected = compute_audit_digest(&nonce, &peer_id, &key, data);
965 let recomputed = compute_audit_digest(&nonce, &peer_id, &key, data);
966
967 assert_eq!(
968 expected, recomputed,
969 "same inputs must produce identical digests"
970 );
971 assert_ne!(
972 expected, ABSENT_KEY_DIGEST,
973 "real digest must not be sentinel"
974 );
975 }
976
977 #[test]
980 fn digest_verification_mismatching_data() {
981 let nonce = [0x01; 32];
982 let peer_id = [0x02; 32];
983 let key: XorName = [0x03; 32];
984
985 let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data version A");
986 let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data version B");
987
988 assert_ne!(
989 digest_a, digest_b,
990 "different data must produce different digests"
991 );
992 }
993
994 #[test]
995 fn digest_verification_mismatching_nonce() {
996 let peer_id = [0x02; 32];
997 let key: XorName = [0x03; 32];
998 let data = b"same data";
999
1000 let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, data);
1001 let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, data);
1002
1003 assert_ne!(
1004 digest_a, digest_b,
1005 "different nonces must produce different digests"
1006 );
1007 }
1008
1009 #[test]
1010 fn digest_verification_mismatching_peer() {
1011 let nonce = [0x01; 32];
1012 let key: XorName = [0x03; 32];
1013 let data = b"same data";
1014
1015 let digest_a = compute_audit_digest(&nonce, &[0x02; 32], &key, data);
1016 let digest_b = compute_audit_digest(&nonce, &[0xFE; 32], &key, data);
1017
1018 assert_ne!(
1019 digest_a, digest_b,
1020 "different peers must produce different digests"
1021 );
1022 }
1023
1024 #[test]
1025 fn digest_verification_mismatching_key() {
1026 let nonce = [0x01; 32];
1027 let peer_id = [0x02; 32];
1028 let data = b"same data";
1029
1030 let digest_a = compute_audit_digest(&nonce, &peer_id, &[0x03; 32], data);
1031 let digest_b = compute_audit_digest(&nonce, &peer_id, &[0xFC; 32], data);
1032
1033 assert_ne!(
1034 digest_a, digest_b,
1035 "different keys must produce different digests"
1036 );
1037 }
1038
1039 #[test]
1042 fn absent_sentinel_is_all_zeros() {
1043 assert_eq!(ABSENT_KEY_DIGEST, [0u8; 32], "sentinel must be all zeros");
1044 }
1045
1046 #[tokio::test]
1049 async fn bootstrapping_skips_digest_computation() {
1050 let (storage, _temp) = create_test_storage().await;
1051
1052 let content = b"stored but bootstrapping";
1053 let addr = LmdbStorage::compute_address(content);
1054 storage.put(&addr, content).await.expect("put");
1055
1056 let challenge = make_challenge(200, [0xCC; 32], [0xDD; 32], vec![addr]);
1057 let self_id = peer_id_from_bytes([0xDD; 32]);
1058
1059 let response =
1060 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1061
1062 assert!(
1063 matches!(response, AuditResponse::Bootstrapping { challenge_id: 200 }),
1064 "bootstrapping node must not compute digests"
1065 );
1066 }
1067
1068 #[tokio::test]
1071 async fn scenario_19_partial_failure_mixed_responsibility() {
1072 let (storage, _temp) = create_test_storage().await;
1079 let nonce = [0x42u8; 32];
1080 let peer_id = [0xAA; 32];
1081
1082 let content_k1 = b"key one data";
1084 let addr_k1 = LmdbStorage::compute_address(content_k1);
1085 storage.put(&addr_k1, content_k1).await.unwrap();
1086
1087 let content_k2 = b"key two data";
1088 let addr_k2 = LmdbStorage::compute_address(content_k2);
1089 storage.put(&addr_k2, content_k2).await.unwrap();
1090
1091 let addr_k3 = [0xFF; 32]; let challenge = AuditChallenge {
1094 challenge_id: 100,
1095 nonce,
1096 challenged_peer_id: peer_id,
1097 keys: vec![addr_k1, addr_k2, addr_k3],
1098 };
1099 let self_id = peer_id_from_bytes(peer_id);
1100
1101 let response =
1102 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1103
1104 match response {
1105 AuditResponse::Digests { digests, .. } => {
1106 assert_eq!(digests.len(), 3);
1107
1108 let expected_k1 = compute_audit_digest(&nonce, &peer_id, &addr_k1, content_k1);
1110 assert_eq!(digests[0], expected_k1);
1111
1112 let expected_k2 = compute_audit_digest(&nonce, &peer_id, &addr_k2, content_k2);
1114 assert_eq!(digests[1], expected_k2);
1115
1116 assert_eq!(digests[2], ABSENT_KEY_DIGEST);
1118 }
1119 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests response"),
1120 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1121 }
1122 }
1123
1124 #[tokio::test]
1127 async fn scenario_54_all_digests_pass() {
1128 let (storage, _temp) = create_test_storage().await;
1131 let nonce = [0x10; 32];
1132 let peer_id = [0x20; 32];
1133
1134 let c1 = b"chunk alpha";
1135 let c2 = b"chunk beta";
1136 let c3 = b"chunk gamma";
1137 let a1 = LmdbStorage::compute_address(c1);
1138 let a2 = LmdbStorage::compute_address(c2);
1139 let a3 = LmdbStorage::compute_address(c3);
1140 storage.put(&a1, c1).await.unwrap();
1141 storage.put(&a2, c2).await.unwrap();
1142 storage.put(&a3, c3).await.unwrap();
1143
1144 let challenge = AuditChallenge {
1145 challenge_id: 200,
1146 nonce,
1147 challenged_peer_id: peer_id,
1148 keys: vec![a1, a2, a3],
1149 };
1150 let self_id = peer_id_from_bytes(peer_id);
1151
1152 let response =
1153 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1154 match response {
1155 AuditResponse::Digests { digests, .. } => {
1156 assert_eq!(digests.len(), 3);
1157 for (i, (addr, content)) in [(a1, &c1[..]), (a2, &c2[..]), (a3, &c3[..])]
1158 .iter()
1159 .enumerate()
1160 {
1161 let expected = compute_audit_digest(&nonce, &peer_id, addr, content);
1162 assert_eq!(digests[i], expected, "Key {i} digest should match");
1163 }
1164 }
1165 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1166 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1167 }
1168 }
1169
1170 #[tokio::test]
1183 async fn scenario_55_no_confirmed_responsibility_no_evidence() {
1184 let (storage, _temp) = create_test_storage().await;
1185 let nonce = [0x55; 32];
1186 let peer_id = [0x55; 32];
1187
1188 let c1 = b"scenario 55 key one";
1190 let c2 = b"scenario 55 key two";
1191 let k1 = LmdbStorage::compute_address(c1);
1192 let k2 = LmdbStorage::compute_address(c2);
1193 storage.put(&k1, c1).await.expect("put k1");
1194 storage.put(&k2, c2).await.expect("put k2");
1195
1196 let expected_d1 = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1198 let expected_d2 = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1199
1200 let wrong_d1 = compute_audit_digest(&nonce, &peer_id, &k1, b"corrupted k1");
1202 let wrong_d2 = compute_audit_digest(&nonce, &peer_id, &k2, b"corrupted k2");
1203 assert_ne!(wrong_d1, expected_d1, "K1 digest should mismatch");
1204 assert_ne!(wrong_d2, expected_d2, "K2 digest should mismatch");
1205
1206 let keys = [k1, k2];
1208 let expected = [expected_d1, expected_d2];
1209 let received = [wrong_d1, wrong_d2];
1210
1211 let mut failed_keys = Vec::new();
1212 for i in 0..keys.len() {
1213 if received[i] != expected[i] {
1214 failed_keys.push(keys[i]);
1215 }
1216 }
1217 assert_eq!(
1218 failed_keys.len(),
1219 2,
1220 "Both keys should be identified as digest mismatches"
1221 );
1222
1223 let confirmed_responsible_keys: Vec<XorName> = Vec::new();
1226 let confirmed_failures: Vec<XorName> = failed_keys
1227 .into_iter()
1228 .filter(|k| confirmed_responsible_keys.contains(k))
1229 .collect();
1230
1231 assert!(
1233 confirmed_failures.is_empty(),
1234 "With no confirmed responsibility, failure set must be empty — \
1235 no AuditFailure evidence should be emitted"
1236 );
1237
1238 let peer = PeerId::from_bytes(peer_id);
1241 let evidence = FailureEvidence::AuditFailure {
1242 challenge_id: 5500,
1243 challenged_peer: peer,
1244 confirmed_failed_keys: confirmed_failures,
1245 summary: AuditFailureSummary::default(),
1246 reason: AuditFailureReason::DigestMismatch,
1247 };
1248 if let FailureEvidence::AuditFailure {
1249 confirmed_failed_keys,
1250 ..
1251 } = evidence
1252 {
1253 assert!(
1254 confirmed_failed_keys.is_empty(),
1255 "Evidence with empty failure set should not trigger a trust penalty"
1256 );
1257 }
1258 }
1259
1260 #[test]
1263 fn scenario_56_repair_opportunity_filters_never_synced() {
1264 let never_synced = PeerSyncRecord {
1268 last_sync: None,
1269 cycles_since_sync: 5,
1270 };
1271 assert!(!never_synced.has_repair_opportunity());
1272
1273 let synced_no_cycle = PeerSyncRecord {
1274 last_sync: Some(Instant::now()),
1275 cycles_since_sync: 0,
1276 };
1277 assert!(!synced_no_cycle.has_repair_opportunity());
1278
1279 let synced_with_cycle = PeerSyncRecord {
1280 last_sync: Some(Instant::now()),
1281 cycles_since_sync: 1,
1282 };
1283 assert!(synced_with_cycle.has_repair_opportunity());
1284 }
1285
1286 #[test]
1287 fn expired_bootstrap_claim_does_not_remove_peer_from_audit_eligibility() {
1288 let peer = peer_id_from_bytes([0x57; 32]);
1289 let mut sync_history = HashMap::new();
1290 sync_history.insert(
1291 peer,
1292 PeerSyncRecord {
1293 last_sync: Some(Instant::now()),
1294 cycles_since_sync: 1,
1295 },
1296 );
1297
1298 let mut bootstrap_claims = HashMap::new();
1299 let first_seen = Instant::now()
1300 .checked_sub(
1301 crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD
1302 + std::time::Duration::from_secs(1),
1303 )
1304 .unwrap_or_else(Instant::now);
1305 bootstrap_claims.insert(peer, first_seen);
1306
1307 let eligible = eligible_audit_peers(&sync_history);
1308
1309 assert!(bootstrap_claims.contains_key(&peer));
1310 assert!(
1311 eligible.contains(&peer),
1312 "continued bootstrap claims must remain auditable so past-grace abuse can be observed"
1313 );
1314 }
1315
#[test]
/// `build_audit_failure_summary` must report the challenged-key count it was
/// given, count every confirmed failure as failed, and tally absent and
/// digest-mismatch failures separately.
fn audit_failure_summary_counts_confirmed_absent_and_mismatch_keys() {
    let confirmed = vec![
        AuditKeyFailure::absent([0xA1; 32]),
        AuditKeyFailure::digest_mismatch([0xB2; 32]),
    ];

    let summary = build_audit_failure_summary(5, &confirmed);

    assert_eq!(
        (
            summary.challenged_keys,
            summary.failed_keys,
            summary.absent_keys,
            summary.digest_mismatch_keys,
        ),
        (5, 2, 1, 1),
        "(challenged, failed, absent, digest_mismatch)"
    );
}
1332
#[test]
/// An unclassified failure must count towards `failed_keys` but must not be
/// attributed to either the absent or the digest-mismatch tally.
fn audit_failure_summary_leaves_unclassified_rejections_out_of_absent_mismatch_counts() {
    let confirmed = vec![AuditKeyFailure::unclassified([0xC3; 32])];

    let summary = build_audit_failure_summary(3, &confirmed);

    assert_eq!(
        (
            summary.challenged_keys,
            summary.failed_keys,
            summary.absent_keys,
            summary.digest_mismatch_keys,
        ),
        (3, 1, 0, 0),
        "(challenged, failed, absent, digest_mismatch)"
    );
}
1345
#[test]
/// When every confirmed failure is an absent key, the overall failure reason
/// must be `AuditFailureReason::KeyAbsent`.
fn audit_digest_failure_reason_is_key_absent_when_all_confirmed_failures_are_absent() {
    let only_absent = vec![AuditKeyFailure::absent([0xD4; 32])];

    let reason = audit_digest_failure_reason(&only_absent);

    assert_eq!(reason, AuditFailureReason::KeyAbsent);
}
1355
#[test]
/// A mix of absent-key and digest-mismatch failures must be reported as
/// `AuditFailureReason::DigestMismatch`.
fn audit_digest_failure_reason_is_digest_mismatch_for_mixed_failures() {
    let mixed = vec![
        AuditKeyFailure::absent([0xD5; 32]),
        AuditKeyFailure::digest_mismatch([0xE6; 32]),
    ];

    let reason = audit_digest_failure_reason(&mixed);

    assert_eq!(reason, AuditFailureReason::DigestMismatch);
}
1368
#[test]
/// Exercises `mature_audit_keys_for_peer` against six sampled keys.
///
/// Hints are recorded for five keys (one of them in the current epoch and one
/// at `now` rather than `REPAIR_HINT_MIN_AGE` earlier); a sixth key has no
/// recorded hint at all. The sampled close groups either match the recorded
/// group, swap the non-challenged peer, or drop the challenged peer. Only the
/// mature key with an unchanged group and the mature key whose group changed
/// but still contains the challenged peer are expected back, in sample order.
fn audit_key_filter_retains_stable_proofs_and_rejects_evicted_peers() {
    const HINT_EPOCH: u64 = 7;
    const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
    const XOR_NAME_LEN: usize = 32;

    // Every peer id and key in this test is a single byte repeated.
    let peer = |byte: u8| peer_id_from_bytes([byte; XOR_NAME_LEN]);
    let key = |byte: u8| [byte; XOR_NAME_LEN];

    let challenged_peer = peer(0xA1);
    let other_peer = peer(0xA2);
    let new_peer = peer(0xA3);

    let mature_key = key(0xB1);
    let same_epoch_key = key(0xB2);
    let missing_proof_key = key(0xB3);
    let stable_churn_key = key(0xB4);
    let evicted_key = key(0xB5);
    let fresh_hint_key = key(0xB6);

    let close_group = HashSet::from([challenged_peer, other_peer]);
    let changed_close_group = HashSet::from([challenged_peer, new_peer]);
    let evicted_close_group = HashSet::from([other_peer, new_peer]);

    let mature_hinted_at = Instant::now();
    let now = mature_hinted_at
        .checked_add(REPAIR_HINT_MIN_AGE)
        .unwrap_or(mature_hinted_at);

    // (key, epoch the hint was sent in, time the hint was sent).
    // `missing_proof_key` is deliberately left out.
    let hints = [
        (mature_key, HINT_EPOCH, mature_hinted_at),
        (same_epoch_key, CURRENT_EPOCH, mature_hinted_at),
        (stable_churn_key, HINT_EPOCH, mature_hinted_at),
        (evicted_key, HINT_EPOCH, mature_hinted_at),
        (fresh_hint_key, HINT_EPOCH, now),
    ];

    let mut repair_proofs = RepairProofs::new();
    for (hinted_key, epoch, hinted_at) in hints {
        assert!(repair_proofs.record_replica_hint_sent_at(
            challenged_peer,
            hinted_key,
            &close_group,
            epoch,
            hinted_at,
        ));
    }

    let sampled_key_groups = vec![
        (mature_key, close_group.clone()),
        (same_epoch_key, close_group.clone()),
        (missing_proof_key, close_group.clone()),
        (stable_churn_key, changed_close_group),
        (evicted_key, evicted_close_group),
        (fresh_hint_key, close_group),
    ];

    let peer_keys = mature_audit_keys_for_peer(
        &challenged_peer,
        sampled_key_groups,
        &mut repair_proofs,
        CURRENT_EPOCH,
        now,
    );

    assert_eq!(
        peer_keys,
        vec![mature_key, stable_churn_key],
        "mature proofs for stable close-group peers should become audit keys, while same-epoch, fresh, missing, and evicted-peer proofs should not"
    );
}
1460
1461 #[tokio::test]
1464 async fn audit_response_must_match_key_count() {
1465 let (storage, _temp) = create_test_storage().await;
1471 let nonce = [0x50; 32];
1472 let peer_id = [0x60; 32];
1473
1474 let content = b"single chunk";
1476 let addr = LmdbStorage::compute_address(content);
1477 storage.put(&addr, content).await.unwrap();
1478
1479 let absent_keys: Vec<XorName> = (1..=4u8).map(|i| [i; 32]).collect();
1481 let mut keys = vec![addr];
1482 keys.extend_from_slice(&absent_keys);
1483
1484 let key_count = keys.len();
1485 let challenge = make_challenge(300, nonce, peer_id, keys);
1486 let self_id = peer_id_from_bytes(peer_id);
1487
1488 let response =
1489 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1490 match response {
1491 AuditResponse::Digests { digests, .. } => {
1492 assert_eq!(
1493 digests.len(),
1494 key_count,
1495 "must produce exactly one digest per challenged key"
1496 );
1497 }
1498 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1499 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1500 }
1501 }
1502
#[test]
/// With nonce, peer and key held fixed, changing only the record bytes must
/// change the value returned by `compute_audit_digest`.
fn audit_digest_uses_full_record_bytes() {
    let nonce = [1u8; 32];
    let peer = [2u8; 32];
    let key = [3u8; 32];
    let digest_of = |record: &[u8]| compute_audit_digest(&nonce, &peer, &key, record);

    let first = digest_of(b"data version 1");
    let second = digest_of(b"data version 2");

    assert_ne!(
        first, second,
        "Different record bytes must produce different digests"
    );
}
1519
1520 #[tokio::test]
1531 async fn scenario_29_audit_start_gate_during_bootstrap() {
1532 let (storage, _temp) = create_test_storage().await;
1533
1534 let content = b"should not be audited during bootstrap";
1536 let addr = LmdbStorage::compute_address(content);
1537 storage.put(&addr, content).await.expect("put");
1538
1539 let challenge = make_challenge(2900, [0x29; 32], [0x29; 32], vec![addr]);
1540 let self_id = peer_id_from_bytes([0x29; 32]);
1541
1542 let response =
1544 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1545 assert!(
1546 matches!(
1547 response,
1548 AuditResponse::Bootstrapping { challenge_id: 2900 }
1549 ),
1550 "bootstrapping node must not compute digests — audit start gate"
1551 );
1552
1553 let response =
1555 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1556 assert!(
1557 matches!(response, AuditResponse::Digests { .. }),
1558 "drained node should compute digests normally"
1559 );
1560 }
1561
#[test]
/// Scenario 30: checks two inputs to audit peer selection.
///
/// First, `ReplicationConfig::audit_sample_count` for 100, 3 and 10 000 keys
/// (expected 10, 1 and 100). Second, `PeerSyncRecord::has_repair_opportunity`
/// for a never-synced peer, a just-synced peer, and a peer with two cycles
/// since its sync.
fn scenario_30_audit_peer_selection_from_sampled_keys() {
    // (total keys, expected sample count, failure message)
    let sample_cases = [
        (100, 10, "sample count should scale with sqrt(total_keys)"),
        (3, 1, "sqrt(3) = 1"),
        (10_000, 100, "sqrt(10000) = 100"),
    ];
    for (total_keys, expected, message) in sample_cases {
        assert_eq!(
            ReplicationConfig::audit_sample_count(total_keys),
            expected,
            "{message}"
        );
    }

    let never_synced = PeerSyncRecord {
        last_sync: None,
        cycles_since_sync: 10,
    };
    let just_synced = PeerSyncRecord {
        last_sync: Some(Instant::now()),
        cycles_since_sync: 0,
    };
    let synced_two_cycles_ago = PeerSyncRecord {
        last_sync: Some(Instant::now()),
        cycles_since_sync: 2,
    };

    assert!(!never_synced.has_repair_opportunity());
    assert!(!just_synced.has_repair_opportunity());
    assert!(synced_two_cycles_ago.has_repair_opportunity());
}
1610
1611 #[tokio::test]
1620 async fn scenario_32_dynamic_challenge_size() {
1621 let (storage, _temp) = create_test_storage().await;
1622
1623 let mut addrs = Vec::new();
1625 for i in 0u8..5 {
1626 let content = format!("dynamic challenge key {i}");
1627 let addr = LmdbStorage::compute_address(content.as_bytes());
1628 storage.put(&addr, content.as_bytes()).await.expect("put");
1629 addrs.push(addr);
1630 }
1631
1632 let nonce = [0x32; 32];
1633 let peer_id = [0x32; 32];
1634 let self_id = peer_id_from_bytes(peer_id);
1635
1636 let challenge1 = make_challenge(3201, nonce, peer_id, vec![addrs[0]]);
1638 let resp1 =
1639 handle_audit_challenge(&challenge1, &storage, &self_id, false, TEST_STORED_CHUNKS)
1640 .await;
1641 if let AuditResponse::Digests { digests, .. } = resp1 {
1642 assert_eq!(digests.len(), 1, "|PeerKeySet| = 1 → 1 digest");
1643 }
1644
1645 let challenge3 = make_challenge(3203, nonce, peer_id, addrs[0..3].to_vec());
1647 let resp3 =
1648 handle_audit_challenge(&challenge3, &storage, &self_id, false, TEST_STORED_CHUNKS)
1649 .await;
1650 if let AuditResponse::Digests { digests, .. } = resp3 {
1651 assert_eq!(digests.len(), 3, "|PeerKeySet| = 3 → 3 digests");
1652 }
1653
1654 let challenge5 = make_challenge(3205, nonce, peer_id, addrs.clone());
1656 let resp5 =
1657 handle_audit_challenge(&challenge5, &storage, &self_id, false, TEST_STORED_CHUNKS)
1658 .await;
1659 if let AuditResponse::Digests { digests, .. } = resp5 {
1660 assert_eq!(digests.len(), 5, "|PeerKeySet| = 5 → 5 digests");
1661 }
1662
1663 let challenge0 = make_challenge(3200, nonce, peer_id, vec![]);
1665 let resp0 =
1666 handle_audit_challenge(&challenge0, &storage, &self_id, false, TEST_STORED_CHUNKS)
1667 .await;
1668 if let AuditResponse::Digests { digests, .. } = resp0 {
1669 assert!(digests.is_empty(), "|PeerKeySet| = 0 → 0 digests (idle)");
1670 }
1671 }
1672
1673 #[tokio::test]
1679 async fn scenario_47_bootstrap_claim_grace_period_audit() {
1680 let (storage, _temp) = create_test_storage().await;
1681
1682 let content = b"bootstrap grace test";
1684 let addr = LmdbStorage::compute_address(content);
1685 storage.put(&addr, content).await.expect("put");
1686
1687 let challenge = make_challenge(4700, [0x47; 32], [0x47; 32], vec![addr]);
1688 let self_id = peer_id_from_bytes([0x47; 32]);
1689
1690 let response =
1692 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1693 let challenge_id = match response {
1694 AuditResponse::Bootstrapping { challenge_id } => challenge_id,
1695 AuditResponse::Digests { .. } => {
1696 panic!("Expected Bootstrapping response during grace period")
1697 }
1698 AuditResponse::Rejected { .. } => {
1699 panic!("Unexpected Rejected response")
1700 }
1701 };
1702 assert_eq!(challenge_id, 4700);
1703
1704 let peer = PeerId::from_bytes([0x47; 32]);
1706 let mut state = NeighborSyncState::new_cycle(vec![peer]);
1707 let now = Instant::now();
1708 let observed = state.observe_bootstrap_claim(
1709 peer,
1710 now,
1711 crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD,
1712 );
1713
1714 assert_eq!(
1715 observed,
1716 BootstrapClaimObservation::WithinGrace { first_seen: now }
1717 );
1718 assert!(
1719 state.bootstrap_claims.contains_key(&peer),
1720 "BootstrapClaimFirstSeen should be recorded after grace-period claim"
1721 );
1722 assert!(
1723 state.bootstrap_claim_history.contains_key(&peer),
1724 "Bootstrap claim history should remember that the grace window was used"
1725 );
1726 }
1727
1728 #[tokio::test]
1739 async fn scenario_53_partial_failure_mixed_responsibility() {
1740 let (storage, _temp) = create_test_storage().await;
1741 let nonce = [0x53; 32];
1742 let peer_id = [0x53; 32];
1743
1744 let c1 = b"scenario 53 key one";
1746 let c2 = b"scenario 53 key two";
1747 let c3 = b"scenario 53 key three";
1748 let k1 = LmdbStorage::compute_address(c1);
1749 let k2 = LmdbStorage::compute_address(c2);
1750 let k3 = LmdbStorage::compute_address(c3);
1751 storage.put(&k1, c1).await.expect("put k1");
1752 storage.put(&k2, c2).await.expect("put k2");
1753 storage.put(&k3, c3).await.expect("put k3");
1754
1755 let d1_expected = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1757 let d2_expected = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1758 let d3_expected = compute_audit_digest(&nonce, &peer_id, &k3, c3);
1759
1760 let d2_wrong = compute_audit_digest(&nonce, &peer_id, &k2, b"tampered k2");
1762 let d3_wrong = compute_audit_digest(&nonce, &peer_id, &k3, b"tampered k3");
1763
1764 assert_eq!(d1_expected, d1_expected, "K1 should match");
1765 assert_ne!(d2_wrong, d2_expected, "K2 should mismatch");
1766 assert_ne!(d3_wrong, d3_expected, "K3 should mismatch");
1767
1768 let digests = [d1_expected, d2_wrong, d3_wrong];
1770 let keys = [k1, k2, k3];
1771 let contents: [&[u8]; 3] = [c1, c2, c3];
1772
1773 let mut failed_keys = Vec::new();
1774 for (i, key) in keys.iter().enumerate() {
1775 if digests[i] == ABSENT_KEY_DIGEST {
1776 failed_keys.push(*key);
1777 continue;
1778 }
1779 let expected = compute_audit_digest(&nonce, &peer_id, key, contents[i]);
1780 if digests[i] != expected {
1781 failed_keys.push(*key);
1782 }
1783 }
1784
1785 assert_eq!(failed_keys.len(), 2, "K2 and K3 should be in failure set");
1786 assert!(failed_keys.contains(&k2));
1787 assert!(failed_keys.contains(&k3));
1788 assert!(!failed_keys.contains(&k1), "K1 passed digest check");
1789
1790 let responsible_for_k2 = true;
1793 let responsible_for_k3 = false;
1794 let mut confirmed = Vec::new();
1795 for key in &failed_keys {
1796 let is_responsible = if *key == k2 {
1797 responsible_for_k2
1798 } else {
1799 responsible_for_k3
1800 };
1801 if is_responsible {
1802 confirmed.push(*key);
1803 }
1804 }
1805
1806 assert_eq!(confirmed, vec![k2], "Only K2 should be in confirmed set");
1807
1808 let challenged_peer = PeerId::from_bytes(peer_id);
1810 let evidence = FailureEvidence::AuditFailure {
1811 challenge_id: 5300,
1812 challenged_peer,
1813 confirmed_failed_keys: confirmed,
1814 summary: AuditFailureSummary::default(),
1815 reason: AuditFailureReason::DigestMismatch,
1816 };
1817
1818 match evidence {
1819 FailureEvidence::AuditFailure {
1820 confirmed_failed_keys,
1821 ..
1822 } => {
1823 assert_eq!(
1824 confirmed_failed_keys.len(),
1825 1,
1826 "Only K2 should generate evidence"
1827 );
1828 assert_eq!(confirmed_failed_keys[0], k2);
1829 }
1830 _ => panic!("Expected AuditFailure evidence"),
1831 }
1832 }
1833}