Skip to main content

ant_node/replication/
audit.rs

1//! Storage audit protocol (Section 15).
2//!
3//! Challenge-response for claimed holders. Anti-outsourcing protection.
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::logging::{debug, info, warn};
10use rand::seq::SliceRandom;
11use rand::Rng;
12
13use crate::ant_protocol::XorName;
14use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
15use crate::replication::protocol::{
16    compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
17    ReplicationMessageBody, ABSENT_KEY_DIGEST,
18};
19use crate::replication::types::{AuditFailureReason, FailureEvidence, PeerSyncRecord};
20use crate::storage::LmdbStorage;
21use saorsa_core::identity::PeerId;
22use saorsa_core::P2PNode;
23
24// ---------------------------------------------------------------------------
25// Audit tick result
26// ---------------------------------------------------------------------------
27
/// Result of an audit tick.
///
/// Returned by [`audit_tick`]; the caller translates these variants into
/// trust-engine events (this module never emits trust events itself).
#[derive(Debug)]
pub enum AuditTickResult {
    /// Audit completed successfully (all digests matched).
    Passed {
        /// The peer that was challenged.
        challenged_peer: PeerId,
        /// Number of keys verified.
        keys_checked: usize,
    },
    /// Audit found failures (after responsibility confirmation).
    Failed {
        /// Evidence of the failure for trust engine.
        evidence: FailureEvidence,
    },
    /// Audit target claimed bootstrapping.
    BootstrapClaim {
        /// The peer claiming bootstrap status.
        peer: PeerId,
    },
    /// No eligible peers for audit this tick.
    Idle,
    /// Audit skipped (not enough local keys).
    InsufficientKeys,
}
53
54// ---------------------------------------------------------------------------
55// Main audit tick
56// ---------------------------------------------------------------------------
57
/// Execute one audit tick (Section 15 steps 2-9).
///
/// Selects one eligible peer from `sync_history`, challenges it over a
/// random sample of locally held keys the peer is responsible for, and
/// verifies the digests it returns.
///
/// Returns the audit result. Caller is responsible for emitting trust events.
///
/// **Invariant 19**: Returns [`AuditTickResult::Idle`] immediately if
/// `is_bootstrapping` is `true` — a node must not audit others while it
/// is still bootstrapping.
#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
pub async fn audit_tick(
    p2p_node: &Arc<P2PNode>,
    storage: &Arc<LmdbStorage>,
    config: &ReplicationConfig,
    sync_history: &HashMap<PeerId, PeerSyncRecord>,
    bootstrap_claims: &HashMap<PeerId, Instant>,
    is_bootstrapping: bool,
) -> AuditTickResult {
    // Invariant 19: never audit while still bootstrapping.
    if is_bootstrapping {
        return AuditTickResult::Idle;
    }

    let dht = p2p_node.dht_manager();
    let now = Instant::now();

    // Step 2: Select one eligible peer (has RepairOpportunity) at random.
    // Exclude peers whose bootstrap claim has exceeded the grace period —
    // they are already penalized in handle_audit_result and should not
    // consume an audit slot.
    let eligible_peers: Vec<PeerId> = sync_history
        .iter()
        .filter(|(_, record)| record.has_repair_opportunity())
        .filter(|(peer, _)| {
            // A peer with no recorded claim, or a claim still inside the
            // grace period, remains eligible.
            bootstrap_claims.get(peer).map_or(true, |first_seen| {
                now.duration_since(*first_seen) <= config.bootstrap_claim_grace_period
            })
        })
        .map(|(peer, _)| *peer)
        .collect();

    if eligible_peers.is_empty() {
        return AuditTickResult::Idle;
    }

    // Block scope so the thread-local RNG (which is not `Send`) is dropped
    // before the next `.await` point.
    let (challenged_peer, nonce, challenge_id) = {
        let mut rng = rand::thread_rng();
        let selected = match eligible_peers.choose(&mut rng) {
            Some(p) => *p,
            // Unreachable in practice (non-empty checked above); kept as a
            // defensive fallback.
            None => return AuditTickResult::Idle,
        };
        let n: [u8; 32] = rng.gen();
        let c: u64 = rng.gen();
        (selected, n, c)
    };

    // Step 3: Sample keys from local store and keep those the peer is
    // responsible for (appears in the close group via local RT lookup).
    let all_keys = match storage.all_keys().await {
        Ok(keys) => keys,
        Err(e) => {
            warn!("Audit: failed to read local keys: {e}");
            return AuditTickResult::Idle;
        }
    };

    if all_keys.is_empty() {
        return AuditTickResult::Idle;
    }

    let sample_count = ReplicationConfig::audit_sample_count(all_keys.len());
    // Again scoped so the RNG does not live across an await.
    let sampled_keys: Vec<XorName> = {
        let mut rng = rand::thread_rng();
        all_keys
            .choose_multiple(&mut rng, sample_count)
            .copied()
            .collect()
    };

    // Step 4: Filter to keys where the chosen peer is in the close group.
    let mut peer_keys = Vec::new();
    for key in &sampled_keys {
        let closest = dht
            .find_closest_nodes_local_with_self(key, config.close_group_size)
            .await;
        if closest.iter().any(|n| n.peer_id == challenged_peer) {
            peer_keys.push(*key);
        }
    }

    if peer_keys.is_empty() {
        return AuditTickResult::Idle;
    }

    // peer_keys is naturally bounded by audit_sample_count (sqrt-scaled),
    // so no explicit truncation needed.

    // Step 6: Send challenge.

    let challenge = AuditChallenge {
        challenge_id,
        nonce,
        challenged_peer_id: *challenged_peer.as_bytes(),
        keys: peer_keys.clone(),
    };

    let msg = ReplicationMessage {
        request_id: challenge_id,
        body: ReplicationMessageBody::AuditChallenge(challenge),
    };

    let encoded = match msg.encode() {
        Ok(data) => data,
        Err(e) => {
            warn!("Audit: failed to encode challenge: {e}");
            return AuditTickResult::Idle;
        }
    };

    // Timeout scales with the number of challenged keys so large challenges
    // are not unfairly penalized.
    let response = match p2p_node
        .send_request(
            &challenged_peer,
            REPLICATION_PROTOCOL_ID,
            encoded,
            config.audit_response_timeout(peer_keys.len()),
        )
        .await
    {
        Ok(resp) => resp,
        Err(e) => {
            debug!("Audit: challenge to {challenged_peer} failed: {e}");
            // Timeout — need responsibility confirmation before penalty.
            return handle_audit_timeout(
                &challenged_peer,
                challenge_id,
                &peer_keys,
                p2p_node,
                config,
            )
            .await;
        }
    };

    // Step 7: Parse response.
    let resp_msg = match ReplicationMessage::decode(&response.data) {
        Ok(m) => m,
        Err(e) => {
            warn!("Audit: failed to decode response from {challenged_peer}: {e}");
            return handle_audit_failure(
                &challenged_peer,
                challenge_id,
                &peer_keys,
                AuditFailureReason::MalformedResponse,
                p2p_node,
                config,
            )
            .await;
        }
    };

    // Every response variant must echo our challenge_id; a mismatch is
    // treated as a malformed response (possible replay or confusion attack).
    match resp_msg.body {
        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
            challenge_id: resp_id,
        }) => {
            if resp_id != challenge_id {
                warn!("Audit: challenge ID mismatch on Bootstrapping from {challenged_peer}");
                return handle_audit_failure(
                    &challenged_peer,
                    challenge_id,
                    &peer_keys,
                    AuditFailureReason::MalformedResponse,
                    p2p_node,
                    config,
                )
                .await;
            }
            // Step 7b: Bootstrapping claim.
            AuditTickResult::BootstrapClaim {
                peer: challenged_peer,
            }
        }
        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
            challenge_id: resp_id,
            digests,
        }) => {
            if resp_id != challenge_id {
                warn!("Audit: challenge ID mismatch from {challenged_peer}");
                return handle_audit_failure(
                    &challenged_peer,
                    challenge_id,
                    &peer_keys,
                    AuditFailureReason::MalformedResponse,
                    p2p_node,
                    config,
                )
                .await;
            }
            // Step 8: verify each digest against our local copies.
            verify_digests(
                &challenged_peer,
                challenge_id,
                &nonce,
                &peer_keys,
                &digests,
                storage,
                p2p_node,
                config,
            )
            .await
        }
        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
            challenge_id: resp_id,
            reason,
        }) => {
            if resp_id != challenge_id {
                warn!("Audit: challenge ID mismatch on Rejected from {challenged_peer}");
                return handle_audit_failure(
                    &challenged_peer,
                    challenge_id,
                    &peer_keys,
                    AuditFailureReason::MalformedResponse,
                    p2p_node,
                    config,
                )
                .await;
            }
            warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
            handle_audit_failure(
                &challenged_peer,
                challenge_id,
                &peer_keys,
                AuditFailureReason::Rejected,
                p2p_node,
                config,
            )
            .await
        }
        _ => {
            warn!("Audit: unexpected response type from {challenged_peer}");
            handle_audit_failure(
                &challenged_peer,
                challenge_id,
                &peer_keys,
                AuditFailureReason::MalformedResponse,
                p2p_node,
                config,
            )
            .await
        }
    }
}
306
307// ---------------------------------------------------------------------------
308// Digest verification
309// ---------------------------------------------------------------------------
310
311/// Verify per-key digests from audit response (Step 8).
312#[allow(clippy::too_many_arguments)]
313async fn verify_digests(
314    challenged_peer: &PeerId,
315    challenge_id: u64,
316    nonce: &[u8; 32],
317    keys: &[XorName],
318    digests: &[[u8; 32]],
319    storage: &Arc<LmdbStorage>,
320    p2p_node: &Arc<P2PNode>,
321    config: &ReplicationConfig,
322) -> AuditTickResult {
323    // Requirement: response must have exactly one digest per key.
324    if digests.len() != keys.len() {
325        warn!(
326            "Audit: malformed response from {challenged_peer}: {} digests for {} keys",
327            digests.len(),
328            keys.len()
329        );
330        return handle_audit_failure(
331            challenged_peer,
332            challenge_id,
333            keys,
334            AuditFailureReason::MalformedResponse,
335            p2p_node,
336            config,
337        )
338        .await;
339    }
340
341    let challenged_peer_bytes = challenged_peer.as_bytes();
342    let mut failed_keys = Vec::new();
343
344    for (i, key) in keys.iter().enumerate() {
345        let received_digest = &digests[i];
346
347        // Check for absent sentinel.
348        if *received_digest == ABSENT_KEY_DIGEST {
349            failed_keys.push(*key);
350            continue;
351        }
352
353        // Recompute expected digest from local copy.
354        let local_bytes = match storage.get_raw(key).await {
355            Ok(Some(bytes)) => bytes,
356            Ok(None) => {
357                // We should hold this key (we sampled it), but it's gone.
358                warn!(
359                    "Audit: local key {} disappeared during audit",
360                    hex::encode(key)
361                );
362                continue;
363            }
364            Err(e) => {
365                warn!("Audit: failed to read local key {}: {e}", hex::encode(key));
366                continue;
367            }
368        };
369
370        let expected = compute_audit_digest(nonce, challenged_peer_bytes, key, &local_bytes);
371        if *received_digest != expected {
372            failed_keys.push(*key);
373        }
374    }
375
376    if failed_keys.is_empty() {
377        info!(
378            "Audit: peer {challenged_peer} passed (all {} keys verified)",
379            keys.len()
380        );
381        return AuditTickResult::Passed {
382            challenged_peer: *challenged_peer,
383            keys_checked: keys.len(),
384        };
385    }
386
387    // Step 9: Responsibility confirmation for failed keys.
388    handle_audit_failure(
389        challenged_peer,
390        challenge_id,
391        &failed_keys,
392        AuditFailureReason::DigestMismatch,
393        p2p_node,
394        config,
395    )
396    .await
397}
398
399// ---------------------------------------------------------------------------
400// Failure handling with responsibility confirmation
401// ---------------------------------------------------------------------------
402
403/// Handle audit failure: confirm responsibility before emitting evidence (Step 9).
404async fn handle_audit_failure(
405    challenged_peer: &PeerId,
406    challenge_id: u64,
407    failed_keys: &[XorName],
408    reason: AuditFailureReason,
409    p2p_node: &Arc<P2PNode>,
410    config: &ReplicationConfig,
411) -> AuditTickResult {
412    let dht = p2p_node.dht_manager();
413    let mut confirmed_failures = Vec::new();
414
415    // Step 9a-b: Fresh local RT lookup for each failed key.
416    for key in failed_keys {
417        let closest = dht
418            .find_closest_nodes_local_with_self(key, config.close_group_size)
419            .await;
420        if closest.iter().any(|n| n.peer_id == *challenged_peer) {
421            confirmed_failures.push(*key);
422        } else {
423            debug!(
424                "Audit: peer {challenged_peer} not responsible for {} (removed from failure set)",
425                hex::encode(key)
426            );
427        }
428    }
429
430    // Step 9c: Empty confirmed set -> peer is no longer responsible for any
431    // of the failed keys (topology churn). This is NOT a pass — the peer did
432    // not prove it stores the data. Return Idle to avoid granting unearned
433    // positive trust.
434    if confirmed_failures.is_empty() {
435        info!("Audit: all failures for {challenged_peer} cleared by responsibility confirmation");
436        return AuditTickResult::Idle;
437    }
438
439    // Step 9d: Non-empty confirmed set -> emit evidence.
440    let evidence = FailureEvidence::AuditFailure {
441        challenge_id,
442        challenged_peer: *challenged_peer,
443        confirmed_failed_keys: confirmed_failures,
444        reason,
445    };
446
447    AuditTickResult::Failed { evidence }
448}
449
/// Handle audit timeout (no response received).
///
/// A timeout is treated like any other audit failure: the full challenged
/// key set is passed through responsibility confirmation, and evidence with
/// [`AuditFailureReason::Timeout`] is emitted only for keys the peer is
/// still responsible for.
async fn handle_audit_timeout(
    challenged_peer: &PeerId,
    challenge_id: u64,
    keys: &[XorName],
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
) -> AuditTickResult {
    handle_audit_failure(
        challenged_peer,
        challenge_id,
        keys,
        AuditFailureReason::Timeout,
        p2p_node,
        config,
    )
    .await
}
468
469// ---------------------------------------------------------------------------
470// Responder-side handler
471// ---------------------------------------------------------------------------
472
473/// Handle an incoming audit challenge (responder side).
474///
475/// Validates that the challenge targets this node, computes per-key digests,
476/// and returns the response.  Rejects challenges where
477/// `challenged_peer_id` does not match `self_peer_id` to prevent an oracle
478/// attack where a malicious challenger forges digests for a different peer.
479pub async fn handle_audit_challenge(
480    challenge: &AuditChallenge,
481    storage: &LmdbStorage,
482    self_peer_id: &PeerId,
483    is_bootstrapping: bool,
484    stored_chunks: usize,
485) -> AuditResponse {
486    if is_bootstrapping {
487        return AuditResponse::Bootstrapping {
488            challenge_id: challenge.challenge_id,
489        };
490    }
491
492    if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
493        warn!(
494            "Audit challenge targeted wrong peer: expected {}, got {}",
495            hex::encode(self_peer_id.as_bytes()),
496            hex::encode(challenge.challenged_peer_id),
497        );
498        return AuditResponse::Rejected {
499            challenge_id: challenge.challenge_id,
500            reason: "challenged_peer_id does not match this node".to_string(),
501        };
502    }
503
504    let max_keys = ReplicationConfig::max_incoming_audit_keys(stored_chunks);
505    if challenge.keys.len() > max_keys {
506        warn!(
507            "Audit challenge rejected: {} keys exceeds dynamic limit of {max_keys} \
508             (stored_chunks={stored_chunks})",
509            challenge.keys.len(),
510        );
511        return AuditResponse::Rejected {
512            challenge_id: challenge.challenge_id,
513            reason: format!(
514                "challenge contains {} keys, limit is {max_keys}",
515                challenge.keys.len()
516            ),
517        };
518    }
519
520    let mut digests = Vec::with_capacity(challenge.keys.len());
521
522    for key in &challenge.keys {
523        match storage.get_raw(key).await {
524            Ok(Some(data)) => {
525                let digest = compute_audit_digest(
526                    &challenge.nonce,
527                    &challenge.challenged_peer_id,
528                    key,
529                    &data,
530                );
531                digests.push(digest);
532            }
533            Ok(None) => {
534                digests.push(ABSENT_KEY_DIGEST);
535            }
536            Err(e) => {
537                warn!(
538                    "Audit responder: failed to read key {}: {e}",
539                    hex::encode(key)
540                );
541                digests.push(ABSENT_KEY_DIGEST);
542            }
543        }
544    }
545
546    AuditResponse::Digests {
547        challenge_id: challenge.challenge_id,
548        digests,
549    }
550}
551
552// ---------------------------------------------------------------------------
553// Tests
554// ---------------------------------------------------------------------------
555
556#[cfg(test)]
557#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
558mod tests {
559    use super::*;
560    use crate::replication::protocol::compute_audit_digest;
561    use crate::replication::types::NeighborSyncState;
562    use crate::storage::LmdbStorageConfig;
563    use tempfile::TempDir;
564
    /// Simulated stored chunk count for tests. Large enough that the dynamic
    /// incoming audit limit (`2 * sqrt(N)`) never rejects small test challenges.
    const TEST_STORED_CHUNKS: usize = 1_000_000;
568
    /// Create a test `LmdbStorage` backed by a temp directory.
    ///
    /// Returns the storage together with its `TempDir` guard; callers must
    /// keep the guard alive for the storage's lifetime (dropping it deletes
    /// the backing directory).
    async fn create_test_storage() -> (LmdbStorage, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            verify_on_read: false,
            // 0 presumably selects library defaults — TODO confirm
            max_map_size: 0,
            disk_reserve: 0,
        };
        let storage = LmdbStorage::new(config).await.expect("create storage");
        (storage, temp_dir)
    }
581
    /// Build a challenge with the given parameters.
    ///
    /// Thin constructor so test bodies read declaratively.
    fn make_challenge(
        challenge_id: u64,
        nonce: [u8; 32],
        peer_id: [u8; 32],
        keys: Vec<XorName>,
    ) -> AuditChallenge {
        AuditChallenge {
            challenge_id,
            nonce,
            challenged_peer_id: peer_id,
            keys,
        }
    }
596
    /// Build a `PeerId` matching the raw bytes used in a challenge.
    ///
    /// Keeps the responder-side identity check satisfied in tests that
    /// target "this" node.
    fn peer_id_from_bytes(bytes: [u8; 32]) -> PeerId {
        PeerId::from_bytes(bytes)
    }
601
602    // -- handle_audit_challenge: present keys ---------------------------------
603
    #[tokio::test]
    async fn handle_challenge_present_keys_returns_correct_digests() {
        let (storage, _temp) = create_test_storage().await;

        // Store two chunks.
        let content_a = b"chunk alpha";
        let addr_a = LmdbStorage::compute_address(content_a);
        storage.put(&addr_a, content_a).await.expect("put a");

        let content_b = b"chunk beta";
        let addr_b = LmdbStorage::compute_address(content_b);
        storage.put(&addr_b, content_b).await.expect("put b");

        let nonce = [0xAA; 32];
        let peer_id = [0xBB; 32];
        let challenge = make_challenge(42, nonce, peer_id, vec![addr_a, addr_b]);
        let self_id = peer_id_from_bytes(peer_id);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;

        match response {
            AuditResponse::Digests {
                challenge_id,
                digests,
            } => {
                assert_eq!(challenge_id, 42);
                assert_eq!(digests.len(), 2);

                // Each digest must be recomputable from nonce, peer id, key
                // and chunk content, in challenge key order.
                let expected_a = compute_audit_digest(&nonce, &peer_id, &addr_a, content_a);
                let expected_b = compute_audit_digest(&nonce, &peer_id, &addr_b, content_b);
                assert_eq!(digests[0], expected_a);
                assert_eq!(digests[1], expected_b);
            }
            AuditResponse::Bootstrapping { .. } => {
                panic!("expected Digests, got Bootstrapping");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }
646
647    // -- handle_audit_challenge: absent keys ----------------------------------
648
    #[tokio::test]
    async fn handle_challenge_absent_keys_returns_sentinel() {
        let (storage, _temp) = create_test_storage().await;

        // Key was never stored, so the responder must answer with the
        // absent-key sentinel rather than a computed digest.
        let absent_key = [0xFF; 32];
        let nonce = [0x11; 32];
        let peer_id = [0x22; 32];
        let challenge = make_challenge(99, nonce, peer_id, vec![absent_key]);
        let self_id = peer_id_from_bytes(peer_id);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;

        match response {
            AuditResponse::Digests {
                challenge_id,
                digests,
            } => {
                assert_eq!(challenge_id, 99);
                assert_eq!(digests.len(), 1);
                assert_eq!(
                    digests[0], ABSENT_KEY_DIGEST,
                    "absent key should produce sentinel digest"
                );
            }
            AuditResponse::Bootstrapping { .. } => {
                panic!("expected Digests, got Bootstrapping");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }
682
683    // -- handle_audit_challenge: mixed present and absent ---------------------
684
    #[tokio::test]
    async fn handle_challenge_mixed_present_and_absent() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"present chunk";
        let addr_present = LmdbStorage::compute_address(content);
        storage.put(&addr_present, content).await.expect("put");

        let addr_absent = [0xDE; 32];
        let nonce = [0x33; 32];
        let peer_id = [0x44; 32];
        let challenge = make_challenge(7, nonce, peer_id, vec![addr_present, addr_absent]);
        let self_id = peer_id_from_bytes(peer_id);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;

        match response {
            AuditResponse::Digests { digests, .. } => {
                // Digest order must follow challenge key order: present key
                // first (real digest), absent key second (sentinel).
                assert_eq!(digests.len(), 2);

                let expected_present =
                    compute_audit_digest(&nonce, &peer_id, &addr_present, content);
                assert_eq!(digests[0], expected_present);
                assert_eq!(
                    digests[1], ABSENT_KEY_DIGEST,
                    "absent key should be sentinel"
                );
            }
            AuditResponse::Bootstrapping { .. } => {
                panic!("expected Digests, got Bootstrapping");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }
722
723    // -- handle_audit_challenge: bootstrapping --------------------------------
724
    #[tokio::test]
    async fn handle_challenge_bootstrapping_returns_bootstrapping_response() {
        let (storage, _temp) = create_test_storage().await;

        // Identity matches, but the node is bootstrapping, so it must claim
        // bootstrap status instead of answering with digests.
        let challenge = make_challenge(55, [0x00; 32], [0x01; 32], vec![[0x02; 32]]);
        let self_id = peer_id_from_bytes([0x01; 32]);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;

        match response {
            AuditResponse::Bootstrapping { challenge_id } => {
                assert_eq!(challenge_id, 55);
            }
            AuditResponse::Digests { .. } => {
                panic!("expected Bootstrapping, got Digests");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }
747
748    // -- handle_audit_challenge: empty key list -------------------------------
749
    #[tokio::test]
    async fn handle_challenge_empty_keys_returns_empty_digests() {
        let (storage, _temp) = create_test_storage().await;

        // An empty key list is a degenerate but valid challenge.
        let challenge = make_challenge(100, [0x10; 32], [0x20; 32], vec![]);
        let self_id = peer_id_from_bytes([0x20; 32]);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;

        match response {
            AuditResponse::Digests {
                challenge_id,
                digests,
            } => {
                assert_eq!(challenge_id, 100);
                assert!(
                    digests.is_empty(),
                    "empty key list should yield empty digests"
                );
            }
            AuditResponse::Bootstrapping { .. } => {
                panic!("expected Digests, got Bootstrapping");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }
779
780    // -- Digest verification: matching ----------------------------------------
781
782    #[test]
783    fn digest_verification_matching() {
784        let nonce = [0x01; 32];
785        let peer_id = [0x02; 32];
786        let key: XorName = [0x03; 32];
787        let data = b"correct data";
788
789        let expected = compute_audit_digest(&nonce, &peer_id, &key, data);
790        let recomputed = compute_audit_digest(&nonce, &peer_id, &key, data);
791
792        assert_eq!(
793            expected, recomputed,
794            "same inputs must produce identical digests"
795        );
796        assert_ne!(
797            expected, ABSENT_KEY_DIGEST,
798            "real digest must not be sentinel"
799        );
800    }
801
802    // -- Digest verification: mismatching -------------------------------------
803
804    #[test]
805    fn digest_verification_mismatching_data() {
806        let nonce = [0x01; 32];
807        let peer_id = [0x02; 32];
808        let key: XorName = [0x03; 32];
809
810        let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data version A");
811        let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data version B");
812
813        assert_ne!(
814            digest_a, digest_b,
815            "different data must produce different digests"
816        );
817    }
818
819    #[test]
820    fn digest_verification_mismatching_nonce() {
821        let peer_id = [0x02; 32];
822        let key: XorName = [0x03; 32];
823        let data = b"same data";
824
825        let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, data);
826        let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, data);
827
828        assert_ne!(
829            digest_a, digest_b,
830            "different nonces must produce different digests"
831        );
832    }
833
834    #[test]
835    fn digest_verification_mismatching_peer() {
836        let nonce = [0x01; 32];
837        let key: XorName = [0x03; 32];
838        let data = b"same data";
839
840        let digest_a = compute_audit_digest(&nonce, &[0x02; 32], &key, data);
841        let digest_b = compute_audit_digest(&nonce, &[0xFE; 32], &key, data);
842
843        assert_ne!(
844            digest_a, digest_b,
845            "different peers must produce different digests"
846        );
847    }
848
849    #[test]
850    fn digest_verification_mismatching_key() {
851        let nonce = [0x01; 32];
852        let peer_id = [0x02; 32];
853        let data = b"same data";
854
855        let digest_a = compute_audit_digest(&nonce, &peer_id, &[0x03; 32], data);
856        let digest_b = compute_audit_digest(&nonce, &peer_id, &[0xFC; 32], data);
857
858        assert_ne!(
859            digest_a, digest_b,
860            "different keys must produce different digests"
861        );
862    }
863
864    // -- Absent sentinel is all zeros -----------------------------------------
865
866    #[test]
867    fn absent_sentinel_is_all_zeros() {
868        assert_eq!(ABSENT_KEY_DIGEST, [0u8; 32], "sentinel must be all zeros");
869    }
870
871    // -- Bootstrapping skips digest computation even with stored keys ---------
872
    #[tokio::test]
    async fn bootstrapping_skips_digest_computation() {
        let (storage, _temp) = create_test_storage().await;

        // The chunk IS stored, so a Digests answer would be possible —
        // the bootstrap claim must take precedence anyway.
        let content = b"stored but bootstrapping";
        let addr = LmdbStorage::compute_address(content);
        storage.put(&addr, content).await.expect("put");

        let challenge = make_challenge(200, [0xCC; 32], [0xDD; 32], vec![addr]);
        let self_id = peer_id_from_bytes([0xDD; 32]);

        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;

        assert!(
            matches!(response, AuditResponse::Bootstrapping { challenge_id: 200 }),
            "bootstrapping node must not compute digests"
        );
    }
892
893    // -- Scenario 19/53: Partial failure with mixed responsibility ----------------
894
895    #[tokio::test]
896    async fn scenario_19_partial_failure_mixed_responsibility() {
897        // Three keys challenged: K1 matches, K2 mismatches, K3 absent.
898        // After responsibility confirmation, only K2 is confirmed responsible.
899        // AuditFailure emitted for {K2} only.
900        // Test handle_audit_challenge with mixed results, then verify
901        // the digest logic manually.
902
903        let (storage, _temp) = create_test_storage().await;
904        let nonce = [0x42u8; 32];
905        let peer_id = [0xAA; 32];
906
907        // Store K1 and K2, but NOT K3
908        let content_k1 = b"key one data";
909        let addr_k1 = LmdbStorage::compute_address(content_k1);
910        storage.put(&addr_k1, content_k1).await.unwrap();
911
912        let content_k2 = b"key two data";
913        let addr_k2 = LmdbStorage::compute_address(content_k2);
914        storage.put(&addr_k2, content_k2).await.unwrap();
915
916        let addr_k3 = [0xFF; 32]; // Not stored
917
918        let challenge = AuditChallenge {
919            challenge_id: 100,
920            nonce,
921            challenged_peer_id: peer_id,
922            keys: vec![addr_k1, addr_k2, addr_k3],
923        };
924        let self_id = peer_id_from_bytes(peer_id);
925
926        let response =
927            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
928
929        match response {
930            AuditResponse::Digests { digests, .. } => {
931                assert_eq!(digests.len(), 3);
932
933                // K1 should have correct digest
934                let expected_k1 = compute_audit_digest(&nonce, &peer_id, &addr_k1, content_k1);
935                assert_eq!(digests[0], expected_k1);
936
937                // K2 should have correct digest
938                let expected_k2 = compute_audit_digest(&nonce, &peer_id, &addr_k2, content_k2);
939                assert_eq!(digests[1], expected_k2);
940
941                // K3 absent -> sentinel
942                assert_eq!(digests[2], ABSENT_KEY_DIGEST);
943            }
944            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests response"),
945            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
946        }
947    }
948
949    // -- Scenario 54: All digests pass -------------------------------------------
950
951    #[tokio::test]
952    async fn scenario_54_all_digests_pass() {
953        // All challenged keys present and digests match.
954        // Multiple keys to strengthen coverage beyond existing two-key tests.
955        let (storage, _temp) = create_test_storage().await;
956        let nonce = [0x10; 32];
957        let peer_id = [0x20; 32];
958
959        let c1 = b"chunk alpha";
960        let c2 = b"chunk beta";
961        let c3 = b"chunk gamma";
962        let a1 = LmdbStorage::compute_address(c1);
963        let a2 = LmdbStorage::compute_address(c2);
964        let a3 = LmdbStorage::compute_address(c3);
965        storage.put(&a1, c1).await.unwrap();
966        storage.put(&a2, c2).await.unwrap();
967        storage.put(&a3, c3).await.unwrap();
968
969        let challenge = AuditChallenge {
970            challenge_id: 200,
971            nonce,
972            challenged_peer_id: peer_id,
973            keys: vec![a1, a2, a3],
974        };
975        let self_id = peer_id_from_bytes(peer_id);
976
977        let response =
978            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
979        match response {
980            AuditResponse::Digests { digests, .. } => {
981                assert_eq!(digests.len(), 3);
982                for (i, (addr, content)) in [(a1, &c1[..]), (a2, &c2[..]), (a3, &c3[..])]
983                    .iter()
984                    .enumerate()
985                {
986                    let expected = compute_audit_digest(&nonce, &peer_id, addr, content);
987                    assert_eq!(digests[i], expected, "Key {i} digest should match");
988                }
989            }
990            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
991            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
992        }
993    }
994
995    // -- Scenario 55: Empty failure set means no evidence -------------------------
996
997    /// Scenario 55: Peer challenged on {K1, K2}. Both digests mismatch.
998    /// Responsibility confirmation shows the peer is NOT responsible for
999    /// either key. The confirmed failure set is empty — no `AuditFailure`
1000    /// evidence is emitted.
1001    ///
1002    /// Full `verify_digests` requires a live `P2PNode` for network lookups.
1003    /// This test exercises the deterministic sub-steps:
1004    ///   (1) Digest comparison identifies K1 and K2 as mismatches.
1005    ///   (2) Responsibility confirmation removes both keys.
1006    ///   (3) Empty confirmed failure set means no evidence.
1007    #[tokio::test]
1008    async fn scenario_55_no_confirmed_responsibility_no_evidence() {
1009        let (storage, _temp) = create_test_storage().await;
1010        let nonce = [0x55; 32];
1011        let peer_id = [0x55; 32];
1012
1013        // Store K1 and K2 on the challenger (for expected digest computation).
1014        let c1 = b"scenario 55 key one";
1015        let c2 = b"scenario 55 key two";
1016        let k1 = LmdbStorage::compute_address(c1);
1017        let k2 = LmdbStorage::compute_address(c2);
1018        storage.put(&k1, c1).await.expect("put k1");
1019        storage.put(&k2, c2).await.expect("put k2");
1020
1021        // Challenger computes expected digests.
1022        let expected_d1 = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1023        let expected_d2 = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1024
1025        // Simulate peer returning WRONG digests for both keys.
1026        let wrong_d1 = compute_audit_digest(&nonce, &peer_id, &k1, b"corrupted k1");
1027        let wrong_d2 = compute_audit_digest(&nonce, &peer_id, &k2, b"corrupted k2");
1028        assert_ne!(wrong_d1, expected_d1, "K1 digest should mismatch");
1029        assert_ne!(wrong_d2, expected_d2, "K2 digest should mismatch");
1030
1031        // Step 1: Identify failed keys via digest comparison.
1032        let keys = [k1, k2];
1033        let expected = [expected_d1, expected_d2];
1034        let received = [wrong_d1, wrong_d2];
1035
1036        let mut failed_keys = Vec::new();
1037        for i in 0..keys.len() {
1038            if received[i] != expected[i] {
1039                failed_keys.push(keys[i]);
1040            }
1041        }
1042        assert_eq!(
1043            failed_keys.len(),
1044            2,
1045            "Both keys should be identified as digest mismatches"
1046        );
1047
1048        // Step 2: Responsibility confirmation — peer is NOT responsible for
1049        // either key (simulated by filtering them all out).
1050        let confirmed_responsible_keys: Vec<XorName> = Vec::new();
1051        let confirmed_failures: Vec<XorName> = failed_keys
1052            .into_iter()
1053            .filter(|k| confirmed_responsible_keys.contains(k))
1054            .collect();
1055
1056        // Step 3: Empty confirmed failure set → no AuditFailure evidence.
1057        assert!(
1058            confirmed_failures.is_empty(),
1059            "With no confirmed responsibility, failure set must be empty — \
1060             no AuditFailure evidence should be emitted"
1061        );
1062
1063        // Verify that constructing evidence with empty keys results in a
1064        // no-penalty outcome (the caller checks is_empty before emitting).
1065        let peer = PeerId::from_bytes(peer_id);
1066        let evidence = FailureEvidence::AuditFailure {
1067            challenge_id: 5500,
1068            challenged_peer: peer,
1069            confirmed_failed_keys: confirmed_failures,
1070            reason: AuditFailureReason::DigestMismatch,
1071        };
1072        if let FailureEvidence::AuditFailure {
1073            confirmed_failed_keys,
1074            ..
1075        } = evidence
1076        {
1077            assert!(
1078                confirmed_failed_keys.is_empty(),
1079                "Evidence with empty failure set should not trigger a trust penalty"
1080            );
1081        }
1082    }
1083
1084    // -- Scenario 56: RepairOpportunity filters never-synced peers ----------------
1085
1086    #[test]
1087    fn scenario_56_repair_opportunity_filters_never_synced() {
1088        // PeerSyncRecord with last_sync=None should not pass
1089        // has_repair_opportunity().
1090
1091        let never_synced = PeerSyncRecord {
1092            last_sync: None,
1093            cycles_since_sync: 5,
1094        };
1095        assert!(!never_synced.has_repair_opportunity());
1096
1097        let synced_no_cycle = PeerSyncRecord {
1098            last_sync: Some(Instant::now()),
1099            cycles_since_sync: 0,
1100        };
1101        assert!(!synced_no_cycle.has_repair_opportunity());
1102
1103        let synced_with_cycle = PeerSyncRecord {
1104            last_sync: Some(Instant::now()),
1105            cycles_since_sync: 1,
1106        };
1107        assert!(synced_with_cycle.has_repair_opportunity());
1108    }
1109
1110    // -- Audit response must match key count --------------------------------------
1111
1112    #[tokio::test]
1113    async fn audit_response_must_match_key_count() {
1114        // Section 15: "A response is invalid if it has fewer or more entries
1115        // than challenged keys."
1116        // Verify handle_audit_challenge always produces exactly N digests for
1117        // N keys, including edge cases.
1118
1119        let (storage, _temp) = create_test_storage().await;
1120        let nonce = [0x50; 32];
1121        let peer_id = [0x60; 32];
1122
1123        // Store a single chunk
1124        let content = b"single chunk";
1125        let addr = LmdbStorage::compute_address(content);
1126        storage.put(&addr, content).await.unwrap();
1127
1128        // Challenge with 1 stored + 4 absent = 5 keys total
1129        let absent_keys: Vec<XorName> = (1..=4u8).map(|i| [i; 32]).collect();
1130        let mut keys = vec![addr];
1131        keys.extend_from_slice(&absent_keys);
1132
1133        let key_count = keys.len();
1134        let challenge = make_challenge(300, nonce, peer_id, keys);
1135        let self_id = peer_id_from_bytes(peer_id);
1136
1137        let response =
1138            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1139        match response {
1140            AuditResponse::Digests { digests, .. } => {
1141                assert_eq!(
1142                    digests.len(),
1143                    key_count,
1144                    "must produce exactly one digest per challenged key"
1145                );
1146            }
1147            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1148            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1149        }
1150    }
1151
1152    // -- Audit digest uses full record bytes --------------------------------------
1153
1154    #[test]
1155    fn audit_digest_uses_full_record_bytes() {
1156        // Verify digest changes when record content changes.
1157        let nonce = [1u8; 32];
1158        let peer = [2u8; 32];
1159        let key = [3u8; 32];
1160
1161        let d1 = compute_audit_digest(&nonce, &peer, &key, b"data version 1");
1162        let d2 = compute_audit_digest(&nonce, &peer, &key, b"data version 2");
1163        assert_ne!(
1164            d1, d2,
1165            "Different record bytes must produce different digests"
1166        );
1167    }
1168
1169    // -- Scenario 29: Audit start gate ------------------------------------------
1170
1171    /// Scenario 29: `handle_audit_challenge` returns `Bootstrapping` when the
1172    /// node is still bootstrapping — audit digests are never computed, and no
1173    /// `AuditFailure` evidence is emitted by the caller.
1174    ///
1175    /// This is the responder-side gate.  The challenger-side gate is enforced
1176    /// by `audit_tick`'s `is_bootstrapping` guard (Invariant 19) and by
1177    /// `check_bootstrap_drained()` in the engine loop; this test confirms the
1178    /// complementary responder behavior.
1179    #[tokio::test]
1180    async fn scenario_29_audit_start_gate_during_bootstrap() {
1181        let (storage, _temp) = create_test_storage().await;
1182
1183        // Store data so there *would* be work to audit.
1184        let content = b"should not be audited during bootstrap";
1185        let addr = LmdbStorage::compute_address(content);
1186        storage.put(&addr, content).await.expect("put");
1187
1188        let challenge = make_challenge(2900, [0x29; 32], [0x29; 32], vec![addr]);
1189        let self_id = peer_id_from_bytes([0x29; 32]);
1190
1191        // Responder is bootstrapping → Bootstrapping response, NOT Digests.
1192        let response =
1193            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1194        assert!(
1195            matches!(
1196                response,
1197                AuditResponse::Bootstrapping { challenge_id: 2900 }
1198            ),
1199            "bootstrapping node must not compute digests — audit start gate"
1200        );
1201
1202        // Responder is NOT bootstrapping → normal Digests.
1203        let response =
1204            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1205        assert!(
1206            matches!(response, AuditResponse::Digests { .. }),
1207            "drained node should compute digests normally"
1208        );
1209    }
1210
1211    // -- Scenario 30: Audit peer selection from sampled keys --------------------
1212
1213    /// Scenario 30: Key sampling uses dynamic sqrt-based batch sizing and
1214    /// `RepairOpportunity` filtering excludes never-synced peers.
1215    ///
1216    /// Full `audit_tick` requires a live network.  This test verifies the two
1217    /// deterministic sub-steps the function relies on:
1218    ///   (a) `audit_sample_count` scales with `sqrt(total_keys)`.
1219    ///   (b) `PeerSyncRecord::has_repair_opportunity` gates peer eligibility.
1220    #[test]
1221    fn scenario_30_audit_peer_selection_from_sampled_keys() {
1222        // (a) Dynamic sample count scales with sqrt(total_keys).
1223        assert_eq!(
1224            ReplicationConfig::audit_sample_count(100),
1225            10,
1226            "sample count should scale with sqrt(total_keys)"
1227        );
1228
1229        assert_eq!(ReplicationConfig::audit_sample_count(3), 1, "sqrt(3) = 1");
1230
1231        assert_eq!(
1232            ReplicationConfig::audit_sample_count(10_000),
1233            100,
1234            "sqrt(10000) = 100"
1235        );
1236
1237        // (b) Peer eligibility via RepairOpportunity.
1238        // Never synced → not eligible.
1239        let never = PeerSyncRecord {
1240            last_sync: None,
1241            cycles_since_sync: 10,
1242        };
1243        assert!(!never.has_repair_opportunity());
1244
1245        // Synced but zero subsequent cycles → not eligible.
1246        let too_soon = PeerSyncRecord {
1247            last_sync: Some(Instant::now()),
1248            cycles_since_sync: 0,
1249        };
1250        assert!(!too_soon.has_repair_opportunity());
1251
1252        // Synced with ≥1 cycle → eligible.
1253        let eligible = PeerSyncRecord {
1254            last_sync: Some(Instant::now()),
1255            cycles_since_sync: 2,
1256        };
1257        assert!(eligible.has_repair_opportunity());
1258    }
1259
1260    // -- Scenario 32: Dynamic challenge size ------------------------------------
1261
1262    /// Scenario 32: Challenge key count equals `|PeerKeySet(challenged_peer)|`,
1263    /// which is dynamic per round.  If no eligible peer remains after filtering,
1264    /// the tick is idle.
1265    ///
1266    /// Verified via `handle_audit_challenge`: the response digest count always
1267    /// equals the number of keys in the challenge.
1268    #[tokio::test]
1269    async fn scenario_32_dynamic_challenge_size() {
1270        let (storage, _temp) = create_test_storage().await;
1271
1272        // Store varying numbers of chunks.
1273        let mut addrs = Vec::new();
1274        for i in 0u8..5 {
1275            let content = format!("dynamic challenge key {i}");
1276            let addr = LmdbStorage::compute_address(content.as_bytes());
1277            storage.put(&addr, content.as_bytes()).await.expect("put");
1278            addrs.push(addr);
1279        }
1280
1281        let nonce = [0x32; 32];
1282        let peer_id = [0x32; 32];
1283        let self_id = peer_id_from_bytes(peer_id);
1284
1285        // Challenge with 1 key.
1286        let challenge1 = make_challenge(3201, nonce, peer_id, vec![addrs[0]]);
1287        let resp1 =
1288            handle_audit_challenge(&challenge1, &storage, &self_id, false, TEST_STORED_CHUNKS)
1289                .await;
1290        if let AuditResponse::Digests { digests, .. } = resp1 {
1291            assert_eq!(digests.len(), 1, "|PeerKeySet| = 1 → 1 digest");
1292        }
1293
1294        // Challenge with 3 keys.
1295        let challenge3 = make_challenge(3203, nonce, peer_id, addrs[0..3].to_vec());
1296        let resp3 =
1297            handle_audit_challenge(&challenge3, &storage, &self_id, false, TEST_STORED_CHUNKS)
1298                .await;
1299        if let AuditResponse::Digests { digests, .. } = resp3 {
1300            assert_eq!(digests.len(), 3, "|PeerKeySet| = 3 → 3 digests");
1301        }
1302
1303        // Challenge with all 5 keys.
1304        let challenge5 = make_challenge(3205, nonce, peer_id, addrs.clone());
1305        let resp5 =
1306            handle_audit_challenge(&challenge5, &storage, &self_id, false, TEST_STORED_CHUNKS)
1307                .await;
1308        if let AuditResponse::Digests { digests, .. } = resp5 {
1309            assert_eq!(digests.len(), 5, "|PeerKeySet| = 5 → 5 digests");
1310        }
1311
1312        // Challenge with 0 keys (idle equivalent — no work).
1313        let challenge0 = make_challenge(3200, nonce, peer_id, vec![]);
1314        let resp0 =
1315            handle_audit_challenge(&challenge0, &storage, &self_id, false, TEST_STORED_CHUNKS)
1316                .await;
1317        if let AuditResponse::Digests { digests, .. } = resp0 {
1318            assert!(digests.is_empty(), "|PeerKeySet| = 0 → 0 digests (idle)");
1319        }
1320    }
1321
1322    // -- Scenario 47: Bootstrap claim grace period (audit) ----------------------
1323
1324    /// Scenario 47: Challenged peer responds with bootstrapping claim during
1325    /// audit.  `handle_audit_challenge` returns `Bootstrapping`; caller records
1326    /// `BootstrapClaimFirstSeen`.  No `AuditFailure` evidence is emitted.
1327    #[tokio::test]
1328    async fn scenario_47_bootstrap_claim_grace_period_audit() {
1329        let (storage, _temp) = create_test_storage().await;
1330
1331        // Store data so there is an auditable key.
1332        let content = b"bootstrap grace test";
1333        let addr = LmdbStorage::compute_address(content);
1334        storage.put(&addr, content).await.expect("put");
1335
1336        let challenge = make_challenge(4700, [0x47; 32], [0x47; 32], vec![addr]);
1337        let self_id = peer_id_from_bytes([0x47; 32]);
1338
1339        // Bootstrapping peer → Bootstrapping response (grace period start).
1340        let response =
1341            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1342        let challenge_id = match response {
1343            AuditResponse::Bootstrapping { challenge_id } => challenge_id,
1344            AuditResponse::Digests { .. } => {
1345                panic!("Expected Bootstrapping response during grace period")
1346            }
1347            AuditResponse::Rejected { .. } => {
1348                panic!("Unexpected Rejected response")
1349            }
1350        };
1351        assert_eq!(challenge_id, 4700);
1352
1353        // Caller records BootstrapClaimFirstSeen — verify the types support it.
1354        let peer = PeerId::from_bytes([0x47; 32]);
1355        let mut state = NeighborSyncState::new_cycle(vec![peer]);
1356        let now = Instant::now();
1357        state.bootstrap_claims.entry(peer).or_insert(now);
1358
1359        assert!(
1360            state.bootstrap_claims.contains_key(&peer),
1361            "BootstrapClaimFirstSeen should be recorded after grace-period claim"
1362        );
1363    }
1364
1365    // -- Scenario 53: Audit partial per-key failure with mixed responsibility ---
1366
1367    /// Scenario 53: P challenged on {K1, K2, K3}.  K1 matches, K2 and K3
1368    /// mismatch.  Responsibility confirmation: P is responsible for K2 but
1369    /// not K3.  `AuditFailure` emitted for {K2} only.
1370    ///
1371    /// Full `verify_digests` + `handle_audit_failure` requires a `P2PNode` for
1372    /// network lookups.  This test verifies the conceptual steps:
1373    ///   (1) Digest comparison correctly identifies K2 and K3 as failures.
1374    ///   (2) `FailureEvidence::AuditFailure` carries only confirmed keys.
1375    #[tokio::test]
1376    async fn scenario_53_partial_failure_mixed_responsibility() {
1377        let (storage, _temp) = create_test_storage().await;
1378        let nonce = [0x53; 32];
1379        let peer_id = [0x53; 32];
1380
1381        // Store K1, K2, K3.
1382        let c1 = b"scenario 53 key one";
1383        let c2 = b"scenario 53 key two";
1384        let c3 = b"scenario 53 key three";
1385        let k1 = LmdbStorage::compute_address(c1);
1386        let k2 = LmdbStorage::compute_address(c2);
1387        let k3 = LmdbStorage::compute_address(c3);
1388        storage.put(&k1, c1).await.expect("put k1");
1389        storage.put(&k2, c2).await.expect("put k2");
1390        storage.put(&k3, c3).await.expect("put k3");
1391
1392        // Correct digests from challenger's local store.
1393        let d1_expected = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1394        let d2_expected = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1395        let d3_expected = compute_audit_digest(&nonce, &peer_id, &k3, c3);
1396
1397        // Simulate peer response: K1 matches, K2 wrong data, K3 wrong data.
1398        let d2_wrong = compute_audit_digest(&nonce, &peer_id, &k2, b"tampered k2");
1399        let d3_wrong = compute_audit_digest(&nonce, &peer_id, &k3, b"tampered k3");
1400
1401        assert_eq!(d1_expected, d1_expected, "K1 should match");
1402        assert_ne!(d2_wrong, d2_expected, "K2 should mismatch");
1403        assert_ne!(d3_wrong, d3_expected, "K3 should mismatch");
1404
1405        // Step 1: Identify failed keys (digest comparison).
1406        let digests = [d1_expected, d2_wrong, d3_wrong];
1407        let keys = [k1, k2, k3];
1408        let contents: [&[u8]; 3] = [c1, c2, c3];
1409
1410        let mut failed_keys = Vec::new();
1411        for (i, key) in keys.iter().enumerate() {
1412            if digests[i] == ABSENT_KEY_DIGEST {
1413                failed_keys.push(*key);
1414                continue;
1415            }
1416            let expected = compute_audit_digest(&nonce, &peer_id, key, contents[i]);
1417            if digests[i] != expected {
1418                failed_keys.push(*key);
1419            }
1420        }
1421
1422        assert_eq!(failed_keys.len(), 2, "K2 and K3 should be in failure set");
1423        assert!(failed_keys.contains(&k2));
1424        assert!(failed_keys.contains(&k3));
1425        assert!(!failed_keys.contains(&k1), "K1 passed digest check");
1426
1427        // Step 2: Responsibility confirmation removes K3 (not responsible).
1428        // Simulate: P is in closest peers for K2 but not K3.
1429        let responsible_for_k2 = true;
1430        let responsible_for_k3 = false;
1431        let mut confirmed = Vec::new();
1432        for key in &failed_keys {
1433            let is_responsible = if *key == k2 {
1434                responsible_for_k2
1435            } else {
1436                responsible_for_k3
1437            };
1438            if is_responsible {
1439                confirmed.push(*key);
1440            }
1441        }
1442
1443        assert_eq!(confirmed, vec![k2], "Only K2 should be in confirmed set");
1444
1445        // Step 3: Construct evidence for confirmed failures only.
1446        let challenged_peer = PeerId::from_bytes(peer_id);
1447        let evidence = FailureEvidence::AuditFailure {
1448            challenge_id: 5300,
1449            challenged_peer,
1450            confirmed_failed_keys: confirmed,
1451            reason: AuditFailureReason::DigestMismatch,
1452        };
1453
1454        match evidence {
1455            FailureEvidence::AuditFailure {
1456                confirmed_failed_keys,
1457                ..
1458            } => {
1459                assert_eq!(
1460                    confirmed_failed_keys.len(),
1461                    1,
1462                    "Only K2 should generate evidence"
1463                );
1464                assert_eq!(confirmed_failed_keys[0], k2);
1465            }
1466            _ => panic!("Expected AuditFailure evidence"),
1467        }
1468    }
1469}