Skip to main content

ant_node/replication/
mod.rs

1//! Replication subsystem for the Autonomi network.
2//!
3//! Implements Kademlia-style replication with:
4//! - Fresh replication with `PoP` verification
5//! - Neighbor sync with round-robin cycle management
6//! - Batched quorum verification
7//! - Storage audit protocol (anti-outsourcing)
8//! - `PaidForList` persistence and convergence
9//! - Responsibility pruning with hysteresis
10
11// The replication engine intentionally holds `RwLock` read guards across await
12// boundaries (e.g. reading sync_history while calling audit_tick). Clippy's
13// nursery lint `significant_drop_tightening` flags these, but the guards must
14// remain live for the duration of the call.
15#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod commitment;
21pub mod commitment_state;
22pub mod config;
23pub mod fresh;
24pub mod neighbor_sync;
25pub mod paid_list;
26pub mod possession;
27pub mod protocol;
28pub mod pruning;
29pub mod quorum;
30pub mod recent_provers;
31pub mod scheduling;
32pub mod storage_commitment_audit;
33pub mod subtree;
34pub mod types;
35
36use std::collections::{HashMap, HashSet};
37use std::path::Path;
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
41use std::pin::Pin;
42
43use crate::logging::{debug, error, info, warn};
44use futures::stream::FuturesUnordered;
45use futures::{Future, StreamExt};
46use rand::Rng;
47use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
48use tokio::task::JoinHandle;
49use tokio_util::sync::CancellationToken;
50
51use crate::ant_protocol::XorName;
52use crate::error::{Error, Result};
53use crate::payment::{PaymentVerifier, VerificationContext};
54use crate::replication::audit::AuditTickResult;
55use crate::replication::commitment::{commitment_hash, StorageCommitment};
56use crate::replication::commitment_state::{PeerCommitmentRecord, ResponderCommitmentState};
57use crate::replication::config::{
58    max_parallel_fetch, storage_admission_width, ReplicationConfig, MAX_AUDIT_RESPONSES_PER_PEER,
59    MAX_CONCURRENT_AUDIT_RESPONSES, MAX_CONCURRENT_REPLICATION_SENDS, REPLICATION_PROTOCOL_ID,
60};
61use crate::replication::paid_list::PaidList;
62use crate::replication::protocol::{
63    FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
64    VerificationResponse,
65};
66use crate::replication::quorum::KeyVerificationOutcome;
67use crate::replication::recent_provers::RecentProvers;
68use crate::replication::scheduling::ReplicationQueues;
69use crate::replication::types::{
70    AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
71    NeighborSyncState, PeerSyncRecord, RepairProofs, VerificationEntry, VerificationState,
72};
73use crate::storage::LmdbStorage;
74use saorsa_core::identity::{NodeIdentity, PeerId};
75use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
76
77// ---------------------------------------------------------------------------
78// Constants
79// ---------------------------------------------------------------------------
80
/// String prefix belonging to saorsa-core's request-response mechanism.
const RR_PREFIX: &str = "/rr/";
83
84fn fresh_offer_payment_context() -> VerificationContext {
85    VerificationContext::ClientPut
86}
87
88fn paid_notify_payment_context() -> VerificationContext {
89    VerificationContext::PaidListAdmission
90}
91
92/// Boxed future type for in-flight fetch tasks.
93type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
94
95/// Shared dependencies for one verification worker cycle.
96struct VerificationCycleContext<'a> {
97    p2p_node: &'a Arc<P2PNode>,
98    paid_list: &'a Arc<PaidList>,
99    storage: &'a Arc<LmdbStorage>,
100    queues: &'a Arc<RwLock<ReplicationQueues>>,
101    config: &'a ReplicationConfig,
102    bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
103    is_bootstrapping: &'a Arc<RwLock<bool>>,
104    bootstrap_complete_notify: &'a Arc<Notify>,
105    /// v12 §6 holder-eligibility inputs. The verifier downgrades a
106    /// peer's Present claim to Unresolved unless they're a credited
107    /// holder of the key (i.e. they recently passed a commitment-bound
108    /// audit on it under their currently-credited commitment hash).
109    last_commitment_by_peer: &'a Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
110    ever_capable_peers: &'a Arc<RwLock<HashSet<PeerId>>>,
111    recent_provers: &'a Arc<RwLock<RecentProvers>>,
112}
113
/// Polling interval of the fetch worker, in milliseconds.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Polling interval of the verification worker, in milliseconds.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Threshold, in milliseconds, beyond which a verification cycle's duration
/// is worth reporting at `info` level.
const VERIFICATION_CYCLE_SLOW_LOG_MS: u128 = 500;

/// Weight of an ordinary per-operation trust signal (success or failure).
///
/// Applied to individual replication fetch outcomes, integrity-check
/// failures, and bootstrap-claim abuse. Confirmed audit failures use the
/// separate `AUDIT_FAILURE_TRUST_WEIGHT` instead.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;

/// Seconds between successive bootstrap drain checks.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;
132
/// Period, in seconds, at which the responder rebuilds and rotates its
/// storage commitment (one hour).
///
/// Cost: each rebuild scans LMDB to hash every stored key into a leaf and
/// then builds the tree (BLAKE3 plus tree construction), which is sub-100 ms
/// at roughly 10k keys.
///
/// Answerability follows gossip, not rotation. The responder keeps answering
/// for its current commitment plus the most recent
/// `RETAINED_GOSSIPED_COMMITMENTS` (= 2) commitments it actually gossiped.
/// Each of those is kept for `GOSSIP_ANSWERABILITY_TTL` (3 h) after it was
/// last emitted (see `commitment_state`). A gossiped commitment therefore
/// stays answerable across rotations until its gossip TTL runs out. The
/// rotation period does not by itself bound answerability.
///
/// Why one hour: it matches the worst-case neighbor-sync cooldown
/// (`NEIGHBOR_SYNC_COOLDOWN_SECS = 3600`). The 3 h gossip TTL comfortably
/// exceeds the gap between a rotation here and the next gossip reaching a
/// remote peer. That keeps the "unknown commitment hash" -> Idle audit-skip
/// path from becoming the common case.
///
/// Why not faster: a v12 pin is bound to one specific point-in-time
/// commitment, so rotation is not security-critical for pin freshness. It
/// only keeps the committed key set in step with keys the responder writes.
/// An hour is ample for that, and slow enough that honest auditors mostly
/// land on the `current` or `previous` commitment rather than one that has
/// been rotated past.
const COMMITMENT_ROTATION_INTERVAL_SECS: u64 = 60 * 60;
157
/// Per-peer floor on how often a commitment signature is verified
/// (v10/v12 §2 step 3 and the §11 `DoS` analysis).
///
/// One ML-DSA-65 verification costs about 1 ms. Without this floor, a sybil
/// that gets past the routing-table gate (for example via transient bucket
/// pollution) could force that cost on every gossip message it sends.
/// Allowing at most one verification per peer per minute still leaves plenty
/// of headroom over honest traffic, which gossips once per 10-20 minute
/// neighbor-sync round.
const COMMITMENT_SIG_VERIFY_MIN_INTERVAL: Duration = Duration::from_secs(60);
167
/// Hard cap on the number of entries in `last_commitment_by_peer`.
///
/// Bounds the per-process memory cost of the auditor's per-peer commitment
/// cache. Each entry holds a `StorageCommitment` (~5 KiB: 1952-byte pubkey +
/// 3293-byte signature + small fields). At 4096 entries the cache is therefore
/// ~20 MiB, which comfortably covers a realistic close-group neighborhood.
///
/// When the cap is hit, one arbitrary existing entry is evicted on insert.
/// `HashMap` iteration order is unspecified and insertion order is not
/// tracked, so this is NOT an LRU policy.
///
/// This cap is the third line of defence against sybil/churn flooding:
/// - the `PeerRemoved` handler proactively drops entries as the DHT detects
///   departures;
/// - `ingest_peer_commitment` only admits commitments from peers currently in
///   the routing table.
const MAX_LAST_COMMITMENT_BY_PEER: usize = 4096;

/// Cap on the sticky `ever_capable_peers` set.
///
/// Bounds memory so a long-running bootstrap node cannot grow the set without
/// limit through peer-id churn. It is sized at 4x `MAX_LAST_COMMITMENT_BY_PEER`
/// so the set comfortably outlives the commitment cache's arbitrary-eviction
/// churn: a peer evicted from that cache is still remembered here. It also
/// still caps the blast radius of identity-rotation attacks.
///
/// Once full, new inserts are refused (no eviction), which keeps the historic
/// set stable. New v12 peers arriving after the cap is reached are treated as
/// legacy on rejoin. That matches the behaviour before this set existed, so
/// it is not a security regression.
const MAX_EVER_CAPABLE_PEERS: usize = 4 * MAX_LAST_COMMITMENT_BY_PEER;
192
193// ---------------------------------------------------------------------------
194// ReplicationEngine
195// ---------------------------------------------------------------------------
196
197/// The replication engine manages all replication background tasks and state.
198pub struct ReplicationEngine {
199    /// Replication configuration (shared across spawned tasks).
200    config: Arc<ReplicationConfig>,
201    /// P2P networking node.
202    p2p_node: Arc<P2PNode>,
203    /// Local chunk storage.
204    storage: Arc<LmdbStorage>,
205    /// Persistent paid-for-list.
206    paid_list: Arc<PaidList>,
207    /// Payment verifier for `PoP` validation.
208    payment_verifier: Arc<PaymentVerifier>,
209    /// Replication pipeline queues.
210    queues: Arc<RwLock<ReplicationQueues>>,
211    /// Neighbor sync cycle state.
212    sync_state: Arc<RwLock<NeighborSyncState>>,
213    /// Per-peer sync history (for `RepairOpportunity`).
214    ///
215    /// This map grows with peer churn and is intentionally unbounded: entries
216    /// are lightweight (`PeerSyncRecord` is two fields) and peer IDs are
217    /// naturally bounded by the routing table's k-bucket capacity.
218    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
219    /// Per-peer cooldown for gossip-triggered subtree audits (ADR-0002).
220    ///
221    /// Records when each peer was last audited so a burst of gossiped
222    /// commitment changes cannot spawn back-to-back audits of the same peer.
223    /// Bounded by routing-table membership and cleaned on `PeerRemoved`.
224    audit_on_gossip_cooldown: Arc<RwLock<HashMap<PeerId, Instant>>>,
225    /// Completed local neighbor-sync cycle epoch for proof maturity.
226    sync_cycle_epoch: Arc<RwLock<u64>>,
227    /// Per-key repair proof tracking for audit eligibility.
228    repair_proofs: Arc<RwLock<RepairProofs>>,
229    /// Bootstrap state tracking.
230    bootstrap_state: Arc<RwLock<BootstrapState>>,
231    /// Whether this node is currently bootstrapping.
232    is_bootstrapping: Arc<RwLock<bool>>,
233    /// Trigger for early neighbor sync (signalled on topology changes).
234    sync_trigger: Arc<Notify>,
235    /// Notified when `is_bootstrapping` transitions from `true` to `false`.
236    bootstrap_complete_notify: Arc<Notify>,
237    /// Node identity (for signing storage commitments).
238    ///
239    /// Phase 3 of the v12 storage-bound audit design. The responder
240    /// uses this to sign its periodically-built `StorageCommitment`.
241    identity: Arc<NodeIdentity>,
242    /// Responder-side commitment state (two-slot atomic rotation).
243    ///
244    /// Periodically rebuilt from the live LMDB key set; gossiped on
245    /// outbound `NeighborSyncRequest`/`Response`; consulted by the
246    /// commitment-bound audit handler.
247    commitment_state: Arc<ResponderCommitmentState>,
248    /// Auditor-side per-peer commitment record (last known commitment +
249    /// sticky `commitment_capable` flag).
250    ///
251    /// Populated whenever an inbound gossip carries a verified
252    /// commitment from the sender. Used by `audit_tick` to snapshot
253    /// `expected_commitment_hash` into outbound challenges, and by
254    /// holder-eligibility (§6) to decide whether a peer's `recent_provers`
255    /// proof should be honoured. The sticky `commitment_capable` flag
256    /// flips true on first successful ingest and never reverts (§2
257    /// step 5).
258    last_commitment_by_peer: Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
259    /// Sticky set of peer IDs we have EVER seen carrying a v12
260    /// commitment, independent of whether their commitment bytes are
261    /// still in `last_commitment_by_peer`. The §6 holder-eligibility
262    /// closure consults this set to keep treating churned-out
263    /// previously-v12 peers as v12-capable (rather than degrading them
264    /// to "legacy" credit-unconditionally) when they re-appear on the
265    /// network before their next gossip arrives. Bounded growth: even
266    /// at one million peers seen over the node's lifetime, the set is
267    /// 32 MB.
268    ever_capable_peers: Arc<RwLock<HashSet<PeerId>>>,
269    /// Auditor-side holder-eligibility cache (v12 §6).
270    ///
271    /// Recorded on successful commitment-bound audit; read by future
272    /// quorum / paid-list eligibility checks (phase-3 stretch).
273    recent_provers: Arc<RwLock<RecentProvers>>,
274    /// Per-peer last sig-verify attempt timestamp for the §2 step 3 /
275    /// §11 `DoS` rate limit. Bumped on EVERY verify attempt (success or
276    /// failure) so a peer we've never successfully verified can't burn
277    /// CPU on a flood of structurally-plausible-but-invalid gossips.
278    /// Lives separately from `last_commitment_by_peer` because that
279    /// map's records only exist after a successful verify.
280    sig_verify_attempts: Arc<RwLock<HashMap<PeerId, Instant>>>,
281    /// Limits concurrent outbound replication sends to prevent bandwidth
282    /// saturation on home broadband connections.
283    send_semaphore: Arc<Semaphore>,
284    /// Bounds concurrent IN-FLIGHT audit-responder tasks (subtree round 1 +
285    /// byte round 2). Those are spawned off the serial message loop so disk
286    /// reads don't block replication; the semaphore restores a global
287    /// backpressure ceiling so the node can't fan out unbounded `get_raw` reads
288    /// / multi-MiB byte serves.
289    audit_responder_semaphore: Arc<Semaphore>,
290    /// Per-source in-flight audit-responder counts, capped at
291    /// [`MAX_AUDIT_RESPONSES_PER_PEER`]. The GLOBAL semaphore alone is not
292    /// flood-fair: one peer spamming challenges could occupy every slot and
293    /// starve honest auditors, whose dropped challenges then convert to
294    /// audit timeouts against HONEST peers (codex-r2 A). This
295    /// per-peer cap guarantees no single source can hold more than its share,
296    /// so a flood self-throttles without denying service to everyone else.
297    audit_responder_inflight: Arc<RwLock<HashMap<PeerId, u32>>>,
298    /// Receiver for fresh-write events from the chunk PUT handler.
299    ///
300    /// When present, `start()` spawns a drainer task that calls
301    /// `replicate_fresh` for each event.
302    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
303    /// Sender for delayed possession-check events (ADR-0003). The fresh-write
304    /// drainer pushes the responsible close-group peers here after each fresh
305    /// replication; the possession-check scheduler drains the paired receiver.
306    possession_check_tx: mpsc::UnboundedSender<possession::PossessionCheckEvent>,
307    /// Receiver paired with `possession_check_tx`; taken by the scheduler task.
308    possession_check_rx: Option<mpsc::UnboundedReceiver<possession::PossessionCheckEvent>>,
309    /// Shutdown token.
310    shutdown: CancellationToken,
311    /// Background task handles.
312    task_handles: Vec<JoinHandle<()>>,
313}
314
315impl ReplicationEngine {
316    /// Create a new replication engine.
317    ///
318    /// # Errors
319    ///
320    /// Returns an error if the `PaidList` LMDB environment cannot be opened
321    /// or if the configuration fails validation.
322    #[allow(clippy::too_many_arguments)]
323    pub async fn new(
324        config: ReplicationConfig,
325        p2p_node: Arc<P2PNode>,
326        storage: Arc<LmdbStorage>,
327        payment_verifier: Arc<PaymentVerifier>,
328        identity: Arc<NodeIdentity>,
329        root_dir: &Path,
330        fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
331        shutdown: CancellationToken,
332    ) -> Result<Self> {
333        config.validate().map_err(Error::Config)?;
334
335        let paid_list = Arc::new(
336            PaidList::new(root_dir)
337                .await
338                .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
339        );
340
341        let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
342        let config = Arc::new(config);
343        let (possession_check_tx, possession_check_rx) = mpsc::unbounded_channel();
344
345        Ok(Self {
346            config: Arc::clone(&config),
347            p2p_node,
348            storage,
349            paid_list,
350            payment_verifier,
351            queues: Arc::new(RwLock::new(ReplicationQueues::new())),
352            sync_state: Arc::new(RwLock::new(initial_neighbors)),
353            sync_history: Arc::new(RwLock::new(HashMap::new())),
354            audit_on_gossip_cooldown: Arc::new(RwLock::new(HashMap::new())),
355            sync_cycle_epoch: Arc::new(RwLock::new(0)),
356            repair_proofs: Arc::new(RwLock::new(RepairProofs::new())),
357            bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
358            is_bootstrapping: Arc::new(RwLock::new(true)),
359            sync_trigger: Arc::new(Notify::new()),
360            bootstrap_complete_notify: Arc::new(Notify::new()),
361            identity,
362            commitment_state: Arc::new(ResponderCommitmentState::new()),
363            last_commitment_by_peer: Arc::new(RwLock::new(HashMap::new())),
364            ever_capable_peers: Arc::new(RwLock::new(HashSet::new())),
365            recent_provers: Arc::new(RwLock::new(RecentProvers::new())),
366            sig_verify_attempts: Arc::new(RwLock::new(HashMap::new())),
367            send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
368            audit_responder_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_AUDIT_RESPONSES)),
369            audit_responder_inflight: Arc::new(RwLock::new(HashMap::new())),
370            fresh_write_rx: Some(fresh_write_rx),
371            possession_check_tx,
372            possession_check_rx: Some(possession_check_rx),
373            shutdown,
374            task_handles: Vec::new(),
375        })
376    }
377
378    /// Get a reference to the `PaidList`.
379    #[must_use]
380    pub fn paid_list(&self) -> &Arc<PaidList> {
381        &self.paid_list
382    }
383
384    /// Get a reference to the responder's commitment state. Used by audit
385    /// handlers to look up commitments by hash; used by the rotation tick
386    /// to install fresh ones.
387    #[must_use]
388    pub fn commitment_state(&self) -> &Arc<ResponderCommitmentState> {
389        &self.commitment_state
390    }
391
392    /// Get a reference to the auditor's last-commitment-by-peer table.
393    #[must_use]
394    pub fn last_commitment_by_peer(&self) -> &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>> {
395        &self.last_commitment_by_peer
396    }
397
398    /// Get a reference to the holder-eligibility cache. Phase-3 stretch:
399    /// will be read by quorum / paid-list eligibility checks.
400    #[must_use]
401    pub fn recent_provers(&self) -> &Arc<RwLock<RecentProvers>> {
402        &self.recent_provers
403    }
404
405    /// Test-only: rebuild + rotate this node's storage commitment now over its
406    /// current key set (normally on a 1h timer). Lets a test commit to chunks it
407    /// just stored without waiting for the rotation cadence.
408    ///
409    /// # Errors
410    ///
411    /// Propagates any error from reading the local key set or building/signing
412    /// the commitment.
413    #[cfg(any(test, feature = "test-utils"))]
414    pub async fn rebuild_commitment_now(&self) -> Result<()> {
415        rebuild_and_rotate_commitment(
416            &self.storage,
417            &self.identity,
418            &self.commitment_state,
419            &self.p2p_node,
420            &self.config,
421        )
422        .await
423    }
424
425    /// Test-only: directly seed this node's cached commitment for `peer`,
426    /// simulating "we received `peer`'s gossiped commitment" without depending
427    /// on neighbor-sync propagation timing. Lets a two-node audit test pin the
428    /// peer's commitment deterministically.
429    #[cfg(any(feature = "test-utils", test))]
430    pub async fn inject_peer_commitment_for_test(
431        &self,
432        peer: &PeerId,
433        commitment: StorageCommitment,
434    ) {
435        let now = Instant::now();
436        self.last_commitment_by_peer
437            .write()
438            .await
439            .insert(*peer, PeerCommitmentRecord::from_verified(commitment, now));
440        self.ever_capable_peers.write().await.insert(*peer);
441    }
442
443    /// Test-only: run ONE subtree audit against `peer` right now, pinned to the
444    /// commitment this node has cached for it (from gossip), over the live wire.
445    /// Returns the audit outcome so tests can assert honest-pass / adversary-fail
446    /// in a real two-node setting without waiting for the gossip cadence.
447    ///
448    /// Returns `AuditTickResult::Idle` if we have no cached commitment for the
449    /// peer yet (gossip hasn't reached us). Gated to test builds.
450    #[cfg(any(test, feature = "test-utils"))]
451    pub async fn audit_peer_now(&self, peer: &PeerId) -> audit::AuditTickResult {
452        let target = {
453            let map = self.last_commitment_by_peer.read().await;
454            map.get(peer)
455                .and_then(PeerCommitmentRecord::last_commitment)
456                .and_then(|c| commitment_hash(c).map(|h| (h, c.key_count)))
457        };
458        let Some((pin, key_count)) = target else {
459            return audit::AuditTickResult::Idle;
460        };
461        let credit = storage_commitment_audit::AuditCredit {
462            recent_provers: &self.recent_provers,
463        };
464        storage_commitment_audit::run_subtree_audit(
465            &self.p2p_node,
466            &self.config,
467            peer,
468            pin,
469            key_count,
470            Some(&credit),
471        )
472        .await
473    }
474
475    /// Test-only: run the possession check immediately for `key` against
476    /// `peers`, bypassing the scheduler's randomised 5-15 minute settle delay.
477    ///
478    /// Penalises any peer that does not hold `key` at `AuditChallenge`
479    /// severity (ADR-0003). Lets e2e tests assert the detection+penalty path
480    /// deterministically without waiting for the scheduled check.
481    #[cfg(any(test, feature = "test-utils"))]
482    pub async fn run_possession_check_now(&self, key: XorName, peers: Vec<PeerId>) {
483        possession::run_possession_check(
484            key,
485            peers,
486            &self.p2p_node,
487            &self.storage,
488            &self.config,
489            &self.sync_state,
490            &self.shutdown,
491        )
492        .await;
493    }
494
495    /// Start all background tasks.
496    ///
497    /// `dht_events` must be subscribed **before** `P2PNode::start()` so that
498    /// the `BootstrapComplete` event emitted during DHT bootstrap is not
499    /// missed by the bootstrap-sync gate.
500    pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
501        if !self.task_handles.is_empty() {
502            error!("ReplicationEngine::start() called while already running — ignoring");
503            return;
504        }
505        info!("Starting replication engine");
506
507        self.start_message_handler();
508        self.start_neighbor_sync_loop();
509        self.start_self_lookup_loop();
510        // Audit #2 (responsible-chunk): periodic tick auditing peers for the
511        // chunks they SHOULD store (responsibility + prior hint).
512        self.start_audit_loop();
513        // Audit #1 (storage-commitment) is gossip-triggered in the message
514        // handler when a peer's commitment is ingested, not on a periodic tick.
515        self.start_commitment_rotation_loop();
516        self.start_fetch_worker();
517        self.start_verification_worker();
518        self.start_bootstrap_sync(dht_events);
519        self.start_fresh_write_drainer();
520        self.start_possession_check_scheduler();
521
522        info!(
523            "Replication engine started with {} background tasks",
524            self.task_handles.len()
525        );
526    }
527
528    /// Returns `true` if the node is still in the replication bootstrap phase.
529    ///
530    /// During bootstrap, audit challenges return `Bootstrapping` instead of
531    /// digests, and neighbor sync responses carry `bootstrapping: true`.
532    pub async fn is_bootstrapping(&self) -> bool {
533        *self.is_bootstrapping.read().await
534    }
535
536    /// Wait until the replication bootstrap phase completes.
537    ///
538    /// Returns immediately if bootstrap has already completed. Useful for
539    /// readiness probes, health checks, and test harnesses that need the
540    /// node to be fully operational before proceeding.
541    ///
542    /// Returns `true` if bootstrap completed within the timeout, `false`
543    /// if the timeout elapsed first.
544    pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
545        // Register the notification future *before* checking the flag so that
546        // a transition between the read and the await is not missed.
547        let notified = self.bootstrap_complete_notify.notified();
548        tokio::pin!(notified);
549        notified.as_mut().enable();
550
551        if !*self.is_bootstrapping.read().await {
552            return true;
553        }
554
555        tokio::time::timeout(timeout, notified).await.is_ok()
556    }
557
558    /// Cancel all background tasks and wait for them to terminate.
559    ///
560    /// This must be awaited before dropping the engine when the caller needs
561    /// the `Arc<LmdbStorage>` references held by background tasks to be
562    /// released (e.g. before reopening the same LMDB environment).
563    pub async fn shutdown(&mut self) {
564        self.shutdown.cancel();
565        for (i, mut handle) in self.task_handles.drain(..).enumerate() {
566            match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
567                Ok(Ok(())) => {}
568                Ok(Err(e)) if e.is_cancelled() => {}
569                Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
570                Err(_) => {
571                    warn!("Replication task {i} did not stop within 10s, aborting");
572                    handle.abort();
573                }
574            }
575        }
576    }
577
578    /// Trigger an early neighbor sync round.
579    ///
580    /// Useful after topology changes (new nodes joining, network heal after
581    /// partition) when the caller wants replication to converge faster than
582    /// the regular 10-20 minute cadence.
583    pub fn trigger_neighbor_sync(&self) {
584        self.sync_trigger.notify_one();
585    }
586
587    /// Execute fresh replication for a newly stored record, then schedule the
588    /// delayed possession check for the responsible close-group peers
589    /// (ADR-0003). The production PUT path schedules via the fresh-write
590    /// drainer; this direct entry point schedules here so callers (and tests)
591    /// that drive replication directly still get the possession check.
592    pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
593        let peers = fresh::replicate_fresh(
594            key,
595            data,
596            proof_of_payment,
597            &self.p2p_node,
598            &self.paid_list,
599            &self.config,
600            &self.send_semaphore,
601        )
602        .await;
603        if !peers.is_empty() {
604            let _ = self
605                .possession_check_tx
606                .send(possession::PossessionCheckEvent { key: *key, peers });
607        }
608    }
609
610    // =======================================================================
611    // Background task launchers
612    // =======================================================================
613
614    /// Spawn a task that drains the fresh-write channel and triggers
615    /// replication for each newly-stored chunk.
616    fn start_fresh_write_drainer(&mut self) {
617        let Some(mut rx) = self.fresh_write_rx.take() else {
618            return;
619        };
620        let p2p = Arc::clone(&self.p2p_node);
621        let paid_list = Arc::clone(&self.paid_list);
622        let config = Arc::clone(&self.config);
623        let send_semaphore = Arc::clone(&self.send_semaphore);
624        let possession_tx = self.possession_check_tx.clone();
625        let shutdown = self.shutdown.clone();
626
627        let handle = tokio::spawn(async move {
628            loop {
629                tokio::select! {
630                    () = shutdown.cancelled() => break,
631                    event = rx.recv() => {
632                        let Some(event) = event else { break };
633                        let peers = fresh::replicate_fresh(
634                            &event.key,
635                            &event.data,
636                            &event.payment_proof,
637                            &p2p,
638                            &paid_list,
639                            &config,
640                            &send_semaphore,
641                        )
642                        .await;
643                        // Schedule the delayed possession check (ADR-0003) for
644                        // the responsible close-group peers. A closed receiver
645                        // (engine shutting down) is ignored.
646                        if !peers.is_empty() {
647                            let _ = possession_tx.send(possession::PossessionCheckEvent {
648                                key: event.key,
649                                peers,
650                            });
651                        }
652                    }
653                }
654            }
655            debug!("Fresh-write drainer shut down");
656        });
657        self.task_handles.push(handle);
658    }
659
660    /// Spawn the possession-check scheduler (ADR-0003).
661    ///
662    /// Drains scheduled possession-check events and, for each, waits a
663    /// randomised 5-15 minute settle delay before probing every responsible
664    /// peer for actual possession. A peer that cryptographically fails to prove
665    /// possession, including by timeout, is penalised at `AuditChallenge`
666    /// severity.
667    fn start_possession_check_scheduler(&mut self) {
668        let Some(mut rx) = self.possession_check_rx.take() else {
669            return;
670        };
671        let p2p = Arc::clone(&self.p2p_node);
672        let storage = Arc::clone(&self.storage);
673        let config = Arc::clone(&self.config);
674        let sync_state = Arc::clone(&self.sync_state);
675        let shutdown = self.shutdown.clone();
676
677        let handle = tokio::spawn(async move {
678            loop {
679                tokio::select! {
680                    () = shutdown.cancelled() => break,
681                    event = rx.recv() => {
682                        let Some(event) = event else { break };
683                        // Spawn a per-chunk delayed check so the drain loop
684                        // keeps pace with the write rate. Each check sleeps the
685                        // randomised settle delay, then probes every peer.
686                        let p2p = Arc::clone(&p2p);
687                        let storage = Arc::clone(&storage);
688                        let config = Arc::clone(&config);
689                        let sync_state = Arc::clone(&sync_state);
690                        let shutdown = shutdown.clone();
691                        let delay_min = config.possession_check_delay_min;
692                        let delay_max = config.possession_check_delay_max;
693                        tokio::spawn(async move {
694                            let delay = possession::random_delay(delay_min, delay_max);
695                            tokio::select! {
696                                () = shutdown.cancelled() => {}
697                                () = tokio::time::sleep(delay) => {
698                                    possession::run_possession_check(
699                                        event.key,
700                                        event.peers,
701                                        &p2p,
702                                        &storage,
703                                        &config,
704                                        &sync_state,
705                                        &shutdown,
706                                    )
707                                    .await;
708                                }
709                            }
710                        });
711                    }
712                }
713            }
714            debug!("Possession-check scheduler shut down");
715        });
716        self.task_handles.push(handle);
717    }
718
    /// Spawn the replication message handler task.
    ///
    /// Until the engine's shutdown token fires, the task multiplexes two event
    /// streams.
    ///
    /// **P2P messages**
    /// - A message whose topic is exactly `REPLICATION_PROTOCOL_ID` is handled
    ///   as a raw payload.
    /// - A message whose topic is `RR_PREFIX` followed by
    ///   `REPLICATION_PROTOCOL_ID` is unwrapped with
    ///   `P2PNode::parse_request_envelope`. Only requests (not responses) are
    ///   handled, and their message id is passed on as `rr_message_id`.
    /// - Every accepted payload goes to `handle_replication_message`. Its
    ///   errors are logged at `debug` and otherwise ignored.
    ///
    /// **DHT routing-table events**
    /// - `KClosestPeersChanged`: restricts the old and new lists to
    ///   `neighbor_sync_scope` entries. It prunes pending sync state for peers
    ///   that left that scope, queues newly entered peers for priority sync,
    ///   then always notifies `sync_trigger`.
    /// - `PeerRemoved`: drops per-peer state for the removed peer. This covers
    ///   sync state, repair proofs, the last seen commitment, recent-prover
    ///   credit, the sig-verify rate-limit entry and the gossip-audit cooldown.
    #[allow(clippy::too_many_lines)]
    fn start_message_handler(&mut self) {
        let mut p2p_events = self.p2p_node.subscribe_events();
        let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
        let p2p = Arc::clone(&self.p2p_node);
        let storage = Arc::clone(&self.storage);
        let paid_list = Arc::clone(&self.paid_list);
        let payment_verifier = Arc::clone(&self.payment_verifier);
        let queues = Arc::clone(&self.queues);
        let config = Arc::clone(&self.config);
        let shutdown = self.shutdown.clone();
        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
        let bootstrap_state = Arc::clone(&self.bootstrap_state);
        let sync_history = Arc::clone(&self.sync_history);
        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
        let repair_proofs = Arc::clone(&self.repair_proofs);
        let sync_trigger = Arc::clone(&self.sync_trigger);
        let my_commitment_state = Arc::clone(&self.commitment_state);
        let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
        let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
        let recent_provers = Arc::clone(&self.recent_provers);
        let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
        let audit_on_gossip_cooldown = Arc::clone(&self.audit_on_gossip_cooldown);
        let sync_state = Arc::clone(&self.sync_state);
        let audit_responder_semaphore = Arc::clone(&self.audit_responder_semaphore);
        let audit_responder_inflight = Arc::clone(&self.audit_responder_inflight);

        // ADR-0002 gossip-audit trigger: bundled state so an ingested *changed*
        // commitment can spawn a probabilistic, cooldown-gated subtree audit.
        let gossip_audit = GossipAuditTrigger {
            p2p_node: Arc::clone(&p2p),
            config: Arc::clone(&config),
            recent_provers: Arc::clone(&recent_provers),
            sync_state: Arc::clone(&sync_state),
            cooldown: Arc::clone(&audit_on_gossip_cooldown),
        };

        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    event = p2p_events.recv() => {
                        // Receive errors are skipped silently.
                        // NOTE(review): if this is a tokio broadcast
                        // receiver, `Err` covers both `Lagged`, where events
                        // are dropped without a log, and `Closed`. A closed
                        // channel would make this `continue` busy-loop until
                        // shutdown. Confirm the sender outlives this task.
                        let Ok(event) = event else { continue };
                        if let P2PEvent::Message {
                            topic,
                            source: Some(source),
                            data,
                            ..
                        } = event {
                            // Determine if this is a replication message
                            // and whether it arrived via the /rr/ request-response
                            // path (which wraps payloads in RequestResponseEnvelope).
                            // The slice below cannot panic: `starts_with`
                            // guarantees the prefix ends on a char boundary.
                            let rr_info = if topic == REPLICATION_PROTOCOL_ID {
                                Some((data.clone(), None))
                            } else if topic.starts_with(RR_PREFIX)
                                && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
                            {
                                // Only requests are handled here; envelopes
                                // flagged as responses are ignored.
                                P2PNode::parse_request_envelope(&data)
                                    .filter(|(_, is_resp, _)| !is_resp)
                                    .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
                            } else {
                                None
                            };
                            if let Some((payload, rr_message_id)) = rr_info {
                                match handle_replication_message(
                                    &source,
                                    &payload,
                                    &p2p,
                                    &storage,
                                    &paid_list,
                                    &payment_verifier,
                                    &queues,
                                    &config,
                                    &is_bootstrapping,
                                    &bootstrap_state,
                                    &sync_history,
                                    &sync_cycle_epoch,
                                    &repair_proofs,
                                    &last_commitment_by_peer,
                                    &ever_capable_peers,
                                    &sig_verify_attempts,
                                    &my_commitment_state,
                                    &gossip_audit,
                                    &audit_responder_semaphore,
                                    &audit_responder_inflight,
                                    rr_message_id.as_deref(),
                                ).await {
                                    Ok(()) => {}
                                    Err(e) => {
                                        debug!(
                                            "Replication message from {source} error: {e}"
                                        );
                                    }
                                }
                            }
                        }
                    }
                    // Gap 4: Topology churn handling (Section 13).
                    //
                    // The DHT routing table emits KClosestPeersChanged when the
                    // K-closest peer set actually changes, which is the precise
                    // signal for triggering neighbor sync. This replaces the
                    // previous approach of checking every PeerConnected /
                    // PeerDisconnected event against the close group.
                    dht_event = dht_events.recv() => {
                        // Same silent error skip as the P2P branch. A
                        // dropped `PeerRemoved` would leave that peer's
                        // state behind (see the NOTE(review) above).
                        let Ok(dht_event) = dht_event else { continue };
                        match dht_event {
                            DhtNetworkEvent::KClosestPeersChanged { old, new } => {
                                // Only the first `neighbor_sync_scope` entries
                                // of each list count as "in scope".
                                let old_peers = old
                                    .iter()
                                    .take(config.neighbor_sync_scope)
                                    .copied()
                                    .collect::<HashSet<_>>();
                                let new_scoped = new
                                    .iter()
                                    .take(config.neighbor_sync_scope)
                                    .copied()
                                    .collect::<Vec<_>>();
                                let new_peers =
                                    new_scoped.iter().copied().collect::<HashSet<_>>();
                                // Entrants: in scope now, not in scope before.
                                // They keep the order of `new`.
                                let entrants = new_scoped
                                    .iter()
                                    .copied()
                                    .filter(|peer| !old_peers.contains(peer))
                                    .collect::<Vec<_>>();
                                let entrant_count = entrants.len();
                                // Prune and queue under one write guard. The
                                // two counts are only used for the log below.
                                let (priority_insertions, sync_removals) = {
                                    let mut state = sync_state.write().await;
                                    let sync_removals = state.retain_sync_peers(&new_peers);
                                    let priority_insertions = state.queue_priority_peers(entrants);
                                    (priority_insertions, sync_removals)
                                };
                                if priority_insertions > 0 {
                                    debug!(
                                        "K-closest peers changed, queued {priority_insertions}/{entrant_count} new close peers for priority neighbor sync and pruned {sync_removals} departed pending sync entries"
                                    );
                                } else {
                                    debug!(
                                        "K-closest peers changed, no additional close peers queued, pruned {sync_removals} departed pending sync entries, triggering early neighbor sync"
                                    );
                                }
                                // Wake the neighbor-sync loop early on every
                                // change, whether or not anything was queued.
                                sync_trigger.notify_one();
                            }
                            DhtNetworkEvent::PeerRemoved { peer_id } => {
                                sync_state.write().await.remove_peer(&peer_id);
                                repair_proofs.write().await.remove_peer(&peer_id);
                                // v12: drop the commitment bytes and the
                                // recent-prover credit so a churn / sybil
                                // attacker cannot leave behind one
                                // StorageCommitment per identity in
                                // `last_commitment_by_peer`. Also drop the
                                // sig-verify rate-limit timestamp.
                                last_commitment_by_peer.write().await.remove(&peer_id);
                                recent_provers.write().await.forget_peer(&peer_id);
                                sig_verify_attempts.write().await.remove(&peer_id);
                                // Same for the gossip-audit cooldown (ADR-0002).
                                audit_on_gossip_cooldown.write().await.remove(&peer_id);
                                // The sticky `commitment_capable` flag is
                                // preserved orthogonally via
                                // `ever_capable_peers` — even after this
                                // removal, a re-joining peer continues to
                                // be treated as v12-capable rather than
                                // legacy (§3 shield).
                            }
                            _ => {}
                        }
                    }
                }
            }
            debug!("Replication message handler shut down");
        });
        self.task_handles.push(handle);
    }
892
893    fn start_neighbor_sync_loop(&mut self) {
894        let p2p = Arc::clone(&self.p2p_node);
895        let storage = Arc::clone(&self.storage);
896        let paid_list = Arc::clone(&self.paid_list);
897        let queues = Arc::clone(&self.queues);
898        let config = Arc::clone(&self.config);
899        let shutdown = self.shutdown.clone();
900        let sync_state = Arc::clone(&self.sync_state);
901        let sync_history = Arc::clone(&self.sync_history);
902        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
903        let repair_proofs = Arc::clone(&self.repair_proofs);
904        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
905        let bootstrap_state = Arc::clone(&self.bootstrap_state);
906        let sync_trigger = Arc::clone(&self.sync_trigger);
907        let commitment_state = Arc::clone(&self.commitment_state);
908        let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
909        let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
910        let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
911        // ADR-0002: a peer's commitment also arrives on the sync RESPONSE path
912        // (we initiated, they piggybacked theirs). Carry a gossip-audit trigger
913        // here too so a peer that only ever answers — never initiates sync —
914        // is still audited; otherwise it could fully evade auditing.
915        let gossip_audit = GossipAuditTrigger {
916            p2p_node: Arc::clone(&p2p),
917            config: Arc::clone(&config),
918            recent_provers: Arc::clone(&self.recent_provers),
919            sync_state: Arc::clone(&sync_state),
920            cooldown: Arc::clone(&self.audit_on_gossip_cooldown),
921        };
922
923        let handle = tokio::spawn(async move {
924            loop {
925                let interval = config.random_neighbor_sync_interval();
926                tokio::select! {
927                    () = shutdown.cancelled() => break,
928                    () = tokio::time::sleep(interval) => {}
929                    () = sync_trigger.notified() => {
930                        debug!("Neighbor sync triggered by topology change");
931                    }
932                }
933                // Wrap the sync round in a select so shutdown cancels
934                // in-progress network operations rather than waiting for
935                // the full round to complete.
936                tokio::select! {
937                    () = shutdown.cancelled() => break,
938                    () = run_neighbor_sync_round(
939                        &p2p,
940                        &storage,
941                        &paid_list,
942                        &queues,
943                        &config,
944                        &sync_state,
945                        &sync_history,
946                        &sync_cycle_epoch,
947                        &repair_proofs,
948                        &is_bootstrapping,
949                        &bootstrap_state,
950                        &commitment_state,
951                        &last_commitment_by_peer,
952                        &ever_capable_peers,
953                        &sig_verify_attempts,
954                        &gossip_audit,
955                    ) => {}
956                }
957            }
958            debug!("Neighbor sync loop shut down");
959        });
960        self.task_handles.push(handle);
961    }
962
963    fn start_self_lookup_loop(&mut self) {
964        let p2p = Arc::clone(&self.p2p_node);
965        let config = Arc::clone(&self.config);
966        let shutdown = self.shutdown.clone();
967
968        let handle = tokio::spawn(async move {
969            loop {
970                let interval = config.random_self_lookup_interval();
971                tokio::select! {
972                    () = shutdown.cancelled() => break,
973                    () = tokio::time::sleep(interval) => {
974                        if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
975                            debug!("Self-lookup failed: {e}");
976                        }
977                    }
978                }
979            }
980            debug!("Self-lookup loop shut down");
981        });
982        self.task_handles.push(handle);
983    }
984
985    /// Periodic responsible-chunk audit loop (audit #2): every
986    /// [`ReplicationConfig::random_audit_tick_interval`] (~10-20 min), audit one
987    /// eligible close peer for the chunks it *should* be storing (by
988    /// responsibility and prior repair hint), independent of the gossip-triggered
989    /// storage-commitment audit. Waits for bootstrap to drain, then runs one tick
990    /// immediately and periodically thereafter.
991    fn start_audit_loop(&mut self) {
992        let p2p = Arc::clone(&self.p2p_node);
993        let storage = Arc::clone(&self.storage);
994        let config = Arc::clone(&self.config);
995        let shutdown = self.shutdown.clone();
996        let sync_history = Arc::clone(&self.sync_history);
997        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
998        let repair_proofs = Arc::clone(&self.repair_proofs);
999        let bootstrap_state = Arc::clone(&self.bootstrap_state);
1000        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1001        let sync_state = Arc::clone(&self.sync_state);
1002
1003        let handle = tokio::spawn(async move {
1004            // Invariant 19: wait for bootstrap to drain before starting audits.
1005            loop {
1006                tokio::select! {
1007                    () = shutdown.cancelled() => return,
1008                    () = tokio::time::sleep(
1009                        std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
1010                    ) => {
1011                        if bootstrap_state.read().await.is_drained() {
1012                            break;
1013                        }
1014                    }
1015                }
1016            }
1017
1018            // Run one audit tick immediately after bootstrap drain.
1019            {
1020                let bootstrapping = *is_bootstrapping.read().await;
1021                let result = {
1022                    let history = sync_history.read().await;
1023                    let current_sync_epoch = *sync_cycle_epoch.read().await;
1024                    audit::audit_tick_with_repair_proofs(
1025                        &p2p,
1026                        &storage,
1027                        &config,
1028                        &history,
1029                        &repair_proofs,
1030                        current_sync_epoch,
1031                        bootstrapping,
1032                    )
1033                    .await
1034                };
1035                handle_audit_result(&result, &p2p, &sync_state, &config).await;
1036            }
1037
1038            // Then run periodically.
1039            loop {
1040                let interval = config.random_audit_tick_interval();
1041                tokio::select! {
1042                    () = shutdown.cancelled() => break,
1043                    () = tokio::time::sleep(interval) => {
1044                        let bootstrapping = *is_bootstrapping.read().await;
1045                        let result = {
1046                            let history = sync_history.read().await;
1047                            let current_sync_epoch = *sync_cycle_epoch.read().await;
1048                            audit::audit_tick_with_repair_proofs(
1049                                &p2p,
1050                                &storage,
1051                                &config,
1052                                &history,
1053                                &repair_proofs,
1054                                current_sync_epoch,
1055                                bootstrapping,
1056                            )
1057                            .await
1058                        };
1059                        handle_audit_result(&result, &p2p, &sync_state, &config).await;
1060                    }
1061                }
1062            }
1063            debug!("Audit loop shut down");
1064        });
1065        self.task_handles.push(handle);
1066    }
1067
1068    /// Periodically rebuild + sign + rotate the responder's storage
1069    /// commitment.
1070    ///
1071    /// Phase 3 of the v12 storage-bound audit. Once per
1072    /// [`COMMITMENT_ROTATION_INTERVAL_SECS`], the responder reads the
1073    /// current LMDB key set, builds a Merkle tree (for content-addressed
1074    /// chunks `bytes_hash == key`, so no chunk re-read is needed), signs
1075    /// the root with the node's `MlDsaSecretKey`, and rotates the result
1076    /// into `commitment_state`. Old `previous` slot is dropped by the
1077    /// rotate (per `ResponderCommitmentState::rotate`).
1078    ///
1079    /// Skips if the key set is empty (no commitment to make) — the
1080    /// auditor side falls back to the legacy plain-digest path for
1081    /// peers that have never gossiped a commitment.
1082    fn start_commitment_rotation_loop(&mut self) {
1083        let storage = Arc::clone(&self.storage);
1084        let identity = Arc::clone(&self.identity);
1085        let commitment_state = Arc::clone(&self.commitment_state);
1086        let shutdown = self.shutdown.clone();
1087        let p2p = Arc::clone(&self.p2p_node);
1088        let config = Arc::clone(&self.config);
1089        let sync_trigger = Arc::clone(&self.sync_trigger);
1090        let recent_provers = Arc::clone(&self.recent_provers);
1091
1092        let handle = tokio::spawn(async move {
1093            // Build the first commitment immediately on startup so a
1094            // restarted node can answer commitment-bound audits right
1095            // away — otherwise current() stays None for a full rotation
1096            // interval and audits silently fall back to legacy.
1097            //
1098            // After the first build, trigger an immediate neighbor-sync
1099            // round so the new commitment gossips out within seconds.
1100            // Without this, after a restart remote auditors keep pinning
1101            // the pre-restart (rotated-away) hash until their normal
1102            // sync cadence elapses — up to 1 h in the worst case,
1103            // during which time commitment-bound audits hit "unknown
1104            // commitment hash" -> Idle no-ops.
1105            // ML-DSA signatures are randomized so we cannot reproduce
1106            // the pre-restart hash; the only honest path to recovery
1107            // is fast re-gossip.
1108            if let Err(e) =
1109                rebuild_and_rotate_commitment(&storage, &identity, &commitment_state, &p2p, &config)
1110                    .await
1111            {
1112                warn!("Initial commitment build failed: {e}");
1113            } else {
1114                sync_trigger.notify_one();
1115            }
1116            loop {
1117                tokio::select! {
1118                    () = shutdown.cancelled() => break,
1119                    () = tokio::time::sleep(
1120                        std::time::Duration::from_secs(COMMITMENT_ROTATION_INTERVAL_SECS)
1121                    ) => {
1122                        if let Err(e) = rebuild_and_rotate_commitment(
1123                            &storage,
1124                            &identity,
1125                            &commitment_state,
1126                            &p2p,
1127                            &config,
1128                        ).await {
1129                            warn!("Commitment rotation failed: {e}");
1130                        }
1131                        // Piggyback a sweep of expired recent_provers
1132                        // entries on the rotation tick (same cadence,
1133                        // 1 h). is_credited_holder already honours the
1134                        // TTL on read, but the sweep reclaims memory
1135                        // for entries we'll never re-read.
1136                        let dropped = recent_provers.write().await.sweep_expired(
1137                            std::time::Instant::now()
1138                        );
1139                        if dropped > 0 {
1140                            debug!("recent_provers: swept {dropped} expired entries");
1141                        }
1142                    }
1143                }
1144            }
1145            debug!("Commitment rotation loop shut down");
1146        });
1147        self.task_handles.push(handle);
1148    }
1149
1150    #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
1151    fn start_fetch_worker(&mut self) {
1152        let p2p = Arc::clone(&self.p2p_node);
1153        let storage = Arc::clone(&self.storage);
1154        let queues = Arc::clone(&self.queues);
1155        let config = Arc::clone(&self.config);
1156        let shutdown = self.shutdown.clone();
1157        let bootstrap_state = Arc::clone(&self.bootstrap_state);
1158        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1159        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
1160        let concurrency = max_parallel_fetch();
1161
1162        info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
1163
1164        let handle = tokio::spawn(async move {
1165            // Each in-flight future yields (key, Option<FetchOutcome>) so we
1166            // always recover the key — even if the inner task panics.
1167            let mut in_flight = FuturesUnordered::<FetchFuture>::new();
1168
1169            loop {
1170                // Fill up to `concurrency` slots from the queue.
1171                {
1172                    let mut q = queues.write().await;
1173                    while in_flight.len() < concurrency {
1174                        let Some(candidate) = q.dequeue_fetch() else {
1175                            break;
1176                        };
1177                        let Some(&source) = candidate.sources.first() else {
1178                            warn!(
1179                                "Fetch candidate {} has no sources — dropping",
1180                                hex::encode(candidate.key)
1181                            );
1182                            continue;
1183                        };
1184                        q.start_fetch(candidate.key, source, candidate.sources.clone());
1185
1186                        let p2p = Arc::clone(&p2p);
1187                        let storage = Arc::clone(&storage);
1188                        let config = Arc::clone(&config);
1189                        let token = shutdown.clone();
1190                        let fetch_key = candidate.key;
1191                        in_flight.push(Box::pin(async move {
1192                            let handle = tokio::spawn(async move {
1193                                // Cancel-aware: abort when the engine shuts down.
1194                                tokio::select! {
1195                                    () = token.cancelled() => FetchOutcome {
1196                                        key: fetch_key,
1197                                        result: FetchResult::SourceFailed,
1198                                    },
1199                                    outcome = execute_single_fetch(
1200                                        p2p, storage, config, fetch_key, source,
1201                                    ) => outcome,
1202                                }
1203                            });
1204                            match handle.await {
1205                                Ok(outcome) => (outcome.key, Some(outcome)),
1206                                Err(e) => {
1207                                    error!(
1208                                        "Fetch task for {} panicked: {e}",
1209                                        hex::encode(fetch_key)
1210                                    );
1211                                    (fetch_key, None)
1212                                }
1213                            }
1214                        }));
1215                    }
1216                } // release queues write lock
1217
1218                if in_flight.is_empty() {
1219                    // No work — wait for new items or shutdown.
1220                    tokio::select! {
1221                        () = shutdown.cancelled() => break,
1222                        () = tokio::time::sleep(
1223                            std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
1224                        ) => continue,
1225                    }
1226                }
1227
1228                // Wait for the next fetch to complete and process the result.
1229                tokio::select! {
1230                    () = shutdown.cancelled() => break,
1231                    Some((key, maybe_outcome)) = in_flight.next() => {
1232                        let mut q = queues.write().await;
1233                        let terminal = if let Some(outcome) = maybe_outcome {
1234                            match outcome.result {
1235                                FetchResult::Stored => {
1236                                    q.complete_fetch(&key);
1237                                    true
1238                                }
1239                                FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
1240                                    if let Some(next_peer) = q.retry_fetch(&key) {
1241                                        // Spawn a new fetch task for the next source.
1242                                        let p2p = Arc::clone(&p2p);
1243                                        let storage = Arc::clone(&storage);
1244                                        let config = Arc::clone(&config);
1245                                        let token = shutdown.clone();
1246                                        let fetch_key = key;
1247                                        in_flight.push(Box::pin(async move {
1248                                            let handle = tokio::spawn(async move {
1249                                                tokio::select! {
1250                                                    () = token.cancelled() => FetchOutcome {
1251                                                        key: fetch_key,
1252                                                        result: FetchResult::SourceFailed,
1253                                                    },
1254                                                    outcome = execute_single_fetch(
1255                                                        p2p, storage, config, fetch_key, next_peer,
1256                                                    ) => outcome,
1257                                                }
1258                                            });
1259                                            match handle.await {
1260                                                Ok(outcome) => (outcome.key, Some(outcome)),
1261                                                Err(e) => {
1262                                                    error!(
1263                                                        "Fetch task for {} panicked: {e}",
1264                                                        hex::encode(fetch_key)
1265                                                    );
1266                                                    (fetch_key, None)
1267                                                }
1268                                            }
1269                                        }));
1270                                        false
1271                                    } else {
1272                                        q.complete_fetch(&key);
1273                                        true
1274                                    }
1275                                }
1276                            }
1277                        } else {
1278                            // Task panicked — reclaim the in-flight slot.
1279                            q.complete_fetch(&key);
1280                            true
1281                        };
1282
1283                        // Shrink bootstrap pending set on terminal exit.
1284                        if terminal {
1285                            drop(q); // release queues lock before acquiring bootstrap_state
1286                            if !bootstrap_state.read().await.is_drained() {
1287                                bootstrap_state.write().await.remove_key(&key);
1288                                let q = queues.read().await;
1289                                if bootstrap::check_bootstrap_drained(
1290                                    &bootstrap_state,
1291                                    &q,
1292                                )
1293                                .await
1294                                {
1295                                    complete_bootstrap(
1296                                        &is_bootstrapping,
1297                                        &bootstrap_complete_notify,
1298                                    ).await;
1299                                }
1300                            }
1301                        }
1302                    }
1303                }
1304            }
1305
1306            // Cancel and drain remaining in-flight fetches on shutdown.
1307            // The CancellationToken is already cancelled by this point, so
1308            // spawned tasks will see cancellation via their select! branches.
1309            while in_flight.next().await.is_some() {}
1310            debug!("Fetch worker shut down");
1311        });
1312        self.task_handles.push(handle);
1313    }
1314
1315    fn start_verification_worker(&mut self) {
1316        let p2p = Arc::clone(&self.p2p_node);
1317        let storage = Arc::clone(&self.storage);
1318        let queues = Arc::clone(&self.queues);
1319        let paid_list = Arc::clone(&self.paid_list);
1320        let config = Arc::clone(&self.config);
1321        let shutdown = self.shutdown.clone();
1322        let bootstrap_state = Arc::clone(&self.bootstrap_state);
1323        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1324        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
1325        let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
1326        let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
1327        let recent_provers = Arc::clone(&self.recent_provers);
1328
1329        let handle = tokio::spawn(async move {
1330            loop {
1331                tokio::select! {
1332                    () = shutdown.cancelled() => break,
1333                    () = tokio::time::sleep(
1334                        std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
1335                    ) => {
1336                        let ctx = VerificationCycleContext {
1337                            p2p_node: &p2p,
1338                            paid_list: &paid_list,
1339                            storage: &storage,
1340                            queues: &queues,
1341                            config: &config,
1342                            bootstrap_state: &bootstrap_state,
1343                            is_bootstrapping: &is_bootstrapping,
1344                            bootstrap_complete_notify: &bootstrap_complete_notify,
1345                            last_commitment_by_peer: &last_commitment_by_peer,
1346                            ever_capable_peers: &ever_capable_peers,
1347                            recent_provers: &recent_provers,
1348                        };
1349                        run_verification_cycle(ctx).await;
1350                    }
1351                }
1352            }
1353            debug!("Verification worker shut down");
1354        });
1355        self.task_handles.push(handle);
1356    }
1357
1358    /// Gap 3: Run a one-shot bootstrap sync on startup.
1359    ///
1360    /// Waits for saorsa-core to emit `DhtNetworkEvent::BootstrapComplete`
1361    /// (indicating the routing table is populated) before snapshotting
1362    /// close neighbors. Falls back after a timeout so bootstrap nodes
1363    /// (which have no peers and therefore never receive the event) still
1364    /// proceed.
1365    ///
1366    /// After the gate, finds close neighbors, syncs with each in
1367    /// round-robin batches, admits returned hints into the verification
1368    /// pipeline, and tracks discovered keys for bootstrap drain detection.
1369    #[allow(clippy::too_many_lines)]
1370    fn start_bootstrap_sync(
1371        &mut self,
1372        dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
1373    ) {
1374        let p2p = Arc::clone(&self.p2p_node);
1375        let storage = Arc::clone(&self.storage);
1376        let paid_list = Arc::clone(&self.paid_list);
1377        let queues = Arc::clone(&self.queues);
1378        let config = Arc::clone(&self.config);
1379        let shutdown = self.shutdown.clone();
1380        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1381        let bootstrap_state = Arc::clone(&self.bootstrap_state);
1382        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
1383        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
1384        let repair_proofs = Arc::clone(&self.repair_proofs);
1385        let my_commitment_state = Arc::clone(&self.commitment_state);
1386        let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
1387        let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
1388        let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
1389
1390        let handle = tokio::spawn(async move {
1391            // Wait for DHT bootstrap to complete before snapshotting
1392            // neighbors. The routing table is empty until saorsa-core
1393            // finishes its FIND_NODE rounds and bucket refreshes.
1394            let gate = bootstrap::wait_for_bootstrap_complete(
1395                dht_events,
1396                config.bootstrap_complete_timeout_secs,
1397                &shutdown,
1398            )
1399            .await;
1400
1401            if gate == bootstrap::BootstrapGateResult::Shutdown {
1402                return;
1403            }
1404
1405            let self_id = *p2p.peer_id();
1406            let neighbors =
1407                neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
1408                    .await;
1409
1410            if neighbors.is_empty() {
1411                info!("Bootstrap sync: no close neighbors found, marking drained");
1412                bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
1413                complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
1414                return;
1415            }
1416
1417            let neighbor_count = neighbors.len();
1418            info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
1419
1420            // Process neighbors in batches of NEIGHBOR_SYNC_PEER_COUNT.
1421            for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
1422                if shutdown.is_cancelled() {
1423                    break;
1424                }
1425
1426                let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
1427                    batch,
1428                    &storage,
1429                    &paid_list,
1430                    &p2p,
1431                    config.close_group_size,
1432                    config.paid_list_close_group_size,
1433                )
1434                .await;
1435
1436                for peer in batch {
1437                    if shutdown.is_cancelled() {
1438                        break;
1439                    }
1440
1441                    // Re-read on each iteration so peers see current state.
1442                    let bootstrapping = *is_bootstrapping.read().await;
1443
1444                    bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
1445
1446                    let hints = hints_by_peer.remove(peer).unwrap_or_default();
1447                    let outcome = neighbor_sync::sync_with_peer_with_hints(
1448                        peer,
1449                        &p2p,
1450                        &config,
1451                        bootstrapping,
1452                        hints,
1453                        // Atomically snapshot + mark-gossiped: emitted in the
1454                        // bootstrap-sync request, so we stay answerable for it
1455                        // (ADR-0002). One critical section avoids a TOCTOU where a
1456                        // concurrent retire/rotate drops the slot between read and
1457                        // mark.
1458                        my_commitment_state
1459                            .current_for_gossip()
1460                            .map(|b| b.commitment().clone()),
1461                    )
1462                    .await;
1463
1464                    bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
1465
1466                    if let Some(outcome) = outcome {
1467                        // Ingest the peer's piggybacked commitment from the
1468                        // response (same verification as the request path).
1469                        // Bootstrap is the FIRST gossip we receive from most
1470                        // peers, so this populates last_commitment_by_peer.
1471                        //
1472                        // We intentionally do NOT trigger a gossip-audit here:
1473                        // during bootstrap this node may itself still be
1474                        // bootstrapping (audits are gated on that), and the
1475                        // close-group/RT view is not yet stable. The peer is
1476                        // audited on the first STEADY-STATE neighbor-sync round
1477                        // after bootstrap drains (request + response paths both
1478                        // trigger), which is within one sync cycle — so caching
1479                        // the commitment here is sufficient and there is no
1480                        // coverage gap (ADR-0002).
1481                        ingest_peer_commitment(
1482                            peer,
1483                            outcome.response.commitment.as_ref(),
1484                            &p2p,
1485                            &last_commitment_by_peer,
1486                            &ever_capable_peers,
1487                            &sig_verify_attempts,
1488                        )
1489                        .await; // sig_verify_attempts in scope from line ~1080
1490
1491                        if !outcome.response.bootstrapping {
1492                            record_sent_replica_hints(
1493                                peer,
1494                                &outcome.sent_replica_hints,
1495                                &repair_proofs,
1496                                &sync_cycle_epoch,
1497                            )
1498                            .await;
1499                            // Admit hints into verification pipeline.
1500                            let outcome = admit_and_queue_hints(
1501                                &self_id,
1502                                peer,
1503                                &outcome.response.replica_hints,
1504                                &outcome.response.paid_hints,
1505                                &p2p,
1506                                &config,
1507                                &storage,
1508                                &paid_list,
1509                                &queues,
1510                            )
1511                            .await;
1512
1513                            // Track discovered keys for drain detection.
1514                            if !outcome.discovered.is_empty() {
1515                                bootstrap::track_discovered_keys(
1516                                    &bootstrap_state,
1517                                    &outcome.discovered,
1518                                )
1519                                .await;
1520                            }
1521
1522                            // Record / retire capacity rejections so the
1523                            // drain check correctly reflects whether each
1524                            // source still owes us re-hinted work after
1525                            // queue overflow.
1526                            if outcome.capacity_rejected_count > 0 {
1527                                bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
1528                            } else {
1529                                bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
1530                            }
1531                        }
1532                    }
1533                }
1534            }
1535
1536            // Check drain condition.
1537            {
1538                let q = queues.read().await;
1539                if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
1540                    complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
1541                }
1542            }
1543
1544            info!("Bootstrap sync completed");
1545        });
1546        self.task_handles.push(handle);
1547    }
1548}
1549
1550// ===========================================================================
1551// Free functions for background tasks
1552// ===========================================================================
1553
1554/// RAII admission for one audit-responder task: holds the GLOBAL permit and,
1555/// on drop, decrements the PER-PEER in-flight count. Moving this into the
1556/// spawned task ties both bounds to the task's exact lifetime — no manual
1557/// decrement to forget on an early return or panic.
1558struct AuditResponderGuard {
1559    _permit: tokio::sync::OwnedSemaphorePermit,
1560    inflight: Arc<RwLock<HashMap<PeerId, u32>>>,
1561    peer: PeerId,
1562}
1563
1564impl Drop for AuditResponderGuard {
1565    fn drop(&mut self) {
1566        // Decrement (and prune to keep the map bounded) without blocking the
1567        // async runtime: a short lock on a tiny map.
1568        //
1569        // Fast path: if the (uncontended, tiny) lock is free, decrement inline
1570        // with no spawn. Otherwise defer to a task — but only if a runtime is
1571        // actually current, so `Drop` during shutdown (no runtime) can never
1572        // panic. A missed decrement at shutdown is harmless: the whole map is
1573        // being dropped with the engine.
1574        let peer = self.peer;
1575        if let Ok(mut map) = self.inflight.try_write() {
1576            if let Some(n) = map.get_mut(&peer) {
1577                *n = n.saturating_sub(1);
1578                if *n == 0 {
1579                    map.remove(&peer);
1580                }
1581            }
1582            return;
1583        }
1584        if let Ok(handle) = tokio::runtime::Handle::try_current() {
1585            let inflight = Arc::clone(&self.inflight);
1586            handle.spawn(async move {
1587                let mut map = inflight.write().await;
1588                if let Some(n) = map.get_mut(&peer) {
1589                    *n = n.saturating_sub(1);
1590                    if *n == 0 {
1591                        map.remove(&peer);
1592                    }
1593                }
1594            });
1595        }
1596    }
1597}
1598
1599/// Try to admit one audit-responder task for `source`: take a global permit AND
1600/// a per-peer slot (both bounded). Returns `None` (caller drops the challenge,
1601/// leaving the remote auditor to apply that audit path's timeout policy) if
1602/// either ceiling is hit, so one flooder can neither exhaust the global pool's
1603/// effect on others nor exceed its own per-peer share (codex-r2 A).
1604async fn admit_audit_responder(
1605    semaphore: &Arc<Semaphore>,
1606    inflight: &Arc<RwLock<HashMap<PeerId, u32>>>,
1607    source: &PeerId,
1608) -> Option<AuditResponderGuard> {
1609    // Per-peer cap first (cheap, and the fairness-critical bound), committed
1610    // under the write lock so concurrent challenges from the same peer can't
1611    // both slip past the cap.
1612    {
1613        let mut map = inflight.write().await;
1614        let entry = map.entry(*source).or_insert(0);
1615        if *entry >= MAX_AUDIT_RESPONSES_PER_PEER {
1616            return None;
1617        }
1618        *entry += 1;
1619    }
1620    // Then the global ceiling. If it's exhausted, give back the per-peer slot we
1621    // just claimed so it isn't leaked.
1622    let Ok(permit) = Arc::clone(semaphore).try_acquire_owned() else {
1623        let mut map = inflight.write().await;
1624        if let Some(n) = map.get_mut(source) {
1625            *n = n.saturating_sub(1);
1626            if *n == 0 {
1627                map.remove(source);
1628            }
1629        }
1630        return None;
1631    };
1632    Some(AuditResponderGuard {
1633        _permit: permit,
1634        inflight: Arc::clone(inflight),
1635        peer: *source,
1636    })
1637}
1638
1639/// Handle an incoming replication protocol message.
1640///
1641/// When `rr_message_id` is `Some`, the request arrived via the `/rr/`
1642/// request-response path and the response must be sent via `send_response`
1643/// so saorsa-core can route it back to the waiting `send_request` caller.
1644#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1645async fn handle_replication_message(
1646    source: &PeerId,
1647    data: &[u8],
1648    p2p_node: &Arc<P2PNode>,
1649    storage: &Arc<LmdbStorage>,
1650    paid_list: &Arc<PaidList>,
1651    payment_verifier: &Arc<PaymentVerifier>,
1652    queues: &Arc<RwLock<ReplicationQueues>>,
1653    config: &ReplicationConfig,
1654    is_bootstrapping: &Arc<RwLock<bool>>,
1655    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1656    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1657    sync_cycle_epoch: &Arc<RwLock<u64>>,
1658    repair_proofs: &Arc<RwLock<RepairProofs>>,
1659    last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
1660    ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
1661    sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
1662    my_commitment_state: &Arc<ResponderCommitmentState>,
1663    gossip_audit: &GossipAuditTrigger,
1664    audit_responder_semaphore: &Arc<Semaphore>,
1665    audit_responder_inflight: &Arc<RwLock<HashMap<PeerId, u32>>>,
1666    rr_message_id: Option<&str>,
1667) -> Result<()> {
1668    let msg = ReplicationMessage::decode(data)
1669        .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
1670
1671    match msg.body {
1672        ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
1673            handle_fresh_offer(
1674                source,
1675                offer,
1676                storage,
1677                paid_list,
1678                payment_verifier,
1679                p2p_node,
1680                config,
1681                msg.request_id,
1682                rr_message_id,
1683            )
1684            .await
1685        }
1686        ReplicationMessageBody::PaidNotify(ref notify) => {
1687            handle_paid_notify(
1688                source,
1689                notify,
1690                paid_list,
1691                payment_verifier,
1692                p2p_node,
1693                config,
1694            )
1695            .await
1696        }
1697        ReplicationMessageBody::NeighborSyncRequest(ref request) => {
1698            let bootstrapping = *is_bootstrapping.read().await;
1699            // Phase-3 storage-bound audit: store the sender's
1700            // commitment for use as `expected_commitment_hash` in
1701            // future audits. Verify signature before storing so a peer
1702            // cannot inject a forged commitment for someone else.
1703            if let Some(target) = ingest_peer_commitment(
1704                source,
1705                request.commitment.as_ref(),
1706                p2p_node,
1707                last_commitment_by_peer,
1708                ever_capable_peers,
1709                sig_verify_attempts,
1710            )
1711            .await
1712            {
1713                maybe_trigger_gossip_audit(gossip_audit, source, target).await;
1714            }
1715            handle_neighbor_sync_request(
1716                source,
1717                request,
1718                p2p_node,
1719                storage,
1720                paid_list,
1721                queues,
1722                config,
1723                bootstrapping,
1724                bootstrap_state,
1725                sync_history,
1726                sync_cycle_epoch,
1727                repair_proofs,
1728                // Atomically snapshot + mark-gossiped: emitted in the sync
1729                // response, so we must stay answerable for it (ADR-0002).
1730                my_commitment_state
1731                    .current_for_gossip()
1732                    .map(|b| b.commitment().clone()),
1733                msg.request_id,
1734                rr_message_id,
1735            )
1736            .await
1737        }
1738        ReplicationMessageBody::VerificationRequest(ref request) => {
1739            handle_verification_request(
1740                source,
1741                request,
1742                storage,
1743                paid_list,
1744                p2p_node,
1745                msg.request_id,
1746                rr_message_id,
1747            )
1748            .await
1749        }
1750        ReplicationMessageBody::FetchRequest(ref request) => {
1751            handle_fetch_request(
1752                source,
1753                request,
1754                storage,
1755                p2p_node,
1756                msg.request_id,
1757                rr_message_id,
1758            )
1759            .await
1760        }
1761        ReplicationMessageBody::AuditChallenge(challenge) => {
1762            // Responsible-chunk audit (audit #2) responder: answer with per-key
1763            // possession digests. This same handler also answers the
1764            // prune-confirmation audit, which sends the same `AuditChallenge`
1765            // wire message.
1766            //
1767            // Answering digests the stored bytes of every challenged key, so —
1768            // like the subtree/byte audits below — run it on a detached task off
1769            // this serial message loop. Handling it inline lets one challenge
1770            // block all other replication traffic until its digests complete
1771            // (head-of-line blocking). The same flood-fair admission applies: a
1772            // global ceiling AND a per-peer cap, dropping the challenge if either
1773            // is hit. Responsible/prune audit timeouts are penalised by the
1774            // caller, so the caps must remain high enough for honest audit load;
1775            // the per-peer share still prevents one flooder from starving others.
1776            let Some(guard) =
1777                admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
1778                    .await
1779            else {
1780                warn!(
1781                    "Audit challenge reply not sent: kind=responsible response=dropped \
1782                     source={source} (audit-responder capacity reached)"
1783                );
1784                return Ok(());
1785            };
1786            let bootstrapping = *is_bootstrapping.read().await;
1787            let storage = Arc::clone(storage);
1788            let p2p_node = Arc::clone(p2p_node);
1789            let source = *source;
1790            let request_id = msg.request_id;
1791            let rr_message_id = rr_message_id.map(ToOwned::to_owned);
1792            tokio::spawn(async move {
1793                let _guard = guard; // global permit + per-peer slot, held until done
1794                if let Err(e) = handle_audit_challenge_msg(
1795                    &source,
1796                    &challenge,
1797                    &storage,
1798                    &p2p_node,
1799                    bootstrapping,
1800                    request_id,
1801                    rr_message_id.as_deref(),
1802                )
1803                .await
1804                {
1805                    debug!("Audit challenge from {source} error: {e}");
1806                }
1807            });
1808            Ok(())
1809        }
1810        ReplicationMessageBody::SubtreeAuditChallenge(challenge) => {
1811            // Gossip-triggered storage-bound subtree audit (ADR-0002). The
1812            // responder rebuilds the WHOLE nonce-selected subtree, reading every
1813            // leaf's bytes from disk (`get_raw` × ~sqrt(N) leaves). Run it on a
1814            // detached task so this serial message loop is never blocked on disk
1815            // I/O — otherwise one audit stalls all replication traffic (§5).
1816            //
1817            // A bounded, flood-fair admission restores backpressure (codex#1 +
1818            // codex-r2 A): a global ceiling AND a per-peer cap. If either is hit
1819            // we drop this challenge. Subtree auditors grace timeout
1820            // non-responses, so capacity drops throttle flooders without turning
1821            // into trust penalties (and one source cannot starve other peers,
1822            // since its share is capped per-peer).
1823            info!(
1824                "Audit challenge received: kind=subtree source={source} request_response={}",
1825                rr_message_id.is_some(),
1826            );
1827            let Some(guard) =
1828                admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
1829                    .await
1830            else {
1831                warn!(
1832                    "Audit challenge reply not sent: kind=subtree response=dropped \
1833                     source={source} (audit-responder capacity reached)"
1834                );
1835                return Ok(());
1836            };
1837            let bootstrapping = *is_bootstrapping.read().await;
1838            let storage = Arc::clone(storage);
1839            let p2p_node = Arc::clone(p2p_node);
1840            let my_commitment_state = Arc::clone(my_commitment_state);
1841            let source = *source;
1842            let request_id = msg.request_id;
1843            let rr_message_id = rr_message_id.map(ToOwned::to_owned);
1844            tokio::spawn(async move {
1845                let _guard = guard; // global permit + per-peer slot, held until done
1846                let response = storage_commitment_audit::handle_subtree_challenge(
1847                    &challenge,
1848                    &storage,
1849                    p2p_node.peer_id(),
1850                    bootstrapping,
1851                    Some(&my_commitment_state),
1852                )
1853                .await;
1854                let response_kind = subtree_audit_response_kind(&response);
1855                let sent = send_replication_response_checked(
1856                    &source,
1857                    &p2p_node,
1858                    request_id,
1859                    ReplicationMessageBody::SubtreeAuditResponse(response),
1860                    rr_message_id.as_deref(),
1861                )
1862                .await;
1863                if sent {
1864                    info!(
1865                        "Audit challenge reply sent: kind=subtree response={response_kind} \
1866                         source={source} request_response={}",
1867                        rr_message_id.is_some(),
1868                    );
1869                } else {
1870                    warn!(
1871                        "Audit challenge reply not sent: kind=subtree response={response_kind} \
1872                         source={source} request_response={}",
1873                        rr_message_id.is_some(),
1874                    );
1875                }
1876            });
1877            Ok(())
1878        }
1879        ReplicationMessageBody::SubtreeByteChallenge(challenge) => {
1880            // Round 2 of the storage audit (ADR-0002): serve the original bytes
1881            // for the auditor's spot-check keys, or signal `Absent` for a
1882            // committed key we can no longer produce. Reads chunk bytes from
1883            // disk, so likewise spawned off the serial loop (§5) under the same
1884            // flood-fair admission (codex#1 + codex-r2 A).
1885            info!(
1886                "Audit challenge received: kind=byte source={source} request_response={}",
1887                rr_message_id.is_some(),
1888            );
1889            let Some(guard) =
1890                admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
1891                    .await
1892            else {
1893                warn!(
1894                    "Audit challenge reply not sent: kind=byte response=dropped \
1895                     source={source} (audit-responder capacity reached)"
1896                );
1897                return Ok(());
1898            };
1899            let bootstrapping = *is_bootstrapping.read().await;
1900            let storage = Arc::clone(storage);
1901            let p2p_node = Arc::clone(p2p_node);
1902            let my_commitment_state = Arc::clone(my_commitment_state);
1903            let source = *source;
1904            let request_id = msg.request_id;
1905            let rr_message_id = rr_message_id.map(ToOwned::to_owned);
1906            tokio::spawn(async move {
1907                let _guard = guard; // global permit + per-peer slot, held until done
1908                let response = storage_commitment_audit::handle_subtree_byte_challenge(
1909                    &challenge,
1910                    &storage,
1911                    p2p_node.peer_id(),
1912                    bootstrapping,
1913                    Some(&my_commitment_state),
1914                )
1915                .await;
1916                let response_kind = subtree_byte_response_kind(&response);
1917                let sent = send_replication_response_checked(
1918                    &source,
1919                    &p2p_node,
1920                    request_id,
1921                    ReplicationMessageBody::SubtreeByteResponse(response),
1922                    rr_message_id.as_deref(),
1923                )
1924                .await;
1925                if sent {
1926                    info!(
1927                        "Audit challenge reply sent: kind=byte response={response_kind} \
1928                         source={source} request_response={}",
1929                        rr_message_id.is_some(),
1930                    );
1931                } else {
1932                    warn!(
1933                        "Audit challenge reply not sent: kind=byte response={response_kind} \
1934                         source={source} request_response={}",
1935                        rr_message_id.is_some(),
1936                    );
1937                }
1938            });
1939            Ok(())
1940        }
1941        // Response messages are handled by their respective request initiators.
1942        ReplicationMessageBody::FreshReplicationResponse(_)
1943        | ReplicationMessageBody::NeighborSyncResponse(_)
1944        | ReplicationMessageBody::VerificationResponse(_)
1945        | ReplicationMessageBody::FetchResponse(_)
1946        | ReplicationMessageBody::AuditResponse(_)
1947        | ReplicationMessageBody::SubtreeAuditResponse(_)
1948        | ReplicationMessageBody::SubtreeByteResponse(_) => Ok(()),
1949    }
1950}
1951
1952// ---------------------------------------------------------------------------
1953// Per-message-type handlers
1954// ---------------------------------------------------------------------------
1955
1956#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1957async fn handle_fresh_offer(
1958    source: &PeerId,
1959    offer: &protocol::FreshReplicationOffer,
1960    storage: &Arc<LmdbStorage>,
1961    paid_list: &Arc<PaidList>,
1962    payment_verifier: &Arc<PaymentVerifier>,
1963    p2p_node: &Arc<P2PNode>,
1964    config: &ReplicationConfig,
1965    request_id: u64,
1966    rr_message_id: Option<&str>,
1967) -> Result<()> {
1968    let self_id = *p2p_node.peer_id();
1969
1970    // Rule 5: reject if PoP is missing.
1971    if offer.proof_of_payment.is_empty() {
1972        send_replication_response(
1973            source,
1974            p2p_node,
1975            request_id,
1976            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1977                key: offer.key,
1978                reason: "Missing proof of payment".to_string(),
1979            }),
1980            rr_message_id,
1981        )
1982        .await;
1983        return Ok(());
1984    }
1985
1986    // Enforce chunk size invariant: the normal PUT path rejects data larger
1987    // than MAX_CHUNK_SIZE; the replication receive path must do the same to
1988    // prevent peers from pushing oversized records through replication.
1989    if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1990        warn!(
1991            "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1992            hex::encode(offer.key),
1993            offer.data.len(),
1994            crate::ant_protocol::MAX_CHUNK_SIZE,
1995        );
1996        p2p_node
1997            .report_trust_event(
1998                source,
1999                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2000            )
2001            .await;
2002        send_replication_response(
2003            source,
2004            p2p_node,
2005            request_id,
2006            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
2007                key: offer.key,
2008                reason: format!(
2009                    "Data size {} exceeds maximum chunk size {}",
2010                    offer.data.len(),
2011                    crate::ant_protocol::MAX_CHUNK_SIZE,
2012                ),
2013            }),
2014            rr_message_id,
2015        )
2016        .await;
2017        return Ok(());
2018    }
2019
2020    // Mirror the normal PUT path: the advertised key must be the content
2021    // address of the supplied bytes before any expensive payment verification.
2022    let computed_key = crate::client::compute_address(&offer.data);
2023    if computed_key != offer.key {
2024        warn!(
2025            "Rejecting fresh offer for key {}: content address mismatch, computed {}",
2026            hex::encode(offer.key),
2027            hex::encode(computed_key),
2028        );
2029        p2p_node
2030            .report_trust_event(
2031                source,
2032                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2033            )
2034            .await;
2035        send_replication_response(
2036            source,
2037            p2p_node,
2038            request_id,
2039            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
2040                key: offer.key,
2041                reason: format!(
2042                    "Content address mismatch: expected {}, computed {}",
2043                    hex::encode(offer.key),
2044                    hex::encode(computed_key),
2045                ),
2046            }),
2047            rr_message_id,
2048        )
2049        .await;
2050        return Ok(());
2051    }
2052
2053    // Rule 7: check storage admission. Fresh chunk receivers accept across the
2054    // paid-close-group neighbourhood (`paid_list_close_group_size`, = K_BUCKET_SIZE,
2055    // the same width client PUTs use), not just the close group plus a small
2056    // margin (ADR-0003). During full-node shunning a healthy replica's routing
2057    // table may still list closer full nodes it hasn't evicted yet, ranking it
2058    // outside the narrow window in its own view; the wider accept window absorbs
2059    // that transient skew so the chunk still lands. Retention (pruning) stays at
2060    // `storage_admission_width`, so steady-state replication is unchanged.
2061    if !admission::is_responsible(
2062        &self_id,
2063        &offer.key,
2064        p2p_node,
2065        config.paid_list_close_group_size,
2066    )
2067    .await
2068    {
2069        send_replication_response(
2070            source,
2071            p2p_node,
2072            request_id,
2073            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
2074                key: offer.key,
2075                reason: "Not in storage-admission range for this key".to_string(),
2076            }),
2077            rr_message_id,
2078        )
2079        .await;
2080        return Ok(());
2081    }
2082
2083    // Disk-space pre-check — mirror the PUT handler (V2-411). A full node can
2084    // never store this record, so reject it before the expensive payment
2085    // verification (EVM on-chain query / merkle pool work) rather than verifying
2086    // and only then failing at `storage.put` below. Reuses the cached capacity
2087    // check (passing results only, so freed space is detected promptly), and the
2088    // store path keeps its own check as defence-in-depth.
2089    if let Err(e) = storage.check_capacity() {
2090        info!(
2091            target: "ant_node::storage::disk_precheck",
2092            key = %hex::encode(offer.key),
2093            "Rejecting fresh replication offer before payment verification: {e}"
2094        );
2095        send_replication_response(
2096            source,
2097            p2p_node,
2098            request_id,
2099            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
2100                key: offer.key,
2101                reason: e.to_string(),
2102            }),
2103            rr_message_id,
2104        )
2105        .await;
2106        return Ok(());
2107    }
2108
2109    // Gap 1: Validate PoP via PaymentVerifier. Fresh replication is still
2110    // part of the immediate write fan-out: this receiver is about to store the
2111    // record as if the client had PUT it here directly. Storage admission
2112    // was checked above before proof work. ClientPut verification applies
2113    // store-strength cache semantics, paid-quote issuer K-closeness and local
2114    // price floor checks for single-node proofs, and merkle candidate
2115    // closeness for merkle proofs.
2116    match payment_verifier
2117        .verify_payment(
2118            &offer.key,
2119            Some(&offer.proof_of_payment),
2120            fresh_offer_payment_context(),
2121        )
2122        .await
2123    {
2124        Ok(status) if status.can_store() => {
2125            debug!(
2126                "PoP validated for fresh offer key {}",
2127                hex::encode(offer.key)
2128            );
2129        }
2130        Ok(_) => {
2131            send_replication_response(
2132                source,
2133                p2p_node,
2134                request_id,
2135                ReplicationMessageBody::FreshReplicationResponse(
2136                    FreshReplicationResponse::Rejected {
2137                        key: offer.key,
2138                        reason: "Payment verification failed: payment required".to_string(),
2139                    },
2140                ),
2141                rr_message_id,
2142            )
2143            .await;
2144            return Ok(());
2145        }
2146        Err(e) => {
2147            warn!(
2148                "PoP verification error for key {}: {e}",
2149                hex::encode(offer.key)
2150            );
2151            send_replication_response(
2152                source,
2153                p2p_node,
2154                request_id,
2155                ReplicationMessageBody::FreshReplicationResponse(
2156                    FreshReplicationResponse::Rejected {
2157                        key: offer.key,
2158                        reason: format!("Payment verification error: {e}"),
2159                    },
2160                ),
2161                rr_message_id,
2162            )
2163            .await;
2164            return Ok(());
2165        }
2166    }
2167
2168    // Rule 6: add to PaidForList.
2169    if let Err(e) = paid_list.insert(&offer.key).await {
2170        warn!("Failed to add key to PaidForList: {e}");
2171    }
2172
2173    // Store the record.
2174    match storage.put(&offer.key, &offer.data).await {
2175        Ok(_) => {
2176            send_replication_response(
2177                source,
2178                p2p_node,
2179                request_id,
2180                ReplicationMessageBody::FreshReplicationResponse(
2181                    FreshReplicationResponse::Accepted { key: offer.key },
2182                ),
2183                rr_message_id,
2184            )
2185            .await;
2186        }
2187        Err(e) => {
2188            send_replication_response(
2189                source,
2190                p2p_node,
2191                request_id,
2192                ReplicationMessageBody::FreshReplicationResponse(
2193                    FreshReplicationResponse::Rejected {
2194                        key: offer.key,
2195                        reason: e.to_string(),
2196                    },
2197                ),
2198                rr_message_id,
2199            )
2200            .await;
2201        }
2202    }
2203
2204    Ok(())
2205}
2206
2207async fn handle_paid_notify(
2208    _source: &PeerId,
2209    notify: &protocol::PaidNotify,
2210    paid_list: &Arc<PaidList>,
2211    payment_verifier: &Arc<PaymentVerifier>,
2212    p2p_node: &Arc<P2PNode>,
2213    config: &ReplicationConfig,
2214) -> Result<()> {
2215    let self_id = *p2p_node.peer_id();
2216
2217    // Rule 3: validate PoP presence before adding.
2218    if notify.proof_of_payment.is_empty() {
2219        return Ok(());
2220    }
2221
2222    // Check if we're in PaidCloseGroup for this key.
2223    if !admission::is_in_paid_close_group(
2224        &self_id,
2225        &notify.key,
2226        p2p_node,
2227        config.paid_list_close_group_size,
2228    )
2229    .await
2230    {
2231        return Ok(());
2232    }
2233
2234    // Gap 1: Validate PoP via PaymentVerifier. PaidNotify admits fresh
2235    // paid-list metadata, so local paid-list close-group membership was checked
2236    // above before proof work. The verifier then runs the same payment proof
2237    // checks as ClientPut while writing a paid-list-strength cache entry.
2238    match payment_verifier
2239        .verify_payment(
2240            &notify.key,
2241            Some(&notify.proof_of_payment),
2242            paid_notify_payment_context(),
2243        )
2244        .await
2245    {
2246        Ok(status) if status.can_store() => {
2247            debug!(
2248                "PoP validated for paid notify key {}",
2249                hex::encode(notify.key)
2250            );
2251        }
2252        Ok(_) => {
2253            warn!(
2254                "Paid notify rejected: payment required for key {}",
2255                hex::encode(notify.key)
2256            );
2257            return Ok(());
2258        }
2259        Err(e) => {
2260            warn!(
2261                "PoP verification error for paid notify key {}: {e}",
2262                hex::encode(notify.key)
2263            );
2264            return Ok(());
2265        }
2266    }
2267
2268    if let Err(e) = paid_list.insert(&notify.key).await {
2269        warn!("Failed to add paid notify key to PaidForList: {e}");
2270    }
2271
2272    Ok(())
2273}
2274
2275#[allow(clippy::too_many_arguments)]
2276async fn handle_neighbor_sync_request(
2277    source: &PeerId,
2278    request: &protocol::NeighborSyncRequest,
2279    p2p_node: &Arc<P2PNode>,
2280    storage: &Arc<LmdbStorage>,
2281    paid_list: &Arc<PaidList>,
2282    queues: &Arc<RwLock<ReplicationQueues>>,
2283    config: &ReplicationConfig,
2284    is_bootstrapping: bool,
2285    bootstrap_state: &Arc<RwLock<BootstrapState>>,
2286    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
2287    sync_cycle_epoch: &Arc<RwLock<u64>>,
2288    repair_proofs: &Arc<RwLock<RepairProofs>>,
2289    my_commitment: Option<StorageCommitment>,
2290    request_id: u64,
2291    rr_message_id: Option<&str>,
2292) -> Result<()> {
2293    let self_id = *p2p_node.peer_id();
2294
2295    // No per-request hint count limit: the wire message size limit
2296    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Unlike audit
2297    // challenges, sync hints don't drive expensive computation — they just
2298    // enter the verification queue. A per-request limit here would break
2299    // bootstrap replication for newly-joined nodes with 0 stored chunks.
2300
2301    // Build response (outbound hints).
2302    let (response, sent_replica_hints, sender_in_rt) =
2303        neighbor_sync::handle_sync_request_with_proofs(
2304            source,
2305            request,
2306            p2p_node,
2307            storage,
2308            paid_list,
2309            config,
2310            is_bootstrapping,
2311            my_commitment.clone(),
2312        )
2313        .await;
2314
2315    // Send response.
2316    let response_sent = send_replication_response_checked(
2317        source,
2318        p2p_node,
2319        request_id,
2320        ReplicationMessageBody::NeighborSyncResponse(response),
2321        rr_message_id,
2322    )
2323    .await;
2324
2325    // Process inbound hints only if sender is in LocalRT (Rule 4-6).
2326    if !sender_in_rt {
2327        return Ok(());
2328    }
2329
2330    // Update sync history for this peer before recording repair proofs so a
2331    // same-tick audit cannot combine a fresh key proof with stale peer maturity.
2332    {
2333        let mut history = sync_history.write().await;
2334        let record = history.entry(*source).or_insert(PeerSyncRecord {
2335            last_sync: None,
2336            cycles_since_sync: 0,
2337        });
2338        record.last_sync = Some(Instant::now());
2339        record.cycles_since_sync = 0;
2340    }
2341
2342    if response_sent && !request.bootstrapping {
2343        record_sent_replica_hints(source, &sent_replica_hints, repair_proofs, sync_cycle_epoch)
2344            .await;
2345    }
2346
2347    // Admit inbound hints and queue for verification.
2348    let outcome = admit_and_queue_hints(
2349        &self_id,
2350        source,
2351        &request.replica_hints,
2352        &request.paid_hints,
2353        p2p_node,
2354        config,
2355        storage,
2356        paid_list,
2357        queues,
2358    )
2359    .await;
2360
2361    // Track discovered keys for bootstrap drain detection so that hints
2362    // admitted via inbound sync requests are not missed. Capacity-rejected
2363    // hints keep this source on the "not yet drained" list until its next
2364    // sync re-admits them; a clean cycle clears the source.
2365    if is_bootstrapping {
2366        if !outcome.discovered.is_empty() {
2367            bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
2368        }
2369        if outcome.capacity_rejected_count > 0 {
2370            bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
2371        } else {
2372            bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
2373        }
2374    }
2375
2376    Ok(())
2377}
2378
2379async fn handle_verification_request(
2380    source: &PeerId,
2381    request: &protocol::VerificationRequest,
2382    storage: &Arc<LmdbStorage>,
2383    paid_list: &Arc<PaidList>,
2384    p2p_node: &Arc<P2PNode>,
2385    request_id: u64,
2386    rr_message_id: Option<&str>,
2387) -> Result<()> {
2388    // No per-request key count limit: the wire message size limit
2389    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Verification
2390    // does cheap storage lookups per key, not expensive computation like
2391    // audit digest generation.
2392
2393    #[allow(clippy::cast_possible_truncation)]
2394    let keys_len = request.keys.len() as u32;
2395    let paid_check_set: HashSet<u32> = request
2396        .paid_list_check_indices
2397        .iter()
2398        .copied()
2399        .filter(|&idx| {
2400            if idx >= keys_len {
2401                warn!(
2402                    "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
2403                    request.keys.len(),
2404                );
2405                false
2406            } else {
2407                true
2408            }
2409        })
2410        .collect();
2411
2412    let mut results = Vec::with_capacity(request.keys.len());
2413    for (i, key) in request.keys.iter().enumerate() {
2414        let present = storage.exists(key).unwrap_or(false);
2415        let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
2416            Some(paid_list.contains(key).unwrap_or(false))
2417        } else {
2418            None
2419        };
2420        results.push(protocol::KeyVerificationResult {
2421            key: *key,
2422            present,
2423            paid,
2424        });
2425    }
2426
2427    send_replication_response(
2428        source,
2429        p2p_node,
2430        request_id,
2431        ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
2432        rr_message_id,
2433    )
2434    .await;
2435
2436    Ok(())
2437}
2438
2439async fn handle_fetch_request(
2440    source: &PeerId,
2441    request: &protocol::FetchRequest,
2442    storage: &Arc<LmdbStorage>,
2443    p2p_node: &Arc<P2PNode>,
2444    request_id: u64,
2445    rr_message_id: Option<&str>,
2446) -> Result<()> {
2447    let response = match storage.get(&request.key).await {
2448        Ok(Some(data)) => protocol::FetchResponse::Success {
2449            key: request.key,
2450            data,
2451        },
2452        Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
2453        Err(e) => protocol::FetchResponse::Error {
2454            key: request.key,
2455            reason: format!("{e}"),
2456        },
2457    };
2458
2459    send_replication_response(
2460        source,
2461        p2p_node,
2462        request_id,
2463        ReplicationMessageBody::FetchResponse(response),
2464        rr_message_id,
2465    )
2466    .await;
2467
2468    Ok(())
2469}
2470
2471/// Responder for an incoming `AuditChallenge` (responsible-chunk audit #2, and
2472/// the prune-confirmation audit, which reuses the same wire message): reply with
2473/// per-key possession digests.
2474async fn handle_audit_challenge_msg(
2475    source: &PeerId,
2476    challenge: &protocol::AuditChallenge,
2477    storage: &Arc<LmdbStorage>,
2478    p2p_node: &Arc<P2PNode>,
2479    is_bootstrapping: bool,
2480    request_id: u64,
2481    rr_message_id: Option<&str>,
2482) -> Result<()> {
2483    #[allow(clippy::cast_possible_truncation)]
2484    let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
2485    info!(
2486        "Audit challenge received: kind=responsible keys={} bootstrapping={} request_response={}",
2487        challenge.keys.len(),
2488        is_bootstrapping,
2489        rr_message_id.is_some(),
2490    );
2491
2492    let response = audit::handle_audit_challenge(
2493        challenge,
2494        storage,
2495        p2p_node.peer_id(),
2496        is_bootstrapping,
2497        stored_chunks,
2498    )
2499    .await;
2500    let response_kind = audit_response_kind(&response);
2501
2502    let sent = send_replication_response_checked(
2503        source,
2504        p2p_node,
2505        request_id,
2506        ReplicationMessageBody::AuditResponse(response),
2507        rr_message_id,
2508    )
2509    .await;
2510    if sent {
2511        info!(
2512            "Audit challenge reply sent: kind=responsible response={} keys={} request_response={}",
2513            response_kind,
2514            challenge.keys.len(),
2515            rr_message_id.is_some(),
2516        );
2517    } else {
2518        warn!(
2519            "Audit challenge reply not sent: kind=responsible response={} keys={} request_response={}",
2520            response_kind,
2521            challenge.keys.len(),
2522            rr_message_id.is_some(),
2523        );
2524    }
2525
2526    Ok(())
2527}
2528
2529fn audit_response_kind(response: &protocol::AuditResponse) -> &'static str {
2530    match response {
2531        protocol::AuditResponse::Digests { .. } => "digests",
2532        protocol::AuditResponse::Bootstrapping { .. } => "bootstrapping",
2533        protocol::AuditResponse::Rejected { .. } => "rejected",
2534    }
2535}
2536
2537fn subtree_audit_response_kind(response: &protocol::SubtreeAuditResponse) -> &'static str {
2538    match response {
2539        protocol::SubtreeAuditResponse::Proof { .. } => "proof",
2540        protocol::SubtreeAuditResponse::Bootstrapping { .. } => "bootstrapping",
2541        protocol::SubtreeAuditResponse::Rejected { .. } => "rejected",
2542    }
2543}
2544
2545fn subtree_byte_response_kind(response: &protocol::SubtreeByteResponse) -> &'static str {
2546    match response {
2547        protocol::SubtreeByteResponse::Items { .. } => "items",
2548        protocol::SubtreeByteResponse::Bootstrapping { .. } => "bootstrapping",
2549        protocol::SubtreeByteResponse::Rejected { .. } => "rejected",
2550    }
2551}
2552
2553// ---------------------------------------------------------------------------
2554// Message sending helper
2555// ---------------------------------------------------------------------------
2556
2557/// Send a replication response message as a best-effort reply.
2558///
2559/// Encode and send failures are logged by the checked helper. Most response
2560/// paths do not need to branch on send success, so this wrapper keeps those
2561/// call sites explicit about their best-effort behavior.
2562async fn send_replication_response(
2563    peer: &PeerId,
2564    p2p_node: &Arc<P2PNode>,
2565    request_id: u64,
2566    body: ReplicationMessageBody,
2567    rr_message_id: Option<&str>,
2568) {
2569    let _ =
2570        send_replication_response_checked(peer, p2p_node, request_id, body, rr_message_id).await;
2571}
2572
2573/// Send a replication response message and report whether it was accepted.
2574///
2575/// Returns `true` after the message is encoded and accepted by the P2P send
2576/// path. Returns `false` after logging an encode or send failure. Repair-proof
2577/// recording uses this to avoid trusting hints that were not actually sent.
2578///
2579/// When `rr_message_id` is `Some`, the response is sent via the `/rr/`
2580/// request-response path so saorsa-core can route it back to the caller's
2581/// `send_request` future. Otherwise it is sent as a plain message.
2582async fn send_replication_response_checked(
2583    peer: &PeerId,
2584    p2p_node: &Arc<P2PNode>,
2585    request_id: u64,
2586    body: ReplicationMessageBody,
2587    rr_message_id: Option<&str>,
2588) -> bool {
2589    let msg = ReplicationMessage { request_id, body };
2590    let encoded = match msg.encode() {
2591        Ok(data) => data,
2592        Err(e) => {
2593            warn!("Failed to encode replication response: {e}");
2594            return false;
2595        }
2596    };
2597    let result = if let Some(msg_id) = rr_message_id {
2598        p2p_node
2599            .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
2600            .await
2601    } else {
2602        p2p_node
2603            .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
2604            .await
2605    };
2606    if let Err(e) = result {
2607        debug!("Failed to send replication response to {peer}: {e}");
2608        return false;
2609    }
2610    true
2611}
2612
2613async fn record_sent_replica_hints(
2614    peer: &PeerId,
2615    hints: &[neighbor_sync::SentReplicaHint],
2616    repair_proofs: &Arc<RwLock<RepairProofs>>,
2617    sync_cycle_epoch: &Arc<RwLock<u64>>,
2618) {
2619    if hints.is_empty() {
2620        return;
2621    }
2622
2623    let hinted_at_epoch = *sync_cycle_epoch.read().await;
2624    let mut proofs = repair_proofs.write().await;
2625    for hint in hints {
2626        if proofs.record_replica_hint_sent(*peer, hint.key, &hint.close_peers, hinted_at_epoch) {
2627            debug!(
2628                "Recorded repair hint proof for peer {peer} and key {}",
2629                hex::encode(hint.key)
2630            );
2631        }
2632    }
2633}
2634
2635// ---------------------------------------------------------------------------
2636// Neighbor sync round
2637// ---------------------------------------------------------------------------
2638
2639/// Run one neighbor sync round.
2640#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
2641async fn run_neighbor_sync_round(
2642    p2p_node: &Arc<P2PNode>,
2643    storage: &Arc<LmdbStorage>,
2644    paid_list: &Arc<PaidList>,
2645    queues: &Arc<RwLock<ReplicationQueues>>,
2646    config: &ReplicationConfig,
2647    sync_state: &Arc<RwLock<NeighborSyncState>>,
2648    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
2649    sync_cycle_epoch: &Arc<RwLock<u64>>,
2650    repair_proofs: &Arc<RwLock<RepairProofs>>,
2651    is_bootstrapping: &Arc<RwLock<bool>>,
2652    bootstrap_state: &Arc<RwLock<BootstrapState>>,
2653    commitment_state: &Arc<ResponderCommitmentState>,
2654    last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
2655    ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
2656    sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
2657    gossip_audit: &GossipAuditTrigger,
2658) {
2659    let self_id = *p2p_node.peer_id();
2660    let bootstrapping = *is_bootstrapping.read().await;
2661
2662    // Check if cycle is complete; start new one if needed.
2663    // We check under a read lock, then release it before the expensive
2664    // prune pass and DHT snapshot so other tasks are not starved.
2665    let cycle_complete = sync_state.read().await.is_cycle_complete();
2666    if cycle_complete {
2667        // A completed local neighbor-sync cycle advances the epoch component
2668        // of repair-proof maturity. The per-key wall-clock minimum age is
2669        // checked when audits are selected.
2670        {
2671            let mut history = sync_history.write().await;
2672            for record in history.values_mut() {
2673                record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
2674            }
2675        }
2676        let current_sync_epoch = {
2677            let mut epoch = sync_cycle_epoch.write().await;
2678            *epoch = epoch.saturating_add(1);
2679            *epoch
2680        };
2681
2682        // Post-cycle pruning (Section 11) — runs without holding sync_state.
2683        // Remote prune-confirmation audits are storage-proof audits and only
2684        // run after bootstrap has drained.
2685        let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
2686        pruning::run_prune_pass_with_context(pruning::PrunePassContext {
2687            self_id: &self_id,
2688            storage,
2689            paid_list,
2690            p2p_node,
2691            config,
2692            sync_state,
2693            repair_proofs,
2694            current_sync_epoch,
2695            #[cfg(any(test, feature = "test-utils"))]
2696            repair_proof_now: None,
2697            allow_remote_prune_audits,
2698            commitment_state: Some(commitment_state),
2699        })
2700        .await;
2701
2702        // Take fresh close-neighbor snapshot (DHT query, no lock held).
2703        let neighbors =
2704            neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
2705                .await;
2706
2707        // Now re-acquire write lock and re-check before swapping cycle.
2708        let mut state = sync_state.write().await;
2709        if state.is_cycle_complete() {
2710            // Preserve cooldown and bootstrap-claim tracking across cycles.
2711            // Claims have a 24h lifecycle vs 10-20 min cycles — dropping them
2712            // would reset the abuse detection timer every cycle.
2713            let old_sync_times = std::mem::take(&mut state.last_sync_times);
2714            let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
2715            let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
2716            let old_prune_cursor = state.prune_cursor;
2717            *state = NeighborSyncState::new_cycle(neighbors);
2718            state.last_sync_times = old_sync_times;
2719            state.bootstrap_claims = old_bootstrap_claims;
2720            state.bootstrap_claim_history = old_bootstrap_claim_history;
2721            state.prune_cursor = old_prune_cursor;
2722        }
2723    }
2724
2725    // Select batch of peers.
2726    let batch = {
2727        let mut state = sync_state.write().await;
2728        neighbor_sync::select_sync_batch(
2729            &mut state,
2730            config.neighbor_sync_peer_count,
2731            config.neighbor_sync_cooldown,
2732        )
2733    };
2734
2735    if batch.is_empty() {
2736        return;
2737    }
2738
2739    debug!("Neighbor sync: syncing with {} peers", batch.len());
2740
2741    // Snapshot our current commitment once per round so all peers in
2742    // this batch see the same thing (gossip is the responder's attestation;
2743    // same value across the batch is fine and reduces RwLock churn). Atomically
2744    // snapshot + mark-gossiped so we stay answerable for exactly what we emit
2745    // (ADR-0002 retention), with no TOCTOU vs a concurrent retire/rotate.
2746    let my_commitment = commitment_state
2747        .current_for_gossip()
2748        .map(|b| b.commitment().clone());
2749
2750    let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
2751        &batch,
2752        storage,
2753        paid_list,
2754        p2p_node,
2755        config.close_group_size,
2756        config.paid_list_close_group_size,
2757    )
2758    .await;
2759
2760    // Sync with each peer in the batch.
2761    for peer in &batch {
2762        let hints = hints_by_peer.remove(peer).unwrap_or_default();
2763        let outcome = neighbor_sync::sync_with_peer_with_hints(
2764            peer,
2765            p2p_node,
2766            config,
2767            bootstrapping,
2768            hints,
2769            my_commitment.clone(),
2770        )
2771        .await;
2772
2773        if let Some(outcome) = outcome {
2774            handle_sync_response(
2775                &self_id,
2776                peer,
2777                &outcome.response,
2778                &outcome.sent_replica_hints,
2779                p2p_node,
2780                config,
2781                bootstrapping,
2782                bootstrap_state,
2783                storage,
2784                paid_list,
2785                queues,
2786                sync_state,
2787                sync_history,
2788                sync_cycle_epoch,
2789                repair_proofs,
2790                last_commitment_by_peer,
2791                ever_capable_peers,
2792                sig_verify_attempts,
2793                gossip_audit,
2794            )
2795            .await;
2796        } else {
2797            // Sync failed -- remove peer and try to fill slot.
2798            let replacement = {
2799                let mut state = sync_state.write().await;
2800                neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
2801            };
2802
2803            // Attempt sync with the replacement peer (if one was found).
2804            if let Some(replacement_peer) = replacement {
2805                let mut replacement_hints = neighbor_sync::build_sync_hints_for_peers(
2806                    std::slice::from_ref(&replacement_peer),
2807                    storage,
2808                    paid_list,
2809                    p2p_node,
2810                    config.close_group_size,
2811                    config.paid_list_close_group_size,
2812                )
2813                .await;
2814                let hints = replacement_hints
2815                    .remove(&replacement_peer)
2816                    .unwrap_or_default();
2817                let replacement_outcome = neighbor_sync::sync_with_peer_with_hints(
2818                    &replacement_peer,
2819                    p2p_node,
2820                    config,
2821                    bootstrapping,
2822                    hints,
2823                    my_commitment.clone(),
2824                )
2825                .await;
2826
2827                if let Some(outcome) = replacement_outcome {
2828                    handle_sync_response(
2829                        &self_id,
2830                        &replacement_peer,
2831                        &outcome.response,
2832                        &outcome.sent_replica_hints,
2833                        p2p_node,
2834                        config,
2835                        bootstrapping,
2836                        bootstrap_state,
2837                        storage,
2838                        paid_list,
2839                        queues,
2840                        sync_state,
2841                        sync_history,
2842                        sync_cycle_epoch,
2843                        repair_proofs,
2844                        last_commitment_by_peer,
2845                        ever_capable_peers,
2846                        sig_verify_attempts,
2847                        gossip_audit,
2848                    )
2849                    .await;
2850                }
2851            }
2852        }
2853    }
2854}
2855
2856/// Process a successful neighbor sync response: record the sync, check for
2857/// bootstrap claim abuse, and admit inbound hints.
2858#[allow(clippy::too_many_arguments)]
2859async fn handle_sync_response(
2860    self_id: &PeerId,
2861    peer: &PeerId,
2862    resp: &NeighborSyncResponse,
2863    sent_replica_hints: &[neighbor_sync::SentReplicaHint],
2864    p2p_node: &Arc<P2PNode>,
2865    config: &ReplicationConfig,
2866    bootstrapping: bool,
2867    bootstrap_state: &Arc<RwLock<BootstrapState>>,
2868    storage: &Arc<LmdbStorage>,
2869    paid_list: &Arc<PaidList>,
2870    queues: &Arc<RwLock<ReplicationQueues>>,
2871    sync_state: &Arc<RwLock<NeighborSyncState>>,
2872    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
2873    sync_cycle_epoch: &Arc<RwLock<u64>>,
2874    repair_proofs: &Arc<RwLock<RepairProofs>>,
2875    last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
2876    ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
2877    sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
2878    gossip_audit: &GossipAuditTrigger,
2879) {
2880    // Ingest the peer's commitment if they piggybacked one on the response.
2881    // Same verification as the request path (peer-id binding + signature);
2882    // forged commitments are dropped at the edge. A *changed* commitment here
2883    // is a gossip-audit trigger just like on the request path — so a peer that
2884    // only ever answers sync (never initiates) is still audited (ADR-0002).
2885    if let Some(target) = ingest_peer_commitment(
2886        peer,
2887        resp.commitment.as_ref(),
2888        p2p_node,
2889        last_commitment_by_peer,
2890        ever_capable_peers,
2891        sig_verify_attempts,
2892    )
2893    .await
2894    {
2895        maybe_trigger_gossip_audit(gossip_audit, peer, target).await;
2896    }
2897
2898    // Record successful sync.
2899    {
2900        let mut state = sync_state.write().await;
2901        neighbor_sync::record_successful_sync(&mut state, peer);
2902    }
2903    {
2904        let mut history = sync_history.write().await;
2905        let record = history.entry(*peer).or_insert(PeerSyncRecord {
2906            last_sync: None,
2907            cycles_since_sync: 0,
2908        });
2909        record.last_sync = Some(Instant::now());
2910        record.cycles_since_sync = 0;
2911    }
2912
2913    // Process inbound hints from response (skip if peer is bootstrapping).
2914    if resp.bootstrapping {
2915        // Gap 6: BootstrapClaimAbuse grace period enforcement.
2916        // Separate state mutation from network I/O to avoid holding the
2917        // write lock across report_trust_event.
2918        let should_report = {
2919            let now = Instant::now();
2920            let mut state = sync_state.write().await;
2921            match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
2922                BootstrapClaimObservation::WithinGrace { .. } => false,
2923                BootstrapClaimObservation::PastGrace { first_seen } => {
2924                    warn!(
2925                        "Peer {peer} has been claiming bootstrap for {:?}, \
2926                         exceeding grace period of {:?} — reporting abuse",
2927                        now.duration_since(first_seen),
2928                        config.bootstrap_claim_grace_period,
2929                    );
2930                    true
2931                }
2932                BootstrapClaimObservation::Repeated { first_seen } => {
2933                    warn!(
2934                        "Peer {peer} repeated bootstrap claim after previously stopping; \
2935                         first claim was {:?} ago — reporting abuse",
2936                        now.duration_since(first_seen),
2937                    );
2938                    true
2939                }
2940            }
2941        };
2942        if should_report {
2943            p2p_node
2944                .report_trust_event(
2945                    peer,
2946                    TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2947                )
2948                .await;
2949        }
2950    } else {
2951        // Peer is not claiming bootstrap; clear active claim while retaining
2952        // history so the peer cannot start a second grace window later.
2953        {
2954            let mut state = sync_state.write().await;
2955            state.clear_active_bootstrap_claim(peer);
2956        }
2957        record_sent_replica_hints(peer, sent_replica_hints, repair_proofs, sync_cycle_epoch).await;
2958        let outcome = admit_and_queue_hints(
2959            self_id,
2960            peer,
2961            &resp.replica_hints,
2962            &resp.paid_hints,
2963            p2p_node,
2964            config,
2965            storage,
2966            paid_list,
2967            queues,
2968        )
2969        .await;
2970
2971        // Track discovered keys for bootstrap drain detection so that hints
2972        // admitted via regular neighbor sync are not missed. Capacity-
2973        // rejected hints keep this source on the "not yet drained" list
2974        // until its next sync replays them; a clean cycle clears it.
2975        if bootstrapping {
2976            if !outcome.discovered.is_empty() {
2977                bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
2978            }
2979            if outcome.capacity_rejected_count > 0 {
2980                bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
2981            } else {
2982                bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
2983            }
2984        }
2985    }
2986}
2987
2988/// Admit hints and queue them for verification, returning newly-discovered keys.
2989///
2990/// Shared by neighbor-sync request handling, response handling, and bootstrap
2991/// sync so that admission + queueing logic lives in one place.
2992#[allow(clippy::too_many_arguments)]
2993/// Outcome of [`admit_and_queue_hints`].
2994///
2995/// `capacity_rejected_count` is non-zero when one or more legitimately
2996/// admissible hints were dropped because `pending_verify`'s global or
2997/// per-source bound was hit. Callers that care about completeness
2998/// (bootstrap drain accounting) MUST NOT treat their work as complete while
2999/// this is > 0 — the source will need to re-hint after capacity frees up.
3000struct AdmissionOutcome {
3001    discovered: HashSet<XorName>,
3002    capacity_rejected_count: usize,
3003}
3004
3005#[allow(clippy::too_many_arguments)]
3006async fn admit_and_queue_hints(
3007    self_id: &PeerId,
3008    source_peer: &PeerId,
3009    replica_hints: &[XorName],
3010    paid_hints: &[XorName],
3011    p2p_node: &Arc<P2PNode>,
3012    config: &ReplicationConfig,
3013    storage: &Arc<LmdbStorage>,
3014    paid_list: &Arc<PaidList>,
3015    queues: &Arc<RwLock<ReplicationQueues>>,
3016) -> AdmissionOutcome {
3017    let pending_keys: HashSet<XorName> = {
3018        let q = queues.read().await;
3019        q.pending_keys().into_iter().collect()
3020    };
3021
3022    let admitted = admission::admit_hints(
3023        self_id,
3024        replica_hints,
3025        paid_hints,
3026        p2p_node,
3027        config,
3028        storage,
3029        paid_list,
3030        &pending_keys,
3031    )
3032    .await;
3033
3034    let mut discovered = HashSet::new();
3035    let mut capacity_rejected_count: usize = 0;
3036    let mut q = queues.write().await;
3037    let now = Instant::now();
3038
3039    for key in admitted.replica_keys {
3040        if !storage.exists(&key).unwrap_or(false) {
3041            let result = q.add_pending_verify(
3042                key,
3043                VerificationEntry {
3044                    state: VerificationState::PendingVerify,
3045                    pipeline: HintPipeline::Replica,
3046                    verified_sources: Vec::new(),
3047                    tried_sources: HashSet::new(),
3048                    created_at: now,
3049                    hint_sender: *source_peer,
3050                },
3051            );
3052            match result {
3053                crate::replication::scheduling::AdmissionResult::Admitted => {
3054                    discovered.insert(key);
3055                }
3056                crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
3057                crate::replication::scheduling::AdmissionResult::CapacityRejected => {
3058                    capacity_rejected_count += 1;
3059                }
3060            }
3061        }
3062    }
3063
3064    for key in admitted.paid_only_keys {
3065        let result = q.add_pending_verify(
3066            key,
3067            VerificationEntry {
3068                state: VerificationState::PendingVerify,
3069                pipeline: HintPipeline::PaidOnly,
3070                verified_sources: Vec::new(),
3071                tried_sources: HashSet::new(),
3072                created_at: now,
3073                hint_sender: *source_peer,
3074            },
3075        );
3076        match result {
3077            crate::replication::scheduling::AdmissionResult::Admitted => {
3078                discovered.insert(key);
3079            }
3080            crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
3081            crate::replication::scheduling::AdmissionResult::CapacityRejected => {
3082                capacity_rejected_count += 1;
3083            }
3084        }
3085    }
3086
3087    if capacity_rejected_count > 0 {
3088        debug!(
3089            "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
3090             rejected at queue capacity; source will need to re-hint after pending_verify drains"
3091        );
3092    }
3093
3094    AdmissionOutcome {
3095        discovered,
3096        capacity_rejected_count,
3097    }
3098}
3099
3100// ---------------------------------------------------------------------------
3101// Verification cycle
3102// ---------------------------------------------------------------------------
3103
3104/// Run one verification cycle: process pending keys through quorum checks.
3105#[allow(clippy::too_many_lines)]
3106async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
3107    let cycle_started = Instant::now();
3108    let VerificationCycleContext {
3109        p2p_node,
3110        paid_list,
3111        storage,
3112        queues,
3113        config,
3114        bootstrap_state,
3115        is_bootstrapping,
3116        bootstrap_complete_notify,
3117        last_commitment_by_peer,
3118        ever_capable_peers,
3119        recent_provers,
3120    } = ctx;
3121
3122    // Evict stale entries that have been pending too long (e.g. unreachable
3123    // verification targets during a network partition).
3124    {
3125        let mut q = queues.write().await;
3126        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
3127    }
3128
3129    let pending_keys = {
3130        let q = queues.read().await;
3131        q.pending_keys()
3132    };
3133
3134    if pending_keys.is_empty() {
3135        return;
3136    }
3137    let initial_pending_count = pending_keys.len();
3138
3139    let self_id = *p2p_node.peer_id();
3140
3141    // Step 1: Check local PaidForList for fast-path authorization (Section 9,
3142    // step 4).
3143    let mut local_paid_presence_probe_keys = Vec::new();
3144    let mut local_paid_paid_only_keys = Vec::new();
3145    let mut keys_needing_network = Vec::new();
3146    let mut terminal_keys: Vec<XorName> = Vec::new();
3147    {
3148        let mut q = queues.write().await;
3149        for key in &pending_keys {
3150            if paid_list.contains(key).unwrap_or(false) {
3151                if let Some(pipeline) =
3152                    q.set_pending_state(key, VerificationState::PaidListVerified)
3153                {
3154                    match pipeline {
3155                        HintPipeline::PaidOnly => {
3156                            // Paid-only + local paid state needs one more
3157                            // storage-admission check outside this lock: if we
3158                            // are also in the close group plus storage margin,
3159                            // the hint can repair a missing replica.
3160                            local_paid_paid_only_keys.push(*key);
3161                        }
3162                        HintPipeline::Replica => {
3163                            // Local paid-list membership authorizes the key.
3164                            // We still need a presence probe to discover fetch
3165                            // sources, but we must not require remote paid
3166                            // majority or presence quorum.
3167                            local_paid_presence_probe_keys.push(*key);
3168                        }
3169                    }
3170                }
3171            } else {
3172                keys_needing_network.push(*key);
3173            }
3174        }
3175    }
3176
3177    if !local_paid_paid_only_keys.is_empty() {
3178        let mut terminal_paid_only = Vec::new();
3179        for key in local_paid_paid_only_keys {
3180            if storage.exists(&key).unwrap_or(false) {
3181                terminal_paid_only.push(key);
3182            } else if admission::is_responsible(
3183                &self_id,
3184                &key,
3185                p2p_node,
3186                storage_admission_width(config.close_group_size),
3187            )
3188            .await
3189            {
3190                local_paid_presence_probe_keys.push(key);
3191            } else {
3192                terminal_paid_only.push(key);
3193            }
3194        }
3195
3196        if !terminal_paid_only.is_empty() {
3197            let mut q = queues.write().await;
3198            for key in terminal_paid_only {
3199                q.remove_pending(&key);
3200                terminal_keys.push(key);
3201            }
3202        }
3203    }
3204
3205    let local_paid_probe_count = local_paid_presence_probe_keys.len();
3206    let keys_needing_network_count = keys_needing_network.len();
3207
3208    // Step 1b: Local paid-list hit for fetch-eligible keys. Per Section 9
3209    // step 4, authorization succeeds immediately; run a presence-only probe
3210    // to find any holder we can fetch from.
3211    if !local_paid_presence_probe_keys.is_empty() {
3212        let targets = quorum::compute_presence_targets(
3213            &local_paid_presence_probe_keys,
3214            p2p_node,
3215            config,
3216            &self_id,
3217        )
3218        .await;
3219        let evidence = quorum::run_verification_round(
3220            &local_paid_presence_probe_keys,
3221            &targets,
3222            p2p_node,
3223            config,
3224        )
3225        .await;
3226
3227        let mut q = queues.write().await;
3228        for key in local_paid_presence_probe_keys {
3229            if storage.exists(&key).unwrap_or(false) {
3230                q.remove_pending(&key);
3231                terminal_keys.push(key);
3232                continue;
3233            }
3234            let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
3235                quorum::present_sources_for_key(&key, ev, &targets)
3236            });
3237            if sources.is_empty() {
3238                // Terminal failure: remove pending and report. No fetch path.
3239                q.remove_pending(&key);
3240                warn!(
3241                    "Locally paid key {} has no responding holders (possible data loss)",
3242                    hex::encode(key)
3243                );
3244                terminal_keys.push(key);
3245            } else {
3246                let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
3247                // Atomic remove+enqueue: if fetch_queue is at capacity, the
3248                // pending entry is preserved and retried next cycle (no
3249                // silent drop of verified replica-repair work).
3250                let _ = q.promote_pending_to_fetch(key, distance, sources);
3251            }
3252        }
3253    }
3254
3255    // Steps 2-5: Network verification (skipped if all keys resolved locally).
3256    if !keys_needing_network.is_empty() {
3257        // Step 2: Compute targets and run network verification round.
3258        let targets =
3259            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
3260                .await;
3261
3262        let evidence =
3263            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;
3264
3265        // Step 3: Evaluate results — collect outcomes without holding the write
3266        // lock across paid-list I/O.
3267        //
3268        // v12 §6 holder-eligibility: snapshot the per-peer last-commitment
3269        // table and recent_provers cache up front so the synchronous
3270        // evaluate_key_evidence_with_holder_check predicate can consult
3271        // them without awaiting. The predicate downgrades a Present
3272        // claim to Unresolved unless the peer is credited for that key.
3273        // Snapshot per-peer commitment data. We need two views:
3274        //   - `commitment_by_peer_snapshot`: peers that currently have
3275        //     a verified commitment record on file (used to look up
3276        //     their current hash).
3277        //   - `capable_peer_snapshot`: the sticky "ever v12-capable"
3278        //     set. Sourced from a separate set rather than the
3279        //     commitment map so eviction (PeerRemoved cleanup, sybil
3280        //     cap at `MAX_LAST_COMMITMENT_BY_PEER`) does NOT downgrade
3281        //     a previously-v12 peer to "legacy" credit-unconditionally.
3282        //     Legacy / pre-v12 peers that have never sent a commitment
3283        //     remain absent from the set and are credited via the
3284        //     legacy path so mixed-version networks stay live.
3285        let commitment_by_peer_snapshot: HashMap<PeerId, [u8; 32]> = {
3286            let map = last_commitment_by_peer.read().await;
3287            map.iter()
3288                // Read the CACHED hash (§13) — no per-cycle re-serialize/re-hash
3289                // of every peer's ~5 KiB commitment.
3290                .filter_map(|(p, rec)| rec.commitment_hash().map(|h| (*p, h)))
3291                .collect()
3292        };
3293        let capable_peer_snapshot: HashSet<PeerId> = ever_capable_peers.read().await.clone();
3294        // Take a full snapshot of recent_provers under the read lock,
3295        // then release. The cache is bounded (16/key × keys), so the
3296        // clone is cheap.
3297        let provers_snapshot = recent_provers.read().await.clone();
3298        // For the replica-fetch path, we need to know whether THIS
3299        // node already holds the key being verified. The v12 §6
3300        // holder-credit gate is meant to prevent uncredited Present
3301        // claims from contributing to paid-list / reward quorum for
3302        // keys we DO hold (and could audit ourselves). For keys we
3303        // are trying to FETCH (i.e. not in local storage), there is
3304        // no possible local audit credit, and gating the presence
3305        // quorum on credit would deadlock replica-repair in a
3306        // fully v12-capable close group.
3307        let mut locally_held: HashSet<XorName> = HashSet::new();
3308        for key in &keys_needing_network {
3309            if storage.exists(key).unwrap_or(false) {
3310                locally_held.insert(*key);
3311            }
3312        }
3313        let holder_credit = |peer: &PeerId, key: &XorName| -> bool {
3314            if !locally_held.contains(key) {
3315                // Replica-fetch path: we don't hold this key, so we
3316                // cannot have collected audit credit for it. Trust
3317                // Present claims to drive fetch-source promotion;
3318                // chunk-PUT payment_verifier is the security backstop
3319                // when the bytes actually arrive.
3320                return true;
3321            }
3322            if !capable_peer_snapshot.contains(peer) {
3323                // Pre-v12 / legacy peer that has never gossiped a
3324                // commitment. The v12 §6 holder-eligibility check
3325                // doesn't apply: their Present evidence comes through
3326                // the legacy path and we credit it unconditionally
3327                // so a mixed-version network stays live during
3328                // transition.
3329                return true;
3330            }
3331            let Some(hash) = commitment_by_peer_snapshot.get(peer) else {
3332                // Peer is commitment_capable (sticky) but currently
3333                // has no live commitment record on file (e.g. their
3334                // last gossip was evicted from the LRU cache, or it
3335                // failed verification). Withhold credit until they
3336                // re-prove storage under a fresh commitment.
3337                return false;
3338            };
3339            provers_snapshot.is_credited_holder(key, peer, hash)
3340        };
3341
3342        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
3343        {
3344            let q = queues.read().await;
3345            for key in &keys_needing_network {
3346                let Some(ev) = evidence.get(key) else {
3347                    continue;
3348                };
3349                let Some(entry) = q.get_pending(key) else {
3350                    continue;
3351                };
3352                let outcome = quorum::evaluate_key_evidence_with_holder_check(
3353                    key,
3354                    ev,
3355                    &targets,
3356                    config,
3357                    holder_credit,
3358                );
3359                evaluated.push((*key, outcome, entry.pipeline));
3360            }
3361        } // read lock released
3362
3363        // Step 4: Insert verified keys into PaidForList (no lock held).
3364        let mut paid_insert_keys: Vec<XorName> = Vec::new();
3365        for (key, outcome, _) in &evaluated {
3366            if matches!(
3367                outcome,
3368                KeyVerificationOutcome::QuorumVerified { .. }
3369                    | KeyVerificationOutcome::PaidListVerified { .. }
3370            ) {
3371                paid_insert_keys.push(*key);
3372            }
3373        }
3374        for key in &paid_insert_keys {
3375            if let Err(e) = paid_list.insert(key).await {
3376                warn!("Failed to add verified key to PaidForList: {e}");
3377            }
3378        }
3379
3380        // Paid-only hints normally update PaidForList only. If this node is
3381        // also within the storage-admission group for the key, a verified
3382        // paid-only hint can safely repair a missing replica using sources
3383        // from the same verification round.
3384        let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
3385        for (key, outcome, pipeline) in &evaluated {
3386            if *pipeline == HintPipeline::PaidOnly
3387                && matches!(
3388                    outcome,
3389                    KeyVerificationOutcome::QuorumVerified { .. }
3390                        | KeyVerificationOutcome::PaidListVerified { .. }
3391                )
3392                && !storage.exists(key).unwrap_or(false)
3393                && admission::is_responsible(
3394                    &self_id,
3395                    key,
3396                    p2p_node,
3397                    storage_admission_width(config.close_group_size),
3398                )
3399                .await
3400            {
3401                paid_only_fetch_keys.insert(*key);
3402            }
3403        }
3404
3405        // Step 5: Update queues with the evaluated outcomes.
3406        let mut q = queues.write().await;
3407        for (key, outcome, pipeline) in evaluated {
3408            match outcome {
3409                KeyVerificationOutcome::QuorumVerified { sources }
3410                | KeyVerificationOutcome::PaidListVerified { sources } => {
3411                    let fetch_eligible =
3412                        pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
3413                    if fetch_eligible && !sources.is_empty() {
3414                        let distance =
3415                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
3416                        // Atomic remove+enqueue: on fetch_queue capacity miss
3417                        // the pending entry is preserved so this verified key
3418                        // is retried on the next cycle (no silent drop).
3419                        let _ = q.promote_pending_to_fetch(key, distance, sources);
3420                        // Not terminal — either moved to fetch queue, or
3421                        // retained as pending until queue drains.
3422                    } else if fetch_eligible && sources.is_empty() {
3423                        warn!(
3424                            "Verified storage-admitted key {} has no holders (possible data loss)",
3425                            hex::encode(key)
3426                        );
3427                        q.remove_pending(&key);
3428                        terminal_keys.push(key);
3429                    } else {
3430                        q.remove_pending(&key);
3431                        terminal_keys.push(key);
3432                    }
3433                }
3434                KeyVerificationOutcome::QuorumFailed
3435                | KeyVerificationOutcome::QuorumInconclusive => {
3436                    q.remove_pending(&key);
3437                    terminal_keys.push(key);
3438                }
3439            }
3440        }
3441    }
3442
3443    // Step 6: Remove terminal keys from bootstrap pending set and re-check
3444    // the drain condition.
3445    update_bootstrap_after_verification(
3446        &terminal_keys,
3447        bootstrap_state,
3448        queues,
3449        is_bootstrapping,
3450        bootstrap_complete_notify,
3451    )
3452    .await;
3453
3454    let (pending_after, fetch_after, in_flight_after) = {
3455        let q = queues.read().await;
3456        (
3457            q.pending_count(),
3458            q.fetch_queue_count(),
3459            q.in_flight_count(),
3460        )
3461    };
3462    let terminal_key_count = terminal_keys.len();
3463    let elapsed_ms = cycle_started.elapsed().as_millis();
3464
3465    if elapsed_ms >= VERIFICATION_CYCLE_SLOW_LOG_MS {
3466        info!(
3467            target: "ant_node::replication::verification",
3468            "Slow replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
3469        );
3470    } else {
3471        debug!(
3472            target: "ant_node::replication::verification",
3473            "Replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
3474        );
3475    }
3476}
3477
3478/// Post-verification bootstrap bookkeeping: remove terminal keys from the
3479/// bootstrap pending set and transition out of bootstrapping when drained.
3480async fn update_bootstrap_after_verification(
3481    terminal_keys: &[XorName],
3482    bootstrap_state: &Arc<RwLock<BootstrapState>>,
3483    queues: &Arc<RwLock<ReplicationQueues>>,
3484    is_bootstrapping: &Arc<RwLock<bool>>,
3485    bootstrap_complete_notify: &Arc<Notify>,
3486) {
3487    if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
3488        return;
3489    }
3490    {
3491        let mut bs = bootstrap_state.write().await;
3492        for key in terminal_keys {
3493            bs.remove_key(key);
3494        }
3495    }
3496    let q = queues.read().await;
3497    if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
3498        complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
3499    }
3500}
3501
3502/// Set `is_bootstrapping` to `false` and wake all waiters.
3503async fn complete_bootstrap(
3504    is_bootstrapping: &Arc<RwLock<bool>>,
3505    bootstrap_complete_notify: &Arc<Notify>,
3506) {
3507    *is_bootstrapping.write().await = false;
3508    bootstrap_complete_notify.notify_waiters();
3509    info!("Replication bootstrap complete");
3510}
3511
3512// ---------------------------------------------------------------------------
3513// Fetch types and single-fetch executor
3514// ---------------------------------------------------------------------------
3515
/// How a single fetch attempt ended.
enum FetchResult {
    /// The record was received, passed integrity checks, and was persisted.
    Stored,
    /// The payload failed a content-address / integrity check; retrying the
    /// same key is pointless.
    IntegrityFailed,
    /// The source could not serve the record (network error or a
    /// non-success reply); another attempt may succeed.
    SourceFailed,
}
3525
3526/// Outcome produced by [`execute_single_fetch`] and consumed by the fetch
3527/// worker loop to update queue state.
3528struct FetchOutcome {
3529    key: XorName,
3530    result: FetchResult,
3531}
3532
3533#[allow(clippy::too_many_lines)]
3534/// Execute a single fetch request against `source` for `key`.
3535///
3536/// Handles encoding, network I/O, integrity checking, storage, and trust
3537/// event reporting.  Returns a [`FetchOutcome`] so the caller can update
3538/// queue state without holding any locks during the network round-trip.
3539async fn execute_single_fetch(
3540    p2p_node: Arc<P2PNode>,
3541    storage: Arc<LmdbStorage>,
3542    config: Arc<ReplicationConfig>,
3543    key: XorName,
3544    source: PeerId,
3545) -> FetchOutcome {
3546    let request = protocol::FetchRequest { key };
3547    let msg = ReplicationMessage {
3548        request_id: rand::thread_rng().gen::<u64>(),
3549        body: ReplicationMessageBody::FetchRequest(request),
3550    };
3551
3552    let encoded = match msg.encode() {
3553        Ok(data) => data,
3554        Err(e) => {
3555            warn!("Failed to encode fetch request: {e}");
3556            return FetchOutcome {
3557                key,
3558                result: FetchResult::SourceFailed,
3559            };
3560        }
3561    };
3562
3563    let result = p2p_node
3564        .send_request(
3565            &source,
3566            REPLICATION_PROTOCOL_ID,
3567            encoded,
3568            config.fetch_request_timeout,
3569        )
3570        .await;
3571
3572    match result {
3573        Ok(response) => {
3574            let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
3575                p2p_node
3576                    .report_trust_event(
3577                        &source,
3578                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3579                    )
3580                    .await;
3581                return FetchOutcome {
3582                    key,
3583                    result: FetchResult::SourceFailed,
3584                };
3585            };
3586
3587            match resp_msg.body {
3588                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
3589                    key: resp_key,
3590                    data,
3591                }) => {
3592                    // Validate the response key matches the requested key.
3593                    // A malicious peer could serve valid data for a different
3594                    // key, passing integrity checks while the requested key
3595                    // is falsely marked as fetched.
3596                    if resp_key != key {
3597                        warn!(
3598                            "Fetch response key mismatch: requested {}, got {}",
3599                            hex::encode(key),
3600                            hex::encode(resp_key)
3601                        );
3602                        p2p_node
3603                            .report_trust_event(
3604                                &source,
3605                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3606                            )
3607                            .await;
3608                        return FetchOutcome {
3609                            key,
3610                            result: FetchResult::IntegrityFailed,
3611                        };
3612                    }
3613
3614                    // Enforce chunk size invariant on fetched data.
3615                    // Checked before the content-address hash to avoid
3616                    // hashing up to 10 MiB of oversized junk data.
3617                    if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
3618                        warn!(
3619                            "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
3620                            hex::encode(resp_key),
3621                            data.len(),
3622                            crate::ant_protocol::MAX_CHUNK_SIZE,
3623                        );
3624                        p2p_node
3625                            .report_trust_event(
3626                                &source,
3627                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3628                            )
3629                            .await;
3630                        return FetchOutcome {
3631                            key,
3632                            result: FetchResult::IntegrityFailed,
3633                        };
3634                    }
3635
3636                    // Content-address integrity check.
3637                    let computed = crate::client::compute_address(&data);
3638                    if computed != resp_key {
3639                        warn!(
3640                            "Fetched record integrity check failed: expected {}, got {}",
3641                            hex::encode(resp_key),
3642                            hex::encode(computed)
3643                        );
3644                        p2p_node
3645                            .report_trust_event(
3646                                &source,
3647                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3648                            )
3649                            .await;
3650                        return FetchOutcome {
3651                            key,
3652                            result: FetchResult::IntegrityFailed,
3653                        };
3654                    }
3655
3656                    if let Err(e) = storage.put(&resp_key, &data).await {
3657                        warn!(
3658                            "Failed to store fetched record {}: {e}",
3659                            hex::encode(resp_key)
3660                        );
3661                        return FetchOutcome {
3662                            key,
3663                            result: FetchResult::SourceFailed,
3664                        };
3665                    }
3666
3667                    FetchOutcome {
3668                        key,
3669                        result: FetchResult::Stored,
3670                    }
3671                }
3672                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
3673                    ..
3674                }) => {
3675                    // This peer was selected as a fetch source because it
3676                    // recently answered `Present` during verification. A
3677                    // subsequent NotFound is evidence of a stale/false claim
3678                    // or chunk wiping, so penalize lightly and try another
3679                    // verified source.
3680                    warn!(
3681                        "Fetch: verified source {source} returned NotFound for {}",
3682                        hex::encode(key)
3683                    );
3684                    p2p_node
3685                        .report_trust_event(
3686                            &source,
3687                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3688                        )
3689                        .await;
3690                    FetchOutcome {
3691                        key,
3692                        result: FetchResult::SourceFailed,
3693                    }
3694                }
3695                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
3696                    reason,
3697                    ..
3698                }) => {
3699                    warn!(
3700                        "Fetch: peer {source} returned error for {}: {reason}",
3701                        hex::encode(key)
3702                    );
3703                    p2p_node
3704                        .report_trust_event(
3705                            &source,
3706                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3707                        )
3708                        .await;
3709                    FetchOutcome {
3710                        key,
3711                        result: FetchResult::SourceFailed,
3712                    }
3713                }
3714                _ => {
3715                    // Unexpected message type — treat as malformed.
3716                    p2p_node
3717                        .report_trust_event(
3718                            &source,
3719                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3720                        )
3721                        .await;
3722                    FetchOutcome {
3723                        key,
3724                        result: FetchResult::SourceFailed,
3725                    }
3726                }
3727            }
3728        }
3729        Err(e) => {
3730            debug!("Fetch request to {source} failed: {e}");
3731            // No ApplicationFailure here — P2PNode::send_request() already
3732            // reports ConnectionTimeout / ConnectionFailed to the TrustEngine.
3733            FetchOutcome {
3734                key,
3735                result: FetchResult::SourceFailed,
3736            }
3737        }
3738    }
3739}
3740
3741// ---------------------------------------------------------------------------
3742// Audit result handler
3743// ---------------------------------------------------------------------------
3744
3745/// Format the first confirmed-failed key as a 16-hex-char label.
3746///
3747/// Pairs with `challenged_peer` to form a stable cross-host correlation
3748/// handle in the audit-failure log line, e.g.
3749///
3750/// ```text
3751/// Audit failure for <peer>: …, `first_failed_key=0x18878f1d2d9e0612`
3752/// ```
3753///
3754/// Falls back to `"0x"` when the list is empty so the log line never
3755/// contains a misleading default.
3756fn first_failed_key_label(confirmed_failed_keys: &[XorName]) -> String {
3757    confirmed_failed_keys.first().map_or_else(
3758        || "0x".to_string(),
3759        |k| format!("0x{}", hex::encode(&k[..8])),
3760    )
3761}
3762
3763/// Execute the side effects for a subtree storage-commitment audit failure.
3764///
3765/// Subtree timeouts are fully graced: the multi-round, multi-chunk challenge can
3766/// legitimately time out on slow or loaded honest peers, so it never touches the
3767/// responsible-chunk audit path or its timeout accounting. Confirmed subtree
3768/// failures still penalise immediately and revoke holder credit.
3769async fn handle_subtree_failed_audit(
3770    challenged_peer: &PeerId,
3771    confirmed_failed_key_count: usize,
3772    reason: &AuditFailureReason,
3773    p2p_node: &Arc<P2PNode>,
3774    sync_state: &Arc<RwLock<NeighborSyncState>>,
3775    recent_provers: &Arc<RwLock<RecentProvers>>,
3776) {
3777    if matches!(reason, AuditFailureReason::Timeout) {
3778        debug!(
3779            "Audit timeout for {challenged_peer} fully graced \
3780             (subtree audit does not evict on timeout)"
3781        );
3782        return;
3783    }
3784
3785    // The caller already logged the rich failure line with reason + per-category
3786    // summary; avoid a redundant second error log here.
3787    let _ = confirmed_failed_key_count;
3788    {
3789        let mut state = sync_state.write().await;
3790        state.clear_active_bootstrap_claim(challenged_peer);
3791    }
3792    {
3793        let mut provers_guard = recent_provers.write().await;
3794        apply_audit_failure_credit_revocation(&mut provers_guard, challenged_peer, reason);
3795    }
3796    p2p_node
3797        .report_trust_event(
3798            challenged_peer,
3799            TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
3800        )
3801        .await;
3802}
3803
3804/// Handle audit result: log findings and emit trust events.
3805async fn handle_subtree_audit_result(
3806    result: &AuditTickResult,
3807    p2p_node: &Arc<P2PNode>,
3808    sync_state: &Arc<RwLock<NeighborSyncState>>,
3809    recent_provers: &Arc<RwLock<RecentProvers>>,
3810    config: &ReplicationConfig,
3811) {
3812    match result {
3813        AuditTickResult::Passed {
3814            challenged_peer,
3815            keys_checked,
3816        } => {
3817            debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
3818            // Peer responded normally — clear the active bootstrap claim while
3819            // retaining history so a later claim is treated as repeated abuse.
3820            {
3821                let mut state = sync_state.write().await;
3822                state.clear_active_bootstrap_claim(challenged_peer);
3823            }
3824            p2p_node
3825                .report_trust_event(
3826                    challenged_peer,
3827                    TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
3828                )
3829                .await;
3830        }
3831        AuditTickResult::Failed { evidence } => {
3832            if let FailureEvidence::AuditFailure {
3833                challenged_peer,
3834                confirmed_failed_keys,
3835                summary,
3836                reason,
3837                ..
3838            } = evidence
3839            {
3840                // Rich diagnostics (from main's audit-failure logging) + the
3841                // first-failed-key correlation handle.
3842                let first_failed_key = first_failed_key_label(confirmed_failed_keys);
3843                error!(
3844                    "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}, first_failed_key={first_failed_key}",
3845                    confirmed_failed_keys.len(),
3846                    summary.challenged_keys,
3847                    summary.absent_keys,
3848                    summary.digest_mismatch_keys,
3849                );
3850                // Route the side effects through the subtree-only failure path.
3851                // Responsible-chunk `AuditChallenge` handling intentionally uses
3852                // its own old immediate-penalty handler below.
3853                handle_subtree_failed_audit(
3854                    challenged_peer,
3855                    confirmed_failed_keys.len(),
3856                    reason,
3857                    p2p_node,
3858                    sync_state,
3859                    recent_provers,
3860                )
3861                .await;
3862            }
3863        }
3864        AuditTickResult::BootstrapClaim { peer } => {
3865            // Gap 6: BootstrapClaimAbuse grace period in audit path.
3866            // Separate state mutation from network I/O to avoid holding the
3867            // write lock across report_trust_event.
3868            let should_report = {
3869                let now = Instant::now();
3870                let mut state = sync_state.write().await;
3871                match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
3872                {
3873                    BootstrapClaimObservation::WithinGrace { .. } => {
3874                        debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
3875                        false
3876                    }
3877                    BootstrapClaimObservation::PastGrace { first_seen } => {
3878                        warn!(
3879                            "Audit: peer {peer} claiming bootstrap past grace period \
3880                             ({:?} > {:?}), reporting abuse",
3881                            now.duration_since(first_seen),
3882                            config.bootstrap_claim_grace_period,
3883                        );
3884                        true
3885                    }
3886                    BootstrapClaimObservation::Repeated { first_seen } => {
3887                        warn!(
3888                            "Audit: peer {peer} repeated bootstrap claim after previously \
3889                             stopping; first claim was {:?} ago, reporting abuse",
3890                            now.duration_since(first_seen),
3891                        );
3892                        true
3893                    }
3894                }
3895            };
3896            if should_report {
3897                p2p_node
3898                    .report_trust_event(
3899                        peer,
3900                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3901                    )
3902                    .await;
3903            }
3904        }
3905        AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
3906    }
3907}
3908
3909/// Whether a confirmed audit failure with this reason clears the peer's active
3910/// bootstrap claim. A `Timeout` does not (the peer may still be legitimately
3911/// bootstrapping); every confirmed storage-integrity reason does.
3912///
3913/// Responsible-chunk `AuditChallenge` failures use this directly: timeouts keep
3914/// the bootstrap claim but are still reported as audit failures, matching the
3915/// pre-ADR-0002 behaviour.
3916fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
3917    !matches!(reason, AuditFailureReason::Timeout)
3918}
3919
3920/// Handle the result of a responsible-chunk audit tick (audit #2): emit trust
3921/// events and manage bootstrap-claim state.
3922///
3923/// This is intentionally separate from the subtree audit result handler. A
3924/// responsible-chunk `AuditChallenge` `Failed` result reports
3925/// `ApplicationFailure` immediately for every reason, including `Timeout`,
3926/// restoring the pre-ADR-0002 behaviour.
3927async fn handle_audit_result(
3928    result: &AuditTickResult,
3929    p2p_node: &Arc<P2PNode>,
3930    sync_state: &Arc<RwLock<NeighborSyncState>>,
3931    config: &ReplicationConfig,
3932) {
3933    match result {
3934        AuditTickResult::Passed {
3935            challenged_peer,
3936            keys_checked,
3937        } => {
3938            debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
3939            {
3940                let mut state = sync_state.write().await;
3941                state.clear_active_bootstrap_claim(challenged_peer);
3942            }
3943            p2p_node
3944                .report_trust_event(
3945                    challenged_peer,
3946                    TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
3947                )
3948                .await;
3949        }
3950        AuditTickResult::Failed { evidence } => {
3951            if let FailureEvidence::AuditFailure {
3952                challenged_peer,
3953                confirmed_failed_keys,
3954                summary,
3955                reason,
3956                ..
3957            } = evidence
3958            {
3959                let first_failed_key = first_failed_key_label(confirmed_failed_keys);
3960                error!(
3961                    "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}, first_failed_key={first_failed_key}",
3962                    confirmed_failed_keys.len(),
3963                    summary.challenged_keys,
3964                    summary.absent_keys,
3965                    summary.digest_mismatch_keys,
3966                );
3967                if audit_failure_clears_bootstrap_claim(reason) {
3968                    let mut state = sync_state.write().await;
3969                    state.clear_active_bootstrap_claim(challenged_peer);
3970                } else {
3971                    debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim");
3972                }
3973                p2p_node
3974                    .report_trust_event(
3975                        challenged_peer,
3976                        TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
3977                    )
3978                    .await;
3979            }
3980        }
3981        AuditTickResult::BootstrapClaim { peer } => {
3982            let should_report = {
3983                let now = Instant::now();
3984                let mut state = sync_state.write().await;
3985                match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
3986                {
3987                    BootstrapClaimObservation::WithinGrace { .. } => {
3988                        debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
3989                        false
3990                    }
3991                    BootstrapClaimObservation::PastGrace { first_seen } => {
3992                        warn!(
3993                            "Audit: peer {peer} claiming bootstrap past grace period \
3994                             ({:?} > {:?}), reporting abuse",
3995                            now.duration_since(first_seen),
3996                            config.bootstrap_claim_grace_period,
3997                        );
3998                        true
3999                    }
4000                    BootstrapClaimObservation::Repeated { first_seen } => {
4001                        warn!(
4002                            "Audit: peer {peer} repeated bootstrap claim after previously \
4003                             stopping; first claim was {:?} ago, reporting abuse",
4004                            now.duration_since(first_seen),
4005                        );
4006                        true
4007                    }
4008                }
4009            };
4010            if should_report {
4011                p2p_node
4012                    .report_trust_event(
4013                        peer,
4014                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
4015                    )
4016                    .await;
4017            }
4018        }
4019        AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
4020    }
4021}
4022
4023/// Whether a confirmed audit failure with this reason should revoke the
4024/// peer's `recent_provers` holder credit immediately (v12 §6).
4025///
4026/// `true` for any reason where the peer actually answered (or admitted
4027/// it cannot): `DigestMismatch`, `KeyAbsent`, `Rejected` ("missing
4028/// bytes for committed key"), `MalformedResponse` — these prove the
4029/// peer no longer holds what it committed to, so it must not keep
4030/// holder credit for the proof TTL. `false` for `Timeout`: a single
4031/// dropped packet must not strip an honest peer; the 40-min TTL is the
4032/// deliberate liveness cushion there.
4033fn audit_failure_revokes_holder_credit(reason: &AuditFailureReason) -> bool {
4034    !matches!(reason, AuditFailureReason::Timeout)
4035}
4036
4037/// Apply the holder-credit revocation decision for a confirmed audit
4038/// failure. Pure over `RecentProvers` so the handler wiring is unit-
4039/// testable without a live `P2PNode`: the production `Failed` arm of
4040/// `handle_subtree_audit_result` calls exactly this.
4041fn apply_audit_failure_credit_revocation(
4042    provers: &mut RecentProvers,
4043    challenged_peer: &PeerId,
4044    reason: &AuditFailureReason,
4045) {
4046    if audit_failure_revokes_holder_credit(reason) {
4047        provers.forget_peer(challenged_peer);
4048    }
4049}
4050
4051// `admit_bootstrap_hints` was consolidated into `admit_and_queue_hints`.
4052
4053// ---------------------------------------------------------------------------
4054// Storage-bound audit (ADR-0002) — gossip trigger + auditor-side ingestion
4055// ---------------------------------------------------------------------------
4056
4057/// State the gossip-audit trigger needs to spawn an audit. Bundled so the
4058/// message handler passes one value instead of a long argument list; all
4059/// fields are cheap `Arc` clones.
4060#[derive(Clone)]
4061struct GossipAuditTrigger {
4062    p2p_node: Arc<P2PNode>,
4063    config: Arc<ReplicationConfig>,
4064    recent_provers: Arc<RwLock<RecentProvers>>,
4065    sync_state: Arc<RwLock<NeighborSyncState>>,
4066    cooldown: Arc<RwLock<HashMap<PeerId, Instant>>>,
4067}
4068
/// Audit parameters yielded by a gossip ingest for the audit trigger.
///
/// It is produced on every VALID gossip, whether the root changed or not.
/// This keeps a node with a stable key set auditable, rather than only on its
/// first commitment (ADR-0002).
#[derive(Clone, Copy, Debug)]
struct AuditTarget {
    /// Commitment hash the audit is pinned to.
    pin_hash: [u8; 32],
    /// Number of committed keys. It sizes the response deadline from the
    /// actual `ceil(sqrt(N))` subtree.
    key_count: u32,
}
4079
4080/// Per-peer audit cooldown check-and-stamp (ADR-0002 "occasional surprise
4081/// exams, keeps load low"). Returns `true` if `peer` may be audited now (and
4082/// stamps `now`), `false` if it was audited within
4083/// `AUDIT_ON_GOSSIP_COOLDOWN_SECS`. Bounds the map under a flood of distinct
4084/// peers. Pure over the passed map so the flood/cooldown behaviour is testable
4085/// without a live node: a burst of gossips from one peer yields at most one
4086/// `true` per cooldown window.
4087fn cooldown_allows_audit(map: &mut HashMap<PeerId, Instant>, peer: &PeerId, now: Instant) -> bool {
4088    let cooldown = Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS);
4089    let known = match map.get(peer) {
4090        Some(&last) => {
4091            if now.saturating_duration_since(last) < cooldown {
4092                return false;
4093            }
4094            true
4095        }
4096        None => false,
4097    };
4098    // Bound the map under churn like its siblings (drop the oldest stamp) before
4099    // admitting a brand-new peer.
4100    if !known && map.len() >= MAX_LAST_COMMITMENT_BY_PEER {
4101        if let Some(victim) = map.iter().min_by_key(|(_, &ts)| ts).map(|(p, _)| *p) {
4102            map.remove(&victim);
4103        }
4104    }
4105    map.insert(*peer, now);
4106    true
4107}
4108
4109/// The gossip-audit launch decision in ONE place so the ordering is shared
4110/// between production and its test (ADR-0002 "occasional surprise exams").
4111///
4112/// Order matters and is the security-relevant property: the per-peer cooldown is
4113/// checked-and-stamped FIRST, THEN the probability lottery (`lottery_wins`) is
4114/// applied. If the lottery were sampled first, a gossip flood would re-roll it on
4115/// every message until one won, multiplying audits. Because the cooldown is
4116/// stamped before the lottery is consulted, a LOSING ticket still consumes the
4117/// window — so each peer gets at most one audit lottery per cooldown window
4118/// regardless of how often it gossips. Production calls this with
4119/// `lottery_wins = gen_bool(AUDIT_ON_GOSSIP_PROBABILITY)`; the test calls it with
4120/// a deterministic `lottery_wins`, so a reorder regression here fails the test.
4121fn audit_launch_decision(
4122    map: &mut HashMap<PeerId, Instant>,
4123    peer: &PeerId,
4124    now: Instant,
4125    lottery_wins: bool,
4126) -> bool {
4127    // Gate 1: cooldown check-and-stamp (consumes the window even on a loss).
4128    if !cooldown_allows_audit(map, peer, now) {
4129        return false;
4130    }
4131    // Gate 2: the probability lottery.
4132    lottery_wins
4133}
4134
4135/// On a peer's *changed* gossiped commitment, maybe launch a subtree audit
4136/// (ADR-0002): fire with probability `AUDIT_ON_GOSSIP_PROBABILITY`, subject to a
4137/// per-peer cooldown, pinned to the just-ingested root. Detached so gossip
4138/// handling is never blocked on a network round-trip.
4139async fn maybe_trigger_gossip_audit(
4140    trigger: &GossipAuditTrigger,
4141    peer: &PeerId,
4142    target: AuditTarget,
4143) {
4144    // The launch decision (cooldown-then-lottery ordering) lives in the pure
4145    // `audit_launch_decision` so the ordering is shared with its test. Sample
4146    // the lottery here, then let the helper apply it AFTER the cooldown stamp.
4147    let now = Instant::now();
4148    let lottery_wins = rand::thread_rng().gen_bool(config::AUDIT_ON_GOSSIP_PROBABILITY);
4149    {
4150        let mut map = trigger.cooldown.write().await;
4151        if !audit_launch_decision(&mut map, peer, now, lottery_wins) {
4152            return;
4153        }
4154    }
4155
4156    let trigger = trigger.clone();
4157    let peer = *peer;
4158    tokio::spawn(async move {
4159        let credit = storage_commitment_audit::AuditCredit {
4160            recent_provers: &trigger.recent_provers,
4161        };
4162        let result = storage_commitment_audit::run_subtree_audit(
4163            &trigger.p2p_node,
4164            &trigger.config,
4165            &peer,
4166            target.pin_hash,
4167            target.key_count,
4168            Some(&credit),
4169        )
4170        .await;
4171        handle_subtree_audit_result(
4172            &result,
4173            &trigger.p2p_node,
4174            &trigger.sync_state,
4175            &trigger.recent_provers,
4176            &trigger.config,
4177        )
4178        .await;
4179    });
4180}
4181
4182/// Atomic check-and-stamp of the per-peer commitment sig-verify rate limit.
4183///
4184/// Returns `true` if a signature verify is allowed now (and stamps the attempt
4185/// time), `false` if the peer is within [`COMMITMENT_SIG_VERIFY_MIN_INTERVAL`]
4186/// of its last attempt. Holds one write lock across the decision so two
4187/// concurrent ingests from the same peer cannot both pass. Stamps BEFORE the
4188/// caller's expensive verify so a slow/failed verify still rate-limits the next
4189/// message. Bounds the map under a flood of distinct peer ids.
4190async fn sig_verify_rate_limit_ok(
4191    sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
4192    source: &PeerId,
4193    now: Instant,
4194) -> bool {
4195    let mut attempts = sig_verify_attempts.write().await;
4196    if let Some(&last) = attempts.get(source) {
4197        if now.saturating_duration_since(last) < COMMITMENT_SIG_VERIFY_MIN_INTERVAL {
4198            return false;
4199        }
4200    }
4201    if attempts.len() >= MAX_LAST_COMMITMENT_BY_PEER && !attempts.contains_key(source) {
4202        if let Some(victim) = attempts.iter().min_by_key(|(_, &ts)| ts).map(|(p, _)| *p) {
4203            attempts.remove(&victim);
4204        }
4205    }
4206    attempts.insert(*source, now);
4207    true
4208}
4209
4210/// Verify + store an inbound commitment from a gossip peer.
4211///
4212/// Called from the inbound `NeighborSyncRequest`/`Response` handlers and
4213/// the bootstrap-sync loop. Drops the commitment unless all five gates
4214/// pass:
4215///   1. `source` is in our DHT routing table (sybil/churn cap).
4216///   2. `commitment.sender_peer_id == source.as_bytes()` (peer-id
4217///      binding to the authenticated transport peer).
4218///   3. `BLAKE3(commitment.sender_public_key) == commitment.sender_peer_id`
4219///      (the embedded pubkey actually belongs to the claimed identity —
4220///      saorsa-core derives `PeerId = BLAKE3(pubkey)`).
4221///   4. `verify_commitment_signature(commitment)` succeeds against the
4222///      embedded public key. The signed payload binds the pubkey, so an
4223///      adversary cannot swap the key while keeping the body.
4224///   5. The cache has room or this is an update for an existing entry
4225///      (sybil cap, `MAX_LAST_COMMITMENT_BY_PEER`).
4226///
4227/// On all-pass, the commitment is stored as the auditor's per-peer
4228/// "last known commitment" for use as `expected_commitment_hash` in
4229/// future audits.
4230///
4231/// Failures (no commitment / mismatched peer id / bad signature) are
4232/// silent drops — gossip is best-effort and a malformed commitment from
4233/// one peer should not affect anything else.
4234///
4235/// Returns `Some(AuditTarget)` whenever a VALID commitment was stored (whether
4236/// or not its root changed), so the caller can run a probabilistic,
4237/// cooldown-gated subtree audit. Returning on *every* valid gossip — not only
4238/// changed ones — is deliberate (ADR-0002): a node with a stable key set keeps
4239/// being auditable, so it cannot pass one audit and then delete data while
4240/// re-gossiping the same root forever. The cooldown + probability bound the
4241/// audit frequency. Returns `None` only if the commitment was dropped (failed a
4242/// gate) or there is nothing to pin.
4243///
4244/// Handle a capable peer gossiping `None` (a commitment downgrade).
4245///
4246/// A capable peer that previously gossiped a commitment but now gossips `None`
4247/// is trying to drop off the audit path. Within the answerability window we keep
4248/// the cached commitment pinned AND return it as an audit target so this gossip
4249/// still schedules a subtree audit against the peer's last known commitment — if
4250/// it genuinely dropped the data, the audit fails (there is no periodic tick, so
4251/// the trigger MUST fire here or the downgrade is never re-challenged).
4252///
4253/// But this only holds within the SAME `GOSSIP_ANSWERABILITY_TTL` the responder
4254/// honours for its own retired commitment: once that elapses since we last
4255/// received the peer's commitment, an honest peer has legitimately retired that
4256/// root (its responder side `retire_current`s and lets it age out) and can no
4257/// longer answer a pin on it. Auditing it past the TTL would manufacture a false
4258/// failure, so we then forget the cached commitment (keeping the sticky
4259/// `commitment_capable` bit) and stop pinning it.
4260async fn handle_commitment_downgrade(
4261    source: &PeerId,
4262    last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
4263) -> Option<AuditTarget> {
4264    let now = Instant::now();
4265    let cached = {
4266        let map = last_commitment_by_peer.read().await;
4267        map.get(source).and_then(|rec| {
4268            if !rec.commitment_capable {
4269                return None;
4270            }
4271            let last = rec.last_commitment()?;
4272            let pin = rec.commitment_hash()?;
4273            let fresh = now.saturating_duration_since(rec.received_at)
4274                < crate::replication::commitment_state::GOSSIP_ANSWERABILITY_TTL;
4275            Some((pin, last.key_count, fresh))
4276        })
4277    };
4278    match cached {
4279        Some((pin, key_count, true)) => {
4280            warn!(
4281                "ingest_peer_commitment: commitment-capable peer {source} sent None \
4282                 (downgrade attempt); auditing against its last cached commitment"
4283            );
4284            Some(AuditTarget {
4285                pin_hash: pin,
4286                key_count,
4287            })
4288        }
4289        Some((_, _, false)) => {
4290            // Cached commitment has aged past the answerability window — forget
4291            // it so we stop pinning a root the peer is no longer obliged to
4292            // answer. Keep `commitment_capable` (sticky). Re-check freshness
4293            // UNDER the write lock (compare-and-clear): a concurrent valid gossip
4294            // from this peer may have refreshed `received_at` in the gap between
4295            // our read and write locks; if so, leave its fresh commitment intact.
4296            if let Some(rec) = last_commitment_by_peer.write().await.get_mut(source) {
4297                let still_stale = now.saturating_duration_since(rec.received_at)
4298                    >= crate::replication::commitment_state::GOSSIP_ANSWERABILITY_TTL;
4299                if still_stale {
4300                    rec.clear_commitment();
4301                    debug!(
4302                        "ingest_peer_commitment: capable peer {source} sent None and its cached \
4303                         commitment aged past the answerability TTL; forgetting it"
4304                    );
4305                }
4306            }
4307            None
4308        }
4309        None => None,
4310    }
4311}
4312
4313async fn ingest_peer_commitment(
4314    source: &PeerId,
4315    commitment: Option<&StorageCommitment>,
4316    p2p_node: &Arc<P2PNode>,
4317    last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
4318    ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
4319    sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
4320) -> Option<AuditTarget> {
4321    let Some(c) = commitment else {
4322        return handle_commitment_downgrade(source, last_commitment_by_peer).await;
4323    };
4324    // RT-membership gate: only accept commitments from peers in our
4325    // routing table. Off-RT senders (sybils, drive-by relays) cannot
4326    // populate the cache, which closes the hole where a flood of
4327    // off-RT identities could fill the cap and evict honest
4328    // peers. The neighbor-sync request handler applies the same gate
4329    // before admitting inbound replication hints (see neighbor_sync.rs
4330    // `sender_in_rt`); we mirror that policy here for the commitment
4331    // piggyback.
4332    if !p2p_node.dht_manager().is_in_routing_table(source).await {
4333        debug!("ingest_peer_commitment: source {source} not in routing table (dropped)");
4334        return None;
4335    }
4336    // Peer-id binding: the commitment's claimed sender must match the
4337    // authenticated transport peer (`source`). Defeats relay/replay
4338    // and also pins which embedded public key we are about to verify
4339    // against — the verify itself trusts the embedded key, so the
4340    // peer-id binding is the link to a real identity.
4341    if &c.sender_peer_id != source.as_bytes() {
4342        warn!(
4343            "ingest_peer_commitment: sender_peer_id mismatch from {source} \
4344             (dropped, possible relay attempt)"
4345        );
4346        return None;
4347    }
4348    // Peer-id to embedded-pubkey binding: saorsa-core derives PeerId as
4349    // BLAKE3(pubkey_bytes). Without this check, a responder could sign
4350    // with a throwaway key they own and lie about which identity it
4351    // belongs to (the embedded-key signature would verify trivially).
4352    let derived_peer_id = *blake3::hash(&c.sender_public_key).as_bytes();
4353    if derived_peer_id != c.sender_peer_id {
4354        warn!(
4355            "ingest_peer_commitment: embedded pubkey does not hash to claimed peer_id for \
4356             {source} (dropped, throwaway-key attack)"
4357        );
4358        return None;
4359    }
4360    // §2 step 3 + §11 DoS: rate-limit per-peer to at most one ML-DSA
4361    // signature verify per `COMMITMENT_SIG_VERIFY_MIN_INTERVAL`. A
4362    // sybil/RT-membership-bypassing peer that flooded valid-looking
4363    // gossip would otherwise burn CPU on every message. The rate
4364    // limit is checked AFTER cheap structural gates (RT, peer-id
4365    // binding, pubkey-binding) and BEFORE the expensive sig verify.
4366    //
4367    // Tracked in `sig_verify_attempts` (separate from
4368    // last_commitment_by_peer) so EVERY attempt — successful or not —
4369    // bumps the rate-limit clock. Reading only from PeerCommitmentRecord
4370    // would skip the cap for peers we've never successfully verified,
4371    // letting a flood of invalid-but-structurally-plausible gossips
4372    // burn CPU.
4373    let now = Instant::now();
4374    if !sig_verify_rate_limit_ok(sig_verify_attempts, source, now).await {
4375        debug!(
4376            "ingest_peer_commitment: rate-limited sig verify from {source} \
4377             (< {COMMITMENT_SIG_VERIFY_MIN_INTERVAL:?} since last attempt); dropped"
4378        );
4379        return None;
4380    }
4381    // Signature verify, using the public key embedded in the commitment
4382    // itself. The pubkey is bound by the signature payload (see
4383    // commitment_signed_payload) so an adversary cannot keep the body
4384    // and swap the key to one they hold the secret for.
4385    if !crate::replication::commitment::verify_commitment_signature(c) {
4386        warn!(
4387            "ingest_peer_commitment: signature did not verify under embedded key for {source} \
4388             (dropped, forged commitment)"
4389        );
4390        return None;
4391    }
4392    // The new commitment's hash, used to store and to pin for the audit target.
4393    let new_hash = commitment_hash(c);
4394    let mut map = last_commitment_by_peer.write().await;
4395    // Sybil/churn cap: if we're at the hard cap AND this is a new peer,
4396    // evict an arbitrary existing entry to make room. Updates for peers
4397    // already in the map are always accepted (they replace, not grow).
4398    if map.len() >= MAX_LAST_COMMITMENT_BY_PEER && !map.contains_key(source) {
4399        // Drop one arbitrary entry. HashMap iter order is random which
4400        // is fine — over time PeerRemoved cleanup keeps the working set
4401        // anchored on the real RT membership; this cap only fires under
4402        // active flooding attempts.
4403        if let Some(victim) = map.keys().next().copied() {
4404            map.remove(&victim);
4405            warn!(
4406                "ingest_peer_commitment: cache full ({MAX_LAST_COMMITMENT_BY_PEER}); \
4407                 evicted {victim} to admit {source}"
4408            );
4409        }
4410    }
4411    // Preserve sticky commitment_capable across updates — once true,
4412    // always true. New entries start with capable = true (we just
4413    // verified a valid commitment from this peer).
4414    map.entry(*source)
4415        .and_modify(|r| {
4416            // set_commitment refreshes the cached hash (§13) alongside the
4417            // commitment + received_at so they never drift.
4418            r.set_commitment(c.clone(), now);
4419            r.last_sig_verify_at = now;
4420            r.commitment_capable = true; // sticky-redundant but explicit
4421        })
4422        .or_insert_with(|| PeerCommitmentRecord::from_verified(c.clone(), now));
4423    drop(map);
4424    // Record the sticky "ever v12-capable" bit in a set independent of
4425    // `last_commitment_by_peer` (whose entries can be evicted by
4426    // `PeerRemoved` and the sybil cap). This is what the §3 audit
4427    // shield and the §6 holder-eligibility closure consult to decide
4428    // whether the peer is expected to speak v12.
4429    //
4430    // Capped at `MAX_EVER_CAPABLE_PEERS` to bound memory under
4431    // identity-rotation attacks: once full, new entries are refused.
4432    // Refusal degrades over-cap peers to the behaviour before this set
4433    // existed (treated as legacy on rejoin), which is not a security
4434    // regression and preserves the historic set stable.
4435    {
4436        let mut set = ever_capable_peers.write().await;
4437        if set.contains(source) || set.len() < MAX_EVER_CAPABLE_PEERS {
4438            set.insert(*source);
4439        } else {
4440            warn!(
4441                "ingest_peer_commitment: ever_capable_peers at cap \
4442                 ({MAX_EVER_CAPABLE_PEERS}); refusing to record {source} as sticky-capable"
4443            );
4444        }
4445    }
4446    // Return an audit target for EVERY valid stored commitment (changed or
4447    // not), so the caller's cooldown+probability-gated trigger keeps a
4448    // stable-keyset peer auditable over time (ADR-0002). Only a serialization
4449    // failure (new_hash == None, unreachable for a real commitment) yields None.
4450    new_hash.map(|pin_hash| AuditTarget {
4451        pin_hash,
4452        key_count: c.key_count,
4453    })
4454}
4455
4456// ---------------------------------------------------------------------------
4457// Storage-bound audit (v12) — responder commitment rotation
4458// ---------------------------------------------------------------------------
4459
4460/// Read the current LMDB key set, build + sign a fresh
4461/// `StorageCommitment`, and rotate it into `state` as the new `current`.
4462/// The prior `current` is demoted to `previous`; the prior `previous` is
4463/// dropped (per `ResponderCommitmentState::rotate`).
4464///
4465/// For content-addressed chunks (Autonomi's chunk store), `address ==
4466/// BLAKE3(content)`, so `bytes_hash := key` and we don't have to
4467/// re-read each chunk's bytes to compute the leaf hash.
4468///
4469/// Skips (returns `Ok(())`) if the key set is empty — no commitment to
4470/// rotate. The auditor side handles "no commitment for this peer" by
4471/// falling back to the legacy plain-digest audit path.
4472async fn rebuild_and_rotate_commitment(
4473    storage: &Arc<LmdbStorage>,
4474    identity: &Arc<NodeIdentity>,
4475    state: &Arc<ResponderCommitmentState>,
4476    p2p: &Arc<P2PNode>,
4477    config: &Arc<ReplicationConfig>,
4478) -> Result<()> {
4479    use saorsa_pqc::api::sig::{MlDsaSecretKey, MlDsaVariant};
4480
4481    let stored_keys = storage
4482        .all_keys()
4483        .await
4484        .map_err(|e| Error::Storage(format!("commitment build: read keys: {e}")))?;
4485
4486    // Commit only to keys we are still RESPONSIBLE for ("want-to-hold"), not
4487    // everything currently on disk ("hold"). This is the half of the retention
4488    // contract that lets out-of-range chunks age out: a key that has left our
4489    // close group is excluded from the NEXT commitment, so within at most
4490    // RETAINED_GOSSIPED_COMMITMENTS gossip rotations it falls out of the
4491    // last-2-gossiped window, `ResponderCommitmentState::is_held` goes false,
4492    // and the pruner (which until then vetoes its deletion) reclaims it. Without
4493    // this filter the pruner's reprieve would keep re-committing stale keys
4494    // forever (the rebuild reads all_keys, so a retained-on-disk key would be
4495    // re-committed and re-gossiped every rotation — a permanent pin).
4496    let storage_empty = stored_keys.is_empty();
4497    let self_id = *p2p.peer_id();
4498    let mut keys = Vec::with_capacity(stored_keys.len());
4499    for k in stored_keys {
4500        if admission::is_responsible(&self_id, &k, p2p, config.close_group_size).await {
4501            keys.push(k);
4502        }
4503    }
4504
4505    if keys.is_empty() {
4506        if storage_empty {
4507            // Storage is genuinely empty — there is nothing to answer for, so
4508            // drop the previously advertised commitment immediately. Keeping it
4509            // would leave remote auditors pinning a hash we can never satisfy
4510            // again (the bytes are gone).
4511            if state.retained_slot_count() > 0 {
4512                debug!("Commitment rotation: storage empty, clearing retained slots");
4513                state.clear_all();
4514            }
4515            return Ok(());
4516        }
4517        // Bytes are still on disk but no key is currently in range. We must NOT
4518        // clear retention here: a peer may still be pinning a root we gossiped
4519        // moments ago and could demand its bytes in a round-2 challenge, which
4520        // we can still answer (the bytes are present). But we must STOP
4521        // advertising the stale commitment: retire it so `current()` returns
4522        // `None` and the gossip-emit sites stop re-emitting and re-stamping it.
4523        // The retired slot then ages out by its gossip-answerability TTL while
4524        // remaining answerable for in-flight pins until then. Once it ages out,
4525        // `is_held` flips false and the pruner reclaims the now-uncommitted,
4526        // out-of-range chunks. (Calling `age_out` alone would leave `current()`
4527        // pointing at the stale root, which the gossip loop would keep
4528        // re-stamping — pinning its keys forever.)
4529        debug!(
4530            "Commitment rotation: no responsible keys to commit to; retiring current commitment \
4531             (stays answerable until its gossip TTL lapses, bytes still on disk)"
4532        );
4533        state.retire_current();
4534        return Ok(());
4535    }
4536
4537    // Cap to MAX_COMMITMENT_KEY_COUNT for v12 (responder must not commit
4538    // to more than the protocol limit; auditor would reject the
4539    // commitment otherwise).
4540    let cap = commitment::MAX_COMMITMENT_KEY_COUNT as usize;
4541    if keys.len() > cap {
4542        warn!(
4543            "Commitment rotation: key set ({}) exceeds MAX_COMMITMENT_KEY_COUNT ({}); \
4544             truncating — investigate as this likely means a misconfiguration",
4545            keys.len(),
4546            cap
4547        );
4548    }
4549
4550    // INVARIANT: this module is only used with CONTENT-ADDRESSED chunks,
4551    // where `key == BLAKE3(content)`, so `bytes_hash := key` and we skip a
4552    // full chunk re-read per rotation.
4553    //
4554    // Consequence to be precise about: because the leaf is `(key, key)`,
4555    // the Merkle root commits to the SET OF KEYS, not to the bytes. The
4556    // commitment therefore binds "which keys I claim to hold"; it does NOT
4557    // by itself prove byte possession. Byte possession is enforced by the
4558    // audit-verify path, which recomputes `bytes_hash == BLAKE3(local_bytes)`
4559    // and the per-key digest against the AUDITOR'S OWN local copy of the
4560    // bytes — so a responder that holds the key list but dropped the bytes
4561    // still fails (`missing bytes for committed key` / digest mismatch).
4562    // This is sound ONLY while keys are content addresses. If this module
4563    // is ever reused for non-content-addressed records (`bytes_hash != key`),
4564    // the `(k, k)` shortcut would let a byte-less node forge a valid root and
4565    // MUST be replaced with `(key, BLAKE3(bytes))` computed from real bytes.
4566    let entries: Vec<_> = keys.into_iter().take(cap).map(|k| (k, k)).collect();
4567
4568    // No-op-rotation guard: compute just the Merkle root from `entries`
4569    // and compare against the currently-advertised commitment's root.
4570    // If they match, the key set is unchanged and a new rotation would
4571    // only swap a randomized ML-DSA signature for a fresh one — same
4572    // content, different commitment_hash. That invalidates every
4573    // outstanding `recent_provers` credit on this node across the
4574    // close group with no security benefit, breaking steady-state
4575    // quorum liveness on large nodes that can't re-audit every key
4576    // every rotation interval. Skip the rotation entirely when the
4577    // tree is unchanged.
4578    // Build the tree ONCE here (moving `entries`): it serves both the no-op
4579    // root check below and, if we proceed, the signed commitment via
4580    // `build_from_tree` (§11 — previously the tree was built here and AGAIN
4581    // inside `BuiltCommitment::build`).
4582    let candidate_tree = commitment::MerkleTree::build(entries)
4583        .map_err(|e| Error::Crypto(format!("commitment tree build: {e}")))?;
4584    let candidate_root = candidate_tree.root();
4585    if let Some(current) = state.current() {
4586        if current.commitment().root == candidate_root {
4587            debug!(
4588                "Commitment rotation: key set unchanged (root={}); skipping no-op re-sign",
4589                hex::encode(candidate_root)
4590            );
4591            // Even though we skip re-signing (to avoid invalidating holder
4592            // credit), retention must still advance on the wall clock: a
4593            // previously-gossiped commitment that holds a now-out-of-range key
4594            // must be able to age out of the answerability window even when the
4595            // committed key set is frozen here for many rotations. Without this,
4596            // the no-op guard would pin a stale slot — and its key — forever.
4597            state.age_out();
4598            return Ok(());
4599        }
4600    }
4601
4602    let sk_bytes = identity.secret_key_bytes().to_vec();
4603    let sk = MlDsaSecretKey::from_bytes(MlDsaVariant::MlDsa65, &sk_bytes)
4604        .map_err(|e| Error::Crypto(format!("commitment build: load sk: {e}")))?;
4605    let pk_bytes = identity.public_key().as_bytes().to_vec();
4606    let peer_id_bytes = *p2p.peer_id().as_bytes();
4607
4608    let built = commitment_state::BuiltCommitment::build_from_tree(
4609        candidate_tree,
4610        &peer_id_bytes,
4611        &sk,
4612        &pk_bytes,
4613    )
4614    .map_err(|e| Error::Crypto(format!("commitment build: {e}")))?;
4615
4616    let hash = hex::encode(built.hash());
4617    let key_count = built.commitment().key_count;
4618    state.rotate(built);
4619    info!("Storage commitment rotated: hash={hash} key_count={key_count}");
4620    Ok(())
4621}
4622
4623#[cfg(test)]
4624#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
4625mod tests {
4626    use super::{
4627        apply_audit_failure_credit_revocation, audit_failure_clears_bootstrap_claim,
4628        audit_failure_revokes_holder_credit, audit_launch_decision, config, cooldown_allows_audit,
4629        first_failed_key_label, fresh_offer_payment_context, paid_notify_payment_context,
4630    };
4631    use crate::payment::VerificationContext;
4632    use crate::replication::recent_provers::RecentProvers;
4633    use crate::replication::types::AuditFailureReason;
4634    use saorsa_core::identity::PeerId;
4635    use std::collections::HashMap;
4636    use std::time::Duration;
4637    use std::time::Instant;
4638
4639    fn test_peer(b: u8) -> PeerId {
4640        let mut bytes = [0u8; 32];
4641        bytes[0] = b;
4642        PeerId::from_bytes(bytes)
4643    }
4644
4645    fn test_key(b: u8) -> crate::ant_protocol::XorName {
4646        let mut k = [0u8; 32];
4647        k[0] = b;
4648        k
4649    }
4650
4651    #[test]
4652    fn fresh_offer_runs_client_put_payment_checks() {
4653        assert_eq!(
4654            fresh_offer_payment_context(),
4655            VerificationContext::ClientPut
4656        );
4657    }
4658
4659    #[test]
4660    fn paid_notify_uses_paid_list_admission_payment_checks() {
4661        assert_eq!(
4662            paid_notify_payment_context(),
4663            VerificationContext::PaidListAdmission
4664        );
4665    }
4666
4667    #[test]
4668    fn audit_timeout_preserves_active_bootstrap_claim() {
4669        assert!(!audit_failure_clears_bootstrap_claim(
4670            &AuditFailureReason::Timeout
4671        ));
4672    }
4673
4674    fn strike_peer(b: u8) -> PeerId {
4675        let mut bytes = [0u8; 32];
4676        bytes[0] = b;
4677        PeerId::from_bytes(bytes)
4678    }
4679
4680    // ADR-0002: "occasional surprise exams, keeps load low" — the per-peer
4681    // cooldown must collapse a gossip flood into at most one audit per window.
4682
4683    #[test]
4684    fn gossip_flood_yields_at_most_one_audit_per_cooldown_window() {
4685        let peer = strike_peer(1);
4686        let mut map: HashMap<PeerId, Instant> = HashMap::new();
4687        let t0 = Instant::now();
4688        // First gossip in the window passes; a burst of further gossips at the
4689        // same instant are all suppressed.
4690        assert!(cooldown_allows_audit(&mut map, &peer, t0));
4691        let mut passed = 1;
4692        for _ in 0..100 {
4693            if cooldown_allows_audit(&mut map, &peer, t0) {
4694                passed += 1;
4695            }
4696        }
4697        assert_eq!(
4698            passed, 1,
4699            "a flood at one instant must trigger exactly one audit"
4700        );
4701    }
4702
4703    // ADR-0002 ordering invariant: `maybe_trigger_gossip_audit` stamps the
4704    // per-peer cooldown BEFORE the probability lottery, so a LOSING ticket still
4705    // consumes the window. This is the property the isolated cooldown tests above
4706    // cannot see: they never sample the lottery, so a regression that reordered
4707    // the gates (sample probability first, only stamp the cooldown on a win)
4708    // would still pass them while breaking flood-resistance: a flood would then
4709    // re-roll the lottery on EVERY message until one won, multiplying audits.
4710    //
4711    // We model the exact production gate order (cooldown-then-lottery) with a
4712    // lottery driven by a fixed outcome instead of `gen_bool(..)`. The first
4713    // message LOSES the lottery; the remaining flood messages all WIN. With the
4714    // production order, the losing first ticket burns the window and every later
4715    // winner in the same window is blocked, so there are 0 audits this window. If
4716    // the gates were flipped, the second message's winning ticket would slip
4717    // through. The window only reopens after the cooldown elapses.
4718    //
4719    // FLIPS IF: the lottery is sampled before `cooldown_allows_audit` (a losing
4720    // ticket no longer consumes the window), re-enabling a flood-amplified audit
4721    // storm.
4722    #[test]
4723    fn losing_lottery_still_consumes_cooldown_window() {
4724        // Faithful re-implementation of the two gates in
4725        // `maybe_trigger_gossip_audit`, with the lottery outcome made
4726        // deterministic instead of `rand::thread_rng().gen_bool(..)`.
4727        // Calls the SHIPPED `audit_launch_decision` (the same function
4728        // `maybe_trigger_gossip_audit` uses), so a reorder of the two gates in
4729        // production fails this test — not a local reimplementation.
4730        let peer = strike_peer(3);
4731        let mut map: HashMap<PeerId, Instant> = HashMap::new();
4732        let t0 = Instant::now();
4733
4734        // First flooded message at t0 LOSES the lottery, but the cooldown is
4735        // stamped BEFORE the lottery is consulted, so the window is now consumed.
4736        assert!(
4737            !audit_launch_decision(&mut map, &peer, t0, false),
4738            "a losing ticket launches no audit"
4739        );
4740
4741        // 99 more flooded messages at the same instant would all WIN the lottery,
4742        // yet every one must be blocked by the cooldown the loser already stamped.
4743        // (If production sampled the lottery FIRST, these would each get a fresh
4744        // roll and audits would multiply — this assertion catches that reorder.)
4745        let mut audits = 0;
4746        for _ in 0..99 {
4747            if audit_launch_decision(&mut map, &peer, t0, true) {
4748                audits += 1;
4749            }
4750        }
4751        assert_eq!(
4752            audits, 0,
4753            "a losing first ticket must consume the window so no later flooded \
4754             message in the same window can audit"
4755        );
4756
4757        // The window only reopens after the cooldown elapses; the next winning
4758        // ticket then launches exactly one audit.
4759        let after = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS + 1);
4760        assert!(
4761            audit_launch_decision(&mut map, &peer, after, true),
4762            "after the cooldown a winning ticket audits again"
4763        );
4764    }
4765
4766    #[test]
4767    fn cooldown_lets_audit_through_after_the_window() {
4768        let peer = strike_peer(2);
4769        let mut map: HashMap<PeerId, Instant> = HashMap::new();
4770        let t0 = Instant::now();
4771        assert!(cooldown_allows_audit(&mut map, &peer, t0));
4772        // Within the window: suppressed.
4773        let within = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS - 1);
4774        assert!(!cooldown_allows_audit(&mut map, &peer, within));
4775        // Past the window: allowed again.
4776        let after = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS + 1);
4777        assert!(cooldown_allows_audit(&mut map, &peer, after));
4778    }
4779
4780    #[test]
4781    fn cooldown_is_per_peer_independent() {
4782        let mut map: HashMap<PeerId, Instant> = HashMap::new();
4783        let t0 = Instant::now();
4784        // Different peers each get their own first-audit pass at the same instant.
4785        for i in 0..20u8 {
4786            assert!(
4787                cooldown_allows_audit(&mut map, &strike_peer(i), t0),
4788                "peer {i} should be auditable independently"
4789            );
4790        }
4791    }
4792
4793    #[test]
4794    fn audit_on_gossip_constants_match_adr() {
4795        // Tripwire on the ADR-locked tunables. The spot-check count sits at the
4796        // top of the auditor's 3..=5 band (the auditor clamps to that band, so
4797        // values above 5 would silently never be requested).
4798        assert_eq!(config::AUDIT_SPOTCHECK_COUNT, 5);
4799        assert!((config::AUDIT_ON_GOSSIP_PROBABILITY - 0.2).abs() < f64::EPSILON);
4800        assert_eq!(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS, 30 * 60);
4801    }
4802
4803    // (d) A confirmed storage-integrity failure penalizes immediately and
4804    // revokes credit; it is not a timeout.
4805    #[test]
4806    fn digest_mismatch_is_not_a_timeout_and_penalizes_immediately() {
4807        assert!(audit_failure_clears_bootstrap_claim(
4808            &AuditFailureReason::DigestMismatch
4809        ));
4810        assert!(audit_failure_revokes_holder_credit(
4811            &AuditFailureReason::DigestMismatch
4812        ));
4813    }
4814
4815    /// The exact decision the `Failed` arm of `handle_subtree_audit_result`
4816    /// uses: confirmed failures revoke credit, `Timeout` does not.
4817    #[test]
4818    fn confirmed_failures_revoke_credit_timeout_does_not() {
4819        for reason in [
4820            AuditFailureReason::MalformedResponse,
4821            AuditFailureReason::DigestMismatch,
4822            AuditFailureReason::KeyAbsent,
4823            AuditFailureReason::Rejected,
4824        ] {
4825            assert!(
4826                audit_failure_revokes_holder_credit(&reason),
4827                "confirmed failure {reason:?} must revoke holder credit"
4828            );
4829        }
4830        assert!(
4831            !audit_failure_revokes_holder_credit(&AuditFailureReason::Timeout),
4832            "Timeout must NOT revoke credit (single dropped packet != storage loss)"
4833        );
4834    }
4835
4836    /// Wiring test for the security fix: the helper the handler calls
4837    /// actually strips a credited peer on a confirmed failure
4838    /// (`DigestMismatch`), and actually RETAINS credit on `Timeout`.
4839    /// Records genuine credit first so neither assertion is vacuous;
4840    /// this fails if `forget_peer` stops being called, or if the
4841    /// `Timeout` exclusion is dropped (both verified by mutation).
4842    #[test]
4843    fn apply_revocation_strips_on_digest_mismatch_retains_on_timeout() {
4844        let peer = test_peer(0xAB);
4845        let key = test_key(1);
4846        let hash = [0xCD; 32];
4847
4848        // Confirmed failure -> credit revoked.
4849        let mut provers = RecentProvers::new();
4850        provers.record_proof(key, peer, hash, Instant::now());
4851        assert!(
4852            provers.is_credited_holder(&key, &peer, &hash),
4853            "precondition: peer credited before failure"
4854        );
4855        apply_audit_failure_credit_revocation(
4856            &mut provers,
4857            &peer,
4858            &AuditFailureReason::DigestMismatch,
4859        );
4860        assert!(
4861            !provers.is_credited_holder(&key, &peer, &hash),
4862            "DigestMismatch must strip the peer's holder credit"
4863        );
4864
4865        // Timeout -> credit retained.
4866        let mut provers_timeout = RecentProvers::new();
4867        provers_timeout.record_proof(key, peer, hash, Instant::now());
4868        apply_audit_failure_credit_revocation(
4869            &mut provers_timeout,
4870            &peer,
4871            &AuditFailureReason::Timeout,
4872        );
4873        assert!(
4874            provers_timeout.is_credited_holder(&key, &peer, &hash),
4875            "Timeout must retain holder credit (deliberate liveness cushion)"
4876        );
4877    }
4878
4879    #[test]
4880    fn decoded_audit_failures_clear_active_bootstrap_claim() {
4881        for reason in [
4882            AuditFailureReason::MalformedResponse,
4883            AuditFailureReason::DigestMismatch,
4884            AuditFailureReason::KeyAbsent,
4885            AuditFailureReason::Rejected,
4886        ] {
4887            assert!(
4888                audit_failure_clears_bootstrap_claim(&reason),
4889                "decoded non-bootstrap failure {reason:?} should clear active claim"
4890            );
4891        }
4892    }
4893
4894    #[test]
4895    fn first_failed_key_label_truncates_to_16_hex_chars() {
4896        // The high-order 8 bytes of the XorName determine the label so an
4897        // operator can group audit-failures on the same chunk prefix.
4898        let mut key = [0u8; 32];
4899        key[0] = 0x18;
4900        key[7] = 0xff;
4901        // Low-order bytes (positions 8..32) are deliberately set to 0xAA
4902        // to verify they are NOT included in the label.
4903        for byte in &mut key[8..] {
4904            *byte = 0xAA;
4905        }
4906        let label = first_failed_key_label(&[key]);
4907        // Only the first 8 bytes are encoded, low-order bytes are dropped.
4908        assert_eq!(label, "0x18000000000000ff");
4909        assert_eq!(label.len(), "0x".len() + 16);
4910    }
4911
4912    #[test]
4913    fn first_failed_key_label_falls_back_when_empty() {
4914        // Should never happen in production (audit failure handling rejects
4915        // empty sets), but the formatter must still produce a valid label
4916        // so the log line doesn't contain a misleading default.
4917        assert_eq!(first_failed_key_label(&[]), "0x");
4918    }
4919
4920    #[test]
4921    fn first_failed_key_label_uses_first_key_only() {
4922        let first = [0x11u8; 32];
4923        let second = [0x22u8; 32];
4924        assert_eq!(
4925            first_failed_key_label(&[first, second]),
4926            format!("0x{}", hex::encode(&first[..8]))
4927        );
4928    }
4929}