Skip to main content

ant_node/replication/
audit.rs

1//! Storage audit protocol (Section 15).
2//!
3//! Challenge-response for claimed holders. Anti-outsourcing protection.
4
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use crate::logging::{debug, info, warn};
9use rand::seq::SliceRandom;
10use rand::Rng;
11
12use crate::ant_protocol::XorName;
13use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
14use crate::replication::protocol::{
15    compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
16    ReplicationMessageBody, ABSENT_KEY_DIGEST,
17};
18use crate::replication::types::{AuditFailureReason, FailureEvidence, PeerSyncRecord};
19use crate::storage::LmdbStorage;
20use saorsa_core::identity::PeerId;
21use saorsa_core::P2PNode;
22
23// ---------------------------------------------------------------------------
24// Audit tick result
25// ---------------------------------------------------------------------------
26
/// Result of an audit tick.
///
/// Produced by [`audit_tick`]; the caller is responsible for turning the
/// variant into trust events.
#[derive(Debug)]
pub enum AuditTickResult {
    /// Audit completed successfully (all digests matched).
    Passed {
        /// The peer that was challenged.
        challenged_peer: PeerId,
        /// Number of keys verified.
        keys_checked: usize,
    },
    /// Audit found failures (after responsibility confirmation).
    ///
    /// Only emitted when at least one failed key is still mapped to the
    /// challenged peer by a fresh local routing-table lookup.
    Failed {
        /// Evidence of the failure for trust engine.
        evidence: FailureEvidence,
    },
    /// Audit target claimed bootstrapping.
    BootstrapClaim {
        /// The peer claiming bootstrap status.
        peer: PeerId,
    },
    /// Nothing to report for this tick.
    ///
    /// Besides "no eligible peers", the audit code in this file also returns
    /// this when the local node is itself bootstrapping, when local keys are
    /// empty or cannot be read, when none of the sampled keys map to the
    /// chosen peer, when the challenge cannot be encoded, and when every
    /// failure is cleared by responsibility confirmation.
    Idle,
    /// Audit skipped (not enough local keys).
    ///
    /// NOTE(review): not constructed by any code visible in this section of
    /// the file — confirm whether a caller elsewhere still relies on it.
    InsufficientKeys,
}
52
53// ---------------------------------------------------------------------------
54// Main audit tick
55// ---------------------------------------------------------------------------
56
57/// Execute one audit tick (Section 15 steps 2-9).
58///
59/// Returns the audit result. Caller is responsible for emitting trust events.
60///
61/// **Invariant 19**: Returns [`AuditTickResult::Idle`] immediately if
62/// `is_bootstrapping` is `true` — a node must not audit others while it
63/// is still bootstrapping.
64#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
65pub async fn audit_tick(
66    p2p_node: &Arc<P2PNode>,
67    storage: &Arc<LmdbStorage>,
68    config: &ReplicationConfig,
69    sync_history: &HashMap<PeerId, PeerSyncRecord>,
70    is_bootstrapping: bool,
71) -> AuditTickResult {
72    // Invariant 19: never audit while still bootstrapping.
73    if is_bootstrapping {
74        return AuditTickResult::Idle;
75    }
76
77    let dht = p2p_node.dht_manager();
78
79    // Step 2: Select one eligible peer (has RepairOpportunity) at random.
80    // Peers with active bootstrap claims remain eligible. A follow-up audit is
81    // how we observe a continued claim and apply past-grace abuse handling.
82    let eligible_peers = eligible_audit_peers(sync_history);
83
84    if eligible_peers.is_empty() {
85        return AuditTickResult::Idle;
86    }
87
88    let (challenged_peer, nonce, challenge_id) = {
89        let mut rng = rand::thread_rng();
90        let selected = match eligible_peers.choose(&mut rng) {
91            Some(p) => *p,
92            None => return AuditTickResult::Idle,
93        };
94        let n: [u8; 32] = rng.gen();
95        let c: u64 = rng.gen();
96        (selected, n, c)
97    };
98
99    // Step 3: Sample keys from local store and keep those the peer is
100    // responsible for (appears in the close group via local RT lookup).
101    let all_keys = match storage.all_keys().await {
102        Ok(keys) => keys,
103        Err(e) => {
104            warn!("Audit: failed to read local keys: {e}");
105            return AuditTickResult::Idle;
106        }
107    };
108
109    if all_keys.is_empty() {
110        return AuditTickResult::Idle;
111    }
112
113    let sample_count = ReplicationConfig::audit_sample_count(all_keys.len());
114    let sampled_keys: Vec<XorName> = {
115        let mut rng = rand::thread_rng();
116        all_keys
117            .choose_multiple(&mut rng, sample_count)
118            .copied()
119            .collect()
120    };
121
122    // Step 4: Filter to keys where the chosen peer is in the close group.
123    let mut peer_keys = Vec::new();
124    for key in &sampled_keys {
125        let closest = dht
126            .find_closest_nodes_local_with_self(key, config.close_group_size)
127            .await;
128        if closest.iter().any(|n| n.peer_id == challenged_peer) {
129            peer_keys.push(*key);
130        }
131    }
132
133    if peer_keys.is_empty() {
134        return AuditTickResult::Idle;
135    }
136
137    // peer_keys is naturally bounded by audit_sample_count (sqrt-scaled),
138    // so no explicit truncation needed.
139
140    // Step 6: Send challenge.
141
142    let challenge = AuditChallenge {
143        challenge_id,
144        nonce,
145        challenged_peer_id: *challenged_peer.as_bytes(),
146        keys: peer_keys.clone(),
147    };
148
149    let msg = ReplicationMessage {
150        request_id: challenge_id,
151        body: ReplicationMessageBody::AuditChallenge(challenge),
152    };
153
154    let encoded = match msg.encode() {
155        Ok(data) => data,
156        Err(e) => {
157            warn!("Audit: failed to encode challenge: {e}");
158            return AuditTickResult::Idle;
159        }
160    };
161
162    let response = match p2p_node
163        .send_request(
164            &challenged_peer,
165            REPLICATION_PROTOCOL_ID,
166            encoded,
167            config.audit_response_timeout(peer_keys.len()),
168        )
169        .await
170    {
171        Ok(resp) => resp,
172        Err(e) => {
173            debug!("Audit: challenge to {challenged_peer} failed: {e}");
174            // Timeout — need responsibility confirmation before penalty.
175            return handle_audit_timeout(
176                &challenged_peer,
177                challenge_id,
178                &peer_keys,
179                p2p_node,
180                config,
181            )
182            .await;
183        }
184    };
185
186    // Step 7: Parse response.
187    let resp_msg = match ReplicationMessage::decode(&response.data) {
188        Ok(m) => m,
189        Err(e) => {
190            warn!("Audit: failed to decode response from {challenged_peer}: {e}");
191            return handle_audit_failure(
192                &challenged_peer,
193                challenge_id,
194                &peer_keys,
195                AuditFailureReason::MalformedResponse,
196                p2p_node,
197                config,
198            )
199            .await;
200        }
201    };
202
203    match resp_msg.body {
204        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
205            challenge_id: resp_id,
206        }) => {
207            if resp_id != challenge_id {
208                warn!("Audit: challenge ID mismatch on Bootstrapping from {challenged_peer}");
209                return handle_audit_failure(
210                    &challenged_peer,
211                    challenge_id,
212                    &peer_keys,
213                    AuditFailureReason::MalformedResponse,
214                    p2p_node,
215                    config,
216                )
217                .await;
218            }
219            // Step 7b: Bootstrapping claim.
220            AuditTickResult::BootstrapClaim {
221                peer: challenged_peer,
222            }
223        }
224        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
225            challenge_id: resp_id,
226            digests,
227        }) => {
228            if resp_id != challenge_id {
229                warn!("Audit: challenge ID mismatch from {challenged_peer}");
230                return handle_audit_failure(
231                    &challenged_peer,
232                    challenge_id,
233                    &peer_keys,
234                    AuditFailureReason::MalformedResponse,
235                    p2p_node,
236                    config,
237                )
238                .await;
239            }
240            verify_digests(
241                &challenged_peer,
242                challenge_id,
243                &nonce,
244                &peer_keys,
245                &digests,
246                storage,
247                p2p_node,
248                config,
249            )
250            .await
251        }
252        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
253            challenge_id: resp_id,
254            reason,
255        }) => {
256            if resp_id != challenge_id {
257                warn!("Audit: challenge ID mismatch on Rejected from {challenged_peer}");
258                return handle_audit_failure(
259                    &challenged_peer,
260                    challenge_id,
261                    &peer_keys,
262                    AuditFailureReason::MalformedResponse,
263                    p2p_node,
264                    config,
265                )
266                .await;
267            }
268            warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
269            handle_audit_failure(
270                &challenged_peer,
271                challenge_id,
272                &peer_keys,
273                AuditFailureReason::Rejected,
274                p2p_node,
275                config,
276            )
277            .await
278        }
279        _ => {
280            warn!("Audit: unexpected response type from {challenged_peer}");
281            handle_audit_failure(
282                &challenged_peer,
283                challenge_id,
284                &peer_keys,
285                AuditFailureReason::MalformedResponse,
286                p2p_node,
287                config,
288            )
289            .await
290        }
291    }
292}
293
294fn eligible_audit_peers(sync_history: &HashMap<PeerId, PeerSyncRecord>) -> Vec<PeerId> {
295    sync_history
296        .iter()
297        .filter(|(_, record)| record.has_repair_opportunity())
298        .map(|(peer, _)| *peer)
299        .collect()
300}
301
302// ---------------------------------------------------------------------------
303// Digest verification
304// ---------------------------------------------------------------------------
305
306/// Verify per-key digests from audit response (Step 8).
307#[allow(clippy::too_many_arguments)]
308async fn verify_digests(
309    challenged_peer: &PeerId,
310    challenge_id: u64,
311    nonce: &[u8; 32],
312    keys: &[XorName],
313    digests: &[[u8; 32]],
314    storage: &Arc<LmdbStorage>,
315    p2p_node: &Arc<P2PNode>,
316    config: &ReplicationConfig,
317) -> AuditTickResult {
318    // Requirement: response must have exactly one digest per key.
319    if digests.len() != keys.len() {
320        warn!(
321            "Audit: malformed response from {challenged_peer}: {} digests for {} keys",
322            digests.len(),
323            keys.len()
324        );
325        return handle_audit_failure(
326            challenged_peer,
327            challenge_id,
328            keys,
329            AuditFailureReason::MalformedResponse,
330            p2p_node,
331            config,
332        )
333        .await;
334    }
335
336    let challenged_peer_bytes = challenged_peer.as_bytes();
337    let mut failed_keys = Vec::new();
338
339    for (i, key) in keys.iter().enumerate() {
340        let received_digest = &digests[i];
341
342        // Check for absent sentinel.
343        if *received_digest == ABSENT_KEY_DIGEST {
344            failed_keys.push(*key);
345            continue;
346        }
347
348        // Recompute expected digest from local copy.
349        let local_bytes = match storage.get_raw(key).await {
350            Ok(Some(bytes)) => bytes,
351            Ok(None) => {
352                // We should hold this key (we sampled it), but it's gone.
353                warn!(
354                    "Audit: local key {} disappeared during audit",
355                    hex::encode(key)
356                );
357                continue;
358            }
359            Err(e) => {
360                warn!("Audit: failed to read local key {}: {e}", hex::encode(key));
361                continue;
362            }
363        };
364
365        let expected = compute_audit_digest(nonce, challenged_peer_bytes, key, &local_bytes);
366        if *received_digest != expected {
367            failed_keys.push(*key);
368        }
369    }
370
371    if failed_keys.is_empty() {
372        info!(
373            "Audit: peer {challenged_peer} passed (all {} keys verified)",
374            keys.len()
375        );
376        return AuditTickResult::Passed {
377            challenged_peer: *challenged_peer,
378            keys_checked: keys.len(),
379        };
380    }
381
382    // Step 9: Responsibility confirmation for failed keys.
383    handle_audit_failure(
384        challenged_peer,
385        challenge_id,
386        &failed_keys,
387        AuditFailureReason::DigestMismatch,
388        p2p_node,
389        config,
390    )
391    .await
392}
393
394// ---------------------------------------------------------------------------
395// Failure handling with responsibility confirmation
396// ---------------------------------------------------------------------------
397
398/// Handle audit failure: confirm responsibility before emitting evidence (Step 9).
399async fn handle_audit_failure(
400    challenged_peer: &PeerId,
401    challenge_id: u64,
402    failed_keys: &[XorName],
403    reason: AuditFailureReason,
404    p2p_node: &Arc<P2PNode>,
405    config: &ReplicationConfig,
406) -> AuditTickResult {
407    let dht = p2p_node.dht_manager();
408    let mut confirmed_failures = Vec::new();
409
410    // Step 9a-b: Fresh local RT lookup for each failed key.
411    for key in failed_keys {
412        let closest = dht
413            .find_closest_nodes_local_with_self(key, config.close_group_size)
414            .await;
415        if closest.iter().any(|n| n.peer_id == *challenged_peer) {
416            confirmed_failures.push(*key);
417        } else {
418            debug!(
419                "Audit: peer {challenged_peer} not responsible for {} (removed from failure set)",
420                hex::encode(key)
421            );
422        }
423    }
424
425    // Step 9c: Empty confirmed set -> peer is no longer responsible for any
426    // of the failed keys (topology churn). This is NOT a pass — the peer did
427    // not prove it stores the data. Return Idle to avoid granting unearned
428    // positive trust.
429    if confirmed_failures.is_empty() {
430        info!("Audit: all failures for {challenged_peer} cleared by responsibility confirmation");
431        return AuditTickResult::Idle;
432    }
433
434    // Step 9d: Non-empty confirmed set -> emit evidence.
435    let evidence = FailureEvidence::AuditFailure {
436        challenge_id,
437        challenged_peer: *challenged_peer,
438        confirmed_failed_keys: confirmed_failures,
439        reason,
440    };
441
442    AuditTickResult::Failed { evidence }
443}
444
445/// Handle audit timeout (no response received).
446async fn handle_audit_timeout(
447    challenged_peer: &PeerId,
448    challenge_id: u64,
449    keys: &[XorName],
450    p2p_node: &Arc<P2PNode>,
451    config: &ReplicationConfig,
452) -> AuditTickResult {
453    handle_audit_failure(
454        challenged_peer,
455        challenge_id,
456        keys,
457        AuditFailureReason::Timeout,
458        p2p_node,
459        config,
460    )
461    .await
462}
463
464// ---------------------------------------------------------------------------
465// Responder-side handler
466// ---------------------------------------------------------------------------
467
468/// Handle an incoming audit challenge (responder side).
469///
470/// Validates that the challenge targets this node, computes per-key digests,
471/// and returns the response.  Rejects challenges where
472/// `challenged_peer_id` does not match `self_peer_id` to prevent an oracle
473/// attack where a malicious challenger forges digests for a different peer.
474pub async fn handle_audit_challenge(
475    challenge: &AuditChallenge,
476    storage: &LmdbStorage,
477    self_peer_id: &PeerId,
478    is_bootstrapping: bool,
479    stored_chunks: usize,
480) -> AuditResponse {
481    if is_bootstrapping {
482        return AuditResponse::Bootstrapping {
483            challenge_id: challenge.challenge_id,
484        };
485    }
486
487    if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
488        warn!(
489            "Audit challenge targeted wrong peer: expected {}, got {}",
490            hex::encode(self_peer_id.as_bytes()),
491            hex::encode(challenge.challenged_peer_id),
492        );
493        return AuditResponse::Rejected {
494            challenge_id: challenge.challenge_id,
495            reason: "challenged_peer_id does not match this node".to_string(),
496        };
497    }
498
499    let max_keys = ReplicationConfig::max_incoming_audit_keys(stored_chunks);
500    if challenge.keys.len() > max_keys {
501        warn!(
502            "Audit challenge rejected: {} keys exceeds dynamic limit of {max_keys} \
503             (stored_chunks={stored_chunks})",
504            challenge.keys.len(),
505        );
506        return AuditResponse::Rejected {
507            challenge_id: challenge.challenge_id,
508            reason: format!(
509                "challenge contains {} keys, limit is {max_keys}",
510                challenge.keys.len()
511            ),
512        };
513    }
514
515    let mut digests = Vec::with_capacity(challenge.keys.len());
516
517    for key in &challenge.keys {
518        match storage.get_raw(key).await {
519            Ok(Some(data)) => {
520                let digest = compute_audit_digest(
521                    &challenge.nonce,
522                    &challenge.challenged_peer_id,
523                    key,
524                    &data,
525                );
526                digests.push(digest);
527            }
528            Ok(None) => {
529                digests.push(ABSENT_KEY_DIGEST);
530            }
531            Err(e) => {
532                warn!(
533                    "Audit responder: failed to read key {}: {e}",
534                    hex::encode(key)
535                );
536                digests.push(ABSENT_KEY_DIGEST);
537            }
538        }
539    }
540
541    AuditResponse::Digests {
542        challenge_id: challenge.challenge_id,
543        digests,
544    }
545}
546
547// ---------------------------------------------------------------------------
548// Tests
549// ---------------------------------------------------------------------------
550
551#[cfg(test)]
552#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
553mod tests {
554    use super::*;
555    use crate::replication::protocol::compute_audit_digest;
556    use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
557    use crate::storage::LmdbStorageConfig;
558    use std::time::Instant;
559    use tempfile::TempDir;
560
561    /// Simulated stored chunk count for tests. Large enough that the dynamic
562    /// incoming audit limit (`2 * sqrt(N)`) never rejects small test challenges.
563    const TEST_STORED_CHUNKS: usize = 1_000_000;
564
565    /// Create a test `LmdbStorage` backed by a temp directory.
566    async fn create_test_storage() -> (LmdbStorage, TempDir) {
567        let temp_dir = TempDir::new().expect("create temp dir");
568        let config = LmdbStorageConfig {
569            root_dir: temp_dir.path().to_path_buf(),
570            verify_on_read: false,
571            max_map_size: 0,
572            disk_reserve: 0,
573        };
574        let storage = LmdbStorage::new(config).await.expect("create storage");
575        (storage, temp_dir)
576    }
577
578    /// Build a challenge with the given parameters.
579    fn make_challenge(
580        challenge_id: u64,
581        nonce: [u8; 32],
582        peer_id: [u8; 32],
583        keys: Vec<XorName>,
584    ) -> AuditChallenge {
585        AuditChallenge {
586            challenge_id,
587            nonce,
588            challenged_peer_id: peer_id,
589            keys,
590        }
591    }
592
    /// Build a `PeerId` matching the raw bytes used in a challenge.
    ///
    /// Thin wrapper over `PeerId::from_bytes`, used so a test can act as the
    /// node whose ID appears in `challenged_peer_id`.
    fn peer_id_from_bytes(bytes: [u8; 32]) -> PeerId {
        PeerId::from_bytes(bytes)
    }
597
598    // -- handle_audit_challenge: present keys ---------------------------------
599
600    #[tokio::test]
601    async fn handle_challenge_present_keys_returns_correct_digests() {
602        let (storage, _temp) = create_test_storage().await;
603
604        // Store two chunks.
605        let content_a = b"chunk alpha";
606        let addr_a = LmdbStorage::compute_address(content_a);
607        storage.put(&addr_a, content_a).await.expect("put a");
608
609        let content_b = b"chunk beta";
610        let addr_b = LmdbStorage::compute_address(content_b);
611        storage.put(&addr_b, content_b).await.expect("put b");
612
613        let nonce = [0xAA; 32];
614        let peer_id = [0xBB; 32];
615        let challenge = make_challenge(42, nonce, peer_id, vec![addr_a, addr_b]);
616        let self_id = peer_id_from_bytes(peer_id);
617
618        let response =
619            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
620
621        match response {
622            AuditResponse::Digests {
623                challenge_id,
624                digests,
625            } => {
626                assert_eq!(challenge_id, 42);
627                assert_eq!(digests.len(), 2);
628
629                let expected_a = compute_audit_digest(&nonce, &peer_id, &addr_a, content_a);
630                let expected_b = compute_audit_digest(&nonce, &peer_id, &addr_b, content_b);
631                assert_eq!(digests[0], expected_a);
632                assert_eq!(digests[1], expected_b);
633            }
634            AuditResponse::Bootstrapping { .. } => {
635                panic!("expected Digests, got Bootstrapping");
636            }
637            AuditResponse::Rejected { .. } => {
638                panic!("Unexpected Rejected response");
639            }
640        }
641    }
642
643    // -- handle_audit_challenge: absent keys ----------------------------------
644
645    #[tokio::test]
646    async fn handle_challenge_absent_keys_returns_sentinel() {
647        let (storage, _temp) = create_test_storage().await;
648
649        let absent_key = [0xFF; 32];
650        let nonce = [0x11; 32];
651        let peer_id = [0x22; 32];
652        let challenge = make_challenge(99, nonce, peer_id, vec![absent_key]);
653        let self_id = peer_id_from_bytes(peer_id);
654
655        let response =
656            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
657
658        match response {
659            AuditResponse::Digests {
660                challenge_id,
661                digests,
662            } => {
663                assert_eq!(challenge_id, 99);
664                assert_eq!(digests.len(), 1);
665                assert_eq!(
666                    digests[0], ABSENT_KEY_DIGEST,
667                    "absent key should produce sentinel digest"
668                );
669            }
670            AuditResponse::Bootstrapping { .. } => {
671                panic!("expected Digests, got Bootstrapping");
672            }
673            AuditResponse::Rejected { .. } => {
674                panic!("Unexpected Rejected response");
675            }
676        }
677    }
678
679    // -- handle_audit_challenge: mixed present and absent ---------------------
680
681    #[tokio::test]
682    async fn handle_challenge_mixed_present_and_absent() {
683        let (storage, _temp) = create_test_storage().await;
684
685        let content = b"present chunk";
686        let addr_present = LmdbStorage::compute_address(content);
687        storage.put(&addr_present, content).await.expect("put");
688
689        let addr_absent = [0xDE; 32];
690        let nonce = [0x33; 32];
691        let peer_id = [0x44; 32];
692        let challenge = make_challenge(7, nonce, peer_id, vec![addr_present, addr_absent]);
693        let self_id = peer_id_from_bytes(peer_id);
694
695        let response =
696            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
697
698        match response {
699            AuditResponse::Digests { digests, .. } => {
700                assert_eq!(digests.len(), 2);
701
702                let expected_present =
703                    compute_audit_digest(&nonce, &peer_id, &addr_present, content);
704                assert_eq!(digests[0], expected_present);
705                assert_eq!(
706                    digests[1], ABSENT_KEY_DIGEST,
707                    "absent key should be sentinel"
708                );
709            }
710            AuditResponse::Bootstrapping { .. } => {
711                panic!("expected Digests, got Bootstrapping");
712            }
713            AuditResponse::Rejected { .. } => {
714                panic!("Unexpected Rejected response");
715            }
716        }
717    }
718
719    // -- handle_audit_challenge: bootstrapping --------------------------------
720
721    #[tokio::test]
722    async fn handle_challenge_bootstrapping_returns_bootstrapping_response() {
723        let (storage, _temp) = create_test_storage().await;
724
725        let challenge = make_challenge(55, [0x00; 32], [0x01; 32], vec![[0x02; 32]]);
726        let self_id = peer_id_from_bytes([0x01; 32]);
727
728        let response =
729            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
730
731        match response {
732            AuditResponse::Bootstrapping { challenge_id } => {
733                assert_eq!(challenge_id, 55);
734            }
735            AuditResponse::Digests { .. } => {
736                panic!("expected Bootstrapping, got Digests");
737            }
738            AuditResponse::Rejected { .. } => {
739                panic!("Unexpected Rejected response");
740            }
741        }
742    }
743
744    // -- handle_audit_challenge: empty key list -------------------------------
745
746    #[tokio::test]
747    async fn handle_challenge_empty_keys_returns_empty_digests() {
748        let (storage, _temp) = create_test_storage().await;
749
750        let challenge = make_challenge(100, [0x10; 32], [0x20; 32], vec![]);
751        let self_id = peer_id_from_bytes([0x20; 32]);
752
753        let response =
754            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
755
756        match response {
757            AuditResponse::Digests {
758                challenge_id,
759                digests,
760            } => {
761                assert_eq!(challenge_id, 100);
762                assert!(
763                    digests.is_empty(),
764                    "empty key list should yield empty digests"
765                );
766            }
767            AuditResponse::Bootstrapping { .. } => {
768                panic!("expected Digests, got Bootstrapping");
769            }
770            AuditResponse::Rejected { .. } => {
771                panic!("Unexpected Rejected response");
772            }
773        }
774    }
775
776    // -- Digest verification: matching ----------------------------------------
777
778    #[test]
779    fn digest_verification_matching() {
780        let nonce = [0x01; 32];
781        let peer_id = [0x02; 32];
782        let key: XorName = [0x03; 32];
783        let data = b"correct data";
784
785        let expected = compute_audit_digest(&nonce, &peer_id, &key, data);
786        let recomputed = compute_audit_digest(&nonce, &peer_id, &key, data);
787
788        assert_eq!(
789            expected, recomputed,
790            "same inputs must produce identical digests"
791        );
792        assert_ne!(
793            expected, ABSENT_KEY_DIGEST,
794            "real digest must not be sentinel"
795        );
796    }
797
798    // -- Digest verification: mismatching -------------------------------------
799
800    #[test]
801    fn digest_verification_mismatching_data() {
802        let nonce = [0x01; 32];
803        let peer_id = [0x02; 32];
804        let key: XorName = [0x03; 32];
805
806        let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data version A");
807        let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data version B");
808
809        assert_ne!(
810            digest_a, digest_b,
811            "different data must produce different digests"
812        );
813    }
814
815    #[test]
816    fn digest_verification_mismatching_nonce() {
817        let peer_id = [0x02; 32];
818        let key: XorName = [0x03; 32];
819        let data = b"same data";
820
821        let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, data);
822        let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, data);
823
824        assert_ne!(
825            digest_a, digest_b,
826            "different nonces must produce different digests"
827        );
828    }
829
830    #[test]
831    fn digest_verification_mismatching_peer() {
832        let nonce = [0x01; 32];
833        let key: XorName = [0x03; 32];
834        let data = b"same data";
835
836        let digest_a = compute_audit_digest(&nonce, &[0x02; 32], &key, data);
837        let digest_b = compute_audit_digest(&nonce, &[0xFE; 32], &key, data);
838
839        assert_ne!(
840            digest_a, digest_b,
841            "different peers must produce different digests"
842        );
843    }
844
845    #[test]
846    fn digest_verification_mismatching_key() {
847        let nonce = [0x01; 32];
848        let peer_id = [0x02; 32];
849        let data = b"same data";
850
851        let digest_a = compute_audit_digest(&nonce, &peer_id, &[0x03; 32], data);
852        let digest_b = compute_audit_digest(&nonce, &peer_id, &[0xFC; 32], data);
853
854        assert_ne!(
855            digest_a, digest_b,
856            "different keys must produce different digests"
857        );
858    }
859
860    // -- Absent sentinel is all zeros -----------------------------------------
861
862    #[test]
863    fn absent_sentinel_is_all_zeros() {
864        assert_eq!(ABSENT_KEY_DIGEST, [0u8; 32], "sentinel must be all zeros");
865    }
866
867    // -- Bootstrapping skips digest computation even with stored keys ---------
868
869    #[tokio::test]
870    async fn bootstrapping_skips_digest_computation() {
871        let (storage, _temp) = create_test_storage().await;
872
873        let content = b"stored but bootstrapping";
874        let addr = LmdbStorage::compute_address(content);
875        storage.put(&addr, content).await.expect("put");
876
877        let challenge = make_challenge(200, [0xCC; 32], [0xDD; 32], vec![addr]);
878        let self_id = peer_id_from_bytes([0xDD; 32]);
879
880        let response =
881            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
882
883        assert!(
884            matches!(response, AuditResponse::Bootstrapping { challenge_id: 200 }),
885            "bootstrapping node must not compute digests"
886        );
887    }
888
889    // -- Scenario 19/53: Partial failure with mixed responsibility ----------------
890
891    #[tokio::test]
892    async fn scenario_19_partial_failure_mixed_responsibility() {
893        // Three keys challenged: K1 matches, K2 mismatches, K3 absent.
894        // After responsibility confirmation, only K2 is confirmed responsible.
895        // AuditFailure emitted for {K2} only.
896        // Test handle_audit_challenge with mixed results, then verify
897        // the digest logic manually.
898
899        let (storage, _temp) = create_test_storage().await;
900        let nonce = [0x42u8; 32];
901        let peer_id = [0xAA; 32];
902
903        // Store K1 and K2, but NOT K3
904        let content_k1 = b"key one data";
905        let addr_k1 = LmdbStorage::compute_address(content_k1);
906        storage.put(&addr_k1, content_k1).await.unwrap();
907
908        let content_k2 = b"key two data";
909        let addr_k2 = LmdbStorage::compute_address(content_k2);
910        storage.put(&addr_k2, content_k2).await.unwrap();
911
912        let addr_k3 = [0xFF; 32]; // Not stored
913
914        let challenge = AuditChallenge {
915            challenge_id: 100,
916            nonce,
917            challenged_peer_id: peer_id,
918            keys: vec![addr_k1, addr_k2, addr_k3],
919        };
920        let self_id = peer_id_from_bytes(peer_id);
921
922        let response =
923            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
924
925        match response {
926            AuditResponse::Digests { digests, .. } => {
927                assert_eq!(digests.len(), 3);
928
929                // K1 should have correct digest
930                let expected_k1 = compute_audit_digest(&nonce, &peer_id, &addr_k1, content_k1);
931                assert_eq!(digests[0], expected_k1);
932
933                // K2 should have correct digest
934                let expected_k2 = compute_audit_digest(&nonce, &peer_id, &addr_k2, content_k2);
935                assert_eq!(digests[1], expected_k2);
936
937                // K3 absent -> sentinel
938                assert_eq!(digests[2], ABSENT_KEY_DIGEST);
939            }
940            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests response"),
941            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
942        }
943    }
944
945    // -- Scenario 54: All digests pass -------------------------------------------
946
947    #[tokio::test]
948    async fn scenario_54_all_digests_pass() {
949        // All challenged keys present and digests match.
950        // Multiple keys to strengthen coverage beyond existing two-key tests.
951        let (storage, _temp) = create_test_storage().await;
952        let nonce = [0x10; 32];
953        let peer_id = [0x20; 32];
954
955        let c1 = b"chunk alpha";
956        let c2 = b"chunk beta";
957        let c3 = b"chunk gamma";
958        let a1 = LmdbStorage::compute_address(c1);
959        let a2 = LmdbStorage::compute_address(c2);
960        let a3 = LmdbStorage::compute_address(c3);
961        storage.put(&a1, c1).await.unwrap();
962        storage.put(&a2, c2).await.unwrap();
963        storage.put(&a3, c3).await.unwrap();
964
965        let challenge = AuditChallenge {
966            challenge_id: 200,
967            nonce,
968            challenged_peer_id: peer_id,
969            keys: vec![a1, a2, a3],
970        };
971        let self_id = peer_id_from_bytes(peer_id);
972
973        let response =
974            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
975        match response {
976            AuditResponse::Digests { digests, .. } => {
977                assert_eq!(digests.len(), 3);
978                for (i, (addr, content)) in [(a1, &c1[..]), (a2, &c2[..]), (a3, &c3[..])]
979                    .iter()
980                    .enumerate()
981                {
982                    let expected = compute_audit_digest(&nonce, &peer_id, addr, content);
983                    assert_eq!(digests[i], expected, "Key {i} digest should match");
984                }
985            }
986            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
987            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
988        }
989    }
990
991    // -- Scenario 55: Empty failure set means no evidence -------------------------
992
993    /// Scenario 55: Peer challenged on {K1, K2}. Both digests mismatch.
994    /// Responsibility confirmation shows the peer is NOT responsible for
995    /// either key. The confirmed failure set is empty — no `AuditFailure`
996    /// evidence is emitted.
997    ///
998    /// Full `verify_digests` requires a live `P2PNode` for network lookups.
999    /// This test exercises the deterministic sub-steps:
1000    ///   (1) Digest comparison identifies K1 and K2 as mismatches.
1001    ///   (2) Responsibility confirmation removes both keys.
1002    ///   (3) Empty confirmed failure set means no evidence.
1003    #[tokio::test]
1004    async fn scenario_55_no_confirmed_responsibility_no_evidence() {
1005        let (storage, _temp) = create_test_storage().await;
1006        let nonce = [0x55; 32];
1007        let peer_id = [0x55; 32];
1008
1009        // Store K1 and K2 on the challenger (for expected digest computation).
1010        let c1 = b"scenario 55 key one";
1011        let c2 = b"scenario 55 key two";
1012        let k1 = LmdbStorage::compute_address(c1);
1013        let k2 = LmdbStorage::compute_address(c2);
1014        storage.put(&k1, c1).await.expect("put k1");
1015        storage.put(&k2, c2).await.expect("put k2");
1016
1017        // Challenger computes expected digests.
1018        let expected_d1 = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1019        let expected_d2 = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1020
1021        // Simulate peer returning WRONG digests for both keys.
1022        let wrong_d1 = compute_audit_digest(&nonce, &peer_id, &k1, b"corrupted k1");
1023        let wrong_d2 = compute_audit_digest(&nonce, &peer_id, &k2, b"corrupted k2");
1024        assert_ne!(wrong_d1, expected_d1, "K1 digest should mismatch");
1025        assert_ne!(wrong_d2, expected_d2, "K2 digest should mismatch");
1026
1027        // Step 1: Identify failed keys via digest comparison.
1028        let keys = [k1, k2];
1029        let expected = [expected_d1, expected_d2];
1030        let received = [wrong_d1, wrong_d2];
1031
1032        let mut failed_keys = Vec::new();
1033        for i in 0..keys.len() {
1034            if received[i] != expected[i] {
1035                failed_keys.push(keys[i]);
1036            }
1037        }
1038        assert_eq!(
1039            failed_keys.len(),
1040            2,
1041            "Both keys should be identified as digest mismatches"
1042        );
1043
1044        // Step 2: Responsibility confirmation — peer is NOT responsible for
1045        // either key (simulated by filtering them all out).
1046        let confirmed_responsible_keys: Vec<XorName> = Vec::new();
1047        let confirmed_failures: Vec<XorName> = failed_keys
1048            .into_iter()
1049            .filter(|k| confirmed_responsible_keys.contains(k))
1050            .collect();
1051
1052        // Step 3: Empty confirmed failure set → no AuditFailure evidence.
1053        assert!(
1054            confirmed_failures.is_empty(),
1055            "With no confirmed responsibility, failure set must be empty — \
1056             no AuditFailure evidence should be emitted"
1057        );
1058
1059        // Verify that constructing evidence with empty keys results in a
1060        // no-penalty outcome (the caller checks is_empty before emitting).
1061        let peer = PeerId::from_bytes(peer_id);
1062        let evidence = FailureEvidence::AuditFailure {
1063            challenge_id: 5500,
1064            challenged_peer: peer,
1065            confirmed_failed_keys: confirmed_failures,
1066            reason: AuditFailureReason::DigestMismatch,
1067        };
1068        if let FailureEvidence::AuditFailure {
1069            confirmed_failed_keys,
1070            ..
1071        } = evidence
1072        {
1073            assert!(
1074                confirmed_failed_keys.is_empty(),
1075                "Evidence with empty failure set should not trigger a trust penalty"
1076            );
1077        }
1078    }
1079
1080    // -- Scenario 56: RepairOpportunity filters never-synced peers ----------------
1081
/// Scenario 56: `has_repair_opportunity()` is false for a record that never
/// synced and for one that synced with zero cycles since, and true once at
/// least one cycle has passed after a sync.
#[test]
fn scenario_56_repair_opportunity_filters_never_synced() {
    // No sync on record: the cycle count alone is not enough.
    let without_sync = PeerSyncRecord {
        cycles_since_sync: 5,
        last_sync: None,
    };
    assert!(!without_sync.has_repair_opportunity());

    // Synced just now, no cycle has elapsed since.
    let fresh_sync = PeerSyncRecord {
        cycles_since_sync: 0,
        last_sync: Some(Instant::now()),
    };
    assert!(!fresh_sync.has_repair_opportunity());

    // Synced and one cycle has elapsed since.
    let one_cycle_after_sync = PeerSyncRecord {
        cycles_since_sync: 1,
        last_sync: Some(Instant::now()),
    };
    assert!(one_cycle_after_sync.has_repair_opportunity());
}
1105
/// A peer with a recorded bootstrap claim older than the grace period must
/// still be returned by `eligible_audit_peers`, which is computed from the
/// sync history only.
#[test]
fn expired_bootstrap_claim_does_not_remove_peer_from_audit_eligibility() {
    let peer = peer_id_from_bytes([0x57; 32]);

    // The peer has synced and one cycle has elapsed since.
    let synced_record = PeerSyncRecord {
        last_sync: Some(Instant::now()),
        cycles_since_sync: 1,
    };
    let mut sync_history = HashMap::new();
    sync_history.insert(peer, synced_record);

    // Claim first seen one second before the grace period started; falls
    // back to "now" if the clock cannot represent that instant.
    let beyond_grace = crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD
        + std::time::Duration::from_secs(1);
    let first_seen = Instant::now()
        .checked_sub(beyond_grace)
        .unwrap_or_else(Instant::now);
    let mut bootstrap_claims = HashMap::new();
    bootstrap_claims.insert(peer, first_seen);

    let eligible = eligible_audit_peers(&sync_history);

    assert!(bootstrap_claims.contains_key(&peer));
    let still_auditable = eligible.contains(&peer);
    assert!(
        still_auditable,
        "continued bootstrap claims must remain auditable so past-grace abuse can be observed"
    );
}
1135
1136    // -- Audit response must match key count --------------------------------------
1137
1138    #[tokio::test]
1139    async fn audit_response_must_match_key_count() {
1140        // Section 15: "A response is invalid if it has fewer or more entries
1141        // than challenged keys."
1142        // Verify handle_audit_challenge always produces exactly N digests for
1143        // N keys, including edge cases.
1144
1145        let (storage, _temp) = create_test_storage().await;
1146        let nonce = [0x50; 32];
1147        let peer_id = [0x60; 32];
1148
1149        // Store a single chunk
1150        let content = b"single chunk";
1151        let addr = LmdbStorage::compute_address(content);
1152        storage.put(&addr, content).await.unwrap();
1153
1154        // Challenge with 1 stored + 4 absent = 5 keys total
1155        let absent_keys: Vec<XorName> = (1..=4u8).map(|i| [i; 32]).collect();
1156        let mut keys = vec![addr];
1157        keys.extend_from_slice(&absent_keys);
1158
1159        let key_count = keys.len();
1160        let challenge = make_challenge(300, nonce, peer_id, keys);
1161        let self_id = peer_id_from_bytes(peer_id);
1162
1163        let response =
1164            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1165        match response {
1166            AuditResponse::Digests { digests, .. } => {
1167                assert_eq!(
1168                    digests.len(),
1169                    key_count,
1170                    "must produce exactly one digest per challenged key"
1171                );
1172            }
1173            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
1174            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
1175        }
1176    }
1177
1178    // -- Audit digest uses full record bytes --------------------------------------
1179
/// Checks that the audit digest depends on the record content: with nonce,
/// peer and key held fixed, different record bytes must give different
/// digests.
#[test]
fn audit_digest_uses_full_record_bytes() {
    let nonce = [1u8; 32];
    let peer = [2u8; 32];
    let key = [3u8; 32];

    // Two records that differ only in their final byte.
    let digest_v1 = compute_audit_digest(&nonce, &peer, &key, b"data version 1");
    let digest_v2 = compute_audit_digest(&nonce, &peer, &key, b"data version 2");

    assert_ne!(
        digest_v1, digest_v2,
        "Different record bytes must produce different digests"
    );
}
1194
1195    // -- Scenario 29: Audit start gate ------------------------------------------
1196
1197    /// Scenario 29: `handle_audit_challenge` returns `Bootstrapping` when the
1198    /// node is still bootstrapping — audit digests are never computed, and no
1199    /// `AuditFailure` evidence is emitted by the caller.
1200    ///
1201    /// This is the responder-side gate.  The challenger-side gate is enforced
1202    /// by `audit_tick`'s `is_bootstrapping` guard (Invariant 19) and by
1203    /// `check_bootstrap_drained()` in the engine loop; this test confirms the
1204    /// complementary responder behavior.
1205    #[tokio::test]
1206    async fn scenario_29_audit_start_gate_during_bootstrap() {
1207        let (storage, _temp) = create_test_storage().await;
1208
1209        // Store data so there *would* be work to audit.
1210        let content = b"should not be audited during bootstrap";
1211        let addr = LmdbStorage::compute_address(content);
1212        storage.put(&addr, content).await.expect("put");
1213
1214        let challenge = make_challenge(2900, [0x29; 32], [0x29; 32], vec![addr]);
1215        let self_id = peer_id_from_bytes([0x29; 32]);
1216
1217        // Responder is bootstrapping → Bootstrapping response, NOT Digests.
1218        let response =
1219            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1220        assert!(
1221            matches!(
1222                response,
1223                AuditResponse::Bootstrapping { challenge_id: 2900 }
1224            ),
1225            "bootstrapping node must not compute digests — audit start gate"
1226        );
1227
1228        // Responder is NOT bootstrapping → normal Digests.
1229        let response =
1230            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
1231        assert!(
1232            matches!(response, AuditResponse::Digests { .. }),
1233            "drained node should compute digests normally"
1234        );
1235    }
1236
1237    // -- Scenario 30: Audit peer selection from sampled keys --------------------
1238
/// Scenario 30: audit key sampling is sized dynamically and peer eligibility
/// is gated on repair opportunity.
///
/// Full `audit_tick` requires a live network, so this test pins the two
/// deterministic pieces it relies on:
///   (a) `ReplicationConfig::audit_sample_count` for 3, 100 and 10_000 keys.
///   (b) `PeerSyncRecord::has_repair_opportunity` for never-synced,
///       just-synced and synced-with-cycles records.
#[test]
fn scenario_30_audit_peer_selection_from_sampled_keys() {
    // (a) Sample count for a few store sizes.
    let samples_for_100 = ReplicationConfig::audit_sample_count(100);
    assert_eq!(
        samples_for_100, 10,
        "sample count should scale with sqrt(total_keys)"
    );

    let samples_for_3 = ReplicationConfig::audit_sample_count(3);
    assert_eq!(samples_for_3, 1, "sqrt(3) = 1");

    let samples_for_10k = ReplicationConfig::audit_sample_count(10_000);
    assert_eq!(samples_for_10k, 100, "sqrt(10000) = 100");

    // (b) Eligibility via repair opportunity.
    // No sync recorded: not eligible, whatever the cycle count.
    let unsynced = PeerSyncRecord {
        cycles_since_sync: 10,
        last_sync: None,
    };
    assert!(!unsynced.has_repair_opportunity());

    // Synced, but no cycle has passed since: not eligible yet.
    let just_synced = PeerSyncRecord {
        cycles_since_sync: 0,
        last_sync: Some(Instant::now()),
    };
    assert!(!just_synced.has_repair_opportunity());

    // Synced and cycles have passed since: eligible.
    let synced_two_cycles_ago = PeerSyncRecord {
        cycles_since_sync: 2,
        last_sync: Some(Instant::now()),
    };
    assert!(synced_two_cycles_ago.has_repair_opportunity());
}
1285
1286    // -- Scenario 32: Dynamic challenge size ------------------------------------
1287
1288    /// Scenario 32: Challenge key count equals `|PeerKeySet(challenged_peer)|`,
1289    /// which is dynamic per round.  If no eligible peer remains after filtering,
1290    /// the tick is idle.
1291    ///
1292    /// Verified via `handle_audit_challenge`: the response digest count always
1293    /// equals the number of keys in the challenge.
1294    #[tokio::test]
1295    async fn scenario_32_dynamic_challenge_size() {
1296        let (storage, _temp) = create_test_storage().await;
1297
1298        // Store varying numbers of chunks.
1299        let mut addrs = Vec::new();
1300        for i in 0u8..5 {
1301            let content = format!("dynamic challenge key {i}");
1302            let addr = LmdbStorage::compute_address(content.as_bytes());
1303            storage.put(&addr, content.as_bytes()).await.expect("put");
1304            addrs.push(addr);
1305        }
1306
1307        let nonce = [0x32; 32];
1308        let peer_id = [0x32; 32];
1309        let self_id = peer_id_from_bytes(peer_id);
1310
1311        // Challenge with 1 key.
1312        let challenge1 = make_challenge(3201, nonce, peer_id, vec![addrs[0]]);
1313        let resp1 =
1314            handle_audit_challenge(&challenge1, &storage, &self_id, false, TEST_STORED_CHUNKS)
1315                .await;
1316        if let AuditResponse::Digests { digests, .. } = resp1 {
1317            assert_eq!(digests.len(), 1, "|PeerKeySet| = 1 → 1 digest");
1318        }
1319
1320        // Challenge with 3 keys.
1321        let challenge3 = make_challenge(3203, nonce, peer_id, addrs[0..3].to_vec());
1322        let resp3 =
1323            handle_audit_challenge(&challenge3, &storage, &self_id, false, TEST_STORED_CHUNKS)
1324                .await;
1325        if let AuditResponse::Digests { digests, .. } = resp3 {
1326            assert_eq!(digests.len(), 3, "|PeerKeySet| = 3 → 3 digests");
1327        }
1328
1329        // Challenge with all 5 keys.
1330        let challenge5 = make_challenge(3205, nonce, peer_id, addrs.clone());
1331        let resp5 =
1332            handle_audit_challenge(&challenge5, &storage, &self_id, false, TEST_STORED_CHUNKS)
1333                .await;
1334        if let AuditResponse::Digests { digests, .. } = resp5 {
1335            assert_eq!(digests.len(), 5, "|PeerKeySet| = 5 → 5 digests");
1336        }
1337
1338        // Challenge with 0 keys (idle equivalent — no work).
1339        let challenge0 = make_challenge(3200, nonce, peer_id, vec![]);
1340        let resp0 =
1341            handle_audit_challenge(&challenge0, &storage, &self_id, false, TEST_STORED_CHUNKS)
1342                .await;
1343        if let AuditResponse::Digests { digests, .. } = resp0 {
1344            assert!(digests.is_empty(), "|PeerKeySet| = 0 → 0 digests (idle)");
1345        }
1346    }
1347
1348    // -- Scenario 47: Bootstrap claim grace period (audit) ----------------------
1349
1350    /// Scenario 47: Challenged peer responds with bootstrapping claim during
1351    /// audit.  `handle_audit_challenge` returns `Bootstrapping`; caller records
1352    /// `BootstrapClaimFirstSeen`.  No `AuditFailure` evidence is emitted.
1353    #[tokio::test]
1354    async fn scenario_47_bootstrap_claim_grace_period_audit() {
1355        let (storage, _temp) = create_test_storage().await;
1356
1357        // Store data so there is an auditable key.
1358        let content = b"bootstrap grace test";
1359        let addr = LmdbStorage::compute_address(content);
1360        storage.put(&addr, content).await.expect("put");
1361
1362        let challenge = make_challenge(4700, [0x47; 32], [0x47; 32], vec![addr]);
1363        let self_id = peer_id_from_bytes([0x47; 32]);
1364
1365        // Bootstrapping peer → Bootstrapping response (grace period start).
1366        let response =
1367            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
1368        let challenge_id = match response {
1369            AuditResponse::Bootstrapping { challenge_id } => challenge_id,
1370            AuditResponse::Digests { .. } => {
1371                panic!("Expected Bootstrapping response during grace period")
1372            }
1373            AuditResponse::Rejected { .. } => {
1374                panic!("Unexpected Rejected response")
1375            }
1376        };
1377        assert_eq!(challenge_id, 4700);
1378
1379        // Caller records BootstrapClaimFirstSeen — verify the types support it.
1380        let peer = PeerId::from_bytes([0x47; 32]);
1381        let mut state = NeighborSyncState::new_cycle(vec![peer]);
1382        let now = Instant::now();
1383        let observed = state.observe_bootstrap_claim(
1384            peer,
1385            now,
1386            crate::replication::config::BOOTSTRAP_CLAIM_GRACE_PERIOD,
1387        );
1388
1389        assert_eq!(
1390            observed,
1391            BootstrapClaimObservation::WithinGrace { first_seen: now }
1392        );
1393        assert!(
1394            state.bootstrap_claims.contains_key(&peer),
1395            "BootstrapClaimFirstSeen should be recorded after grace-period claim"
1396        );
1397        assert!(
1398            state.bootstrap_claim_history.contains_key(&peer),
1399            "Bootstrap claim history should remember that the grace window was used"
1400        );
1401    }
1402
1403    // -- Scenario 53: Audit partial per-key failure with mixed responsibility ---
1404
1405    /// Scenario 53: P challenged on {K1, K2, K3}.  K1 matches, K2 and K3
1406    /// mismatch.  Responsibility confirmation: P is responsible for K2 but
1407    /// not K3.  `AuditFailure` emitted for {K2} only.
1408    ///
1409    /// Full `verify_digests` + `handle_audit_failure` requires a `P2PNode` for
1410    /// network lookups.  This test verifies the conceptual steps:
1411    ///   (1) Digest comparison correctly identifies K2 and K3 as failures.
1412    ///   (2) `FailureEvidence::AuditFailure` carries only confirmed keys.
1413    #[tokio::test]
1414    async fn scenario_53_partial_failure_mixed_responsibility() {
1415        let (storage, _temp) = create_test_storage().await;
1416        let nonce = [0x53; 32];
1417        let peer_id = [0x53; 32];
1418
1419        // Store K1, K2, K3.
1420        let c1 = b"scenario 53 key one";
1421        let c2 = b"scenario 53 key two";
1422        let c3 = b"scenario 53 key three";
1423        let k1 = LmdbStorage::compute_address(c1);
1424        let k2 = LmdbStorage::compute_address(c2);
1425        let k3 = LmdbStorage::compute_address(c3);
1426        storage.put(&k1, c1).await.expect("put k1");
1427        storage.put(&k2, c2).await.expect("put k2");
1428        storage.put(&k3, c3).await.expect("put k3");
1429
1430        // Correct digests from challenger's local store.
1431        let d1_expected = compute_audit_digest(&nonce, &peer_id, &k1, c1);
1432        let d2_expected = compute_audit_digest(&nonce, &peer_id, &k2, c2);
1433        let d3_expected = compute_audit_digest(&nonce, &peer_id, &k3, c3);
1434
1435        // Simulate peer response: K1 matches, K2 wrong data, K3 wrong data.
1436        let d2_wrong = compute_audit_digest(&nonce, &peer_id, &k2, b"tampered k2");
1437        let d3_wrong = compute_audit_digest(&nonce, &peer_id, &k3, b"tampered k3");
1438
1439        assert_eq!(d1_expected, d1_expected, "K1 should match");
1440        assert_ne!(d2_wrong, d2_expected, "K2 should mismatch");
1441        assert_ne!(d3_wrong, d3_expected, "K3 should mismatch");
1442
1443        // Step 1: Identify failed keys (digest comparison).
1444        let digests = [d1_expected, d2_wrong, d3_wrong];
1445        let keys = [k1, k2, k3];
1446        let contents: [&[u8]; 3] = [c1, c2, c3];
1447
1448        let mut failed_keys = Vec::new();
1449        for (i, key) in keys.iter().enumerate() {
1450            if digests[i] == ABSENT_KEY_DIGEST {
1451                failed_keys.push(*key);
1452                continue;
1453            }
1454            let expected = compute_audit_digest(&nonce, &peer_id, key, contents[i]);
1455            if digests[i] != expected {
1456                failed_keys.push(*key);
1457            }
1458        }
1459
1460        assert_eq!(failed_keys.len(), 2, "K2 and K3 should be in failure set");
1461        assert!(failed_keys.contains(&k2));
1462        assert!(failed_keys.contains(&k3));
1463        assert!(!failed_keys.contains(&k1), "K1 passed digest check");
1464
1465        // Step 2: Responsibility confirmation removes K3 (not responsible).
1466        // Simulate: P is in closest peers for K2 but not K3.
1467        let responsible_for_k2 = true;
1468        let responsible_for_k3 = false;
1469        let mut confirmed = Vec::new();
1470        for key in &failed_keys {
1471            let is_responsible = if *key == k2 {
1472                responsible_for_k2
1473            } else {
1474                responsible_for_k3
1475            };
1476            if is_responsible {
1477                confirmed.push(*key);
1478            }
1479        }
1480
1481        assert_eq!(confirmed, vec![k2], "Only K2 should be in confirmed set");
1482
1483        // Step 3: Construct evidence for confirmed failures only.
1484        let challenged_peer = PeerId::from_bytes(peer_id);
1485        let evidence = FailureEvidence::AuditFailure {
1486            challenge_id: 5300,
1487            challenged_peer,
1488            confirmed_failed_keys: confirmed,
1489            reason: AuditFailureReason::DigestMismatch,
1490        };
1491
1492        match evidence {
1493            FailureEvidence::AuditFailure {
1494                confirmed_failed_keys,
1495                ..
1496            } => {
1497                assert_eq!(
1498                    confirmed_failed_keys.len(),
1499                    1,
1500                    "Only K2 should generate evidence"
1501                );
1502                assert_eq!(confirmed_failed_keys[0], k2);
1503            }
1504            _ => panic!("Expected AuditFailure evidence"),
1505        }
1506    }
1507}