1use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::logging::{debug, enabled, info, warn};
10use rand::seq::SliceRandom;
11use rand::Rng;
12
13use crate::ant_protocol::XorName;
14use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
15use crate::replication::protocol::{
16 compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
17 ReplicationMessageBody, ABSENT_KEY_DIGEST,
18};
19use crate::replication::types::{
20 AuditFailureReason, AuditFailureSummary, FailureEvidence, PeerSyncRecord, RepairProofs,
21};
22use crate::storage::LmdbStorage;
23use saorsa_core::identity::PeerId;
24use saorsa_core::P2PNode;
25use tokio::sync::RwLock;
26
27#[derive(Debug)]
33pub enum AuditTickResult {
34 Passed {
36 challenged_peer: PeerId,
38 keys_checked: usize,
40 },
41 Failed {
43 evidence: FailureEvidence,
45 },
46 BootstrapClaim {
48 peer: PeerId,
50 },
51 Idle,
53 InsufficientKeys,
55}
56
57fn first_challenged_key_label(keys: &[XorName]) -> String {
59 keys.first().map_or_else(
60 || "0x".to_string(),
61 |k| format!("0x{}", hex::encode(&k[..8])),
62 )
63}
64
65fn classify_audit_send_error(error: &str) -> &'static str {
72 let lower = error.to_ascii_lowercase();
73 if lower.contains("timed out") || lower.contains("timeout") {
74 "timeout"
75 } else if lower.contains("peer not found") || lower.contains("no channel") {
76 "peer_unavailable"
77 } else if lower.contains("connection") || lower.contains("connect") || lower.contains("dial") {
78 "connection_failed"
79 } else if lower.contains("closed") || lower.contains("dropped") {
80 "connection_closed"
81 } else if lower.contains("transport") {
82 "transport_error"
83 } else {
84 "other"
85 }
86}
87
88#[allow(clippy::implicit_hasher)]
100pub async fn audit_tick(
101 p2p_node: &Arc<P2PNode>,
102 storage: &Arc<LmdbStorage>,
103 config: &ReplicationConfig,
104 sync_history: &HashMap<PeerId, PeerSyncRecord>,
105 is_bootstrapping: bool,
106) -> AuditTickResult {
107 let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
108 audit_tick_with_repair_proofs(
109 p2p_node,
110 storage,
111 config,
112 sync_history,
113 &repair_proofs,
114 0,
115 is_bootstrapping,
116 )
117 .await
118}
119
120#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
127pub async fn audit_tick_with_repair_proofs(
128 p2p_node: &Arc<P2PNode>,
129 storage: &Arc<LmdbStorage>,
130 config: &ReplicationConfig,
131 sync_history: &HashMap<PeerId, PeerSyncRecord>,
132 repair_proofs: &Arc<RwLock<RepairProofs>>,
133 current_sync_epoch: u64,
134 is_bootstrapping: bool,
135) -> AuditTickResult {
136 if is_bootstrapping {
138 return AuditTickResult::Idle;
139 }
140
141 let dht = p2p_node.dht_manager();
142
143 let eligible_peers = eligible_audit_peers(sync_history);
147
148 if eligible_peers.is_empty() {
149 return AuditTickResult::Idle;
150 }
151
152 let (challenged_peer, nonce, challenge_id) = {
153 let mut rng = rand::thread_rng();
154 let selected = match eligible_peers.choose(&mut rng) {
155 Some(p) => *p,
156 None => return AuditTickResult::Idle,
157 };
158 let n: [u8; 32] = rng.gen();
159 let c: u64 = rng.gen();
160 (selected, n, c)
161 };
162
163 let all_keys = match storage.all_keys().await {
166 Ok(keys) => keys,
167 Err(e) => {
168 warn!("Audit: failed to read local keys: {e}");
169 return AuditTickResult::Idle;
170 }
171 };
172
173 if all_keys.is_empty() {
174 return AuditTickResult::Idle;
175 }
176
177 let sample_count = ReplicationConfig::audit_sample_count(all_keys.len());
178 let sampled_keys: Vec<XorName> = {
179 let mut rng = rand::thread_rng();
180 all_keys
181 .choose_multiple(&mut rng, sample_count)
182 .copied()
183 .collect()
184 };
185
186 let mut sampled_key_groups = Vec::new();
190 for key in &sampled_keys {
191 let closest = dht
192 .find_closest_nodes_local_with_self(key, config.close_group_size)
193 .await;
194 let close_peers: HashSet<PeerId> = closest.iter().map(|node| node.peer_id).collect();
195 if close_peers.contains(&challenged_peer) {
196 sampled_key_groups.push((*key, close_peers));
197 }
198 }
199
200 let peer_keys = {
201 let mut proofs = repair_proofs.write().await;
202 let now = Instant::now();
203 mature_audit_keys_for_peer(
204 &challenged_peer,
205 sampled_key_groups,
206 &mut proofs,
207 current_sync_epoch,
208 now,
209 )
210 };
211
212 if peer_keys.is_empty() {
213 return AuditTickResult::Idle;
214 }
215
216 let challenge = AuditChallenge {
222 challenge_id,
223 nonce,
224 challenged_peer_id: *challenged_peer.as_bytes(),
225 keys: peer_keys.clone(),
226 };
227
228 let msg = ReplicationMessage {
229 request_id: challenge_id,
230 body: ReplicationMessageBody::AuditChallenge(challenge),
231 };
232
233 let encoded = match msg.encode() {
234 Ok(data) => data,
235 Err(e) => {
236 warn!("Audit: failed to encode challenge: {e}");
237 return AuditTickResult::Idle;
238 }
239 };
240
241 let encoded_len = encoded.len();
242 let audit_timeout = config.audit_response_timeout(peer_keys.len());
243 let audit_started = Instant::now();
244 let response = match p2p_node
245 .send_request(
246 &challenged_peer,
247 REPLICATION_PROTOCOL_ID,
248 encoded,
249 audit_timeout,
250 )
251 .await
252 {
253 Ok(resp) => resp,
254 Err(e) => {
255 if enabled!(crate::logging::Level::WARN) {
256 let elapsed = audit_started.elapsed();
257 let send_error = e.to_string();
258 let send_error_class = classify_audit_send_error(&send_error);
259 let first_key = first_challenged_key_label(&peer_keys);
260 warn!(
261 audit_type = "responsible_chunk",
262 audit_phase = "challenge_send",
263 audit_outcome = "send_request_failed",
264 challenged_peer = %challenged_peer,
265 challenge_id,
266 key_count = peer_keys.len(),
267 timeout_ms = audit_timeout.as_millis(),
268 elapsed_ms = elapsed.as_millis(),
269 first_key = %first_key,
270 encoded_len,
271 send_error_class,
272 "Audit challenge send_request failed: audit_type=responsible_chunk, audit_phase=challenge_send, audit_outcome=send_request_failed, challenged_peer={challenged_peer}, challenge_id={challenge_id}, key_count={}, timeout_ms={}, elapsed_ms={}, first_key={first_key}, encoded_len={encoded_len}, send_error_class={send_error_class}",
273 peer_keys.len(),
274 audit_timeout.as_millis(),
275 elapsed.as_millis(),
276 );
277 }
278 debug!(
279 challenged_peer = %challenged_peer,
280 challenge_id,
281 send_error = %e,
282 "Audit challenge raw send_request error"
283 );
284 return handle_audit_timeout(
286 &challenged_peer,
287 challenge_id,
288 &peer_keys,
289 p2p_node,
290 config,
291 )
292 .await;
293 }
294 };
295
296 let resp_msg = match ReplicationMessage::decode(&response.data) {
298 Ok(m) => m,
299 Err(e) => {
300 warn!("Audit: failed to decode response from {challenged_peer}: {e}");
301 return handle_audit_failure(
302 &challenged_peer,
303 challenge_id,
304 &peer_keys,
305 AuditFailureReason::MalformedResponse,
306 p2p_node,
307 config,
308 )
309 .await;
310 }
311 };
312
313 match resp_msg.body {
314 ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
315 challenge_id: resp_id,
316 }) => {
317 if resp_id != challenge_id {
318 warn!("Audit: challenge ID mismatch on Bootstrapping from {challenged_peer}");
319 return handle_audit_failure(
320 &challenged_peer,
321 challenge_id,
322 &peer_keys,
323 AuditFailureReason::MalformedResponse,
324 p2p_node,
325 config,
326 )
327 .await;
328 }
329 AuditTickResult::BootstrapClaim {
331 peer: challenged_peer,
332 }
333 }
334 ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
335 challenge_id: resp_id,
336 digests,
337 }) => {
338 if resp_id != challenge_id {
339 warn!("Audit: challenge ID mismatch from {challenged_peer}");
340 return handle_audit_failure(
341 &challenged_peer,
342 challenge_id,
343 &peer_keys,
344 AuditFailureReason::MalformedResponse,
345 p2p_node,
346 config,
347 )
348 .await;
349 }
350 verify_digests(
351 &challenged_peer,
352 challenge_id,
353 &nonce,
354 &peer_keys,
355 &digests,
356 storage,
357 p2p_node,
358 config,
359 )
360 .await
361 }
362 ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
363 challenge_id: resp_id,
364 reason,
365 }) => {
366 if resp_id != challenge_id {
367 warn!("Audit: challenge ID mismatch on Rejected from {challenged_peer}");
368 return handle_audit_failure(
369 &challenged_peer,
370 challenge_id,
371 &peer_keys,
372 AuditFailureReason::MalformedResponse,
373 p2p_node,
374 config,
375 )
376 .await;
377 }
378 warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
379 handle_audit_failure(
380 &challenged_peer,
381 challenge_id,
382 &peer_keys,
383 AuditFailureReason::Rejected,
384 p2p_node,
385 config,
386 )
387 .await
388 }
389 _ => {
390 warn!("Audit: unexpected response type from {challenged_peer}");
391 handle_audit_failure(
392 &challenged_peer,
393 challenge_id,
394 &peer_keys,
395 AuditFailureReason::MalformedResponse,
396 p2p_node,
397 config,
398 )
399 .await
400 }
401 }
402}
403
404fn eligible_audit_peers(sync_history: &HashMap<PeerId, PeerSyncRecord>) -> Vec<PeerId> {
405 sync_history
406 .iter()
407 .filter(|(_, record)| record.has_repair_opportunity())
408 .map(|(peer, _)| *peer)
409 .collect()
410}
411
412fn mature_audit_keys_for_peer(
413 challenged_peer: &PeerId,
414 sampled_key_groups: Vec<(XorName, HashSet<PeerId>)>,
415 repair_proofs: &mut RepairProofs,
416 current_sync_epoch: u64,
417 now: Instant,
418) -> Vec<XorName> {
419 sampled_key_groups
420 .into_iter()
421 .filter_map(|(key, close_peers)| {
422 repair_proofs
423 .has_mature_replica_hint(
424 challenged_peer,
425 &key,
426 &close_peers,
427 current_sync_epoch,
428 now,
429 )
430 .then_some(key)
431 })
432 .collect()
433}
434
435#[derive(Debug, Clone, Copy, PartialEq, Eq)]
436enum AuditKeyFailureKind {
437 Absent,
438 DigestMismatch,
439 Unclassified,
440}
441
442#[derive(Debug, Clone, Copy, PartialEq, Eq)]
443struct AuditKeyFailure {
444 key: XorName,
445 kind: AuditKeyFailureKind,
446}
447
448impl AuditKeyFailure {
449 fn absent(key: XorName) -> Self {
450 Self {
451 key,
452 kind: AuditKeyFailureKind::Absent,
453 }
454 }
455
456 fn digest_mismatch(key: XorName) -> Self {
457 Self {
458 key,
459 kind: AuditKeyFailureKind::DigestMismatch,
460 }
461 }
462
463 fn unclassified(key: XorName) -> Self {
464 Self {
465 key,
466 kind: AuditKeyFailureKind::Unclassified,
467 }
468 }
469}
470
471fn build_audit_failure_summary(
472 challenged_key_count: usize,
473 confirmed_failures: &[AuditKeyFailure],
474) -> AuditFailureSummary {
475 let mut summary = AuditFailureSummary {
476 challenged_keys: challenged_key_count,
477 failed_keys: confirmed_failures.len(),
478 ..AuditFailureSummary::default()
479 };
480
481 for failure in confirmed_failures {
482 match failure.kind {
483 AuditKeyFailureKind::Absent => summary.absent_keys += 1,
484 AuditKeyFailureKind::DigestMismatch => summary.digest_mismatch_keys += 1,
485 AuditKeyFailureKind::Unclassified => {}
486 }
487 }
488
489 summary
490}
491
492fn audit_digest_failure_reason(confirmed_failures: &[AuditKeyFailure]) -> AuditFailureReason {
493 if confirmed_failures
494 .iter()
495 .all(|failure| failure.kind == AuditKeyFailureKind::Absent)
496 {
497 AuditFailureReason::KeyAbsent
498 } else {
499 AuditFailureReason::DigestMismatch
500 }
501}
502
503#[allow(clippy::too_many_arguments)]
509async fn verify_digests(
510 challenged_peer: &PeerId,
511 challenge_id: u64,
512 nonce: &[u8; 32],
513 keys: &[XorName],
514 digests: &[[u8; 32]],
515 storage: &Arc<LmdbStorage>,
516 p2p_node: &Arc<P2PNode>,
517 config: &ReplicationConfig,
518) -> AuditTickResult {
519 if digests.len() != keys.len() {
521 warn!(
522 "Audit: malformed response from {challenged_peer}: {} digests for {} keys",
523 digests.len(),
524 keys.len()
525 );
526 return handle_audit_failure(
527 challenged_peer,
528 challenge_id,
529 keys,
530 AuditFailureReason::MalformedResponse,
531 p2p_node,
532 config,
533 )
534 .await;
535 }
536
537 let challenged_peer_bytes = challenged_peer.as_bytes();
538 let mut failed_keys = Vec::new();
539
540 for (i, key) in keys.iter().enumerate() {
541 let received_digest = &digests[i];
542
543 if *received_digest == ABSENT_KEY_DIGEST {
545 failed_keys.push(AuditKeyFailure::absent(*key));
546 continue;
547 }
548
549 let local_bytes = match storage.get_raw(key).await {
551 Ok(Some(bytes)) => bytes,
552 Ok(None) => {
553 warn!(
555 "Audit: local key {} disappeared during audit",
556 hex::encode(key)
557 );
558 continue;
559 }
560 Err(e) => {
561 warn!("Audit: failed to read local key {}: {e}", hex::encode(key));
562 continue;
563 }
564 };
565
566 let expected = compute_audit_digest(nonce, challenged_peer_bytes, key, &local_bytes);
567 if *received_digest != expected {
568 failed_keys.push(AuditKeyFailure::digest_mismatch(*key));
569 }
570 }
571
572 if failed_keys.is_empty() {
573 info!(
574 "Audit: peer {challenged_peer} passed (all {} keys verified)",
575 keys.len()
576 );
577 return AuditTickResult::Passed {
578 challenged_peer: *challenged_peer,
579 keys_checked: keys.len(),
580 };
581 }
582
583 handle_classified_audit_failure(
585 challenged_peer,
586 challenge_id,
587 &failed_keys,
588 AuditFailureReason::DigestMismatch,
589 keys.len(),
590 p2p_node,
591 config,
592 )
593 .await
594}
595
596async fn handle_audit_failure(
602 challenged_peer: &PeerId,
603 challenge_id: u64,
604 failed_keys: &[XorName],
605 reason: AuditFailureReason,
606 p2p_node: &Arc<P2PNode>,
607 config: &ReplicationConfig,
608) -> AuditTickResult {
609 let failures = failed_keys
610 .iter()
611 .copied()
612 .map(AuditKeyFailure::unclassified)
613 .collect::<Vec<_>>();
614 handle_classified_audit_failure(
615 challenged_peer,
616 challenge_id,
617 &failures,
618 reason,
619 failed_keys.len(),
620 p2p_node,
621 config,
622 )
623 .await
624}
625
626async fn handle_classified_audit_failure(
627 challenged_peer: &PeerId,
628 challenge_id: u64,
629 failed_keys: &[AuditKeyFailure],
630 reason: AuditFailureReason,
631 challenged_key_count: usize,
632 p2p_node: &Arc<P2PNode>,
633 config: &ReplicationConfig,
634) -> AuditTickResult {
635 let dht = p2p_node.dht_manager();
636 let mut confirmed_failures = Vec::new();
637
638 for failure in failed_keys {
640 let closest = dht
641 .find_closest_nodes_local_with_self(&failure.key, config.close_group_size)
642 .await;
643 if closest.iter().any(|n| n.peer_id == *challenged_peer) {
644 confirmed_failures.push(*failure);
645 } else {
646 debug!(
647 "Audit: peer {challenged_peer} not responsible for {} (removed from failure set)",
648 hex::encode(failure.key)
649 );
650 }
651 }
652
653 if confirmed_failures.is_empty() {
658 info!("Audit: all failures for {challenged_peer} cleared by responsibility confirmation");
659 return AuditTickResult::Idle;
660 }
661
662 let summary = build_audit_failure_summary(challenged_key_count, &confirmed_failures);
663 let reason = if reason == AuditFailureReason::DigestMismatch {
664 audit_digest_failure_reason(&confirmed_failures)
665 } else {
666 reason
667 };
668 let confirmed_failed_keys = confirmed_failures
669 .iter()
670 .map(|failure| failure.key)
671 .collect();
672
673 let evidence = FailureEvidence::AuditFailure {
675 challenge_id,
676 challenged_peer: *challenged_peer,
677 confirmed_failed_keys,
678 summary,
679 reason,
680 };
681
682 AuditTickResult::Failed { evidence }
683}
684
685async fn handle_audit_timeout(
687 challenged_peer: &PeerId,
688 challenge_id: u64,
689 keys: &[XorName],
690 p2p_node: &Arc<P2PNode>,
691 config: &ReplicationConfig,
692) -> AuditTickResult {
693 handle_audit_failure(
694 challenged_peer,
695 challenge_id,
696 keys,
697 AuditFailureReason::Timeout,
698 p2p_node,
699 config,
700 )
701 .await
702}
703
704pub async fn handle_audit_challenge(
715 challenge: &AuditChallenge,
716 storage: &LmdbStorage,
717 self_peer_id: &PeerId,
718 is_bootstrapping: bool,
719 stored_chunks: usize,
720) -> AuditResponse {
721 if is_bootstrapping {
722 return AuditResponse::Bootstrapping {
723 challenge_id: challenge.challenge_id,
724 };
725 }
726
727 if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
728 warn!(
729 "Audit challenge targeted wrong peer: expected {}, got {}",
730 hex::encode(self_peer_id.as_bytes()),
731 hex::encode(challenge.challenged_peer_id),
732 );
733 return AuditResponse::Rejected {
734 challenge_id: challenge.challenge_id,
735 reason: "challenged_peer_id does not match this node".to_string(),
736 };
737 }
738
739 let max_keys = ReplicationConfig::max_incoming_audit_keys(stored_chunks);
740 if challenge.keys.len() > max_keys {
741 warn!(
742 "Audit challenge rejected: {} keys exceeds dynamic limit of {max_keys} \
743 (stored_chunks={stored_chunks})",
744 challenge.keys.len(),
745 );
746 return AuditResponse::Rejected {
747 challenge_id: challenge.challenge_id,
748 reason: format!(
749 "challenge contains {} keys, limit is {max_keys}",
750 challenge.keys.len()
751 ),
752 };
753 }
754
755 let mut digests = Vec::with_capacity(challenge.keys.len());
756
757 for key in &challenge.keys {
758 match storage.get_raw(key).await {
759 Ok(Some(data)) => {
760 let digest = compute_audit_digest(
761 &challenge.nonce,
762 &challenge.challenged_peer_id,
763 key,
764 &data,
765 );
766 digests.push(digest);
767 }
768 Ok(None) => {
769 digests.push(ABSENT_KEY_DIGEST);
770 }
771 Err(e) => {
772 warn!(
773 "Audit responder: failed to read key {}: {e}",
774 hex::encode(key)
775 );
776 digests.push(ABSENT_KEY_DIGEST);
777 }
778 }
779 }
780
781 AuditResponse::Digests {
782 challenge_id: challenge.challenge_id,
783 digests,
784 }
785}
786
787#[cfg(test)]
792#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
793mod tests {
794 use super::*;
795 use crate::replication::config::REPAIR_HINT_MIN_AGE;
796 use crate::replication::protocol::compute_audit_digest;
797 use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
798 use crate::storage::LmdbStorageConfig;
799 use std::time::Instant;
800 use tempfile::TempDir;
801
802 const TEST_STORED_CHUNKS: usize = 1_000_000;
805
806 #[test]
807 fn first_challenged_key_label_truncates_to_16_hex_chars() {
808 let mut key = [0u8; 32];
809 key[0] = 0xAB;
810 key[7] = 0xCD;
811 key[8] = 0xEF;
812
813 assert_eq!(first_challenged_key_label(&[key]), "0xab000000000000cd");
814 }
815
816 #[test]
817 fn first_challenged_key_label_falls_back_when_empty() {
818 assert_eq!(first_challenged_key_label(&[]), "0x");
819 }
820
821 #[test]
822 fn classify_audit_send_error_uses_bounded_classes() {
823 assert_eq!(
824 classify_audit_send_error("request to peer timed out after 10s"),
825 "timeout"
826 );
827 assert_eq!(
828 classify_audit_send_error("peer not found in active channels"),
829 "peer_unavailable"
830 );
831 assert_eq!(
832 classify_audit_send_error("dial failed for all candidate addresses"),
833 "connection_failed"
834 );
835 assert_eq!(
836 classify_audit_send_error("response receiver dropped before delivery"),
837 "connection_closed"
838 );
839 assert_eq!(
840 classify_audit_send_error("transport stream error"),
841 "transport_error"
842 );
843 assert_eq!(classify_audit_send_error("unexpected error"), "other");
844 }
845
846 async fn create_test_storage() -> (LmdbStorage, TempDir) {
848 let temp_dir = TempDir::new().expect("create temp dir");
849 let config = LmdbStorageConfig {
850 root_dir: temp_dir.path().to_path_buf(),
851 verify_on_read: false,
852 max_map_size: 0,
853 disk_reserve: 0,
854 };
855 let storage = LmdbStorage::new(config).await.expect("create storage");
856 (storage, temp_dir)
857 }
858
859 fn make_challenge(
861 challenge_id: u64,
862 nonce: [u8; 32],
863 peer_id: [u8; 32],
864 keys: Vec<XorName>,
865 ) -> AuditChallenge {
866 AuditChallenge {
867 challenge_id,
868 nonce,
869 challenged_peer_id: peer_id,
870 keys,
871 }
872 }
873
874 fn peer_id_from_bytes(bytes: [u8; 32]) -> PeerId {
876 PeerId::from_bytes(bytes)
877 }
878
879 #[tokio::test]
882 async fn handle_challenge_present_keys_returns_correct_digests() {
883 let (storage, _temp) = create_test_storage().await;
884
885 let content_a = b"chunk alpha";
887 let addr_a = LmdbStorage::compute_address(content_a);
888 storage.put(&addr_a, content_a).await.expect("put a");
889
890 let content_b = b"chunk beta";
891 let addr_b = LmdbStorage::compute_address(content_b);
892 storage.put(&addr_b, content_b).await.expect("put b");
893
894 let nonce = [0xAA; 32];
895 let peer_id = [0xBB; 32];
896 let challenge = make_challenge(42, nonce, peer_id, vec![addr_a, addr_b]);
897 let self_id = peer_id_from_bytes(peer_id);
898
899 let response =
900 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
901
902 match response {
903 AuditResponse::Digests {
904 challenge_id,
905 digests,
906 } => {
907 assert_eq!(challenge_id, 42);
908 assert_eq!(digests.len(), 2);
909
910 let expected_a = compute_audit_digest(&nonce, &peer_id, &addr_a, content_a);
911 let expected_b = compute_audit_digest(&nonce, &peer_id, &addr_b, content_b);
912 assert_eq!(digests[0], expected_a);
913 assert_eq!(digests[1], expected_b);
914 }
915 AuditResponse::Bootstrapping { .. } => {
916 panic!("expected Digests, got Bootstrapping");
917 }
918 AuditResponse::Rejected { .. } => {
919 panic!("Unexpected Rejected response");
920 }
921 }
922 }
923
924 #[tokio::test]
927 async fn handle_challenge_absent_keys_returns_sentinel() {
928 let (storage, _temp) = create_test_storage().await;
929
930 let absent_key = [0xFF; 32];
931 let nonce = [0x11; 32];
932 let peer_id = [0x22; 32];
933 let challenge = make_challenge(99, nonce, peer_id, vec![absent_key]);
934 let self_id = peer_id_from_bytes(peer_id);
935
936 let response =
937 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
938
939 match response {
940 AuditResponse::Digests {
941 challenge_id,
942 digests,
943 } => {
944 assert_eq!(challenge_id, 99);
945 assert_eq!(digests.len(), 1);
946 assert_eq!(
947 digests[0], ABSENT_KEY_DIGEST,
948 "absent key should produce sentinel digest"
949 );
950 }
951 AuditResponse::Bootstrapping { .. } => {
952 panic!("expected Digests, got Bootstrapping");
953 }
954 AuditResponse::Rejected { .. } => {
955 panic!("Unexpected Rejected response");
956 }
957 }
958 }
959
960 #[tokio::test]
963 async fn handle_challenge_mixed_present_and_absent() {
964 let (storage, _temp) = create_test_storage().await;
965
966 let content = b"present chunk";
967 let addr_present = LmdbStorage::compute_address(content);
968 storage.put(&addr_present, content).await.expect("put");
969
970 let addr_absent = [0xDE; 32];
971 let nonce = [0x33; 32];
972 let peer_id = [0x44; 32];
973 let challenge = make_challenge(7, nonce, peer_id, vec![addr_present, addr_absent]);
974 let self_id = peer_id_from_bytes(peer_id);
975
976 let response =
977 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
978
979 match response {
980 AuditResponse::Digests { digests, .. } => {
981 assert_eq!(digests.len(), 2);
982
983 let expected_present =
984 compute_audit_digest(&nonce, &peer_id, &addr_present, content);
985 assert_eq!(digests[0], expected_present);
986 assert_eq!(
987 digests[1], ABSENT_KEY_DIGEST,
988 "absent key should be sentinel"
989 );
990 }
991 AuditResponse::Bootstrapping { .. } => {
992 panic!("expected Digests, got Bootstrapping");
993 }
994 AuditResponse::Rejected { .. } => {
995 panic!("Unexpected Rejected response");
996 }
997 }
998 }
999
1000 #[tokio::test]
1003 async fn handle_challenge_bootstrapping_returns_bootstrapping_response() {
1004 let (storage, _temp) = create_test_storage().await;
1005
1006 let challenge = make_challenge(55, [0x00; 32], [0x01; 32], vec![[0x02; 32]]);
1007 let self_id = peer_id_from_bytes([0x01; 32]);
1008
1009 let response =
1010 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1011
1012 match response {
1013 AuditResponse::Bootstrapping { challenge_id } => {
1014 assert_eq!(challenge_id, 55);
1015 }
1016 AuditResponse::Digests { .. } => {
1017 panic!("expected Bootstrapping, got Digests");
1018 }
1019 AuditResponse::Rejected { .. } => {
1020 panic!("Unexpected Rejected response");
1021 }
1022 }
1023 }
1024
1025 #[tokio::test]
1028 async fn handle_challenge_empty_keys_returns_empty_digests() {
1029 let (storage, _temp) = create_test_storage().await;
1030
1031 let challenge = make_challenge(100, [0x10; 32], [0x20; 32], vec![]);
1032 let self_id = peer_id_from_bytes([0x20; 32]);
1033
1034 let response =
1035 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1036
1037 match response {
1038 AuditResponse::Digests {
1039 challenge_id,
1040 digests,
1041 } => {
1042 assert_eq!(challenge_id, 100);
1043 assert!(
1044 digests.is_empty(),
1045 "empty key list should yield empty digests"
1046 );
1047 }
1048 AuditResponse::Bootstrapping { .. } => {
1049 panic!("expected Digests, got Bootstrapping");
1050 }
1051 AuditResponse::Rejected { .. } => {
1052 panic!("Unexpected Rejected response");
1053 }
1054 }
1055 }
1056
1057 #[test]
1060 fn digest_verification_matching() {
1061 let nonce = [0x01; 32];
1062 let peer_id = [0x02; 32];
1063 let key: XorName = [0x03; 32];
1064 let data = b"correct data";
1065
1066 let expected = compute_audit_digest(&nonce, &peer_id, &key, data);
1067 let recomputed = compute_audit_digest(&nonce, &peer_id, &key, data);
1068
1069 assert_eq!(
1070 expected, recomputed,
1071 "same inputs must produce identical digests"
1072 );
1073 assert_ne!(
1074 expected, ABSENT_KEY_DIGEST,
1075 "real digest must not be sentinel"
1076 );
1077 }
1078
1079 #[test]
1082 fn digest_verification_mismatching_data() {
1083 let nonce = [0x01; 32];
1084 let peer_id = [0x02; 32];
1085 let key: XorName = [0x03; 32];
1086
1087 let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data version A");
1088 let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data version B");
1089
1090 assert_ne!(
1091 digest_a, digest_b,
1092 "different data must produce different digests"
1093 );
1094 }
1095
1096 #[test]
1097 fn digest_verification_mismatching_nonce() {
1098 let peer_id = [0x02; 32];
1099 let key: XorName = [0x03; 32];
1100 let data = b"same data";
1101
1102 let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, data);
1103 let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, data);
1104
1105 assert_ne!(
1106 digest_a, digest_b,
1107 "different nonces must produce different digests"
1108 );
1109 }
1110
1111 #[test]
1112 fn digest_verification_mismatching_peer() {
1113 let nonce = [0x01; 32];
1114 let key: XorName = [0x03; 32];
1115 let data = b"same data";
1116
1117 let digest_a = compute_audit_digest(&nonce, &[0x02; 32], &key, data);
1118 let digest_b = compute_audit_digest(&nonce, &[0xFE; 32], &key, data);
1119
1120 assert_ne!(
1121 digest_a, digest_b,
1122 "different peers must produce different digests"
1123 );
1124 }
1125
1126 #[test]
1127 fn digest_verification_mismatching_key() {
1128 let nonce = [0x01; 32];
1129 let peer_id = [0x02; 32];
1130 let data = b"same data";
1131
1132 let digest_a = compute_audit_digest(&nonce, &peer_id, &[0x03; 32], data);
1133 let digest_b = compute_audit_digest(&nonce, &peer_id, &[0xFC; 32], data);
1134
1135 assert_ne!(
1136 digest_a, digest_b,
1137 "different keys must produce different digests"
1138 );
1139 }
1140
1141 #[test]
1144 fn absent_sentinel_is_all_zeros() {
1145 assert_eq!(ABSENT_KEY_DIGEST, [0u8; 32], "sentinel must be all zeros");
1146 }
1147
1148 #[tokio::test]
1151 async fn bootstrapping_skips_digest_computation() {
1152 let (storage, _temp) = create_test_storage().await;
1153
1154 let content = b"stored but bootstrapping";
1155 let addr = LmdbStorage::compute_address(content);
1156 storage.put(&addr, content).await.expect("put");
1157
1158 let challenge = make_challenge(200, [0xCC; 32], [0xDD; 32], vec![addr]);
1159 let self_id = peer_id_from_bytes([0xDD; 32]);
1160
1161 let response =
1162 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1163
1164 assert!(
1165 matches!(response, AuditResponse::Bootstrapping { challenge_id: 200 }),
1166 "bootstrapping node must not compute digests"
1167 );
1168 }
1169
1170 #[tokio::test]
1173 async fn scenario_19_partial_failure_mixed_responsibility() {
1174 let (storage, _temp) = create_test_storage().await;
1181 let nonce = [0x42u8; 32];
1182 let peer_id = [0xAA; 32];
1183
1184 let content_k1 = b"key one data";
1186 let addr_k1 = LmdbStorage::compute_address(content_k1);
1187 storage.put(&addr_k1, content_k1).await.unwrap();
1188
1189 let content_k2 = b"key two data";
1190 let addr_k2 = LmdbStorage::compute_address(content_k2);
1191 storage.put(&addr_k2, content_k2).await.unwrap();
1192
1193 let addr_k3 = [0xFF; 32]; let challenge = AuditChallenge {
1196 challenge_id: 100,
1197 nonce,
1198 challenged_peer_id: peer_id,
1199 keys: vec![addr_k1, addr_k2, addr_k3],
1200 };
1201 let self_id = peer_id_from_bytes(peer_id);
1202
1203 let response =
1204 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1205
1206 match response {
1207 AuditResponse::Digests { digests, .. } => {
1208 assert_eq!(digests.len(), 3);
1209
1210 let expected_k1 = compute_audit_digest(&nonce, &peer_id, &addr_k1, content_k1);
1212 assert_eq!(digests[0], expected_k1);
1213
1214 let expected_k2 = compute_audit_digest(&nonce, &peer_id, &addr_k2, content_k2);
1216 assert_eq!(digests[1], expected_k2);
1217
1218 assert_eq!(digests[2], ABSENT_KEY_DIGEST);
1220 }
1221 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests response"),
1222 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1223 }
1224 }
1225
1226 #[tokio::test]
1229 async fn scenario_54_all_digests_pass() {
1230 let (storage, _temp) = create_test_storage().await;
1233 let nonce = [0x10; 32];
1234 let peer_id = [0x20; 32];
1235
1236 let c1 = b"chunk alpha";
1237 let c2 = b"chunk beta";
1238 let c3 = b"chunk gamma";
1239 let a1 = LmdbStorage::compute_address(c1);
1240 let a2 = LmdbStorage::compute_address(c2);
1241 let a3 = LmdbStorage::compute_address(c3);
1242 storage.put(&a1, c1).await.unwrap();
1243 storage.put(&a2, c2).await.unwrap();
1244 storage.put(&a3, c3).await.unwrap();
1245
1246 let challenge = AuditChallenge {
1247 challenge_id: 200,
1248 nonce,
1249 challenged_peer_id: peer_id,
1250 keys: vec![a1, a2, a3],
1251 };
1252 let self_id = peer_id_from_bytes(peer_id);
1253
1254 let response =
1255 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1256 match response {
1257 AuditResponse::Digests { digests, .. } => {
1258 assert_eq!(digests.len(), 3);
1259 for (i, (addr, content)) in [(a1, &c1[..]), (a2, &c2[..]), (a3, &c3[..])]
1260 .iter()
1261 .enumerate()
1262 {
1263 let expected = compute_audit_digest(&nonce, &peer_id, addr, content);
1264 assert_eq!(digests[i], expected, "Key {i} digest should match");
1265 }
1266 }
1267 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1268 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1269 }
1270 }
1271
1272 #[tokio::test]
1285 async fn scenario_55_no_confirmed_responsibility_no_evidence() {
1286 let (storage, _temp) = create_test_storage().await;
1287 let nonce = [0x55; 32];
1288 let peer_id = [0x55; 32];
1289
1290 let c1 = b"scenario 55 key one";
1292 let c2 = b"scenario 55 key two";
1293 let k1 = LmdbStorage::compute_address(c1);
1294 let k2 = LmdbStorage::compute_address(c2);
1295 storage.put(&k1, c1).await.expect("put k1");
1296 storage.put(&k2, c2).await.expect("put k2");
1297
1298 let expected_d1 = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1300 let expected_d2 = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1301
1302 let wrong_d1 = compute_audit_digest(&nonce, &peer_id, &k1, b"corrupted k1");
1304 let wrong_d2 = compute_audit_digest(&nonce, &peer_id, &k2, b"corrupted k2");
1305 assert_ne!(wrong_d1, expected_d1, "K1 digest should mismatch");
1306 assert_ne!(wrong_d2, expected_d2, "K2 digest should mismatch");
1307
1308 let keys = [k1, k2];
1310 let expected = [expected_d1, expected_d2];
1311 let received = [wrong_d1, wrong_d2];
1312
1313 let mut failed_keys = Vec::new();
1314 for i in 0..keys.len() {
1315 if received[i] != expected[i] {
1316 failed_keys.push(keys[i]);
1317 }
1318 }
1319 assert_eq!(
1320 failed_keys.len(),
1321 2,
1322 "Both keys should be identified as digest mismatches"
1323 );
1324
1325 let confirmed_responsible_keys: Vec<XorName> = Vec::new();
1328 let confirmed_failures: Vec<XorName> = failed_keys
1329 .into_iter()
1330 .filter(|k| confirmed_responsible_keys.contains(k))
1331 .collect();
1332
1333 assert!(
1335 confirmed_failures.is_empty(),
1336 "With no confirmed responsibility, failure set must be empty — \
1337 no AuditFailure evidence should be emitted"
1338 );
1339
1340 let peer = PeerId::from_bytes(peer_id);
1343 let evidence = FailureEvidence::AuditFailure {
1344 challenge_id: 5500,
1345 challenged_peer: peer,
1346 confirmed_failed_keys: confirmed_failures,
1347 summary: AuditFailureSummary::default(),
1348 reason: AuditFailureReason::DigestMismatch,
1349 };
1350 if let FailureEvidence::AuditFailure {
1351 confirmed_failed_keys,
1352 ..
1353 } = evidence
1354 {
1355 assert!(
1356 confirmed_failed_keys.is_empty(),
1357 "Evidence with empty failure set should not trigger a trust penalty"
1358 );
1359 }
1360 }
1361
1362 #[test]
1365 fn scenario_56_repair_opportunity_filters_never_synced() {
1366 let never_synced = PeerSyncRecord {
1370 last_sync: None,
1371 cycles_since_sync: 5,
1372 };
1373 assert!(!never_synced.has_repair_opportunity());
1374
1375 let synced_no_cycle = PeerSyncRecord {
1376 last_sync: Some(Instant::now()),
1377 cycles_since_sync: 0,
1378 };
1379 assert!(!synced_no_cycle.has_repair_opportunity());
1380
1381 let synced_with_cycle = PeerSyncRecord {
1382 last_sync: Some(Instant::now()),
1383 cycles_since_sync: 1,
1384 };
1385 assert!(synced_with_cycle.has_repair_opportunity());
1386 }
1387
1388 #[test]
1389 fn expired_bootstrap_claim_does_not_remove_peer_from_audit_eligibility() {
1390 let peer = peer_id_from_bytes([0x57; 32]);
1391 let mut sync_history = HashMap::new();
1392 sync_history.insert(
1393 peer,
1394 PeerSyncRecord {
1395 last_sync: Some(Instant::now()),
1396 cycles_since_sync: 1,
1397 },
1398 );
1399
1400 let mut bootstrap_claims = HashMap::new();
1401 let first_seen = Instant::now()
1402 .checked_sub(
1403 crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD
1404 + std::time::Duration::from_secs(1),
1405 )
1406 .unwrap_or_else(Instant::now);
1407 bootstrap_claims.insert(peer, first_seen);
1408
1409 let eligible = eligible_audit_peers(&sync_history);
1410
1411 assert!(bootstrap_claims.contains_key(&peer));
1412 assert!(
1413 eligible.contains(&peer),
1414 "continued bootstrap claims must remain auditable so past-grace abuse can be observed"
1415 );
1416 }
1417
1418 #[test]
1419 fn audit_failure_summary_counts_confirmed_absent_and_mismatch_keys() {
1420 let absent_key = [0xA1; 32];
1421 let mismatch_key = [0xB2; 32];
1422 let confirmed = vec![
1423 AuditKeyFailure::absent(absent_key),
1424 AuditKeyFailure::digest_mismatch(mismatch_key),
1425 ];
1426
1427 let summary = build_audit_failure_summary(5, &confirmed);
1428
1429 assert_eq!(summary.challenged_keys, 5);
1430 assert_eq!(summary.failed_keys, 2);
1431 assert_eq!(summary.absent_keys, 1);
1432 assert_eq!(summary.digest_mismatch_keys, 1);
1433 }
1434
1435 #[test]
1436 fn audit_failure_summary_leaves_unclassified_rejections_out_of_absent_mismatch_counts() {
1437 let rejected_key = [0xC3; 32];
1438 let confirmed = vec![AuditKeyFailure::unclassified(rejected_key)];
1439
1440 let summary = build_audit_failure_summary(3, &confirmed);
1441
1442 assert_eq!(summary.challenged_keys, 3);
1443 assert_eq!(summary.failed_keys, 1);
1444 assert_eq!(summary.absent_keys, 0);
1445 assert_eq!(summary.digest_mismatch_keys, 0);
1446 }
1447
1448 #[test]
1449 fn audit_digest_failure_reason_is_key_absent_when_all_confirmed_failures_are_absent() {
1450 let failures = vec![AuditKeyFailure::absent([0xD4; 32])];
1451
1452 assert_eq!(
1453 audit_digest_failure_reason(&failures),
1454 AuditFailureReason::KeyAbsent
1455 );
1456 }
1457
1458 #[test]
1459 fn audit_digest_failure_reason_is_digest_mismatch_for_mixed_failures() {
1460 let failures = vec![
1461 AuditKeyFailure::absent([0xD5; 32]),
1462 AuditKeyFailure::digest_mismatch([0xE6; 32]),
1463 ];
1464
1465 assert_eq!(
1466 audit_digest_failure_reason(&failures),
1467 AuditFailureReason::DigestMismatch
1468 );
1469 }
1470
1471 #[test]
1472 fn audit_key_filter_retains_stable_proofs_and_rejects_evicted_peers() {
1473 const HINT_EPOCH: u64 = 7;
1474 const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
1475 const CHALLENGED_PEER_BYTE: u8 = 0xA1;
1476 const OTHER_PEER_BYTE: u8 = 0xA2;
1477 const NEW_PEER_BYTE: u8 = 0xA3;
1478 const MATURE_KEY_BYTE: u8 = 0xB1;
1479 const SAME_EPOCH_KEY_BYTE: u8 = 0xB2;
1480 const MISSING_PROOF_KEY_BYTE: u8 = 0xB3;
1481 const STABLE_CHURN_KEY_BYTE: u8 = 0xB4;
1482 const EVICTED_KEY_BYTE: u8 = 0xB5;
1483 const FRESH_HINT_KEY_BYTE: u8 = 0xB6;
1484 const XOR_NAME_LEN: usize = 32;
1485
1486 let challenged_peer = peer_id_from_bytes([CHALLENGED_PEER_BYTE; XOR_NAME_LEN]);
1487 let other_peer = peer_id_from_bytes([OTHER_PEER_BYTE; XOR_NAME_LEN]);
1488 let new_peer = peer_id_from_bytes([NEW_PEER_BYTE; XOR_NAME_LEN]);
1489 let mature_key = [MATURE_KEY_BYTE; XOR_NAME_LEN];
1490 let same_epoch_key = [SAME_EPOCH_KEY_BYTE; XOR_NAME_LEN];
1491 let missing_proof_key = [MISSING_PROOF_KEY_BYTE; XOR_NAME_LEN];
1492 let stable_churn_key = [STABLE_CHURN_KEY_BYTE; XOR_NAME_LEN];
1493 let evicted_key = [EVICTED_KEY_BYTE; XOR_NAME_LEN];
1494 let fresh_hint_key = [FRESH_HINT_KEY_BYTE; XOR_NAME_LEN];
1495 let close_group = HashSet::from([challenged_peer, other_peer]);
1496 let changed_close_group = HashSet::from([challenged_peer, new_peer]);
1497 let evicted_close_group = HashSet::from([other_peer, new_peer]);
1498 let mut repair_proofs = RepairProofs::new();
1499 let mature_hinted_at = Instant::now();
1500 let now = mature_hinted_at
1501 .checked_add(REPAIR_HINT_MIN_AGE)
1502 .unwrap_or(mature_hinted_at);
1503
1504 assert!(repair_proofs.record_replica_hint_sent_at(
1505 challenged_peer,
1506 mature_key,
1507 &close_group,
1508 HINT_EPOCH,
1509 mature_hinted_at,
1510 ));
1511 assert!(repair_proofs.record_replica_hint_sent_at(
1512 challenged_peer,
1513 same_epoch_key,
1514 &close_group,
1515 CURRENT_EPOCH,
1516 mature_hinted_at,
1517 ));
1518 assert!(repair_proofs.record_replica_hint_sent_at(
1519 challenged_peer,
1520 stable_churn_key,
1521 &close_group,
1522 HINT_EPOCH,
1523 mature_hinted_at,
1524 ));
1525 assert!(repair_proofs.record_replica_hint_sent_at(
1526 challenged_peer,
1527 evicted_key,
1528 &close_group,
1529 HINT_EPOCH,
1530 mature_hinted_at,
1531 ));
1532 assert!(repair_proofs.record_replica_hint_sent_at(
1533 challenged_peer,
1534 fresh_hint_key,
1535 &close_group,
1536 HINT_EPOCH,
1537 now,
1538 ));
1539
1540 let sampled_key_groups = vec![
1541 (mature_key, close_group.clone()),
1542 (same_epoch_key, close_group.clone()),
1543 (missing_proof_key, close_group.clone()),
1544 (stable_churn_key, changed_close_group),
1545 (evicted_key, evicted_close_group),
1546 (fresh_hint_key, close_group.clone()),
1547 ];
1548 let peer_keys = mature_audit_keys_for_peer(
1549 &challenged_peer,
1550 sampled_key_groups,
1551 &mut repair_proofs,
1552 CURRENT_EPOCH,
1553 now,
1554 );
1555
1556 assert_eq!(
1557 peer_keys,
1558 vec![mature_key, stable_churn_key],
1559 "mature proofs for stable close-group peers should become audit keys, while same-epoch, fresh, missing, and evicted-peer proofs should not"
1560 );
1561 }
1562
1563 #[tokio::test]
1566 async fn audit_response_must_match_key_count() {
1567 let (storage, _temp) = create_test_storage().await;
1573 let nonce = [0x50; 32];
1574 let peer_id = [0x60; 32];
1575
1576 let content = b"single chunk";
1578 let addr = LmdbStorage::compute_address(content);
1579 storage.put(&addr, content).await.unwrap();
1580
1581 let absent_keys: Vec<XorName> = (1..=4u8).map(|i| [i; 32]).collect();
1583 let mut keys = vec![addr];
1584 keys.extend_from_slice(&absent_keys);
1585
1586 let key_count = keys.len();
1587 let challenge = make_challenge(300, nonce, peer_id, keys);
1588 let self_id = peer_id_from_bytes(peer_id);
1589
1590 let response =
1591 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1592 match response {
1593 AuditResponse::Digests { digests, .. } => {
1594 assert_eq!(
1595 digests.len(),
1596 key_count,
1597 "must produce exactly one digest per challenged key"
1598 );
1599 }
1600 AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1601 AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1602 }
1603 }
1604
1605 #[test]
1608 fn audit_digest_uses_full_record_bytes() {
1609 let nonce = [1u8; 32];
1611 let peer = [2u8; 32];
1612 let key = [3u8; 32];
1613
1614 let d1 = compute_audit_digest(&nonce, &peer, &key, b"data version 1");
1615 let d2 = compute_audit_digest(&nonce, &peer, &key, b"data version 2");
1616 assert_ne!(
1617 d1, d2,
1618 "Different record bytes must produce different digests"
1619 );
1620 }
1621
1622 #[tokio::test]
1633 async fn scenario_29_audit_start_gate_during_bootstrap() {
1634 let (storage, _temp) = create_test_storage().await;
1635
1636 let content = b"should not be audited during bootstrap";
1638 let addr = LmdbStorage::compute_address(content);
1639 storage.put(&addr, content).await.expect("put");
1640
1641 let challenge = make_challenge(2900, [0x29; 32], [0x29; 32], vec![addr]);
1642 let self_id = peer_id_from_bytes([0x29; 32]);
1643
1644 let response =
1646 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1647 assert!(
1648 matches!(
1649 response,
1650 AuditResponse::Bootstrapping { challenge_id: 2900 }
1651 ),
1652 "bootstrapping node must not compute digests — audit start gate"
1653 );
1654
1655 let response =
1657 handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1658 assert!(
1659 matches!(response, AuditResponse::Digests { .. }),
1660 "drained node should compute digests normally"
1661 );
1662 }
1663
1664 #[test]
1674 fn scenario_30_audit_peer_selection_from_sampled_keys() {
1675 assert_eq!(
1677 ReplicationConfig::audit_sample_count(100),
1678 10,
1679 "sample count should scale with sqrt(total_keys)"
1680 );
1681
1682 assert_eq!(ReplicationConfig::audit_sample_count(3), 1, "sqrt(3) = 1");
1683
1684 assert_eq!(
1685 ReplicationConfig::audit_sample_count(10_000),
1686 100,
1687 "sqrt(10000) = 100"
1688 );
1689
1690 let never = PeerSyncRecord {
1693 last_sync: None,
1694 cycles_since_sync: 10,
1695 };
1696 assert!(!never.has_repair_opportunity());
1697
1698 let too_soon = PeerSyncRecord {
1700 last_sync: Some(Instant::now()),
1701 cycles_since_sync: 0,
1702 };
1703 assert!(!too_soon.has_repair_opportunity());
1704
1705 let eligible = PeerSyncRecord {
1707 last_sync: Some(Instant::now()),
1708 cycles_since_sync: 2,
1709 };
1710 assert!(eligible.has_repair_opportunity());
1711 }
1712
1713 #[tokio::test]
1722 async fn scenario_32_dynamic_challenge_size() {
1723 let (storage, _temp) = create_test_storage().await;
1724
1725 let mut addrs = Vec::new();
1727 for i in 0u8..5 {
1728 let content = format!("dynamic challenge key {i}");
1729 let addr = LmdbStorage::compute_address(content.as_bytes());
1730 storage.put(&addr, content.as_bytes()).await.expect("put");
1731 addrs.push(addr);
1732 }
1733
1734 let nonce = [0x32; 32];
1735 let peer_id = [0x32; 32];
1736 let self_id = peer_id_from_bytes(peer_id);
1737
1738 let challenge1 = make_challenge(3201, nonce, peer_id, vec![addrs[0]]);
1740 let resp1 =
1741 handle_audit_challenge(&challenge1, &storage, &self_id, false, TEST_STORED_CHUNKS)
1742 .await;
1743 if let AuditResponse::Digests { digests, .. } = resp1 {
1744 assert_eq!(digests.len(), 1, "|PeerKeySet| = 1 → 1 digest");
1745 }
1746
1747 let challenge3 = make_challenge(3203, nonce, peer_id, addrs[0..3].to_vec());
1749 let resp3 =
1750 handle_audit_challenge(&challenge3, &storage, &self_id, false, TEST_STORED_CHUNKS)
1751 .await;
1752 if let AuditResponse::Digests { digests, .. } = resp3 {
1753 assert_eq!(digests.len(), 3, "|PeerKeySet| = 3 → 3 digests");
1754 }
1755
1756 let challenge5 = make_challenge(3205, nonce, peer_id, addrs.clone());
1758 let resp5 =
1759 handle_audit_challenge(&challenge5, &storage, &self_id, false, TEST_STORED_CHUNKS)
1760 .await;
1761 if let AuditResponse::Digests { digests, .. } = resp5 {
1762 assert_eq!(digests.len(), 5, "|PeerKeySet| = 5 → 5 digests");
1763 }
1764
1765 let challenge0 = make_challenge(3200, nonce, peer_id, vec![]);
1767 let resp0 =
1768 handle_audit_challenge(&challenge0, &storage, &self_id, false, TEST_STORED_CHUNKS)
1769 .await;
1770 if let AuditResponse::Digests { digests, .. } = resp0 {
1771 assert!(digests.is_empty(), "|PeerKeySet| = 0 → 0 digests (idle)");
1772 }
1773 }
1774
1775 #[tokio::test]
1781 async fn scenario_47_bootstrap_claim_grace_period_audit() {
1782 let (storage, _temp) = create_test_storage().await;
1783
1784 let content = b"bootstrap grace test";
1786 let addr = LmdbStorage::compute_address(content);
1787 storage.put(&addr, content).await.expect("put");
1788
1789 let challenge = make_challenge(4700, [0x47; 32], [0x47; 32], vec![addr]);
1790 let self_id = peer_id_from_bytes([0x47; 32]);
1791
1792 let response =
1794 handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1795 let challenge_id = match response {
1796 AuditResponse::Bootstrapping { challenge_id } => challenge_id,
1797 AuditResponse::Digests { .. } => {
1798 panic!("Expected Bootstrapping response during grace period")
1799 }
1800 AuditResponse::Rejected { .. } => {
1801 panic!("Unexpected Rejected response")
1802 }
1803 };
1804 assert_eq!(challenge_id, 4700);
1805
1806 let peer = PeerId::from_bytes([0x47; 32]);
1808 let mut state = NeighborSyncState::new_cycle(vec![peer]);
1809 let now = Instant::now();
1810 let observed = state.observe_bootstrap_claim(
1811 peer,
1812 now,
1813 crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD,
1814 );
1815
1816 assert_eq!(
1817 observed,
1818 BootstrapClaimObservation::WithinGrace { first_seen: now }
1819 );
1820 assert!(
1821 state.bootstrap_claims.contains_key(&peer),
1822 "BootstrapClaimFirstSeen should be recorded after grace-period claim"
1823 );
1824 assert!(
1825 state.bootstrap_claim_history.contains_key(&peer),
1826 "Bootstrap claim history should remember that the grace window was used"
1827 );
1828 }
1829
1830 #[tokio::test]
1841 async fn scenario_53_partial_failure_mixed_responsibility() {
1842 let (storage, _temp) = create_test_storage().await;
1843 let nonce = [0x53; 32];
1844 let peer_id = [0x53; 32];
1845
1846 let c1 = b"scenario 53 key one";
1848 let c2 = b"scenario 53 key two";
1849 let c3 = b"scenario 53 key three";
1850 let k1 = LmdbStorage::compute_address(c1);
1851 let k2 = LmdbStorage::compute_address(c2);
1852 let k3 = LmdbStorage::compute_address(c3);
1853 storage.put(&k1, c1).await.expect("put k1");
1854 storage.put(&k2, c2).await.expect("put k2");
1855 storage.put(&k3, c3).await.expect("put k3");
1856
1857 let d1_expected = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1859 let d2_expected = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1860 let d3_expected = compute_audit_digest(&nonce, &peer_id, &k3, c3);
1861
1862 let d2_wrong = compute_audit_digest(&nonce, &peer_id, &k2, b"tampered k2");
1864 let d3_wrong = compute_audit_digest(&nonce, &peer_id, &k3, b"tampered k3");
1865
1866 assert_eq!(d1_expected, d1_expected, "K1 should match");
1867 assert_ne!(d2_wrong, d2_expected, "K2 should mismatch");
1868 assert_ne!(d3_wrong, d3_expected, "K3 should mismatch");
1869
1870 let digests = [d1_expected, d2_wrong, d3_wrong];
1872 let keys = [k1, k2, k3];
1873 let contents: [&[u8]; 3] = [c1, c2, c3];
1874
1875 let mut failed_keys = Vec::new();
1876 for (i, key) in keys.iter().enumerate() {
1877 if digests[i] == ABSENT_KEY_DIGEST {
1878 failed_keys.push(*key);
1879 continue;
1880 }
1881 let expected = compute_audit_digest(&nonce, &peer_id, key, contents[i]);
1882 if digests[i] != expected {
1883 failed_keys.push(*key);
1884 }
1885 }
1886
1887 assert_eq!(failed_keys.len(), 2, "K2 and K3 should be in failure set");
1888 assert!(failed_keys.contains(&k2));
1889 assert!(failed_keys.contains(&k3));
1890 assert!(!failed_keys.contains(&k1), "K1 passed digest check");
1891
1892 let responsible_for_k2 = true;
1895 let responsible_for_k3 = false;
1896 let mut confirmed = Vec::new();
1897 for key in &failed_keys {
1898 let is_responsible = if *key == k2 {
1899 responsible_for_k2
1900 } else {
1901 responsible_for_k3
1902 };
1903 if is_responsible {
1904 confirmed.push(*key);
1905 }
1906 }
1907
1908 assert_eq!(confirmed, vec![k2], "Only K2 should be in confirmed set");
1909
1910 let challenged_peer = PeerId::from_bytes(peer_id);
1912 let evidence = FailureEvidence::AuditFailure {
1913 challenge_id: 5300,
1914 challenged_peer,
1915 confirmed_failed_keys: confirmed,
1916 summary: AuditFailureSummary::default(),
1917 reason: AuditFailureReason::DigestMismatch,
1918 };
1919
1920 match evidence {
1921 FailureEvidence::AuditFailure {
1922 confirmed_failed_keys,
1923 ..
1924 } => {
1925 assert_eq!(
1926 confirmed_failed_keys.len(),
1927 1,
1928 "Only K2 should generate evidence"
1929 );
1930 assert_eq!(confirmed_failed_keys[0], k2);
1931 }
1932 _ => panic!("Expected AuditFailure evidence"),
1933 }
1934 }
1935}