1use std::collections::HashMap;
6use std::sync::Arc;
7
8use crate::logging::{debug, info, warn};
9use rand::seq::SliceRandom;
10use rand::Rng;
11
12use crate::ant_protocol::XorName;
13use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
14use crate::replication::protocol::{
15 compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
16 ReplicationMessageBody, ABSENT_KEY_DIGEST,
17};
18use crate::replication::types::{AuditFailureReason, FailureEvidence, PeerSyncRecord};
19use crate::storage::LmdbStorage;
20use saorsa_core::identity::PeerId;
21use saorsa_core::P2PNode;
22
/// Outcome of a single [`audit_tick`] invocation.
#[derive(Debug)]
pub enum AuditTickResult {
    /// The challenged peer answered and no challenged key was recorded as failed.
    Passed {
        /// Peer that received the challenge.
        challenged_peer: PeerId,
        /// Number of keys the audit reports as checked.
        keys_checked: usize,
    },
    /// The audit produced failure evidence against the challenged peer.
    Failed {
        /// Evidence describing the failed challenge.
        evidence: FailureEvidence,
    },
    /// The challenged peer replied that it is still bootstrapping.
    BootstrapClaim {
        /// Peer that made the bootstrapping claim.
        peer: PeerId,
    },
    /// Nothing was audited, or nothing conclusive came out of the tick.
    Idle,
    /// NOTE(review): not constructed anywhere in the visible code; presumably
    /// meant for "too few local keys to audit" — confirm against callers.
    InsufficientKeys,
}
52
53#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
65pub async fn audit_tick(
66 p2p_node: &Arc<P2PNode>,
67 storage: &Arc<LmdbStorage>,
68 config: &ReplicationConfig,
69 sync_history: &HashMap<PeerId, PeerSyncRecord>,
70 is_bootstrapping: bool,
71) -> AuditTickResult {
72 if is_bootstrapping {
74 return AuditTickResult::Idle;
75 }
76
77 let dht = p2p_node.dht_manager();
78
79 let eligible_peers = eligible_audit_peers(sync_history);
83
84 if eligible_peers.is_empty() {
85 return AuditTickResult::Idle;
86 }
87
88 let (challenged_peer, nonce, challenge_id) = {
89 let mut rng = rand::thread_rng();
90 let selected = match eligible_peers.choose(&mut rng) {
91 Some(p) => *p,
92 None => return AuditTickResult::Idle,
93 };
94 let n: [u8; 32] = rng.gen();
95 let c: u64 = rng.gen();
96 (selected, n, c)
97 };
98
99 let all_keys = match storage.all_keys().await {
102 Ok(keys) => keys,
103 Err(e) => {
104 warn!("Audit: failed to read local keys: {e}");
105 return AuditTickResult::Idle;
106 }
107 };
108
109 if all_keys.is_empty() {
110 return AuditTickResult::Idle;
111 }
112
113 let sample_count = ReplicationConfig::audit_sample_count(all_keys.len());
114 let sampled_keys: Vec<XorName> = {
115 let mut rng = rand::thread_rng();
116 all_keys
117 .choose_multiple(&mut rng, sample_count)
118 .copied()
119 .collect()
120 };
121
122 let mut peer_keys = Vec::new();
124 for key in &sampled_keys {
125 let closest = dht
126 .find_closest_nodes_local_with_self(key, config.close_group_size)
127 .await;
128 if closest.iter().any(|n| n.peer_id == challenged_peer) {
129 peer_keys.push(*key);
130 }
131 }
132
133 if peer_keys.is_empty() {
134 return AuditTickResult::Idle;
135 }
136
137 let challenge = AuditChallenge {
143 challenge_id,
144 nonce,
145 challenged_peer_id: *challenged_peer.as_bytes(),
146 keys: peer_keys.clone(),
147 };
148
149 let msg = ReplicationMessage {
150 request_id: challenge_id,
151 body: ReplicationMessageBody::AuditChallenge(challenge),
152 };
153
154 let encoded = match msg.encode() {
155 Ok(data) => data,
156 Err(e) => {
157 warn!("Audit: failed to encode challenge: {e}");
158 return AuditTickResult::Idle;
159 }
160 };
161
162 let response = match p2p_node
163 .send_request(
164 &challenged_peer,
165 REPLICATION_PROTOCOL_ID,
166 encoded,
167 config.audit_response_timeout(peer_keys.len()),
168 )
169 .await
170 {
171 Ok(resp) => resp,
172 Err(e) => {
173 debug!("Audit: challenge to {challenged_peer} failed: {e}");
174 return handle_audit_timeout(
176 &challenged_peer,
177 challenge_id,
178 &peer_keys,
179 p2p_node,
180 config,
181 )
182 .await;
183 }
184 };
185
186 let resp_msg = match ReplicationMessage::decode(&response.data) {
188 Ok(m) => m,
189 Err(e) => {
190 warn!("Audit: failed to decode response from {challenged_peer}: {e}");
191 return handle_audit_failure(
192 &challenged_peer,
193 challenge_id,
194 &peer_keys,
195 AuditFailureReason::MalformedResponse,
196 p2p_node,
197 config,
198 )
199 .await;
200 }
201 };
202
203 match resp_msg.body {
204 ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
205 challenge_id: resp_id,
206 }) => {
207 if resp_id != challenge_id {
208 warn!("Audit: challenge ID mismatch on Bootstrapping from {challenged_peer}");
209 return handle_audit_failure(
210 &challenged_peer,
211 challenge_id,
212 &peer_keys,
213 AuditFailureReason::MalformedResponse,
214 p2p_node,
215 config,
216 )
217 .await;
218 }
219 AuditTickResult::BootstrapClaim {
221 peer: challenged_peer,
222 }
223 }
224 ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
225 challenge_id: resp_id,
226 digests,
227 }) => {
228 if resp_id != challenge_id {
229 warn!("Audit: challenge ID mismatch from {challenged_peer}");
230 return handle_audit_failure(
231 &challenged_peer,
232 challenge_id,
233 &peer_keys,
234 AuditFailureReason::MalformedResponse,
235 p2p_node,
236 config,
237 )
238 .await;
239 }
240 verify_digests(
241 &challenged_peer,
242 challenge_id,
243 &nonce,
244 &peer_keys,
245 &digests,
246 storage,
247 p2p_node,
248 config,
249 )
250 .await
251 }
252 ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
253 challenge_id: resp_id,
254 reason,
255 }) => {
256 if resp_id != challenge_id {
257 warn!("Audit: challenge ID mismatch on Rejected from {challenged_peer}");
258 return handle_audit_failure(
259 &challenged_peer,
260 challenge_id,
261 &peer_keys,
262 AuditFailureReason::MalformedResponse,
263 p2p_node,
264 config,
265 )
266 .await;
267 }
268 warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
269 handle_audit_failure(
270 &challenged_peer,
271 challenge_id,
272 &peer_keys,
273 AuditFailureReason::Rejected,
274 p2p_node,
275 config,
276 )
277 .await
278 }
279 _ => {
280 warn!("Audit: unexpected response type from {challenged_peer}");
281 handle_audit_failure(
282 &challenged_peer,
283 challenge_id,
284 &peer_keys,
285 AuditFailureReason::MalformedResponse,
286 p2p_node,
287 config,
288 )
289 .await
290 }
291 }
292}
293
294fn eligible_audit_peers(sync_history: &HashMap<PeerId, PeerSyncRecord>) -> Vec<PeerId> {
295 sync_history
296 .iter()
297 .filter(|(_, record)| record.has_repair_opportunity())
298 .map(|(peer, _)| *peer)
299 .collect()
300}
301
302#[allow(clippy::too_many_arguments)]
308async fn verify_digests(
309 challenged_peer: &PeerId,
310 challenge_id: u64,
311 nonce: &[u8; 32],
312 keys: &[XorName],
313 digests: &[[u8; 32]],
314 storage: &Arc<LmdbStorage>,
315 p2p_node: &Arc<P2PNode>,
316 config: &ReplicationConfig,
317) -> AuditTickResult {
318 if digests.len() != keys.len() {
320 warn!(
321 "Audit: malformed response from {challenged_peer}: {} digests for {} keys",
322 digests.len(),
323 keys.len()
324 );
325 return handle_audit_failure(
326 challenged_peer,
327 challenge_id,
328 keys,
329 AuditFailureReason::MalformedResponse,
330 p2p_node,
331 config,
332 )
333 .await;
334 }
335
336 let challenged_peer_bytes = challenged_peer.as_bytes();
337 let mut failed_keys = Vec::new();
338
339 for (i, key) in keys.iter().enumerate() {
340 let received_digest = &digests[i];
341
342 if *received_digest == ABSENT_KEY_DIGEST {
344 failed_keys.push(*key);
345 continue;
346 }
347
348 let local_bytes = match storage.get_raw(key).await {
350 Ok(Some(bytes)) => bytes,
351 Ok(None) => {
352 warn!(
354 "Audit: local key {} disappeared during audit",
355 hex::encode(key)
356 );
357 continue;
358 }
359 Err(e) => {
360 warn!("Audit: failed to read local key {}: {e}", hex::encode(key));
361 continue;
362 }
363 };
364
365 let expected = compute_audit_digest(nonce, challenged_peer_bytes, key, &local_bytes);
366 if *received_digest != expected {
367 failed_keys.push(*key);
368 }
369 }
370
371 if failed_keys.is_empty() {
372 info!(
373 "Audit: peer {challenged_peer} passed (all {} keys verified)",
374 keys.len()
375 );
376 return AuditTickResult::Passed {
377 challenged_peer: *challenged_peer,
378 keys_checked: keys.len(),
379 };
380 }
381
382 handle_audit_failure(
384 challenged_peer,
385 challenge_id,
386 &failed_keys,
387 AuditFailureReason::DigestMismatch,
388 p2p_node,
389 config,
390 )
391 .await
392}
393
394async fn handle_audit_failure(
400 challenged_peer: &PeerId,
401 challenge_id: u64,
402 failed_keys: &[XorName],
403 reason: AuditFailureReason,
404 p2p_node: &Arc<P2PNode>,
405 config: &ReplicationConfig,
406) -> AuditTickResult {
407 let dht = p2p_node.dht_manager();
408 let mut confirmed_failures = Vec::new();
409
410 for key in failed_keys {
412 let closest = dht
413 .find_closest_nodes_local_with_self(key, config.close_group_size)
414 .await;
415 if closest.iter().any(|n| n.peer_id == *challenged_peer) {
416 confirmed_failures.push(*key);
417 } else {
418 debug!(
419 "Audit: peer {challenged_peer} not responsible for {} (removed from failure set)",
420 hex::encode(key)
421 );
422 }
423 }
424
425 if confirmed_failures.is_empty() {
430 info!("Audit: all failures for {challenged_peer} cleared by responsibility confirmation");
431 return AuditTickResult::Idle;
432 }
433
434 let evidence = FailureEvidence::AuditFailure {
436 challenge_id,
437 challenged_peer: *challenged_peer,
438 confirmed_failed_keys: confirmed_failures,
439 reason,
440 };
441
442 AuditTickResult::Failed { evidence }
443}
444
445async fn handle_audit_timeout(
447 challenged_peer: &PeerId,
448 challenge_id: u64,
449 keys: &[XorName],
450 p2p_node: &Arc<P2PNode>,
451 config: &ReplicationConfig,
452) -> AuditTickResult {
453 handle_audit_failure(
454 challenged_peer,
455 challenge_id,
456 keys,
457 AuditFailureReason::Timeout,
458 p2p_node,
459 config,
460 )
461 .await
462}
463
464pub async fn handle_audit_challenge(
475 challenge: &AuditChallenge,
476 storage: &LmdbStorage,
477 self_peer_id: &PeerId,
478 is_bootstrapping: bool,
479 stored_chunks: usize,
480) -> AuditResponse {
481 if is_bootstrapping {
482 return AuditResponse::Bootstrapping {
483 challenge_id: challenge.challenge_id,
484 };
485 }
486
487 if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
488 warn!(
489 "Audit challenge targeted wrong peer: expected {}, got {}",
490 hex::encode(self_peer_id.as_bytes()),
491 hex::encode(challenge.challenged_peer_id),
492 );
493 return AuditResponse::Rejected {
494 challenge_id: challenge.challenge_id,
495 reason: "challenged_peer_id does not match this node".to_string(),
496 };
497 }
498
499 let max_keys = ReplicationConfig::max_incoming_audit_keys(stored_chunks);
500 if challenge.keys.len() > max_keys {
501 warn!(
502 "Audit challenge rejected: {} keys exceeds dynamic limit of {max_keys} \
503 (stored_chunks={stored_chunks})",
504 challenge.keys.len(),
505 );
506 return AuditResponse::Rejected {
507 challenge_id: challenge.challenge_id,
508 reason: format!(
509 "challenge contains {} keys, limit is {max_keys}",
510 challenge.keys.len()
511 ),
512 };
513 }
514
515 let mut digests = Vec::with_capacity(challenge.keys.len());
516
517 for key in &challenge.keys {
518 match storage.get_raw(key).await {
519 Ok(Some(data)) => {
520 let digest = compute_audit_digest(
521 &challenge.nonce,
522 &challenge.challenged_peer_id,
523 key,
524 &data,
525 );
526 digests.push(digest);
527 }
528 Ok(None) => {
529 digests.push(ABSENT_KEY_DIGEST);
530 }
531 Err(e) => {
532 warn!(
533 "Audit responder: failed to read key {}: {e}",
534 hex::encode(key)
535 );
536 digests.push(ABSENT_KEY_DIGEST);
537 }
538 }
539 }
540
541 AuditResponse::Digests {
542 challenge_id: challenge.challenge_id,
543 digests,
544 }
545}
546
547#[cfg(test)]
552#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
553mod tests {
554 use super::*;
555 use crate::replication::protocol::compute_audit_digest;
556 use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
557 use crate::storage::LmdbStorageConfig;
558 use std::time::Instant;
559 use tempfile::TempDir;
560
    /// `stored_chunks` value passed to `handle_audit_challenge` in these tests.
    /// NOTE(review): presumably large enough that the incoming key limit never
    /// rejects the small test challenges — confirm against
    /// `ReplicationConfig::max_incoming_audit_keys`.
    const TEST_STORED_CHUNKS: usize = 1_000_000;
564
565 async fn create_test_storage() -> (LmdbStorage, TempDir) {
567 let temp_dir = TempDir::new().expect("create temp dir");
568 let config = LmdbStorageConfig {
569 root_dir: temp_dir.path().to_path_buf(),
570 verify_on_read: false,
571 max_map_size: 0,
572 disk_reserve: 0,
573 };
574 let storage = LmdbStorage::new(config).await.expect("create storage");
575 (storage, temp_dir)
576 }
577
578 fn make_challenge(
580 challenge_id: u64,
581 nonce: [u8; 32],
582 peer_id: [u8; 32],
583 keys: Vec<XorName>,
584 ) -> AuditChallenge {
585 AuditChallenge {
586 challenge_id,
587 nonce,
588 challenged_peer_id: peer_id,
589 keys,
590 }
591 }
592
    /// Test helper: wraps 32 raw bytes in a `PeerId` via `PeerId::from_bytes`.
    fn peer_id_from_bytes(bytes: [u8; 32]) -> PeerId {
        PeerId::from_bytes(bytes)
    }
597
598 #[tokio::test]
601 async fn handle_challenge_present_keys_returns_correct_digests() {
602 let (storage, _temp) = create_test_storage().await;
603
604 let content_a = b"chunk alpha";
606 let addr_a = LmdbStorage::compute_address(content_a);
607 storage.put(&addr_a, content_a).await.expect("put a");
608
609 let content_b = b"chunk beta";
610 let addr_b = LmdbStorage::compute_address(content_b);
611 storage.put(&addr_b, content_b).await.expect("put b");
612
613 let nonce = [0xAA; 32];
614 let peer_id = [0xBB; 32];
615 let challenge = make_challenge(42, nonce, peer_id, vec![addr_a, addr_b]);
616 let self_id = peer_id_from_bytes(peer_id);
617
618 let response =
619 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
620
621 match response {
622 AuditResponse::Digests {
623 challenge_id,
624 digests,
625 } => {
626 assert_eq!(challenge_id, 42);
627 assert_eq!(digests.len(), 2);
628
629 let expected_a = compute_audit_digest(&nonce, &peer_id, &addr_a, content_a);
630 let expected_b = compute_audit_digest(&nonce, &peer_id, &addr_b, content_b);
631 assert_eq!(digests[0], expected_a);
632 assert_eq!(digests[1], expected_b);
633 }
634 AuditResponse::Bootstrapping { .. } => {
635 panic!("expected Digests, got Bootstrapping");
636 }
637 AuditResponse::Rejected { .. } => {
638 panic!("Unexpected Rejected response");
639 }
640 }
641 }
642
643 #[tokio::test]
646 async fn handle_challenge_absent_keys_returns_sentinel() {
647 let (storage, _temp) = create_test_storage().await;
648
649 let absent_key = [0xFF; 32];
650 let nonce = [0x11; 32];
651 let peer_id = [0x22; 32];
652 let challenge = make_challenge(99, nonce, peer_id, vec![absent_key]);
653 let self_id = peer_id_from_bytes(peer_id);
654
655 let response =
656 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
657
658 match response {
659 AuditResponse::Digests {
660 challenge_id,
661 digests,
662 } => {
663 assert_eq!(challenge_id, 99);
664 assert_eq!(digests.len(), 1);
665 assert_eq!(
666 digests[0], ABSENT_KEY_DIGEST,
667 "absent key should produce sentinel digest"
668 );
669 }
670 AuditResponse::Bootstrapping { .. } => {
671 panic!("expected Digests, got Bootstrapping");
672 }
673 AuditResponse::Rejected { .. } => {
674 panic!("Unexpected Rejected response");
675 }
676 }
677 }
678
679 #[tokio::test]
682 async fn handle_challenge_mixed_present_and_absent() {
683 let (storage, _temp) = create_test_storage().await;
684
685 let content = b"present chunk";
686 let addr_present = LmdbStorage::compute_address(content);
687 storage.put(&addr_present, content).await.expect("put");
688
689 let addr_absent = [0xDE; 32];
690 let nonce = [0x33; 32];
691 let peer_id = [0x44; 32];
692 let challenge = make_challenge(7, nonce, peer_id, vec![addr_present, addr_absent]);
693 let self_id = peer_id_from_bytes(peer_id);
694
695 let response =
696 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
697
698 match response {
699 AuditResponse::Digests { digests, .. } => {
700 assert_eq!(digests.len(), 2);
701
702 let expected_present =
703 compute_audit_digest(&nonce, &peer_id, &addr_present, content);
704 assert_eq!(digests[0], expected_present);
705 assert_eq!(
706 digests[1], ABSENT_KEY_DIGEST,
707 "absent key should be sentinel"
708 );
709 }
710 AuditResponse::Bootstrapping { .. } => {
711 panic!("expected Digests, got Bootstrapping");
712 }
713 AuditResponse::Rejected { .. } => {
714 panic!("Unexpected Rejected response");
715 }
716 }
717 }
718
719 #[tokio::test]
722 async fn handle_challenge_bootstrapping_returns_bootstrapping_response() {
723 let (storage, _temp) = create_test_storage().await;
724
725 let challenge = make_challenge(55, [0x00; 32], [0x01; 32], vec![[0x02; 32]]);
726 let self_id = peer_id_from_bytes([0x01; 32]);
727
728 let response =
729 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
730
731 match response {
732 AuditResponse::Bootstrapping { challenge_id } => {
733 assert_eq!(challenge_id, 55);
734 }
735 AuditResponse::Digests { .. } => {
736 panic!("expected Bootstrapping, got Digests");
737 }
738 AuditResponse::Rejected { .. } => {
739 panic!("Unexpected Rejected response");
740 }
741 }
742 }
743
744 #[tokio::test]
747 async fn handle_challenge_empty_keys_returns_empty_digests() {
748 let (storage, _temp) = create_test_storage().await;
749
750 let challenge = make_challenge(100, [0x10; 32], [0x20; 32], vec![]);
751 let self_id = peer_id_from_bytes([0x20; 32]);
752
753 let response =
754 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
755
756 match response {
757 AuditResponse::Digests {
758 challenge_id,
759 digests,
760 } => {
761 assert_eq!(challenge_id, 100);
762 assert!(
763 digests.is_empty(),
764 "empty key list should yield empty digests"
765 );
766 }
767 AuditResponse::Bootstrapping { .. } => {
768 panic!("expected Digests, got Bootstrapping");
769 }
770 AuditResponse::Rejected { .. } => {
771 panic!("Unexpected Rejected response");
772 }
773 }
774 }
775
776 #[test]
779 fn digest_verification_matching() {
780 let nonce = [0x01; 32];
781 let peer_id = [0x02; 32];
782 let key: XorName = [0x03; 32];
783 let data = b"correct data";
784
785 let expected = compute_audit_digest(&nonce, &peer_id, &key, data);
786 let recomputed = compute_audit_digest(&nonce, &peer_id, &key, data);
787
788 assert_eq!(
789 expected, recomputed,
790 "same inputs must produce identical digests"
791 );
792 assert_ne!(
793 expected, ABSENT_KEY_DIGEST,
794 "real digest must not be sentinel"
795 );
796 }
797
798 #[test]
801 fn digest_verification_mismatching_data() {
802 let nonce = [0x01; 32];
803 let peer_id = [0x02; 32];
804 let key: XorName = [0x03; 32];
805
806 let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data version A");
807 let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data version B");
808
809 assert_ne!(
810 digest_a, digest_b,
811 "different data must produce different digests"
812 );
813 }
814
815 #[test]
816 fn digest_verification_mismatching_nonce() {
817 let peer_id = [0x02; 32];
818 let key: XorName = [0x03; 32];
819 let data = b"same data";
820
821 let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, data);
822 let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, data);
823
824 assert_ne!(
825 digest_a, digest_b,
826 "different nonces must produce different digests"
827 );
828 }
829
830 #[test]
831 fn digest_verification_mismatching_peer() {
832 let nonce = [0x01; 32];
833 let key: XorName = [0x03; 32];
834 let data = b"same data";
835
836 let digest_a = compute_audit_digest(&nonce, &[0x02; 32], &key, data);
837 let digest_b = compute_audit_digest(&nonce, &[0xFE; 32], &key, data);
838
839 assert_ne!(
840 digest_a, digest_b,
841 "different peers must produce different digests"
842 );
843 }
844
845 #[test]
846 fn digest_verification_mismatching_key() {
847 let nonce = [0x01; 32];
848 let peer_id = [0x02; 32];
849 let data = b"same data";
850
851 let digest_a = compute_audit_digest(&nonce, &peer_id, &[0x03; 32], data);
852 let digest_b = compute_audit_digest(&nonce, &peer_id, &[0xFC; 32], data);
853
854 assert_ne!(
855 digest_a, digest_b,
856 "different keys must produce different digests"
857 );
858 }
859
860 #[test]
863 fn absent_sentinel_is_all_zeros() {
864 assert_eq!(ABSENT_KEY_DIGEST, [0u8; 32], "sentinel must be all zeros");
865 }
866
867 #[tokio::test]
870 async fn bootstrapping_skips_digest_computation() {
871 let (storage, _temp) = create_test_storage().await;
872
873 let content = b"stored but bootstrapping";
874 let addr = LmdbStorage::compute_address(content);
875 storage.put(&addr, content).await.expect("put");
876
877 let challenge = make_challenge(200, [0xCC; 32], [0xDD; 32], vec![addr]);
878 let self_id = peer_id_from_bytes([0xDD; 32]);
879
880 let response =
881 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
882
883 assert!(
884 matches!(response, AuditResponse::Bootstrapping { challenge_id: 200 }),
885 "bootstrapping node must not compute digests"
886 );
887 }
888
889 #[tokio::test]
892 async fn scenario_19_partial_failure_mixed_responsibility() {
893 let (storage, _temp) = create_test_storage().await;
900 let nonce = [0x42u8; 32];
901 let peer_id = [0xAA; 32];
902
903 let content_k1 = b"key one data";
905 let addr_k1 = LmdbStorage::compute_address(content_k1);
906 storage.put(&addr_k1, content_k1).await.unwrap();
907
908 let content_k2 = b"key two data";
909 let addr_k2 = LmdbStorage::compute_address(content_k2);
910 storage.put(&addr_k2, content_k2).await.unwrap();
911
912 let addr_k3 = [0xFF; 32]; let challenge = AuditChallenge {
915 challenge_id: 100,
916 nonce,
917 challenged_peer_id: peer_id,
918 keys: vec![addr_k1, addr_k2, addr_k3],
919 };
920 let self_id = peer_id_from_bytes(peer_id);
921
922 let response =
923 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
924
925 match response {
926 AuditResponse::Digests { digests, .. } => {
927 assert_eq!(digests.len(), 3);
928
929 let expected_k1 = compute_audit_digest(&nonce, &peer_id, &addr_k1, content_k1);
931 assert_eq!(digests[0], expected_k1);
932
933 let expected_k2 = compute_audit_digest(&nonce, &peer_id, &addr_k2, content_k2);
935 assert_eq!(digests[1], expected_k2);
936
937 assert_eq!(digests[2], ABSENT_KEY_DIGEST);
939 }
940 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests response"),
941 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
942 }
943 }
944
945 #[tokio::test]
948 async fn scenario_54_all_digests_pass() {
949 let (storage, _temp) = create_test_storage().await;
952 let nonce = [0x10; 32];
953 let peer_id = [0x20; 32];
954
955 let c1 = b"chunk alpha";
956 let c2 = b"chunk beta";
957 let c3 = b"chunk gamma";
958 let a1 = LmdbStorage::compute_address(c1);
959 let a2 = LmdbStorage::compute_address(c2);
960 let a3 = LmdbStorage::compute_address(c3);
961 storage.put(&a1, c1).await.unwrap();
962 storage.put(&a2, c2).await.unwrap();
963 storage.put(&a3, c3).await.unwrap();
964
965 let challenge = AuditChallenge {
966 challenge_id: 200,
967 nonce,
968 challenged_peer_id: peer_id,
969 keys: vec![a1, a2, a3],
970 };
971 let self_id = peer_id_from_bytes(peer_id);
972
973 let response =
974 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
975 match response {
976 AuditResponse::Digests { digests, .. } => {
977 assert_eq!(digests.len(), 3);
978 for (i, (addr, content)) in [(a1, &c1[..]), (a2, &c2[..]), (a3, &c3[..])]
979 .iter()
980 .enumerate()
981 {
982 let expected = compute_audit_digest(&nonce, &peer_id, addr, content);
983 assert_eq!(digests[i], expected, "Key {i} digest should match");
984 }
985 }
986 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
987 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
988 }
989 }
990
991 #[tokio::test]
1004 async fn scenario_55_no_confirmed_responsibility_no_evidence() {
1005 let (storage, _temp) = create_test_storage().await;
1006 let nonce = [0x55; 32];
1007 let peer_id = [0x55; 32];
1008
1009 let c1 = b"scenario 55 key one";
1011 let c2 = b"scenario 55 key two";
1012 let k1 = LmdbStorage::compute_address(c1);
1013 let k2 = LmdbStorage::compute_address(c2);
1014 storage.put(&k1, c1).await.expect("put k1");
1015 storage.put(&k2, c2).await.expect("put k2");
1016
1017 let expected_d1 = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1019 let expected_d2 = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1020
1021 let wrong_d1 = compute_audit_digest(&nonce, &peer_id, &k1, b"corrupted k1");
1023 let wrong_d2 = compute_audit_digest(&nonce, &peer_id, &k2, b"corrupted k2");
1024 assert_ne!(wrong_d1, expected_d1, "K1 digest should mismatch");
1025 assert_ne!(wrong_d2, expected_d2, "K2 digest should mismatch");
1026
1027 let keys = [k1, k2];
1029 let expected = [expected_d1, expected_d2];
1030 let received = [wrong_d1, wrong_d2];
1031
1032 let mut failed_keys = Vec::new();
1033 for i in 0..keys.len() {
1034 if received[i] != expected[i] {
1035 failed_keys.push(keys[i]);
1036 }
1037 }
1038 assert_eq!(
1039 failed_keys.len(),
1040 2,
1041 "Both keys should be identified as digest mismatches"
1042 );
1043
1044 let confirmed_responsible_keys: Vec<XorName> = Vec::new();
1047 let confirmed_failures: Vec<XorName> = failed_keys
1048 .into_iter()
1049 .filter(|k| confirmed_responsible_keys.contains(k))
1050 .collect();
1051
1052 assert!(
1054 confirmed_failures.is_empty(),
1055 "With no confirmed responsibility, failure set must be empty — \
1056 no AuditFailure evidence should be emitted"
1057 );
1058
1059 let peer = PeerId::from_bytes(peer_id);
1062 let evidence = FailureEvidence::AuditFailure {
1063 challenge_id: 5500,
1064 challenged_peer: peer,
1065 confirmed_failed_keys: confirmed_failures,
1066 reason: AuditFailureReason::DigestMismatch,
1067 };
1068 if let FailureEvidence::AuditFailure {
1069 confirmed_failed_keys,
1070 ..
1071 } = evidence
1072 {
1073 assert!(
1074 confirmed_failed_keys.is_empty(),
1075 "Evidence with empty failure set should not trigger a trust penalty"
1076 );
1077 }
1078 }
1079
1080 #[test]
1083 fn scenario_56_repair_opportunity_filters_never_synced() {
1084 let never_synced = PeerSyncRecord {
1088 last_sync: None,
1089 cycles_since_sync: 5,
1090 };
1091 assert!(!never_synced.has_repair_opportunity());
1092
1093 let synced_no_cycle = PeerSyncRecord {
1094 last_sync: Some(Instant::now()),
1095 cycles_since_sync: 0,
1096 };
1097 assert!(!synced_no_cycle.has_repair_opportunity());
1098
1099 let synced_with_cycle = PeerSyncRecord {
1100 last_sync: Some(Instant::now()),
1101 cycles_since_sync: 1,
1102 };
1103 assert!(synced_with_cycle.has_repair_opportunity());
1104 }
1105
1106 #[test]
1107 fn expired_bootstrap_claim_does_not_remove_peer_from_audit_eligibility() {
1108 let peer = peer_id_from_bytes([0x57; 32]);
1109 let mut sync_history = HashMap::new();
1110 sync_history.insert(
1111 peer,
1112 PeerSyncRecord {
1113 last_sync: Some(Instant::now()),
1114 cycles_since_sync: 1,
1115 },
1116 );
1117
1118 let mut bootstrap_claims = HashMap::new();
1119 let first_seen = Instant::now()
1120 .checked_sub(
1121 crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD
1122 + std::time::Duration::from_secs(1),
1123 )
1124 .unwrap_or_else(Instant::now);
1125 bootstrap_claims.insert(peer, first_seen);
1126
1127 let eligible = eligible_audit_peers(&sync_history);
1128
1129 assert!(bootstrap_claims.contains_key(&peer));
1130 assert!(
1131 eligible.contains(&peer),
1132 "continued bootstrap claims must remain auditable so past-grace abuse can be observed"
1133 );
1134 }
1135
1136 #[tokio::test]
1139 async fn audit_response_must_match_key_count() {
1140 let (storage, _temp) = create_test_storage().await;
1146 let nonce = [0x50; 32];
1147 let peer_id = [0x60; 32];
1148
1149 let content = b"single chunk";
1151 let addr = LmdbStorage::compute_address(content);
1152 storage.put(&addr, content).await.unwrap();
1153
1154 let absent_keys: Vec<XorName> = (1..=4u8).map(|i| [i; 32]).collect();
1156 let mut keys = vec![addr];
1157 keys.extend_from_slice(&absent_keys);
1158
1159 let key_count = keys.len();
1160 let challenge = make_challenge(300, nonce, peer_id, keys);
1161 let self_id = peer_id_from_bytes(peer_id);
1162
1163 let response =
1164 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1165 match response {
1166 AuditResponse::Digests { digests, .. } => {
1167 assert_eq!(
1168 digests.len(),
1169 key_count,
1170 "must produce exactly one digest per challenged key"
1171 );
1172 }
1173 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1174 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1175 }
1176 }
1177
1178 #[test]
1181 fn audit_digest_uses_full_record_bytes() {
1182 let nonce = [1u8; 32];
1184 let peer = [2u8; 32];
1185 let key = [3u8; 32];
1186
1187 let d1 = compute_audit_digest(&nonce, &peer, &key, b"data version 1");
1188 let d2 = compute_audit_digest(&nonce, &peer, &key, b"data version 2");
1189 assert_ne!(
1190 d1, d2,
1191 "Different record bytes must produce different digests"
1192 );
1193 }
1194
1195 #[tokio::test]
1206 async fn scenario_29_audit_start_gate_during_bootstrap() {
1207 let (storage, _temp) = create_test_storage().await;
1208
1209 let content = b"should not be audited during bootstrap";
1211 let addr = LmdbStorage::compute_address(content);
1212 storage.put(&addr, content).await.expect("put");
1213
1214 let challenge = make_challenge(2900, [0x29; 32], [0x29; 32], vec![addr]);
1215 let self_id = peer_id_from_bytes([0x29; 32]);
1216
1217 let response =
1219 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1220 assert!(
1221 matches!(
1222 response,
1223 AuditResponse::Bootstrapping { challenge_id: 2900 }
1224 ),
1225 "bootstrapping node must not compute digests — audit start gate"
1226 );
1227
1228 let response =
1230 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1231 assert!(
1232 matches!(response, AuditResponse::Digests { .. }),
1233 "drained node should compute digests normally"
1234 );
1235 }
1236
1237 #[test]
1247 fn scenario_30_audit_peer_selection_from_sampled_keys() {
1248 assert_eq!(
1250 ReplicationConfig::audit_sample_count(100),
1251 10,
1252 "sample count should scale with sqrt(total_keys)"
1253 );
1254
1255 assert_eq!(ReplicationConfig::audit_sample_count(3), 1, "sqrt(3) = 1");
1256
1257 assert_eq!(
1258 ReplicationConfig::audit_sample_count(10_000),
1259 100,
1260 "sqrt(10000) = 100"
1261 );
1262
1263 let never = PeerSyncRecord {
1266 last_sync: None,
1267 cycles_since_sync: 10,
1268 };
1269 assert!(!never.has_repair_opportunity());
1270
1271 let too_soon = PeerSyncRecord {
1273 last_sync: Some(Instant::now()),
1274 cycles_since_sync: 0,
1275 };
1276 assert!(!too_soon.has_repair_opportunity());
1277
1278 let eligible = PeerSyncRecord {
1280 last_sync: Some(Instant::now()),
1281 cycles_since_sync: 2,
1282 };
1283 assert!(eligible.has_repair_opportunity());
1284 }
1285
1286 #[tokio::test]
1295 async fn scenario_32_dynamic_challenge_size() {
1296 let (storage, _temp) = create_test_storage().await;
1297
1298 let mut addrs = Vec::new();
1300 for i in 0u8..5 {
1301 let content = format!("dynamic challenge key {i}");
1302 let addr = LmdbStorage::compute_address(content.as_bytes());
1303 storage.put(&addr, content.as_bytes()).await.expect("put");
1304 addrs.push(addr);
1305 }
1306
1307 let nonce = [0x32; 32];
1308 let peer_id = [0x32; 32];
1309 let self_id = peer_id_from_bytes(peer_id);
1310
1311 let challenge1 = make_challenge(3201, nonce, peer_id, vec![addrs[0]]);
1313 let resp1 =
1314 handle_audit_challenge(&challenge1, &storage, &self_id, false, TEST_STORED_CHUNKS)
1315 .await;
1316 if let AuditResponse::Digests { digests, .. } = resp1 {
1317 assert_eq!(digests.len(), 1, "|PeerKeySet| = 1 → 1 digest");
1318 }
1319
1320 let challenge3 = make_challenge(3203, nonce, peer_id, addrs[0..3].to_vec());
1322 let resp3 =
1323 handle_audit_challenge(&challenge3, &storage, &self_id, false, TEST_STORED_CHUNKS)
1324 .await;
1325 if let AuditResponse::Digests { digests, .. } = resp3 {
1326 assert_eq!(digests.len(), 3, "|PeerKeySet| = 3 → 3 digests");
1327 }
1328
1329 let challenge5 = make_challenge(3205, nonce, peer_id, addrs.clone());
1331 let resp5 =
1332 handle_audit_challenge(&challenge5, &storage, &self_id, false, TEST_STORED_CHUNKS)
1333 .await;
1334 if let AuditResponse::Digests { digests, .. } = resp5 {
1335 assert_eq!(digests.len(), 5, "|PeerKeySet| = 5 → 5 digests");
1336 }
1337
1338 let challenge0 = make_challenge(3200, nonce, peer_id, vec![]);
1340 let resp0 =
1341 handle_audit_challenge(&challenge0, &storage, &self_id, false, TEST_STORED_CHUNKS)
1342 .await;
1343 if let AuditResponse::Digests { digests, .. } = resp0 {
1344 assert!(digests.is_empty(), "|PeerKeySet| = 0 → 0 digests (idle)");
1345 }
1346 }
1347
1348 #[tokio::test]
1354 async fn scenario_47_bootstrap_claim_grace_period_audit() {
1355 let (storage, _temp) = create_test_storage().await;
1356
1357 let content = b"bootstrap grace test";
1359 let addr = LmdbStorage::compute_address(content);
1360 storage.put(&addr, content).await.expect("put");
1361
1362 let challenge = make_challenge(4700, [0x47; 32], [0x47; 32], vec![addr]);
1363 let self_id = peer_id_from_bytes([0x47; 32]);
1364
1365 let response =
1367 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1368 let challenge_id = match response {
1369 AuditResponse::Bootstrapping { challenge_id } => challenge_id,
1370 AuditResponse::Digests { .. } => {
1371 panic!("Expected Bootstrapping response during grace period")
1372 }
1373 AuditResponse::Rejected { .. } => {
1374 panic!("Unexpected Rejected response")
1375 }
1376 };
1377 assert_eq!(challenge_id, 4700);
1378
1379 let peer = PeerId::from_bytes([0x47; 32]);
1381 let mut state = NeighborSyncState::new_cycle(vec![peer]);
1382 let now = Instant::now();
1383 let observed = state.observe_bootstrap_claim(
1384 peer,
1385 now,
1386 crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD,
1387 );
1388
1389 assert_eq!(
1390 observed,
1391 BootstrapClaimObservation::WithinGrace { first_seen: now }
1392 );
1393 assert!(
1394 state.bootstrap_claims.contains_key(&peer),
1395 "BootstrapClaimFirstSeen should be recorded after grace-period claim"
1396 );
1397 assert!(
1398 state.bootstrap_claim_history.contains_key(&peer),
1399 "Bootstrap claim history should remember that the grace window was used"
1400 );
1401 }
1402
1403 #[tokio::test]
1414 async fn scenario_53_partial_failure_mixed_responsibility() {
1415 let (storage, _temp) = create_test_storage().await;
1416 let nonce = [0x53; 32];
1417 let peer_id = [0x53; 32];
1418
1419 let c1 = b"scenario 53 key one";
1421 let c2 = b"scenario 53 key two";
1422 let c3 = b"scenario 53 key three";
1423 let k1 = LmdbStorage::compute_address(c1);
1424 let k2 = LmdbStorage::compute_address(c2);
1425 let k3 = LmdbStorage::compute_address(c3);
1426 storage.put(&k1, c1).await.expect("put k1");
1427 storage.put(&k2, c2).await.expect("put k2");
1428 storage.put(&k3, c3).await.expect("put k3");
1429
1430 let d1_expected = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1432 let d2_expected = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1433 let d3_expected = compute_audit_digest(&nonce, &peer_id, &k3, c3);
1434
1435 let d2_wrong = compute_audit_digest(&nonce, &peer_id, &k2, b"tampered k2");
1437 let d3_wrong = compute_audit_digest(&nonce, &peer_id, &k3, b"tampered k3");
1438
1439 assert_eq!(d1_expected, d1_expected, "K1 should match");
1440 assert_ne!(d2_wrong, d2_expected, "K2 should mismatch");
1441 assert_ne!(d3_wrong, d3_expected, "K3 should mismatch");
1442
1443 let digests = [d1_expected, d2_wrong, d3_wrong];
1445 let keys = [k1, k2, k3];
1446 let contents: [&[u8]; 3] = [c1, c2, c3];
1447
1448 let mut failed_keys = Vec::new();
1449 for (i, key) in keys.iter().enumerate() {
1450 if digests[i] == ABSENT_KEY_DIGEST {
1451 failed_keys.push(*key);
1452 continue;
1453 }
1454 let expected = compute_audit_digest(&nonce, &peer_id, key, contents[i]);
1455 if digests[i] != expected {
1456 failed_keys.push(*key);
1457 }
1458 }
1459
1460 assert_eq!(failed_keys.len(), 2, "K2 and K3 should be in failure set");
1461 assert!(failed_keys.contains(&k2));
1462 assert!(failed_keys.contains(&k3));
1463 assert!(!failed_keys.contains(&k1), "K1 passed digest check");
1464
1465 let responsible_for_k2 = true;
1468 let responsible_for_k3 = false;
1469 let mut confirmed = Vec::new();
1470 for key in &failed_keys {
1471 let is_responsible = if *key == k2 {
1472 responsible_for_k2
1473 } else {
1474 responsible_for_k3
1475 };
1476 if is_responsible {
1477 confirmed.push(*key);
1478 }
1479 }
1480
1481 assert_eq!(confirmed, vec![k2], "Only K2 should be in confirmed set");
1482
1483 let challenged_peer = PeerId::from_bytes(peer_id);
1485 let evidence = FailureEvidence::AuditFailure {
1486 challenge_id: 5300,
1487 challenged_peer,
1488 confirmed_failed_keys: confirmed,
1489 reason: AuditFailureReason::DigestMismatch,
1490 };
1491
1492 match evidence {
1493 FailureEvidence::AuditFailure {
1494 confirmed_failed_keys,
1495 ..
1496 } => {
1497 assert_eq!(
1498 confirmed_failed_keys.len(),
1499 1,
1500 "Only K2 should generate evidence"
1501 );
1502 assert_eq!(confirmed_failed_keys[0], k2);
1503 }
1504 _ => panic!("Expected AuditFailure evidence"),
1505 }
1506 }
1507}