1use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::logging::{debug, info, warn};
10use rand::seq::SliceRandom;
11use rand::Rng;
12
13use crate::ant_protocol::XorName;
14use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
15use crate::replication::protocol::{
16 compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
17 ReplicationMessageBody, ABSENT_KEY_DIGEST,
18};
19use crate::replication::types::{AuditFailureReason, FailureEvidence, PeerSyncRecord};
20use crate::storage::LmdbStorage;
21use saorsa_core::identity::PeerId;
22use saorsa_core::P2PNode;
23
/// Outcome of a single proof-of-storage audit round driven by `audit_tick`.
#[derive(Debug)]
pub enum AuditTickResult {
    /// The challenged peer returned a correct digest for every sampled key.
    Passed {
        /// Peer that was challenged this round.
        challenged_peer: PeerId,
        /// Number of keys whose digests were verified.
        keys_checked: usize,
    },
    /// The audit failed and responsibility was confirmed for at least one key.
    Failed {
        /// Evidence describing the confirmed failure set and reason.
        evidence: FailureEvidence,
    },
    /// The peer answered that it is still bootstrapping; no penalty applied.
    BootstrapClaim {
        /// Peer that made the bootstrap claim.
        peer: PeerId,
    },
    /// Nothing to audit this round (bootstrapping, no eligible peer/keys,
    /// encode failure, or all failures cleared by responsibility checks).
    Idle,
    /// Too few keys to run a meaningful audit.
    /// NOTE(review): not constructed anywhere in this file — presumably
    /// produced by a caller elsewhere; confirm before removing.
    InsufficientKeys,
}
53
54#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
66pub async fn audit_tick(
67 p2p_node: &Arc<P2PNode>,
68 storage: &Arc<LmdbStorage>,
69 config: &ReplicationConfig,
70 sync_history: &HashMap<PeerId, PeerSyncRecord>,
71 bootstrap_claims: &HashMap<PeerId, Instant>,
72 is_bootstrapping: bool,
73) -> AuditTickResult {
74 if is_bootstrapping {
76 return AuditTickResult::Idle;
77 }
78
79 let dht = p2p_node.dht_manager();
80 let now = Instant::now();
81
82 let eligible_peers: Vec<PeerId> = sync_history
87 .iter()
88 .filter(|(_, record)| record.has_repair_opportunity())
89 .filter(|(peer, _)| {
90 bootstrap_claims.get(peer).map_or(true, |first_seen| {
91 now.duration_since(*first_seen) <= config.bootstrap_claim_grace_period
92 })
93 })
94 .map(|(peer, _)| *peer)
95 .collect();
96
97 if eligible_peers.is_empty() {
98 return AuditTickResult::Idle;
99 }
100
101 let (challenged_peer, nonce, challenge_id) = {
102 let mut rng = rand::thread_rng();
103 let selected = match eligible_peers.choose(&mut rng) {
104 Some(p) => *p,
105 None => return AuditTickResult::Idle,
106 };
107 let n: [u8; 32] = rng.gen();
108 let c: u64 = rng.gen();
109 (selected, n, c)
110 };
111
112 let all_keys = match storage.all_keys().await {
115 Ok(keys) => keys,
116 Err(e) => {
117 warn!("Audit: failed to read local keys: {e}");
118 return AuditTickResult::Idle;
119 }
120 };
121
122 if all_keys.is_empty() {
123 return AuditTickResult::Idle;
124 }
125
126 let sample_count = ReplicationConfig::audit_sample_count(all_keys.len());
127 let sampled_keys: Vec<XorName> = {
128 let mut rng = rand::thread_rng();
129 all_keys
130 .choose_multiple(&mut rng, sample_count)
131 .copied()
132 .collect()
133 };
134
135 let mut peer_keys = Vec::new();
137 for key in &sampled_keys {
138 let closest = dht
139 .find_closest_nodes_local_with_self(key, config.close_group_size)
140 .await;
141 if closest.iter().any(|n| n.peer_id == challenged_peer) {
142 peer_keys.push(*key);
143 }
144 }
145
146 if peer_keys.is_empty() {
147 return AuditTickResult::Idle;
148 }
149
150 let challenge = AuditChallenge {
156 challenge_id,
157 nonce,
158 challenged_peer_id: *challenged_peer.as_bytes(),
159 keys: peer_keys.clone(),
160 };
161
162 let msg = ReplicationMessage {
163 request_id: challenge_id,
164 body: ReplicationMessageBody::AuditChallenge(challenge),
165 };
166
167 let encoded = match msg.encode() {
168 Ok(data) => data,
169 Err(e) => {
170 warn!("Audit: failed to encode challenge: {e}");
171 return AuditTickResult::Idle;
172 }
173 };
174
175 let response = match p2p_node
176 .send_request(
177 &challenged_peer,
178 REPLICATION_PROTOCOL_ID,
179 encoded,
180 config.audit_response_timeout(peer_keys.len()),
181 )
182 .await
183 {
184 Ok(resp) => resp,
185 Err(e) => {
186 debug!("Audit: challenge to {challenged_peer} failed: {e}");
187 return handle_audit_timeout(
189 &challenged_peer,
190 challenge_id,
191 &peer_keys,
192 p2p_node,
193 config,
194 )
195 .await;
196 }
197 };
198
199 let resp_msg = match ReplicationMessage::decode(&response.data) {
201 Ok(m) => m,
202 Err(e) => {
203 warn!("Audit: failed to decode response from {challenged_peer}: {e}");
204 return handle_audit_failure(
205 &challenged_peer,
206 challenge_id,
207 &peer_keys,
208 AuditFailureReason::MalformedResponse,
209 p2p_node,
210 config,
211 )
212 .await;
213 }
214 };
215
216 match resp_msg.body {
217 ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
218 challenge_id: resp_id,
219 }) => {
220 if resp_id != challenge_id {
221 warn!("Audit: challenge ID mismatch on Bootstrapping from {challenged_peer}");
222 return handle_audit_failure(
223 &challenged_peer,
224 challenge_id,
225 &peer_keys,
226 AuditFailureReason::MalformedResponse,
227 p2p_node,
228 config,
229 )
230 .await;
231 }
232 AuditTickResult::BootstrapClaim {
234 peer: challenged_peer,
235 }
236 }
237 ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
238 challenge_id: resp_id,
239 digests,
240 }) => {
241 if resp_id != challenge_id {
242 warn!("Audit: challenge ID mismatch from {challenged_peer}");
243 return handle_audit_failure(
244 &challenged_peer,
245 challenge_id,
246 &peer_keys,
247 AuditFailureReason::MalformedResponse,
248 p2p_node,
249 config,
250 )
251 .await;
252 }
253 verify_digests(
254 &challenged_peer,
255 challenge_id,
256 &nonce,
257 &peer_keys,
258 &digests,
259 storage,
260 p2p_node,
261 config,
262 )
263 .await
264 }
265 ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
266 challenge_id: resp_id,
267 reason,
268 }) => {
269 if resp_id != challenge_id {
270 warn!("Audit: challenge ID mismatch on Rejected from {challenged_peer}");
271 return handle_audit_failure(
272 &challenged_peer,
273 challenge_id,
274 &peer_keys,
275 AuditFailureReason::MalformedResponse,
276 p2p_node,
277 config,
278 )
279 .await;
280 }
281 warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
282 handle_audit_failure(
283 &challenged_peer,
284 challenge_id,
285 &peer_keys,
286 AuditFailureReason::Rejected,
287 p2p_node,
288 config,
289 )
290 .await
291 }
292 _ => {
293 warn!("Audit: unexpected response type from {challenged_peer}");
294 handle_audit_failure(
295 &challenged_peer,
296 challenge_id,
297 &peer_keys,
298 AuditFailureReason::MalformedResponse,
299 p2p_node,
300 config,
301 )
302 .await
303 }
304 }
305}
306
307#[allow(clippy::too_many_arguments)]
313async fn verify_digests(
314 challenged_peer: &PeerId,
315 challenge_id: u64,
316 nonce: &[u8; 32],
317 keys: &[XorName],
318 digests: &[[u8; 32]],
319 storage: &Arc<LmdbStorage>,
320 p2p_node: &Arc<P2PNode>,
321 config: &ReplicationConfig,
322) -> AuditTickResult {
323 if digests.len() != keys.len() {
325 warn!(
326 "Audit: malformed response from {challenged_peer}: {} digests for {} keys",
327 digests.len(),
328 keys.len()
329 );
330 return handle_audit_failure(
331 challenged_peer,
332 challenge_id,
333 keys,
334 AuditFailureReason::MalformedResponse,
335 p2p_node,
336 config,
337 )
338 .await;
339 }
340
341 let challenged_peer_bytes = challenged_peer.as_bytes();
342 let mut failed_keys = Vec::new();
343
344 for (i, key) in keys.iter().enumerate() {
345 let received_digest = &digests[i];
346
347 if *received_digest == ABSENT_KEY_DIGEST {
349 failed_keys.push(*key);
350 continue;
351 }
352
353 let local_bytes = match storage.get_raw(key).await {
355 Ok(Some(bytes)) => bytes,
356 Ok(None) => {
357 warn!(
359 "Audit: local key {} disappeared during audit",
360 hex::encode(key)
361 );
362 continue;
363 }
364 Err(e) => {
365 warn!("Audit: failed to read local key {}: {e}", hex::encode(key));
366 continue;
367 }
368 };
369
370 let expected = compute_audit_digest(nonce, challenged_peer_bytes, key, &local_bytes);
371 if *received_digest != expected {
372 failed_keys.push(*key);
373 }
374 }
375
376 if failed_keys.is_empty() {
377 info!(
378 "Audit: peer {challenged_peer} passed (all {} keys verified)",
379 keys.len()
380 );
381 return AuditTickResult::Passed {
382 challenged_peer: *challenged_peer,
383 keys_checked: keys.len(),
384 };
385 }
386
387 handle_audit_failure(
389 challenged_peer,
390 challenge_id,
391 &failed_keys,
392 AuditFailureReason::DigestMismatch,
393 p2p_node,
394 config,
395 )
396 .await
397}
398
399async fn handle_audit_failure(
405 challenged_peer: &PeerId,
406 challenge_id: u64,
407 failed_keys: &[XorName],
408 reason: AuditFailureReason,
409 p2p_node: &Arc<P2PNode>,
410 config: &ReplicationConfig,
411) -> AuditTickResult {
412 let dht = p2p_node.dht_manager();
413 let mut confirmed_failures = Vec::new();
414
415 for key in failed_keys {
417 let closest = dht
418 .find_closest_nodes_local_with_self(key, config.close_group_size)
419 .await;
420 if closest.iter().any(|n| n.peer_id == *challenged_peer) {
421 confirmed_failures.push(*key);
422 } else {
423 debug!(
424 "Audit: peer {challenged_peer} not responsible for {} (removed from failure set)",
425 hex::encode(key)
426 );
427 }
428 }
429
430 if confirmed_failures.is_empty() {
435 info!("Audit: all failures for {challenged_peer} cleared by responsibility confirmation");
436 return AuditTickResult::Idle;
437 }
438
439 let evidence = FailureEvidence::AuditFailure {
441 challenge_id,
442 challenged_peer: *challenged_peer,
443 confirmed_failed_keys: confirmed_failures,
444 reason,
445 };
446
447 AuditTickResult::Failed { evidence }
448}
449
/// Treats a transport failure/timeout on a challenge as an audit failure
/// with [`AuditFailureReason::Timeout`]. Responsibility confirmation in
/// `handle_audit_failure` still applies, so an unresponsive peer is only
/// penalised for keys it actually owns.
async fn handle_audit_timeout(
    challenged_peer: &PeerId,
    challenge_id: u64,
    keys: &[XorName],
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
) -> AuditTickResult {
    handle_audit_failure(
        challenged_peer,
        challenge_id,
        keys,
        AuditFailureReason::Timeout,
        p2p_node,
        config,
    )
    .await
}
468
469pub async fn handle_audit_challenge(
480 challenge: &AuditChallenge,
481 storage: &LmdbStorage,
482 self_peer_id: &PeerId,
483 is_bootstrapping: bool,
484 stored_chunks: usize,
485) -> AuditResponse {
486 if is_bootstrapping {
487 return AuditResponse::Bootstrapping {
488 challenge_id: challenge.challenge_id,
489 };
490 }
491
492 if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
493 warn!(
494 "Audit challenge targeted wrong peer: expected {}, got {}",
495 hex::encode(self_peer_id.as_bytes()),
496 hex::encode(challenge.challenged_peer_id),
497 );
498 return AuditResponse::Rejected {
499 challenge_id: challenge.challenge_id,
500 reason: "challenged_peer_id does not match this node".to_string(),
501 };
502 }
503
504 let max_keys = ReplicationConfig::max_incoming_audit_keys(stored_chunks);
505 if challenge.keys.len() > max_keys {
506 warn!(
507 "Audit challenge rejected: {} keys exceeds dynamic limit of {max_keys} \
508 (stored_chunks={stored_chunks})",
509 challenge.keys.len(),
510 );
511 return AuditResponse::Rejected {
512 challenge_id: challenge.challenge_id,
513 reason: format!(
514 "challenge contains {} keys, limit is {max_keys}",
515 challenge.keys.len()
516 ),
517 };
518 }
519
520 let mut digests = Vec::with_capacity(challenge.keys.len());
521
522 for key in &challenge.keys {
523 match storage.get_raw(key).await {
524 Ok(Some(data)) => {
525 let digest = compute_audit_digest(
526 &challenge.nonce,
527 &challenge.challenged_peer_id,
528 key,
529 &data,
530 );
531 digests.push(digest);
532 }
533 Ok(None) => {
534 digests.push(ABSENT_KEY_DIGEST);
535 }
536 Err(e) => {
537 warn!(
538 "Audit responder: failed to read key {}: {e}",
539 hex::encode(key)
540 );
541 digests.push(ABSENT_KEY_DIGEST);
542 }
543 }
544 }
545
546 AuditResponse::Digests {
547 challenge_id: challenge.challenge_id,
548 digests,
549 }
550}
551
552#[cfg(test)]
557#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
558mod tests {
559 use super::*;
560 use crate::replication::protocol::compute_audit_digest;
561 use crate::replication::types::NeighborSyncState;
562 use crate::storage::LmdbStorageConfig;
563 use tempfile::TempDir;
564
    // Large enough that `max_incoming_audit_keys` never rejects a test challenge.
    const TEST_STORED_CHUNKS: usize = 1_000_000;

    /// Creates a fresh LMDB-backed storage rooted in a temp dir.
    /// The returned `TempDir` guard must outlive the storage.
    async fn create_test_storage() -> (LmdbStorage, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            verify_on_read: false,
            max_map_size: 0,
            disk_reserve: 0,
        };
        let storage = LmdbStorage::new(config).await.expect("create storage");
        (storage, temp_dir)
    }

    /// Convenience constructor for an [`AuditChallenge`] in tests.
    fn make_challenge(
        challenge_id: u64,
        nonce: [u8; 32],
        peer_id: [u8; 32],
        keys: Vec<XorName>,
    ) -> AuditChallenge {
        AuditChallenge {
            challenge_id,
            nonce,
            challenged_peer_id: peer_id,
            keys,
        }
    }

    /// Test-only shorthand for building a `PeerId` from raw bytes.
    fn peer_id_from_bytes(bytes: [u8; 32]) -> PeerId {
        PeerId::from_bytes(bytes)
    }
601
    /// Present keys must each yield a digest keyed on (nonce, peer, key, data).
    #[tokio::test]
    async fn handle_challenge_present_keys_returns_correct_digests() {
        let (storage, _temp) = create_test_storage().await;

        let content_a = b"chunk alpha";
        let addr_a = LmdbStorage::compute_address(content_a);
        storage.put(&addr_a, content_a).await.expect("put a");

        let content_b = b"chunk beta";
        let addr_b = LmdbStorage::compute_address(content_b);
        storage.put(&addr_b, content_b).await.expect("put b");

        let nonce = [0xAA; 32];
        let peer_id = [0xBB; 32];
        let challenge = make_challenge(42, nonce, peer_id, vec![addr_a, addr_b]);
        let self_id = peer_id_from_bytes(peer_id);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;

        match response {
            AuditResponse::Digests {
                challenge_id,
                digests,
            } => {
                assert_eq!(challenge_id, 42);
                assert_eq!(digests.len(), 2);

                // Digests must match an independent recomputation.
                let expected_a = compute_audit_digest(&nonce, &peer_id, &addr_a, content_a);
                let expected_b = compute_audit_digest(&nonce, &peer_id, &addr_b, content_b);
                assert_eq!(digests[0], expected_a);
                assert_eq!(digests[1], expected_b);
            }
            AuditResponse::Bootstrapping { .. } => {
                panic!("expected Digests, got Bootstrapping");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }
646
    /// A key this node does not store must produce the sentinel digest.
    #[tokio::test]
    async fn handle_challenge_absent_keys_returns_sentinel() {
        let (storage, _temp) = create_test_storage().await;

        let absent_key = [0xFF; 32];
        let nonce = [0x11; 32];
        let peer_id = [0x22; 32];
        let challenge = make_challenge(99, nonce, peer_id, vec![absent_key]);
        let self_id = peer_id_from_bytes(peer_id);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;

        match response {
            AuditResponse::Digests {
                challenge_id,
                digests,
            } => {
                assert_eq!(challenge_id, 99);
                assert_eq!(digests.len(), 1);
                assert_eq!(
                    digests[0], ABSENT_KEY_DIGEST,
                    "absent key should produce sentinel digest"
                );
            }
            AuditResponse::Bootstrapping { .. } => {
                panic!("expected Digests, got Bootstrapping");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }
682
    /// Present and absent keys in one challenge: real digest for the former,
    /// sentinel for the latter, in challenge key order.
    #[tokio::test]
    async fn handle_challenge_mixed_present_and_absent() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"present chunk";
        let addr_present = LmdbStorage::compute_address(content);
        storage.put(&addr_present, content).await.expect("put");

        let addr_absent = [0xDE; 32];
        let nonce = [0x33; 32];
        let peer_id = [0x44; 32];
        let challenge = make_challenge(7, nonce, peer_id, vec![addr_present, addr_absent]);
        let self_id = peer_id_from_bytes(peer_id);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;

        match response {
            AuditResponse::Digests { digests, .. } => {
                assert_eq!(digests.len(), 2);

                let expected_present =
                    compute_audit_digest(&nonce, &peer_id, &addr_present, content);
                assert_eq!(digests[0], expected_present);
                assert_eq!(
                    digests[1], ABSENT_KEY_DIGEST,
                    "absent key should be sentinel"
                );
            }
            AuditResponse::Bootstrapping { .. } => {
                panic!("expected Digests, got Bootstrapping");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }
722
    /// A bootstrapping responder must echo the challenge ID in a
    /// `Bootstrapping` reply instead of computing digests.
    #[tokio::test]
    async fn handle_challenge_bootstrapping_returns_bootstrapping_response() {
        let (storage, _temp) = create_test_storage().await;

        let challenge = make_challenge(55, [0x00; 32], [0x01; 32], vec![[0x02; 32]]);
        let self_id = peer_id_from_bytes([0x01; 32]);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;

        match response {
            AuditResponse::Bootstrapping { challenge_id } => {
                assert_eq!(challenge_id, 55);
            }
            AuditResponse::Digests { .. } => {
                panic!("expected Bootstrapping, got Digests");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }
747
    /// An empty key list is valid and yields an empty digest list.
    #[tokio::test]
    async fn handle_challenge_empty_keys_returns_empty_digests() {
        let (storage, _temp) = create_test_storage().await;

        let challenge = make_challenge(100, [0x10; 32], [0x20; 32], vec![]);
        let self_id = peer_id_from_bytes([0x20; 32]);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;

        match response {
            AuditResponse::Digests {
                challenge_id,
                digests,
            } => {
                assert_eq!(challenge_id, 100);
                assert!(
                    digests.is_empty(),
                    "empty key list should yield empty digests"
                );
            }
            AuditResponse::Bootstrapping { .. } => {
                panic!("expected Digests, got Bootstrapping");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }
779
    /// Determinism: identical inputs produce identical digests, and a real
    /// digest never collides with the absent-key sentinel.
    #[test]
    fn digest_verification_matching() {
        let nonce = [0x01; 32];
        let peer_id = [0x02; 32];
        let key: XorName = [0x03; 32];
        let data = b"correct data";

        let expected = compute_audit_digest(&nonce, &peer_id, &key, data);
        let recomputed = compute_audit_digest(&nonce, &peer_id, &key, data);

        assert_eq!(
            expected, recomputed,
            "same inputs must produce identical digests"
        );
        assert_ne!(
            expected, ABSENT_KEY_DIGEST,
            "real digest must not be sentinel"
        );
    }

    /// The digest must bind the stored data.
    #[test]
    fn digest_verification_mismatching_data() {
        let nonce = [0x01; 32];
        let peer_id = [0x02; 32];
        let key: XorName = [0x03; 32];

        let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data version A");
        let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data version B");

        assert_ne!(
            digest_a, digest_b,
            "different data must produce different digests"
        );
    }

    /// The digest must bind the challenge nonce (prevents replayed answers).
    #[test]
    fn digest_verification_mismatching_nonce() {
        let peer_id = [0x02; 32];
        let key: XorName = [0x03; 32];
        let data = b"same data";

        let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, data);
        let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, data);

        assert_ne!(
            digest_a, digest_b,
            "different nonces must produce different digests"
        );
    }

    /// The digest must bind the challenged peer's identity.
    #[test]
    fn digest_verification_mismatching_peer() {
        let nonce = [0x01; 32];
        let key: XorName = [0x03; 32];
        let data = b"same data";

        let digest_a = compute_audit_digest(&nonce, &[0x02; 32], &key, data);
        let digest_b = compute_audit_digest(&nonce, &[0xFE; 32], &key, data);

        assert_ne!(
            digest_a, digest_b,
            "different peers must produce different digests"
        );
    }

    /// The digest must bind the key being audited.
    #[test]
    fn digest_verification_mismatching_key() {
        let nonce = [0x01; 32];
        let peer_id = [0x02; 32];
        let data = b"same data";

        let digest_a = compute_audit_digest(&nonce, &peer_id, &[0x03; 32], data);
        let digest_b = compute_audit_digest(&nonce, &peer_id, &[0xFC; 32], data);

        assert_ne!(
            digest_a, digest_b,
            "different keys must produce different digests"
        );
    }

    /// The absent-key sentinel is the all-zero digest by protocol definition.
    #[test]
    fn absent_sentinel_is_all_zeros() {
        assert_eq!(ABSENT_KEY_DIGEST, [0u8; 32], "sentinel must be all zeros");
    }
870
    /// Even when the data IS stored locally, a bootstrapping responder must
    /// answer `Bootstrapping` and skip digest computation entirely.
    #[tokio::test]
    async fn bootstrapping_skips_digest_computation() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"stored but bootstrapping";
        let addr = LmdbStorage::compute_address(content);
        storage.put(&addr, content).await.expect("put");

        let challenge = make_challenge(200, [0xCC; 32], [0xDD; 32], vec![addr]);
        let self_id = peer_id_from_bytes([0xDD; 32]);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;

        assert!(
            matches!(response, AuditResponse::Bootstrapping { challenge_id: 200 }),
            "bootstrapping node must not compute digests"
        );
    }
892
893 #[tokio::test]
896 async fn scenario_19_partial_failure_mixed_responsibility() {
897 let (storage, _temp) = create_test_storage().await;
904 let nonce = [0x42u8; 32];
905 let peer_id = [0xAA; 32];
906
907 let content_k1 = b"key one data";
909 let addr_k1 = LmdbStorage::compute_address(content_k1);
910 storage.put(&addr_k1, content_k1).await.unwrap();
911
912 let content_k2 = b"key two data";
913 let addr_k2 = LmdbStorage::compute_address(content_k2);
914 storage.put(&addr_k2, content_k2).await.unwrap();
915
916 let addr_k3 = [0xFF; 32]; let challenge = AuditChallenge {
919 challenge_id: 100,
920 nonce,
921 challenged_peer_id: peer_id,
922 keys: vec![addr_k1, addr_k2, addr_k3],
923 };
924 let self_id = peer_id_from_bytes(peer_id);
925
926 let response =
927 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
928
929 match response {
930 AuditResponse::Digests { digests, .. } => {
931 assert_eq!(digests.len(), 3);
932
933 let expected_k1 = compute_audit_digest(&nonce, &peer_id, &addr_k1, content_k1);
935 assert_eq!(digests[0], expected_k1);
936
937 let expected_k2 = compute_audit_digest(&nonce, &peer_id, &addr_k2, content_k2);
939 assert_eq!(digests[1], expected_k2);
940
941 assert_eq!(digests[2], ABSENT_KEY_DIGEST);
943 }
944 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests response"),
945 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
946 }
947 }
948
    /// Scenario 54: every challenged key is stored and correct, so every
    /// returned digest matches an independent recomputation.
    #[tokio::test]
    async fn scenario_54_all_digests_pass() {
        let (storage, _temp) = create_test_storage().await;
        let nonce = [0x10; 32];
        let peer_id = [0x20; 32];

        let c1 = b"chunk alpha";
        let c2 = b"chunk beta";
        let c3 = b"chunk gamma";
        let a1 = LmdbStorage::compute_address(c1);
        let a2 = LmdbStorage::compute_address(c2);
        let a3 = LmdbStorage::compute_address(c3);
        storage.put(&a1, c1).await.unwrap();
        storage.put(&a2, c2).await.unwrap();
        storage.put(&a3, c3).await.unwrap();

        let challenge = AuditChallenge {
            challenge_id: 200,
            nonce,
            challenged_peer_id: peer_id,
            keys: vec![a1, a2, a3],
        };
        let self_id = peer_id_from_bytes(peer_id);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
        match response {
            AuditResponse::Digests { digests, .. } => {
                assert_eq!(digests.len(), 3);
                for (i, (addr, content)) in [(a1, &c1[..]), (a2, &c2[..]), (a3, &c3[..])]
                    .iter()
                    .enumerate()
                {
                    let expected = compute_audit_digest(&nonce, &peer_id, addr, content);
                    assert_eq!(digests[i], expected, "Key {i} digest should match");
                }
            }
            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
        }
    }
994
    /// Scenario 55: digest mismatches that are NOT backed by DHT
    /// responsibility must not produce audit-failure evidence. Simulates the
    /// filtering step of `handle_audit_failure` with an empty
    /// confirmed-responsibility set.
    #[tokio::test]
    async fn scenario_55_no_confirmed_responsibility_no_evidence() {
        let (storage, _temp) = create_test_storage().await;
        let nonce = [0x55; 32];
        let peer_id = [0x55; 32];

        let c1 = b"scenario 55 key one";
        let c2 = b"scenario 55 key two";
        let k1 = LmdbStorage::compute_address(c1);
        let k2 = LmdbStorage::compute_address(c2);
        storage.put(&k1, c1).await.expect("put k1");
        storage.put(&k2, c2).await.expect("put k2");

        let expected_d1 = compute_audit_digest(&nonce, &peer_id, &k1, c1);
        let expected_d2 = compute_audit_digest(&nonce, &peer_id, &k2, c2);

        // Simulate a peer answering with digests over corrupted data.
        let wrong_d1 = compute_audit_digest(&nonce, &peer_id, &k1, b"corrupted k1");
        let wrong_d2 = compute_audit_digest(&nonce, &peer_id, &k2, b"corrupted k2");
        assert_ne!(wrong_d1, expected_d1, "K1 digest should mismatch");
        assert_ne!(wrong_d2, expected_d2, "K2 digest should mismatch");

        let keys = [k1, k2];
        let expected = [expected_d1, expected_d2];
        let received = [wrong_d1, wrong_d2];

        let mut failed_keys = Vec::new();
        for i in 0..keys.len() {
            if received[i] != expected[i] {
                failed_keys.push(keys[i]);
            }
        }
        assert_eq!(
            failed_keys.len(),
            2,
            "Both keys should be identified as digest mismatches"
        );

        // Responsibility confirmation returns nothing → failure set empties.
        let confirmed_responsible_keys: Vec<XorName> = Vec::new();
        let confirmed_failures: Vec<XorName> = failed_keys
            .into_iter()
            .filter(|k| confirmed_responsible_keys.contains(k))
            .collect();

        assert!(
            confirmed_failures.is_empty(),
            "With no confirmed responsibility, failure set must be empty — \
             no AuditFailure evidence should be emitted"
        );

        let peer = PeerId::from_bytes(peer_id);
        let evidence = FailureEvidence::AuditFailure {
            challenge_id: 5500,
            challenged_peer: peer,
            confirmed_failed_keys: confirmed_failures,
            reason: AuditFailureReason::DigestMismatch,
        };
        if let FailureEvidence::AuditFailure {
            confirmed_failed_keys,
            ..
        } = evidence
        {
            assert!(
                confirmed_failed_keys.is_empty(),
                "Evidence with empty failure set should not trigger a trust penalty"
            );
        }
    }
1083
    /// Scenario 56: `has_repair_opportunity` requires BOTH a completed sync
    /// and at least one replication cycle since — never-synced peers and
    /// freshly-synced peers are excluded from audit selection.
    #[test]
    fn scenario_56_repair_opportunity_filters_never_synced() {
        let never_synced = PeerSyncRecord {
            last_sync: None,
            cycles_since_sync: 5,
        };
        assert!(!never_synced.has_repair_opportunity());

        let synced_no_cycle = PeerSyncRecord {
            last_sync: Some(Instant::now()),
            cycles_since_sync: 0,
        };
        assert!(!synced_no_cycle.has_repair_opportunity());

        let synced_with_cycle = PeerSyncRecord {
            last_sync: Some(Instant::now()),
            cycles_since_sync: 1,
        };
        assert!(synced_with_cycle.has_repair_opportunity());
    }
1109
    /// The responder must produce exactly one digest per challenged key,
    /// including absent keys (which map to the sentinel).
    #[tokio::test]
    async fn audit_response_must_match_key_count() {
        let (storage, _temp) = create_test_storage().await;
        let nonce = [0x50; 32];
        let peer_id = [0x60; 32];

        let content = b"single chunk";
        let addr = LmdbStorage::compute_address(content);
        storage.put(&addr, content).await.unwrap();

        // One present key plus four absent keys.
        let absent_keys: Vec<XorName> = (1..=4u8).map(|i| [i; 32]).collect();
        let mut keys = vec![addr];
        keys.extend_from_slice(&absent_keys);

        let key_count = keys.len();
        let challenge = make_challenge(300, nonce, peer_id, keys);
        let self_id = peer_id_from_bytes(peer_id);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
        match response {
            AuditResponse::Digests { digests, .. } => {
                assert_eq!(
                    digests.len(),
                    key_count,
                    "must produce exactly one digest per challenged key"
                );
            }
            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
        }
    }

    /// The audit digest must cover the full record bytes, not just metadata.
    #[test]
    fn audit_digest_uses_full_record_bytes() {
        let nonce = [1u8; 32];
        let peer = [2u8; 32];
        let key = [3u8; 32];

        let d1 = compute_audit_digest(&nonce, &peer, &key, b"data version 1");
        let d2 = compute_audit_digest(&nonce, &peer, &key, b"data version 2");
        assert_ne!(
            d1, d2,
            "Different record bytes must produce different digests"
        );
    }
1168
    /// Scenario 29: the audit start gate — the same challenge is answered
    /// `Bootstrapping` while bootstrapping and `Digests` once drained.
    #[tokio::test]
    async fn scenario_29_audit_start_gate_during_bootstrap() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"should not be audited during bootstrap";
        let addr = LmdbStorage::compute_address(content);
        storage.put(&addr, content).await.expect("put");

        let challenge = make_challenge(2900, [0x29; 32], [0x29; 32], vec![addr]);
        let self_id = peer_id_from_bytes([0x29; 32]);

        // While bootstrapping: gate closed.
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
        assert!(
            matches!(
                response,
                AuditResponse::Bootstrapping { challenge_id: 2900 }
            ),
            "bootstrapping node must not compute digests — audit start gate"
        );

        // After draining: gate open, digests computed normally.
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
        assert!(
            matches!(response, AuditResponse::Digests { .. }),
            "drained node should compute digests normally"
        );
    }
1210
    /// Scenario 30: audit sampling scales as sqrt(total keys), and only
    /// peers with a repair opportunity are eligible for selection.
    #[test]
    fn scenario_30_audit_peer_selection_from_sampled_keys() {
        assert_eq!(
            ReplicationConfig::audit_sample_count(100),
            10,
            "sample count should scale with sqrt(total_keys)"
        );

        assert_eq!(ReplicationConfig::audit_sample_count(3), 1, "sqrt(3) = 1");

        assert_eq!(
            ReplicationConfig::audit_sample_count(10_000),
            100,
            "sqrt(10000) = 100"
        );

        // Never synced → no repair opportunity, regardless of cycle count.
        let never = PeerSyncRecord {
            last_sync: None,
            cycles_since_sync: 10,
        };
        assert!(!never.has_repair_opportunity());

        // Synced but no cycle has elapsed yet → too soon to audit.
        let too_soon = PeerSyncRecord {
            last_sync: Some(Instant::now()),
            cycles_since_sync: 0,
        };
        assert!(!too_soon.has_repair_opportunity());

        // Synced with elapsed cycles → eligible.
        let eligible = PeerSyncRecord {
            last_sync: Some(Instant::now()),
            cycles_since_sync: 2,
        };
        assert!(eligible.has_repair_opportunity());
    }
1259
    /// Scenario 32: the digest count always equals |PeerKeySet| — 0, 1, 3,
    /// and 5 keys all produce exactly that many digests.
    #[tokio::test]
    async fn scenario_32_dynamic_challenge_size() {
        let (storage, _temp) = create_test_storage().await;

        let mut addrs = Vec::new();
        for i in 0u8..5 {
            let content = format!("dynamic challenge key {i}");
            let addr = LmdbStorage::compute_address(content.as_bytes());
            storage.put(&addr, content.as_bytes()).await.expect("put");
            addrs.push(addr);
        }

        let nonce = [0x32; 32];
        let peer_id = [0x32; 32];
        let self_id = peer_id_from_bytes(peer_id);

        let challenge1 = make_challenge(3201, nonce, peer_id, vec![addrs[0]]);
        let resp1 =
            handle_audit_challenge(&challenge1, &storage, &self_id, false, TEST_STORED_CHUNKS)
                .await;
        if let AuditResponse::Digests { digests, .. } = resp1 {
            assert_eq!(digests.len(), 1, "|PeerKeySet| = 1 → 1 digest");
        }

        let challenge3 = make_challenge(3203, nonce, peer_id, addrs[0..3].to_vec());
        let resp3 =
            handle_audit_challenge(&challenge3, &storage, &self_id, false, TEST_STORED_CHUNKS)
                .await;
        if let AuditResponse::Digests { digests, .. } = resp3 {
            assert_eq!(digests.len(), 3, "|PeerKeySet| = 3 → 3 digests");
        }

        let challenge5 = make_challenge(3205, nonce, peer_id, addrs.clone());
        let resp5 =
            handle_audit_challenge(&challenge5, &storage, &self_id, false, TEST_STORED_CHUNKS)
                .await;
        if let AuditResponse::Digests { digests, .. } = resp5 {
            assert_eq!(digests.len(), 5, "|PeerKeySet| = 5 → 5 digests");
        }

        let challenge0 = make_challenge(3200, nonce, peer_id, vec![]);
        let resp0 =
            handle_audit_challenge(&challenge0, &storage, &self_id, false, TEST_STORED_CHUNKS)
                .await;
        if let AuditResponse::Digests { digests, .. } = resp0 {
            assert!(digests.is_empty(), "|PeerKeySet| = 0 → 0 digests (idle)");
        }
    }
1321
    /// Scenario 47: a bootstrap claim during an audit is honoured (no
    /// penalty) and the first-seen timestamp is recorded for grace-period
    /// tracking on the challenger side.
    #[tokio::test]
    async fn scenario_47_bootstrap_claim_grace_period_audit() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"bootstrap grace test";
        let addr = LmdbStorage::compute_address(content);
        storage.put(&addr, content).await.expect("put");

        let challenge = make_challenge(4700, [0x47; 32], [0x47; 32], vec![addr]);
        let self_id = peer_id_from_bytes([0x47; 32]);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
        let challenge_id = match response {
            AuditResponse::Bootstrapping { challenge_id } => challenge_id,
            AuditResponse::Digests { .. } => {
                panic!("Expected Bootstrapping response during grace period")
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response")
            }
        };
        assert_eq!(challenge_id, 4700);

        // Challenger records when the claim was first seen.
        let peer = PeerId::from_bytes([0x47; 32]);
        let mut state = NeighborSyncState::new_cycle(vec![peer]);
        let now = Instant::now();
        state.bootstrap_claims.entry(peer).or_insert(now);

        assert!(
            state.bootstrap_claims.contains_key(&peer),
            "BootstrapClaimFirstSeen should be recorded after grace-period claim"
        );
    }
1364
1365 #[tokio::test]
1376 async fn scenario_53_partial_failure_mixed_responsibility() {
1377 let (storage, _temp) = create_test_storage().await;
1378 let nonce = [0x53; 32];
1379 let peer_id = [0x53; 32];
1380
1381 let c1 = b"scenario 53 key one";
1383 let c2 = b"scenario 53 key two";
1384 let c3 = b"scenario 53 key three";
1385 let k1 = LmdbStorage::compute_address(c1);
1386 let k2 = LmdbStorage::compute_address(c2);
1387 let k3 = LmdbStorage::compute_address(c3);
1388 storage.put(&k1, c1).await.expect("put k1");
1389 storage.put(&k2, c2).await.expect("put k2");
1390 storage.put(&k3, c3).await.expect("put k3");
1391
1392 let d1_expected = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1394 let d2_expected = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1395 let d3_expected = compute_audit_digest(&nonce, &peer_id, &k3, c3);
1396
1397 let d2_wrong = compute_audit_digest(&nonce, &peer_id, &k2, b"tampered k2");
1399 let d3_wrong = compute_audit_digest(&nonce, &peer_id, &k3, b"tampered k3");
1400
1401 assert_eq!(d1_expected, d1_expected, "K1 should match");
1402 assert_ne!(d2_wrong, d2_expected, "K2 should mismatch");
1403 assert_ne!(d3_wrong, d3_expected, "K3 should mismatch");
1404
1405 let digests = [d1_expected, d2_wrong, d3_wrong];
1407 let keys = [k1, k2, k3];
1408 let contents: [&[u8]; 3] = [c1, c2, c3];
1409
1410 let mut failed_keys = Vec::new();
1411 for (i, key) in keys.iter().enumerate() {
1412 if digests[i] == ABSENT_KEY_DIGEST {
1413 failed_keys.push(*key);
1414 continue;
1415 }
1416 let expected = compute_audit_digest(&nonce, &peer_id, key, contents[i]);
1417 if digests[i] != expected {
1418 failed_keys.push(*key);
1419 }
1420 }
1421
1422 assert_eq!(failed_keys.len(), 2, "K2 and K3 should be in failure set");
1423 assert!(failed_keys.contains(&k2));
1424 assert!(failed_keys.contains(&k3));
1425 assert!(!failed_keys.contains(&k1), "K1 passed digest check");
1426
1427 let responsible_for_k2 = true;
1430 let responsible_for_k3 = false;
1431 let mut confirmed = Vec::new();
1432 for key in &failed_keys {
1433 let is_responsible = if *key == k2 {
1434 responsible_for_k2
1435 } else {
1436 responsible_for_k3
1437 };
1438 if is_responsible {
1439 confirmed.push(*key);
1440 }
1441 }
1442
1443 assert_eq!(confirmed, vec![k2], "Only K2 should be in confirmed set");
1444
1445 let challenged_peer = PeerId::from_bytes(peer_id);
1447 let evidence = FailureEvidence::AuditFailure {
1448 challenge_id: 5300,
1449 challenged_peer,
1450 confirmed_failed_keys: confirmed,
1451 reason: AuditFailureReason::DigestMismatch,
1452 };
1453
1454 match evidence {
1455 FailureEvidence::AuditFailure {
1456 confirmed_failed_keys,
1457 ..
1458 } => {
1459 assert_eq!(
1460 confirmed_failed_keys.len(),
1461 1,
1462 "Only K2 should generate evidence"
1463 );
1464 assert_eq!(confirmed_failed_keys[0], k2);
1465 }
1466 _ => panic!("Expected AuditFailure evidence"),
1467 }
1468 }
1469}