Skip to main content

ant_node/replication/
audit.rs

1//! Storage audit protocol (Section 15).
2//!
3//! Challenge-response for claimed holders. Anti-outsourcing protection.
4
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::logging::{debug, info, warn};
10use rand::seq::SliceRandom;
11use rand::Rng;
12
13use crate::ant_protocol::XorName;
14use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
15use crate::replication::protocol::{
16    compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
17    ReplicationMessageBody, ABSENT_KEY_DIGEST,
18};
19use crate::replication::types::{
20    AuditFailureReason, AuditFailureSummary, FailureEvidence, PeerSyncRecord, RepairProofs,
21};
22use crate::storage::LmdbStorage;
23use saorsa_core::identity::PeerId;
24use saorsa_core::P2PNode;
25use tokio::sync::RwLock;
26
27// ---------------------------------------------------------------------------
28// Audit tick result
29// ---------------------------------------------------------------------------
30
31/// Result of an audit tick.
32#[derive(Debug)]
33pub enum AuditTickResult {
34    /// Audit completed successfully (all digests matched).
35    Passed {
36        /// The peer that was challenged.
37        challenged_peer: PeerId,
38        /// Number of keys verified.
39        keys_checked: usize,
40    },
41    /// Audit found failures (after responsibility confirmation).
42    Failed {
43        /// Evidence of the failure for trust engine.
44        evidence: FailureEvidence,
45    },
46    /// Audit target claimed bootstrapping.
47    BootstrapClaim {
48        /// The peer claiming bootstrap status.
49        peer: PeerId,
50    },
51    /// No eligible peers for audit this tick.
52    Idle,
53    /// Audit skipped (not enough local keys).
54    InsufficientKeys,
55}
56
57// ---------------------------------------------------------------------------
58// Main audit tick
59// ---------------------------------------------------------------------------
60
61/// Execute one audit tick (Section 15 steps 2-9).
62///
63/// Returns the audit result. Caller is responsible for emitting trust events.
64///
65/// **Invariant 19**: Returns [`AuditTickResult::Idle`] immediately if
66/// `is_bootstrapping` is `true` — a node must not audit others while it
67/// is still bootstrapping.
68#[allow(clippy::implicit_hasher)]
69pub async fn audit_tick(
70    p2p_node: &Arc<P2PNode>,
71    storage: &Arc<LmdbStorage>,
72    config: &ReplicationConfig,
73    sync_history: &HashMap<PeerId, PeerSyncRecord>,
74    is_bootstrapping: bool,
75) -> AuditTickResult {
76    let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
77    audit_tick_with_repair_proofs(
78        p2p_node,
79        storage,
80        config,
81        sync_history,
82        &repair_proofs,
83        0,
84        is_bootstrapping,
85    )
86    .await
87}
88
89/// Execute one repair-proof-gated audit tick.
90///
91/// This is the production path used by the replication engine. The
92/// compatibility [`audit_tick`] wrapper passes an empty proof table, so direct
93/// callers that have not adopted repair proofs remain conservative and do not
94/// audit peers for unproven keys.
95#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
96pub async fn audit_tick_with_repair_proofs(
97    p2p_node: &Arc<P2PNode>,
98    storage: &Arc<LmdbStorage>,
99    config: &ReplicationConfig,
100    sync_history: &HashMap<PeerId, PeerSyncRecord>,
101    repair_proofs: &Arc<RwLock<RepairProofs>>,
102    current_sync_epoch: u64,
103    is_bootstrapping: bool,
104) -> AuditTickResult {
105    // Invariant 19: never audit while still bootstrapping.
106    if is_bootstrapping {
107        return AuditTickResult::Idle;
108    }
109
110    let dht = p2p_node.dht_manager();
111
112    // Step 2: Select one eligible peer (has RepairOpportunity) at random.
113    // Peers with active bootstrap claims remain eligible. A follow-up audit is
114    // how we observe a continued claim and apply past-grace abuse handling.
115    let eligible_peers = eligible_audit_peers(sync_history);
116
117    if eligible_peers.is_empty() {
118        return AuditTickResult::Idle;
119    }
120
121    let (challenged_peer, nonce, challenge_id) = {
122        let mut rng = rand::thread_rng();
123        let selected = match eligible_peers.choose(&mut rng) {
124            Some(p) => *p,
125            None => return AuditTickResult::Idle,
126        };
127        let n: [u8; 32] = rng.gen();
128        let c: u64 = rng.gen();
129        (selected, n, c)
130    };
131
132    // Step 3: Sample keys from local store and keep those the peer is
133    // responsible for (appears in the close group via local RT lookup).
134    let all_keys = match storage.all_keys().await {
135        Ok(keys) => keys,
136        Err(e) => {
137            warn!("Audit: failed to read local keys: {e}");
138            return AuditTickResult::Idle;
139        }
140    };
141
142    if all_keys.is_empty() {
143        return AuditTickResult::Idle;
144    }
145
146    let sample_count = ReplicationConfig::audit_sample_count(all_keys.len());
147    let sampled_keys: Vec<XorName> = {
148        let mut rng = rand::thread_rng();
149        all_keys
150            .choose_multiple(&mut rng, sample_count)
151            .copied()
152            .collect()
153    };
154
155    // Step 4: Filter to keys where the chosen peer is in the close group and
156    // this node has proof that it already sent the peer a repair hint for the
157    // specific key.
158    let mut sampled_key_groups = Vec::new();
159    for key in &sampled_keys {
160        let closest = dht
161            .find_closest_nodes_local_with_self(key, config.close_group_size)
162            .await;
163        let close_peers: HashSet<PeerId> = closest.iter().map(|node| node.peer_id).collect();
164        if close_peers.contains(&challenged_peer) {
165            sampled_key_groups.push((*key, close_peers));
166        }
167    }
168
169    let peer_keys = {
170        let mut proofs = repair_proofs.write().await;
171        let now = Instant::now();
172        mature_audit_keys_for_peer(
173            &challenged_peer,
174            sampled_key_groups,
175            &mut proofs,
176            current_sync_epoch,
177            now,
178        )
179    };
180
181    if peer_keys.is_empty() {
182        return AuditTickResult::Idle;
183    }
184
185    // peer_keys is naturally bounded by audit_sample_count (sqrt-scaled),
186    // so no explicit truncation needed.
187
188    // Step 6: Send challenge.
189
190    let challenge = AuditChallenge {
191        challenge_id,
192        nonce,
193        challenged_peer_id: *challenged_peer.as_bytes(),
194        keys: peer_keys.clone(),
195    };
196
197    let msg = ReplicationMessage {
198        request_id: challenge_id,
199        body: ReplicationMessageBody::AuditChallenge(challenge),
200    };
201
202    let encoded = match msg.encode() {
203        Ok(data) => data,
204        Err(e) => {
205            warn!("Audit: failed to encode challenge: {e}");
206            return AuditTickResult::Idle;
207        }
208    };
209
210    let response = match p2p_node
211        .send_request(
212            &challenged_peer,
213            REPLICATION_PROTOCOL_ID,
214            encoded,
215            config.audit_response_timeout(peer_keys.len()),
216        )
217        .await
218    {
219        Ok(resp) => resp,
220        Err(e) => {
221            debug!("Audit: challenge to {challenged_peer} failed: {e}");
222            // Timeout — need responsibility confirmation before penalty.
223            return handle_audit_timeout(
224                &challenged_peer,
225                challenge_id,
226                &peer_keys,
227                p2p_node,
228                config,
229            )
230            .await;
231        }
232    };
233
234    // Step 7: Parse response.
235    let resp_msg = match ReplicationMessage::decode(&response.data) {
236        Ok(m) => m,
237        Err(e) => {
238            warn!("Audit: failed to decode response from {challenged_peer}: {e}");
239            return handle_audit_failure(
240                &challenged_peer,
241                challenge_id,
242                &peer_keys,
243                AuditFailureReason::MalformedResponse,
244                p2p_node,
245                config,
246            )
247            .await;
248        }
249    };
250
251    match resp_msg.body {
252        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
253            challenge_id: resp_id,
254        }) => {
255            if resp_id != challenge_id {
256                warn!("Audit: challenge ID mismatch on Bootstrapping from {challenged_peer}");
257                return handle_audit_failure(
258                    &challenged_peer,
259                    challenge_id,
260                    &peer_keys,
261                    AuditFailureReason::MalformedResponse,
262                    p2p_node,
263                    config,
264                )
265                .await;
266            }
267            // Step 7b: Bootstrapping claim.
268            AuditTickResult::BootstrapClaim {
269                peer: challenged_peer,
270            }
271        }
272        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
273            challenge_id: resp_id,
274            digests,
275        }) => {
276            if resp_id != challenge_id {
277                warn!("Audit: challenge ID mismatch from {challenged_peer}");
278                return handle_audit_failure(
279                    &challenged_peer,
280                    challenge_id,
281                    &peer_keys,
282                    AuditFailureReason::MalformedResponse,
283                    p2p_node,
284                    config,
285                )
286                .await;
287            }
288            verify_digests(
289                &challenged_peer,
290                challenge_id,
291                &nonce,
292                &peer_keys,
293                &digests,
294                storage,
295                p2p_node,
296                config,
297            )
298            .await
299        }
300        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
301            challenge_id: resp_id,
302            reason,
303        }) => {
304            if resp_id != challenge_id {
305                warn!("Audit: challenge ID mismatch on Rejected from {challenged_peer}");
306                return handle_audit_failure(
307                    &challenged_peer,
308                    challenge_id,
309                    &peer_keys,
310                    AuditFailureReason::MalformedResponse,
311                    p2p_node,
312                    config,
313                )
314                .await;
315            }
316            warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
317            handle_audit_failure(
318                &challenged_peer,
319                challenge_id,
320                &peer_keys,
321                AuditFailureReason::Rejected,
322                p2p_node,
323                config,
324            )
325            .await
326        }
327        _ => {
328            warn!("Audit: unexpected response type from {challenged_peer}");
329            handle_audit_failure(
330                &challenged_peer,
331                challenge_id,
332                &peer_keys,
333                AuditFailureReason::MalformedResponse,
334                p2p_node,
335                config,
336            )
337            .await
338        }
339    }
340}
341
342fn eligible_audit_peers(sync_history: &HashMap<PeerId, PeerSyncRecord>) -> Vec<PeerId> {
343    sync_history
344        .iter()
345        .filter(|(_, record)| record.has_repair_opportunity())
346        .map(|(peer, _)| *peer)
347        .collect()
348}
349
350fn mature_audit_keys_for_peer(
351    challenged_peer: &PeerId,
352    sampled_key_groups: Vec<(XorName, HashSet<PeerId>)>,
353    repair_proofs: &mut RepairProofs,
354    current_sync_epoch: u64,
355    now: Instant,
356) -> Vec<XorName> {
357    sampled_key_groups
358        .into_iter()
359        .filter_map(|(key, close_peers)| {
360            repair_proofs
361                .has_mature_replica_hint(
362                    challenged_peer,
363                    &key,
364                    &close_peers,
365                    current_sync_epoch,
366                    now,
367                )
368                .then_some(key)
369        })
370        .collect()
371}
372
373#[derive(Debug, Clone, Copy, PartialEq, Eq)]
374enum AuditKeyFailureKind {
375    Absent,
376    DigestMismatch,
377    Unclassified,
378}
379
380#[derive(Debug, Clone, Copy, PartialEq, Eq)]
381struct AuditKeyFailure {
382    key: XorName,
383    kind: AuditKeyFailureKind,
384}
385
386impl AuditKeyFailure {
387    fn absent(key: XorName) -> Self {
388        Self {
389            key,
390            kind: AuditKeyFailureKind::Absent,
391        }
392    }
393
394    fn digest_mismatch(key: XorName) -> Self {
395        Self {
396            key,
397            kind: AuditKeyFailureKind::DigestMismatch,
398        }
399    }
400
401    fn unclassified(key: XorName) -> Self {
402        Self {
403            key,
404            kind: AuditKeyFailureKind::Unclassified,
405        }
406    }
407}
408
409fn build_audit_failure_summary(
410    challenged_key_count: usize,
411    confirmed_failures: &[AuditKeyFailure],
412) -> AuditFailureSummary {
413    let mut summary = AuditFailureSummary {
414        challenged_keys: challenged_key_count,
415        failed_keys: confirmed_failures.len(),
416        ..AuditFailureSummary::default()
417    };
418
419    for failure in confirmed_failures {
420        match failure.kind {
421            AuditKeyFailureKind::Absent => summary.absent_keys += 1,
422            AuditKeyFailureKind::DigestMismatch => summary.digest_mismatch_keys += 1,
423            AuditKeyFailureKind::Unclassified => {}
424        }
425    }
426
427    summary
428}
429
430fn audit_digest_failure_reason(confirmed_failures: &[AuditKeyFailure]) -> AuditFailureReason {
431    if confirmed_failures
432        .iter()
433        .all(|failure| failure.kind == AuditKeyFailureKind::Absent)
434    {
435        AuditFailureReason::KeyAbsent
436    } else {
437        AuditFailureReason::DigestMismatch
438    }
439}
440
441// ---------------------------------------------------------------------------
442// Digest verification
443// ---------------------------------------------------------------------------
444
445/// Verify per-key digests from audit response (Step 8).
446#[allow(clippy::too_many_arguments)]
447async fn verify_digests(
448    challenged_peer: &PeerId,
449    challenge_id: u64,
450    nonce: &[u8; 32],
451    keys: &[XorName],
452    digests: &[[u8; 32]],
453    storage: &Arc<LmdbStorage>,
454    p2p_node: &Arc<P2PNode>,
455    config: &ReplicationConfig,
456) -> AuditTickResult {
457    // Requirement: response must have exactly one digest per key.
458    if digests.len() != keys.len() {
459        warn!(
460            "Audit: malformed response from {challenged_peer}: {} digests for {} keys",
461            digests.len(),
462            keys.len()
463        );
464        return handle_audit_failure(
465            challenged_peer,
466            challenge_id,
467            keys,
468            AuditFailureReason::MalformedResponse,
469            p2p_node,
470            config,
471        )
472        .await;
473    }
474
475    let challenged_peer_bytes = challenged_peer.as_bytes();
476    let mut failed_keys = Vec::new();
477
478    for (i, key) in keys.iter().enumerate() {
479        let received_digest = &digests[i];
480
481        // Check for absent sentinel.
482        if *received_digest == ABSENT_KEY_DIGEST {
483            failed_keys.push(AuditKeyFailure::absent(*key));
484            continue;
485        }
486
487        // Recompute expected digest from local copy.
488        let local_bytes = match storage.get_raw(key).await {
489            Ok(Some(bytes)) => bytes,
490            Ok(None) => {
491                // We should hold this key (we sampled it), but it's gone.
492                warn!(
493                    "Audit: local key {} disappeared during audit",
494                    hex::encode(key)
495                );
496                continue;
497            }
498            Err(e) => {
499                warn!("Audit: failed to read local key {}: {e}", hex::encode(key));
500                continue;
501            }
502        };
503
504        let expected = compute_audit_digest(nonce, challenged_peer_bytes, key, &local_bytes);
505        if *received_digest != expected {
506            failed_keys.push(AuditKeyFailure::digest_mismatch(*key));
507        }
508    }
509
510    if failed_keys.is_empty() {
511        info!(
512            "Audit: peer {challenged_peer} passed (all {} keys verified)",
513            keys.len()
514        );
515        return AuditTickResult::Passed {
516            challenged_peer: *challenged_peer,
517            keys_checked: keys.len(),
518        };
519    }
520
521    // Step 9: Responsibility confirmation for failed keys.
522    handle_classified_audit_failure(
523        challenged_peer,
524        challenge_id,
525        &failed_keys,
526        AuditFailureReason::DigestMismatch,
527        keys.len(),
528        p2p_node,
529        config,
530    )
531    .await
532}
533
534// ---------------------------------------------------------------------------
535// Failure handling with responsibility confirmation
536// ---------------------------------------------------------------------------
537
538/// Handle audit failure: confirm responsibility before emitting evidence (Step 9).
539async fn handle_audit_failure(
540    challenged_peer: &PeerId,
541    challenge_id: u64,
542    failed_keys: &[XorName],
543    reason: AuditFailureReason,
544    p2p_node: &Arc<P2PNode>,
545    config: &ReplicationConfig,
546) -> AuditTickResult {
547    let failures = failed_keys
548        .iter()
549        .copied()
550        .map(AuditKeyFailure::unclassified)
551        .collect::<Vec<_>>();
552    handle_classified_audit_failure(
553        challenged_peer,
554        challenge_id,
555        &failures,
556        reason,
557        failed_keys.len(),
558        p2p_node,
559        config,
560    )
561    .await
562}
563
564async fn handle_classified_audit_failure(
565    challenged_peer: &PeerId,
566    challenge_id: u64,
567    failed_keys: &[AuditKeyFailure],
568    reason: AuditFailureReason,
569    challenged_key_count: usize,
570    p2p_node: &Arc<P2PNode>,
571    config: &ReplicationConfig,
572) -> AuditTickResult {
573    let dht = p2p_node.dht_manager();
574    let mut confirmed_failures = Vec::new();
575
576    // Step 9a-b: Fresh local RT lookup for each failed key.
577    for failure in failed_keys {
578        let closest = dht
579            .find_closest_nodes_local_with_self(&failure.key, config.close_group_size)
580            .await;
581        if closest.iter().any(|n| n.peer_id == *challenged_peer) {
582            confirmed_failures.push(*failure);
583        } else {
584            debug!(
585                "Audit: peer {challenged_peer} not responsible for {} (removed from failure set)",
586                hex::encode(failure.key)
587            );
588        }
589    }
590
591    // Step 9c: Empty confirmed set -> peer is no longer responsible for any
592    // of the failed keys (topology churn). This is NOT a pass — the peer did
593    // not prove it stores the data. Return Idle to avoid granting unearned
594    // positive trust.
595    if confirmed_failures.is_empty() {
596        info!("Audit: all failures for {challenged_peer} cleared by responsibility confirmation");
597        return AuditTickResult::Idle;
598    }
599
600    let summary = build_audit_failure_summary(challenged_key_count, &confirmed_failures);
601    let reason = if reason == AuditFailureReason::DigestMismatch {
602        audit_digest_failure_reason(&confirmed_failures)
603    } else {
604        reason
605    };
606    let confirmed_failed_keys = confirmed_failures
607        .iter()
608        .map(|failure| failure.key)
609        .collect();
610
611    // Step 9d: Non-empty confirmed set -> emit evidence.
612    let evidence = FailureEvidence::AuditFailure {
613        challenge_id,
614        challenged_peer: *challenged_peer,
615        confirmed_failed_keys,
616        summary,
617        reason,
618    };
619
620    AuditTickResult::Failed { evidence }
621}
622
623/// Handle audit timeout (no response received).
624async fn handle_audit_timeout(
625    challenged_peer: &PeerId,
626    challenge_id: u64,
627    keys: &[XorName],
628    p2p_node: &Arc<P2PNode>,
629    config: &ReplicationConfig,
630) -> AuditTickResult {
631    handle_audit_failure(
632        challenged_peer,
633        challenge_id,
634        keys,
635        AuditFailureReason::Timeout,
636        p2p_node,
637        config,
638    )
639    .await
640}
641
642// ---------------------------------------------------------------------------
643// Responder-side handler
644// ---------------------------------------------------------------------------
645
646/// Handle an incoming audit challenge (responder side).
647///
648/// Validates that the challenge targets this node, computes per-key digests,
649/// and returns the response.  Rejects challenges where
650/// `challenged_peer_id` does not match `self_peer_id` to prevent an oracle
651/// attack where a malicious challenger forges digests for a different peer.
652pub async fn handle_audit_challenge(
653    challenge: &AuditChallenge,
654    storage: &LmdbStorage,
655    self_peer_id: &PeerId,
656    is_bootstrapping: bool,
657    stored_chunks: usize,
658) -> AuditResponse {
659    if is_bootstrapping {
660        return AuditResponse::Bootstrapping {
661            challenge_id: challenge.challenge_id,
662        };
663    }
664
665    if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
666        warn!(
667            "Audit challenge targeted wrong peer: expected {}, got {}",
668            hex::encode(self_peer_id.as_bytes()),
669            hex::encode(challenge.challenged_peer_id),
670        );
671        return AuditResponse::Rejected {
672            challenge_id: challenge.challenge_id,
673            reason: "challenged_peer_id does not match this node".to_string(),
674        };
675    }
676
677    let max_keys = ReplicationConfig::max_incoming_audit_keys(stored_chunks);
678    if challenge.keys.len() > max_keys {
679        warn!(
680            "Audit challenge rejected: {} keys exceeds dynamic limit of {max_keys} \
681             (stored_chunks={stored_chunks})",
682            challenge.keys.len(),
683        );
684        return AuditResponse::Rejected {
685            challenge_id: challenge.challenge_id,
686            reason: format!(
687                "challenge contains {} keys, limit is {max_keys}",
688                challenge.keys.len()
689            ),
690        };
691    }
692
693    let mut digests = Vec::with_capacity(challenge.keys.len());
694
695    for key in &challenge.keys {
696        match storage.get_raw(key).await {
697            Ok(Some(data)) => {
698                let digest = compute_audit_digest(
699                    &challenge.nonce,
700                    &challenge.challenged_peer_id,
701                    key,
702                    &data,
703                );
704                digests.push(digest);
705            }
706            Ok(None) => {
707                digests.push(ABSENT_KEY_DIGEST);
708            }
709            Err(e) => {
710                warn!(
711                    "Audit responder: failed to read key {}: {e}",
712                    hex::encode(key)
713                );
714                digests.push(ABSENT_KEY_DIGEST);
715            }
716        }
717    }
718
719    AuditResponse::Digests {
720        challenge_id: challenge.challenge_id,
721        digests,
722    }
723}
724
725// ---------------------------------------------------------------------------
726// Tests
727// ---------------------------------------------------------------------------
728
729#[cfg(test)]
730#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
731mod tests {
732    use super::*;
733    use crate::replication::config::REPAIR_HINT_MIN_AGE;
734    use crate::replication::protocol::compute_audit_digest;
735    use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
736    use crate::storage::LmdbStorageConfig;
737    use std::time::Instant;
738    use tempfile::TempDir;
739
740    /// Simulated stored chunk count for tests. Large enough that the dynamic
741    /// incoming audit limit (`2 * sqrt(N)`) never rejects small test challenges.
742    const TEST_STORED_CHUNKS: usize = 1_000_000;
743
744    /// Create a test `LmdbStorage` backed by a temp directory.
745    async fn create_test_storage() -> (LmdbStorage, TempDir) {
746        let temp_dir = TempDir::new().expect("create temp dir");
747        let config = LmdbStorageConfig {
748            root_dir: temp_dir.path().to_path_buf(),
749            verify_on_read: false,
750            max_map_size: 0,
751            disk_reserve: 0,
752        };
753        let storage = LmdbStorage::new(config).await.expect("create storage");
754        (storage, temp_dir)
755    }
756
757    /// Build a challenge with the given parameters.
758    fn make_challenge(
759        challenge_id: u64,
760        nonce: [u8; 32],
761        peer_id: [u8; 32],
762        keys: Vec<XorName>,
763    ) -> AuditChallenge {
764        AuditChallenge {
765            challenge_id,
766            nonce,
767            challenged_peer_id: peer_id,
768            keys,
769        }
770    }
771
772    /// Build a `PeerId` matching the raw bytes used in a challenge.
773    fn peer_id_from_bytes(bytes: [u8; 32]) -> PeerId {
774        PeerId::from_bytes(bytes)
775    }
776
777    // -- handle_audit_challenge: present keys ---------------------------------
778
779    #[tokio::test]
780    async fn handle_challenge_present_keys_returns_correct_digests() {
781        let (storage, _temp) = create_test_storage().await;
782
783        // Store two chunks.
784        let content_a = b"chunk alpha";
785        let addr_a = LmdbStorage::compute_address(content_a);
786        storage.put(&addr_a, content_a).await.expect("put a");
787
788        let content_b = b"chunk beta";
789        let addr_b = LmdbStorage::compute_address(content_b);
790        storage.put(&addr_b, content_b).await.expect("put b");
791
792        let nonce = [0xAA; 32];
793        let peer_id = [0xBB; 32];
794        let challenge = make_challenge(42, nonce, peer_id, vec![addr_a, addr_b]);
795        let self_id = peer_id_from_bytes(peer_id);
796
797        let response =
798            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
799
800        match response {
801            AuditResponse::Digests {
802                challenge_id,
803                digests,
804            } => {
805                assert_eq!(challenge_id, 42);
806                assert_eq!(digests.len(), 2);
807
808                let expected_a = compute_audit_digest(&nonce, &peer_id, &addr_a, content_a);
809                let expected_b = compute_audit_digest(&nonce, &peer_id, &addr_b, content_b);
810                assert_eq!(digests[0], expected_a);
811                assert_eq!(digests[1], expected_b);
812            }
813            AuditResponse::Bootstrapping { .. } => {
814                panic!("expected Digests, got Bootstrapping");
815            }
816            AuditResponse::Rejected { .. } => {
817                panic!("Unexpected Rejected response");
818            }
819        }
820    }
821
822    // -- handle_audit_challenge: absent keys ----------------------------------
823
824    #[tokio::test]
825    async fn handle_challenge_absent_keys_returns_sentinel() {
826        let (storage, _temp) = create_test_storage().await;
827
828        let absent_key = [0xFF; 32];
829        let nonce = [0x11; 32];
830        let peer_id = [0x22; 32];
831        let challenge = make_challenge(99, nonce, peer_id, vec![absent_key]);
832        let self_id = peer_id_from_bytes(peer_id);
833
834        let response =
835            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
836
837        match response {
838            AuditResponse::Digests {
839                challenge_id,
840                digests,
841            } => {
842                assert_eq!(challenge_id, 99);
843                assert_eq!(digests.len(), 1);
844                assert_eq!(
845                    digests[0], ABSENT_KEY_DIGEST,
846                    "absent key should produce sentinel digest"
847                );
848            }
849            AuditResponse::Bootstrapping { .. } => {
850                panic!("expected Digests, got Bootstrapping");
851            }
852            AuditResponse::Rejected { .. } => {
853                panic!("Unexpected Rejected response");
854            }
855        }
856    }
857
858    // -- handle_audit_challenge: mixed present and absent ---------------------
859
860    #[tokio::test]
861    async fn handle_challenge_mixed_present_and_absent() {
862        let (storage, _temp) = create_test_storage().await;
863
864        let content = b"present chunk";
865        let addr_present = LmdbStorage::compute_address(content);
866        storage.put(&addr_present, content).await.expect("put");
867
868        let addr_absent = [0xDE; 32];
869        let nonce = [0x33; 32];
870        let peer_id = [0x44; 32];
871        let challenge = make_challenge(7, nonce, peer_id, vec![addr_present, addr_absent]);
872        let self_id = peer_id_from_bytes(peer_id);
873
874        let response =
875            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
876
877        match response {
878            AuditResponse::Digests { digests, .. } => {
879                assert_eq!(digests.len(), 2);
880
881                let expected_present =
882                    compute_audit_digest(&nonce, &peer_id, &addr_present, content);
883                assert_eq!(digests[0], expected_present);
884                assert_eq!(
885                    digests[1], ABSENT_KEY_DIGEST,
886                    "absent key should be sentinel"
887                );
888            }
889            AuditResponse::Bootstrapping { .. } => {
890                panic!("expected Digests, got Bootstrapping");
891            }
892            AuditResponse::Rejected { .. } => {
893                panic!("Unexpected Rejected response");
894            }
895        }
896    }
897
898    // -- handle_audit_challenge: bootstrapping --------------------------------
899
900    #[tokio::test]
901    async fn handle_challenge_bootstrapping_returns_bootstrapping_response() {
902        let (storage, _temp) = create_test_storage().await;
903
904        let challenge = make_challenge(55, [0x00; 32], [0x01; 32], vec![[0x02; 32]]);
905        let self_id = peer_id_from_bytes([0x01; 32]);
906
907        let response =
908            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
909
910        match response {
911            AuditResponse::Bootstrapping { challenge_id } => {
912                assert_eq!(challenge_id, 55);
913            }
914            AuditResponse::Digests { .. } => {
915                panic!("expected Bootstrapping, got Digests");
916            }
917            AuditResponse::Rejected { .. } => {
918                panic!("Unexpected Rejected response");
919            }
920        }
921    }
922
923    // -- handle_audit_challenge: empty key list -------------------------------
924
925    #[tokio::test]
926    async fn handle_challenge_empty_keys_returns_empty_digests() {
927        let (storage, _temp) = create_test_storage().await;
928
929        let challenge = make_challenge(100, [0x10; 32], [0x20; 32], vec![]);
930        let self_id = peer_id_from_bytes([0x20; 32]);
931
932        let response =
933            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
934
935        match response {
936            AuditResponse::Digests {
937                challenge_id,
938                digests,
939            } => {
940                assert_eq!(challenge_id, 100);
941                assert!(
942                    digests.is_empty(),
943                    "empty key list should yield empty digests"
944                );
945            }
946            AuditResponse::Bootstrapping { .. } => {
947                panic!("expected Digests, got Bootstrapping");
948            }
949            AuditResponse::Rejected { .. } => {
950                panic!("Unexpected Rejected response");
951            }
952        }
953    }
954
955    // -- Digest verification: matching ----------------------------------------
956
957    #[test]
958    fn digest_verification_matching() {
959        let nonce = [0x01; 32];
960        let peer_id = [0x02; 32];
961        let key: XorName = [0x03; 32];
962        let data = b"correct data";
963
964        let expected = compute_audit_digest(&nonce, &peer_id, &key, data);
965        let recomputed = compute_audit_digest(&nonce, &peer_id, &key, data);
966
967        assert_eq!(
968            expected, recomputed,
969            "same inputs must produce identical digests"
970        );
971        assert_ne!(
972            expected, ABSENT_KEY_DIGEST,
973            "real digest must not be sentinel"
974        );
975    }
976
977    // -- Digest verification: mismatching -------------------------------------
978
979    #[test]
980    fn digest_verification_mismatching_data() {
981        let nonce = [0x01; 32];
982        let peer_id = [0x02; 32];
983        let key: XorName = [0x03; 32];
984
985        let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data version A");
986        let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data version B");
987
988        assert_ne!(
989            digest_a, digest_b,
990            "different data must produce different digests"
991        );
992    }
993
994    #[test]
995    fn digest_verification_mismatching_nonce() {
996        let peer_id = [0x02; 32];
997        let key: XorName = [0x03; 32];
998        let data = b"same data";
999
1000        let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, data);
1001        let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, data);
1002
1003        assert_ne!(
1004            digest_a, digest_b,
1005            "different nonces must produce different digests"
1006        );
1007    }
1008
1009    #[test]
1010    fn digest_verification_mismatching_peer() {
1011        let nonce = [0x01; 32];
1012        let key: XorName = [0x03; 32];
1013        let data = b"same data";
1014
1015        let digest_a = compute_audit_digest(&nonce, &[0x02; 32], &key, data);
1016        let digest_b = compute_audit_digest(&nonce, &[0xFE; 32], &key, data);
1017
1018        assert_ne!(
1019            digest_a, digest_b,
1020            "different peers must produce different digests"
1021        );
1022    }
1023
1024    #[test]
1025    fn digest_verification_mismatching_key() {
1026        let nonce = [0x01; 32];
1027        let peer_id = [0x02; 32];
1028        let data = b"same data";
1029
1030        let digest_a = compute_audit_digest(&nonce, &peer_id, &[0x03; 32], data);
1031        let digest_b = compute_audit_digest(&nonce, &peer_id, &[0xFC; 32], data);
1032
1033        assert_ne!(
1034            digest_a, digest_b,
1035            "different keys must produce different digests"
1036        );
1037    }
1038
1039    // -- Absent sentinel is all zeros -----------------------------------------
1040
1041    #[test]
1042    fn absent_sentinel_is_all_zeros() {
1043        assert_eq!(ABSENT_KEY_DIGEST, [0u8; 32], "sentinel must be all zeros");
1044    }
1045
1046    // -- Bootstrapping skips digest computation even with stored keys ---------
1047
1048    #[tokio::test]
1049    async fn bootstrapping_skips_digest_computation() {
1050        let (storage, _temp) = create_test_storage().await;
1051
1052        let content = b"stored but bootstrapping";
1053        let addr = LmdbStorage::compute_address(content);
1054        storage.put(&addr, content).await.expect("put");
1055
1056        let challenge = make_challenge(200, [0xCC; 32], [0xDD; 32], vec![addr]);
1057        let self_id = peer_id_from_bytes([0xDD; 32]);
1058
1059        let response =
1060            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1061
1062        assert!(
1063            matches!(response, AuditResponse::Bootstrapping { challenge_id: 200 }),
1064            "bootstrapping node must not compute digests"
1065        );
1066    }
1067
1068    // -- Scenario 19/53: Partial failure with mixed responsibility ----------------
1069
1070    #[tokio::test]
1071    async fn scenario_19_partial_failure_mixed_responsibility() {
1072        // Three keys challenged: K1 matches, K2 mismatches, K3 absent.
1073        // After responsibility confirmation, only K2 is confirmed responsible.
1074        // AuditFailure emitted for {K2} only.
1075        // Test handle_audit_challenge with mixed results, then verify
1076        // the digest logic manually.
1077
1078        let (storage, _temp) = create_test_storage().await;
1079        let nonce = [0x42u8; 32];
1080        let peer_id = [0xAA; 32];
1081
1082        // Store K1 and K2, but NOT K3
1083        let content_k1 = b"key one data";
1084        let addr_k1 = LmdbStorage::compute_address(content_k1);
1085        storage.put(&addr_k1, content_k1).await.unwrap();
1086
1087        let content_k2 = b"key two data";
1088        let addr_k2 = LmdbStorage::compute_address(content_k2);
1089        storage.put(&addr_k2, content_k2).await.unwrap();
1090
1091        let addr_k3 = [0xFF; 32]; // Not stored
1092
1093        let challenge = AuditChallenge {
1094            challenge_id: 100,
1095            nonce,
1096            challenged_peer_id: peer_id,
1097            keys: vec![addr_k1, addr_k2, addr_k3],
1098        };
1099        let self_id = peer_id_from_bytes(peer_id);
1100
1101        let response =
1102            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1103
1104        match response {
1105            AuditResponse::Digests { digests, .. } => {
1106                assert_eq!(digests.len(), 3);
1107
1108                // K1 should have correct digest
1109                let expected_k1 = compute_audit_digest(&nonce, &peer_id, &addr_k1, content_k1);
1110                assert_eq!(digests[0], expected_k1);
1111
1112                // K2 should have correct digest
1113                let expected_k2 = compute_audit_digest(&nonce, &peer_id, &addr_k2, content_k2);
1114                assert_eq!(digests[1], expected_k2);
1115
1116                // K3 absent -> sentinel
1117                assert_eq!(digests[2], ABSENT_KEY_DIGEST);
1118            }
1119            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests response"),
1120            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1121        }
1122    }
1123
1124    // -- Scenario 54: All digests pass -------------------------------------------
1125
1126    #[tokio::test]
1127    async fn scenario_54_all_digests_pass() {
1128        // All challenged keys present and digests match.
1129        // Multiple keys to strengthen coverage beyond existing two-key tests.
1130        let (storage, _temp) = create_test_storage().await;
1131        let nonce = [0x10; 32];
1132        let peer_id = [0x20; 32];
1133
1134        let c1 = b"chunk alpha";
1135        let c2 = b"chunk beta";
1136        let c3 = b"chunk gamma";
1137        let a1 = LmdbStorage::compute_address(c1);
1138        let a2 = LmdbStorage::compute_address(c2);
1139        let a3 = LmdbStorage::compute_address(c3);
1140        storage.put(&a1, c1).await.unwrap();
1141        storage.put(&a2, c2).await.unwrap();
1142        storage.put(&a3, c3).await.unwrap();
1143
1144        let challenge = AuditChallenge {
1145            challenge_id: 200,
1146            nonce,
1147            challenged_peer_id: peer_id,
1148            keys: vec![a1, a2, a3],
1149        };
1150        let self_id = peer_id_from_bytes(peer_id);
1151
1152        let response =
1153            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1154        match response {
1155            AuditResponse::Digests { digests, .. } => {
1156                assert_eq!(digests.len(), 3);
1157                for (i, (addr, content)) in [(a1, &c1[..]), (a2, &c2[..]), (a3, &c3[..])]
1158                    .iter()
1159                    .enumerate()
1160                {
1161                    let expected = compute_audit_digest(&nonce, &peer_id, addr, content);
1162                    assert_eq!(digests[i], expected, "Key {i} digest should match");
1163                }
1164            }
1165            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1166            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1167        }
1168    }
1169
1170    // -- Scenario 55: Empty failure set means no evidence -------------------------
1171
1172    /// Scenario 55: Peer challenged on {K1, K2}. Both digests mismatch.
1173    /// Responsibility confirmation shows the peer is NOT responsible for
1174    /// either key. The confirmed failure set is empty — no `AuditFailure`
1175    /// evidence is emitted.
1176    ///
1177    /// Full `verify_digests` requires a live `P2PNode` for network lookups.
1178    /// This test exercises the deterministic sub-steps:
1179    ///   (1) Digest comparison identifies K1 and K2 as mismatches.
1180    ///   (2) Responsibility confirmation removes both keys.
1181    ///   (3) Empty confirmed failure set means no evidence.
1182    #[tokio::test]
1183    async fn scenario_55_no_confirmed_responsibility_no_evidence() {
1184        let (storage, _temp) = create_test_storage().await;
1185        let nonce = [0x55; 32];
1186        let peer_id = [0x55; 32];
1187
1188        // Store K1 and K2 on the challenger (for expected digest computation).
1189        let c1 = b"scenario 55 key one";
1190        let c2 = b"scenario 55 key two";
1191        let k1 = LmdbStorage::compute_address(c1);
1192        let k2 = LmdbStorage::compute_address(c2);
1193        storage.put(&k1, c1).await.expect("put k1");
1194        storage.put(&k2, c2).await.expect("put k2");
1195
1196        // Challenger computes expected digests.
1197        let expected_d1 = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1198        let expected_d2 = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1199
1200        // Simulate peer returning WRONG digests for both keys.
1201        let wrong_d1 = compute_audit_digest(&nonce, &peer_id, &k1, b"corrupted k1");
1202        let wrong_d2 = compute_audit_digest(&nonce, &peer_id, &k2, b"corrupted k2");
1203        assert_ne!(wrong_d1, expected_d1, "K1 digest should mismatch");
1204        assert_ne!(wrong_d2, expected_d2, "K2 digest should mismatch");
1205
1206        // Step 1: Identify failed keys via digest comparison.
1207        let keys = [k1, k2];
1208        let expected = [expected_d1, expected_d2];
1209        let received = [wrong_d1, wrong_d2];
1210
1211        let mut failed_keys = Vec::new();
1212        for i in 0..keys.len() {
1213            if received[i] != expected[i] {
1214                failed_keys.push(keys[i]);
1215            }
1216        }
1217        assert_eq!(
1218            failed_keys.len(),
1219            2,
1220            "Both keys should be identified as digest mismatches"
1221        );
1222
1223        // Step 2: Responsibility confirmation — peer is NOT responsible for
1224        // either key (simulated by filtering them all out).
1225        let confirmed_responsible_keys: Vec<XorName> = Vec::new();
1226        let confirmed_failures: Vec<XorName> = failed_keys
1227            .into_iter()
1228            .filter(|k| confirmed_responsible_keys.contains(k))
1229            .collect();
1230
1231        // Step 3: Empty confirmed failure set → no AuditFailure evidence.
1232        assert!(
1233            confirmed_failures.is_empty(),
1234            "With no confirmed responsibility, failure set must be empty — \
1235             no AuditFailure evidence should be emitted"
1236        );
1237
1238        // Verify that constructing evidence with empty keys results in a
1239        // no-penalty outcome (the caller checks is_empty before emitting).
1240        let peer = PeerId::from_bytes(peer_id);
1241        let evidence = FailureEvidence::AuditFailure {
1242            challenge_id: 5500,
1243            challenged_peer: peer,
1244            confirmed_failed_keys: confirmed_failures,
1245            summary: AuditFailureSummary::default(),
1246            reason: AuditFailureReason::DigestMismatch,
1247        };
1248        if let FailureEvidence::AuditFailure {
1249            confirmed_failed_keys,
1250            ..
1251        } = evidence
1252        {
1253            assert!(
1254                confirmed_failed_keys.is_empty(),
1255                "Evidence with empty failure set should not trigger a trust penalty"
1256            );
1257        }
1258    }
1259
1260    // -- Scenario 56: RepairOpportunity filters never-synced peers ----------------
1261
1262    #[test]
1263    fn scenario_56_repair_opportunity_filters_never_synced() {
1264        // PeerSyncRecord with last_sync=None should not pass
1265        // has_repair_opportunity().
1266
1267        let never_synced = PeerSyncRecord {
1268            last_sync: None,
1269            cycles_since_sync: 5,
1270        };
1271        assert!(!never_synced.has_repair_opportunity());
1272
1273        let synced_no_cycle = PeerSyncRecord {
1274            last_sync: Some(Instant::now()),
1275            cycles_since_sync: 0,
1276        };
1277        assert!(!synced_no_cycle.has_repair_opportunity());
1278
1279        let synced_with_cycle = PeerSyncRecord {
1280            last_sync: Some(Instant::now()),
1281            cycles_since_sync: 1,
1282        };
1283        assert!(synced_with_cycle.has_repair_opportunity());
1284    }
1285
1286    #[test]
1287    fn expired_bootstrap_claim_does_not_remove_peer_from_audit_eligibility() {
1288        let peer = peer_id_from_bytes([0x57; 32]);
1289        let mut sync_history = HashMap::new();
1290        sync_history.insert(
1291            peer,
1292            PeerSyncRecord {
1293                last_sync: Some(Instant::now()),
1294                cycles_since_sync: 1,
1295            },
1296        );
1297
1298        let mut bootstrap_claims = HashMap::new();
1299        let first_seen = Instant::now()
1300            .checked_sub(
1301                crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD
1302                    + std::time::Duration::from_secs(1),
1303            )
1304            .unwrap_or_else(Instant::now);
1305        bootstrap_claims.insert(peer, first_seen);
1306
1307        let eligible = eligible_audit_peers(&sync_history);
1308
1309        assert!(bootstrap_claims.contains_key(&peer));
1310        assert!(
1311            eligible.contains(&peer),
1312            "continued bootstrap claims must remain auditable so past-grace abuse can be observed"
1313        );
1314    }
1315
1316    #[test]
1317    fn audit_failure_summary_counts_confirmed_absent_and_mismatch_keys() {
1318        let absent_key = [0xA1; 32];
1319        let mismatch_key = [0xB2; 32];
1320        let confirmed = vec![
1321            AuditKeyFailure::absent(absent_key),
1322            AuditKeyFailure::digest_mismatch(mismatch_key),
1323        ];
1324
1325        let summary = build_audit_failure_summary(5, &confirmed);
1326
1327        assert_eq!(summary.challenged_keys, 5);
1328        assert_eq!(summary.failed_keys, 2);
1329        assert_eq!(summary.absent_keys, 1);
1330        assert_eq!(summary.digest_mismatch_keys, 1);
1331    }
1332
1333    #[test]
1334    fn audit_failure_summary_leaves_unclassified_rejections_out_of_absent_mismatch_counts() {
1335        let rejected_key = [0xC3; 32];
1336        let confirmed = vec![AuditKeyFailure::unclassified(rejected_key)];
1337
1338        let summary = build_audit_failure_summary(3, &confirmed);
1339
1340        assert_eq!(summary.challenged_keys, 3);
1341        assert_eq!(summary.failed_keys, 1);
1342        assert_eq!(summary.absent_keys, 0);
1343        assert_eq!(summary.digest_mismatch_keys, 0);
1344    }
1345
1346    #[test]
1347    fn audit_digest_failure_reason_is_key_absent_when_all_confirmed_failures_are_absent() {
1348        let failures = vec![AuditKeyFailure::absent([0xD4; 32])];
1349
1350        assert_eq!(
1351            audit_digest_failure_reason(&failures),
1352            AuditFailureReason::KeyAbsent
1353        );
1354    }
1355
1356    #[test]
1357    fn audit_digest_failure_reason_is_digest_mismatch_for_mixed_failures() {
1358        let failures = vec![
1359            AuditKeyFailure::absent([0xD5; 32]),
1360            AuditKeyFailure::digest_mismatch([0xE6; 32]),
1361        ];
1362
1363        assert_eq!(
1364            audit_digest_failure_reason(&failures),
1365            AuditFailureReason::DigestMismatch
1366        );
1367    }
1368
1369    #[test]
1370    fn audit_key_filter_retains_stable_proofs_and_rejects_evicted_peers() {
1371        const HINT_EPOCH: u64 = 7;
1372        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
1373        const CHALLENGED_PEER_BYTE: u8 = 0xA1;
1374        const OTHER_PEER_BYTE: u8 = 0xA2;
1375        const NEW_PEER_BYTE: u8 = 0xA3;
1376        const MATURE_KEY_BYTE: u8 = 0xB1;
1377        const SAME_EPOCH_KEY_BYTE: u8 = 0xB2;
1378        const MISSING_PROOF_KEY_BYTE: u8 = 0xB3;
1379        const STABLE_CHURN_KEY_BYTE: u8 = 0xB4;
1380        const EVICTED_KEY_BYTE: u8 = 0xB5;
1381        const FRESH_HINT_KEY_BYTE: u8 = 0xB6;
1382        const XOR_NAME_LEN: usize = 32;
1383
1384        let challenged_peer = peer_id_from_bytes([CHALLENGED_PEER_BYTE; XOR_NAME_LEN]);
1385        let other_peer = peer_id_from_bytes([OTHER_PEER_BYTE; XOR_NAME_LEN]);
1386        let new_peer = peer_id_from_bytes([NEW_PEER_BYTE; XOR_NAME_LEN]);
1387        let mature_key = [MATURE_KEY_BYTE; XOR_NAME_LEN];
1388        let same_epoch_key = [SAME_EPOCH_KEY_BYTE; XOR_NAME_LEN];
1389        let missing_proof_key = [MISSING_PROOF_KEY_BYTE; XOR_NAME_LEN];
1390        let stable_churn_key = [STABLE_CHURN_KEY_BYTE; XOR_NAME_LEN];
1391        let evicted_key = [EVICTED_KEY_BYTE; XOR_NAME_LEN];
1392        let fresh_hint_key = [FRESH_HINT_KEY_BYTE; XOR_NAME_LEN];
1393        let close_group = HashSet::from([challenged_peer, other_peer]);
1394        let changed_close_group = HashSet::from([challenged_peer, new_peer]);
1395        let evicted_close_group = HashSet::from([other_peer, new_peer]);
1396        let mut repair_proofs = RepairProofs::new();
1397        let mature_hinted_at = Instant::now();
1398        let now = mature_hinted_at
1399            .checked_add(REPAIR_HINT_MIN_AGE)
1400            .unwrap_or(mature_hinted_at);
1401
1402        assert!(repair_proofs.record_replica_hint_sent_at(
1403            challenged_peer,
1404            mature_key,
1405            &close_group,
1406            HINT_EPOCH,
1407            mature_hinted_at,
1408        ));
1409        assert!(repair_proofs.record_replica_hint_sent_at(
1410            challenged_peer,
1411            same_epoch_key,
1412            &close_group,
1413            CURRENT_EPOCH,
1414            mature_hinted_at,
1415        ));
1416        assert!(repair_proofs.record_replica_hint_sent_at(
1417            challenged_peer,
1418            stable_churn_key,
1419            &close_group,
1420            HINT_EPOCH,
1421            mature_hinted_at,
1422        ));
1423        assert!(repair_proofs.record_replica_hint_sent_at(
1424            challenged_peer,
1425            evicted_key,
1426            &close_group,
1427            HINT_EPOCH,
1428            mature_hinted_at,
1429        ));
1430        assert!(repair_proofs.record_replica_hint_sent_at(
1431            challenged_peer,
1432            fresh_hint_key,
1433            &close_group,
1434            HINT_EPOCH,
1435            now,
1436        ));
1437
1438        let sampled_key_groups = vec![
1439            (mature_key, close_group.clone()),
1440            (same_epoch_key, close_group.clone()),
1441            (missing_proof_key, close_group.clone()),
1442            (stable_churn_key, changed_close_group),
1443            (evicted_key, evicted_close_group),
1444            (fresh_hint_key, close_group.clone()),
1445        ];
1446        let peer_keys = mature_audit_keys_for_peer(
1447            &challenged_peer,
1448            sampled_key_groups,
1449            &mut repair_proofs,
1450            CURRENT_EPOCH,
1451            now,
1452        );
1453
1454        assert_eq!(
1455            peer_keys,
1456            vec![mature_key, stable_churn_key],
1457            "mature proofs for stable close-group peers should become audit keys, while same-epoch, fresh, missing, and evicted-peer proofs should not"
1458        );
1459    }
1460
1461    // -- Audit response must match key count --------------------------------------
1462
1463    #[tokio::test]
1464    async fn audit_response_must_match_key_count() {
1465        // Section 15: "A response is invalid if it has fewer or more entries
1466        // than challenged keys."
1467        // Verify handle_audit_challenge always produces exactly N digests for
1468        // N keys, including edge cases.
1469
1470        let (storage, _temp) = create_test_storage().await;
1471        let nonce = [0x50; 32];
1472        let peer_id = [0x60; 32];
1473
1474        // Store a single chunk
1475        let content = b"single chunk";
1476        let addr = LmdbStorage::compute_address(content);
1477        storage.put(&addr, content).await.unwrap();
1478
1479        // Challenge with 1 stored + 4 absent = 5 keys total
1480        let absent_keys: Vec<XorName> = (1..=4u8).map(|i| [i; 32]).collect();
1481        let mut keys = vec![addr];
1482        keys.extend_from_slice(&absent_keys);
1483
1484        let key_count = keys.len();
1485        let challenge = make_challenge(300, nonce, peer_id, keys);
1486        let self_id = peer_id_from_bytes(peer_id);
1487
1488        let response =
1489            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1490        match response {
1491            AuditResponse::Digests { digests, .. } => {
1492                assert_eq!(
1493                    digests.len(),
1494                    key_count,
1495                    "must produce exactly one digest per challenged key"
1496                );
1497            }
1498            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1499            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1500        }
1501    }
1502
1503    // -- Audit digest uses full record bytes --------------------------------------
1504
1505    #[test]
1506    fn audit_digest_uses_full_record_bytes() {
1507        // Verify digest changes when record content changes.
1508        let nonce = [1u8; 32];
1509        let peer = [2u8; 32];
1510        let key = [3u8; 32];
1511
1512        let d1 = compute_audit_digest(&nonce, &peer, &key, b"data version 1");
1513        let d2 = compute_audit_digest(&nonce, &peer, &key, b"data version 2");
1514        assert_ne!(
1515            d1, d2,
1516            "Different record bytes must produce different digests"
1517        );
1518    }
1519
1520    // -- Scenario 29: Audit start gate ------------------------------------------
1521
1522    /// Scenario 29: `handle_audit_challenge` returns `Bootstrapping` when the
1523    /// node is still bootstrapping — audit digests are never computed, and no
1524    /// `AuditFailure` evidence is emitted by the caller.
1525    ///
1526    /// This is the responder-side gate.  The challenger-side gate is enforced
1527    /// by `audit_tick`'s `is_bootstrapping` guard (Invariant 19) and by
1528    /// `check_bootstrap_drained()` in the engine loop; this test confirms the
1529    /// complementary responder behavior.
1530    #[tokio::test]
1531    async fn scenario_29_audit_start_gate_during_bootstrap() {
1532        let (storage, _temp) = create_test_storage().await;
1533
1534        // Store data so there *would* be work to audit.
1535        let content = b"should not be audited during bootstrap";
1536        let addr = LmdbStorage::compute_address(content);
1537        storage.put(&addr, content).await.expect("put");
1538
1539        let challenge = make_challenge(2900, [0x29; 32], [0x29; 32], vec![addr]);
1540        let self_id = peer_id_from_bytes([0x29; 32]);
1541
1542        // Responder is bootstrapping → Bootstrapping response, NOT Digests.
1543        let response =
1544            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1545        assert!(
1546            matches!(
1547                response,
1548                AuditResponse::Bootstrapping { challenge_id: 2900 }
1549            ),
1550            "bootstrapping node must not compute digests — audit start gate"
1551        );
1552
1553        // Responder is NOT bootstrapping → normal Digests.
1554        let response =
1555            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1556        assert!(
1557            matches!(response, AuditResponse::Digests { .. }),
1558            "drained node should compute digests normally"
1559        );
1560    }
1561
1562    // -- Scenario 30: Audit peer selection from sampled keys --------------------
1563
1564    /// Scenario 30: Key sampling uses dynamic sqrt-based batch sizing and
1565    /// `RepairOpportunity` filtering excludes never-synced peers.
1566    ///
1567    /// Full `audit_tick` requires a live network.  This test verifies the two
1568    /// deterministic sub-steps the function relies on:
1569    ///   (a) `audit_sample_count` scales with `sqrt(total_keys)`.
1570    ///   (b) `PeerSyncRecord::has_repair_opportunity` gates peer eligibility.
1571    #[test]
1572    fn scenario_30_audit_peer_selection_from_sampled_keys() {
1573        // (a) Dynamic sample count scales with sqrt(total_keys).
1574        assert_eq!(
1575            ReplicationConfig::audit_sample_count(100),
1576            10,
1577            "sample count should scale with sqrt(total_keys)"
1578        );
1579
1580        assert_eq!(ReplicationConfig::audit_sample_count(3), 1, "sqrt(3) = 1");
1581
1582        assert_eq!(
1583            ReplicationConfig::audit_sample_count(10_000),
1584            100,
1585            "sqrt(10000) = 100"
1586        );
1587
1588        // (b) Peer eligibility via RepairOpportunity.
1589        // Never synced → not eligible.
1590        let never = PeerSyncRecord {
1591            last_sync: None,
1592            cycles_since_sync: 10,
1593        };
1594        assert!(!never.has_repair_opportunity());
1595
1596        // Synced but zero subsequent cycles → not eligible.
1597        let too_soon = PeerSyncRecord {
1598            last_sync: Some(Instant::now()),
1599            cycles_since_sync: 0,
1600        };
1601        assert!(!too_soon.has_repair_opportunity());
1602
1603        // Synced with ≥1 cycle → eligible.
1604        let eligible = PeerSyncRecord {
1605            last_sync: Some(Instant::now()),
1606            cycles_since_sync: 2,
1607        };
1608        assert!(eligible.has_repair_opportunity());
1609    }
1610
1611    // -- Scenario 32: Dynamic challenge size ------------------------------------
1612
1613    /// Scenario 32: Challenge key count equals `|PeerKeySet(challenged_peer)|`,
1614    /// which is dynamic per round.  If no eligible peer remains after filtering,
1615    /// the tick is idle.
1616    ///
1617    /// Verified via `handle_audit_challenge`: the response digest count always
1618    /// equals the number of keys in the challenge.
1619    #[tokio::test]
1620    async fn scenario_32_dynamic_challenge_size() {
1621        let (storage, _temp) = create_test_storage().await;
1622
1623        // Store varying numbers of chunks.
1624        let mut addrs = Vec::new();
1625        for i in 0u8..5 {
1626            let content = format!("dynamic challenge key {i}");
1627            let addr = LmdbStorage::compute_address(content.as_bytes());
1628            storage.put(&addr, content.as_bytes()).await.expect("put");
1629            addrs.push(addr);
1630        }
1631
1632        let nonce = [0x32; 32];
1633        let peer_id = [0x32; 32];
1634        let self_id = peer_id_from_bytes(peer_id);
1635
1636        // Challenge with 1 key.
1637        let challenge1 = make_challenge(3201, nonce, peer_id, vec![addrs[0]]);
1638        let resp1 =
1639            handle_audit_challenge(&challenge1, &storage, &self_id, false, TEST_STORED_CHUNKS)
1640                .await;
1641        if let AuditResponse::Digests { digests, .. } = resp1 {
1642            assert_eq!(digests.len(), 1, "|PeerKeySet| = 1 → 1 digest");
1643        }
1644
1645        // Challenge with 3 keys.
1646        let challenge3 = make_challenge(3203, nonce, peer_id, addrs[0..3].to_vec());
1647        let resp3 =
1648            handle_audit_challenge(&challenge3, &storage, &self_id, false, TEST_STORED_CHUNKS)
1649                .await;
1650        if let AuditResponse::Digests { digests, .. } = resp3 {
1651            assert_eq!(digests.len(), 3, "|PeerKeySet| = 3 → 3 digests");
1652        }
1653
1654        // Challenge with all 5 keys.
1655        let challenge5 = make_challenge(3205, nonce, peer_id, addrs.clone());
1656        let resp5 =
1657            handle_audit_challenge(&challenge5, &storage, &self_id, false, TEST_STORED_CHUNKS)
1658                .await;
1659        if let AuditResponse::Digests { digests, .. } = resp5 {
1660            assert_eq!(digests.len(), 5, "|PeerKeySet| = 5 → 5 digests");
1661        }
1662
1663        // Challenge with 0 keys (idle equivalent — no work).
1664        let challenge0 = make_challenge(3200, nonce, peer_id, vec![]);
1665        let resp0 =
1666            handle_audit_challenge(&challenge0, &storage, &self_id, false, TEST_STORED_CHUNKS)
1667                .await;
1668        if let AuditResponse::Digests { digests, .. } = resp0 {
1669            assert!(digests.is_empty(), "|PeerKeySet| = 0 → 0 digests (idle)");
1670        }
1671    }
1672
1673    // -- Scenario 47: Bootstrap claim grace period (audit) ----------------------
1674
1675    /// Scenario 47: Challenged peer responds with bootstrapping claim during
1676    /// audit.  `handle_audit_challenge` returns `Bootstrapping`; caller records
1677    /// `BootstrapClaimFirstSeen`.  No `AuditFailure` evidence is emitted.
1678    #[tokio::test]
1679    async fn scenario_47_bootstrap_claim_grace_period_audit() {
1680        let (storage, _temp) = create_test_storage().await;
1681
1682        // Store data so there is an auditable key.
1683        let content = b"bootstrap grace test";
1684        let addr = LmdbStorage::compute_address(content);
1685        storage.put(&addr, content).await.expect("put");
1686
1687        let challenge = make_challenge(4700, [0x47; 32], [0x47; 32], vec![addr]);
1688        let self_id = peer_id_from_bytes([0x47; 32]);
1689
1690        // Bootstrapping peer → Bootstrapping response (grace period start).
1691        let response =
1692            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1693        let challenge_id = match response {
1694            AuditResponse::Bootstrapping { challenge_id } => challenge_id,
1695            AuditResponse::Digests { .. } => {
1696                panic!("Expected Bootstrapping response during grace period")
1697            }
1698            AuditResponse::Rejected { .. } => {
1699                panic!("Unexpected Rejected response")
1700            }
1701        };
1702        assert_eq!(challenge_id, 4700);
1703
1704        // Caller records BootstrapClaimFirstSeen — verify the types support it.
1705        let peer = PeerId::from_bytes([0x47; 32]);
1706        let mut state = NeighborSyncState::new_cycle(vec![peer]);
1707        let now = Instant::now();
1708        let observed = state.observe_bootstrap_claim(
1709            peer,
1710            now,
1711            crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD,
1712        );
1713
1714        assert_eq!(
1715            observed,
1716            BootstrapClaimObservation::WithinGrace { first_seen: now }
1717        );
1718        assert!(
1719            state.bootstrap_claims.contains_key(&peer),
1720            "BootstrapClaimFirstSeen should be recorded after grace-period claim"
1721        );
1722        assert!(
1723            state.bootstrap_claim_history.contains_key(&peer),
1724            "Bootstrap claim history should remember that the grace window was used"
1725        );
1726    }
1727
1728    // -- Scenario 53: Audit partial per-key failure with mixed responsibility ---
1729
1730    /// Scenario 53: P challenged on {K1, K2, K3}.  K1 matches, K2 and K3
1731    /// mismatch.  Responsibility confirmation: P is responsible for K2 but
1732    /// not K3.  `AuditFailure` emitted for {K2} only.
1733    ///
1734    /// Full `verify_digests` + `handle_audit_failure` requires a `P2PNode` for
1735    /// network lookups.  This test verifies the conceptual steps:
1736    ///   (1) Digest comparison correctly identifies K2 and K3 as failures.
1737    ///   (2) `FailureEvidence::AuditFailure` carries only confirmed keys.
1738    #[tokio::test]
1739    async fn scenario_53_partial_failure_mixed_responsibility() {
1740        let (storage, _temp) = create_test_storage().await;
1741        let nonce = [0x53; 32];
1742        let peer_id = [0x53; 32];
1743
1744        // Store K1, K2, K3.
1745        let c1 = b"scenario 53 key one";
1746        let c2 = b"scenario 53 key two";
1747        let c3 = b"scenario 53 key three";
1748        let k1 = LmdbStorage::compute_address(c1);
1749        let k2 = LmdbStorage::compute_address(c2);
1750        let k3 = LmdbStorage::compute_address(c3);
1751        storage.put(&k1, c1).await.expect("put k1");
1752        storage.put(&k2, c2).await.expect("put k2");
1753        storage.put(&k3, c3).await.expect("put k3");
1754
1755        // Correct digests from challenger's local store.
1756        let d1_expected = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1757        let d2_expected = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1758        let d3_expected = compute_audit_digest(&nonce, &peer_id, &k3, c3);
1759
1760        // Simulate peer response: K1 matches, K2 wrong data, K3 wrong data.
1761        let d2_wrong = compute_audit_digest(&nonce, &peer_id, &k2, b"tampered k2");
1762        let d3_wrong = compute_audit_digest(&nonce, &peer_id, &k3, b"tampered k3");
1763
1764        assert_eq!(d1_expected, d1_expected, "K1 should match");
1765        assert_ne!(d2_wrong, d2_expected, "K2 should mismatch");
1766        assert_ne!(d3_wrong, d3_expected, "K3 should mismatch");
1767
1768        // Step 1: Identify failed keys (digest comparison).
1769        let digests = [d1_expected, d2_wrong, d3_wrong];
1770        let keys = [k1, k2, k3];
1771        let contents: [&[u8]; 3] = [c1, c2, c3];
1772
1773        let mut failed_keys = Vec::new();
1774        for (i, key) in keys.iter().enumerate() {
1775            if digests[i] == ABSENT_KEY_DIGEST {
1776                failed_keys.push(*key);
1777                continue;
1778            }
1779            let expected = compute_audit_digest(&nonce, &peer_id, key, contents[i]);
1780            if digests[i] != expected {
1781                failed_keys.push(*key);
1782            }
1783        }
1784
1785        assert_eq!(failed_keys.len(), 2, "K2 and K3 should be in failure set");
1786        assert!(failed_keys.contains(&k2));
1787        assert!(failed_keys.contains(&k3));
1788        assert!(!failed_keys.contains(&k1), "K1 passed digest check");
1789
1790        // Step 2: Responsibility confirmation removes K3 (not responsible).
1791        // Simulate: P is in closest peers for K2 but not K3.
1792        let responsible_for_k2 = true;
1793        let responsible_for_k3 = false;
1794        let mut confirmed = Vec::new();
1795        for key in &failed_keys {
1796            let is_responsible = if *key == k2 {
1797                responsible_for_k2
1798            } else {
1799                responsible_for_k3
1800            };
1801            if is_responsible {
1802                confirmed.push(*key);
1803            }
1804        }
1805
1806        assert_eq!(confirmed, vec![k2], "Only K2 should be in confirmed set");
1807
1808        // Step 3: Construct evidence for confirmed failures only.
1809        let challenged_peer = PeerId::from_bytes(peer_id);
1810        let evidence = FailureEvidence::AuditFailure {
1811            challenge_id: 5300,
1812            challenged_peer,
1813            confirmed_failed_keys: confirmed,
1814            summary: AuditFailureSummary::default(),
1815            reason: AuditFailureReason::DigestMismatch,
1816        };
1817
1818        match evidence {
1819            FailureEvidence::AuditFailure {
1820                confirmed_failed_keys,
1821                ..
1822            } => {
1823                assert_eq!(
1824                    confirmed_failed_keys.len(),
1825                    1,
1826                    "Only K2 should generate evidence"
1827                );
1828                assert_eq!(confirmed_failed_keys[0], k2);
1829            }
1830            _ => panic!("Expected AuditFailure evidence"),
1831        }
1832    }
1833}