Skip to main content

ant_node/replication/
audit.rs

1//! Storage audit protocol (Section 15).
2//!
3//! Challenge-response for claimed holders. Anti-outsourcing protection.
4
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::logging::{debug, enabled, info, warn};
10use rand::seq::SliceRandom;
11use rand::Rng;
12
13use crate::ant_protocol::XorName;
14use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
15use crate::replication::protocol::{
16    compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
17    ReplicationMessageBody, ABSENT_KEY_DIGEST,
18};
19use crate::replication::types::{
20    AuditFailureReason, AuditFailureSummary, FailureEvidence, PeerSyncRecord, RepairProofs,
21};
22use crate::storage::LmdbStorage;
23use saorsa_core::identity::PeerId;
24use saorsa_core::P2PNode;
25use tokio::sync::RwLock;
26
27// ---------------------------------------------------------------------------
28// Audit tick result
29// ---------------------------------------------------------------------------
30
31/// Result of an audit tick.
32#[derive(Debug)]
33pub enum AuditTickResult {
34    /// Audit completed successfully (all digests matched).
35    Passed {
36        /// The peer that was challenged.
37        challenged_peer: PeerId,
38        /// Number of keys verified.
39        keys_checked: usize,
40    },
41    /// Audit found failures (after responsibility confirmation).
42    Failed {
43        /// Evidence of the failure for trust engine.
44        evidence: FailureEvidence,
45    },
46    /// Audit target claimed bootstrapping.
47    BootstrapClaim {
48        /// The peer claiming bootstrap status.
49        peer: PeerId,
50    },
51    /// No eligible peers for audit this tick.
52    Idle,
53    /// Audit skipped (not enough local keys).
54    InsufficientKeys,
55}
56
57/// Format the first challenged key as a short stable correlation label.
58fn first_challenged_key_label(keys: &[XorName]) -> String {
59    keys.first().map_or_else(
60        || "0x".to_string(),
61        |k| format!("0x{}", hex::encode(&k[..8])),
62    )
63}
64
65/// Best-effort coarse class for the transport/request error returned by
66/// `P2PNode::send_request`.
67///
68/// The current core networking layer can wrap request deadline timeouts inside
69/// display strings, so this deliberately remains a bounded heuristic for logs
70/// rather than protocol/trust semantics.
71fn classify_audit_send_error(error: &str) -> &'static str {
72    let lower = error.to_ascii_lowercase();
73    if lower.contains("timed out") || lower.contains("timeout") {
74        "timeout"
75    } else if lower.contains("peer not found") || lower.contains("no channel") {
76        "peer_unavailable"
77    } else if lower.contains("connection") || lower.contains("connect") || lower.contains("dial") {
78        "connection_failed"
79    } else if lower.contains("closed") || lower.contains("dropped") {
80        "connection_closed"
81    } else if lower.contains("transport") {
82        "transport_error"
83    } else {
84        "other"
85    }
86}
87
88// ---------------------------------------------------------------------------
89// Main audit tick
90// ---------------------------------------------------------------------------
91
92/// Execute one audit tick (Section 15 steps 2-9).
93///
94/// Returns the audit result. Caller is responsible for emitting trust events.
95///
96/// **Invariant 19**: Returns [`AuditTickResult::Idle`] immediately if
97/// `is_bootstrapping` is `true` — a node must not audit others while it
98/// is still bootstrapping.
99#[allow(clippy::implicit_hasher)]
100pub async fn audit_tick(
101    p2p_node: &Arc<P2PNode>,
102    storage: &Arc<LmdbStorage>,
103    config: &ReplicationConfig,
104    sync_history: &HashMap<PeerId, PeerSyncRecord>,
105    is_bootstrapping: bool,
106) -> AuditTickResult {
107    let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
108    audit_tick_with_repair_proofs(
109        p2p_node,
110        storage,
111        config,
112        sync_history,
113        &repair_proofs,
114        0,
115        is_bootstrapping,
116    )
117    .await
118}
119
120/// Execute one repair-proof-gated audit tick.
121///
122/// This is the production path used by the replication engine. The
123/// compatibility [`audit_tick`] wrapper passes an empty proof table, so direct
124/// callers that have not adopted repair proofs remain conservative and do not
125/// audit peers for unproven keys.
126#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
127pub async fn audit_tick_with_repair_proofs(
128    p2p_node: &Arc<P2PNode>,
129    storage: &Arc<LmdbStorage>,
130    config: &ReplicationConfig,
131    sync_history: &HashMap<PeerId, PeerSyncRecord>,
132    repair_proofs: &Arc<RwLock<RepairProofs>>,
133    current_sync_epoch: u64,
134    is_bootstrapping: bool,
135) -> AuditTickResult {
136    // Invariant 19: never audit while still bootstrapping.
137    if is_bootstrapping {
138        return AuditTickResult::Idle;
139    }
140
141    let dht = p2p_node.dht_manager();
142
143    // Step 2: Select one eligible peer (has RepairOpportunity) at random.
144    // Peers with active bootstrap claims remain eligible. A follow-up audit is
145    // how we observe a continued claim and apply past-grace abuse handling.
146    let eligible_peers = eligible_audit_peers(sync_history);
147
148    if eligible_peers.is_empty() {
149        return AuditTickResult::Idle;
150    }
151
152    let (challenged_peer, nonce, challenge_id) = {
153        let mut rng = rand::thread_rng();
154        let selected = match eligible_peers.choose(&mut rng) {
155            Some(p) => *p,
156            None => return AuditTickResult::Idle,
157        };
158        let n: [u8; 32] = rng.gen();
159        let c: u64 = rng.gen();
160        (selected, n, c)
161    };
162
163    // Step 3: Sample keys from local store and keep those the peer is
164    // responsible for (appears in the close group via local RT lookup).
165    let all_keys = match storage.all_keys().await {
166        Ok(keys) => keys,
167        Err(e) => {
168            warn!("Audit: failed to read local keys: {e}");
169            return AuditTickResult::Idle;
170        }
171    };
172
173    if all_keys.is_empty() {
174        return AuditTickResult::Idle;
175    }
176
177    let sample_count = ReplicationConfig::audit_sample_count(all_keys.len());
178    let sampled_keys: Vec<XorName> = {
179        let mut rng = rand::thread_rng();
180        all_keys
181            .choose_multiple(&mut rng, sample_count)
182            .copied()
183            .collect()
184    };
185
186    // Step 4: Filter to keys where the chosen peer is in the close group and
187    // this node has proof that it already sent the peer a repair hint for the
188    // specific key.
189    let mut sampled_key_groups = Vec::new();
190    for key in &sampled_keys {
191        let closest = dht
192            .find_closest_nodes_local_with_self(key, config.close_group_size)
193            .await;
194        let close_peers: HashSet<PeerId> = closest.iter().map(|node| node.peer_id).collect();
195        if close_peers.contains(&challenged_peer) {
196            sampled_key_groups.push((*key, close_peers));
197        }
198    }
199
200    let peer_keys = {
201        let mut proofs = repair_proofs.write().await;
202        let now = Instant::now();
203        mature_audit_keys_for_peer(
204            &challenged_peer,
205            sampled_key_groups,
206            &mut proofs,
207            current_sync_epoch,
208            now,
209        )
210    };
211
212    if peer_keys.is_empty() {
213        return AuditTickResult::Idle;
214    }
215
216    // peer_keys is naturally bounded by audit_sample_count (sqrt-scaled),
217    // so no explicit truncation needed.
218
219    // Step 6: Send challenge.
220
221    let challenge = AuditChallenge {
222        challenge_id,
223        nonce,
224        challenged_peer_id: *challenged_peer.as_bytes(),
225        keys: peer_keys.clone(),
226    };
227
228    let msg = ReplicationMessage {
229        request_id: challenge_id,
230        body: ReplicationMessageBody::AuditChallenge(challenge),
231    };
232
233    let encoded = match msg.encode() {
234        Ok(data) => data,
235        Err(e) => {
236            warn!("Audit: failed to encode challenge: {e}");
237            return AuditTickResult::Idle;
238        }
239    };
240
241    let encoded_len = encoded.len();
242    let audit_timeout = config.audit_response_timeout(peer_keys.len());
243    let audit_started = Instant::now();
244    let response = match p2p_node
245        .send_request(
246            &challenged_peer,
247            REPLICATION_PROTOCOL_ID,
248            encoded,
249            audit_timeout,
250        )
251        .await
252    {
253        Ok(resp) => resp,
254        Err(e) => {
255            if enabled!(crate::logging::Level::WARN) {
256                let elapsed = audit_started.elapsed();
257                let send_error = e.to_string();
258                let send_error_class = classify_audit_send_error(&send_error);
259                let first_key = first_challenged_key_label(&peer_keys);
260                warn!(
261                    audit_type = "responsible_chunk",
262                    audit_phase = "challenge_send",
263                    audit_outcome = "send_request_failed",
264                    challenged_peer = %challenged_peer,
265                    challenge_id,
266                    key_count = peer_keys.len(),
267                    timeout_ms = audit_timeout.as_millis(),
268                    elapsed_ms = elapsed.as_millis(),
269                    first_key = %first_key,
270                    encoded_len,
271                    send_error_class,
272                    "Audit challenge send_request failed: audit_type=responsible_chunk, audit_phase=challenge_send, audit_outcome=send_request_failed, challenged_peer={challenged_peer}, challenge_id={challenge_id}, key_count={}, timeout_ms={}, elapsed_ms={}, first_key={first_key}, encoded_len={encoded_len}, send_error_class={send_error_class}",
273                    peer_keys.len(),
274                    audit_timeout.as_millis(),
275                    elapsed.as_millis(),
276                );
277            }
278            debug!(
279                challenged_peer = %challenged_peer,
280                challenge_id,
281                send_error = %e,
282                "Audit challenge raw send_request error"
283            );
284            // Timeout — need responsibility confirmation before penalty.
285            return handle_audit_timeout(
286                &challenged_peer,
287                challenge_id,
288                &peer_keys,
289                p2p_node,
290                config,
291            )
292            .await;
293        }
294    };
295
296    // Step 7: Parse response.
297    let resp_msg = match ReplicationMessage::decode(&response.data) {
298        Ok(m) => m,
299        Err(e) => {
300            warn!("Audit: failed to decode response from {challenged_peer}: {e}");
301            return handle_audit_failure(
302                &challenged_peer,
303                challenge_id,
304                &peer_keys,
305                AuditFailureReason::MalformedResponse,
306                p2p_node,
307                config,
308            )
309            .await;
310        }
311    };
312
313    match resp_msg.body {
314        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
315            challenge_id: resp_id,
316        }) => {
317            if resp_id != challenge_id {
318                warn!("Audit: challenge ID mismatch on Bootstrapping from {challenged_peer}");
319                return handle_audit_failure(
320                    &challenged_peer,
321                    challenge_id,
322                    &peer_keys,
323                    AuditFailureReason::MalformedResponse,
324                    p2p_node,
325                    config,
326                )
327                .await;
328            }
329            // Step 7b: Bootstrapping claim.
330            AuditTickResult::BootstrapClaim {
331                peer: challenged_peer,
332            }
333        }
334        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
335            challenge_id: resp_id,
336            digests,
337        }) => {
338            if resp_id != challenge_id {
339                warn!("Audit: challenge ID mismatch from {challenged_peer}");
340                return handle_audit_failure(
341                    &challenged_peer,
342                    challenge_id,
343                    &peer_keys,
344                    AuditFailureReason::MalformedResponse,
345                    p2p_node,
346                    config,
347                )
348                .await;
349            }
350            verify_digests(
351                &challenged_peer,
352                challenge_id,
353                &nonce,
354                &peer_keys,
355                &digests,
356                storage,
357                p2p_node,
358                config,
359            )
360            .await
361        }
362        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
363            challenge_id: resp_id,
364            reason,
365        }) => {
366            if resp_id != challenge_id {
367                warn!("Audit: challenge ID mismatch on Rejected from {challenged_peer}");
368                return handle_audit_failure(
369                    &challenged_peer,
370                    challenge_id,
371                    &peer_keys,
372                    AuditFailureReason::MalformedResponse,
373                    p2p_node,
374                    config,
375                )
376                .await;
377            }
378            warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
379            handle_audit_failure(
380                &challenged_peer,
381                challenge_id,
382                &peer_keys,
383                AuditFailureReason::Rejected,
384                p2p_node,
385                config,
386            )
387            .await
388        }
389        _ => {
390            warn!("Audit: unexpected response type from {challenged_peer}");
391            handle_audit_failure(
392                &challenged_peer,
393                challenge_id,
394                &peer_keys,
395                AuditFailureReason::MalformedResponse,
396                p2p_node,
397                config,
398            )
399            .await
400        }
401    }
402}
403
404fn eligible_audit_peers(sync_history: &HashMap<PeerId, PeerSyncRecord>) -> Vec<PeerId> {
405    sync_history
406        .iter()
407        .filter(|(_, record)| record.has_repair_opportunity())
408        .map(|(peer, _)| *peer)
409        .collect()
410}
411
412fn mature_audit_keys_for_peer(
413    challenged_peer: &PeerId,
414    sampled_key_groups: Vec<(XorName, HashSet<PeerId>)>,
415    repair_proofs: &mut RepairProofs,
416    current_sync_epoch: u64,
417    now: Instant,
418) -> Vec<XorName> {
419    sampled_key_groups
420        .into_iter()
421        .filter_map(|(key, close_peers)| {
422            repair_proofs
423                .has_mature_replica_hint(
424                    challenged_peer,
425                    &key,
426                    &close_peers,
427                    current_sync_epoch,
428                    now,
429                )
430                .then_some(key)
431        })
432        .collect()
433}
434
435#[derive(Debug, Clone, Copy, PartialEq, Eq)]
436enum AuditKeyFailureKind {
437    Absent,
438    DigestMismatch,
439    Unclassified,
440}
441
442#[derive(Debug, Clone, Copy, PartialEq, Eq)]
443struct AuditKeyFailure {
444    key: XorName,
445    kind: AuditKeyFailureKind,
446}
447
448impl AuditKeyFailure {
449    fn absent(key: XorName) -> Self {
450        Self {
451            key,
452            kind: AuditKeyFailureKind::Absent,
453        }
454    }
455
456    fn digest_mismatch(key: XorName) -> Self {
457        Self {
458            key,
459            kind: AuditKeyFailureKind::DigestMismatch,
460        }
461    }
462
463    fn unclassified(key: XorName) -> Self {
464        Self {
465            key,
466            kind: AuditKeyFailureKind::Unclassified,
467        }
468    }
469}
470
471fn build_audit_failure_summary(
472    challenged_key_count: usize,
473    confirmed_failures: &[AuditKeyFailure],
474) -> AuditFailureSummary {
475    let mut summary = AuditFailureSummary {
476        challenged_keys: challenged_key_count,
477        failed_keys: confirmed_failures.len(),
478        ..AuditFailureSummary::default()
479    };
480
481    for failure in confirmed_failures {
482        match failure.kind {
483            AuditKeyFailureKind::Absent => summary.absent_keys += 1,
484            AuditKeyFailureKind::DigestMismatch => summary.digest_mismatch_keys += 1,
485            AuditKeyFailureKind::Unclassified => {}
486        }
487    }
488
489    summary
490}
491
492fn audit_digest_failure_reason(confirmed_failures: &[AuditKeyFailure]) -> AuditFailureReason {
493    if confirmed_failures
494        .iter()
495        .all(|failure| failure.kind == AuditKeyFailureKind::Absent)
496    {
497        AuditFailureReason::KeyAbsent
498    } else {
499        AuditFailureReason::DigestMismatch
500    }
501}
502
503// ---------------------------------------------------------------------------
504// Digest verification
505// ---------------------------------------------------------------------------
506
507/// Verify per-key digests from audit response (Step 8).
508#[allow(clippy::too_many_arguments)]
509async fn verify_digests(
510    challenged_peer: &PeerId,
511    challenge_id: u64,
512    nonce: &[u8; 32],
513    keys: &[XorName],
514    digests: &[[u8; 32]],
515    storage: &Arc<LmdbStorage>,
516    p2p_node: &Arc<P2PNode>,
517    config: &ReplicationConfig,
518) -> AuditTickResult {
519    // Requirement: response must have exactly one digest per key.
520    if digests.len() != keys.len() {
521        warn!(
522            "Audit: malformed response from {challenged_peer}: {} digests for {} keys",
523            digests.len(),
524            keys.len()
525        );
526        return handle_audit_failure(
527            challenged_peer,
528            challenge_id,
529            keys,
530            AuditFailureReason::MalformedResponse,
531            p2p_node,
532            config,
533        )
534        .await;
535    }
536
537    let challenged_peer_bytes = challenged_peer.as_bytes();
538    let mut failed_keys = Vec::new();
539
540    for (i, key) in keys.iter().enumerate() {
541        let received_digest = &digests[i];
542
543        // Check for absent sentinel.
544        if *received_digest == ABSENT_KEY_DIGEST {
545            failed_keys.push(AuditKeyFailure::absent(*key));
546            continue;
547        }
548
549        // Recompute expected digest from local copy.
550        let local_bytes = match storage.get_raw(key).await {
551            Ok(Some(bytes)) => bytes,
552            Ok(None) => {
553                // We should hold this key (we sampled it), but it's gone.
554                warn!(
555                    "Audit: local key {} disappeared during audit",
556                    hex::encode(key)
557                );
558                continue;
559            }
560            Err(e) => {
561                warn!("Audit: failed to read local key {}: {e}", hex::encode(key));
562                continue;
563            }
564        };
565
566        let expected = compute_audit_digest(nonce, challenged_peer_bytes, key, &local_bytes);
567        if *received_digest != expected {
568            failed_keys.push(AuditKeyFailure::digest_mismatch(*key));
569        }
570    }
571
572    if failed_keys.is_empty() {
573        info!(
574            "Audit: peer {challenged_peer} passed (all {} keys verified)",
575            keys.len()
576        );
577        return AuditTickResult::Passed {
578            challenged_peer: *challenged_peer,
579            keys_checked: keys.len(),
580        };
581    }
582
583    // Step 9: Responsibility confirmation for failed keys.
584    handle_classified_audit_failure(
585        challenged_peer,
586        challenge_id,
587        &failed_keys,
588        AuditFailureReason::DigestMismatch,
589        keys.len(),
590        p2p_node,
591        config,
592    )
593    .await
594}
595
596// ---------------------------------------------------------------------------
597// Failure handling with responsibility confirmation
598// ---------------------------------------------------------------------------
599
600/// Handle audit failure: confirm responsibility before emitting evidence (Step 9).
601async fn handle_audit_failure(
602    challenged_peer: &PeerId,
603    challenge_id: u64,
604    failed_keys: &[XorName],
605    reason: AuditFailureReason,
606    p2p_node: &Arc<P2PNode>,
607    config: &ReplicationConfig,
608) -> AuditTickResult {
609    let failures = failed_keys
610        .iter()
611        .copied()
612        .map(AuditKeyFailure::unclassified)
613        .collect::<Vec<_>>();
614    handle_classified_audit_failure(
615        challenged_peer,
616        challenge_id,
617        &failures,
618        reason,
619        failed_keys.len(),
620        p2p_node,
621        config,
622    )
623    .await
624}
625
626async fn handle_classified_audit_failure(
627    challenged_peer: &PeerId,
628    challenge_id: u64,
629    failed_keys: &[AuditKeyFailure],
630    reason: AuditFailureReason,
631    challenged_key_count: usize,
632    p2p_node: &Arc<P2PNode>,
633    config: &ReplicationConfig,
634) -> AuditTickResult {
635    let dht = p2p_node.dht_manager();
636    let mut confirmed_failures = Vec::new();
637
638    // Step 9a-b: Fresh local RT lookup for each failed key.
639    for failure in failed_keys {
640        let closest = dht
641            .find_closest_nodes_local_with_self(&failure.key, config.close_group_size)
642            .await;
643        if closest.iter().any(|n| n.peer_id == *challenged_peer) {
644            confirmed_failures.push(*failure);
645        } else {
646            debug!(
647                "Audit: peer {challenged_peer} not responsible for {} (removed from failure set)",
648                hex::encode(failure.key)
649            );
650        }
651    }
652
653    // Step 9c: Empty confirmed set -> peer is no longer responsible for any
654    // of the failed keys (topology churn). This is NOT a pass — the peer did
655    // not prove it stores the data. Return Idle to avoid granting unearned
656    // positive trust.
657    if confirmed_failures.is_empty() {
658        info!("Audit: all failures for {challenged_peer} cleared by responsibility confirmation");
659        return AuditTickResult::Idle;
660    }
661
662    let summary = build_audit_failure_summary(challenged_key_count, &confirmed_failures);
663    let reason = if reason == AuditFailureReason::DigestMismatch {
664        audit_digest_failure_reason(&confirmed_failures)
665    } else {
666        reason
667    };
668    let confirmed_failed_keys = confirmed_failures
669        .iter()
670        .map(|failure| failure.key)
671        .collect();
672
673    // Step 9d: Non-empty confirmed set -> emit evidence.
674    let evidence = FailureEvidence::AuditFailure {
675        challenge_id,
676        challenged_peer: *challenged_peer,
677        confirmed_failed_keys,
678        summary,
679        reason,
680    };
681
682    AuditTickResult::Failed { evidence }
683}
684
685/// Handle audit timeout (no response received).
686async fn handle_audit_timeout(
687    challenged_peer: &PeerId,
688    challenge_id: u64,
689    keys: &[XorName],
690    p2p_node: &Arc<P2PNode>,
691    config: &ReplicationConfig,
692) -> AuditTickResult {
693    handle_audit_failure(
694        challenged_peer,
695        challenge_id,
696        keys,
697        AuditFailureReason::Timeout,
698        p2p_node,
699        config,
700    )
701    .await
702}
703
704// ---------------------------------------------------------------------------
705// Responder-side handler
706// ---------------------------------------------------------------------------
707
708/// Handle an incoming audit challenge (responder side).
709///
710/// Validates that the challenge targets this node, computes per-key digests,
711/// and returns the response.  Rejects challenges where
712/// `challenged_peer_id` does not match `self_peer_id` to prevent an oracle
713/// attack where a malicious challenger forges digests for a different peer.
714pub async fn handle_audit_challenge(
715    challenge: &AuditChallenge,
716    storage: &LmdbStorage,
717    self_peer_id: &PeerId,
718    is_bootstrapping: bool,
719    stored_chunks: usize,
720) -> AuditResponse {
721    if is_bootstrapping {
722        return AuditResponse::Bootstrapping {
723            challenge_id: challenge.challenge_id,
724        };
725    }
726
727    if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
728        warn!(
729            "Audit challenge targeted wrong peer: expected {}, got {}",
730            hex::encode(self_peer_id.as_bytes()),
731            hex::encode(challenge.challenged_peer_id),
732        );
733        return AuditResponse::Rejected {
734            challenge_id: challenge.challenge_id,
735            reason: "challenged_peer_id does not match this node".to_string(),
736        };
737    }
738
739    let max_keys = ReplicationConfig::max_incoming_audit_keys(stored_chunks);
740    if challenge.keys.len() > max_keys {
741        warn!(
742            "Audit challenge rejected: {} keys exceeds dynamic limit of {max_keys} \
743             (stored_chunks={stored_chunks})",
744            challenge.keys.len(),
745        );
746        return AuditResponse::Rejected {
747            challenge_id: challenge.challenge_id,
748            reason: format!(
749                "challenge contains {} keys, limit is {max_keys}",
750                challenge.keys.len()
751            ),
752        };
753    }
754
755    let mut digests = Vec::with_capacity(challenge.keys.len());
756
757    for key in &challenge.keys {
758        match storage.get_raw(key).await {
759            Ok(Some(data)) => {
760                let digest = compute_audit_digest(
761                    &challenge.nonce,
762                    &challenge.challenged_peer_id,
763                    key,
764                    &data,
765                );
766                digests.push(digest);
767            }
768            Ok(None) => {
769                digests.push(ABSENT_KEY_DIGEST);
770            }
771            Err(e) => {
772                warn!(
773                    "Audit responder: failed to read key {}: {e}",
774                    hex::encode(key)
775                );
776                digests.push(ABSENT_KEY_DIGEST);
777            }
778        }
779    }
780
781    AuditResponse::Digests {
782        challenge_id: challenge.challenge_id,
783        digests,
784    }
785}
786
787// ---------------------------------------------------------------------------
788// Tests
789// ---------------------------------------------------------------------------
790
791#[cfg(test)]
792#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
793mod tests {
794    use super::*;
795    use crate::replication::config::REPAIR_HINT_MIN_AGE;
796    use crate::replication::protocol::compute_audit_digest;
797    use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
798    use crate::storage::LmdbStorageConfig;
799    use std::time::Instant;
800    use tempfile::TempDir;
801
802    /// Simulated stored chunk count for tests. Large enough that the dynamic
803    /// incoming audit limit (`2 * sqrt(N)`) never rejects small test challenges.
804    const TEST_STORED_CHUNKS: usize = 1_000_000;
805
806    #[test]
807    fn first_challenged_key_label_truncates_to_16_hex_chars() {
808        let mut key = [0u8; 32];
809        key[0] = 0xAB;
810        key[7] = 0xCD;
811        key[8] = 0xEF;
812
813        assert_eq!(first_challenged_key_label(&[key]), "0xab000000000000cd");
814    }
815
816    #[test]
817    fn first_challenged_key_label_falls_back_when_empty() {
818        assert_eq!(first_challenged_key_label(&[]), "0x");
819    }
820
821    #[test]
822    fn classify_audit_send_error_uses_bounded_classes() {
823        assert_eq!(
824            classify_audit_send_error("request to peer timed out after 10s"),
825            "timeout"
826        );
827        assert_eq!(
828            classify_audit_send_error("peer not found in active channels"),
829            "peer_unavailable"
830        );
831        assert_eq!(
832            classify_audit_send_error("dial failed for all candidate addresses"),
833            "connection_failed"
834        );
835        assert_eq!(
836            classify_audit_send_error("response receiver dropped before delivery"),
837            "connection_closed"
838        );
839        assert_eq!(
840            classify_audit_send_error("transport stream error"),
841            "transport_error"
842        );
843        assert_eq!(classify_audit_send_error("unexpected error"), "other");
844    }
845
846    /// Create a test `LmdbStorage` backed by a temp directory.
847    async fn create_test_storage() -> (LmdbStorage, TempDir) {
848        let temp_dir = TempDir::new().expect("create temp dir");
849        let config = LmdbStorageConfig {
850            root_dir: temp_dir.path().to_path_buf(),
851            verify_on_read: false,
852            max_map_size: 0,
853            disk_reserve: 0,
854        };
855        let storage = LmdbStorage::new(config).await.expect("create storage");
856        (storage, temp_dir)
857    }
858
859    /// Build a challenge with the given parameters.
860    fn make_challenge(
861        challenge_id: u64,
862        nonce: [u8; 32],
863        peer_id: [u8; 32],
864        keys: Vec<XorName>,
865    ) -> AuditChallenge {
866        AuditChallenge {
867            challenge_id,
868            nonce,
869            challenged_peer_id: peer_id,
870            keys,
871        }
872    }
873
874    /// Build a `PeerId` matching the raw bytes used in a challenge.
875    fn peer_id_from_bytes(bytes: [u8; 32]) -> PeerId {
876        PeerId::from_bytes(bytes)
877    }
878
879    // -- handle_audit_challenge: present keys ---------------------------------
880
881    #[tokio::test]
882    async fn handle_challenge_present_keys_returns_correct_digests() {
883        let (storage, _temp) = create_test_storage().await;
884
885        // Store two chunks.
886        let content_a = b"chunk alpha";
887        let addr_a = LmdbStorage::compute_address(content_a);
888        storage.put(&addr_a, content_a).await.expect("put a");
889
890        let content_b = b"chunk beta";
891        let addr_b = LmdbStorage::compute_address(content_b);
892        storage.put(&addr_b, content_b).await.expect("put b");
893
894        let nonce = [0xAA; 32];
895        let peer_id = [0xBB; 32];
896        let challenge = make_challenge(42, nonce, peer_id, vec![addr_a, addr_b]);
897        let self_id = peer_id_from_bytes(peer_id);
898
899        let response =
900            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
901
902        match response {
903            AuditResponse::Digests {
904                challenge_id,
905                digests,
906            } => {
907                assert_eq!(challenge_id, 42);
908                assert_eq!(digests.len(), 2);
909
910                let expected_a = compute_audit_digest(&nonce, &peer_id, &addr_a, content_a);
911                let expected_b = compute_audit_digest(&nonce, &peer_id, &addr_b, content_b);
912                assert_eq!(digests[0], expected_a);
913                assert_eq!(digests[1], expected_b);
914            }
915            AuditResponse::Bootstrapping { .. } => {
916                panic!("expected Digests, got Bootstrapping");
917            }
918            AuditResponse::Rejected { .. } => {
919                panic!("Unexpected Rejected response");
920            }
921        }
922    }
923
924    // -- handle_audit_challenge: absent keys ----------------------------------
925
926    #[tokio::test]
927    async fn handle_challenge_absent_keys_returns_sentinel() {
928        let (storage, _temp) = create_test_storage().await;
929
930        let absent_key = [0xFF; 32];
931        let nonce = [0x11; 32];
932        let peer_id = [0x22; 32];
933        let challenge = make_challenge(99, nonce, peer_id, vec![absent_key]);
934        let self_id = peer_id_from_bytes(peer_id);
935
936        let response =
937            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
938
939        match response {
940            AuditResponse::Digests {
941                challenge_id,
942                digests,
943            } => {
944                assert_eq!(challenge_id, 99);
945                assert_eq!(digests.len(), 1);
946                assert_eq!(
947                    digests[0], ABSENT_KEY_DIGEST,
948                    "absent key should produce sentinel digest"
949                );
950            }
951            AuditResponse::Bootstrapping { .. } => {
952                panic!("expected Digests, got Bootstrapping");
953            }
954            AuditResponse::Rejected { .. } => {
955                panic!("Unexpected Rejected response");
956            }
957        }
958    }
959
960    // -- handle_audit_challenge: mixed present and absent ---------------------
961
962    #[tokio::test]
963    async fn handle_challenge_mixed_present_and_absent() {
964        let (storage, _temp) = create_test_storage().await;
965
966        let content = b"present chunk";
967        let addr_present = LmdbStorage::compute_address(content);
968        storage.put(&addr_present, content).await.expect("put");
969
970        let addr_absent = [0xDE; 32];
971        let nonce = [0x33; 32];
972        let peer_id = [0x44; 32];
973        let challenge = make_challenge(7, nonce, peer_id, vec![addr_present, addr_absent]);
974        let self_id = peer_id_from_bytes(peer_id);
975
976        let response =
977            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
978
979        match response {
980            AuditResponse::Digests { digests, .. } => {
981                assert_eq!(digests.len(), 2);
982
983                let expected_present =
984                    compute_audit_digest(&nonce, &peer_id, &addr_present, content);
985                assert_eq!(digests[0], expected_present);
986                assert_eq!(
987                    digests[1], ABSENT_KEY_DIGEST,
988                    "absent key should be sentinel"
989                );
990            }
991            AuditResponse::Bootstrapping { .. } => {
992                panic!("expected Digests, got Bootstrapping");
993            }
994            AuditResponse::Rejected { .. } => {
995                panic!("Unexpected Rejected response");
996            }
997        }
998    }
999
1000    // -- handle_audit_challenge: bootstrapping --------------------------------
1001
1002    #[tokio::test]
1003    async fn handle_challenge_bootstrapping_returns_bootstrapping_response() {
1004        let (storage, _temp) = create_test_storage().await;
1005
1006        let challenge = make_challenge(55, [0x00; 32], [0x01; 32], vec![[0x02; 32]]);
1007        let self_id = peer_id_from_bytes([0x01; 32]);
1008
1009        let response =
1010            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1011
1012        match response {
1013            AuditResponse::Bootstrapping { challenge_id } => {
1014                assert_eq!(challenge_id, 55);
1015            }
1016            AuditResponse::Digests { .. } => {
1017                panic!("expected Bootstrapping, got Digests");
1018            }
1019            AuditResponse::Rejected { .. } => {
1020                panic!("Unexpected Rejected response");
1021            }
1022        }
1023    }
1024
1025    // -- handle_audit_challenge: empty key list -------------------------------
1026
1027    #[tokio::test]
1028    async fn handle_challenge_empty_keys_returns_empty_digests() {
1029        let (storage, _temp) = create_test_storage().await;
1030
1031        let challenge = make_challenge(100, [0x10; 32], [0x20; 32], vec![]);
1032        let self_id = peer_id_from_bytes([0x20; 32]);
1033
1034        let response =
1035            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1036
1037        match response {
1038            AuditResponse::Digests {
1039                challenge_id,
1040                digests,
1041            } => {
1042                assert_eq!(challenge_id, 100);
1043                assert!(
1044                    digests.is_empty(),
1045                    "empty key list should yield empty digests"
1046                );
1047            }
1048            AuditResponse::Bootstrapping { .. } => {
1049                panic!("expected Digests, got Bootstrapping");
1050            }
1051            AuditResponse::Rejected { .. } => {
1052                panic!("Unexpected Rejected response");
1053            }
1054        }
1055    }
1056
1057    // -- Digest verification: matching ----------------------------------------
1058
1059    #[test]
1060    fn digest_verification_matching() {
1061        let nonce = [0x01; 32];
1062        let peer_id = [0x02; 32];
1063        let key: XorName = [0x03; 32];
1064        let data = b"correct data";
1065
1066        let expected = compute_audit_digest(&nonce, &peer_id, &key, data);
1067        let recomputed = compute_audit_digest(&nonce, &peer_id, &key, data);
1068
1069        assert_eq!(
1070            expected, recomputed,
1071            "same inputs must produce identical digests"
1072        );
1073        assert_ne!(
1074            expected, ABSENT_KEY_DIGEST,
1075            "real digest must not be sentinel"
1076        );
1077    }
1078
1079    // -- Digest verification: mismatching -------------------------------------
1080
1081    #[test]
1082    fn digest_verification_mismatching_data() {
1083        let nonce = [0x01; 32];
1084        let peer_id = [0x02; 32];
1085        let key: XorName = [0x03; 32];
1086
1087        let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data version A");
1088        let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data version B");
1089
1090        assert_ne!(
1091            digest_a, digest_b,
1092            "different data must produce different digests"
1093        );
1094    }
1095
1096    #[test]
1097    fn digest_verification_mismatching_nonce() {
1098        let peer_id = [0x02; 32];
1099        let key: XorName = [0x03; 32];
1100        let data = b"same data";
1101
1102        let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, data);
1103        let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, data);
1104
1105        assert_ne!(
1106            digest_a, digest_b,
1107            "different nonces must produce different digests"
1108        );
1109    }
1110
1111    #[test]
1112    fn digest_verification_mismatching_peer() {
1113        let nonce = [0x01; 32];
1114        let key: XorName = [0x03; 32];
1115        let data = b"same data";
1116
1117        let digest_a = compute_audit_digest(&nonce, &[0x02; 32], &key, data);
1118        let digest_b = compute_audit_digest(&nonce, &[0xFE; 32], &key, data);
1119
1120        assert_ne!(
1121            digest_a, digest_b,
1122            "different peers must produce different digests"
1123        );
1124    }
1125
1126    #[test]
1127    fn digest_verification_mismatching_key() {
1128        let nonce = [0x01; 32];
1129        let peer_id = [0x02; 32];
1130        let data = b"same data";
1131
1132        let digest_a = compute_audit_digest(&nonce, &peer_id, &[0x03; 32], data);
1133        let digest_b = compute_audit_digest(&nonce, &peer_id, &[0xFC; 32], data);
1134
1135        assert_ne!(
1136            digest_a, digest_b,
1137            "different keys must produce different digests"
1138        );
1139    }
1140
1141    // -- Absent sentinel is all zeros -----------------------------------------
1142
1143    #[test]
1144    fn absent_sentinel_is_all_zeros() {
1145        assert_eq!(ABSENT_KEY_DIGEST, [0u8; 32], "sentinel must be all zeros");
1146    }
1147
1148    // -- Bootstrapping skips digest computation even with stored keys ---------
1149
1150    #[tokio::test]
1151    async fn bootstrapping_skips_digest_computation() {
1152        let (storage, _temp) = create_test_storage().await;
1153
1154        let content = b"stored but bootstrapping";
1155        let addr = LmdbStorage::compute_address(content);
1156        storage.put(&addr, content).await.expect("put");
1157
1158        let challenge = make_challenge(200, [0xCC; 32], [0xDD; 32], vec![addr]);
1159        let self_id = peer_id_from_bytes([0xDD; 32]);
1160
1161        let response =
1162            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1163
1164        assert!(
1165            matches!(response, AuditResponse::Bootstrapping { challenge_id: 200 }),
1166            "bootstrapping node must not compute digests"
1167        );
1168    }
1169
1170    // -- Scenario 19/53: Partial failure with mixed responsibility ----------------
1171
1172    #[tokio::test]
1173    async fn scenario_19_partial_failure_mixed_responsibility() {
1174        // Three keys challenged: K1 matches, K2 mismatches, K3 absent.
1175        // After responsibility confirmation, only K2 is confirmed responsible.
1176        // AuditFailure emitted for {K2} only.
1177        // Test handle_audit_challenge with mixed results, then verify
1178        // the digest logic manually.
1179
1180        let (storage, _temp) = create_test_storage().await;
1181        let nonce = [0x42u8; 32];
1182        let peer_id = [0xAA; 32];
1183
1184        // Store K1 and K2, but NOT K3
1185        let content_k1 = b"key one data";
1186        let addr_k1 = LmdbStorage::compute_address(content_k1);
1187        storage.put(&addr_k1, content_k1).await.unwrap();
1188
1189        let content_k2 = b"key two data";
1190        let addr_k2 = LmdbStorage::compute_address(content_k2);
1191        storage.put(&addr_k2, content_k2).await.unwrap();
1192
1193        let addr_k3 = [0xFF; 32]; // Not stored
1194
1195        let challenge = AuditChallenge {
1196            challenge_id: 100,
1197            nonce,
1198            challenged_peer_id: peer_id,
1199            keys: vec![addr_k1, addr_k2, addr_k3],
1200        };
1201        let self_id = peer_id_from_bytes(peer_id);
1202
1203        let response =
1204            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1205
1206        match response {
1207            AuditResponse::Digests { digests, .. } => {
1208                assert_eq!(digests.len(), 3);
1209
1210                // K1 should have correct digest
1211                let expected_k1 = compute_audit_digest(&nonce, &peer_id, &addr_k1, content_k1);
1212                assert_eq!(digests[0], expected_k1);
1213
1214                // K2 should have correct digest
1215                let expected_k2 = compute_audit_digest(&nonce, &peer_id, &addr_k2, content_k2);
1216                assert_eq!(digests[1], expected_k2);
1217
1218                // K3 absent -> sentinel
1219                assert_eq!(digests[2], ABSENT_KEY_DIGEST);
1220            }
1221            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests response"),
1222            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1223        }
1224    }
1225
1226    // -- Scenario 54: All digests pass -------------------------------------------
1227
1228    #[tokio::test]
1229    async fn scenario_54_all_digests_pass() {
1230        // All challenged keys present and digests match.
1231        // Multiple keys to strengthen coverage beyond existing two-key tests.
1232        let (storage, _temp) = create_test_storage().await;
1233        let nonce = [0x10; 32];
1234        let peer_id = [0x20; 32];
1235
1236        let c1 = b"chunk alpha";
1237        let c2 = b"chunk beta";
1238        let c3 = b"chunk gamma";
1239        let a1 = LmdbStorage::compute_address(c1);
1240        let a2 = LmdbStorage::compute_address(c2);
1241        let a3 = LmdbStorage::compute_address(c3);
1242        storage.put(&a1, c1).await.unwrap();
1243        storage.put(&a2, c2).await.unwrap();
1244        storage.put(&a3, c3).await.unwrap();
1245
1246        let challenge = AuditChallenge {
1247            challenge_id: 200,
1248            nonce,
1249            challenged_peer_id: peer_id,
1250            keys: vec![a1, a2, a3],
1251        };
1252        let self_id = peer_id_from_bytes(peer_id);
1253
1254        let response =
1255            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1256        match response {
1257            AuditResponse::Digests { digests, .. } => {
1258                assert_eq!(digests.len(), 3);
1259                for (i, (addr, content)) in [(a1, &c1[..]), (a2, &c2[..]), (a3, &c3[..])]
1260                    .iter()
1261                    .enumerate()
1262                {
1263                    let expected = compute_audit_digest(&nonce, &peer_id, addr, content);
1264                    assert_eq!(digests[i], expected, "Key {i} digest should match");
1265                }
1266            }
1267            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1268            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1269        }
1270    }
1271
1272    // -- Scenario 55: Empty failure set means no evidence -------------------------
1273
1274    /// Scenario 55: Peer challenged on {K1, K2}. Both digests mismatch.
1275    /// Responsibility confirmation shows the peer is NOT responsible for
1276    /// either key. The confirmed failure set is empty — no `AuditFailure`
1277    /// evidence is emitted.
1278    ///
1279    /// Full `verify_digests` requires a live `P2PNode` for network lookups.
1280    /// This test exercises the deterministic sub-steps:
1281    ///   (1) Digest comparison identifies K1 and K2 as mismatches.
1282    ///   (2) Responsibility confirmation removes both keys.
1283    ///   (3) Empty confirmed failure set means no evidence.
1284    #[tokio::test]
1285    async fn scenario_55_no_confirmed_responsibility_no_evidence() {
1286        let (storage, _temp) = create_test_storage().await;
1287        let nonce = [0x55; 32];
1288        let peer_id = [0x55; 32];
1289
1290        // Store K1 and K2 on the challenger (for expected digest computation).
1291        let c1 = b"scenario 55 key one";
1292        let c2 = b"scenario 55 key two";
1293        let k1 = LmdbStorage::compute_address(c1);
1294        let k2 = LmdbStorage::compute_address(c2);
1295        storage.put(&k1, c1).await.expect("put k1");
1296        storage.put(&k2, c2).await.expect("put k2");
1297
1298        // Challenger computes expected digests.
1299        let expected_d1 = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1300        let expected_d2 = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1301
1302        // Simulate peer returning WRONG digests for both keys.
1303        let wrong_d1 = compute_audit_digest(&nonce, &peer_id, &k1, b"corrupted k1");
1304        let wrong_d2 = compute_audit_digest(&nonce, &peer_id, &k2, b"corrupted k2");
1305        assert_ne!(wrong_d1, expected_d1, "K1 digest should mismatch");
1306        assert_ne!(wrong_d2, expected_d2, "K2 digest should mismatch");
1307
1308        // Step 1: Identify failed keys via digest comparison.
1309        let keys = [k1, k2];
1310        let expected = [expected_d1, expected_d2];
1311        let received = [wrong_d1, wrong_d2];
1312
1313        let mut failed_keys = Vec::new();
1314        for i in 0..keys.len() {
1315            if received[i] != expected[i] {
1316                failed_keys.push(keys[i]);
1317            }
1318        }
1319        assert_eq!(
1320            failed_keys.len(),
1321            2,
1322            "Both keys should be identified as digest mismatches"
1323        );
1324
1325        // Step 2: Responsibility confirmation — peer is NOT responsible for
1326        // either key (simulated by filtering them all out).
1327        let confirmed_responsible_keys: Vec<XorName> = Vec::new();
1328        let confirmed_failures: Vec<XorName> = failed_keys
1329            .into_iter()
1330            .filter(|k| confirmed_responsible_keys.contains(k))
1331            .collect();
1332
1333        // Step 3: Empty confirmed failure set → no AuditFailure evidence.
1334        assert!(
1335            confirmed_failures.is_empty(),
1336            "With no confirmed responsibility, failure set must be empty — \
1337             no AuditFailure evidence should be emitted"
1338        );
1339
1340        // Verify that constructing evidence with empty keys results in a
1341        // no-penalty outcome (the caller checks is_empty before emitting).
1342        let peer = PeerId::from_bytes(peer_id);
1343        let evidence = FailureEvidence::AuditFailure {
1344            challenge_id: 5500,
1345            challenged_peer: peer,
1346            confirmed_failed_keys: confirmed_failures,
1347            summary: AuditFailureSummary::default(),
1348            reason: AuditFailureReason::DigestMismatch,
1349        };
1350        if let FailureEvidence::AuditFailure {
1351            confirmed_failed_keys,
1352            ..
1353        } = evidence
1354        {
1355            assert!(
1356                confirmed_failed_keys.is_empty(),
1357                "Evidence with empty failure set should not trigger a trust penalty"
1358            );
1359        }
1360    }
1361
1362    // -- Scenario 56: RepairOpportunity filters never-synced peers ----------------
1363
1364    #[test]
1365    fn scenario_56_repair_opportunity_filters_never_synced() {
1366        // PeerSyncRecord with last_sync=None should not pass
1367        // has_repair_opportunity().
1368
1369        let never_synced = PeerSyncRecord {
1370            last_sync: None,
1371            cycles_since_sync: 5,
1372        };
1373        assert!(!never_synced.has_repair_opportunity());
1374
1375        let synced_no_cycle = PeerSyncRecord {
1376            last_sync: Some(Instant::now()),
1377            cycles_since_sync: 0,
1378        };
1379        assert!(!synced_no_cycle.has_repair_opportunity());
1380
1381        let synced_with_cycle = PeerSyncRecord {
1382            last_sync: Some(Instant::now()),
1383            cycles_since_sync: 1,
1384        };
1385        assert!(synced_with_cycle.has_repair_opportunity());
1386    }
1387
1388    #[test]
1389    fn expired_bootstrap_claim_does_not_remove_peer_from_audit_eligibility() {
1390        let peer = peer_id_from_bytes([0x57; 32]);
1391        let mut sync_history = HashMap::new();
1392        sync_history.insert(
1393            peer,
1394            PeerSyncRecord {
1395                last_sync: Some(Instant::now()),
1396                cycles_since_sync: 1,
1397            },
1398        );
1399
1400        let mut bootstrap_claims = HashMap::new();
1401        let first_seen = Instant::now()
1402            .checked_sub(
1403                crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD
1404                    + std::time::Duration::from_secs(1),
1405            )
1406            .unwrap_or_else(Instant::now);
1407        bootstrap_claims.insert(peer, first_seen);
1408
1409        let eligible = eligible_audit_peers(&sync_history);
1410
1411        assert!(bootstrap_claims.contains_key(&peer));
1412        assert!(
1413            eligible.contains(&peer),
1414            "continued bootstrap claims must remain auditable so past-grace abuse can be observed"
1415        );
1416    }
1417
1418    #[test]
1419    fn audit_failure_summary_counts_confirmed_absent_and_mismatch_keys() {
1420        let absent_key = [0xA1; 32];
1421        let mismatch_key = [0xB2; 32];
1422        let confirmed = vec![
1423            AuditKeyFailure::absent(absent_key),
1424            AuditKeyFailure::digest_mismatch(mismatch_key),
1425        ];
1426
1427        let summary = build_audit_failure_summary(5, &confirmed);
1428
1429        assert_eq!(summary.challenged_keys, 5);
1430        assert_eq!(summary.failed_keys, 2);
1431        assert_eq!(summary.absent_keys, 1);
1432        assert_eq!(summary.digest_mismatch_keys, 1);
1433    }
1434
1435    #[test]
1436    fn audit_failure_summary_leaves_unclassified_rejections_out_of_absent_mismatch_counts() {
1437        let rejected_key = [0xC3; 32];
1438        let confirmed = vec![AuditKeyFailure::unclassified(rejected_key)];
1439
1440        let summary = build_audit_failure_summary(3, &confirmed);
1441
1442        assert_eq!(summary.challenged_keys, 3);
1443        assert_eq!(summary.failed_keys, 1);
1444        assert_eq!(summary.absent_keys, 0);
1445        assert_eq!(summary.digest_mismatch_keys, 0);
1446    }
1447
1448    #[test]
1449    fn audit_digest_failure_reason_is_key_absent_when_all_confirmed_failures_are_absent() {
1450        let failures = vec![AuditKeyFailure::absent([0xD4; 32])];
1451
1452        assert_eq!(
1453            audit_digest_failure_reason(&failures),
1454            AuditFailureReason::KeyAbsent
1455        );
1456    }
1457
1458    #[test]
1459    fn audit_digest_failure_reason_is_digest_mismatch_for_mixed_failures() {
1460        let failures = vec![
1461            AuditKeyFailure::absent([0xD5; 32]),
1462            AuditKeyFailure::digest_mismatch([0xE6; 32]),
1463        ];
1464
1465        assert_eq!(
1466            audit_digest_failure_reason(&failures),
1467            AuditFailureReason::DigestMismatch
1468        );
1469    }
1470
1471    #[test]
1472    fn audit_key_filter_retains_stable_proofs_and_rejects_evicted_peers() {
1473        const HINT_EPOCH: u64 = 7;
1474        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
1475        const CHALLENGED_PEER_BYTE: u8 = 0xA1;
1476        const OTHER_PEER_BYTE: u8 = 0xA2;
1477        const NEW_PEER_BYTE: u8 = 0xA3;
1478        const MATURE_KEY_BYTE: u8 = 0xB1;
1479        const SAME_EPOCH_KEY_BYTE: u8 = 0xB2;
1480        const MISSING_PROOF_KEY_BYTE: u8 = 0xB3;
1481        const STABLE_CHURN_KEY_BYTE: u8 = 0xB4;
1482        const EVICTED_KEY_BYTE: u8 = 0xB5;
1483        const FRESH_HINT_KEY_BYTE: u8 = 0xB6;
1484        const XOR_NAME_LEN: usize = 32;
1485
1486        let challenged_peer = peer_id_from_bytes([CHALLENGED_PEER_BYTE; XOR_NAME_LEN]);
1487        let other_peer = peer_id_from_bytes([OTHER_PEER_BYTE; XOR_NAME_LEN]);
1488        let new_peer = peer_id_from_bytes([NEW_PEER_BYTE; XOR_NAME_LEN]);
1489        let mature_key = [MATURE_KEY_BYTE; XOR_NAME_LEN];
1490        let same_epoch_key = [SAME_EPOCH_KEY_BYTE; XOR_NAME_LEN];
1491        let missing_proof_key = [MISSING_PROOF_KEY_BYTE; XOR_NAME_LEN];
1492        let stable_churn_key = [STABLE_CHURN_KEY_BYTE; XOR_NAME_LEN];
1493        let evicted_key = [EVICTED_KEY_BYTE; XOR_NAME_LEN];
1494        let fresh_hint_key = [FRESH_HINT_KEY_BYTE; XOR_NAME_LEN];
1495        let close_group = HashSet::from([challenged_peer, other_peer]);
1496        let changed_close_group = HashSet::from([challenged_peer, new_peer]);
1497        let evicted_close_group = HashSet::from([other_peer, new_peer]);
1498        let mut repair_proofs = RepairProofs::new();
1499        let mature_hinted_at = Instant::now();
1500        let now = mature_hinted_at
1501            .checked_add(REPAIR_HINT_MIN_AGE)
1502            .unwrap_or(mature_hinted_at);
1503
1504        assert!(repair_proofs.record_replica_hint_sent_at(
1505            challenged_peer,
1506            mature_key,
1507            &close_group,
1508            HINT_EPOCH,
1509            mature_hinted_at,
1510        ));
1511        assert!(repair_proofs.record_replica_hint_sent_at(
1512            challenged_peer,
1513            same_epoch_key,
1514            &close_group,
1515            CURRENT_EPOCH,
1516            mature_hinted_at,
1517        ));
1518        assert!(repair_proofs.record_replica_hint_sent_at(
1519            challenged_peer,
1520            stable_churn_key,
1521            &close_group,
1522            HINT_EPOCH,
1523            mature_hinted_at,
1524        ));
1525        assert!(repair_proofs.record_replica_hint_sent_at(
1526            challenged_peer,
1527            evicted_key,
1528            &close_group,
1529            HINT_EPOCH,
1530            mature_hinted_at,
1531        ));
1532        assert!(repair_proofs.record_replica_hint_sent_at(
1533            challenged_peer,
1534            fresh_hint_key,
1535            &close_group,
1536            HINT_EPOCH,
1537            now,
1538        ));
1539
1540        let sampled_key_groups = vec![
1541            (mature_key, close_group.clone()),
1542            (same_epoch_key, close_group.clone()),
1543            (missing_proof_key, close_group.clone()),
1544            (stable_churn_key, changed_close_group),
1545            (evicted_key, evicted_close_group),
1546            (fresh_hint_key, close_group.clone()),
1547        ];
1548        let peer_keys = mature_audit_keys_for_peer(
1549            &challenged_peer,
1550            sampled_key_groups,
1551            &mut repair_proofs,
1552            CURRENT_EPOCH,
1553            now,
1554        );
1555
1556        assert_eq!(
1557            peer_keys,
1558            vec![mature_key, stable_churn_key],
1559            "mature proofs for stable close-group peers should become audit keys, while same-epoch, fresh, missing, and evicted-peer proofs should not"
1560        );
1561    }
1562
1563    // -- Audit response must match key count --------------------------------------
1564
1565    #[tokio::test]
1566    async fn audit_response_must_match_key_count() {
1567        // Section 15: "A response is invalid if it has fewer or more entries
1568        // than challenged keys."
1569        // Verify handle_audit_challenge always produces exactly N digests for
1570        // N keys, including edge cases.
1571
1572        let (storage, _temp) = create_test_storage().await;
1573        let nonce = [0x50; 32];
1574        let peer_id = [0x60; 32];
1575
1576        // Store a single chunk
1577        let content = b"single chunk";
1578        let addr = LmdbStorage::compute_address(content);
1579        storage.put(&addr, content).await.unwrap();
1580
1581        // Challenge with 1 stored + 4 absent = 5 keys total
1582        let absent_keys: Vec<XorName> = (1..=4u8).map(|i| [i; 32]).collect();
1583        let mut keys = vec![addr];
1584        keys.extend_from_slice(&absent_keys);
1585
1586        let key_count = keys.len();
1587        let challenge = make_challenge(300, nonce, peer_id, keys);
1588        let self_id = peer_id_from_bytes(peer_id);
1589
1590        let response =
1591            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1592        match response {
1593            AuditResponse::Digests { digests, .. } => {
1594                assert_eq!(
1595                    digests.len(),
1596                    key_count,
1597                    "must produce exactly one digest per challenged key"
1598                );
1599            }
1600            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1601            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1602        }
1603    }
1604
1605    // -- Audit digest uses full record bytes --------------------------------------
1606
1607    #[test]
1608    fn audit_digest_uses_full_record_bytes() {
1609        // Verify digest changes when record content changes.
1610        let nonce = [1u8; 32];
1611        let peer = [2u8; 32];
1612        let key = [3u8; 32];
1613
1614        let d1 = compute_audit_digest(&nonce, &peer, &key, b"data version 1");
1615        let d2 = compute_audit_digest(&nonce, &peer, &key, b"data version 2");
1616        assert_ne!(
1617            d1, d2,
1618            "Different record bytes must produce different digests"
1619        );
1620    }
1621
1622    // -- Scenario 29: Audit start gate ------------------------------------------
1623
1624    /// Scenario 29: `handle_audit_challenge` returns `Bootstrapping` when the
1625    /// node is still bootstrapping — audit digests are never computed, and no
1626    /// `AuditFailure` evidence is emitted by the caller.
1627    ///
1628    /// This is the responder-side gate.  The challenger-side gate is enforced
1629    /// by `audit_tick`'s `is_bootstrapping` guard (Invariant 19) and by
1630    /// `check_bootstrap_drained()` in the engine loop; this test confirms the
1631    /// complementary responder behavior.
1632    #[tokio::test]
1633    async fn scenario_29_audit_start_gate_during_bootstrap() {
1634        let (storage, _temp) = create_test_storage().await;
1635
1636        // Store data so there *would* be work to audit.
1637        let content = b"should not be audited during bootstrap";
1638        let addr = LmdbStorage::compute_address(content);
1639        storage.put(&addr, content).await.expect("put");
1640
1641        let challenge = make_challenge(2900, [0x29; 32], [0x29; 32], vec![addr]);
1642        let self_id = peer_id_from_bytes([0x29; 32]);
1643
1644        // Responder is bootstrapping → Bootstrapping response, NOT Digests.
1645        let response =
1646            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1647        assert!(
1648            matches!(
1649                response,
1650                AuditResponse::Bootstrapping { challenge_id: 2900 }
1651            ),
1652            "bootstrapping node must not compute digests — audit start gate"
1653        );
1654
1655        // Responder is NOT bootstrapping → normal Digests.
1656        let response =
1657            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1658        assert!(
1659            matches!(response, AuditResponse::Digests { .. }),
1660            "drained node should compute digests normally"
1661        );
1662    }
1663
1664    // -- Scenario 30: Audit peer selection from sampled keys --------------------
1665
1666    /// Scenario 30: Key sampling uses dynamic sqrt-based batch sizing and
1667    /// `RepairOpportunity` filtering excludes never-synced peers.
1668    ///
1669    /// Full `audit_tick` requires a live network.  This test verifies the two
1670    /// deterministic sub-steps the function relies on:
1671    ///   (a) `audit_sample_count` scales with `sqrt(total_keys)`.
1672    ///   (b) `PeerSyncRecord::has_repair_opportunity` gates peer eligibility.
1673    #[test]
1674    fn scenario_30_audit_peer_selection_from_sampled_keys() {
1675        // (a) Dynamic sample count scales with sqrt(total_keys).
1676        assert_eq!(
1677            ReplicationConfig::audit_sample_count(100),
1678            10,
1679            "sample count should scale with sqrt(total_keys)"
1680        );
1681
1682        assert_eq!(ReplicationConfig::audit_sample_count(3), 1, "sqrt(3) = 1");
1683
1684        assert_eq!(
1685            ReplicationConfig::audit_sample_count(10_000),
1686            100,
1687            "sqrt(10000) = 100"
1688        );
1689
1690        // (b) Peer eligibility via RepairOpportunity.
1691        // Never synced → not eligible.
1692        let never = PeerSyncRecord {
1693            last_sync: None,
1694            cycles_since_sync: 10,
1695        };
1696        assert!(!never.has_repair_opportunity());
1697
1698        // Synced but zero subsequent cycles → not eligible.
1699        let too_soon = PeerSyncRecord {
1700            last_sync: Some(Instant::now()),
1701            cycles_since_sync: 0,
1702        };
1703        assert!(!too_soon.has_repair_opportunity());
1704
1705        // Synced with ≥1 cycle → eligible.
1706        let eligible = PeerSyncRecord {
1707            last_sync: Some(Instant::now()),
1708            cycles_since_sync: 2,
1709        };
1710        assert!(eligible.has_repair_opportunity());
1711    }
1712
1713    // -- Scenario 32: Dynamic challenge size ------------------------------------
1714
1715    /// Scenario 32: Challenge key count equals `|PeerKeySet(challenged_peer)|`,
1716    /// which is dynamic per round.  If no eligible peer remains after filtering,
1717    /// the tick is idle.
1718    ///
1719    /// Verified via `handle_audit_challenge`: the response digest count always
1720    /// equals the number of keys in the challenge.
1721    #[tokio::test]
1722    async fn scenario_32_dynamic_challenge_size() {
1723        let (storage, _temp) = create_test_storage().await;
1724
1725        // Store varying numbers of chunks.
1726        let mut addrs = Vec::new();
1727        for i in 0u8..5 {
1728            let content = format!("dynamic challenge key {i}");
1729            let addr = LmdbStorage::compute_address(content.as_bytes());
1730            storage.put(&addr, content.as_bytes()).await.expect("put");
1731            addrs.push(addr);
1732        }
1733
1734        let nonce = [0x32; 32];
1735        let peer_id = [0x32; 32];
1736        let self_id = peer_id_from_bytes(peer_id);
1737
1738        // Challenge with 1 key.
1739        let challenge1 = make_challenge(3201, nonce, peer_id, vec![addrs[0]]);
1740        let resp1 =
1741            handle_audit_challenge(&challenge1, &storage, &self_id, false, TEST_STORED_CHUNKS)
1742                .await;
1743        if let AuditResponse::Digests { digests, .. } = resp1 {
1744            assert_eq!(digests.len(), 1, "|PeerKeySet| = 1 → 1 digest");
1745        }
1746
1747        // Challenge with 3 keys.
1748        let challenge3 = make_challenge(3203, nonce, peer_id, addrs[0..3].to_vec());
1749        let resp3 =
1750            handle_audit_challenge(&challenge3, &storage, &self_id, false, TEST_STORED_CHUNKS)
1751                .await;
1752        if let AuditResponse::Digests { digests, .. } = resp3 {
1753            assert_eq!(digests.len(), 3, "|PeerKeySet| = 3 → 3 digests");
1754        }
1755
1756        // Challenge with all 5 keys.
1757        let challenge5 = make_challenge(3205, nonce, peer_id, addrs.clone());
1758        let resp5 =
1759            handle_audit_challenge(&challenge5, &storage, &self_id, false, TEST_STORED_CHUNKS)
1760                .await;
1761        if let AuditResponse::Digests { digests, .. } = resp5 {
1762            assert_eq!(digests.len(), 5, "|PeerKeySet| = 5 → 5 digests");
1763        }
1764
1765        // Challenge with 0 keys (idle equivalent — no work).
1766        let challenge0 = make_challenge(3200, nonce, peer_id, vec![]);
1767        let resp0 =
1768            handle_audit_challenge(&challenge0, &storage, &self_id, false, TEST_STORED_CHUNKS)
1769                .await;
1770        if let AuditResponse::Digests { digests, .. } = resp0 {
1771            assert!(digests.is_empty(), "|PeerKeySet| = 0 → 0 digests (idle)");
1772        }
1773    }
1774
1775    // -- Scenario 47: Bootstrap claim grace period (audit) ----------------------
1776
1777    /// Scenario 47: Challenged peer responds with bootstrapping claim during
1778    /// audit.  `handle_audit_challenge` returns `Bootstrapping`; caller records
1779    /// `BootstrapClaimFirstSeen`.  No `AuditFailure` evidence is emitted.
1780    #[tokio::test]
1781    async fn scenario_47_bootstrap_claim_grace_period_audit() {
1782        let (storage, _temp) = create_test_storage().await;
1783
1784        // Store data so there is an auditable key.
1785        let content = b"bootstrap grace test";
1786        let addr = LmdbStorage::compute_address(content);
1787        storage.put(&addr, content).await.expect("put");
1788
1789        let challenge = make_challenge(4700, [0x47; 32], [0x47; 32], vec![addr]);
1790        let self_id = peer_id_from_bytes([0x47; 32]);
1791
1792        // Bootstrapping peer → Bootstrapping response (grace period start).
1793        let response =
1794            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1795        let challenge_id = match response {
1796            AuditResponse::Bootstrapping { challenge_id } => challenge_id,
1797            AuditResponse::Digests { .. } => {
1798                panic!("Expected Bootstrapping response during grace period")
1799            }
1800            AuditResponse::Rejected { .. } => {
1801                panic!("Unexpected Rejected response")
1802            }
1803        };
1804        assert_eq!(challenge_id, 4700);
1805
1806        // Caller records BootstrapClaimFirstSeen — verify the types support it.
1807        let peer = PeerId::from_bytes([0x47; 32]);
1808        let mut state = NeighborSyncState::new_cycle(vec![peer]);
1809        let now = Instant::now();
1810        let observed = state.observe_bootstrap_claim(
1811            peer,
1812            now,
1813            crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD,
1814        );
1815
1816        assert_eq!(
1817            observed,
1818            BootstrapClaimObservation::WithinGrace { first_seen: now }
1819        );
1820        assert!(
1821            state.bootstrap_claims.contains_key(&peer),
1822            "BootstrapClaimFirstSeen should be recorded after grace-period claim"
1823        );
1824        assert!(
1825            state.bootstrap_claim_history.contains_key(&peer),
1826            "Bootstrap claim history should remember that the grace window was used"
1827        );
1828    }
1829
1830    // -- Scenario 53: Audit partial per-key failure with mixed responsibility ---
1831
1832    /// Scenario 53: P challenged on {K1, K2, K3}.  K1 matches, K2 and K3
1833    /// mismatch.  Responsibility confirmation: P is responsible for K2 but
1834    /// not K3.  `AuditFailure` emitted for {K2} only.
1835    ///
1836    /// Full `verify_digests` + `handle_audit_failure` requires a `P2PNode` for
1837    /// network lookups.  This test verifies the conceptual steps:
1838    ///   (1) Digest comparison correctly identifies K2 and K3 as failures.
1839    ///   (2) `FailureEvidence::AuditFailure` carries only confirmed keys.
1840    #[tokio::test]
1841    async fn scenario_53_partial_failure_mixed_responsibility() {
1842        let (storage, _temp) = create_test_storage().await;
1843        let nonce = [0x53; 32];
1844        let peer_id = [0x53; 32];
1845
1846        // Store K1, K2, K3.
1847        let c1 = b"scenario 53 key one";
1848        let c2 = b"scenario 53 key two";
1849        let c3 = b"scenario 53 key three";
1850        let k1 = LmdbStorage::compute_address(c1);
1851        let k2 = LmdbStorage::compute_address(c2);
1852        let k3 = LmdbStorage::compute_address(c3);
1853        storage.put(&k1, c1).await.expect("put k1");
1854        storage.put(&k2, c2).await.expect("put k2");
1855        storage.put(&k3, c3).await.expect("put k3");
1856
1857        // Correct digests from challenger's local store.
1858        let d1_expected = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1859        let d2_expected = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1860        let d3_expected = compute_audit_digest(&nonce, &peer_id, &k3, c3);
1861
1862        // Simulate peer response: K1 matches, K2 wrong data, K3 wrong data.
1863        let d2_wrong = compute_audit_digest(&nonce, &peer_id, &k2, b"tampered k2");
1864        let d3_wrong = compute_audit_digest(&nonce, &peer_id, &k3, b"tampered k3");
1865
1866        assert_eq!(d1_expected, d1_expected, "K1 should match");
1867        assert_ne!(d2_wrong, d2_expected, "K2 should mismatch");
1868        assert_ne!(d3_wrong, d3_expected, "K3 should mismatch");
1869
1870        // Step 1: Identify failed keys (digest comparison).
1871        let digests = [d1_expected, d2_wrong, d3_wrong];
1872        let keys = [k1, k2, k3];
1873        let contents: [&[u8]; 3] = [c1, c2, c3];
1874
1875        let mut failed_keys = Vec::new();
1876        for (i, key) in keys.iter().enumerate() {
1877            if digests[i] == ABSENT_KEY_DIGEST {
1878                failed_keys.push(*key);
1879                continue;
1880            }
1881            let expected = compute_audit_digest(&nonce, &peer_id, key, contents[i]);
1882            if digests[i] != expected {
1883                failed_keys.push(*key);
1884            }
1885        }
1886
1887        assert_eq!(failed_keys.len(), 2, "K2 and K3 should be in failure set");
1888        assert!(failed_keys.contains(&k2));
1889        assert!(failed_keys.contains(&k3));
1890        assert!(!failed_keys.contains(&k1), "K1 passed digest check");
1891
1892        // Step 2: Responsibility confirmation removes K3 (not responsible).
1893        // Simulate: P is in closest peers for K2 but not K3.
1894        let responsible_for_k2 = true;
1895        let responsible_for_k3 = false;
1896        let mut confirmed = Vec::new();
1897        for key in &failed_keys {
1898            let is_responsible = if *key == k2 {
1899                responsible_for_k2
1900            } else {
1901                responsible_for_k3
1902            };
1903            if is_responsible {
1904                confirmed.push(*key);
1905            }
1906        }
1907
1908        assert_eq!(confirmed, vec![k2], "Only K2 should be in confirmed set");
1909
1910        // Step 3: Construct evidence for confirmed failures only.
1911        let challenged_peer = PeerId::from_bytes(peer_id);
1912        let evidence = FailureEvidence::AuditFailure {
1913            challenge_id: 5300,
1914            challenged_peer,
1915            confirmed_failed_keys: confirmed,
1916            summary: AuditFailureSummary::default(),
1917            reason: AuditFailureReason::DigestMismatch,
1918        };
1919
1920        match evidence {
1921            FailureEvidence::AuditFailure {
1922                confirmed_failed_keys,
1923                ..
1924            } => {
1925                assert_eq!(
1926                    confirmed_failed_keys.len(),
1927                    1,
1928                    "Only K2 should generate evidence"
1929                );
1930                assert_eq!(confirmed_failed_keys[0], k2);
1931            }
1932            _ => panic!("Expected AuditFailure evidence"),
1933        }
1934    }
1935}