Skip to main content

ant_node/replication/
audit.rs

1//! Storage audit protocol (Section 15).
2//!
3//! Challenge-response for claimed holders. Anti-outsourcing protection.
4
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7
8use crate::logging::{debug, info, warn};
9use rand::seq::SliceRandom;
10use rand::Rng;
11
12use crate::ant_protocol::XorName;
13use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
14use crate::replication::protocol::{
15    compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
16    ReplicationMessageBody, ABSENT_KEY_DIGEST,
17};
18use crate::replication::types::{
19    AuditFailureReason, AuditFailureSummary, FailureEvidence, PeerSyncRecord, RepairProofs,
20};
21use crate::storage::LmdbStorage;
22use saorsa_core::identity::PeerId;
23use saorsa_core::P2PNode;
24use tokio::sync::RwLock;
25
26// ---------------------------------------------------------------------------
27// Audit tick result
28// ---------------------------------------------------------------------------
29
30/// Result of an audit tick.
31#[derive(Debug)]
32pub enum AuditTickResult {
33    /// Audit completed successfully (all digests matched).
34    Passed {
35        /// The peer that was challenged.
36        challenged_peer: PeerId,
37        /// Number of keys verified.
38        keys_checked: usize,
39    },
40    /// Audit found failures (after responsibility confirmation).
41    Failed {
42        /// Evidence of the failure for trust engine.
43        evidence: FailureEvidence,
44    },
45    /// Audit target claimed bootstrapping.
46    BootstrapClaim {
47        /// The peer claiming bootstrap status.
48        peer: PeerId,
49    },
50    /// No eligible peers for audit this tick.
51    Idle,
52    /// Audit skipped (not enough local keys).
53    InsufficientKeys,
54}
55
56// ---------------------------------------------------------------------------
57// Main audit tick
58// ---------------------------------------------------------------------------
59
60/// Execute one audit tick (Section 15 steps 2-9).
61///
62/// Returns the audit result. Caller is responsible for emitting trust events.
63///
64/// **Invariant 19**: Returns [`AuditTickResult::Idle`] immediately if
65/// `is_bootstrapping` is `true` — a node must not audit others while it
66/// is still bootstrapping.
67#[allow(clippy::implicit_hasher)]
68pub async fn audit_tick(
69    p2p_node: &Arc<P2PNode>,
70    storage: &Arc<LmdbStorage>,
71    config: &ReplicationConfig,
72    sync_history: &HashMap<PeerId, PeerSyncRecord>,
73    is_bootstrapping: bool,
74) -> AuditTickResult {
75    let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
76    audit_tick_with_repair_proofs(
77        p2p_node,
78        storage,
79        config,
80        sync_history,
81        &repair_proofs,
82        0,
83        is_bootstrapping,
84    )
85    .await
86}
87
88/// Execute one repair-proof-gated audit tick.
89///
90/// This is the production path used by the replication engine. The
91/// compatibility [`audit_tick`] wrapper passes an empty proof table, so direct
92/// callers that have not adopted repair proofs remain conservative and do not
93/// audit peers for unproven keys.
94#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
95pub async fn audit_tick_with_repair_proofs(
96    p2p_node: &Arc<P2PNode>,
97    storage: &Arc<LmdbStorage>,
98    config: &ReplicationConfig,
99    sync_history: &HashMap<PeerId, PeerSyncRecord>,
100    repair_proofs: &Arc<RwLock<RepairProofs>>,
101    current_sync_epoch: u64,
102    is_bootstrapping: bool,
103) -> AuditTickResult {
104    // Invariant 19: never audit while still bootstrapping.
105    if is_bootstrapping {
106        return AuditTickResult::Idle;
107    }
108
109    let dht = p2p_node.dht_manager();
110
111    // Step 2: Select one eligible peer (has RepairOpportunity) at random.
112    // Peers with active bootstrap claims remain eligible. A follow-up audit is
113    // how we observe a continued claim and apply past-grace abuse handling.
114    let eligible_peers = eligible_audit_peers(sync_history);
115
116    if eligible_peers.is_empty() {
117        return AuditTickResult::Idle;
118    }
119
120    let (challenged_peer, nonce, challenge_id) = {
121        let mut rng = rand::thread_rng();
122        let selected = match eligible_peers.choose(&mut rng) {
123            Some(p) => *p,
124            None => return AuditTickResult::Idle,
125        };
126        let n: [u8; 32] = rng.gen();
127        let c: u64 = rng.gen();
128        (selected, n, c)
129    };
130
131    // Step 3: Sample keys from local store and keep those the peer is
132    // responsible for (appears in the close group via local RT lookup).
133    let all_keys = match storage.all_keys().await {
134        Ok(keys) => keys,
135        Err(e) => {
136            warn!("Audit: failed to read local keys: {e}");
137            return AuditTickResult::Idle;
138        }
139    };
140
141    if all_keys.is_empty() {
142        return AuditTickResult::Idle;
143    }
144
145    let sample_count = ReplicationConfig::audit_sample_count(all_keys.len());
146    let sampled_keys: Vec<XorName> = {
147        let mut rng = rand::thread_rng();
148        all_keys
149            .choose_multiple(&mut rng, sample_count)
150            .copied()
151            .collect()
152    };
153
154    // Step 4: Filter to keys where the chosen peer is in the close group and
155    // this node has proof that it already sent the peer a repair hint for the
156    // specific key.
157    let mut sampled_key_groups = Vec::new();
158    for key in &sampled_keys {
159        let closest = dht
160            .find_closest_nodes_local_with_self(key, config.close_group_size)
161            .await;
162        let close_peers: HashSet<PeerId> = closest.iter().map(|node| node.peer_id).collect();
163        if close_peers.contains(&challenged_peer) {
164            sampled_key_groups.push((*key, close_peers));
165        }
166    }
167
168    let peer_keys = {
169        let mut proofs = repair_proofs.write().await;
170        mature_audit_keys_for_peer(
171            &challenged_peer,
172            sampled_key_groups,
173            &mut proofs,
174            current_sync_epoch,
175        )
176    };
177
178    if peer_keys.is_empty() {
179        return AuditTickResult::Idle;
180    }
181
182    // peer_keys is naturally bounded by audit_sample_count (sqrt-scaled),
183    // so no explicit truncation needed.
184
185    // Step 6: Send challenge.
186
187    let challenge = AuditChallenge {
188        challenge_id,
189        nonce,
190        challenged_peer_id: *challenged_peer.as_bytes(),
191        keys: peer_keys.clone(),
192    };
193
194    let msg = ReplicationMessage {
195        request_id: challenge_id,
196        body: ReplicationMessageBody::AuditChallenge(challenge),
197    };
198
199    let encoded = match msg.encode() {
200        Ok(data) => data,
201        Err(e) => {
202            warn!("Audit: failed to encode challenge: {e}");
203            return AuditTickResult::Idle;
204        }
205    };
206
207    let response = match p2p_node
208        .send_request(
209            &challenged_peer,
210            REPLICATION_PROTOCOL_ID,
211            encoded,
212            config.audit_response_timeout(peer_keys.len()),
213        )
214        .await
215    {
216        Ok(resp) => resp,
217        Err(e) => {
218            debug!("Audit: challenge to {challenged_peer} failed: {e}");
219            // Timeout — need responsibility confirmation before penalty.
220            return handle_audit_timeout(
221                &challenged_peer,
222                challenge_id,
223                &peer_keys,
224                p2p_node,
225                config,
226            )
227            .await;
228        }
229    };
230
231    // Step 7: Parse response.
232    let resp_msg = match ReplicationMessage::decode(&response.data) {
233        Ok(m) => m,
234        Err(e) => {
235            warn!("Audit: failed to decode response from {challenged_peer}: {e}");
236            return handle_audit_failure(
237                &challenged_peer,
238                challenge_id,
239                &peer_keys,
240                AuditFailureReason::MalformedResponse,
241                p2p_node,
242                config,
243            )
244            .await;
245        }
246    };
247
248    match resp_msg.body {
249        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
250            challenge_id: resp_id,
251        }) => {
252            if resp_id != challenge_id {
253                warn!("Audit: challenge ID mismatch on Bootstrapping from {challenged_peer}");
254                return handle_audit_failure(
255                    &challenged_peer,
256                    challenge_id,
257                    &peer_keys,
258                    AuditFailureReason::MalformedResponse,
259                    p2p_node,
260                    config,
261                )
262                .await;
263            }
264            // Step 7b: Bootstrapping claim.
265            AuditTickResult::BootstrapClaim {
266                peer: challenged_peer,
267            }
268        }
269        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
270            challenge_id: resp_id,
271            digests,
272        }) => {
273            if resp_id != challenge_id {
274                warn!("Audit: challenge ID mismatch from {challenged_peer}");
275                return handle_audit_failure(
276                    &challenged_peer,
277                    challenge_id,
278                    &peer_keys,
279                    AuditFailureReason::MalformedResponse,
280                    p2p_node,
281                    config,
282                )
283                .await;
284            }
285            verify_digests(
286                &challenged_peer,
287                challenge_id,
288                &nonce,
289                &peer_keys,
290                &digests,
291                storage,
292                p2p_node,
293                config,
294            )
295            .await
296        }
297        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
298            challenge_id: resp_id,
299            reason,
300        }) => {
301            if resp_id != challenge_id {
302                warn!("Audit: challenge ID mismatch on Rejected from {challenged_peer}");
303                return handle_audit_failure(
304                    &challenged_peer,
305                    challenge_id,
306                    &peer_keys,
307                    AuditFailureReason::MalformedResponse,
308                    p2p_node,
309                    config,
310                )
311                .await;
312            }
313            warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
314            handle_audit_failure(
315                &challenged_peer,
316                challenge_id,
317                &peer_keys,
318                AuditFailureReason::Rejected,
319                p2p_node,
320                config,
321            )
322            .await
323        }
324        _ => {
325            warn!("Audit: unexpected response type from {challenged_peer}");
326            handle_audit_failure(
327                &challenged_peer,
328                challenge_id,
329                &peer_keys,
330                AuditFailureReason::MalformedResponse,
331                p2p_node,
332                config,
333            )
334            .await
335        }
336    }
337}
338
339fn eligible_audit_peers(sync_history: &HashMap<PeerId, PeerSyncRecord>) -> Vec<PeerId> {
340    sync_history
341        .iter()
342        .filter(|(_, record)| record.has_repair_opportunity())
343        .map(|(peer, _)| *peer)
344        .collect()
345}
346
347fn mature_audit_keys_for_peer(
348    challenged_peer: &PeerId,
349    sampled_key_groups: Vec<(XorName, HashSet<PeerId>)>,
350    repair_proofs: &mut RepairProofs,
351    current_sync_epoch: u64,
352) -> Vec<XorName> {
353    sampled_key_groups
354        .into_iter()
355        .filter_map(|(key, close_peers)| {
356            repair_proofs
357                .has_mature_replica_hint(challenged_peer, &key, &close_peers, current_sync_epoch)
358                .then_some(key)
359        })
360        .collect()
361}
362
/// Per-key classification of an audit failure.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AuditKeyFailureKind {
    /// The key was reported as absent.
    Absent,
    /// A digest was supplied for the key but did not match.
    DigestMismatch,
    /// The failure carries no per-key classification.
    Unclassified,
}
369
370#[derive(Debug, Clone, Copy, PartialEq, Eq)]
371struct AuditKeyFailure {
372    key: XorName,
373    kind: AuditKeyFailureKind,
374}
375
376impl AuditKeyFailure {
377    fn absent(key: XorName) -> Self {
378        Self {
379            key,
380            kind: AuditKeyFailureKind::Absent,
381        }
382    }
383
384    fn digest_mismatch(key: XorName) -> Self {
385        Self {
386            key,
387            kind: AuditKeyFailureKind::DigestMismatch,
388        }
389    }
390
391    fn unclassified(key: XorName) -> Self {
392        Self {
393            key,
394            kind: AuditKeyFailureKind::Unclassified,
395        }
396    }
397}
398
399fn build_audit_failure_summary(
400    challenged_key_count: usize,
401    confirmed_failures: &[AuditKeyFailure],
402) -> AuditFailureSummary {
403    let mut summary = AuditFailureSummary {
404        challenged_keys: challenged_key_count,
405        failed_keys: confirmed_failures.len(),
406        ..AuditFailureSummary::default()
407    };
408
409    for failure in confirmed_failures {
410        match failure.kind {
411            AuditKeyFailureKind::Absent => summary.absent_keys += 1,
412            AuditKeyFailureKind::DigestMismatch => summary.digest_mismatch_keys += 1,
413            AuditKeyFailureKind::Unclassified => {}
414        }
415    }
416
417    summary
418}
419
420fn audit_digest_failure_reason(confirmed_failures: &[AuditKeyFailure]) -> AuditFailureReason {
421    if confirmed_failures
422        .iter()
423        .all(|failure| failure.kind == AuditKeyFailureKind::Absent)
424    {
425        AuditFailureReason::KeyAbsent
426    } else {
427        AuditFailureReason::DigestMismatch
428    }
429}
430
431// ---------------------------------------------------------------------------
432// Digest verification
433// ---------------------------------------------------------------------------
434
435/// Verify per-key digests from audit response (Step 8).
436#[allow(clippy::too_many_arguments)]
437async fn verify_digests(
438    challenged_peer: &PeerId,
439    challenge_id: u64,
440    nonce: &[u8; 32],
441    keys: &[XorName],
442    digests: &[[u8; 32]],
443    storage: &Arc<LmdbStorage>,
444    p2p_node: &Arc<P2PNode>,
445    config: &ReplicationConfig,
446) -> AuditTickResult {
447    // Requirement: response must have exactly one digest per key.
448    if digests.len() != keys.len() {
449        warn!(
450            "Audit: malformed response from {challenged_peer}: {} digests for {} keys",
451            digests.len(),
452            keys.len()
453        );
454        return handle_audit_failure(
455            challenged_peer,
456            challenge_id,
457            keys,
458            AuditFailureReason::MalformedResponse,
459            p2p_node,
460            config,
461        )
462        .await;
463    }
464
465    let challenged_peer_bytes = challenged_peer.as_bytes();
466    let mut failed_keys = Vec::new();
467
468    for (i, key) in keys.iter().enumerate() {
469        let received_digest = &digests[i];
470
471        // Check for absent sentinel.
472        if *received_digest == ABSENT_KEY_DIGEST {
473            failed_keys.push(AuditKeyFailure::absent(*key));
474            continue;
475        }
476
477        // Recompute expected digest from local copy.
478        let local_bytes = match storage.get_raw(key).await {
479            Ok(Some(bytes)) => bytes,
480            Ok(None) => {
481                // We should hold this key (we sampled it), but it's gone.
482                warn!(
483                    "Audit: local key {} disappeared during audit",
484                    hex::encode(key)
485                );
486                continue;
487            }
488            Err(e) => {
489                warn!("Audit: failed to read local key {}: {e}", hex::encode(key));
490                continue;
491            }
492        };
493
494        let expected = compute_audit_digest(nonce, challenged_peer_bytes, key, &local_bytes);
495        if *received_digest != expected {
496            failed_keys.push(AuditKeyFailure::digest_mismatch(*key));
497        }
498    }
499
500    if failed_keys.is_empty() {
501        info!(
502            "Audit: peer {challenged_peer} passed (all {} keys verified)",
503            keys.len()
504        );
505        return AuditTickResult::Passed {
506            challenged_peer: *challenged_peer,
507            keys_checked: keys.len(),
508        };
509    }
510
511    // Step 9: Responsibility confirmation for failed keys.
512    handle_classified_audit_failure(
513        challenged_peer,
514        challenge_id,
515        &failed_keys,
516        AuditFailureReason::DigestMismatch,
517        keys.len(),
518        p2p_node,
519        config,
520    )
521    .await
522}
523
524// ---------------------------------------------------------------------------
525// Failure handling with responsibility confirmation
526// ---------------------------------------------------------------------------
527
528/// Handle audit failure: confirm responsibility before emitting evidence (Step 9).
529async fn handle_audit_failure(
530    challenged_peer: &PeerId,
531    challenge_id: u64,
532    failed_keys: &[XorName],
533    reason: AuditFailureReason,
534    p2p_node: &Arc<P2PNode>,
535    config: &ReplicationConfig,
536) -> AuditTickResult {
537    let failures = failed_keys
538        .iter()
539        .copied()
540        .map(AuditKeyFailure::unclassified)
541        .collect::<Vec<_>>();
542    handle_classified_audit_failure(
543        challenged_peer,
544        challenge_id,
545        &failures,
546        reason,
547        failed_keys.len(),
548        p2p_node,
549        config,
550    )
551    .await
552}
553
554async fn handle_classified_audit_failure(
555    challenged_peer: &PeerId,
556    challenge_id: u64,
557    failed_keys: &[AuditKeyFailure],
558    reason: AuditFailureReason,
559    challenged_key_count: usize,
560    p2p_node: &Arc<P2PNode>,
561    config: &ReplicationConfig,
562) -> AuditTickResult {
563    let dht = p2p_node.dht_manager();
564    let mut confirmed_failures = Vec::new();
565
566    // Step 9a-b: Fresh local RT lookup for each failed key.
567    for failure in failed_keys {
568        let closest = dht
569            .find_closest_nodes_local_with_self(&failure.key, config.close_group_size)
570            .await;
571        if closest.iter().any(|n| n.peer_id == *challenged_peer) {
572            confirmed_failures.push(*failure);
573        } else {
574            debug!(
575                "Audit: peer {challenged_peer} not responsible for {} (removed from failure set)",
576                hex::encode(failure.key)
577            );
578        }
579    }
580
581    // Step 9c: Empty confirmed set -> peer is no longer responsible for any
582    // of the failed keys (topology churn). This is NOT a pass — the peer did
583    // not prove it stores the data. Return Idle to avoid granting unearned
584    // positive trust.
585    if confirmed_failures.is_empty() {
586        info!("Audit: all failures for {challenged_peer} cleared by responsibility confirmation");
587        return AuditTickResult::Idle;
588    }
589
590    let summary = build_audit_failure_summary(challenged_key_count, &confirmed_failures);
591    let reason = if reason == AuditFailureReason::DigestMismatch {
592        audit_digest_failure_reason(&confirmed_failures)
593    } else {
594        reason
595    };
596    let confirmed_failed_keys = confirmed_failures
597        .iter()
598        .map(|failure| failure.key)
599        .collect();
600
601    // Step 9d: Non-empty confirmed set -> emit evidence.
602    let evidence = FailureEvidence::AuditFailure {
603        challenge_id,
604        challenged_peer: *challenged_peer,
605        confirmed_failed_keys,
606        summary,
607        reason,
608    };
609
610    AuditTickResult::Failed { evidence }
611}
612
613/// Handle audit timeout (no response received).
614async fn handle_audit_timeout(
615    challenged_peer: &PeerId,
616    challenge_id: u64,
617    keys: &[XorName],
618    p2p_node: &Arc<P2PNode>,
619    config: &ReplicationConfig,
620) -> AuditTickResult {
621    handle_audit_failure(
622        challenged_peer,
623        challenge_id,
624        keys,
625        AuditFailureReason::Timeout,
626        p2p_node,
627        config,
628    )
629    .await
630}
631
632// ---------------------------------------------------------------------------
633// Responder-side handler
634// ---------------------------------------------------------------------------
635
636/// Handle an incoming audit challenge (responder side).
637///
638/// Validates that the challenge targets this node, computes per-key digests,
639/// and returns the response.  Rejects challenges where
640/// `challenged_peer_id` does not match `self_peer_id` to prevent an oracle
641/// attack where a malicious challenger forges digests for a different peer.
642pub async fn handle_audit_challenge(
643    challenge: &AuditChallenge,
644    storage: &LmdbStorage,
645    self_peer_id: &PeerId,
646    is_bootstrapping: bool,
647    stored_chunks: usize,
648) -> AuditResponse {
649    if is_bootstrapping {
650        return AuditResponse::Bootstrapping {
651            challenge_id: challenge.challenge_id,
652        };
653    }
654
655    if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
656        warn!(
657            "Audit challenge targeted wrong peer: expected {}, got {}",
658            hex::encode(self_peer_id.as_bytes()),
659            hex::encode(challenge.challenged_peer_id),
660        );
661        return AuditResponse::Rejected {
662            challenge_id: challenge.challenge_id,
663            reason: "challenged_peer_id does not match this node".to_string(),
664        };
665    }
666
667    let max_keys = ReplicationConfig::max_incoming_audit_keys(stored_chunks);
668    if challenge.keys.len() > max_keys {
669        warn!(
670            "Audit challenge rejected: {} keys exceeds dynamic limit of {max_keys} \
671             (stored_chunks={stored_chunks})",
672            challenge.keys.len(),
673        );
674        return AuditResponse::Rejected {
675            challenge_id: challenge.challenge_id,
676            reason: format!(
677                "challenge contains {} keys, limit is {max_keys}",
678                challenge.keys.len()
679            ),
680        };
681    }
682
683    let mut digests = Vec::with_capacity(challenge.keys.len());
684
685    for key in &challenge.keys {
686        match storage.get_raw(key).await {
687            Ok(Some(data)) => {
688                let digest = compute_audit_digest(
689                    &challenge.nonce,
690                    &challenge.challenged_peer_id,
691                    key,
692                    &data,
693                );
694                digests.push(digest);
695            }
696            Ok(None) => {
697                digests.push(ABSENT_KEY_DIGEST);
698            }
699            Err(e) => {
700                warn!(
701                    "Audit responder: failed to read key {}: {e}",
702                    hex::encode(key)
703                );
704                digests.push(ABSENT_KEY_DIGEST);
705            }
706        }
707    }
708
709    AuditResponse::Digests {
710        challenge_id: challenge.challenge_id,
711        digests,
712    }
713}
714
715// ---------------------------------------------------------------------------
716// Tests
717// ---------------------------------------------------------------------------
718
719#[cfg(test)]
720#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
721mod tests {
722    use super::*;
723    use crate::replication::protocol::compute_audit_digest;
724    use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
725    use crate::storage::LmdbStorageConfig;
726    use std::time::Instant;
727    use tempfile::TempDir;
728
729    /// Simulated stored chunk count for tests. Large enough that the dynamic
730    /// incoming audit limit (`2 * sqrt(N)`) never rejects small test challenges.
731    const TEST_STORED_CHUNKS: usize = 1_000_000;
732
733    /// Create a test `LmdbStorage` backed by a temp directory.
734    async fn create_test_storage() -> (LmdbStorage, TempDir) {
735        let temp_dir = TempDir::new().expect("create temp dir");
736        let config = LmdbStorageConfig {
737            root_dir: temp_dir.path().to_path_buf(),
738            verify_on_read: false,
739            max_map_size: 0,
740            disk_reserve: 0,
741        };
742        let storage = LmdbStorage::new(config).await.expect("create storage");
743        (storage, temp_dir)
744    }
745
746    /// Build a challenge with the given parameters.
747    fn make_challenge(
748        challenge_id: u64,
749        nonce: [u8; 32],
750        peer_id: [u8; 32],
751        keys: Vec<XorName>,
752    ) -> AuditChallenge {
753        AuditChallenge {
754            challenge_id,
755            nonce,
756            challenged_peer_id: peer_id,
757            keys,
758        }
759    }
760
761    /// Build a `PeerId` matching the raw bytes used in a challenge.
762    fn peer_id_from_bytes(bytes: [u8; 32]) -> PeerId {
763        PeerId::from_bytes(bytes)
764    }
765
766    // -- handle_audit_challenge: present keys ---------------------------------
767
768    #[tokio::test]
769    async fn handle_challenge_present_keys_returns_correct_digests() {
770        let (storage, _temp) = create_test_storage().await;
771
772        // Store two chunks.
773        let content_a = b"chunk alpha";
774        let addr_a = LmdbStorage::compute_address(content_a);
775        storage.put(&addr_a, content_a).await.expect("put a");
776
777        let content_b = b"chunk beta";
778        let addr_b = LmdbStorage::compute_address(content_b);
779        storage.put(&addr_b, content_b).await.expect("put b");
780
781        let nonce = [0xAA; 32];
782        let peer_id = [0xBB; 32];
783        let challenge = make_challenge(42, nonce, peer_id, vec![addr_a, addr_b]);
784        let self_id = peer_id_from_bytes(peer_id);
785
786        let response =
787            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
788
789        match response {
790            AuditResponse::Digests {
791                challenge_id,
792                digests,
793            } => {
794                assert_eq!(challenge_id, 42);
795                assert_eq!(digests.len(), 2);
796
797                let expected_a = compute_audit_digest(&nonce, &peer_id, &addr_a, content_a);
798                let expected_b = compute_audit_digest(&nonce, &peer_id, &addr_b, content_b);
799                assert_eq!(digests[0], expected_a);
800                assert_eq!(digests[1], expected_b);
801            }
802            AuditResponse::Bootstrapping { .. } => {
803                panic!("expected Digests, got Bootstrapping");
804            }
805            AuditResponse::Rejected { .. } => {
806                panic!("Unexpected Rejected response");
807            }
808        }
809    }
810
811    // -- handle_audit_challenge: absent keys ----------------------------------
812
813    #[tokio::test]
814    async fn handle_challenge_absent_keys_returns_sentinel() {
815        let (storage, _temp) = create_test_storage().await;
816
817        let absent_key = [0xFF; 32];
818        let nonce = [0x11; 32];
819        let peer_id = [0x22; 32];
820        let challenge = make_challenge(99, nonce, peer_id, vec![absent_key]);
821        let self_id = peer_id_from_bytes(peer_id);
822
823        let response =
824            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
825
826        match response {
827            AuditResponse::Digests {
828                challenge_id,
829                digests,
830            } => {
831                assert_eq!(challenge_id, 99);
832                assert_eq!(digests.len(), 1);
833                assert_eq!(
834                    digests[0], ABSENT_KEY_DIGEST,
835                    "absent key should produce sentinel digest"
836                );
837            }
838            AuditResponse::Bootstrapping { .. } => {
839                panic!("expected Digests, got Bootstrapping");
840            }
841            AuditResponse::Rejected { .. } => {
842                panic!("Unexpected Rejected response");
843            }
844        }
845    }
846
847    // -- handle_audit_challenge: mixed present and absent ---------------------
848
849    #[tokio::test]
850    async fn handle_challenge_mixed_present_and_absent() {
851        let (storage, _temp) = create_test_storage().await;
852
853        let content = b"present chunk";
854        let addr_present = LmdbStorage::compute_address(content);
855        storage.put(&addr_present, content).await.expect("put");
856
857        let addr_absent = [0xDE; 32];
858        let nonce = [0x33; 32];
859        let peer_id = [0x44; 32];
860        let challenge = make_challenge(7, nonce, peer_id, vec![addr_present, addr_absent]);
861        let self_id = peer_id_from_bytes(peer_id);
862
863        let response =
864            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
865
866        match response {
867            AuditResponse::Digests { digests, .. } => {
868                assert_eq!(digests.len(), 2);
869
870                let expected_present =
871                    compute_audit_digest(&nonce, &peer_id, &addr_present, content);
872                assert_eq!(digests[0], expected_present);
873                assert_eq!(
874                    digests[1], ABSENT_KEY_DIGEST,
875                    "absent key should be sentinel"
876                );
877            }
878            AuditResponse::Bootstrapping { .. } => {
879                panic!("expected Digests, got Bootstrapping");
880            }
881            AuditResponse::Rejected { .. } => {
882                panic!("Unexpected Rejected response");
883            }
884        }
885    }
886
887    // -- handle_audit_challenge: bootstrapping --------------------------------
888
889    #[tokio::test]
890    async fn handle_challenge_bootstrapping_returns_bootstrapping_response() {
891        let (storage, _temp) = create_test_storage().await;
892
893        let challenge = make_challenge(55, [0x00; 32], [0x01; 32], vec![[0x02; 32]]);
894        let self_id = peer_id_from_bytes([0x01; 32]);
895
896        let response =
897            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
898
899        match response {
900            AuditResponse::Bootstrapping { challenge_id } => {
901                assert_eq!(challenge_id, 55);
902            }
903            AuditResponse::Digests { .. } => {
904                panic!("expected Bootstrapping, got Digests");
905            }
906            AuditResponse::Rejected { .. } => {
907                panic!("Unexpected Rejected response");
908            }
909        }
910    }
911
912    // -- handle_audit_challenge: empty key list -------------------------------
913
914    #[tokio::test]
915    async fn handle_challenge_empty_keys_returns_empty_digests() {
916        let (storage, _temp) = create_test_storage().await;
917
918        let challenge = make_challenge(100, [0x10; 32], [0x20; 32], vec![]);
919        let self_id = peer_id_from_bytes([0x20; 32]);
920
921        let response =
922            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
923
924        match response {
925            AuditResponse::Digests {
926                challenge_id,
927                digests,
928            } => {
929                assert_eq!(challenge_id, 100);
930                assert!(
931                    digests.is_empty(),
932                    "empty key list should yield empty digests"
933                );
934            }
935            AuditResponse::Bootstrapping { .. } => {
936                panic!("expected Digests, got Bootstrapping");
937            }
938            AuditResponse::Rejected { .. } => {
939                panic!("Unexpected Rejected response");
940            }
941        }
942    }
943
944    // -- Digest verification: matching ----------------------------------------
945
946    #[test]
947    fn digest_verification_matching() {
948        let nonce = [0x01; 32];
949        let peer_id = [0x02; 32];
950        let key: XorName = [0x03; 32];
951        let data = b"correct data";
952
953        let expected = compute_audit_digest(&nonce, &peer_id, &key, data);
954        let recomputed = compute_audit_digest(&nonce, &peer_id, &key, data);
955
956        assert_eq!(
957            expected, recomputed,
958            "same inputs must produce identical digests"
959        );
960        assert_ne!(
961            expected, ABSENT_KEY_DIGEST,
962            "real digest must not be sentinel"
963        );
964    }
965
966    // -- Digest verification: mismatching -------------------------------------
967
968    #[test]
969    fn digest_verification_mismatching_data() {
970        let nonce = [0x01; 32];
971        let peer_id = [0x02; 32];
972        let key: XorName = [0x03; 32];
973
974        let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data version A");
975        let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data version B");
976
977        assert_ne!(
978            digest_a, digest_b,
979            "different data must produce different digests"
980        );
981    }
982
983    #[test]
984    fn digest_verification_mismatching_nonce() {
985        let peer_id = [0x02; 32];
986        let key: XorName = [0x03; 32];
987        let data = b"same data";
988
989        let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, data);
990        let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, data);
991
992        assert_ne!(
993            digest_a, digest_b,
994            "different nonces must produce different digests"
995        );
996    }
997
998    #[test]
999    fn digest_verification_mismatching_peer() {
1000        let nonce = [0x01; 32];
1001        let key: XorName = [0x03; 32];
1002        let data = b"same data";
1003
1004        let digest_a = compute_audit_digest(&nonce, &[0x02; 32], &key, data);
1005        let digest_b = compute_audit_digest(&nonce, &[0xFE; 32], &key, data);
1006
1007        assert_ne!(
1008            digest_a, digest_b,
1009            "different peers must produce different digests"
1010        );
1011    }
1012
1013    #[test]
1014    fn digest_verification_mismatching_key() {
1015        let nonce = [0x01; 32];
1016        let peer_id = [0x02; 32];
1017        let data = b"same data";
1018
1019        let digest_a = compute_audit_digest(&nonce, &peer_id, &[0x03; 32], data);
1020        let digest_b = compute_audit_digest(&nonce, &peer_id, &[0xFC; 32], data);
1021
1022        assert_ne!(
1023            digest_a, digest_b,
1024            "different keys must produce different digests"
1025        );
1026    }
1027
1028    // -- Absent sentinel is all zeros -----------------------------------------
1029
1030    #[test]
1031    fn absent_sentinel_is_all_zeros() {
1032        assert_eq!(ABSENT_KEY_DIGEST, [0u8; 32], "sentinel must be all zeros");
1033    }
1034
1035    // -- Bootstrapping skips digest computation even with stored keys ---------
1036
1037    #[tokio::test]
1038    async fn bootstrapping_skips_digest_computation() {
1039        let (storage, _temp) = create_test_storage().await;
1040
1041        let content = b"stored but bootstrapping";
1042        let addr = LmdbStorage::compute_address(content);
1043        storage.put(&addr, content).await.expect("put");
1044
1045        let challenge = make_challenge(200, [0xCC; 32], [0xDD; 32], vec![addr]);
1046        let self_id = peer_id_from_bytes([0xDD; 32]);
1047
1048        let response =
1049            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1050
1051        assert!(
1052            matches!(response, AuditResponse::Bootstrapping { challenge_id: 200 }),
1053            "bootstrapping node must not compute digests"
1054        );
1055    }
1056
1057    // -- Scenario 19/53: Partial failure with mixed responsibility ----------------
1058
1059    #[tokio::test]
1060    async fn scenario_19_partial_failure_mixed_responsibility() {
1061        // Three keys challenged: K1 matches, K2 mismatches, K3 absent.
1062        // After responsibility confirmation, only K2 is confirmed responsible.
1063        // AuditFailure emitted for {K2} only.
1064        // Test handle_audit_challenge with mixed results, then verify
1065        // the digest logic manually.
1066
1067        let (storage, _temp) = create_test_storage().await;
1068        let nonce = [0x42u8; 32];
1069        let peer_id = [0xAA; 32];
1070
1071        // Store K1 and K2, but NOT K3
1072        let content_k1 = b"key one data";
1073        let addr_k1 = LmdbStorage::compute_address(content_k1);
1074        storage.put(&addr_k1, content_k1).await.unwrap();
1075
1076        let content_k2 = b"key two data";
1077        let addr_k2 = LmdbStorage::compute_address(content_k2);
1078        storage.put(&addr_k2, content_k2).await.unwrap();
1079
1080        let addr_k3 = [0xFF; 32]; // Not stored
1081
1082        let challenge = AuditChallenge {
1083            challenge_id: 100,
1084            nonce,
1085            challenged_peer_id: peer_id,
1086            keys: vec![addr_k1, addr_k2, addr_k3],
1087        };
1088        let self_id = peer_id_from_bytes(peer_id);
1089
1090        let response =
1091            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1092
1093        match response {
1094            AuditResponse::Digests { digests, .. } => {
1095                assert_eq!(digests.len(), 3);
1096
1097                // K1 should have correct digest
1098                let expected_k1 = compute_audit_digest(&nonce, &peer_id, &addr_k1, content_k1);
1099                assert_eq!(digests[0], expected_k1);
1100
1101                // K2 should have correct digest
1102                let expected_k2 = compute_audit_digest(&nonce, &peer_id, &addr_k2, content_k2);
1103                assert_eq!(digests[1], expected_k2);
1104
1105                // K3 absent -> sentinel
1106                assert_eq!(digests[2], ABSENT_KEY_DIGEST);
1107            }
1108            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests response"),
1109            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1110        }
1111    }
1112
1113    // -- Scenario 54: All digests pass -------------------------------------------
1114
1115    #[tokio::test]
1116    async fn scenario_54_all_digests_pass() {
1117        // All challenged keys present and digests match.
1118        // Multiple keys to strengthen coverage beyond existing two-key tests.
1119        let (storage, _temp) = create_test_storage().await;
1120        let nonce = [0x10; 32];
1121        let peer_id = [0x20; 32];
1122
1123        let c1 = b"chunk alpha";
1124        let c2 = b"chunk beta";
1125        let c3 = b"chunk gamma";
1126        let a1 = LmdbStorage::compute_address(c1);
1127        let a2 = LmdbStorage::compute_address(c2);
1128        let a3 = LmdbStorage::compute_address(c3);
1129        storage.put(&a1, c1).await.unwrap();
1130        storage.put(&a2, c2).await.unwrap();
1131        storage.put(&a3, c3).await.unwrap();
1132
1133        let challenge = AuditChallenge {
1134            challenge_id: 200,
1135            nonce,
1136            challenged_peer_id: peer_id,
1137            keys: vec![a1, a2, a3],
1138        };
1139        let self_id = peer_id_from_bytes(peer_id);
1140
1141        let response =
1142            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1143        match response {
1144            AuditResponse::Digests { digests, .. } => {
1145                assert_eq!(digests.len(), 3);
1146                for (i, (addr, content)) in [(a1, &c1[..]), (a2, &c2[..]), (a3, &c3[..])]
1147                    .iter()
1148                    .enumerate()
1149                {
1150                    let expected = compute_audit_digest(&nonce, &peer_id, addr, content);
1151                    assert_eq!(digests[i], expected, "Key {i} digest should match");
1152                }
1153            }
1154            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1155            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1156        }
1157    }
1158
1159    // -- Scenario 55: Empty failure set means no evidence -------------------------
1160
1161    /// Scenario 55: Peer challenged on {K1, K2}. Both digests mismatch.
1162    /// Responsibility confirmation shows the peer is NOT responsible for
1163    /// either key. The confirmed failure set is empty — no `AuditFailure`
1164    /// evidence is emitted.
1165    ///
1166    /// Full `verify_digests` requires a live `P2PNode` for network lookups.
1167    /// This test exercises the deterministic sub-steps:
1168    ///   (1) Digest comparison identifies K1 and K2 as mismatches.
1169    ///   (2) Responsibility confirmation removes both keys.
1170    ///   (3) Empty confirmed failure set means no evidence.
1171    #[tokio::test]
1172    async fn scenario_55_no_confirmed_responsibility_no_evidence() {
1173        let (storage, _temp) = create_test_storage().await;
1174        let nonce = [0x55; 32];
1175        let peer_id = [0x55; 32];
1176
1177        // Store K1 and K2 on the challenger (for expected digest computation).
1178        let c1 = b"scenario 55 key one";
1179        let c2 = b"scenario 55 key two";
1180        let k1 = LmdbStorage::compute_address(c1);
1181        let k2 = LmdbStorage::compute_address(c2);
1182        storage.put(&k1, c1).await.expect("put k1");
1183        storage.put(&k2, c2).await.expect("put k2");
1184
1185        // Challenger computes expected digests.
1186        let expected_d1 = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1187        let expected_d2 = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1188
1189        // Simulate peer returning WRONG digests for both keys.
1190        let wrong_d1 = compute_audit_digest(&nonce, &peer_id, &k1, b"corrupted k1");
1191        let wrong_d2 = compute_audit_digest(&nonce, &peer_id, &k2, b"corrupted k2");
1192        assert_ne!(wrong_d1, expected_d1, "K1 digest should mismatch");
1193        assert_ne!(wrong_d2, expected_d2, "K2 digest should mismatch");
1194
1195        // Step 1: Identify failed keys via digest comparison.
1196        let keys = [k1, k2];
1197        let expected = [expected_d1, expected_d2];
1198        let received = [wrong_d1, wrong_d2];
1199
1200        let mut failed_keys = Vec::new();
1201        for i in 0..keys.len() {
1202            if received[i] != expected[i] {
1203                failed_keys.push(keys[i]);
1204            }
1205        }
1206        assert_eq!(
1207            failed_keys.len(),
1208            2,
1209            "Both keys should be identified as digest mismatches"
1210        );
1211
1212        // Step 2: Responsibility confirmation — peer is NOT responsible for
1213        // either key (simulated by filtering them all out).
1214        let confirmed_responsible_keys: Vec<XorName> = Vec::new();
1215        let confirmed_failures: Vec<XorName> = failed_keys
1216            .into_iter()
1217            .filter(|k| confirmed_responsible_keys.contains(k))
1218            .collect();
1219
1220        // Step 3: Empty confirmed failure set → no AuditFailure evidence.
1221        assert!(
1222            confirmed_failures.is_empty(),
1223            "With no confirmed responsibility, failure set must be empty — \
1224             no AuditFailure evidence should be emitted"
1225        );
1226
1227        // Verify that constructing evidence with empty keys results in a
1228        // no-penalty outcome (the caller checks is_empty before emitting).
1229        let peer = PeerId::from_bytes(peer_id);
1230        let evidence = FailureEvidence::AuditFailure {
1231            challenge_id: 5500,
1232            challenged_peer: peer,
1233            confirmed_failed_keys: confirmed_failures,
1234            summary: AuditFailureSummary::default(),
1235            reason: AuditFailureReason::DigestMismatch,
1236        };
1237        if let FailureEvidence::AuditFailure {
1238            confirmed_failed_keys,
1239            ..
1240        } = evidence
1241        {
1242            assert!(
1243                confirmed_failed_keys.is_empty(),
1244                "Evidence with empty failure set should not trigger a trust penalty"
1245            );
1246        }
1247    }
1248
1249    // -- Scenario 56: RepairOpportunity filters never-synced peers ----------------
1250
1251    #[test]
1252    fn scenario_56_repair_opportunity_filters_never_synced() {
1253        // PeerSyncRecord with last_sync=None should not pass
1254        // has_repair_opportunity().
1255
1256        let never_synced = PeerSyncRecord {
1257            last_sync: None,
1258            cycles_since_sync: 5,
1259        };
1260        assert!(!never_synced.has_repair_opportunity());
1261
1262        let synced_no_cycle = PeerSyncRecord {
1263            last_sync: Some(Instant::now()),
1264            cycles_since_sync: 0,
1265        };
1266        assert!(!synced_no_cycle.has_repair_opportunity());
1267
1268        let synced_with_cycle = PeerSyncRecord {
1269            last_sync: Some(Instant::now()),
1270            cycles_since_sync: 1,
1271        };
1272        assert!(synced_with_cycle.has_repair_opportunity());
1273    }
1274
1275    #[test]
1276    fn expired_bootstrap_claim_does_not_remove_peer_from_audit_eligibility() {
1277        let peer = peer_id_from_bytes([0x57; 32]);
1278        let mut sync_history = HashMap::new();
1279        sync_history.insert(
1280            peer,
1281            PeerSyncRecord {
1282                last_sync: Some(Instant::now()),
1283                cycles_since_sync: 1,
1284            },
1285        );
1286
1287        let mut bootstrap_claims = HashMap::new();
1288        let first_seen = Instant::now()
1289            .checked_sub(
1290                crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD
1291                    + std::time::Duration::from_secs(1),
1292            )
1293            .unwrap_or_else(Instant::now);
1294        bootstrap_claims.insert(peer, first_seen);
1295
1296        let eligible = eligible_audit_peers(&sync_history);
1297
1298        assert!(bootstrap_claims.contains_key(&peer));
1299        assert!(
1300            eligible.contains(&peer),
1301            "continued bootstrap claims must remain auditable so past-grace abuse can be observed"
1302        );
1303    }
1304
1305    #[test]
1306    fn audit_failure_summary_counts_confirmed_absent_and_mismatch_keys() {
1307        let absent_key = [0xA1; 32];
1308        let mismatch_key = [0xB2; 32];
1309        let confirmed = vec![
1310            AuditKeyFailure::absent(absent_key),
1311            AuditKeyFailure::digest_mismatch(mismatch_key),
1312        ];
1313
1314        let summary = build_audit_failure_summary(5, &confirmed);
1315
1316        assert_eq!(summary.challenged_keys, 5);
1317        assert_eq!(summary.failed_keys, 2);
1318        assert_eq!(summary.absent_keys, 1);
1319        assert_eq!(summary.digest_mismatch_keys, 1);
1320    }
1321
1322    #[test]
1323    fn audit_failure_summary_leaves_unclassified_rejections_out_of_absent_mismatch_counts() {
1324        let rejected_key = [0xC3; 32];
1325        let confirmed = vec![AuditKeyFailure::unclassified(rejected_key)];
1326
1327        let summary = build_audit_failure_summary(3, &confirmed);
1328
1329        assert_eq!(summary.challenged_keys, 3);
1330        assert_eq!(summary.failed_keys, 1);
1331        assert_eq!(summary.absent_keys, 0);
1332        assert_eq!(summary.digest_mismatch_keys, 0);
1333    }
1334
1335    #[test]
1336    fn audit_digest_failure_reason_is_key_absent_when_all_confirmed_failures_are_absent() {
1337        let failures = vec![AuditKeyFailure::absent([0xD4; 32])];
1338
1339        assert_eq!(
1340            audit_digest_failure_reason(&failures),
1341            AuditFailureReason::KeyAbsent
1342        );
1343    }
1344
1345    #[test]
1346    fn audit_digest_failure_reason_is_digest_mismatch_for_mixed_failures() {
1347        let failures = vec![
1348            AuditKeyFailure::absent([0xD5; 32]),
1349            AuditKeyFailure::digest_mismatch([0xE6; 32]),
1350        ];
1351
1352        assert_eq!(
1353            audit_digest_failure_reason(&failures),
1354            AuditFailureReason::DigestMismatch
1355        );
1356    }
1357
1358    #[test]
1359    fn audit_key_filter_retains_stable_proofs_and_rejects_evicted_peers() {
1360        const HINT_EPOCH: u64 = 7;
1361        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
1362        const CHALLENGED_PEER_BYTE: u8 = 0xA1;
1363        const OTHER_PEER_BYTE: u8 = 0xA2;
1364        const NEW_PEER_BYTE: u8 = 0xA3;
1365        const MATURE_KEY_BYTE: u8 = 0xB1;
1366        const SAME_EPOCH_KEY_BYTE: u8 = 0xB2;
1367        const MISSING_PROOF_KEY_BYTE: u8 = 0xB3;
1368        const STABLE_CHURN_KEY_BYTE: u8 = 0xB4;
1369        const EVICTED_KEY_BYTE: u8 = 0xB5;
1370        const XOR_NAME_LEN: usize = 32;
1371
1372        let challenged_peer = peer_id_from_bytes([CHALLENGED_PEER_BYTE; XOR_NAME_LEN]);
1373        let other_peer = peer_id_from_bytes([OTHER_PEER_BYTE; XOR_NAME_LEN]);
1374        let new_peer = peer_id_from_bytes([NEW_PEER_BYTE; XOR_NAME_LEN]);
1375        let mature_key = [MATURE_KEY_BYTE; XOR_NAME_LEN];
1376        let same_epoch_key = [SAME_EPOCH_KEY_BYTE; XOR_NAME_LEN];
1377        let missing_proof_key = [MISSING_PROOF_KEY_BYTE; XOR_NAME_LEN];
1378        let stable_churn_key = [STABLE_CHURN_KEY_BYTE; XOR_NAME_LEN];
1379        let evicted_key = [EVICTED_KEY_BYTE; XOR_NAME_LEN];
1380        let close_group = HashSet::from([challenged_peer, other_peer]);
1381        let changed_close_group = HashSet::from([challenged_peer, new_peer]);
1382        let evicted_close_group = HashSet::from([other_peer, new_peer]);
1383        let mut repair_proofs = RepairProofs::new();
1384
1385        assert!(repair_proofs.record_replica_hint_sent(
1386            challenged_peer,
1387            mature_key,
1388            &close_group,
1389            HINT_EPOCH,
1390        ));
1391        assert!(repair_proofs.record_replica_hint_sent(
1392            challenged_peer,
1393            same_epoch_key,
1394            &close_group,
1395            CURRENT_EPOCH,
1396        ));
1397        assert!(repair_proofs.record_replica_hint_sent(
1398            challenged_peer,
1399            stable_churn_key,
1400            &close_group,
1401            HINT_EPOCH,
1402        ));
1403        assert!(repair_proofs.record_replica_hint_sent(
1404            challenged_peer,
1405            evicted_key,
1406            &close_group,
1407            HINT_EPOCH,
1408        ));
1409
1410        let sampled_key_groups = vec![
1411            (mature_key, close_group.clone()),
1412            (same_epoch_key, close_group.clone()),
1413            (missing_proof_key, close_group.clone()),
1414            (stable_churn_key, changed_close_group),
1415            (evicted_key, evicted_close_group),
1416        ];
1417        let peer_keys = mature_audit_keys_for_peer(
1418            &challenged_peer,
1419            sampled_key_groups,
1420            &mut repair_proofs,
1421            CURRENT_EPOCH,
1422        );
1423
1424        assert_eq!(
1425            peer_keys,
1426            vec![mature_key, stable_churn_key],
1427            "mature proofs for stable close-group peers should become audit keys, while same-epoch, missing, and evicted-peer proofs should not"
1428        );
1429    }
1430
1431    // -- Audit response must match key count --------------------------------------
1432
1433    #[tokio::test]
1434    async fn audit_response_must_match_key_count() {
1435        // Section 15: "A response is invalid if it has fewer or more entries
1436        // than challenged keys."
1437        // Verify handle_audit_challenge always produces exactly N digests for
1438        // N keys, including edge cases.
1439
1440        let (storage, _temp) = create_test_storage().await;
1441        let nonce = [0x50; 32];
1442        let peer_id = [0x60; 32];
1443
1444        // Store a single chunk
1445        let content = b"single chunk";
1446        let addr = LmdbStorage::compute_address(content);
1447        storage.put(&addr, content).await.unwrap();
1448
1449        // Challenge with 1 stored + 4 absent = 5 keys total
1450        let absent_keys: Vec<XorName> = (1..=4u8).map(|i| [i; 32]).collect();
1451        let mut keys = vec![addr];
1452        keys.extend_from_slice(&absent_keys);
1453
1454        let key_count = keys.len();
1455        let challenge = make_challenge(300, nonce, peer_id, keys);
1456        let self_id = peer_id_from_bytes(peer_id);
1457
1458        let response =
1459            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1460        match response {
1461            AuditResponse::Digests { digests, .. } => {
1462                assert_eq!(
1463                    digests.len(),
1464                    key_count,
1465                    "must produce exactly one digest per challenged key"
1466                );
1467            }
1468            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1469            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1470        }
1471    }
1472
1473    // -- Audit digest uses full record bytes --------------------------------------
1474
1475    #[test]
1476    fn audit_digest_uses_full_record_bytes() {
1477        // Verify digest changes when record content changes.
1478        let nonce = [1u8; 32];
1479        let peer = [2u8; 32];
1480        let key = [3u8; 32];
1481
1482        let d1 = compute_audit_digest(&nonce, &peer, &key, b"data version 1");
1483        let d2 = compute_audit_digest(&nonce, &peer, &key, b"data version 2");
1484        assert_ne!(
1485            d1, d2,
1486            "Different record bytes must produce different digests"
1487        );
1488    }
1489
1490    // -- Scenario 29: Audit start gate ------------------------------------------
1491
1492    /// Scenario 29: `handle_audit_challenge` returns `Bootstrapping` when the
1493    /// node is still bootstrapping — audit digests are never computed, and no
1494    /// `AuditFailure` evidence is emitted by the caller.
1495    ///
1496    /// This is the responder-side gate.  The challenger-side gate is enforced
1497    /// by `audit_tick`'s `is_bootstrapping` guard (Invariant 19) and by
1498    /// `check_bootstrap_drained()` in the engine loop; this test confirms the
1499    /// complementary responder behavior.
1500    #[tokio::test]
1501    async fn scenario_29_audit_start_gate_during_bootstrap() {
1502        let (storage, _temp) = create_test_storage().await;
1503
1504        // Store data so there *would* be work to audit.
1505        let content = b"should not be audited during bootstrap";
1506        let addr = LmdbStorage::compute_address(content);
1507        storage.put(&addr, content).await.expect("put");
1508
1509        let challenge = make_challenge(2900, [0x29; 32], [0x29; 32], vec![addr]);
1510        let self_id = peer_id_from_bytes([0x29; 32]);
1511
1512        // Responder is bootstrapping → Bootstrapping response, NOT Digests.
1513        let response =
1514            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1515        assert!(
1516            matches!(
1517                response,
1518                AuditResponse::Bootstrapping { challenge_id: 2900 }
1519            ),
1520            "bootstrapping node must not compute digests — audit start gate"
1521        );
1522
1523        // Responder is NOT bootstrapping → normal Digests.
1524        let response =
1525            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1526        assert!(
1527            matches!(response, AuditResponse::Digests { .. }),
1528            "drained node should compute digests normally"
1529        );
1530    }
1531
1532    // -- Scenario 30: Audit peer selection from sampled keys --------------------
1533
1534    /// Scenario 30: Key sampling uses dynamic sqrt-based batch sizing and
1535    /// `RepairOpportunity` filtering excludes never-synced peers.
1536    ///
1537    /// Full `audit_tick` requires a live network.  This test verifies the two
1538    /// deterministic sub-steps the function relies on:
1539    ///   (a) `audit_sample_count` scales with `sqrt(total_keys)`.
1540    ///   (b) `PeerSyncRecord::has_repair_opportunity` gates peer eligibility.
1541    #[test]
1542    fn scenario_30_audit_peer_selection_from_sampled_keys() {
1543        // (a) Dynamic sample count scales with sqrt(total_keys).
1544        assert_eq!(
1545            ReplicationConfig::audit_sample_count(100),
1546            10,
1547            "sample count should scale with sqrt(total_keys)"
1548        );
1549
1550        assert_eq!(ReplicationConfig::audit_sample_count(3), 1, "sqrt(3) = 1");
1551
1552        assert_eq!(
1553            ReplicationConfig::audit_sample_count(10_000),
1554            100,
1555            "sqrt(10000) = 100"
1556        );
1557
1558        // (b) Peer eligibility via RepairOpportunity.
1559        // Never synced → not eligible.
1560        let never = PeerSyncRecord {
1561            last_sync: None,
1562            cycles_since_sync: 10,
1563        };
1564        assert!(!never.has_repair_opportunity());
1565
1566        // Synced but zero subsequent cycles → not eligible.
1567        let too_soon = PeerSyncRecord {
1568            last_sync: Some(Instant::now()),
1569            cycles_since_sync: 0,
1570        };
1571        assert!(!too_soon.has_repair_opportunity());
1572
1573        // Synced with ≥1 cycle → eligible.
1574        let eligible = PeerSyncRecord {
1575            last_sync: Some(Instant::now()),
1576            cycles_since_sync: 2,
1577        };
1578        assert!(eligible.has_repair_opportunity());
1579    }
1580
1581    // -- Scenario 32: Dynamic challenge size ------------------------------------
1582
1583    /// Scenario 32: Challenge key count equals `|PeerKeySet(challenged_peer)|`,
1584    /// which is dynamic per round.  If no eligible peer remains after filtering,
1585    /// the tick is idle.
1586    ///
1587    /// Verified via `handle_audit_challenge`: the response digest count always
1588    /// equals the number of keys in the challenge.
1589    #[tokio::test]
1590    async fn scenario_32_dynamic_challenge_size() {
1591        let (storage, _temp) = create_test_storage().await;
1592
1593        // Store varying numbers of chunks.
1594        let mut addrs = Vec::new();
1595        for i in 0u8..5 {
1596            let content = format!("dynamic challenge key {i}");
1597            let addr = LmdbStorage::compute_address(content.as_bytes());
1598            storage.put(&addr, content.as_bytes()).await.expect("put");
1599            addrs.push(addr);
1600        }
1601
1602        let nonce = [0x32; 32];
1603        let peer_id = [0x32; 32];
1604        let self_id = peer_id_from_bytes(peer_id);
1605
1606        // Challenge with 1 key.
1607        let challenge1 = make_challenge(3201, nonce, peer_id, vec![addrs[0]]);
1608        let resp1 =
1609            handle_audit_challenge(&challenge1, &storage, &self_id, false, TEST_STORED_CHUNKS)
1610                .await;
1611        if let AuditResponse::Digests { digests, .. } = resp1 {
1612            assert_eq!(digests.len(), 1, "|PeerKeySet| = 1 → 1 digest");
1613        }
1614
1615        // Challenge with 3 keys.
1616        let challenge3 = make_challenge(3203, nonce, peer_id, addrs[0..3].to_vec());
1617        let resp3 =
1618            handle_audit_challenge(&challenge3, &storage, &self_id, false, TEST_STORED_CHUNKS)
1619                .await;
1620        if let AuditResponse::Digests { digests, .. } = resp3 {
1621            assert_eq!(digests.len(), 3, "|PeerKeySet| = 3 → 3 digests");
1622        }
1623
1624        // Challenge with all 5 keys.
1625        let challenge5 = make_challenge(3205, nonce, peer_id, addrs.clone());
1626        let resp5 =
1627            handle_audit_challenge(&challenge5, &storage, &self_id, false, TEST_STORED_CHUNKS)
1628                .await;
1629        if let AuditResponse::Digests { digests, .. } = resp5 {
1630            assert_eq!(digests.len(), 5, "|PeerKeySet| = 5 → 5 digests");
1631        }
1632
1633        // Challenge with 0 keys (idle equivalent — no work).
1634        let challenge0 = make_challenge(3200, nonce, peer_id, vec![]);
1635        let resp0 =
1636            handle_audit_challenge(&challenge0, &storage, &self_id, false, TEST_STORED_CHUNKS)
1637                .await;
1638        if let AuditResponse::Digests { digests, .. } = resp0 {
1639            assert!(digests.is_empty(), "|PeerKeySet| = 0 → 0 digests (idle)");
1640        }
1641    }
1642
1643    // -- Scenario 47: Bootstrap claim grace period (audit) ----------------------
1644
1645    /// Scenario 47: Challenged peer responds with bootstrapping claim during
1646    /// audit.  `handle_audit_challenge` returns `Bootstrapping`; caller records
1647    /// `BootstrapClaimFirstSeen`.  No `AuditFailure` evidence is emitted.
1648    #[tokio::test]
1649    async fn scenario_47_bootstrap_claim_grace_period_audit() {
1650        let (storage, _temp) = create_test_storage().await;
1651
1652        // Store data so there is an auditable key.
1653        let content = b"bootstrap grace test";
1654        let addr = LmdbStorage::compute_address(content);
1655        storage.put(&addr, content).await.expect("put");
1656
1657        let challenge = make_challenge(4700, [0x47; 32], [0x47; 32], vec![addr]);
1658        let self_id = peer_id_from_bytes([0x47; 32]);
1659
1660        // Bootstrapping peer → Bootstrapping response (grace period start).
1661        let response =
1662            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1663        let challenge_id = match response {
1664            AuditResponse::Bootstrapping { challenge_id } => challenge_id,
1665            AuditResponse::Digests { .. } => {
1666                panic!("Expected Bootstrapping response during grace period")
1667            }
1668            AuditResponse::Rejected { .. } => {
1669                panic!("Unexpected Rejected response")
1670            }
1671        };
1672        assert_eq!(challenge_id, 4700);
1673
1674        // Caller records BootstrapClaimFirstSeen — verify the types support it.
1675        let peer = PeerId::from_bytes([0x47; 32]);
1676        let mut state = NeighborSyncState::new_cycle(vec![peer]);
1677        let now = Instant::now();
1678        let observed = state.observe_bootstrap_claim(
1679            peer,
1680            now,
1681            crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD,
1682        );
1683
1684        assert_eq!(
1685            observed,
1686            BootstrapClaimObservation::WithinGrace { first_seen: now }
1687        );
1688        assert!(
1689            state.bootstrap_claims.contains_key(&peer),
1690            "BootstrapClaimFirstSeen should be recorded after grace-period claim"
1691        );
1692        assert!(
1693            state.bootstrap_claim_history.contains_key(&peer),
1694            "Bootstrap claim history should remember that the grace window was used"
1695        );
1696    }
1697
1698    // -- Scenario 53: Audit partial per-key failure with mixed responsibility ---
1699
1700    /// Scenario 53: P challenged on {K1, K2, K3}.  K1 matches, K2 and K3
1701    /// mismatch.  Responsibility confirmation: P is responsible for K2 but
1702    /// not K3.  `AuditFailure` emitted for {K2} only.
1703    ///
1704    /// Full `verify_digests` + `handle_audit_failure` requires a `P2PNode` for
1705    /// network lookups.  This test verifies the conceptual steps:
1706    ///   (1) Digest comparison correctly identifies K2 and K3 as failures.
1707    ///   (2) `FailureEvidence::AuditFailure` carries only confirmed keys.
1708    #[tokio::test]
1709    async fn scenario_53_partial_failure_mixed_responsibility() {
1710        let (storage, _temp) = create_test_storage().await;
1711        let nonce = [0x53; 32];
1712        let peer_id = [0x53; 32];
1713
1714        // Store K1, K2, K3.
1715        let c1 = b"scenario 53 key one";
1716        let c2 = b"scenario 53 key two";
1717        let c3 = b"scenario 53 key three";
1718        let k1 = LmdbStorage::compute_address(c1);
1719        let k2 = LmdbStorage::compute_address(c2);
1720        let k3 = LmdbStorage::compute_address(c3);
1721        storage.put(&k1, c1).await.expect("put k1");
1722        storage.put(&k2, c2).await.expect("put k2");
1723        storage.put(&k3, c3).await.expect("put k3");
1724
1725        // Correct digests from challenger's local store.
1726        let d1_expected = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1727        let d2_expected = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1728        let d3_expected = compute_audit_digest(&nonce, &peer_id, &k3, c3);
1729
1730        // Simulate peer response: K1 matches, K2 wrong data, K3 wrong data.
1731        let d2_wrong = compute_audit_digest(&nonce, &peer_id, &k2, b"tampered k2");
1732        let d3_wrong = compute_audit_digest(&nonce, &peer_id, &k3, b"tampered k3");
1733
1734        assert_eq!(d1_expected, d1_expected, "K1 should match");
1735        assert_ne!(d2_wrong, d2_expected, "K2 should mismatch");
1736        assert_ne!(d3_wrong, d3_expected, "K3 should mismatch");
1737
1738        // Step 1: Identify failed keys (digest comparison).
1739        let digests = [d1_expected, d2_wrong, d3_wrong];
1740        let keys = [k1, k2, k3];
1741        let contents: [&[u8]; 3] = [c1, c2, c3];
1742
1743        let mut failed_keys = Vec::new();
1744        for (i, key) in keys.iter().enumerate() {
1745            if digests[i] == ABSENT_KEY_DIGEST {
1746                failed_keys.push(*key);
1747                continue;
1748            }
1749            let expected = compute_audit_digest(&nonce, &peer_id, key, contents[i]);
1750            if digests[i] != expected {
1751                failed_keys.push(*key);
1752            }
1753        }
1754
1755        assert_eq!(failed_keys.len(), 2, "K2 and K3 should be in failure set");
1756        assert!(failed_keys.contains(&k2));
1757        assert!(failed_keys.contains(&k3));
1758        assert!(!failed_keys.contains(&k1), "K1 passed digest check");
1759
1760        // Step 2: Responsibility confirmation removes K3 (not responsible).
1761        // Simulate: P is in closest peers for K2 but not K3.
1762        let responsible_for_k2 = true;
1763        let responsible_for_k3 = false;
1764        let mut confirmed = Vec::new();
1765        for key in &failed_keys {
1766            let is_responsible = if *key == k2 {
1767                responsible_for_k2
1768            } else {
1769                responsible_for_k3
1770            };
1771            if is_responsible {
1772                confirmed.push(*key);
1773            }
1774        }
1775
1776        assert_eq!(confirmed, vec![k2], "Only K2 should be in confirmed set");
1777
1778        // Step 3: Construct evidence for confirmed failures only.
1779        let challenged_peer = PeerId::from_bytes(peer_id);
1780        let evidence = FailureEvidence::AuditFailure {
1781            challenge_id: 5300,
1782            challenged_peer,
1783            confirmed_failed_keys: confirmed,
1784            summary: AuditFailureSummary::default(),
1785            reason: AuditFailureReason::DigestMismatch,
1786        };
1787
1788        match evidence {
1789            FailureEvidence::AuditFailure {
1790                confirmed_failed_keys,
1791                ..
1792            } => {
1793                assert_eq!(
1794                    confirmed_failed_keys.len(),
1795                    1,
1796                    "Only K2 should generate evidence"
1797                );
1798                assert_eq!(confirmed_failed_keys[0], k2);
1799            }
1800            _ => panic!("Expected AuditFailure evidence"),
1801        }
1802    }
1803}