Skip to main content

ant_node/replication/
audit.rs

1//! Storage audit protocol (Section 15).
2//!
3//! Challenge-response for claimed holders. Anti-outsourcing protection.
4
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7
8use crate::logging::{debug, info, warn};
9use rand::seq::SliceRandom;
10use rand::Rng;
11
12use crate::ant_protocol::XorName;
13use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
14use crate::replication::protocol::{
15    compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
16    ReplicationMessageBody, ABSENT_KEY_DIGEST,
17};
18use crate::replication::types::{
19    AuditFailureReason, FailureEvidence, PeerSyncRecord, RepairProofs,
20};
21use crate::storage::LmdbStorage;
22use saorsa_core::identity::PeerId;
23use saorsa_core::P2PNode;
24use tokio::sync::RwLock;
25
26// ---------------------------------------------------------------------------
27// Audit tick result
28// ---------------------------------------------------------------------------
29
30/// Result of an audit tick.
31#[derive(Debug)]
32pub enum AuditTickResult {
33    /// Audit completed successfully (all digests matched).
34    Passed {
35        /// The peer that was challenged.
36        challenged_peer: PeerId,
37        /// Number of keys verified.
38        keys_checked: usize,
39    },
40    /// Audit found failures (after responsibility confirmation).
41    Failed {
42        /// Evidence of the failure for trust engine.
43        evidence: FailureEvidence,
44    },
45    /// Audit target claimed bootstrapping.
46    BootstrapClaim {
47        /// The peer claiming bootstrap status.
48        peer: PeerId,
49    },
50    /// No eligible peers for audit this tick.
51    Idle,
52    /// Audit skipped (not enough local keys).
53    InsufficientKeys,
54}
55
56// ---------------------------------------------------------------------------
57// Main audit tick
58// ---------------------------------------------------------------------------
59
60/// Execute one audit tick (Section 15 steps 2-9).
61///
62/// Returns the audit result. Caller is responsible for emitting trust events.
63///
64/// **Invariant 19**: Returns [`AuditTickResult::Idle`] immediately if
65/// `is_bootstrapping` is `true` — a node must not audit others while it
66/// is still bootstrapping.
67#[allow(clippy::implicit_hasher)]
68pub async fn audit_tick(
69    p2p_node: &Arc<P2PNode>,
70    storage: &Arc<LmdbStorage>,
71    config: &ReplicationConfig,
72    sync_history: &HashMap<PeerId, PeerSyncRecord>,
73    is_bootstrapping: bool,
74) -> AuditTickResult {
75    let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
76    audit_tick_with_repair_proofs(
77        p2p_node,
78        storage,
79        config,
80        sync_history,
81        &repair_proofs,
82        0,
83        is_bootstrapping,
84    )
85    .await
86}
87
88/// Execute one repair-proof-gated audit tick.
89///
90/// This is the production path used by the replication engine. The
91/// compatibility [`audit_tick`] wrapper passes an empty proof table, so direct
92/// callers that have not adopted repair proofs remain conservative and do not
93/// audit peers for unproven keys.
94#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
95pub async fn audit_tick_with_repair_proofs(
96    p2p_node: &Arc<P2PNode>,
97    storage: &Arc<LmdbStorage>,
98    config: &ReplicationConfig,
99    sync_history: &HashMap<PeerId, PeerSyncRecord>,
100    repair_proofs: &Arc<RwLock<RepairProofs>>,
101    current_sync_epoch: u64,
102    is_bootstrapping: bool,
103) -> AuditTickResult {
104    // Invariant 19: never audit while still bootstrapping.
105    if is_bootstrapping {
106        return AuditTickResult::Idle;
107    }
108
109    let dht = p2p_node.dht_manager();
110
111    // Step 2: Select one eligible peer (has RepairOpportunity) at random.
112    // Peers with active bootstrap claims remain eligible. A follow-up audit is
113    // how we observe a continued claim and apply past-grace abuse handling.
114    let eligible_peers = eligible_audit_peers(sync_history);
115
116    if eligible_peers.is_empty() {
117        return AuditTickResult::Idle;
118    }
119
120    let (challenged_peer, nonce, challenge_id) = {
121        let mut rng = rand::thread_rng();
122        let selected = match eligible_peers.choose(&mut rng) {
123            Some(p) => *p,
124            None => return AuditTickResult::Idle,
125        };
126        let n: [u8; 32] = rng.gen();
127        let c: u64 = rng.gen();
128        (selected, n, c)
129    };
130
131    // Step 3: Sample keys from local store and keep those the peer is
132    // responsible for (appears in the close group via local RT lookup).
133    let all_keys = match storage.all_keys().await {
134        Ok(keys) => keys,
135        Err(e) => {
136            warn!("Audit: failed to read local keys: {e}");
137            return AuditTickResult::Idle;
138        }
139    };
140
141    if all_keys.is_empty() {
142        return AuditTickResult::Idle;
143    }
144
145    let sample_count = ReplicationConfig::audit_sample_count(all_keys.len());
146    let sampled_keys: Vec<XorName> = {
147        let mut rng = rand::thread_rng();
148        all_keys
149            .choose_multiple(&mut rng, sample_count)
150            .copied()
151            .collect()
152    };
153
154    // Step 4: Filter to keys where the chosen peer is in the close group and
155    // this node has proof that it already sent the peer a repair hint for the
156    // specific key.
157    let mut sampled_key_groups = Vec::new();
158    for key in &sampled_keys {
159        let closest = dht
160            .find_closest_nodes_local_with_self(key, config.close_group_size)
161            .await;
162        let close_peers: HashSet<PeerId> = closest.iter().map(|node| node.peer_id).collect();
163        if close_peers.contains(&challenged_peer) {
164            sampled_key_groups.push((*key, close_peers));
165        }
166    }
167
168    let peer_keys = {
169        let mut proofs = repair_proofs.write().await;
170        mature_audit_keys_for_peer(
171            &challenged_peer,
172            sampled_key_groups,
173            &mut proofs,
174            current_sync_epoch,
175        )
176    };
177
178    if peer_keys.is_empty() {
179        return AuditTickResult::Idle;
180    }
181
182    // peer_keys is naturally bounded by audit_sample_count (sqrt-scaled),
183    // so no explicit truncation needed.
184
185    // Step 6: Send challenge.
186
187    let challenge = AuditChallenge {
188        challenge_id,
189        nonce,
190        challenged_peer_id: *challenged_peer.as_bytes(),
191        keys: peer_keys.clone(),
192    };
193
194    let msg = ReplicationMessage {
195        request_id: challenge_id,
196        body: ReplicationMessageBody::AuditChallenge(challenge),
197    };
198
199    let encoded = match msg.encode() {
200        Ok(data) => data,
201        Err(e) => {
202            warn!("Audit: failed to encode challenge: {e}");
203            return AuditTickResult::Idle;
204        }
205    };
206
207    let response = match p2p_node
208        .send_request(
209            &challenged_peer,
210            REPLICATION_PROTOCOL_ID,
211            encoded,
212            config.audit_response_timeout(peer_keys.len()),
213        )
214        .await
215    {
216        Ok(resp) => resp,
217        Err(e) => {
218            debug!("Audit: challenge to {challenged_peer} failed: {e}");
219            // Timeout — need responsibility confirmation before penalty.
220            return handle_audit_timeout(
221                &challenged_peer,
222                challenge_id,
223                &peer_keys,
224                p2p_node,
225                config,
226            )
227            .await;
228        }
229    };
230
231    // Step 7: Parse response.
232    let resp_msg = match ReplicationMessage::decode(&response.data) {
233        Ok(m) => m,
234        Err(e) => {
235            warn!("Audit: failed to decode response from {challenged_peer}: {e}");
236            return handle_audit_failure(
237                &challenged_peer,
238                challenge_id,
239                &peer_keys,
240                AuditFailureReason::MalformedResponse,
241                p2p_node,
242                config,
243            )
244            .await;
245        }
246    };
247
248    match resp_msg.body {
249        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
250            challenge_id: resp_id,
251        }) => {
252            if resp_id != challenge_id {
253                warn!("Audit: challenge ID mismatch on Bootstrapping from {challenged_peer}");
254                return handle_audit_failure(
255                    &challenged_peer,
256                    challenge_id,
257                    &peer_keys,
258                    AuditFailureReason::MalformedResponse,
259                    p2p_node,
260                    config,
261                )
262                .await;
263            }
264            // Step 7b: Bootstrapping claim.
265            AuditTickResult::BootstrapClaim {
266                peer: challenged_peer,
267            }
268        }
269        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
270            challenge_id: resp_id,
271            digests,
272        }) => {
273            if resp_id != challenge_id {
274                warn!("Audit: challenge ID mismatch from {challenged_peer}");
275                return handle_audit_failure(
276                    &challenged_peer,
277                    challenge_id,
278                    &peer_keys,
279                    AuditFailureReason::MalformedResponse,
280                    p2p_node,
281                    config,
282                )
283                .await;
284            }
285            verify_digests(
286                &challenged_peer,
287                challenge_id,
288                &nonce,
289                &peer_keys,
290                &digests,
291                storage,
292                p2p_node,
293                config,
294            )
295            .await
296        }
297        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
298            challenge_id: resp_id,
299            reason,
300        }) => {
301            if resp_id != challenge_id {
302                warn!("Audit: challenge ID mismatch on Rejected from {challenged_peer}");
303                return handle_audit_failure(
304                    &challenged_peer,
305                    challenge_id,
306                    &peer_keys,
307                    AuditFailureReason::MalformedResponse,
308                    p2p_node,
309                    config,
310                )
311                .await;
312            }
313            warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
314            handle_audit_failure(
315                &challenged_peer,
316                challenge_id,
317                &peer_keys,
318                AuditFailureReason::Rejected,
319                p2p_node,
320                config,
321            )
322            .await
323        }
324        _ => {
325            warn!("Audit: unexpected response type from {challenged_peer}");
326            handle_audit_failure(
327                &challenged_peer,
328                challenge_id,
329                &peer_keys,
330                AuditFailureReason::MalformedResponse,
331                p2p_node,
332                config,
333            )
334            .await
335        }
336    }
337}
338
339fn eligible_audit_peers(sync_history: &HashMap<PeerId, PeerSyncRecord>) -> Vec<PeerId> {
340    sync_history
341        .iter()
342        .filter(|(_, record)| record.has_repair_opportunity())
343        .map(|(peer, _)| *peer)
344        .collect()
345}
346
347fn mature_audit_keys_for_peer(
348    challenged_peer: &PeerId,
349    sampled_key_groups: Vec<(XorName, HashSet<PeerId>)>,
350    repair_proofs: &mut RepairProofs,
351    current_sync_epoch: u64,
352) -> Vec<XorName> {
353    sampled_key_groups
354        .into_iter()
355        .filter_map(|(key, close_peers)| {
356            repair_proofs
357                .has_mature_replica_hint(challenged_peer, &key, &close_peers, current_sync_epoch)
358                .then_some(key)
359        })
360        .collect()
361}
362
363// ---------------------------------------------------------------------------
364// Digest verification
365// ---------------------------------------------------------------------------
366
367/// Verify per-key digests from audit response (Step 8).
368#[allow(clippy::too_many_arguments)]
369async fn verify_digests(
370    challenged_peer: &PeerId,
371    challenge_id: u64,
372    nonce: &[u8; 32],
373    keys: &[XorName],
374    digests: &[[u8; 32]],
375    storage: &Arc<LmdbStorage>,
376    p2p_node: &Arc<P2PNode>,
377    config: &ReplicationConfig,
378) -> AuditTickResult {
379    // Requirement: response must have exactly one digest per key.
380    if digests.len() != keys.len() {
381        warn!(
382            "Audit: malformed response from {challenged_peer}: {} digests for {} keys",
383            digests.len(),
384            keys.len()
385        );
386        return handle_audit_failure(
387            challenged_peer,
388            challenge_id,
389            keys,
390            AuditFailureReason::MalformedResponse,
391            p2p_node,
392            config,
393        )
394        .await;
395    }
396
397    let challenged_peer_bytes = challenged_peer.as_bytes();
398    let mut failed_keys = Vec::new();
399
400    for (i, key) in keys.iter().enumerate() {
401        let received_digest = &digests[i];
402
403        // Check for absent sentinel.
404        if *received_digest == ABSENT_KEY_DIGEST {
405            failed_keys.push(*key);
406            continue;
407        }
408
409        // Recompute expected digest from local copy.
410        let local_bytes = match storage.get_raw(key).await {
411            Ok(Some(bytes)) => bytes,
412            Ok(None) => {
413                // We should hold this key (we sampled it), but it's gone.
414                warn!(
415                    "Audit: local key {} disappeared during audit",
416                    hex::encode(key)
417                );
418                continue;
419            }
420            Err(e) => {
421                warn!("Audit: failed to read local key {}: {e}", hex::encode(key));
422                continue;
423            }
424        };
425
426        let expected = compute_audit_digest(nonce, challenged_peer_bytes, key, &local_bytes);
427        if *received_digest != expected {
428            failed_keys.push(*key);
429        }
430    }
431
432    if failed_keys.is_empty() {
433        info!(
434            "Audit: peer {challenged_peer} passed (all {} keys verified)",
435            keys.len()
436        );
437        return AuditTickResult::Passed {
438            challenged_peer: *challenged_peer,
439            keys_checked: keys.len(),
440        };
441    }
442
443    // Step 9: Responsibility confirmation for failed keys.
444    handle_audit_failure(
445        challenged_peer,
446        challenge_id,
447        &failed_keys,
448        AuditFailureReason::DigestMismatch,
449        p2p_node,
450        config,
451    )
452    .await
453}
454
455// ---------------------------------------------------------------------------
456// Failure handling with responsibility confirmation
457// ---------------------------------------------------------------------------
458
459/// Handle audit failure: confirm responsibility before emitting evidence (Step 9).
460async fn handle_audit_failure(
461    challenged_peer: &PeerId,
462    challenge_id: u64,
463    failed_keys: &[XorName],
464    reason: AuditFailureReason,
465    p2p_node: &Arc<P2PNode>,
466    config: &ReplicationConfig,
467) -> AuditTickResult {
468    let dht = p2p_node.dht_manager();
469    let mut confirmed_failures = Vec::new();
470
471    // Step 9a-b: Fresh local RT lookup for each failed key.
472    for key in failed_keys {
473        let closest = dht
474            .find_closest_nodes_local_with_self(key, config.close_group_size)
475            .await;
476        if closest.iter().any(|n| n.peer_id == *challenged_peer) {
477            confirmed_failures.push(*key);
478        } else {
479            debug!(
480                "Audit: peer {challenged_peer} not responsible for {} (removed from failure set)",
481                hex::encode(key)
482            );
483        }
484    }
485
486    // Step 9c: Empty confirmed set -> peer is no longer responsible for any
487    // of the failed keys (topology churn). This is NOT a pass — the peer did
488    // not prove it stores the data. Return Idle to avoid granting unearned
489    // positive trust.
490    if confirmed_failures.is_empty() {
491        info!("Audit: all failures for {challenged_peer} cleared by responsibility confirmation");
492        return AuditTickResult::Idle;
493    }
494
495    // Step 9d: Non-empty confirmed set -> emit evidence.
496    let evidence = FailureEvidence::AuditFailure {
497        challenge_id,
498        challenged_peer: *challenged_peer,
499        confirmed_failed_keys: confirmed_failures,
500        reason,
501    };
502
503    AuditTickResult::Failed { evidence }
504}
505
506/// Handle audit timeout (no response received).
507async fn handle_audit_timeout(
508    challenged_peer: &PeerId,
509    challenge_id: u64,
510    keys: &[XorName],
511    p2p_node: &Arc<P2PNode>,
512    config: &ReplicationConfig,
513) -> AuditTickResult {
514    handle_audit_failure(
515        challenged_peer,
516        challenge_id,
517        keys,
518        AuditFailureReason::Timeout,
519        p2p_node,
520        config,
521    )
522    .await
523}
524
525// ---------------------------------------------------------------------------
526// Responder-side handler
527// ---------------------------------------------------------------------------
528
529/// Handle an incoming audit challenge (responder side).
530///
531/// Validates that the challenge targets this node, computes per-key digests,
532/// and returns the response.  Rejects challenges where
533/// `challenged_peer_id` does not match `self_peer_id` to prevent an oracle
534/// attack where a malicious challenger forges digests for a different peer.
535pub async fn handle_audit_challenge(
536    challenge: &AuditChallenge,
537    storage: &LmdbStorage,
538    self_peer_id: &PeerId,
539    is_bootstrapping: bool,
540    stored_chunks: usize,
541) -> AuditResponse {
542    if is_bootstrapping {
543        return AuditResponse::Bootstrapping {
544            challenge_id: challenge.challenge_id,
545        };
546    }
547
548    if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
549        warn!(
550            "Audit challenge targeted wrong peer: expected {}, got {}",
551            hex::encode(self_peer_id.as_bytes()),
552            hex::encode(challenge.challenged_peer_id),
553        );
554        return AuditResponse::Rejected {
555            challenge_id: challenge.challenge_id,
556            reason: "challenged_peer_id does not match this node".to_string(),
557        };
558    }
559
560    let max_keys = ReplicationConfig::max_incoming_audit_keys(stored_chunks);
561    if challenge.keys.len() > max_keys {
562        warn!(
563            "Audit challenge rejected: {} keys exceeds dynamic limit of {max_keys} \
564             (stored_chunks={stored_chunks})",
565            challenge.keys.len(),
566        );
567        return AuditResponse::Rejected {
568            challenge_id: challenge.challenge_id,
569            reason: format!(
570                "challenge contains {} keys, limit is {max_keys}",
571                challenge.keys.len()
572            ),
573        };
574    }
575
576    let mut digests = Vec::with_capacity(challenge.keys.len());
577
578    for key in &challenge.keys {
579        match storage.get_raw(key).await {
580            Ok(Some(data)) => {
581                let digest = compute_audit_digest(
582                    &challenge.nonce,
583                    &challenge.challenged_peer_id,
584                    key,
585                    &data,
586                );
587                digests.push(digest);
588            }
589            Ok(None) => {
590                digests.push(ABSENT_KEY_DIGEST);
591            }
592            Err(e) => {
593                warn!(
594                    "Audit responder: failed to read key {}: {e}",
595                    hex::encode(key)
596                );
597                digests.push(ABSENT_KEY_DIGEST);
598            }
599        }
600    }
601
602    AuditResponse::Digests {
603        challenge_id: challenge.challenge_id,
604        digests,
605    }
606}
607
608// ---------------------------------------------------------------------------
609// Tests
610// ---------------------------------------------------------------------------
611
612#[cfg(test)]
613#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
614mod tests {
615    use super::*;
616    use crate::replication::protocol::compute_audit_digest;
617    use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
618    use crate::storage::LmdbStorageConfig;
619    use std::time::Instant;
620    use tempfile::TempDir;
621
622    /// Simulated stored chunk count for tests. Large enough that the dynamic
623    /// incoming audit limit (`2 * sqrt(N)`) never rejects small test challenges.
624    const TEST_STORED_CHUNKS: usize = 1_000_000;
625
626    /// Create a test `LmdbStorage` backed by a temp directory.
627    async fn create_test_storage() -> (LmdbStorage, TempDir) {
628        let temp_dir = TempDir::new().expect("create temp dir");
629        let config = LmdbStorageConfig {
630            root_dir: temp_dir.path().to_path_buf(),
631            verify_on_read: false,
632            max_map_size: 0,
633            disk_reserve: 0,
634        };
635        let storage = LmdbStorage::new(config).await.expect("create storage");
636        (storage, temp_dir)
637    }
638
639    /// Build a challenge with the given parameters.
640    fn make_challenge(
641        challenge_id: u64,
642        nonce: [u8; 32],
643        peer_id: [u8; 32],
644        keys: Vec<XorName>,
645    ) -> AuditChallenge {
646        AuditChallenge {
647            challenge_id,
648            nonce,
649            challenged_peer_id: peer_id,
650            keys,
651        }
652    }
653
654    /// Build a `PeerId` matching the raw bytes used in a challenge.
655    fn peer_id_from_bytes(bytes: [u8; 32]) -> PeerId {
656        PeerId::from_bytes(bytes)
657    }
658
659    // -- handle_audit_challenge: present keys ---------------------------------
660
661    #[tokio::test]
662    async fn handle_challenge_present_keys_returns_correct_digests() {
663        let (storage, _temp) = create_test_storage().await;
664
665        // Store two chunks.
666        let content_a = b"chunk alpha";
667        let addr_a = LmdbStorage::compute_address(content_a);
668        storage.put(&addr_a, content_a).await.expect("put a");
669
670        let content_b = b"chunk beta";
671        let addr_b = LmdbStorage::compute_address(content_b);
672        storage.put(&addr_b, content_b).await.expect("put b");
673
674        let nonce = [0xAA; 32];
675        let peer_id = [0xBB; 32];
676        let challenge = make_challenge(42, nonce, peer_id, vec![addr_a, addr_b]);
677        let self_id = peer_id_from_bytes(peer_id);
678
679        let response =
680            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
681
682        match response {
683            AuditResponse::Digests {
684                challenge_id,
685                digests,
686            } => {
687                assert_eq!(challenge_id, 42);
688                assert_eq!(digests.len(), 2);
689
690                let expected_a = compute_audit_digest(&nonce, &peer_id, &addr_a, content_a);
691                let expected_b = compute_audit_digest(&nonce, &peer_id, &addr_b, content_b);
692                assert_eq!(digests[0], expected_a);
693                assert_eq!(digests[1], expected_b);
694            }
695            AuditResponse::Bootstrapping { .. } => {
696                panic!("expected Digests, got Bootstrapping");
697            }
698            AuditResponse::Rejected { .. } => {
699                panic!("Unexpected Rejected response");
700            }
701        }
702    }
703
704    // -- handle_audit_challenge: absent keys ----------------------------------
705
706    #[tokio::test]
707    async fn handle_challenge_absent_keys_returns_sentinel() {
708        let (storage, _temp) = create_test_storage().await;
709
710        let absent_key = [0xFF; 32];
711        let nonce = [0x11; 32];
712        let peer_id = [0x22; 32];
713        let challenge = make_challenge(99, nonce, peer_id, vec![absent_key]);
714        let self_id = peer_id_from_bytes(peer_id);
715
716        let response =
717            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
718
719        match response {
720            AuditResponse::Digests {
721                challenge_id,
722                digests,
723            } => {
724                assert_eq!(challenge_id, 99);
725                assert_eq!(digests.len(), 1);
726                assert_eq!(
727                    digests[0], ABSENT_KEY_DIGEST,
728                    "absent key should produce sentinel digest"
729                );
730            }
731            AuditResponse::Bootstrapping { .. } => {
732                panic!("expected Digests, got Bootstrapping");
733            }
734            AuditResponse::Rejected { .. } => {
735                panic!("Unexpected Rejected response");
736            }
737        }
738    }
739
740    // -- handle_audit_challenge: mixed present and absent ---------------------
741
742    #[tokio::test]
743    async fn handle_challenge_mixed_present_and_absent() {
744        let (storage, _temp) = create_test_storage().await;
745
746        let content = b"present chunk";
747        let addr_present = LmdbStorage::compute_address(content);
748        storage.put(&addr_present, content).await.expect("put");
749
750        let addr_absent = [0xDE; 32];
751        let nonce = [0x33; 32];
752        let peer_id = [0x44; 32];
753        let challenge = make_challenge(7, nonce, peer_id, vec![addr_present, addr_absent]);
754        let self_id = peer_id_from_bytes(peer_id);
755
756        let response =
757            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
758
759        match response {
760            AuditResponse::Digests { digests, .. } => {
761                assert_eq!(digests.len(), 2);
762
763                let expected_present =
764                    compute_audit_digest(&nonce, &peer_id, &addr_present, content);
765                assert_eq!(digests[0], expected_present);
766                assert_eq!(
767                    digests[1], ABSENT_KEY_DIGEST,
768                    "absent key should be sentinel"
769                );
770            }
771            AuditResponse::Bootstrapping { .. } => {
772                panic!("expected Digests, got Bootstrapping");
773            }
774            AuditResponse::Rejected { .. } => {
775                panic!("Unexpected Rejected response");
776            }
777        }
778    }
779
780    // -- handle_audit_challenge: bootstrapping --------------------------------
781
782    #[tokio::test]
783    async fn handle_challenge_bootstrapping_returns_bootstrapping_response() {
784        let (storage, _temp) = create_test_storage().await;
785
786        let challenge = make_challenge(55, [0x00; 32], [0x01; 32], vec![[0x02; 32]]);
787        let self_id = peer_id_from_bytes([0x01; 32]);
788
789        let response =
790            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
791
792        match response {
793            AuditResponse::Bootstrapping { challenge_id } => {
794                assert_eq!(challenge_id, 55);
795            }
796            AuditResponse::Digests { .. } => {
797                panic!("expected Bootstrapping, got Digests");
798            }
799            AuditResponse::Rejected { .. } => {
800                panic!("Unexpected Rejected response");
801            }
802        }
803    }
804
805    // -- handle_audit_challenge: empty key list -------------------------------
806
807    #[tokio::test]
808    async fn handle_challenge_empty_keys_returns_empty_digests() {
809        let (storage, _temp) = create_test_storage().await;
810
811        let challenge = make_challenge(100, [0x10; 32], [0x20; 32], vec![]);
812        let self_id = peer_id_from_bytes([0x20; 32]);
813
814        let response =
815            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
816
817        match response {
818            AuditResponse::Digests {
819                challenge_id,
820                digests,
821            } => {
822                assert_eq!(challenge_id, 100);
823                assert!(
824                    digests.is_empty(),
825                    "empty key list should yield empty digests"
826                );
827            }
828            AuditResponse::Bootstrapping { .. } => {
829                panic!("expected Digests, got Bootstrapping");
830            }
831            AuditResponse::Rejected { .. } => {
832                panic!("Unexpected Rejected response");
833            }
834        }
835    }
836
837    // -- Digest verification: matching ----------------------------------------
838
839    #[test]
840    fn digest_verification_matching() {
841        let nonce = [0x01; 32];
842        let peer_id = [0x02; 32];
843        let key: XorName = [0x03; 32];
844        let data = b"correct data";
845
846        let expected = compute_audit_digest(&nonce, &peer_id, &key, data);
847        let recomputed = compute_audit_digest(&nonce, &peer_id, &key, data);
848
849        assert_eq!(
850            expected, recomputed,
851            "same inputs must produce identical digests"
852        );
853        assert_ne!(
854            expected, ABSENT_KEY_DIGEST,
855            "real digest must not be sentinel"
856        );
857    }
858
859    // -- Digest verification: mismatching -------------------------------------
860
861    #[test]
862    fn digest_verification_mismatching_data() {
863        let nonce = [0x01; 32];
864        let peer_id = [0x02; 32];
865        let key: XorName = [0x03; 32];
866
867        let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data version A");
868        let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data version B");
869
870        assert_ne!(
871            digest_a, digest_b,
872            "different data must produce different digests"
873        );
874    }
875
876    #[test]
877    fn digest_verification_mismatching_nonce() {
878        let peer_id = [0x02; 32];
879        let key: XorName = [0x03; 32];
880        let data = b"same data";
881
882        let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, data);
883        let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, data);
884
885        assert_ne!(
886            digest_a, digest_b,
887            "different nonces must produce different digests"
888        );
889    }
890
#[test]
fn digest_verification_mismatching_peer() {
    // Only the challenged peer id differs between the two digests.
    let key: XorName = [0x03; 32];
    let nonce = [0x01; 32];
    let record = b"same data";
    let first_peer = [0x02; 32];
    let second_peer = [0xFE; 32];

    let for_first = compute_audit_digest(&nonce, &first_peer, &key, record);
    let for_second = compute_audit_digest(&nonce, &second_peer, &key, record);

    assert_ne!(
        for_first, for_second,
        "different peers must produce different digests"
    );
}
905
#[test]
fn digest_verification_mismatching_key() {
    // Only the record key differs between the two digests.
    let peer_id = [0x02; 32];
    let nonce = [0x01; 32];
    let record = b"same data";
    let first_key = [0x03; 32];
    let second_key = [0xFC; 32];

    let for_first = compute_audit_digest(&nonce, &peer_id, &first_key, record);
    let for_second = compute_audit_digest(&nonce, &peer_id, &second_key, record);

    assert_ne!(
        for_first, for_second,
        "different keys must produce different digests"
    );
}
920
921    // -- Absent sentinel is all zeros -----------------------------------------
922
#[test]
fn absent_sentinel_is_all_zeros() {
    // Pins the sentinel's byte pattern: 32 zero bytes.
    let all_zero = [0u8; 32];
    assert_eq!(ABSENT_KEY_DIGEST, all_zero, "sentinel must be all zeros");
}
927
928    // -- Bootstrapping skips digest computation even with stored keys ---------
929
930    #[tokio::test]
931    async fn bootstrapping_skips_digest_computation() {
932        let (storage, _temp) = create_test_storage().await;
933
934        let content = b"stored but bootstrapping";
935        let addr = LmdbStorage::compute_address(content);
936        storage.put(&addr, content).await.expect("put");
937
938        let challenge = make_challenge(200, [0xCC; 32], [0xDD; 32], vec![addr]);
939        let self_id = peer_id_from_bytes([0xDD; 32]);
940
941        let response =
942            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
943
944        assert!(
945            matches!(response, AuditResponse::Bootstrapping { challenge_id: 200 }),
946            "bootstrapping node must not compute digests"
947        );
948    }
949
950    // -- Scenario 19/53: Partial failure with mixed responsibility ----------------
951
952    #[tokio::test]
953    async fn scenario_19_partial_failure_mixed_responsibility() {
954        // Three keys challenged: K1 matches, K2 mismatches, K3 absent.
955        // After responsibility confirmation, only K2 is confirmed responsible.
956        // AuditFailure emitted for {K2} only.
957        // Test handle_audit_challenge with mixed results, then verify
958        // the digest logic manually.
959
960        let (storage, _temp) = create_test_storage().await;
961        let nonce = [0x42u8; 32];
962        let peer_id = [0xAA; 32];
963
964        // Store K1 and K2, but NOT K3
965        let content_k1 = b"key one data";
966        let addr_k1 = LmdbStorage::compute_address(content_k1);
967        storage.put(&addr_k1, content_k1).await.unwrap();
968
969        let content_k2 = b"key two data";
970        let addr_k2 = LmdbStorage::compute_address(content_k2);
971        storage.put(&addr_k2, content_k2).await.unwrap();
972
973        let addr_k3 = [0xFF; 32]; // Not stored
974
975        let challenge = AuditChallenge {
976            challenge_id: 100,
977            nonce,
978            challenged_peer_id: peer_id,
979            keys: vec![addr_k1, addr_k2, addr_k3],
980        };
981        let self_id = peer_id_from_bytes(peer_id);
982
983        let response =
984            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
985
986        match response {
987            AuditResponse::Digests { digests, .. } => {
988                assert_eq!(digests.len(), 3);
989
990                // K1 should have correct digest
991                let expected_k1 = compute_audit_digest(&nonce, &peer_id, &addr_k1, content_k1);
992                assert_eq!(digests[0], expected_k1);
993
994                // K2 should have correct digest
995                let expected_k2 = compute_audit_digest(&nonce, &peer_id, &addr_k2, content_k2);
996                assert_eq!(digests[1], expected_k2);
997
998                // K3 absent -> sentinel
999                assert_eq!(digests[2], ABSENT_KEY_DIGEST);
1000            }
1001            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests response"),
1002            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1003        }
1004    }
1005
1006    // -- Scenario 54: All digests pass -------------------------------------------
1007
1008    #[tokio::test]
1009    async fn scenario_54_all_digests_pass() {
1010        // All challenged keys present and digests match.
1011        // Multiple keys to strengthen coverage beyond existing two-key tests.
1012        let (storage, _temp) = create_test_storage().await;
1013        let nonce = [0x10; 32];
1014        let peer_id = [0x20; 32];
1015
1016        let c1 = b"chunk alpha";
1017        let c2 = b"chunk beta";
1018        let c3 = b"chunk gamma";
1019        let a1 = LmdbStorage::compute_address(c1);
1020        let a2 = LmdbStorage::compute_address(c2);
1021        let a3 = LmdbStorage::compute_address(c3);
1022        storage.put(&a1, c1).await.unwrap();
1023        storage.put(&a2, c2).await.unwrap();
1024        storage.put(&a3, c3).await.unwrap();
1025
1026        let challenge = AuditChallenge {
1027            challenge_id: 200,
1028            nonce,
1029            challenged_peer_id: peer_id,
1030            keys: vec![a1, a2, a3],
1031        };
1032        let self_id = peer_id_from_bytes(peer_id);
1033
1034        let response =
1035            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1036        match response {
1037            AuditResponse::Digests { digests, .. } => {
1038                assert_eq!(digests.len(), 3);
1039                for (i, (addr, content)) in [(a1, &c1[..]), (a2, &c2[..]), (a3, &c3[..])]
1040                    .iter()
1041                    .enumerate()
1042                {
1043                    let expected = compute_audit_digest(&nonce, &peer_id, addr, content);
1044                    assert_eq!(digests[i], expected, "Key {i} digest should match");
1045                }
1046            }
1047            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1048            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1049        }
1050    }
1051
1052    // -- Scenario 55: Empty failure set means no evidence -------------------------
1053
1054    /// Scenario 55: Peer challenged on {K1, K2}. Both digests mismatch.
1055    /// Responsibility confirmation shows the peer is NOT responsible for
1056    /// either key. The confirmed failure set is empty — no `AuditFailure`
1057    /// evidence is emitted.
1058    ///
1059    /// Full `verify_digests` requires a live `P2PNode` for network lookups.
1060    /// This test exercises the deterministic sub-steps:
1061    ///   (1) Digest comparison identifies K1 and K2 as mismatches.
1062    ///   (2) Responsibility confirmation removes both keys.
1063    ///   (3) Empty confirmed failure set means no evidence.
1064    #[tokio::test]
1065    async fn scenario_55_no_confirmed_responsibility_no_evidence() {
1066        let (storage, _temp) = create_test_storage().await;
1067        let nonce = [0x55; 32];
1068        let peer_id = [0x55; 32];
1069
1070        // Store K1 and K2 on the challenger (for expected digest computation).
1071        let c1 = b"scenario 55 key one";
1072        let c2 = b"scenario 55 key two";
1073        let k1 = LmdbStorage::compute_address(c1);
1074        let k2 = LmdbStorage::compute_address(c2);
1075        storage.put(&k1, c1).await.expect("put k1");
1076        storage.put(&k2, c2).await.expect("put k2");
1077
1078        // Challenger computes expected digests.
1079        let expected_d1 = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1080        let expected_d2 = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1081
1082        // Simulate peer returning WRONG digests for both keys.
1083        let wrong_d1 = compute_audit_digest(&nonce, &peer_id, &k1, b"corrupted k1");
1084        let wrong_d2 = compute_audit_digest(&nonce, &peer_id, &k2, b"corrupted k2");
1085        assert_ne!(wrong_d1, expected_d1, "K1 digest should mismatch");
1086        assert_ne!(wrong_d2, expected_d2, "K2 digest should mismatch");
1087
1088        // Step 1: Identify failed keys via digest comparison.
1089        let keys = [k1, k2];
1090        let expected = [expected_d1, expected_d2];
1091        let received = [wrong_d1, wrong_d2];
1092
1093        let mut failed_keys = Vec::new();
1094        for i in 0..keys.len() {
1095            if received[i] != expected[i] {
1096                failed_keys.push(keys[i]);
1097            }
1098        }
1099        assert_eq!(
1100            failed_keys.len(),
1101            2,
1102            "Both keys should be identified as digest mismatches"
1103        );
1104
1105        // Step 2: Responsibility confirmation — peer is NOT responsible for
1106        // either key (simulated by filtering them all out).
1107        let confirmed_responsible_keys: Vec<XorName> = Vec::new();
1108        let confirmed_failures: Vec<XorName> = failed_keys
1109            .into_iter()
1110            .filter(|k| confirmed_responsible_keys.contains(k))
1111            .collect();
1112
1113        // Step 3: Empty confirmed failure set → no AuditFailure evidence.
1114        assert!(
1115            confirmed_failures.is_empty(),
1116            "With no confirmed responsibility, failure set must be empty — \
1117             no AuditFailure evidence should be emitted"
1118        );
1119
1120        // Verify that constructing evidence with empty keys results in a
1121        // no-penalty outcome (the caller checks is_empty before emitting).
1122        let peer = PeerId::from_bytes(peer_id);
1123        let evidence = FailureEvidence::AuditFailure {
1124            challenge_id: 5500,
1125            challenged_peer: peer,
1126            confirmed_failed_keys: confirmed_failures,
1127            reason: AuditFailureReason::DigestMismatch,
1128        };
1129        if let FailureEvidence::AuditFailure {
1130            confirmed_failed_keys,
1131            ..
1132        } = evidence
1133        {
1134            assert!(
1135                confirmed_failed_keys.is_empty(),
1136                "Evidence with empty failure set should not trigger a trust penalty"
1137            );
1138        }
1139    }
1140
1141    // -- Scenario 56: RepairOpportunity filters never-synced peers ----------------
1142
#[test]
fn scenario_56_repair_opportunity_filters_never_synced() {
    // Evaluates `has_repair_opportunity()` for a record built from the two
    // fields under test.
    let has_opportunity = |last_sync, cycles_since_sync| {
        PeerSyncRecord {
            last_sync,
            cycles_since_sync,
        }
        .has_repair_opportunity()
    };

    // Never synced: no opportunity, regardless of the cycle count.
    assert!(!has_opportunity(None, 5));

    // Synced, but no cycle has elapsed since.
    assert!(!has_opportunity(Some(Instant::now()), 0));

    // Synced and one cycle has elapsed.
    assert!(has_opportunity(Some(Instant::now()), 1));
}
1166
#[test]
fn expired_bootstrap_claim_does_not_remove_peer_from_audit_eligibility() {
    let peer = peer_id_from_bytes([0x57; 32]);

    // The peer has synced and one cycle has passed since.
    let sync_history = HashMap::from([(
        peer,
        PeerSyncRecord {
            last_sync: Some(Instant::now()),
            cycles_since_sync: 1,
        },
    )]);

    // Record a bootstrap claim first seen one second beyond the grace
    // period; fall back to "now" if the clock cannot be rewound that far.
    let past_grace = crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD
        + std::time::Duration::from_secs(1);
    let first_seen = Instant::now()
        .checked_sub(past_grace)
        .unwrap_or_else(Instant::now);
    let bootstrap_claims = HashMap::from([(peer, first_seen)]);

    // Eligibility is computed from the sync history alone.
    let eligible = eligible_audit_peers(&sync_history);

    assert!(bootstrap_claims.contains_key(&peer));
    assert!(
        eligible.contains(&peer),
        "continued bootstrap claims must remain auditable so past-grace abuse can be observed"
    );
}
1196
#[test]
fn audit_key_filter_retains_stable_proofs_and_rejects_evicted_peers() {
    const XOR_NAME_LEN: usize = 32;
    const HINT_EPOCH: u64 = 7;
    const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
    const CHALLENGED_PEER_BYTE: u8 = 0xA1;
    const OTHER_PEER_BYTE: u8 = 0xA2;
    const NEW_PEER_BYTE: u8 = 0xA3;
    const MATURE_KEY_BYTE: u8 = 0xB1;
    const SAME_EPOCH_KEY_BYTE: u8 = 0xB2;
    const MISSING_PROOF_KEY_BYTE: u8 = 0xB3;
    const STABLE_CHURN_KEY_BYTE: u8 = 0xB4;
    const EVICTED_KEY_BYTE: u8 = 0xB5;

    // Peers.
    let peer_from = |byte: u8| peer_id_from_bytes([byte; XOR_NAME_LEN]);
    let challenged_peer = peer_from(CHALLENGED_PEER_BYTE);
    let other_peer = peer_from(OTHER_PEER_BYTE);
    let new_peer = peer_from(NEW_PEER_BYTE);

    // Keys, one per case under test.
    let mature_key = [MATURE_KEY_BYTE; XOR_NAME_LEN];
    let same_epoch_key = [SAME_EPOCH_KEY_BYTE; XOR_NAME_LEN];
    let missing_proof_key = [MISSING_PROOF_KEY_BYTE; XOR_NAME_LEN];
    let stable_churn_key = [STABLE_CHURN_KEY_BYTE; XOR_NAME_LEN];
    let evicted_key = [EVICTED_KEY_BYTE; XOR_NAME_LEN];

    // Close groups: the one hints were recorded against, one where a
    // different member changed, and one that no longer has the challenged peer.
    let close_group = HashSet::from([challenged_peer, other_peer]);
    let changed_close_group = HashSet::from([challenged_peer, new_peer]);
    let evicted_close_group = HashSet::from([other_peer, new_peer]);

    // Record a hint for every key except `missing_proof_key`; only
    // `same_epoch_key` is recorded at the current epoch.
    let mut repair_proofs = RepairProofs::new();
    let hints = [
        (mature_key, HINT_EPOCH),
        (same_epoch_key, CURRENT_EPOCH),
        (stable_churn_key, HINT_EPOCH),
        (evicted_key, HINT_EPOCH),
    ];
    for (key, epoch) in hints {
        assert!(repair_proofs.record_replica_hint_sent(
            challenged_peer,
            key,
            &close_group,
            epoch,
        ));
    }

    let sampled_key_groups = vec![
        (mature_key, close_group.clone()),
        (same_epoch_key, close_group.clone()),
        (missing_proof_key, close_group.clone()),
        (stable_churn_key, changed_close_group),
        (evicted_key, evicted_close_group),
    ];
    let peer_keys = mature_audit_keys_for_peer(
        &challenged_peer,
        sampled_key_groups,
        &mut repair_proofs,
        CURRENT_EPOCH,
    );

    let expected_keys = vec![mature_key, stable_churn_key];
    assert_eq!(
        peer_keys,
        expected_keys,
        "mature proofs for stable close-group peers should become audit keys, while same-epoch, missing, and evicted-peer proofs should not"
    );
}
1269
1270    // -- Audit response must match key count --------------------------------------
1271
1272    #[tokio::test]
1273    async fn audit_response_must_match_key_count() {
1274        // Section 15: "A response is invalid if it has fewer or more entries
1275        // than challenged keys."
1276        // Verify handle_audit_challenge always produces exactly N digests for
1277        // N keys, including edge cases.
1278
1279        let (storage, _temp) = create_test_storage().await;
1280        let nonce = [0x50; 32];
1281        let peer_id = [0x60; 32];
1282
1283        // Store a single chunk
1284        let content = b"single chunk";
1285        let addr = LmdbStorage::compute_address(content);
1286        storage.put(&addr, content).await.unwrap();
1287
1288        // Challenge with 1 stored + 4 absent = 5 keys total
1289        let absent_keys: Vec<XorName> = (1..=4u8).map(|i| [i; 32]).collect();
1290        let mut keys = vec![addr];
1291        keys.extend_from_slice(&absent_keys);
1292
1293        let key_count = keys.len();
1294        let challenge = make_challenge(300, nonce, peer_id, keys);
1295        let self_id = peer_id_from_bytes(peer_id);
1296
1297        let response =
1298            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1299        match response {
1300            AuditResponse::Digests { digests, .. } => {
1301                assert_eq!(
1302                    digests.len(),
1303                    key_count,
1304                    "must produce exactly one digest per challenged key"
1305                );
1306            }
1307            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1308            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1309        }
1310    }
1311
1312    // -- Audit digest uses full record bytes --------------------------------------
1313
#[test]
fn audit_digest_uses_full_record_bytes() {
    // Two records that differ only in their final byte must hash differently.
    let key = [3u8; 32];
    let peer = [2u8; 32];
    let nonce = [1u8; 32];

    let digest_of = |record: &[u8]| compute_audit_digest(&nonce, &peer, &key, record);
    let first = digest_of(b"data version 1");
    let second = digest_of(b"data version 2");

    assert_ne!(
        first, second,
        "Different record bytes must produce different digests"
    );
}
1328
1329    // -- Scenario 29: Audit start gate ------------------------------------------
1330
1331    /// Scenario 29: `handle_audit_challenge` returns `Bootstrapping` when the
1332    /// node is still bootstrapping — audit digests are never computed, and no
1333    /// `AuditFailure` evidence is emitted by the caller.
1334    ///
1335    /// This is the responder-side gate.  The challenger-side gate is enforced
1336    /// by `audit_tick`'s `is_bootstrapping` guard (Invariant 19) and by
1337    /// `check_bootstrap_drained()` in the engine loop; this test confirms the
1338    /// complementary responder behavior.
1339    #[tokio::test]
1340    async fn scenario_29_audit_start_gate_during_bootstrap() {
1341        let (storage, _temp) = create_test_storage().await;
1342
1343        // Store data so there *would* be work to audit.
1344        let content = b"should not be audited during bootstrap";
1345        let addr = LmdbStorage::compute_address(content);
1346        storage.put(&addr, content).await.expect("put");
1347
1348        let challenge = make_challenge(2900, [0x29; 32], [0x29; 32], vec![addr]);
1349        let self_id = peer_id_from_bytes([0x29; 32]);
1350
1351        // Responder is bootstrapping → Bootstrapping response, NOT Digests.
1352        let response =
1353            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1354        assert!(
1355            matches!(
1356                response,
1357                AuditResponse::Bootstrapping { challenge_id: 2900 }
1358            ),
1359            "bootstrapping node must not compute digests — audit start gate"
1360        );
1361
1362        // Responder is NOT bootstrapping → normal Digests.
1363        let response =
1364            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1365        assert!(
1366            matches!(response, AuditResponse::Digests { .. }),
1367            "drained node should compute digests normally"
1368        );
1369    }
1370
1371    // -- Scenario 30: Audit peer selection from sampled keys --------------------
1372
/// Scenario 30: the two deterministic ingredients of audit peer selection.
///
///   (a) `ReplicationConfig::audit_sample_count` returns 10, 1 and 100 for
///       100, 3 and 10 000 keys respectively.
///   (b) `PeerSyncRecord::has_repair_opportunity` is false for a peer that
///       never synced or has zero cycles since its sync, and true otherwise.
#[test]
fn scenario_30_audit_peer_selection_from_sampled_keys() {
    // (a) Sample-count table: (total keys, expected sample size, message).
    let sample_cases = [
        (100, 10, "sample count should scale with sqrt(total_keys)"),
        (3, 1, "sqrt(3) = 1"),
        (10_000, 100, "sqrt(10000) = 100"),
    ];
    for (total_keys, want, why) in sample_cases {
        assert_eq!(
            ReplicationConfig::audit_sample_count(total_keys),
            want,
            "{}",
            why
        );
    }

    // (b) Eligibility gate.
    let has_opportunity = |last_sync, cycles_since_sync| {
        PeerSyncRecord {
            last_sync,
            cycles_since_sync,
        }
        .has_repair_opportunity()
    };

    // No sync on record.
    assert!(!has_opportunity(None, 10));

    // Synced, zero cycles since.
    assert!(!has_opportunity(Some(Instant::now()), 0));

    // Synced, two cycles since.
    assert!(has_opportunity(Some(Instant::now()), 2));
}
1419
1420    // -- Scenario 32: Dynamic challenge size ------------------------------------
1421
1422    /// Scenario 32: Challenge key count equals `|PeerKeySet(challenged_peer)|`,
1423    /// which is dynamic per round.  If no eligible peer remains after filtering,
1424    /// the tick is idle.
1425    ///
1426    /// Verified via `handle_audit_challenge`: the response digest count always
1427    /// equals the number of keys in the challenge.
1428    #[tokio::test]
1429    async fn scenario_32_dynamic_challenge_size() {
1430        let (storage, _temp) = create_test_storage().await;
1431
1432        // Store varying numbers of chunks.
1433        let mut addrs = Vec::new();
1434        for i in 0u8..5 {
1435            let content = format!("dynamic challenge key {i}");
1436            let addr = LmdbStorage::compute_address(content.as_bytes());
1437            storage.put(&addr, content.as_bytes()).await.expect("put");
1438            addrs.push(addr);
1439        }
1440
1441        let nonce = [0x32; 32];
1442        let peer_id = [0x32; 32];
1443        let self_id = peer_id_from_bytes(peer_id);
1444
1445        // Challenge with 1 key.
1446        let challenge1 = make_challenge(3201, nonce, peer_id, vec![addrs[0]]);
1447        let resp1 =
1448            handle_audit_challenge(&challenge1, &storage, &self_id, false, TEST_STORED_CHUNKS)
1449                .await;
1450        if let AuditResponse::Digests { digests, .. } = resp1 {
1451            assert_eq!(digests.len(), 1, "|PeerKeySet| = 1 → 1 digest");
1452        }
1453
1454        // Challenge with 3 keys.
1455        let challenge3 = make_challenge(3203, nonce, peer_id, addrs[0..3].to_vec());
1456        let resp3 =
1457            handle_audit_challenge(&challenge3, &storage, &self_id, false, TEST_STORED_CHUNKS)
1458                .await;
1459        if let AuditResponse::Digests { digests, .. } = resp3 {
1460            assert_eq!(digests.len(), 3, "|PeerKeySet| = 3 → 3 digests");
1461        }
1462
1463        // Challenge with all 5 keys.
1464        let challenge5 = make_challenge(3205, nonce, peer_id, addrs.clone());
1465        let resp5 =
1466            handle_audit_challenge(&challenge5, &storage, &self_id, false, TEST_STORED_CHUNKS)
1467                .await;
1468        if let AuditResponse::Digests { digests, .. } = resp5 {
1469            assert_eq!(digests.len(), 5, "|PeerKeySet| = 5 → 5 digests");
1470        }
1471
1472        // Challenge with 0 keys (idle equivalent — no work).
1473        let challenge0 = make_challenge(3200, nonce, peer_id, vec![]);
1474        let resp0 =
1475            handle_audit_challenge(&challenge0, &storage, &self_id, false, TEST_STORED_CHUNKS)
1476                .await;
1477        if let AuditResponse::Digests { digests, .. } = resp0 {
1478            assert!(digests.is_empty(), "|PeerKeySet| = 0 → 0 digests (idle)");
1479        }
1480    }
1481
1482    // -- Scenario 47: Bootstrap claim grace period (audit) ----------------------
1483
1484    /// Scenario 47: Challenged peer responds with bootstrapping claim during
1485    /// audit.  `handle_audit_challenge` returns `Bootstrapping`; caller records
1486    /// `BootstrapClaimFirstSeen`.  No `AuditFailure` evidence is emitted.
1487    #[tokio::test]
1488    async fn scenario_47_bootstrap_claim_grace_period_audit() {
1489        let (storage, _temp) = create_test_storage().await;
1490
1491        // Store data so there is an auditable key.
1492        let content = b"bootstrap grace test";
1493        let addr = LmdbStorage::compute_address(content);
1494        storage.put(&addr, content).await.expect("put");
1495
1496        let challenge = make_challenge(4700, [0x47; 32], [0x47; 32], vec![addr]);
1497        let self_id = peer_id_from_bytes([0x47; 32]);
1498
1499        // Bootstrapping peer → Bootstrapping response (grace period start).
1500        let response =
1501            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1502        let challenge_id = match response {
1503            AuditResponse::Bootstrapping { challenge_id } => challenge_id,
1504            AuditResponse::Digests { .. } => {
1505                panic!("Expected Bootstrapping response during grace period")
1506            }
1507            AuditResponse::Rejected { .. } => {
1508                panic!("Unexpected Rejected response")
1509            }
1510        };
1511        assert_eq!(challenge_id, 4700);
1512
1513        // Caller records BootstrapClaimFirstSeen — verify the types support it.
1514        let peer = PeerId::from_bytes([0x47; 32]);
1515        let mut state = NeighborSyncState::new_cycle(vec![peer]);
1516        let now = Instant::now();
1517        let observed = state.observe_bootstrap_claim(
1518            peer,
1519            now,
1520            crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD,
1521        );
1522
1523        assert_eq!(
1524            observed,
1525            BootstrapClaimObservation::WithinGrace { first_seen: now }
1526        );
1527        assert!(
1528            state.bootstrap_claims.contains_key(&peer),
1529            "BootstrapClaimFirstSeen should be recorded after grace-period claim"
1530        );
1531        assert!(
1532            state.bootstrap_claim_history.contains_key(&peer),
1533            "Bootstrap claim history should remember that the grace window was used"
1534        );
1535    }
1536
1537    // -- Scenario 53: Audit partial per-key failure with mixed responsibility ---
1538
1539    /// Scenario 53: P challenged on {K1, K2, K3}.  K1 matches, K2 and K3
1540    /// mismatch.  Responsibility confirmation: P is responsible for K2 but
1541    /// not K3.  `AuditFailure` emitted for {K2} only.
1542    ///
1543    /// Full `verify_digests` + `handle_audit_failure` requires a `P2PNode` for
1544    /// network lookups.  This test verifies the conceptual steps:
1545    ///   (1) Digest comparison correctly identifies K2 and K3 as failures.
1546    ///   (2) `FailureEvidence::AuditFailure` carries only confirmed keys.
1547    #[tokio::test]
1548    async fn scenario_53_partial_failure_mixed_responsibility() {
1549        let (storage, _temp) = create_test_storage().await;
1550        let nonce = [0x53; 32];
1551        let peer_id = [0x53; 32];
1552
1553        // Store K1, K2, K3.
1554        let c1 = b"scenario 53 key one";
1555        let c2 = b"scenario 53 key two";
1556        let c3 = b"scenario 53 key three";
1557        let k1 = LmdbStorage::compute_address(c1);
1558        let k2 = LmdbStorage::compute_address(c2);
1559        let k3 = LmdbStorage::compute_address(c3);
1560        storage.put(&k1, c1).await.expect("put k1");
1561        storage.put(&k2, c2).await.expect("put k2");
1562        storage.put(&k3, c3).await.expect("put k3");
1563
1564        // Correct digests from challenger's local store.
1565        let d1_expected = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1566        let d2_expected = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1567        let d3_expected = compute_audit_digest(&nonce, &peer_id, &k3, c3);
1568
1569        // Simulate peer response: K1 matches, K2 wrong data, K3 wrong data.
1570        let d2_wrong = compute_audit_digest(&nonce, &peer_id, &k2, b"tampered k2");
1571        let d3_wrong = compute_audit_digest(&nonce, &peer_id, &k3, b"tampered k3");
1572
1573        assert_eq!(d1_expected, d1_expected, "K1 should match");
1574        assert_ne!(d2_wrong, d2_expected, "K2 should mismatch");
1575        assert_ne!(d3_wrong, d3_expected, "K3 should mismatch");
1576
1577        // Step 1: Identify failed keys (digest comparison).
1578        let digests = [d1_expected, d2_wrong, d3_wrong];
1579        let keys = [k1, k2, k3];
1580        let contents: [&[u8]; 3] = [c1, c2, c3];
1581
1582        let mut failed_keys = Vec::new();
1583        for (i, key) in keys.iter().enumerate() {
1584            if digests[i] == ABSENT_KEY_DIGEST {
1585                failed_keys.push(*key);
1586                continue;
1587            }
1588            let expected = compute_audit_digest(&nonce, &peer_id, key, contents[i]);
1589            if digests[i] != expected {
1590                failed_keys.push(*key);
1591            }
1592        }
1593
1594        assert_eq!(failed_keys.len(), 2, "K2 and K3 should be in failure set");
1595        assert!(failed_keys.contains(&k2));
1596        assert!(failed_keys.contains(&k3));
1597        assert!(!failed_keys.contains(&k1), "K1 passed digest check");
1598
1599        // Step 2: Responsibility confirmation removes K3 (not responsible).
1600        // Simulate: P is in closest peers for K2 but not K3.
1601        let responsible_for_k2 = true;
1602        let responsible_for_k3 = false;
1603        let mut confirmed = Vec::new();
1604        for key in &failed_keys {
1605            let is_responsible = if *key == k2 {
1606                responsible_for_k2
1607            } else {
1608                responsible_for_k3
1609            };
1610            if is_responsible {
1611                confirmed.push(*key);
1612            }
1613        }
1614
1615        assert_eq!(confirmed, vec![k2], "Only K2 should be in confirmed set");
1616
1617        // Step 3: Construct evidence for confirmed failures only.
1618        let challenged_peer = PeerId::from_bytes(peer_id);
1619        let evidence = FailureEvidence::AuditFailure {
1620            challenge_id: 5300,
1621            challenged_peer,
1622            confirmed_failed_keys: confirmed,
1623            reason: AuditFailureReason::DigestMismatch,
1624        };
1625
1626        match evidence {
1627            FailureEvidence::AuditFailure {
1628                confirmed_failed_keys,
1629                ..
1630            } => {
1631                assert_eq!(
1632                    confirmed_failed_keys.len(),
1633                    1,
1634                    "Only K2 should generate evidence"
1635                );
1636                assert_eq!(confirmed_failed_keys[0], k2);
1637            }
1638            _ => panic!("Expected AuditFailure evidence"),
1639        }
1640    }
1641}