Skip to main content

ant_node/replication/
storage_commitment_audit.rs

1//! Gossip-triggered contiguous-subtree storage audit (ADR-0002).
2//!
3//! A node commits to what it stores (a signed Merkle [`StorageCommitment`]
4//! gossiped to neighbours). On receiving a peer's changed commitment, a
5//! neighbour may audit it: pin the just-gossiped root, send a fresh nonce that
6//! deterministically selects one contiguous subtree, and require the peer to
7//! prove that subtree (structure + real bytes) within a deadline. This module
8//! owns the auditor entry point [`run_subtree_audit`] and the responder handler
9//! [`handle_subtree_challenge`]; the pure proof maths live in
10//! [`crate::replication::subtree`].
11
12use std::sync::Arc;
13
14use crate::logging::{debug, info, warn};
15use rand::Rng;
16
17use crate::ant_protocol::XorName;
18use crate::replication::commitment::{commitment_hash, StorageCommitment};
19use crate::replication::commitment_state::ResponderCommitmentState;
20use crate::replication::config::{
21    ReplicationConfig, MAX_BYTE_CHALLENGE_KEYS, REPLICATION_PROTOCOL_ID,
22};
23use crate::replication::protocol::{
24    RejectKind, ReplicationMessage, ReplicationMessageBody, SubtreeAuditChallenge,
25    SubtreeAuditResponse, SubtreeByteChallenge, SubtreeByteItem, SubtreeByteResponse,
26};
27use crate::replication::recent_provers::RecentProvers;
28use crate::replication::subtree::{
29    select_subtree_path, subtree_plan, verify_subtree_proof, StructureVerdict, SubtreeProof,
30};
31use crate::replication::types::{AuditFailureReason, AuditFailureSummary, FailureEvidence};
32use crate::storage::LmdbStorage;
33use saorsa_core::identity::PeerId;
34use saorsa_core::P2PNode;
35use tokio::sync::RwLock;
36
37// The gossip-triggered auditor shares the engine's [`AuditTickResult`] outcome
38// type with the responsible-chunk audit (defined in [`super::audit`]), so the
39// engine can dispatch both audits' results through one match.
40use crate::replication::audit::AuditTickResult;
41
42// ---------------------------------------------------------------------------
43// Auditor side
44// ---------------------------------------------------------------------------
45
/// Lower bound of the ADR-0002 round-2 byte spot-check sample size.
///
/// The round-2 byte challenge opens a SMALL surprise set of the proven leaves
/// (3..=5). Small enough that the responder's honest local-disk read of the
/// original chunks stays well inside the possession-in-time deadline, while a
/// relay forced to fetch them over the network blows it; large enough that
/// faking a fraction `x` of leaves survives only `(1 - x)^k`.
const BYTE_SPOTCHECK_MIN: u32 = 3;

/// Upper bound of the ADR-0002 round-2 byte spot-check sample size (the top of
/// the 3..=5 band described on [`BYTE_SPOTCHECK_MIN`]).
const BYTE_SPOTCHECK_MAX: u32 = 5;

// The configured spot-check count is forced into this band with `u32::clamp`,
// which panics when `min > max`. Reject an inverted band at compile time so a
// future edit of either constant cannot turn every audit into a panic.
const _: () = assert!(BYTE_SPOTCHECK_MIN <= BYTE_SPOTCHECK_MAX);
53
54/// Holder-eligibility cache the auditor credits on a passing audit.
55///
56/// Owned by [`crate::replication::ReplicationEngine`]; borrowed here so a
57/// passing audit can record `(peer, commitment_hash)` as a proven holder for
58/// downstream quorum / paid-list credit.
59pub struct AuditCredit<'a> {
60    /// Holder-eligibility cache.
61    pub recent_provers: &'a Arc<RwLock<RecentProvers>>,
62}
63
64/// The cross-cutting context for verifying one audit response, bundled so the
65/// response-dispatch and verification functions stay readable.
66struct AuditCtx<'a> {
67    p2p_node: &'a Arc<P2PNode>,
68    challenged_peer: &'a PeerId,
69    challenge_id: u64,
70    nonce: [u8; 32],
71    expected_commitment_hash: [u8; 32],
72    config: &'a ReplicationConfig,
73    credit: Option<&'a AuditCredit<'a>>,
74}
75
76/// Run one gossip-triggered subtree audit against `challenged_peer`, pinned to
77/// the commitment hash the peer just gossiped (`expected_commitment_hash`).
78///
79/// ADR-0002 two-round audit. The auditor sends a fresh random nonce and runs:
80///
81/// 1. **Structure** (round 1) — the returned subtree rebuilds to the pinned
82///    root, within a size-scaled deadline.
83/// 2. **Real bytes** (round 2) — the auditor demands the ORIGINAL chunk content
84///    for a 3..=5 FRESHLY-RANDOM sample of the proven leaves (chosen after the
85///    proof arrives, not nonce-derived — see `random_spotcheck_leaves`) FROM the
86///    responder, and recomputes both the content-address hash and the nonce
87///    freshness hash from that served content. The auditor holds none of the
88///    peer's chunks.
89/// 3. **Timing** — each round's deadline is sized to an honest local-disk read,
90///    so a relay forced to fetch over the network blows it.
91///
92/// A timeout (either round) is reported as [`AuditFailureReason::Timeout`] (the
93/// caller applies the strike/grace policy). Any structural failure, served
94/// content that fails a hash, an explicit `Absent` for a committed sampled key,
95/// or a rejection of a recently gossiped commitment, is a confirmed failure
96/// acted on immediately. On a full pass, records the peer as a proven holder.
97pub async fn run_subtree_audit(
98    p2p_node: &Arc<P2PNode>,
99    config: &ReplicationConfig,
100    challenged_peer: &PeerId,
101    expected_commitment_hash: [u8; 32],
102    key_count: u32,
103    credit: Option<&AuditCredit<'_>>,
104) -> AuditTickResult {
105    let (nonce, challenge_id) = {
106        let mut rng = rand::thread_rng();
107        (rng.gen::<[u8; 32]>(), rng.gen::<u64>())
108    };
109
110    let challenge = SubtreeAuditChallenge {
111        challenge_id,
112        nonce,
113        challenged_peer_id: *challenged_peer.as_bytes(),
114        expected_commitment_hash,
115    };
116    let msg = ReplicationMessage {
117        request_id: challenge_id,
118        body: ReplicationMessageBody::SubtreeAuditChallenge(challenge),
119    };
120    let encoded = match msg.encode() {
121        Ok(data) => data,
122        Err(e) => {
123            warn!("Audit: failed to encode subtree challenge for {challenged_peer}: {e}");
124            return AuditTickResult::Idle;
125        }
126    };
127
128    // Size the proof deadline from the ACTUAL selected subtree (its real-leaf
129    // count for this nonce + key_count), not a fixed worst-case hint. This keeps
130    // the deadline tight to "responder hashes ~sqrt(N) chunks at local-disk
131    // speed", so a relay that must fetch the subtree over the network blows it.
132    // The auditor and responder derive the same selection, so we know the leaf
133    // count before the response arrives.
134    let subtree_leaves = select_subtree_path(&nonce, key_count).map_or_else(
135        || config.subtree_audit_timeout_leaf_hint(),
136        |p| p.real_leaf_count() as usize,
137    );
138    let timeout = config.audit_response_timeout(subtree_leaves);
139
140    let response = match p2p_node
141        .send_request(challenged_peer, REPLICATION_PROTOCOL_ID, encoded, timeout)
142        .await
143    {
144        Ok(resp) => resp,
145        Err(e) => {
146            debug!("Audit: subtree challenge to {challenged_peer} timed out / failed: {e}");
147            return failed(challenged_peer, challenge_id, AuditFailureReason::Timeout);
148        }
149    };
150
151    let resp_msg = match ReplicationMessage::decode(&response.data) {
152        Ok(m) => m,
153        Err(e) => {
154            warn!("Audit: failed to decode subtree response from {challenged_peer}: {e}");
155            return failed(
156                challenged_peer,
157                challenge_id,
158                AuditFailureReason::MalformedResponse,
159            );
160        }
161    };
162
163    let ctx = AuditCtx {
164        p2p_node,
165        challenged_peer,
166        challenge_id,
167        nonce,
168        expected_commitment_hash,
169        config,
170        credit,
171    };
172    dispatch_subtree_response(resp_msg.body, &ctx).await
173}
174
175/// Outcome of the round-2 byte challenge round-trip (auditor side).
176enum ByteRound {
177    /// The responder returned per-key items (verified by the caller).
178    Served(Vec<SubtreeByteItem>),
179    /// The responder rejected the byte challenge (confirmed failure for a
180    /// recently pinned commitment).
181    Rejected,
182    /// The responder rejected with a GRACED kind (`UnknownCommitment`/
183    /// `Transient`): no trust penalty, but holder credit is revoked — the peer
184    /// answered and could not prove possession, so it must not keep stale credit
185    /// (codex-r2 C). Distinct from a silent network `Timeout`, which keeps credit
186    /// (a dropped packet is not evidence of loss).
187    GracedReject,
188    /// No response within the byte deadline, or a transport error (graced
189    /// timeout). Keeps holder credit.
190    Timeout,
191    /// Malformed / unexpected round-2 response body.
192    Malformed,
193}
194
195/// Round 2: ask the responder for the ORIGINAL chunk content of one BATCH of
196/// auditor-selected spot-check `keys` (at most [`MAX_BYTE_CHALLENGE_KEYS`], so
197/// the worst-case response of max-size chunks fits the wire cap), sized to a
198/// possession-in-time deadline (honest local-disk read of `keys.len()` chunks).
199/// The responder cannot have predicted which keys are sampled.
200async fn request_byte_proof(ctx: &AuditCtx<'_>, keys: &[XorName]) -> ByteRound {
201    let challenge = SubtreeByteChallenge {
202        challenge_id: ctx.challenge_id,
203        nonce: ctx.nonce,
204        challenged_peer_id: *ctx.challenged_peer.as_bytes(),
205        expected_commitment_hash: ctx.expected_commitment_hash,
206        keys: keys.to_vec(),
207    };
208    let msg = ReplicationMessage {
209        request_id: ctx.challenge_id,
210        body: ReplicationMessageBody::SubtreeByteChallenge(challenge),
211    };
212    let encoded = match msg.encode() {
213        Ok(data) => data,
214        Err(e) => {
215            warn!("Audit: failed to encode byte challenge: {e}");
216            return ByteRound::Malformed;
217        }
218    };
219
220    // Deadline sized to "honest responder reads `keys.len()` local chunks AND
221    // ships them back": a relay forced to fetch them over the network blows it
222    // (graced timeout, never a confirmed failure — same possession-in-time
223    // principle as round 1). Uses the byte-round floor, which is high enough for
224    // the multi-MiB reply (handshake + upload + busy disk) — the round-1
225    // hashes-only floor would be too tight for 2 × 4 MiB (§4).
226    let timeout = ctx.config.byte_audit_response_timeout(keys.len());
227    let response = match ctx
228        .p2p_node
229        .send_request(
230            ctx.challenged_peer,
231            REPLICATION_PROTOCOL_ID,
232            encoded,
233            timeout,
234        )
235        .await
236    {
237        Ok(resp) => resp,
238        Err(e) => {
239            debug!(
240                "Audit: byte challenge to {} timed out / failed: {e}",
241                ctx.challenged_peer
242            );
243            return ByteRound::Timeout;
244        }
245    };
246
247    let resp_msg = match ReplicationMessage::decode(&response.data) {
248        Ok(m) => m,
249        Err(e) => {
250            warn!("Audit: failed to decode byte response: {e}");
251            return ByteRound::Malformed;
252        }
253    };
254
255    match resp_msg.body {
256        ReplicationMessageBody::SubtreeByteResponse(SubtreeByteResponse::Items {
257            challenge_id,
258            items,
259        }) if challenge_id == ctx.challenge_id => ByteRound::Served(items),
260        ReplicationMessageBody::SubtreeByteResponse(SubtreeByteResponse::Rejected {
261            challenge_id,
262            kind,
263            reason,
264        }) if challenge_id == ctx.challenge_id => {
265            // Graced kinds (rotated past the pin / transient read error, §6/§7)
266            // are not a confirmed cheat — no trust penalty — but the peer DID
267            // answer and could not prove possession, so credit is revoked
268            // (GracedReject), unlike a silent network timeout.
269            if kind.is_graced() {
270                debug!(
271                    "Audit: {} rejected byte challenge (graced, {kind:?}): {reason}",
272                    ctx.challenged_peer
273                );
274                ByteRound::GracedReject
275            } else {
276                warn!(
277                    "Audit: {} rejected byte challenge: {reason}",
278                    ctx.challenged_peer
279                );
280                ByteRound::Rejected
281            }
282        }
283        // A node claiming bootstrap MID-AUDIT (it answered round 1) is treated
284        // as a timeout: it didn't prove possession but the round-1 proof shows
285        // it isn't bootstrapping, so the bootstrap-claim-abuse detector (round 1)
286        // owns that lane; here we just don't credit it.
287        ReplicationMessageBody::SubtreeByteResponse(SubtreeByteResponse::Bootstrapping {
288            challenge_id,
289        }) if challenge_id == ctx.challenge_id => ByteRound::Timeout,
290        _ => ByteRound::Malformed,
291    }
292}
293
294/// Map a decoded response body to an audit outcome (auditor side). A response
295/// whose `challenge_id` doesn't match, or any non-subtree body, is malformed.
296async fn dispatch_subtree_response(
297    body: ReplicationMessageBody,
298    ctx: &AuditCtx<'_>,
299) -> AuditTickResult {
300    let challenged_peer = ctx.challenged_peer;
301    let challenge_id = ctx.challenge_id;
302    let malformed = || {
303        failed(
304            challenged_peer,
305            challenge_id,
306            AuditFailureReason::MalformedResponse,
307        )
308    };
309    match body {
310        ReplicationMessageBody::SubtreeAuditResponse(SubtreeAuditResponse::Bootstrapping {
311            challenge_id: resp_id,
312        }) => {
313            if resp_id != challenge_id {
314                return malformed();
315            }
316            AuditTickResult::BootstrapClaim {
317                peer: *challenged_peer,
318            }
319        }
320        ReplicationMessageBody::SubtreeAuditResponse(SubtreeAuditResponse::Rejected {
321            challenge_id: resp_id,
322            kind,
323            reason,
324        }) => {
325            if resp_id != challenge_id {
326                return malformed();
327            }
328            // A genuine protocol rejection of a freshly pinned root is a
329            // confirmed failure (repudiating what you just published). But an
330            // `UnknownCommitment`/`Transient` rejection is GRACED (§6/§7): the
331            // peer may have legitimately rotated past a root the auditor still
332            // had cached (retention is capped at the last two gossiped roots),
333            // or hit a transient read error — neither is provable misbehaviour,
334            // so we do NOT apply the trust penalty (return a graced Timeout).
335            if kind.is_graced() {
336                // BUT revoke the holder credit for THIS pinned commitment
337                // (codex-r2 C): the peer did not prove possession of it NOW, so
338                // it must not keep "proven holder" credit for it until the TTL
339                // lapses. Closes the loophole where a deleter lies
340                // `Transient`/`UnknownCommitment` to dodge the confirmed-failure
341                // path and PRESERVE stale credit.
342                //
343                // Scoped to the pinned commitment hash, NOT the whole peer
344                // (codex-r3): a commitment hash is peer-specific (it signs over
345                // `sender_peer_id`), so this revokes exactly this peer's credit
346                // for this commitment. A delayed/stale audit of an OLD commitment
347                // C1 therefore cannot erase the valid credit an honest rotated
348                // peer already re-earned for its CURRENT commitment C2.
349                if let Some(credit) = ctx.credit {
350                    credit
351                        .recent_provers
352                        .write()
353                        .await
354                        .forget_commitment(&ctx.expected_commitment_hash);
355                }
356                debug!(
357                    "Audit: peer {challenged_peer} rejected subtree challenge \
358                     (graced, {kind:?}; credit for the pinned commitment revoked): {reason}"
359                );
360                failed(challenged_peer, challenge_id, AuditFailureReason::Timeout)
361            } else {
362                warn!("Audit: peer {challenged_peer} rejected subtree challenge: {reason}");
363                failed(challenged_peer, challenge_id, AuditFailureReason::Rejected)
364            }
365        }
366        ReplicationMessageBody::SubtreeAuditResponse(SubtreeAuditResponse::Proof {
367            challenge_id: resp_id,
368            commitment,
369            proof,
370        }) => {
371            if resp_id != challenge_id {
372                return malformed();
373            }
374            verify_subtree_response(ctx, &commitment, &proof).await
375        }
376        _ => {
377            warn!("Audit: unexpected response type from {challenged_peer}");
378            malformed()
379        }
380    }
381}
382
383/// The pure verdict of evaluating a subtree-audit response, independent of
384/// storage/network. Tests call this directly so the SHIPPED gate logic is what
385/// gets exercised (no reimplementation that could drift).
386#[derive(Debug, Clone, PartialEq, Eq)]
387pub(crate) enum AuditVerdict {
388    /// All gates passed and at least one leaf was byte-verified.
389    Pass {
390        /// Number of leaves whose real bytes were verified in round 2.
391        checked: usize,
392    },
393    /// A confirmed failure with this reason (penalizable / acted upon).
394    Fail(AuditFailureReason),
395}
396
397/// Round-1 structural evaluation of a subtree-audit proof (ADR-0002).
398///
399/// Runs the cheap gates in fail-fast order: pin / identity / signature →
400/// structure (the returned subtree rebuilds to the pinned root). It does **not**
401/// prove byte possession — the leaves carry only the public `bytes_hash` (the
402/// chunk address) and a `nonced_hash` the responder computed itself. Possession
403/// is proven in round 2 ([`verify_byte_response`]), where the auditor demands
404/// the original chunk bytes for a freshly-random (post-proof) sample and
405/// recomputes both hashes from the SERVED content. This removes any dependency
406/// on the auditor holding the peer's chunks.
407///
408/// Returns [`StructureVerdict::Valid`] (proceed to round 2) or a confirmed
409/// [`AuditFailureReason`] mapped from the failing gate.
410pub(crate) fn evaluate_subtree_structure(
411    commitment: &StorageCommitment,
412    proof: &SubtreeProof,
413    nonce: &[u8; 32],
414    expected_commitment_hash: &[u8; 32],
415    challenged_peer_bytes: &[u8; 32],
416) -> Result<(), AuditFailureReason> {
417    // -- Pin + identity + signature --
418    if &commitment.sender_peer_id != challenged_peer_bytes {
419        return Err(AuditFailureReason::Rejected);
420    }
421    let derived_peer_id = *blake3::hash(&commitment.sender_public_key).as_bytes();
422    if derived_peer_id != commitment.sender_peer_id {
423        return Err(AuditFailureReason::Rejected);
424    }
425    match commitment_hash(commitment) {
426        Some(h) if &h == expected_commitment_hash => {}
427        _ => return Err(AuditFailureReason::Rejected),
428    }
429    if !crate::replication::commitment::verify_commitment_signature(commitment) {
430        return Err(AuditFailureReason::Rejected);
431    }
432
433    // -- Structure --
434    if let StructureVerdict::Invalid(_) = verify_subtree_proof(proof, nonce, commitment) {
435        return Err(AuditFailureReason::DigestMismatch);
436    }
437    Ok(())
438}
439
440/// The auditor's **freshly-randomised** spot-check sample of the round-1 proof:
441/// `count` distinct leaves (deduplicated, in increasing-index order) whose
442/// original bytes the auditor will demand in round 2.
443///
444/// CRITICAL (ADR-0002 soundness): the sample MUST NOT be derivable from
445/// anything the responder knew when it built the round-1 proof. The structural
446/// root check binds only `(key, bytes_hash)` (both public — `bytes_hash` is the
447/// chunk's network address), NOT `nonced_hash`. So a relay holding only public
448/// addresses can fabricate a structurally-valid proof with bogus `nonced_hash`
449/// on every leaf and, if it could predict which leaves round 2 opens, fetch
450/// only those and pass — earning holder credit for leaves it never held.
451///
452/// Picking the sample with fresh CSPRNG randomness AFTER the proof is received
453/// turns round 1 into a commitment and round 2 into an unpredictable challenge
454/// (cut-and-choose): to pass with probability above `(1 - faked_fraction)^count`
455/// the responder must have produced a correct `nonced_hash` — which requires the
456/// real bytes — for essentially every leaf at round-1 commit time. The auditor
457/// still holds none of the peer's chunks.
458fn random_spotcheck_leaves(
459    proof: &SubtreeProof,
460    count: u32,
461) -> Vec<&crate::replication::subtree::SubtreeLeaf> {
462    let n = proof.leaves.len();
463    if n == 0 {
464        return Vec::new();
465    }
466    let want = (count as usize).min(n);
467    let mut rng = rand::thread_rng();
468    let mut picked = std::collections::BTreeSet::new();
469    // n >= want, so this terminates quickly; bound the loop defensively against
470    // a pathological RNG rather than risk spinning.
471    let mut guard = 0u32;
472    while picked.len() < want && guard < count.saturating_mul(64).max(64) {
473        picked.insert(rng.gen_range(0..n));
474        guard = guard.saturating_add(1);
475    }
476    // Deterministic top-up if the RNG kept colliding (astronomically unlikely):
477    // fill the lowest missing indices so the sample is never silently short.
478    for idx in 0..n {
479        if picked.len() >= want {
480            break;
481        }
482        picked.insert(idx);
483    }
484    picked
485        .into_iter()
486        .filter_map(|idx| proof.leaves.get(idx))
487        .collect()
488}
489
490/// Round-2 verdict (ADR-0002): the responder served the original chunk content
491/// for the auditor's spot-check sample; verify possession from THAT content.
492///
493/// `served(key)` returns what the responder returned for a requested key:
494/// `Some(Some(bytes))` for [`SubtreeByteItem::Present`], `Some(None)` for an
495/// explicit [`SubtreeByteItem::Absent`], and `None` if the responder omitted the
496/// key entirely (treated like `Absent` — a committed key it would not serve).
497///
498/// For each sampled leaf the auditor recomputes, from the SERVED content:
499///   - `BLAKE3(content) == leaf.bytes_hash` (the chunk's content address), AND
500///   - `BLAKE3(nonce ‖ peer ‖ key ‖ content) == leaf.nonced_hash` (freshness),
501///     i.e. `compute_audit_digest(nonce, peer, key, content)`.
502///
503/// The freshness inputs are byte-identical to what the responder used to BUILD
504/// the leaf in round 1 (`subtree_leaf` → `nonced_leaf_hash`): the SAME four
505/// inputs, so an honest holder's served content reproduces `nonced_hash`
506/// exactly. Round 1 commits over the data (the `nonced_hash` is uncomputable
507/// without the bytes); round 2 reveals a random subset to prove the commitment
508/// was not fabricated.
509///
510/// Both checks are over the content the responder sent, so the auditor needs to
511/// hold none of the peer's chunks. Any `Absent`/omitted committed key, or any
512/// served content that fails a hash, is a provable lie → confirmed
513/// [`AuditFailureReason::DigestMismatch`]. All sampled leaves verifying →
514/// `Pass { checked }`.
515pub(crate) fn verify_byte_response(
516    leaves: &[&crate::replication::subtree::SubtreeLeaf],
517    nonce: &[u8; 32],
518    challenged_peer_bytes: &[u8; 32],
519    served: impl Fn(&XorName) -> Option<Option<Vec<u8>>>,
520) -> AuditVerdict {
521    let mut checked = 0usize;
522    for leaf in leaves {
523        // Present{bytes} -> Some(Some(bytes)); Absent -> Some(None); omitted -> None.
524        // A committed key the responder cannot / will not serve is a provable lie.
525        let Some(Some(content)) = served(&leaf.key) else {
526            return AuditVerdict::Fail(AuditFailureReason::DigestMismatch);
527        };
528        let plain = *blake3::hash(&content).as_bytes();
529        let nonced = crate::replication::subtree::nonced_leaf_hash(
530            nonce,
531            challenged_peer_bytes,
532            &leaf.key,
533            &content,
534        );
535        if leaf.bytes_hash != plain || leaf.nonced_hash != nonced {
536            // Served content does not hash to the committed address / freshness
537            // hash: cannot be the chunk it committed to.
538            return AuditVerdict::Fail(AuditFailureReason::DigestMismatch);
539        }
540        checked += 1;
541    }
542    AuditVerdict::Pass { checked }
543}
544
545/// Verify a subtree-proof response (auditor side), ADR-0002 two-round audit.
546///
547/// **Round 1** (this proof): pin + identity + signature + structure. If the
548/// proof structurally rebuilds to the pinned root, the tree SHAPE is committed —
549/// but not yet that the bytes are held. **Round 2**: the auditor picks a small
550/// freshly-random (post-proof) sample of the just-proven leaves and sends a
551/// [`SubtreeByteChallenge`] demanding their original chunk content FROM the
552/// responder, then verifies that content against the committed `bytes_hash`
553/// (content address) and `nonced_hash` (freshness). A responder that committed
554/// to a chunk it no longer holds cannot serve content that hashes to the
555/// committed address, so it fails — regardless of what the auditor holds. On a
556/// full pass, credits the peer as a proven holder.
557async fn verify_subtree_response(
558    ctx: &AuditCtx<'_>,
559    commitment: &StorageCommitment,
560    proof: &SubtreeProof,
561) -> AuditTickResult {
562    let challenged_peer = ctx.challenged_peer;
563    let challenge_id = ctx.challenge_id;
564
565    // -- Round 1: pin/identity/signature + structure (no bytes). --
566    if let Err(reason) = evaluate_subtree_structure(
567        commitment,
568        proof,
569        &ctx.nonce,
570        &ctx.expected_commitment_hash,
571        challenged_peer.as_bytes(),
572    ) {
573        warn!("Audit: {challenged_peer} failed subtree structure ({reason:?})");
574        return failed(challenged_peer, challenge_id, reason);
575    }
576
577    // -- Round 2: surprise byte challenge for a 3..=5 FRESHLY-RANDOM sample. --
578    // The sample is chosen now, with CSPRNG randomness, AFTER the round-1 proof
579    // is in hand — NOT derived from the round-1 nonce. The responder committed
580    // every leaf's `nonced_hash` in round 1 without knowing which leaves we will
581    // open, so it cannot have fabricated the un-opened ones (cut-and-choose).
582    // We cap the sample at the ADR's 3..=5 band (clamped to the subtree size) so
583    // the round-2 message and the responder's disk read stay cheap.
584    let sample_n = ctx
585        .config
586        .audit_spotcheck_count()
587        .clamp(BYTE_SPOTCHECK_MIN, BYTE_SPOTCHECK_MAX);
588    let sampled = random_spotcheck_leaves(proof, sample_n);
589    if sampled.is_empty() {
590        // Cannot happen after a valid structure (subtree is never empty), but
591        // guard rather than credit an unproven peer.
592        warn!("Audit: {challenged_peer} produced an empty spot-check sample; rejecting");
593        return failed(
594            challenged_peer,
595            challenge_id,
596            AuditFailureReason::DigestMismatch,
597        );
598    }
599    // The sample is challenged in batches of MAX_BYTE_CHALLENGE_KEYS so each
600    // response — worst case, every requested chunk at MAX_CHUNK_SIZE — still
601    // encodes under MAX_REPLICATION_MESSAGE_SIZE. Each batch carries its own
602    // possession-in-time deadline (sized to its own length), so splitting does
603    // not widen the per-chunk window a relay would need to fetch over the
604    // network.
605    //
606    // CRITICAL: verify each batch's served bytes AS IT ARRIVES, against that
607    // batch's own sampled leaves, and return a CONFIRMED failure immediately.
608    // Deferring all verification until every batch is collected would let a
609    // later batch's graced Timeout (`round_failure`) mask a deterministic
610    // failure already proven by an earlier batch (an absent committed key or a
611    // hash mismatch) — a confirmed cheat would be downgraded to a graced
612    // timeout. A Timeout/Rejected/Malformed only becomes the verdict if NO
613    // earlier batch already produced confirmed bad bytes.
614    let verdict = 'rounds: {
615        for batch in sampled.chunks(MAX_BYTE_CHALLENGE_KEYS) {
616            let batch_keys: Vec<XorName> = batch.iter().map(|l| l.key).collect();
617            match request_byte_proof(ctx, &batch_keys).await {
618                ByteRound::Served(items) => {
619                    // Verify THIS batch now. A confirmed failure here is final —
620                    // a later batch's timeout must not be able to overwrite it.
621                    let v = verify_byte_response(
622                        batch,
623                        &ctx.nonce,
624                        challenged_peer.as_bytes(),
625                        |key| {
626                            items.iter().find_map(|it| match it {
627                                SubtreeByteItem::Present { key: k, bytes } if k == key => {
628                                    Some(Some(bytes.clone()))
629                                }
630                                SubtreeByteItem::Absent { key: k } if k == key => Some(None),
631                                _ => None,
632                            })
633                        },
634                    );
635                    if let AuditVerdict::Fail(reason) = v {
636                        break 'rounds AuditVerdict::Fail(reason);
637                    }
638                }
639                // The responder rejected the byte challenge for a recently
640                // pinned commitment → confirmed failure, same as round 1.
641                ByteRound::Rejected => {
642                    break 'rounds AuditVerdict::Fail(AuditFailureReason::Rejected)
643                }
644                // Graced reject (rotated past the pin / transient): no trust
645                // penalty, but the peer answered and could not prove possession,
646                // so revoke the holder credit for THIS pinned commitment
647                // (codex-r2 C) before taking the graced Timeout verdict. Scoped
648                // to the commitment hash, not the whole peer (codex-r3), so it
649                // never erases credit the peer re-earned for a newer commitment.
650                ByteRound::GracedReject => {
651                    if let Some(credit) = ctx.credit {
652                        credit
653                            .recent_provers
654                            .write()
655                            .await
656                            .forget_commitment(&ctx.expected_commitment_hash);
657                    }
658                    break 'rounds AuditVerdict::Fail(AuditFailureReason::Timeout);
659                }
660                // No response within the byte deadline (or transport error) →
661                // timeout (graced by the caller's strike policy — could be
662                // honest slowness). Keeps credit (a dropped packet is not
663                // evidence of loss). Only reached when no earlier batch already
664                // confirmed bad bytes.
665                ByteRound::Timeout => {
666                    break 'rounds AuditVerdict::Fail(AuditFailureReason::Timeout)
667                }
668                // Malformed/unexpected round-2 body.
669                ByteRound::Malformed => {
670                    break 'rounds AuditVerdict::Fail(AuditFailureReason::MalformedResponse)
671                }
672            }
673        }
674        // Every batch served bytes that verified.
675        AuditVerdict::Pass {
676            checked: sampled.len(),
677        }
678    };
679
680    match verdict {
681        AuditVerdict::Fail(reason) => {
682            warn!("Audit: {challenged_peer} failed subtree audit ({reason:?})");
683            failed(challenged_peer, challenge_id, reason)
684        }
685        AuditVerdict::Pass { checked } => {
686            // Closeness (ADR-0002, soft/observe-only) — see observe_closeness.
687            observe_closeness(ctx.p2p_node, ctx.config, challenged_peer, proof).await;
688            // Credit the peer as a proven holder of its committed keys.
689            if let (Some(credit), Some(pin)) = (ctx.credit, commitment_hash(commitment)) {
690                let now = std::time::Instant::now();
691                let mut provers = credit.recent_provers.write().await;
692                for leaf in &proof.leaves {
693                    provers.record_proof(leaf.key, *challenged_peer, pin, now);
694                }
695            }
696            info!(
697                "Audit: peer {challenged_peer} passed subtree audit ({} leaves, {checked} \
698                 byte-checked)",
699                proof.leaves.len()
700            );
701            AuditTickResult::Passed {
702                challenged_peer: *challenged_peer,
703                keys_checked: checked,
704            }
705        }
706    }
707}
708
709/// Soft, density-aware closeness observation (ADR-0002). Logs — never fails —
710/// when a suspicious fraction of the proof's leaves are keys the auditor itself
711/// is NOT responsible for (a proxy for "implausibly far from the peer").
712///
713/// Using the auditor's own `SelfInclusiveRT` responsibility as the yardstick
714/// makes this density-aware for free: on a small/dense network the auditor is
715/// close to nearly every key, so almost nothing reads as far and no honest peer
716/// is ever flagged. Enforcement is intentionally deferred until a testnet
717/// calibrates the density threshold.
718async fn observe_closeness(
719    p2p_node: &Arc<P2PNode>,
720    config: &ReplicationConfig,
721    challenged_peer: &PeerId,
722    proof: &SubtreeProof,
723) {
724    /// Max leaves probed for the closeness estimate (bounds the DHT lookups).
725    const CLOSENESS_SAMPLE_CAP: usize = 8;
726
727    // This is an observe-only DEBUG signal (never enforced). The check costs one
728    // DHT responsibility lookup per inspected leaf, so (§12): (a) skip it
729    // entirely unless debug logging is on — there is no other consumer — and
730    // (b) inspect at most a bounded SAMPLE of leaves rather than all ~sqrt(N),
731    // which still reveals the "mostly far" padding shape without N lookups.
732    if !crate::logging::enabled!(crate::logging::Level::DEBUG) {
733        return;
734    }
735
736    let self_id = *p2p_node.peer_id();
737    let inspected = proof.leaves.len().min(CLOSENESS_SAMPLE_CAP);
738    let mut far = 0usize;
739    for leaf in proof.leaves.iter().take(inspected) {
740        if !crate::replication::admission::is_responsible(
741            &self_id,
742            &leaf.key,
743            p2p_node,
744            config.close_group_size,
745        )
746        .await
747        {
748            far += 1;
749        }
750    }
751    // Only worth a line when MOST of the inspected sample is far — that's the
752    // padding shape. A normal proof on a sparse network has some far keys.
753    if inspected > 0 && far * 2 > inspected {
754        debug!(
755            "Audit: closeness signal — {far}/{inspected} sampled of {challenged_peer}'s proven \
756             leaves are keys this auditor is not close to (observe-only; possible padding, not \
757             penalized)"
758        );
759    }
760}
761
762/// Build a confirmed-failure result. The auditor pinned a commitment the peer
763/// committed to itself, so there is no per-key responsibility to re-confirm:
764/// the failure is about the peer's own committed tree.
765///
766/// The subtree audit fails a peer as a whole (one challenge, one verdict) rather
767/// than per-key, so the [`AuditFailureSummary`] is a single-failure rollup
768/// mapped from `reason` — enough for the shared audit-failure diagnostics log
769/// line (`absent_keys`/`digest_mismatch_keys`) without inventing per-key counts
770/// this audit shape does not have.
771fn failed(
772    challenged_peer: &PeerId,
773    challenge_id: u64,
774    reason: AuditFailureReason,
775) -> AuditTickResult {
776    let summary = subtree_failure_summary(&reason);
777    AuditTickResult::Failed {
778        evidence: FailureEvidence::AuditFailure {
779            challenge_id,
780            challenged_peer: *challenged_peer,
781            confirmed_failed_keys: Vec::new(),
782            summary,
783            reason,
784        },
785    }
786}
787
788/// Map a subtree-audit `reason` to a single-failure [`AuditFailureSummary`].
789///
790/// A `Timeout` is not (yet) a confirmed failure (it is graced), so it rolls up
791/// as zero confirmed failures; every other reason is one confirmed failure,
792/// categorised where the category is meaningful (byte/nonce/root mismatch →
793/// `digest_mismatch_keys`; explicit absent → `absent_keys`).
794fn subtree_failure_summary(reason: &AuditFailureReason) -> AuditFailureSummary {
795    let mut summary = AuditFailureSummary {
796        challenged_keys: 1,
797        ..AuditFailureSummary::default()
798    };
799    match reason {
800        AuditFailureReason::Timeout => {}
801        AuditFailureReason::DigestMismatch => {
802            summary.failed_keys = 1;
803            summary.digest_mismatch_keys = 1;
804        }
805        AuditFailureReason::KeyAbsent => {
806            summary.failed_keys = 1;
807            summary.absent_keys = 1;
808        }
809        AuditFailureReason::MalformedResponse | AuditFailureReason::Rejected => {
810            summary.failed_keys = 1;
811        }
812    }
813    summary
814}
815
816// ---------------------------------------------------------------------------
817// Responder side
818// ---------------------------------------------------------------------------
819
820/// Handle an incoming subtree audit challenge (responder side).
821///
822/// Validates the challenge targets this node, looks up the pinned commitment in
823/// the retained (last-two-gossiped) set, and builds the subtree proof for the
824/// nonce-selected branch. If this node is bootstrapping it says so; if it
825/// genuinely does not retain the pinned commitment it rejects (which the
826/// auditor treats as a confirmed failure for a recently gossiped root).
827pub async fn handle_subtree_challenge(
828    challenge: &SubtreeAuditChallenge,
829    storage: &LmdbStorage,
830    self_peer_id: &PeerId,
831    is_bootstrapping: bool,
832    commitment_state: Option<&Arc<ResponderCommitmentState>>,
833) -> SubtreeAuditResponse {
834    if is_bootstrapping {
835        return SubtreeAuditResponse::Bootstrapping {
836            challenge_id: challenge.challenge_id,
837        };
838    }
839
840    if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
841        warn!(
842            "Subtree audit challenge targeted wrong peer: expected {}, got {}",
843            hex::encode(self_peer_id.as_bytes()),
844            hex::encode(challenge.challenged_peer_id),
845        );
846        return SubtreeAuditResponse::Rejected {
847            challenge_id: challenge.challenge_id,
848            kind: RejectKind::Protocol,
849            reason: "challenged_peer_id does not match this node".to_string(),
850        };
851    }
852
853    let Some(state) = commitment_state else {
854        return SubtreeAuditResponse::Rejected {
855            challenge_id: challenge.challenge_id,
856            kind: RejectKind::Protocol,
857            reason: "no commitment state".to_string(),
858        };
859    };
860
861    // Look up the pinned commitment among the last-two-gossiped retained set.
862    // A miss is `UnknownCommitment` — the auditor GRACES it (the peer may have
863    // legitimately rotated past a root the auditor still had cached), rather
864    // than treating legitimate rotation as a confirmed repudiation (§6).
865    let Some(built) = state.lookup_by_hash(&challenge.expected_commitment_hash) else {
866        return SubtreeAuditResponse::Rejected {
867            challenge_id: challenge.challenge_id,
868            kind: RejectKind::UnknownCommitment,
869            reason: "unknown commitment hash".to_string(),
870        };
871    };
872
873    // Geometry first (no bytes touched): which leaves to prove + the sibling
874    // cut-hashes from the committed tree.
875    let plan = match subtree_plan(built.tree(), &challenge.nonce) {
876        Ok(p) => p,
877        Err(e) => {
878            warn!("Subtree audit: failed to plan proof: {e:?}");
879            return SubtreeAuditResponse::Rejected {
880                challenge_id: challenge.challenge_id,
881                kind: RejectKind::Protocol,
882                reason: "could not build subtree proof".to_string(),
883            };
884        }
885    };
886
887    // Read chunk bytes one leaf at a time so peak memory is bounded regardless
888    // of subtree size, hashing each into its plain + nonced leaf.
889    let mut leaves = Vec::with_capacity(plan.leaf_keys.len());
890    for key in &plan.leaf_keys {
891        let bytes = match storage.get_raw(key).await {
892            Ok(Some(bytes)) => bytes,
893            // Key is in our committed tree but definitively NOT stored — real
894            // storage loss / the classic deleter. For a recently gossiped pin
895            // the auditor counts this as a CONFIRMED failure.
896            Ok(None) => {
897                warn!(
898                    "Subtree audit: missing bytes for committed key {}",
899                    hex::encode(key)
900                );
901                return SubtreeAuditResponse::Rejected {
902                    challenge_id: challenge.challenge_id,
903                    kind: RejectKind::Protocol,
904                    reason: format!("missing bytes for committed key: {}", hex::encode(key)),
905                };
906            }
907            // Transient storage read error — NOT evidence of missing data (§7).
908            // Reject as graced (timeout-class) so a flaky disk never brands an
909            // honest holder a deleter.
910            Err(e) => {
911                warn!(
912                    "Subtree audit: storage read error for committed key {}: {e} \
913                     (rejecting as transient, not a confirmed failure)",
914                    hex::encode(key)
915                );
916                return SubtreeAuditResponse::Rejected {
917                    challenge_id: challenge.challenge_id,
918                    kind: RejectKind::Transient,
919                    reason: format!("transient storage read error: {e}"),
920                };
921            }
922        };
923        leaves.push(crate::replication::subtree::subtree_leaf(
924            &challenge.nonce,
925            &challenge.challenged_peer_id,
926            key,
927            &bytes,
928        ));
929        // bytes drops here.
930    }
931
932    SubtreeAuditResponse::Proof {
933        challenge_id: challenge.challenge_id,
934        commitment: built.commitment().clone(),
935        proof: SubtreeProof {
936            leaves,
937            sibling_cut_hashes: plan.sibling_cut_hashes,
938        },
939    }
940}
941
942/// Handle a round-2 byte challenge (responder side), ADR-0002.
943///
944/// The auditor has already structurally verified this node's round-1 subtree
945/// proof and now demands the ORIGINAL chunk bytes for a small freshly-random
946/// sample of those leaves. For each requested key the responder either returns
947/// the bytes ([`SubtreeByteItem::Present`]) or — if it committed to the key but
948/// can no longer produce it — an explicit [`SubtreeByteItem::Absent`], which the
949/// auditor counts as a provable failure (committing to bytes you don't hold).
950///
951/// A key the responder never committed to (not in the pinned tree) is also
952/// returned `Absent`: the auditor only ever samples keys it saw in round 1, so
953/// in practice this guards against a malformed/forged byte challenge rather than
954/// an honest mismatch.
955pub async fn handle_subtree_byte_challenge(
956    challenge: &SubtreeByteChallenge,
957    storage: &LmdbStorage,
958    self_peer_id: &PeerId,
959    is_bootstrapping: bool,
960    commitment_state: Option<&Arc<ResponderCommitmentState>>,
961) -> SubtreeByteResponse {
962    if is_bootstrapping {
963        return SubtreeByteResponse::Bootstrapping {
964            challenge_id: challenge.challenge_id,
965        };
966    }
967
968    if challenge.challenged_peer_id != *self_peer_id.as_bytes() {
969        return SubtreeByteResponse::Rejected {
970            challenge_id: challenge.challenge_id,
971            kind: RejectKind::Protocol,
972            reason: "challenged_peer_id does not match this node".to_string(),
973        };
974    }
975
976    // An honest auditor batches its sample to MAX_BYTE_CHALLENGE_KEYS per
977    // challenge so the worst-case response fits the wire cap. Reject larger
978    // requests up front: serving them could only produce an unencodable
979    // response (and invites disk-read amplification from a forged auditor).
980    if challenge.keys.len() > MAX_BYTE_CHALLENGE_KEYS {
981        let requested = challenge.keys.len();
982        return SubtreeByteResponse::Rejected {
983            challenge_id: challenge.challenge_id,
984            kind: RejectKind::Protocol,
985            reason: format!(
986                "byte challenge requests {requested} keys; max {MAX_BYTE_CHALLENGE_KEYS} per challenge"
987            ),
988        };
989    }
990
991    let Some(state) = commitment_state else {
992        return SubtreeByteResponse::Rejected {
993            challenge_id: challenge.challenge_id,
994            kind: RejectKind::Protocol,
995            reason: "no commitment state".to_string(),
996        };
997    };
998    // Resolve the SAME commitment the auditor pinned in round 1. If we no longer
999    // retain it (rotated past it), reject as `UnknownCommitment` — the auditor
1000    // GRACES that (legitimate rotation it may not have observed, §6), rather
1001    // than confirming a failure. We serve bytes only for keys committed under
1002    // this pin.
1003    let Some(built) = state.lookup_by_hash(&challenge.expected_commitment_hash) else {
1004        return SubtreeByteResponse::Rejected {
1005            challenge_id: challenge.challenge_id,
1006            kind: RejectKind::UnknownCommitment,
1007            reason: "unknown commitment hash".to_string(),
1008        };
1009    };
1010
1011    let mut items = Vec::with_capacity(challenge.keys.len());
1012    for key in &challenge.keys {
1013        // Serve ONLY keys committed under this pin. A key the auditor asks for
1014        // that is not in the pinned tree is `Absent` — never served from local
1015        // storage just because we happen to hold it (§15: serving an
1016        // uncommitted-but-held key would let a forged challenge harvest bytes
1017        // and muddy the possession proof, which must be about THIS commitment).
1018        if built.proof_for(key).is_none() {
1019            items.push(SubtreeByteItem::Absent { key: *key });
1020            continue;
1021        }
1022        match storage.get_raw(key).await {
1023            // Committed key, bytes present → serve them.
1024            Ok(Some(bytes)) => items.push(SubtreeByteItem::Present { key: *key, bytes }),
1025            // Committed key, definitively absent → provable failure (§7: this is
1026            // a real "I don't hold it" answer, distinct from a read error).
1027            Ok(None) => {
1028                warn!(
1029                    "Subtree byte audit: committed key {} requested but bytes absent",
1030                    hex::encode(key)
1031                );
1032                items.push(SubtreeByteItem::Absent { key: *key });
1033            }
1034            // Transient storage read error → do NOT brand the peer a deleter
1035            // (§7). Reject the whole challenge as a graced (timeout-class)
1036            // outcome so a flaky LMDB read never manufactures a confirmed
1037            // possession failure on an honest holder.
1038            Err(e) => {
1039                warn!(
1040                    "Subtree byte audit: storage read error for committed key {}: {e} \
1041                     (rejecting as transient, not a confirmed failure)",
1042                    hex::encode(key)
1043                );
1044                return SubtreeByteResponse::Rejected {
1045                    challenge_id: challenge.challenge_id,
1046                    kind: RejectKind::Transient,
1047                    reason: format!("transient storage read error: {e}"),
1048                };
1049            }
1050        }
1051    }
1052
1053    SubtreeByteResponse::Items {
1054        challenge_id: challenge.challenge_id,
1055        items,
1056    }
1057}
1058
1059#[cfg(test)]
1060#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1061mod tests {
1062    use super::*;
1063    use crate::replication::commitment_state::BuiltCommitment;
1064    use crate::replication::subtree::{build_subtree_proof, nonced_leaf_hash, SubtreeLeaf};
1065    use saorsa_pqc::api::sig::ml_dsa_65;
1066
1067    // The two-round audit splits into SHIPPED pure functions exercised directly
1068    // here (no reimplementation that could drift):
1069    //   - round 1: `evaluate_subtree_structure` (pin/identity/signature +
1070    //     structural root rebuild),
1071    //   - sampling: `random_spotcheck_leaves` (3..=5 FRESHLY-RANDOM leaves chosen
1072    //     after the proof is in hand — see its doc for the soundness argument), and
1073    //   - round 2: `verify_byte_response` (recompute content-address + freshness
1074    //     from the bytes the RESPONDER served — the auditor holds nothing).
1075
1076    fn key(i: u32) -> XorName {
1077        let mut k = [0u8; 32];
1078        k[..4].copy_from_slice(&i.to_be_bytes());
1079        k
1080    }
1081    /// The "chunk content" for a key in these fixtures. The committed tree's leaf
1082    /// `bytes_hash` is `BLAKE3(chunk_bytes(key))`, mirroring the general
1083    /// `(key, BLAKE3(content))` commitment; round 2 serves exactly this content.
1084    fn chunk_bytes(k: &XorName) -> Vec<u8> {
1085        let mut v = k.to_vec();
1086        v.extend_from_slice(b"chunk-body");
1087        v
1088    }
1089
1090    /// Build an honest committed tree of `n` keys + a valid round-1 proof for
1091    /// `nonce`. Returns `(built, proof, peer_id)`. The auditor pins `built.hash()`.
1092    fn honest(n: u32, nonce: &[u8; 32]) -> (BuiltCommitment, SubtreeProof, [u8; 32]) {
1093        let (pk, sk) = ml_dsa_65().generate_keypair().unwrap();
1094        let peer_id = *blake3::hash(&pk.to_bytes()).as_bytes();
1095        let pk_b = pk.to_bytes();
1096        let entries: Vec<_> = (0..n)
1097            .map(|i| {
1098                let k = key(i);
1099                (k, *blake3::hash(&chunk_bytes(&k)).as_bytes())
1100            })
1101            .collect();
1102        let built = BuiltCommitment::build(entries, &peer_id, &sk, &pk_b).unwrap();
1103        let proof =
1104            build_subtree_proof(built.tree(), nonce, &peer_id, |k| Some(chunk_bytes(k))).unwrap();
1105        (built, proof, peer_id)
1106    }
1107
1108    /// Round-1 verdict against the pinned commitment.
1109    fn structure(
1110        built: &BuiltCommitment,
1111        proof: &SubtreeProof,
1112        nonce: &[u8; 32],
1113        peer: &[u8; 32],
1114    ) -> Result<(), AuditFailureReason> {
1115        evaluate_subtree_structure(built.commitment(), proof, nonce, &built.hash(), peer)
1116    }
1117
1118    /// The 3..=5 spot-check leaves the auditor would demand bytes for in round 2.
1119    /// Now freshly-random (post-proof) rather than nonce-derived; the `_nonce`/
1120    /// `_key_count` params are kept so existing call sites read unchanged.
1121    fn sample<'a>(
1122        proof: &'a SubtreeProof,
1123        _nonce: &[u8; 32],
1124        _key_count: u32,
1125    ) -> Vec<&'a SubtreeLeaf> {
1126        random_spotcheck_leaves(proof, 8u32.clamp(BYTE_SPOTCHECK_MIN, BYTE_SPOTCHECK_MAX))
1127    }
1128
1129    // A round-2 `served` closure that returns the HONEST content for every key.
1130    // The nested-Option shape is the `verify_byte_response` callback contract:
1131    // Present{bytes} -> Some(Some(bytes)); Absent -> Some(None); omitted -> None.
1132    #[allow(clippy::option_option, clippy::unnecessary_wraps)]
1133    fn served_honest(key: &XorName) -> Option<Option<Vec<u8>>> {
1134        Some(Some(chunk_bytes(key)))
1135    }
1136
1137    // ---- round 1: structure --------------------------------------------------
1138
#[test]
fn honest_structure_then_bytes_passes() {
    let nonce = [9u8; 32];
    let (built, proof, peer) = honest(400, &nonce);

    // Round 1: the structural proof verifies against the pin.
    let round_one = structure(&built, &proof, &nonce, &peer);
    assert!(round_one.is_ok());

    // Round 2: the honest responder serves real content for a non-empty sample.
    let s = sample(&proof, &nonce, built.commitment().key_count);
    assert!(!s.is_empty());
    let other = verify_byte_response(&s, &nonce, &peer, served_honest);
    if let AuditVerdict::Pass { checked } = &other {
        assert!(*checked >= 1, "must verify >=1 leaf");
    } else {
        panic!("expected Pass, got {other:?}");
    }
}
1153
#[test]
fn commitment_bound_to_another_peer_rejected() {
    // A valid proof checked against a DIFFERENT peer id must be rejected.
    let nonce = [3u8; 32];
    let (built, proof, _peer) = honest(200, &nonce);
    let other = [0xAAu8; 32];

    let outcome = structure(&built, &proof, &nonce, &other);
    assert_eq!(outcome, Err(AuditFailureReason::Rejected));
}
1164
#[test]
fn wrong_pinned_commitment_rejected() {
    // Flip one bit of the pinned hash: the commitment no longer matches the pin.
    let nonce = [3u8; 32];
    let (built, proof, peer) = honest(200, &nonce);
    let mut wrong_pin = built.hash();
    wrong_pin[0] ^= 0x01;

    let outcome =
        evaluate_subtree_structure(built.commitment(), &proof, &nonce, &wrong_pin, &peer);
    assert_eq!(outcome, Err(AuditFailureReason::Rejected));
}
1176
#[test]
fn tampered_leaf_structure_rejected() {
    let nonce = [3u8; 32];
    let (built, mut proof, peer) = honest(200, &nonce);
    // Corrupt the first leaf's content hash, which breaks root reconstruction.
    for leaf in proof.leaves.iter_mut().take(1) {
        leaf.bytes_hash[0] ^= 0x01;
    }

    let outcome = structure(&built, &proof, &nonce, &peer);
    assert_eq!(outcome, Err(AuditFailureReason::DigestMismatch));
}
1189
#[test]
fn wrong_leaf_count_structure_rejected() {
    let nonce = [3u8; 32];
    let (built, mut proof, peer) = honest(200, &nonce);
    // Drop the last leaf so the proof no longer has the expected leaf count.
    let _dropped = proof.leaves.pop();

    let outcome = structure(&built, &proof, &nonce, &peer);
    assert_eq!(outcome, Err(AuditFailureReason::DigestMismatch));
}
1200
1201    // ---- round 2: responder-served bytes ------------------------------------
1202
#[test]
fn deleter_absent_bytes_is_confirmed_failure() {
    // THE headline case: the round-1 proof is structurally perfect, but the
    // node has DELETED a committed chunk and so cannot serve its bytes. It
    // answers `Absent` for the sampled key → provable lie → confirmed failure.
    // The auditor holds NONE of the peer's chunks; the verdict depends only on
    // what the responder serves.
    let nonce = [9u8; 32];
    let (built, proof, peer) = honest(400, &nonce);
    assert!(structure(&built, &proof, &nonce, &peer).is_ok());

    let s = sample(&proof, &nonce, built.commitment().key_count);
    // Explicit Absent (`Some(None)`) for the FIRST sampled key, honest content
    // for every other key.
    let victim = s.first().unwrap().key;
    let v = verify_byte_response(&s, &nonce, &peer, |k| {
        Some((*k != victim).then(|| chunk_bytes(k)))
    });
    assert_eq!(v, AuditVerdict::Fail(AuditFailureReason::DigestMismatch));
}
1225
#[test]
fn omitted_committed_key_is_confirmed_failure() {
    // Leaving a sampled committed key out of the items entirely (neither
    // Present nor Absent) gets the same verdict as Absent: the responder
    // committed to the key and won't serve it → confirmed failure.
    let nonce = [9u8; 32];
    let (built, proof, peer) = honest(400, &nonce);
    let s = sample(&proof, &nonce, built.commitment().key_count);

    let victim = s.first().unwrap().key;
    // `None` for the victim = omitted; honest content for every other key.
    let v = verify_byte_response(&s, &nonce, &peer, |k| {
        (*k != victim).then(|| Some(chunk_bytes(k)))
    });
    assert_eq!(v, AuditVerdict::Fail(AuditFailureReason::DigestMismatch));
}
1244
#[test]
fn fake_storage_garbage_bytes_is_confirmed_failure() {
    // A "fake-storage" responder claims possession but serves garbage. The
    // garbage does not hash to the committed content address (`bytes_hash`),
    // so the round-2 content-address check fails → confirmed failure. No
    // auditor holdings are involved.
    let nonce = [9u8; 32];
    let (built, proof, peer) = honest(400, &nonce);
    let s = sample(&proof, &nonce, built.commitment().key_count);

    let v = verify_byte_response(&s, &nonce, &peer, |k| {
        // BLAKE3(key) followed by a fixed marker: deterministic, never the
        // real chunk content.
        let garbage: Vec<u8> = blake3::hash(k)
            .as_bytes()
            .iter()
            .chain(b"adversary-fake-storage")
            .copied()
            .collect();
        Some(Some(garbage))
    });
    assert_eq!(v, AuditVerdict::Fail(AuditFailureReason::DigestMismatch));
}
1261
#[test]
fn correct_content_address_but_stale_freshness_fails() {
    // Freshness is checked independently of the content address. Here every
    // leaf keeps its correct `bytes_hash` (so the served bytes match the
    // content address), but its `nonced_hash` is rebuilt under a DIFFERENT
    // nonce. Recomputing the nonced hash with the audit nonce therefore cannot
    // reproduce the proof's value, and round 2 must fail.
    let nonce = [9u8; 32];
    let (built, mut proof, peer) = honest(400, &nonce);

    // Tamper with EVERY leaf so the freshly-random sample is guaranteed to
    // land on a stale-freshness leaf.
    let other_nonce = [0xEEu8; 32];
    for leaf in &mut proof.leaves {
        let content = chunk_bytes(&leaf.key);
        leaf.nonced_hash = nonced_leaf_hash(&other_nonce, &peer, &leaf.key, &content);
    }

    let s = sample(&proof, &nonce, built.commitment().key_count);
    let v = verify_byte_response(&s, &nonce, &peer, served_honest);
    assert_eq!(v, AuditVerdict::Fail(AuditFailureReason::DigestMismatch));
}
1286
#[test]
fn auditor_holds_nothing_still_catches_deleter() {
    // Explicit contract: the auditor's own storage is irrelevant. A deleter is
    // caught purely from its served (absent) response. (Compare the OLD
    // design, where an auditor holding none of the chunks went Inconclusive
    // and the deleter walked free.)
    let nonce = [0x21u8; 32];
    let (built, proof, peer) = honest(256, &nonce);
    let round_one = structure(&built, &proof, &nonce, &peer);
    assert!(round_one.is_ok());

    let s = sample(&proof, &nonce, built.commitment().key_count);
    // Total deleter: explicit Absent for every requested key.
    let total_deleter = |_: &XorName| Some(None);
    let v = verify_byte_response(&s, &nonce, &peer, total_deleter);
    assert_eq!(v, AuditVerdict::Fail(AuditFailureReason::DigestMismatch));
}
1301
#[test]
fn sample_size_is_in_3_to_5_band() {
    // ADR-0002: round 2 samples a SMALL surprise set (3..=5) of the proven
    // leaves; for a large subtree the sample is capped at 5.
    let nonce = [7u8; 32];
    let (built, proof, _peer) = honest(1024, &nonce);
    let s = sample(&proof, &nonce, built.commitment().key_count);

    let band = BYTE_SPOTCHECK_MIN as usize..=BYTE_SPOTCHECK_MAX as usize;
    let got = s.len();
    assert!(band.contains(&got), "sample {} must be within 3..=5", got);
}
1315
#[test]
fn full_pass_requires_every_sampled_leaf() {
    // On a pass, `checked` must equal the sample size: no leaf is silently
    // skipped — every sampled, committed key must verify.
    let nonce = [11u8; 32];
    let (built, proof, peer) = honest(400, &nonce);
    let s = sample(&proof, &nonce, built.commitment().key_count);

    let other = verify_byte_response(&s, &nonce, &peer, served_honest);
    if let AuditVerdict::Pass { checked } = &other {
        assert_eq!(*checked, s.len());
    } else {
        panic!("expected Pass, got {other:?}");
    }
}
1328
1329    // ---- end-to-end gate composition ----------------------------------------
1330
#[test]
fn structure_fail_short_circuits_before_round_2() {
    // A structurally invalid proof is rejected in round 1, so the byte
    // challenge is never issued. This asserts the round-1 gate returns Err,
    // which is what keeps the auditor (verify_subtree_response) from ever
    // reaching request_byte_proof.
    let nonce = [5u8; 32];
    let (built, mut proof, peer) = honest(300, &nonce);
    if let [first, ..] = proof.leaves.as_mut_slice() {
        first.bytes_hash[0] ^= 0x01;
    }

    let round_one = structure(&built, &proof, &nonce, &peer);
    assert!(round_one.is_err());
}
1343
1344    /// Build an honest committed tree whose keys are deliberately "FAR": their
1345    /// addresses live at the high end of the XOR space (top bytes = 0xFF). On the
1346    /// auditor side these are the leaves `observe_closeness` counts toward `far`.
1347    fn honest_far(n: u32, nonce: &[u8; 32]) -> (BuiltCommitment, SubtreeProof, [u8; 32]) {
1348        let (pk, sk) = ml_dsa_65().generate_keypair().unwrap();
1349        let peer_id = *blake3::hash(&pk.to_bytes()).as_bytes();
1350        let pk_b = pk.to_bytes();
1351        let entries: Vec<_> = (0..n)
1352            .map(|i| {
1353                let mut k = [0xFFu8; 32];
1354                k[28..].copy_from_slice(&i.to_be_bytes());
1355                (k, *blake3::hash(&chunk_bytes(&k)).as_bytes())
1356            })
1357            .collect();
1358        let built = BuiltCommitment::build(entries, &peer_id, &sk, &pk_b).unwrap();
1359        let proof =
1360            build_subtree_proof(built.tree(), nonce, &peer_id, |k| Some(chunk_bytes(k))).unwrap();
1361        (built, proof, peer_id)
1362    }
1363
1364    /// ADR-0002 "Closeness" is OBSERVE-ONLY: far-keyed honest proofs verify
1365    /// exactly like near-keyed ones. The verdict (structure + served bytes) is
1366    /// closeness-blind, so a "far/padding" shape can never produce a Fail.
1367    #[test]
1368    fn closeness_is_observe_only_far_keys_still_pass() {
1369        let nonce = [9u8; 32];
1370
1371        let (built_far, proof_far, peer_far) = honest_far(400, &nonce);
1372        assert!(structure(&built_far, &proof_far, &nonce, &peer_far).is_ok());
1373        let sf = sample(&proof_far, &nonce, built_far.commitment().key_count);
1374        let v_far = verify_byte_response(&sf, &nonce, &peer_far, served_honest);
1375
1376        let (built_near, proof_near, peer_near) = honest(400, &nonce);
1377        assert!(structure(&built_near, &proof_near, &nonce, &peer_near).is_ok());
1378        let sn = sample(&proof_near, &nonce, built_near.commitment().key_count);
1379        let v_near = verify_byte_response(&sn, &nonce, &peer_near, served_honest);
1380
1381        match (&v_far, &v_near) {
1382            (AuditVerdict::Pass { checked: cf }, AuditVerdict::Pass { checked: cn }) => {
1383                assert!(*cf >= 1 && *cn >= 1);
1384            }
1385            other => panic!("both honest proofs must Pass regardless of closeness, got {other:?}"),
1386        }
1387        assert!(
1388            !matches!(v_far, AuditVerdict::Fail(_)),
1389            "far/padding-shaped honest proof must NEVER fail, got {v_far:?}"
1390        );
1391    }
1392
1393    // Unused-leaf constructor guard: keep SubtreeLeaf import meaningful.
1394    #[test]
1395    fn subtree_leaf_is_constructible() {
1396        let _l = SubtreeLeaf {
1397            key: key(1),
1398            bytes_hash: [0u8; 32],
1399            nonced_hash: [0u8; 32],
1400        };
1401    }
1402}