Skip to main content

ant_node/replication/
mod.rs

1//! Replication subsystem for the Autonomi network.
2//!
3//! Implements Kademlia-style replication with:
4//! - Fresh replication with `PoP` verification
5//! - Neighbor sync with round-robin cycle management
6//! - Batched quorum verification
7//! - Storage audit protocol (anti-outsourcing)
8//! - `PaidForList` persistence and convergence
9//! - Responsibility pruning with hysteresis
10
11// The replication engine intentionally holds `RwLock` read guards across await
12// boundaries (e.g. reading sync_history while calling audit_tick). Clippy's
13// nursery lint `significant_drop_tightening` flags these, but the guards must
14// remain live for the duration of the call.
15#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod commitment;
21pub mod commitment_state;
22pub mod config;
23pub mod fresh;
24pub mod neighbor_sync;
25pub mod paid_list;
26pub mod protocol;
27pub mod pruning;
28pub mod quorum;
29pub mod recent_provers;
30pub mod scheduling;
31pub mod storage_commitment_audit;
32pub mod subtree;
33pub mod types;
34
35use std::collections::{HashMap, HashSet};
36use std::path::Path;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39
40use std::pin::Pin;
41
42use crate::logging::{debug, error, info, warn};
43use futures::stream::FuturesUnordered;
44use futures::{Future, StreamExt};
45use rand::Rng;
46use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
47use tokio::task::JoinHandle;
48use tokio_util::sync::CancellationToken;
49
50use crate::ant_protocol::XorName;
51use crate::error::{Error, Result};
52use crate::payment::{PaymentVerifier, VerificationContext};
53use crate::replication::audit::AuditTickResult;
54use crate::replication::commitment::{commitment_hash, StorageCommitment};
55use crate::replication::commitment_state::{PeerCommitmentRecord, ResponderCommitmentState};
56use crate::replication::config::{
57    max_parallel_fetch, storage_admission_width, ReplicationConfig, MAX_AUDIT_RESPONSES_PER_PEER,
58    MAX_CONCURRENT_AUDIT_RESPONSES, MAX_CONCURRENT_REPLICATION_SENDS, REPLICATION_PROTOCOL_ID,
59};
60use crate::replication::paid_list::PaidList;
61use crate::replication::protocol::{
62    FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
63    VerificationResponse,
64};
65use crate::replication::quorum::KeyVerificationOutcome;
66use crate::replication::recent_provers::RecentProvers;
67use crate::replication::scheduling::ReplicationQueues;
68use crate::replication::types::{
69    AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
70    NeighborSyncState, PeerSyncRecord, RepairProofs, VerificationEntry, VerificationState,
71};
72use crate::storage::LmdbStorage;
73use saorsa_core::identity::{NodeIdentity, PeerId};
74use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
75
76// ---------------------------------------------------------------------------
77// Constants
78// ---------------------------------------------------------------------------
79
/// Topic prefix used by saorsa-core's request-response mechanism.
///
/// A message whose topic is `RR_PREFIX` followed by the replication protocol
/// id arrives wrapped in a request-response envelope; the message handler
/// unwraps it with `P2PNode::parse_request_envelope` and keeps only requests
/// (responses are filtered out).
const RR_PREFIX: &str = "/rr/";
82
83fn fresh_offer_payment_context() -> VerificationContext {
84    VerificationContext::ClientPut
85}
86
87fn paid_notify_payment_context() -> VerificationContext {
88    VerificationContext::PaidListAdmission
89}
90
/// Boxed, type-erased future for one in-flight fetch task.
///
/// Resolves to the fetched key together with an optional `FetchOutcome`.
/// NOTE(review): `None` presumably means "no outcome recorded" (e.g. the
/// fetch was abandoned) — confirm against the fetch worker.
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
93
/// Shared dependencies for one verification worker cycle.
///
/// Every field is a borrowed handle to engine-owned shared state, so building
/// the context is free and it lives no longer than the cycle that uses it.
struct VerificationCycleContext<'a> {
    /// P2P networking node.
    p2p_node: &'a Arc<P2PNode>,
    /// Persistent paid-for-list.
    paid_list: &'a Arc<PaidList>,
    /// Local chunk storage.
    storage: &'a Arc<LmdbStorage>,
    /// Replication pipeline queues.
    queues: &'a Arc<RwLock<ReplicationQueues>>,
    /// Replication configuration.
    config: &'a ReplicationConfig,
    /// Bootstrap state tracking.
    bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
    /// Whether this node is still in the replication bootstrap phase.
    is_bootstrapping: &'a Arc<RwLock<bool>>,
    /// Notified when `is_bootstrapping` transitions from `true` to `false`.
    bootstrap_complete_notify: &'a Arc<Notify>,
    /// v12 §6 holder-eligibility inputs. The verifier downgrades a
    /// peer's Present claim to Unresolved unless they're a credited
    /// holder of the key (i.e. they recently passed a commitment-bound
    /// audit on it under their currently-credited commitment hash).
    last_commitment_by_peer: &'a Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
    /// Sticky set of peers ever seen carrying a v12 commitment (§6 input).
    ever_capable_peers: &'a Arc<RwLock<HashSet<PeerId>>>,
    /// Holder-eligibility cache of recent successful provers (§6 input).
    recent_provers: &'a Arc<RwLock<RecentProvers>>,
}
112
/// Fetch worker polling interval in milliseconds.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Verification worker polling interval in milliseconds.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Verification cycle duration (in milliseconds) at or above which the cycle
/// is worth surfacing at info level.
const VERIFICATION_CYCLE_SLOW_LOG_MS: u128 = 500;

/// Standard trust event weight for per-operation success/failure signals.
///
/// Used for individual replication fetch outcomes, integrity check failures,
/// and bootstrap claim abuse. Distinct from `AUDIT_FAILURE_TRUST_WEIGHT` which
/// is reserved for confirmed audit failures.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;

/// Bootstrap drain check interval in seconds.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;

/// How often the responder rebuilds + rotates its storage commitment.
///
/// Each rebuild scans LMDB to compute leaf hashes; for ~10k keys this is
/// sub-100ms (BLAKE3 + tree build). Retention is gossip-anchored, NOT
/// rotation-anchored: the responder stays answerable for the current
/// commitment plus the last `RETAINED_GOSSIPED_COMMITMENTS` (= 2) it
/// actually gossiped, each kept for `GOSSIP_ANSWERABILITY_TTL` (3 h) after
/// its last emission (see `commitment_state`). So the rotation cadence does
/// not by itself bound answerability — a gossiped commitment stays
/// answerable across rotations until its gossip TTL lapses.
///
/// Default: 1 hour, aligned with the worst-case neighbor-sync cooldown
/// (`NEIGHBOR_SYNC_COOLDOWN_SECS = 3600`). Because the gossip TTL (3 h)
/// comfortably exceeds the gap between our rotation and the next gossip
/// arrival at a remote peer, this prevents the "unknown commitment hash" ->
/// Idle audit-skip pattern from being the common case.
///
/// Why not faster: the v12 pin is bound to a specific point-in-time
/// commitment, so rotation isn't security-critical for pin freshness —
/// only for keeping the committed key set current as the responder
/// writes new keys. 1 hour is plenty for that, and slow enough that
/// honest auditors mostly hit `current` or `previous` rather than the
/// "rotated past" case.
const COMMITMENT_ROTATION_INTERVAL_SECS: u64 = 3600;

/// Minimum interval between commitment signature verifications for a
/// single peer (v10/v12 §2 step 3 + §11 `DoS`).
///
/// A sybil that bypasses the routing-table gate (e.g. by transient
/// bucket pollution) could otherwise force one ML-DSA-65 verify (~1 ms)
/// per gossip message. This rate limit allows at most one verify per peer
/// per minute. That is well above the legitimate gossip rate (one
/// commitment per 10-20 min neighbor-sync round on each peer), so honest
/// peers are never throttled.
const COMMITMENT_SIG_VERIFY_MIN_INTERVAL: Duration = Duration::from_secs(60);

/// Hard cap on the size of `last_commitment_by_peer`.
///
/// Bounds the per-process memory cost of the auditor's per-peer
/// commitment cache. Each entry holds a `StorageCommitment`
/// (~5 KiB: 1952-byte pubkey + 3293-byte signature + small fields).
/// At 4096 entries the cache is ~20 MiB, which comfortably covers a
/// realistic close-group neighborhood. When the cap is hit, one
/// arbitrary existing entry is evicted on insert (`HashMap` iteration
/// order is unspecified; we do not track insertion order). The
/// `PeerRemoved` handler proactively drops entries as the DHT
/// detects departures, and `ingest_peer_commitment` only admits
/// commitments from peers currently in the routing table — together
/// the cap is the third line of defence against sybil/churn flooding.
const MAX_LAST_COMMITMENT_BY_PEER: usize = 4096;

/// Cap on the sticky `ever_capable_peers` set. Bounds memory so a
/// long-running bootstrap node cannot have the set grow without limit
/// from peer-id churn. Sized at 4x `MAX_LAST_COMMITMENT_BY_PEER` so
/// the set comfortably outlives the arbitrary-eviction churn of that
/// cache (it is not LRU) but still caps the blast radius of
/// identity-rotation attacks. Once full we refuse new inserts (no
/// eviction), which keeps the historic set stable. New v12 peers first
/// seen after the cap is reached are treated as legacy on rejoin. That
/// matches the behaviour before this set existed, so it is not a
/// security regression.
const MAX_EVER_CAPABLE_PEERS: usize = 4 * MAX_LAST_COMMITMENT_BY_PEER;
186/// blast radius of identity-rotation attacks. Once full we refuse new
187/// inserts (no eviction) — keeps the historic set stable; new v12
188/// peers above the cap are treated as legacy on rejoin, which matches
189/// the behaviour before this set existed, not a security regression.
190const MAX_EVER_CAPABLE_PEERS: usize = 4 * MAX_LAST_COMMITMENT_BY_PEER;
191
192// ---------------------------------------------------------------------------
193// ReplicationEngine
194// ---------------------------------------------------------------------------
195
196/// The replication engine manages all replication background tasks and state.
197pub struct ReplicationEngine {
198    /// Replication configuration (shared across spawned tasks).
199    config: Arc<ReplicationConfig>,
200    /// P2P networking node.
201    p2p_node: Arc<P2PNode>,
202    /// Local chunk storage.
203    storage: Arc<LmdbStorage>,
204    /// Persistent paid-for-list.
205    paid_list: Arc<PaidList>,
206    /// Payment verifier for `PoP` validation.
207    payment_verifier: Arc<PaymentVerifier>,
208    /// Replication pipeline queues.
209    queues: Arc<RwLock<ReplicationQueues>>,
210    /// Neighbor sync cycle state.
211    sync_state: Arc<RwLock<NeighborSyncState>>,
212    /// Per-peer sync history (for `RepairOpportunity`).
213    ///
214    /// This map grows with peer churn and is intentionally unbounded: entries
215    /// are lightweight (`PeerSyncRecord` is two fields) and peer IDs are
216    /// naturally bounded by the routing table's k-bucket capacity.
217    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
218    /// Per-peer consecutive audit-timeout strike counter.
219    ///
220    /// A timeout increments the peer's strike count; a successful audit
221    /// response resets it to zero. Only when a peer reaches
222    /// [`config::AUDIT_TIMEOUT_STRIKE_THRESHOLD`] consecutive timeouts is a
223    /// timeout reported as an `ApplicationFailure` trust event. This separates
224    /// honest transient slowness (resets on the next normal response) from a
225    /// peer that does not store the data and is slow on every audit. Lives
226    /// outside `NeighborSyncState` so it is never wiped by a neighbor-sync
227    /// cycle reset. Grows with peer churn like `sync_history`; entries are a
228    /// single `u32` and peer IDs are bounded by k-bucket capacity.
229    audit_timeout_strikes: Arc<RwLock<HashMap<PeerId, u32>>>,
230    /// Per-peer cooldown for gossip-triggered subtree audits (ADR-0002).
231    ///
232    /// Records when each peer was last audited so a burst of gossiped
233    /// commitment changes cannot spawn back-to-back audits of the same peer.
234    /// Bounded by routing-table membership and cleaned on `PeerRemoved`.
235    audit_on_gossip_cooldown: Arc<RwLock<HashMap<PeerId, Instant>>>,
236    /// Completed local neighbor-sync cycle epoch for proof maturity.
237    sync_cycle_epoch: Arc<RwLock<u64>>,
238    /// Per-key repair proof tracking for audit eligibility.
239    repair_proofs: Arc<RwLock<RepairProofs>>,
240    /// Bootstrap state tracking.
241    bootstrap_state: Arc<RwLock<BootstrapState>>,
242    /// Whether this node is currently bootstrapping.
243    is_bootstrapping: Arc<RwLock<bool>>,
244    /// Trigger for early neighbor sync (signalled on topology changes).
245    sync_trigger: Arc<Notify>,
246    /// Notified when `is_bootstrapping` transitions from `true` to `false`.
247    bootstrap_complete_notify: Arc<Notify>,
248    /// Node identity (for signing storage commitments).
249    ///
250    /// Phase 3 of the v12 storage-bound audit design. The responder
251    /// uses this to sign its periodically-built `StorageCommitment`.
252    identity: Arc<NodeIdentity>,
253    /// Responder-side commitment state (two-slot atomic rotation).
254    ///
255    /// Periodically rebuilt from the live LMDB key set; gossiped on
256    /// outbound `NeighborSyncRequest`/`Response`; consulted by the
257    /// commitment-bound audit handler.
258    commitment_state: Arc<ResponderCommitmentState>,
259    /// Auditor-side per-peer commitment record (last known commitment +
260    /// sticky `commitment_capable` flag).
261    ///
262    /// Populated whenever an inbound gossip carries a verified
263    /// commitment from the sender. Used by `audit_tick` to snapshot
264    /// `expected_commitment_hash` into outbound challenges, and by
265    /// holder-eligibility (§6) to decide whether a peer's `recent_provers`
266    /// proof should be honoured. The sticky `commitment_capable` flag
267    /// flips true on first successful ingest and never reverts (§2
268    /// step 5).
269    last_commitment_by_peer: Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
270    /// Sticky set of peer IDs we have EVER seen carrying a v12
271    /// commitment, independent of whether their commitment bytes are
272    /// still in `last_commitment_by_peer`. The §6 holder-eligibility
273    /// closure consults this set to keep treating churned-out
274    /// previously-v12 peers as v12-capable (rather than degrading them
275    /// to "legacy" credit-unconditionally) when they re-appear on the
276    /// network before their next gossip arrives. Bounded growth: even
277    /// at one million peers seen over the node's lifetime, the set is
278    /// 32 MB.
279    ever_capable_peers: Arc<RwLock<HashSet<PeerId>>>,
280    /// Auditor-side holder-eligibility cache (v12 §6).
281    ///
282    /// Recorded on successful commitment-bound audit; read by future
283    /// quorum / paid-list eligibility checks (phase-3 stretch).
284    recent_provers: Arc<RwLock<RecentProvers>>,
285    /// Per-peer last sig-verify attempt timestamp for the §2 step 3 /
286    /// §11 `DoS` rate limit. Bumped on EVERY verify attempt (success or
287    /// failure) so a peer we've never successfully verified can't burn
288    /// CPU on a flood of structurally-plausible-but-invalid gossips.
289    /// Lives separately from `last_commitment_by_peer` because that
290    /// map's records only exist after a successful verify.
291    sig_verify_attempts: Arc<RwLock<HashMap<PeerId, Instant>>>,
292    /// Limits concurrent outbound replication sends to prevent bandwidth
293    /// saturation on home broadband connections.
294    send_semaphore: Arc<Semaphore>,
295    /// Bounds concurrent IN-FLIGHT audit-responder tasks (subtree round 1 +
296    /// byte round 2). Those are spawned off the serial message loop so disk
297    /// reads don't block replication; the semaphore restores a global
298    /// backpressure ceiling so the node can't fan out unbounded `get_raw` reads
299    /// / multi-MiB byte serves.
300    audit_responder_semaphore: Arc<Semaphore>,
301    /// Per-source in-flight audit-responder counts, capped at
302    /// [`MAX_AUDIT_RESPONSES_PER_PEER`]. The GLOBAL semaphore alone is not
303    /// flood-fair: one peer spamming challenges could occupy every slot and
304    /// starve honest auditors, whose dropped challenges then convert to
305    /// timeouts and record strikes on the HONEST peers (codex-r2 A). This
306    /// per-peer cap guarantees no single source can hold more than its share,
307    /// so a flood self-throttles without denying service to everyone else.
308    audit_responder_inflight: Arc<RwLock<HashMap<PeerId, u32>>>,
309    /// Receiver for fresh-write events from the chunk PUT handler.
310    ///
311    /// When present, `start()` spawns a drainer task that calls
312    /// `replicate_fresh` for each event.
313    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
314    /// Shutdown token.
315    shutdown: CancellationToken,
316    /// Background task handles.
317    task_handles: Vec<JoinHandle<()>>,
318}
319
320impl ReplicationEngine {
321    /// Create a new replication engine.
322    ///
323    /// # Errors
324    ///
325    /// Returns an error if the `PaidList` LMDB environment cannot be opened
326    /// or if the configuration fails validation.
327    #[allow(clippy::too_many_arguments)]
328    pub async fn new(
329        config: ReplicationConfig,
330        p2p_node: Arc<P2PNode>,
331        storage: Arc<LmdbStorage>,
332        payment_verifier: Arc<PaymentVerifier>,
333        identity: Arc<NodeIdentity>,
334        root_dir: &Path,
335        fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
336        shutdown: CancellationToken,
337    ) -> Result<Self> {
338        config.validate().map_err(Error::Config)?;
339
340        let paid_list = Arc::new(
341            PaidList::new(root_dir)
342                .await
343                .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
344        );
345
346        let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
347        let config = Arc::new(config);
348
349        Ok(Self {
350            config: Arc::clone(&config),
351            p2p_node,
352            storage,
353            paid_list,
354            payment_verifier,
355            queues: Arc::new(RwLock::new(ReplicationQueues::new())),
356            sync_state: Arc::new(RwLock::new(initial_neighbors)),
357            sync_history: Arc::new(RwLock::new(HashMap::new())),
358            audit_timeout_strikes: Arc::new(RwLock::new(HashMap::new())),
359            audit_on_gossip_cooldown: Arc::new(RwLock::new(HashMap::new())),
360            sync_cycle_epoch: Arc::new(RwLock::new(0)),
361            repair_proofs: Arc::new(RwLock::new(RepairProofs::new())),
362            bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
363            is_bootstrapping: Arc::new(RwLock::new(true)),
364            sync_trigger: Arc::new(Notify::new()),
365            bootstrap_complete_notify: Arc::new(Notify::new()),
366            identity,
367            commitment_state: Arc::new(ResponderCommitmentState::new()),
368            last_commitment_by_peer: Arc::new(RwLock::new(HashMap::new())),
369            ever_capable_peers: Arc::new(RwLock::new(HashSet::new())),
370            recent_provers: Arc::new(RwLock::new(RecentProvers::new())),
371            sig_verify_attempts: Arc::new(RwLock::new(HashMap::new())),
372            send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
373            audit_responder_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_AUDIT_RESPONSES)),
374            audit_responder_inflight: Arc::new(RwLock::new(HashMap::new())),
375            fresh_write_rx: Some(fresh_write_rx),
376            shutdown,
377            task_handles: Vec::new(),
378        })
379    }
380
381    /// Get a reference to the `PaidList`.
382    #[must_use]
383    pub fn paid_list(&self) -> &Arc<PaidList> {
384        &self.paid_list
385    }
386
387    /// Get a reference to the responder's commitment state. Used by audit
388    /// handlers to look up commitments by hash; used by the rotation tick
389    /// to install fresh ones.
390    #[must_use]
391    pub fn commitment_state(&self) -> &Arc<ResponderCommitmentState> {
392        &self.commitment_state
393    }
394
395    /// Get a reference to the auditor's last-commitment-by-peer table.
396    #[must_use]
397    pub fn last_commitment_by_peer(&self) -> &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>> {
398        &self.last_commitment_by_peer
399    }
400
401    /// Get a reference to the holder-eligibility cache. Phase-3 stretch:
402    /// will be read by quorum / paid-list eligibility checks.
403    #[must_use]
404    pub fn recent_provers(&self) -> &Arc<RwLock<RecentProvers>> {
405        &self.recent_provers
406    }
407
408    /// Test-only: rebuild + rotate this node's storage commitment now over its
409    /// current key set (normally on a 1h timer). Lets a test commit to chunks it
410    /// just stored without waiting for the rotation cadence.
411    ///
412    /// # Errors
413    ///
414    /// Propagates any error from reading the local key set or building/signing
415    /// the commitment.
416    #[cfg(any(test, feature = "test-utils"))]
417    pub async fn rebuild_commitment_now(&self) -> Result<()> {
418        rebuild_and_rotate_commitment(
419            &self.storage,
420            &self.identity,
421            &self.commitment_state,
422            &self.p2p_node,
423            &self.config,
424        )
425        .await
426    }
427
428    /// Test-only: directly seed this node's cached commitment for `peer`,
429    /// simulating "we received `peer`'s gossiped commitment" without depending
430    /// on neighbor-sync propagation timing. Lets a two-node audit test pin the
431    /// peer's commitment deterministically.
432    #[cfg(any(feature = "test-utils", test))]
433    pub async fn inject_peer_commitment_for_test(
434        &self,
435        peer: &PeerId,
436        commitment: StorageCommitment,
437    ) {
438        let now = Instant::now();
439        self.last_commitment_by_peer
440            .write()
441            .await
442            .insert(*peer, PeerCommitmentRecord::from_verified(commitment, now));
443        self.ever_capable_peers.write().await.insert(*peer);
444    }
445
446    /// Test-only: run ONE subtree audit against `peer` right now, pinned to the
447    /// commitment this node has cached for it (from gossip), over the live wire.
448    /// Returns the audit outcome so tests can assert honest-pass / adversary-fail
449    /// in a real two-node setting without waiting for the gossip cadence.
450    ///
451    /// Returns `AuditTickResult::Idle` if we have no cached commitment for the
452    /// peer yet (gossip hasn't reached us). Gated to test builds.
453    #[cfg(any(test, feature = "test-utils"))]
454    pub async fn audit_peer_now(&self, peer: &PeerId) -> audit::AuditTickResult {
455        let target = {
456            let map = self.last_commitment_by_peer.read().await;
457            map.get(peer)
458                .and_then(PeerCommitmentRecord::last_commitment)
459                .and_then(|c| commitment_hash(c).map(|h| (h, c.key_count)))
460        };
461        let Some((pin, key_count)) = target else {
462            return audit::AuditTickResult::Idle;
463        };
464        let credit = storage_commitment_audit::AuditCredit {
465            recent_provers: &self.recent_provers,
466        };
467        storage_commitment_audit::run_subtree_audit(
468            &self.p2p_node,
469            &self.config,
470            peer,
471            pin,
472            key_count,
473            Some(&credit),
474        )
475        .await
476    }
477
478    /// Start all background tasks.
479    ///
480    /// `dht_events` must be subscribed **before** `P2PNode::start()` so that
481    /// the `BootstrapComplete` event emitted during DHT bootstrap is not
482    /// missed by the bootstrap-sync gate.
483    pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
484        if !self.task_handles.is_empty() {
485            error!("ReplicationEngine::start() called while already running — ignoring");
486            return;
487        }
488        info!("Starting replication engine");
489
490        self.start_message_handler();
491        self.start_neighbor_sync_loop();
492        self.start_self_lookup_loop();
493        // Audit #2 (responsible-chunk): periodic tick auditing peers for the
494        // chunks they SHOULD store (responsibility + prior hint).
495        self.start_audit_loop();
496        // Audit #1 (storage-commitment) is gossip-triggered in the message
497        // handler when a peer's commitment is ingested, not on a periodic tick.
498        self.start_commitment_rotation_loop();
499        self.start_fetch_worker();
500        self.start_verification_worker();
501        self.start_bootstrap_sync(dht_events);
502        self.start_fresh_write_drainer();
503
504        info!(
505            "Replication engine started with {} background tasks",
506            self.task_handles.len()
507        );
508    }
509
510    /// Returns `true` if the node is still in the replication bootstrap phase.
511    ///
512    /// During bootstrap, audit challenges return `Bootstrapping` instead of
513    /// digests, and neighbor sync responses carry `bootstrapping: true`.
514    pub async fn is_bootstrapping(&self) -> bool {
515        *self.is_bootstrapping.read().await
516    }
517
518    /// Wait until the replication bootstrap phase completes.
519    ///
520    /// Returns immediately if bootstrap has already completed. Useful for
521    /// readiness probes, health checks, and test harnesses that need the
522    /// node to be fully operational before proceeding.
523    ///
524    /// Returns `true` if bootstrap completed within the timeout, `false`
525    /// if the timeout elapsed first.
526    pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
527        // Register the notification future *before* checking the flag so that
528        // a transition between the read and the await is not missed.
529        let notified = self.bootstrap_complete_notify.notified();
530        tokio::pin!(notified);
531        notified.as_mut().enable();
532
533        if !*self.is_bootstrapping.read().await {
534            return true;
535        }
536
537        tokio::time::timeout(timeout, notified).await.is_ok()
538    }
539
540    /// Cancel all background tasks and wait for them to terminate.
541    ///
542    /// This must be awaited before dropping the engine when the caller needs
543    /// the `Arc<LmdbStorage>` references held by background tasks to be
544    /// released (e.g. before reopening the same LMDB environment).
545    pub async fn shutdown(&mut self) {
546        self.shutdown.cancel();
547        for (i, mut handle) in self.task_handles.drain(..).enumerate() {
548            match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
549                Ok(Ok(())) => {}
550                Ok(Err(e)) if e.is_cancelled() => {}
551                Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
552                Err(_) => {
553                    warn!("Replication task {i} did not stop within 10s, aborting");
554                    handle.abort();
555                }
556            }
557        }
558    }
559
560    /// Trigger an early neighbor sync round.
561    ///
562    /// Useful after topology changes (new nodes joining, network heal after
563    /// partition) when the caller wants replication to converge faster than
564    /// the regular 10-20 minute cadence.
565    pub fn trigger_neighbor_sync(&self) {
566        self.sync_trigger.notify_one();
567    }
568
569    /// Execute fresh replication for a newly stored record.
570    pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
571        fresh::replicate_fresh(
572            key,
573            data,
574            proof_of_payment,
575            &self.p2p_node,
576            &self.paid_list,
577            &self.config,
578            &self.send_semaphore,
579        )
580        .await;
581    }
582
583    // =======================================================================
584    // Background task launchers
585    // =======================================================================
586
587    /// Spawn a task that drains the fresh-write channel and triggers
588    /// replication for each newly-stored chunk.
589    fn start_fresh_write_drainer(&mut self) {
590        let Some(mut rx) = self.fresh_write_rx.take() else {
591            return;
592        };
593        let p2p = Arc::clone(&self.p2p_node);
594        let paid_list = Arc::clone(&self.paid_list);
595        let config = Arc::clone(&self.config);
596        let send_semaphore = Arc::clone(&self.send_semaphore);
597        let shutdown = self.shutdown.clone();
598
599        let handle = tokio::spawn(async move {
600            loop {
601                tokio::select! {
602                    () = shutdown.cancelled() => break,
603                    event = rx.recv() => {
604                        let Some(event) = event else { break };
605                        fresh::replicate_fresh(
606                            &event.key,
607                            &event.data,
608                            &event.payment_proof,
609                            &p2p,
610                            &paid_list,
611                            &config,
612                            &send_semaphore,
613                        )
614                        .await;
615                    }
616                }
617            }
618            debug!("Fresh-write drainer shut down");
619        });
620        self.task_handles.push(handle);
621    }
622
623    #[allow(clippy::too_many_lines)]
624    fn start_message_handler(&mut self) {
625        let mut p2p_events = self.p2p_node.subscribe_events();
626        let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
627        let p2p = Arc::clone(&self.p2p_node);
628        let storage = Arc::clone(&self.storage);
629        let paid_list = Arc::clone(&self.paid_list);
630        let payment_verifier = Arc::clone(&self.payment_verifier);
631        let queues = Arc::clone(&self.queues);
632        let config = Arc::clone(&self.config);
633        let shutdown = self.shutdown.clone();
634        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
635        let bootstrap_state = Arc::clone(&self.bootstrap_state);
636        let sync_history = Arc::clone(&self.sync_history);
637        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
638        let repair_proofs = Arc::clone(&self.repair_proofs);
639        let sync_trigger = Arc::clone(&self.sync_trigger);
640        let my_commitment_state = Arc::clone(&self.commitment_state);
641        let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
642        let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
643        let recent_provers = Arc::clone(&self.recent_provers);
644        let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
645        let audit_timeout_strikes = Arc::clone(&self.audit_timeout_strikes);
646        let audit_on_gossip_cooldown = Arc::clone(&self.audit_on_gossip_cooldown);
647        let sync_state = Arc::clone(&self.sync_state);
648        let audit_responder_semaphore = Arc::clone(&self.audit_responder_semaphore);
649        let audit_responder_inflight = Arc::clone(&self.audit_responder_inflight);
650
651        // ADR-0002 gossip-audit trigger: bundled state so an ingested *changed*
652        // commitment can spawn a probabilistic, cooldown-gated subtree audit.
653        let gossip_audit = GossipAuditTrigger {
654            p2p_node: Arc::clone(&p2p),
655            config: Arc::clone(&config),
656            recent_provers: Arc::clone(&recent_provers),
657            sync_state: Arc::clone(&sync_state),
658            audit_timeout_strikes: Arc::clone(&audit_timeout_strikes),
659            cooldown: Arc::clone(&audit_on_gossip_cooldown),
660        };
661
662        let handle = tokio::spawn(async move {
663            loop {
664                tokio::select! {
665                    () = shutdown.cancelled() => break,
666                    event = p2p_events.recv() => {
667                        let Ok(event) = event else { continue };
668                        if let P2PEvent::Message {
669                            topic,
670                            source: Some(source),
671                            data,
672                            ..
673                        } = event {
674                            // Determine if this is a replication message
675                            // and whether it arrived via the /rr/ request-response
676                            // path (which wraps payloads in RequestResponseEnvelope).
677                            let rr_info = if topic == REPLICATION_PROTOCOL_ID {
678                                Some((data.clone(), None))
679                            } else if topic.starts_with(RR_PREFIX)
680                                && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
681                            {
682                                P2PNode::parse_request_envelope(&data)
683                                    .filter(|(_, is_resp, _)| !is_resp)
684                                    .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
685                            } else {
686                                None
687                            };
688                            if let Some((payload, rr_message_id)) = rr_info {
689                                match handle_replication_message(
690                                    &source,
691                                    &payload,
692                                    &p2p,
693                                    &storage,
694                                    &paid_list,
695                                    &payment_verifier,
696                                    &queues,
697                                    &config,
698                                    &is_bootstrapping,
699                                    &bootstrap_state,
700                                    &sync_history,
701                                    &sync_cycle_epoch,
702                                    &repair_proofs,
703                                    &last_commitment_by_peer,
704                                    &ever_capable_peers,
705                                    &sig_verify_attempts,
706                                    &my_commitment_state,
707                                    &gossip_audit,
708                                    &audit_responder_semaphore,
709                                    &audit_responder_inflight,
710                                    rr_message_id.as_deref(),
711                                ).await {
712                                    Ok(()) => {}
713                                    Err(e) => {
714                                        debug!(
715                                            "Replication message from {source} error: {e}"
716                                        );
717                                    }
718                                }
719                            }
720                        }
721                    }
722                    // Gap 4: Topology churn handling (Section 13).
723                    //
724                    // The DHT routing table emits KClosestPeersChanged when the
725                    // K-closest peer set actually changes, which is the precise
726                    // signal for triggering neighbor sync. This replaces the
727                    // previous approach of checking every PeerConnected /
728                    // PeerDisconnected event against the close group.
729                    dht_event = dht_events.recv() => {
730                        let Ok(dht_event) = dht_event else { continue };
731                        match dht_event {
732                            DhtNetworkEvent::KClosestPeersChanged { old, new } => {
733                                let old_peers = old
734                                    .iter()
735                                    .take(config.neighbor_sync_scope)
736                                    .copied()
737                                    .collect::<HashSet<_>>();
738                                let new_scoped = new
739                                    .iter()
740                                    .take(config.neighbor_sync_scope)
741                                    .copied()
742                                    .collect::<Vec<_>>();
743                                let new_peers =
744                                    new_scoped.iter().copied().collect::<HashSet<_>>();
745                                let entrants = new_scoped
746                                    .iter()
747                                    .copied()
748                                    .filter(|peer| !old_peers.contains(peer))
749                                    .collect::<Vec<_>>();
750                                let entrant_count = entrants.len();
751                                let (priority_insertions, sync_removals) = {
752                                    let mut state = sync_state.write().await;
753                                    let sync_removals = state.retain_sync_peers(&new_peers);
754                                    let priority_insertions = state.queue_priority_peers(entrants);
755                                    (priority_insertions, sync_removals)
756                                };
757                                if priority_insertions > 0 {
758                                    debug!(
759                                        "K-closest peers changed, queued {priority_insertions}/{entrant_count} new close peers for priority neighbor sync and pruned {sync_removals} departed pending sync entries"
760                                    );
761                                } else {
762                                    debug!(
763                                        "K-closest peers changed, no additional close peers queued, pruned {sync_removals} departed pending sync entries, triggering early neighbor sync"
764                                    );
765                                }
766                                sync_trigger.notify_one();
767                            }
768                            DhtNetworkEvent::PeerRemoved { peer_id } => {
769                                sync_state.write().await.remove_peer(&peer_id);
770                                repair_proofs.write().await.remove_peer(&peer_id);
771                                // v12: drop the commitment bytes and the
772                                // recent-prover credit so a churn / sybil
773                                // attacker cannot leave behind one
774                                // StorageCommitment per identity in
775                                // `last_commitment_by_peer`. Also drop the
776                                // sig-verify rate-limit timestamp.
777                                last_commitment_by_peer.write().await.remove(&peer_id);
778                                recent_provers.write().await.forget_peer(&peer_id);
779                                sig_verify_attempts.write().await.remove(&peer_id);
780                                // Drop the timeout-strike entry too, so a
781                                // departed peer leaves no residual (keeps this
782                                // map bounded under churn, like its siblings).
783                                audit_timeout_strikes.write().await.remove(&peer_id);
784                                // Same for the gossip-audit cooldown (ADR-0002).
785                                audit_on_gossip_cooldown.write().await.remove(&peer_id);
786                                // The sticky `commitment_capable` flag is
787                                // preserved orthogonally via
788                                // `ever_capable_peers` — even after this
789                                // removal, a re-joining peer continues to
790                                // be treated as v12-capable rather than
791                                // legacy (§3 shield).
792                            }
793                            _ => {}
794                        }
795                    }
796                }
797            }
798            debug!("Replication message handler shut down");
799        });
800        self.task_handles.push(handle);
801    }
802
803    fn start_neighbor_sync_loop(&mut self) {
804        let p2p = Arc::clone(&self.p2p_node);
805        let storage = Arc::clone(&self.storage);
806        let paid_list = Arc::clone(&self.paid_list);
807        let queues = Arc::clone(&self.queues);
808        let config = Arc::clone(&self.config);
809        let shutdown = self.shutdown.clone();
810        let sync_state = Arc::clone(&self.sync_state);
811        let sync_history = Arc::clone(&self.sync_history);
812        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
813        let repair_proofs = Arc::clone(&self.repair_proofs);
814        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
815        let bootstrap_state = Arc::clone(&self.bootstrap_state);
816        let sync_trigger = Arc::clone(&self.sync_trigger);
817        let commitment_state = Arc::clone(&self.commitment_state);
818        let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
819        let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
820        let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
821        // ADR-0002: a peer's commitment also arrives on the sync RESPONSE path
822        // (we initiated, they piggybacked theirs). Carry a gossip-audit trigger
823        // here too so a peer that only ever answers — never initiates sync —
824        // is still audited; otherwise it could fully evade auditing.
825        let gossip_audit = GossipAuditTrigger {
826            p2p_node: Arc::clone(&p2p),
827            config: Arc::clone(&config),
828            recent_provers: Arc::clone(&self.recent_provers),
829            sync_state: Arc::clone(&sync_state),
830            audit_timeout_strikes: Arc::clone(&self.audit_timeout_strikes),
831            cooldown: Arc::clone(&self.audit_on_gossip_cooldown),
832        };
833
834        let handle = tokio::spawn(async move {
835            loop {
836                let interval = config.random_neighbor_sync_interval();
837                tokio::select! {
838                    () = shutdown.cancelled() => break,
839                    () = tokio::time::sleep(interval) => {}
840                    () = sync_trigger.notified() => {
841                        debug!("Neighbor sync triggered by topology change");
842                    }
843                }
844                // Wrap the sync round in a select so shutdown cancels
845                // in-progress network operations rather than waiting for
846                // the full round to complete.
847                tokio::select! {
848                    () = shutdown.cancelled() => break,
849                    () = run_neighbor_sync_round(
850                        &p2p,
851                        &storage,
852                        &paid_list,
853                        &queues,
854                        &config,
855                        &sync_state,
856                        &sync_history,
857                        &sync_cycle_epoch,
858                        &repair_proofs,
859                        &is_bootstrapping,
860                        &bootstrap_state,
861                        &commitment_state,
862                        &last_commitment_by_peer,
863                        &ever_capable_peers,
864                        &sig_verify_attempts,
865                        &gossip_audit,
866                    ) => {}
867                }
868            }
869            debug!("Neighbor sync loop shut down");
870        });
871        self.task_handles.push(handle);
872    }
873
874    fn start_self_lookup_loop(&mut self) {
875        let p2p = Arc::clone(&self.p2p_node);
876        let config = Arc::clone(&self.config);
877        let shutdown = self.shutdown.clone();
878
879        let handle = tokio::spawn(async move {
880            loop {
881                let interval = config.random_self_lookup_interval();
882                tokio::select! {
883                    () = shutdown.cancelled() => break,
884                    () = tokio::time::sleep(interval) => {
885                        if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
886                            debug!("Self-lookup failed: {e}");
887                        }
888                    }
889                }
890            }
891            debug!("Self-lookup loop shut down");
892        });
893        self.task_handles.push(handle);
894    }
895
896    /// Periodic responsible-chunk audit loop (audit #2): every
897    /// [`ReplicationConfig::random_audit_tick_interval`] (~10-20 min), audit one
898    /// eligible close peer for the chunks it *should* be storing (by
899    /// responsibility and prior repair hint), independent of the gossip-triggered
900    /// storage-commitment audit. Waits for bootstrap to drain, then runs one tick
901    /// immediately and periodically thereafter.
902    fn start_audit_loop(&mut self) {
903        let p2p = Arc::clone(&self.p2p_node);
904        let storage = Arc::clone(&self.storage);
905        let config = Arc::clone(&self.config);
906        let shutdown = self.shutdown.clone();
907        let sync_history = Arc::clone(&self.sync_history);
908        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
909        let repair_proofs = Arc::clone(&self.repair_proofs);
910        let bootstrap_state = Arc::clone(&self.bootstrap_state);
911        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
912        let sync_state = Arc::clone(&self.sync_state);
913        // Needed so the responsible-chunk audit routes failures through the same
914        // strike/grace path as the storage-commitment audit (timeouts graced,
915        // not penalised on the first occurrence) and can revoke holder credit on
916        // a confirmed failure.
917        let recent_provers = Arc::clone(&self.recent_provers);
918        let audit_timeout_strikes = Arc::clone(&self.audit_timeout_strikes);
919
920        let handle = tokio::spawn(async move {
921            // Invariant 19: wait for bootstrap to drain before starting audits.
922            loop {
923                tokio::select! {
924                    () = shutdown.cancelled() => return,
925                    () = tokio::time::sleep(
926                        std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
927                    ) => {
928                        if bootstrap_state.read().await.is_drained() {
929                            break;
930                        }
931                    }
932                }
933            }
934
935            // Run one audit tick immediately after bootstrap drain.
936            {
937                let bootstrapping = *is_bootstrapping.read().await;
938                let result = {
939                    let history = sync_history.read().await;
940                    let current_sync_epoch = *sync_cycle_epoch.read().await;
941                    audit::audit_tick_with_repair_proofs(
942                        &p2p,
943                        &storage,
944                        &config,
945                        &history,
946                        &repair_proofs,
947                        current_sync_epoch,
948                        bootstrapping,
949                    )
950                    .await
951                };
952                handle_audit_result(
953                    &result,
954                    &p2p,
955                    &sync_state,
956                    &recent_provers,
957                    &audit_timeout_strikes,
958                    &config,
959                )
960                .await;
961            }
962
963            // Then run periodically.
964            loop {
965                let interval = config.random_audit_tick_interval();
966                tokio::select! {
967                    () = shutdown.cancelled() => break,
968                    () = tokio::time::sleep(interval) => {
969                        let bootstrapping = *is_bootstrapping.read().await;
970                        let result = {
971                            let history = sync_history.read().await;
972                            let current_sync_epoch = *sync_cycle_epoch.read().await;
973                            audit::audit_tick_with_repair_proofs(
974                                &p2p,
975                                &storage,
976                                &config,
977                                &history,
978                                &repair_proofs,
979                                current_sync_epoch,
980                                bootstrapping,
981                            )
982                            .await
983                        };
984                        handle_audit_result(
985                    &result,
986                    &p2p,
987                    &sync_state,
988                    &recent_provers,
989                    &audit_timeout_strikes,
990                    &config,
991                )
992                .await;
993                    }
994                }
995            }
996            debug!("Audit loop shut down");
997        });
998        self.task_handles.push(handle);
999    }
1000
1001    /// Periodically rebuild + sign + rotate the responder's storage
1002    /// commitment.
1003    ///
1004    /// Phase 3 of the v12 storage-bound audit. Once per
1005    /// [`COMMITMENT_ROTATION_INTERVAL_SECS`], the responder reads the
1006    /// current LMDB key set, builds a Merkle tree (for content-addressed
1007    /// chunks `bytes_hash == key`, so no chunk re-read is needed), signs
1008    /// the root with the node's `MlDsaSecretKey`, and rotates the result
1009    /// into `commitment_state`. Old `previous` slot is dropped by the
1010    /// rotate (per `ResponderCommitmentState::rotate`).
1011    ///
1012    /// Skips if the key set is empty (no commitment to make) — the
1013    /// auditor side falls back to the legacy plain-digest path for
1014    /// peers that have never gossiped a commitment.
1015    fn start_commitment_rotation_loop(&mut self) {
1016        let storage = Arc::clone(&self.storage);
1017        let identity = Arc::clone(&self.identity);
1018        let commitment_state = Arc::clone(&self.commitment_state);
1019        let shutdown = self.shutdown.clone();
1020        let p2p = Arc::clone(&self.p2p_node);
1021        let config = Arc::clone(&self.config);
1022        let sync_trigger = Arc::clone(&self.sync_trigger);
1023        let recent_provers = Arc::clone(&self.recent_provers);
1024
1025        let handle = tokio::spawn(async move {
1026            // Build the first commitment immediately on startup so a
1027            // restarted node can answer commitment-bound audits right
1028            // away — otherwise current() stays None for a full rotation
1029            // interval and audits silently fall back to legacy.
1030            //
1031            // After the first build, trigger an immediate neighbor-sync
1032            // round so the new commitment gossips out within seconds.
1033            // Without this, after a restart remote auditors keep pinning
1034            // the pre-restart (rotated-away) hash until their normal
1035            // sync cadence elapses — up to 1 h in the worst case,
1036            // during which time commitment-bound audits hit "unknown
1037            // commitment hash" -> Idle no-ops.
1038            // ML-DSA signatures are randomized so we cannot reproduce
1039            // the pre-restart hash; the only honest path to recovery
1040            // is fast re-gossip.
1041            if let Err(e) =
1042                rebuild_and_rotate_commitment(&storage, &identity, &commitment_state, &p2p, &config)
1043                    .await
1044            {
1045                warn!("Initial commitment build failed: {e}");
1046            } else {
1047                sync_trigger.notify_one();
1048            }
1049            loop {
1050                tokio::select! {
1051                    () = shutdown.cancelled() => break,
1052                    () = tokio::time::sleep(
1053                        std::time::Duration::from_secs(COMMITMENT_ROTATION_INTERVAL_SECS)
1054                    ) => {
1055                        if let Err(e) = rebuild_and_rotate_commitment(
1056                            &storage,
1057                            &identity,
1058                            &commitment_state,
1059                            &p2p,
1060                            &config,
1061                        ).await {
1062                            warn!("Commitment rotation failed: {e}");
1063                        }
1064                        // Piggyback a sweep of expired recent_provers
1065                        // entries on the rotation tick (same cadence,
1066                        // 1 h). is_credited_holder already honours the
1067                        // TTL on read, but the sweep reclaims memory
1068                        // for entries we'll never re-read.
1069                        let dropped = recent_provers.write().await.sweep_expired(
1070                            std::time::Instant::now()
1071                        );
1072                        if dropped > 0 {
1073                            debug!("recent_provers: swept {dropped} expired entries");
1074                        }
1075                    }
1076                }
1077            }
1078            debug!("Commitment rotation loop shut down");
1079        });
1080        self.task_handles.push(handle);
1081    }
1082
1083    #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
1084    fn start_fetch_worker(&mut self) {
1085        let p2p = Arc::clone(&self.p2p_node);
1086        let storage = Arc::clone(&self.storage);
1087        let queues = Arc::clone(&self.queues);
1088        let config = Arc::clone(&self.config);
1089        let shutdown = self.shutdown.clone();
1090        let bootstrap_state = Arc::clone(&self.bootstrap_state);
1091        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1092        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
1093        let concurrency = max_parallel_fetch();
1094
1095        info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
1096
1097        let handle = tokio::spawn(async move {
1098            // Each in-flight future yields (key, Option<FetchOutcome>) so we
1099            // always recover the key — even if the inner task panics.
1100            let mut in_flight = FuturesUnordered::<FetchFuture>::new();
1101
1102            loop {
1103                // Fill up to `concurrency` slots from the queue.
1104                {
1105                    let mut q = queues.write().await;
1106                    while in_flight.len() < concurrency {
1107                        let Some(candidate) = q.dequeue_fetch() else {
1108                            break;
1109                        };
1110                        let Some(&source) = candidate.sources.first() else {
1111                            warn!(
1112                                "Fetch candidate {} has no sources — dropping",
1113                                hex::encode(candidate.key)
1114                            );
1115                            continue;
1116                        };
1117                        q.start_fetch(candidate.key, source, candidate.sources.clone());
1118
1119                        let p2p = Arc::clone(&p2p);
1120                        let storage = Arc::clone(&storage);
1121                        let config = Arc::clone(&config);
1122                        let token = shutdown.clone();
1123                        let fetch_key = candidate.key;
1124                        in_flight.push(Box::pin(async move {
1125                            let handle = tokio::spawn(async move {
1126                                // Cancel-aware: abort when the engine shuts down.
1127                                tokio::select! {
1128                                    () = token.cancelled() => FetchOutcome {
1129                                        key: fetch_key,
1130                                        result: FetchResult::SourceFailed,
1131                                    },
1132                                    outcome = execute_single_fetch(
1133                                        p2p, storage, config, fetch_key, source,
1134                                    ) => outcome,
1135                                }
1136                            });
1137                            match handle.await {
1138                                Ok(outcome) => (outcome.key, Some(outcome)),
1139                                Err(e) => {
1140                                    error!(
1141                                        "Fetch task for {} panicked: {e}",
1142                                        hex::encode(fetch_key)
1143                                    );
1144                                    (fetch_key, None)
1145                                }
1146                            }
1147                        }));
1148                    }
1149                } // release queues write lock
1150
1151                if in_flight.is_empty() {
1152                    // No work — wait for new items or shutdown.
1153                    tokio::select! {
1154                        () = shutdown.cancelled() => break,
1155                        () = tokio::time::sleep(
1156                            std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
1157                        ) => continue,
1158                    }
1159                }
1160
1161                // Wait for the next fetch to complete and process the result.
1162                tokio::select! {
1163                    () = shutdown.cancelled() => break,
1164                    Some((key, maybe_outcome)) = in_flight.next() => {
1165                        let mut q = queues.write().await;
1166                        let terminal = if let Some(outcome) = maybe_outcome {
1167                            match outcome.result {
1168                                FetchResult::Stored => {
1169                                    q.complete_fetch(&key);
1170                                    true
1171                                }
1172                                FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
1173                                    if let Some(next_peer) = q.retry_fetch(&key) {
1174                                        // Spawn a new fetch task for the next source.
1175                                        let p2p = Arc::clone(&p2p);
1176                                        let storage = Arc::clone(&storage);
1177                                        let config = Arc::clone(&config);
1178                                        let token = shutdown.clone();
1179                                        let fetch_key = key;
1180                                        in_flight.push(Box::pin(async move {
1181                                            let handle = tokio::spawn(async move {
1182                                                tokio::select! {
1183                                                    () = token.cancelled() => FetchOutcome {
1184                                                        key: fetch_key,
1185                                                        result: FetchResult::SourceFailed,
1186                                                    },
1187                                                    outcome = execute_single_fetch(
1188                                                        p2p, storage, config, fetch_key, next_peer,
1189                                                    ) => outcome,
1190                                                }
1191                                            });
1192                                            match handle.await {
1193                                                Ok(outcome) => (outcome.key, Some(outcome)),
1194                                                Err(e) => {
1195                                                    error!(
1196                                                        "Fetch task for {} panicked: {e}",
1197                                                        hex::encode(fetch_key)
1198                                                    );
1199                                                    (fetch_key, None)
1200                                                }
1201                                            }
1202                                        }));
1203                                        false
1204                                    } else {
1205                                        q.complete_fetch(&key);
1206                                        true
1207                                    }
1208                                }
1209                            }
1210                        } else {
1211                            // Task panicked — reclaim the in-flight slot.
1212                            q.complete_fetch(&key);
1213                            true
1214                        };
1215
1216                        // Shrink bootstrap pending set on terminal exit.
1217                        if terminal {
1218                            drop(q); // release queues lock before acquiring bootstrap_state
1219                            if !bootstrap_state.read().await.is_drained() {
1220                                bootstrap_state.write().await.remove_key(&key);
1221                                let q = queues.read().await;
1222                                if bootstrap::check_bootstrap_drained(
1223                                    &bootstrap_state,
1224                                    &q,
1225                                )
1226                                .await
1227                                {
1228                                    complete_bootstrap(
1229                                        &is_bootstrapping,
1230                                        &bootstrap_complete_notify,
1231                                    ).await;
1232                                }
1233                            }
1234                        }
1235                    }
1236                }
1237            }
1238
1239            // Cancel and drain remaining in-flight fetches on shutdown.
1240            // The CancellationToken is already cancelled by this point, so
1241            // spawned tasks will see cancellation via their select! branches.
1242            while in_flight.next().await.is_some() {}
1243            debug!("Fetch worker shut down");
1244        });
1245        self.task_handles.push(handle);
1246    }
1247
1248    fn start_verification_worker(&mut self) {
1249        let p2p = Arc::clone(&self.p2p_node);
1250        let storage = Arc::clone(&self.storage);
1251        let queues = Arc::clone(&self.queues);
1252        let paid_list = Arc::clone(&self.paid_list);
1253        let config = Arc::clone(&self.config);
1254        let shutdown = self.shutdown.clone();
1255        let bootstrap_state = Arc::clone(&self.bootstrap_state);
1256        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1257        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
1258        let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
1259        let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
1260        let recent_provers = Arc::clone(&self.recent_provers);
1261
1262        let handle = tokio::spawn(async move {
1263            loop {
1264                tokio::select! {
1265                    () = shutdown.cancelled() => break,
1266                    () = tokio::time::sleep(
1267                        std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
1268                    ) => {
1269                        let ctx = VerificationCycleContext {
1270                            p2p_node: &p2p,
1271                            paid_list: &paid_list,
1272                            storage: &storage,
1273                            queues: &queues,
1274                            config: &config,
1275                            bootstrap_state: &bootstrap_state,
1276                            is_bootstrapping: &is_bootstrapping,
1277                            bootstrap_complete_notify: &bootstrap_complete_notify,
1278                            last_commitment_by_peer: &last_commitment_by_peer,
1279                            ever_capable_peers: &ever_capable_peers,
1280                            recent_provers: &recent_provers,
1281                        };
1282                        run_verification_cycle(ctx).await;
1283                    }
1284                }
1285            }
1286            debug!("Verification worker shut down");
1287        });
1288        self.task_handles.push(handle);
1289    }
1290
1291    /// Gap 3: Run a one-shot bootstrap sync on startup.
1292    ///
1293    /// Waits for saorsa-core to emit `DhtNetworkEvent::BootstrapComplete`
1294    /// (indicating the routing table is populated) before snapshotting
1295    /// close neighbors. Falls back after a timeout so bootstrap nodes
1296    /// (which have no peers and therefore never receive the event) still
1297    /// proceed.
1298    ///
1299    /// After the gate, finds close neighbors, syncs with each in
1300    /// round-robin batches, admits returned hints into the verification
1301    /// pipeline, and tracks discovered keys for bootstrap drain detection.
1302    #[allow(clippy::too_many_lines)]
1303    fn start_bootstrap_sync(
1304        &mut self,
1305        dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
1306    ) {
1307        let p2p = Arc::clone(&self.p2p_node);
1308        let storage = Arc::clone(&self.storage);
1309        let paid_list = Arc::clone(&self.paid_list);
1310        let queues = Arc::clone(&self.queues);
1311        let config = Arc::clone(&self.config);
1312        let shutdown = self.shutdown.clone();
1313        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1314        let bootstrap_state = Arc::clone(&self.bootstrap_state);
1315        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
1316        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
1317        let repair_proofs = Arc::clone(&self.repair_proofs);
1318        let my_commitment_state = Arc::clone(&self.commitment_state);
1319        let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
1320        let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
1321        let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
1322
1323        let handle = tokio::spawn(async move {
1324            // Wait for DHT bootstrap to complete before snapshotting
1325            // neighbors. The routing table is empty until saorsa-core
1326            // finishes its FIND_NODE rounds and bucket refreshes.
1327            let gate = bootstrap::wait_for_bootstrap_complete(
1328                dht_events,
1329                config.bootstrap_complete_timeout_secs,
1330                &shutdown,
1331            )
1332            .await;
1333
1334            if gate == bootstrap::BootstrapGateResult::Shutdown {
1335                return;
1336            }
1337
1338            let self_id = *p2p.peer_id();
1339            let neighbors =
1340                neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
1341                    .await;
1342
1343            if neighbors.is_empty() {
1344                info!("Bootstrap sync: no close neighbors found, marking drained");
1345                bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
1346                complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
1347                return;
1348            }
1349
1350            let neighbor_count = neighbors.len();
1351            info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
1352
1353            // Process neighbors in batches of NEIGHBOR_SYNC_PEER_COUNT.
1354            for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
1355                if shutdown.is_cancelled() {
1356                    break;
1357                }
1358
1359                let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
1360                    batch,
1361                    &storage,
1362                    &paid_list,
1363                    &p2p,
1364                    config.close_group_size,
1365                    config.paid_list_close_group_size,
1366                )
1367                .await;
1368
1369                for peer in batch {
1370                    if shutdown.is_cancelled() {
1371                        break;
1372                    }
1373
1374                    // Re-read on each iteration so peers see current state.
1375                    let bootstrapping = *is_bootstrapping.read().await;
1376
1377                    bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
1378
1379                    let hints = hints_by_peer.remove(peer).unwrap_or_default();
1380                    let outcome = neighbor_sync::sync_with_peer_with_hints(
1381                        peer,
1382                        &p2p,
1383                        &config,
1384                        bootstrapping,
1385                        hints,
1386                        // Atomically snapshot + mark-gossiped: emitted in the
1387                        // bootstrap-sync request, so we stay answerable for it
1388                        // (ADR-0002). One critical section avoids a TOCTOU where a
1389                        // concurrent retire/rotate drops the slot between read and
1390                        // mark.
1391                        my_commitment_state
1392                            .current_for_gossip()
1393                            .map(|b| b.commitment().clone()),
1394                    )
1395                    .await;
1396
1397                    bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
1398
1399                    if let Some(outcome) = outcome {
1400                        // Ingest the peer's piggybacked commitment from the
1401                        // response (same verification as the request path).
1402                        // Bootstrap is the FIRST gossip we receive from most
1403                        // peers, so this populates last_commitment_by_peer.
1404                        //
1405                        // We intentionally do NOT trigger a gossip-audit here:
1406                        // during bootstrap this node may itself still be
1407                        // bootstrapping (audits are gated on that), and the
1408                        // close-group/RT view is not yet stable. The peer is
1409                        // audited on the first STEADY-STATE neighbor-sync round
1410                        // after bootstrap drains (request + response paths both
1411                        // trigger), which is within one sync cycle — so caching
1412                        // the commitment here is sufficient and there is no
1413                        // coverage gap (ADR-0002).
1414                        ingest_peer_commitment(
1415                            peer,
1416                            outcome.response.commitment.as_ref(),
1417                            &p2p,
1418                            &last_commitment_by_peer,
1419                            &ever_capable_peers,
1420                            &sig_verify_attempts,
1421                        )
1422                        .await; // sig_verify_attempts in scope from line ~1080
1423
1424                        if !outcome.response.bootstrapping {
1425                            record_sent_replica_hints(
1426                                peer,
1427                                &outcome.sent_replica_hints,
1428                                &repair_proofs,
1429                                &sync_cycle_epoch,
1430                            )
1431                            .await;
1432                            // Admit hints into verification pipeline.
1433                            let outcome = admit_and_queue_hints(
1434                                &self_id,
1435                                peer,
1436                                &outcome.response.replica_hints,
1437                                &outcome.response.paid_hints,
1438                                &p2p,
1439                                &config,
1440                                &storage,
1441                                &paid_list,
1442                                &queues,
1443                            )
1444                            .await;
1445
1446                            // Track discovered keys for drain detection.
1447                            if !outcome.discovered.is_empty() {
1448                                bootstrap::track_discovered_keys(
1449                                    &bootstrap_state,
1450                                    &outcome.discovered,
1451                                )
1452                                .await;
1453                            }
1454
1455                            // Record / retire capacity rejections so the
1456                            // drain check correctly reflects whether each
1457                            // source still owes us re-hinted work after
1458                            // queue overflow.
1459                            if outcome.capacity_rejected_count > 0 {
1460                                bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
1461                            } else {
1462                                bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
1463                            }
1464                        }
1465                    }
1466                }
1467            }
1468
1469            // Check drain condition.
1470            {
1471                let q = queues.read().await;
1472                if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
1473                    complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
1474                }
1475            }
1476
1477            info!("Bootstrap sync completed");
1478        });
1479        self.task_handles.push(handle);
1480    }
1481}
1482
1483// ===========================================================================
1484// Free functions for background tasks
1485// ===========================================================================
1486
1487/// RAII admission for one audit-responder task: holds the GLOBAL permit and,
1488/// on drop, decrements the PER-PEER in-flight count. Moving this into the
1489/// spawned task ties both bounds to the task's exact lifetime — no manual
1490/// decrement to forget on an early return or panic.
1491struct AuditResponderGuard {
1492    _permit: tokio::sync::OwnedSemaphorePermit,
1493    inflight: Arc<RwLock<HashMap<PeerId, u32>>>,
1494    peer: PeerId,
1495}
1496
1497impl Drop for AuditResponderGuard {
1498    fn drop(&mut self) {
1499        // Decrement (and prune to keep the map bounded) without blocking the
1500        // async runtime: a short lock on a tiny map.
1501        //
1502        // Fast path: if the (uncontended, tiny) lock is free, decrement inline
1503        // with no spawn. Otherwise defer to a task — but only if a runtime is
1504        // actually current, so `Drop` during shutdown (no runtime) can never
1505        // panic. A missed decrement at shutdown is harmless: the whole map is
1506        // being dropped with the engine.
1507        let peer = self.peer;
1508        if let Ok(mut map) = self.inflight.try_write() {
1509            if let Some(n) = map.get_mut(&peer) {
1510                *n = n.saturating_sub(1);
1511                if *n == 0 {
1512                    map.remove(&peer);
1513                }
1514            }
1515            return;
1516        }
1517        if let Ok(handle) = tokio::runtime::Handle::try_current() {
1518            let inflight = Arc::clone(&self.inflight);
1519            handle.spawn(async move {
1520                let mut map = inflight.write().await;
1521                if let Some(n) = map.get_mut(&peer) {
1522                    *n = n.saturating_sub(1);
1523                    if *n == 0 {
1524                        map.remove(&peer);
1525                    }
1526                }
1527            });
1528        }
1529    }
1530}
1531
1532/// Try to admit one audit-responder task for `source`: take a global permit AND
1533/// a per-peer slot (both bounded). Returns `None` (caller drops the challenge,
1534/// which the auditor graces as a timeout) if either ceiling is hit, so one
1535/// flooder can neither exhaust the global pool's effect on others nor exceed
1536/// its own per-peer share (codex-r2 A).
1537async fn admit_audit_responder(
1538    semaphore: &Arc<Semaphore>,
1539    inflight: &Arc<RwLock<HashMap<PeerId, u32>>>,
1540    source: &PeerId,
1541) -> Option<AuditResponderGuard> {
1542    // Per-peer cap first (cheap, and the fairness-critical bound), committed
1543    // under the write lock so concurrent challenges from the same peer can't
1544    // both slip past the cap.
1545    {
1546        let mut map = inflight.write().await;
1547        let entry = map.entry(*source).or_insert(0);
1548        if *entry >= MAX_AUDIT_RESPONSES_PER_PEER {
1549            return None;
1550        }
1551        *entry += 1;
1552    }
1553    // Then the global ceiling. If it's exhausted, give back the per-peer slot we
1554    // just claimed so it isn't leaked.
1555    let Ok(permit) = Arc::clone(semaphore).try_acquire_owned() else {
1556        let mut map = inflight.write().await;
1557        if let Some(n) = map.get_mut(source) {
1558            *n = n.saturating_sub(1);
1559            if *n == 0 {
1560                map.remove(source);
1561            }
1562        }
1563        return None;
1564    };
1565    Some(AuditResponderGuard {
1566        _permit: permit,
1567        inflight: Arc::clone(inflight),
1568        peer: *source,
1569    })
1570}
1571
1572/// Handle an incoming replication protocol message.
1573///
1574/// When `rr_message_id` is `Some`, the request arrived via the `/rr/`
1575/// request-response path and the response must be sent via `send_response`
1576/// so saorsa-core can route it back to the waiting `send_request` caller.
///
/// Decodes `data` as a `ReplicationMessage` and dispatches on its body:
///
/// - `FreshReplicationOffer`, `PaidNotify`, `VerificationRequest`,
///   `FetchRequest` and the responsible-chunk `AuditChallenge` are handled
///   inline by their per-message handler. This function returns whatever
///   `Result` that handler returns.
/// - `NeighborSyncRequest` first ingests the sender's piggybacked
///   commitment. If ingestion yields an audit target, it calls
///   `maybe_trigger_gossip_audit`. It then answers with our own current
///   gossip commitment attached.
/// - `SubtreeAuditChallenge` / `SubtreeByteChallenge` are admitted through
///   `admit_audit_responder`.
///   - If admission fails, the challenge is dropped with no reply.
///   - If admission succeeds, the disk-heavy response is built and sent on
///     a spawned task.
///
///   In both cases this function returns `Ok(())` without waiting for the
///   reply.
/// - Response variants are ignored here (`Ok(())`); their request
///   initiators consume them.
///
/// # Errors
///
/// Returns `Error::Protocol` if `data` fails to decode. Otherwise it
/// propagates any error returned by the delegated inline handler.
1577#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1578async fn handle_replication_message(
1579    source: &PeerId,
1580    data: &[u8],
1581    p2p_node: &Arc<P2PNode>,
1582    storage: &Arc<LmdbStorage>,
1583    paid_list: &Arc<PaidList>,
1584    payment_verifier: &Arc<PaymentVerifier>,
1585    queues: &Arc<RwLock<ReplicationQueues>>,
1586    config: &ReplicationConfig,
1587    is_bootstrapping: &Arc<RwLock<bool>>,
1588    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1589    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1590    sync_cycle_epoch: &Arc<RwLock<u64>>,
1591    repair_proofs: &Arc<RwLock<RepairProofs>>,
1592    last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
1593    ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
1594    sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
1595    my_commitment_state: &Arc<ResponderCommitmentState>,
1596    gossip_audit: &GossipAuditTrigger,
1597    audit_responder_semaphore: &Arc<Semaphore>,
1598    audit_responder_inflight: &Arc<RwLock<HashMap<PeerId, u32>>>,
1599    rr_message_id: Option<&str>,
1600) -> Result<()> {
    // A decode failure is surfaced to the caller as `Error::Protocol`.
1601    let msg = ReplicationMessage::decode(data)
1602        .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
1603
1604    match msg.body {
1605        ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
1606            handle_fresh_offer(
1607                source,
1608                offer,
1609                storage,
1610                paid_list,
1611                payment_verifier,
1612                p2p_node,
1613                config,
1614                msg.request_id,
1615                rr_message_id,
1616            )
1617            .await
1618        }
1619        ReplicationMessageBody::PaidNotify(ref notify) => {
1620            handle_paid_notify(
1621                source,
1622                notify,
1623                paid_list,
1624                payment_verifier,
1625                p2p_node,
1626                config,
1627            )
1628            .await
1629        }
1630        ReplicationMessageBody::NeighborSyncRequest(ref request) => {
1631            let bootstrapping = *is_bootstrapping.read().await;
1632            // Phase-3 storage-bound audit: store the sender's
1633            // commitment for use as `expected_commitment_hash` in
1634            // future audits. Verify signature before storing so a peer
1635            // cannot inject a forged commitment for someone else.
1636            if let Some(target) = ingest_peer_commitment(
1637                source,
1638                request.commitment.as_ref(),
1639                p2p_node,
1640                last_commitment_by_peer,
1641                ever_capable_peers,
1642                sig_verify_attempts,
1643            )
1644            .await
1645            {
1646                maybe_trigger_gossip_audit(gossip_audit, source, target).await;
1647            }
1648            handle_neighbor_sync_request(
1649                source,
1650                request,
1651                p2p_node,
1652                storage,
1653                paid_list,
1654                queues,
1655                config,
1656                bootstrapping,
1657                bootstrap_state,
1658                sync_history,
1659                sync_cycle_epoch,
1660                repair_proofs,
1661                // Atomically snapshot + mark-gossiped: emitted in the sync
1662                // response, so we must stay answerable for it (ADR-0002).
1663                my_commitment_state
1664                    .current_for_gossip()
1665                    .map(|b| b.commitment().clone()),
1666                msg.request_id,
1667                rr_message_id,
1668            )
1669            .await
1670        }
1671        ReplicationMessageBody::VerificationRequest(ref request) => {
1672            handle_verification_request(
1673                source,
1674                request,
1675                storage,
1676                paid_list,
1677                p2p_node,
1678                msg.request_id,
1679                rr_message_id,
1680            )
1681            .await
1682        }
1683        ReplicationMessageBody::FetchRequest(ref request) => {
1684            handle_fetch_request(
1685                source,
1686                request,
1687                storage,
1688                p2p_node,
1689                msg.request_id,
1690                rr_message_id,
1691            )
1692            .await
1693        }
1694        ReplicationMessageBody::AuditChallenge(ref challenge) => {
1695            // Responsible-chunk audit (audit #2) responder: answer with per-key
1696            // possession digests. This same handler also answers the
1697            // prune-confirmation audit, which sends the same `AuditChallenge`
1698            // wire message.
1699            let bootstrapping = *is_bootstrapping.read().await;
1700            handle_audit_challenge_msg(
1701                source,
1702                challenge,
1703                storage,
1704                p2p_node,
1705                bootstrapping,
1706                msg.request_id,
1707                rr_message_id,
1708            )
1709            .await
1710        }
1711        ReplicationMessageBody::SubtreeAuditChallenge(challenge) => {
1712            // Gossip-triggered storage-bound subtree audit (ADR-0002). The
1713            // responder rebuilds the WHOLE nonce-selected subtree, reading every
1714            // leaf's bytes from disk (`get_raw` × ~sqrt(N) leaves). Run it on a
1715            // detached task so this serial message loop is never blocked on disk
1716            // I/O — otherwise one audit stalls all replication traffic (§5).
1717            //
1718            // A bounded, flood-fair admission restores backpressure (codex#1 +
1719            // codex-r2 A): a global ceiling AND a per-peer cap. If either is hit
1720            // we drop this challenge — the auditor graces a non-response as a
1721            // timeout, so an honest auditor is unaffected and only a flooder is
1722            // throttled (and it cannot starve other peers, since its share is
1723            // capped per-peer).
1724            info!(
1725                "Audit challenge received: kind=subtree source={source} request_response={}",
1726                rr_message_id.is_some(),
1727            );
1728            let Some(guard) =
1729                admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
1730                    .await
1731            else {
1732                warn!(
1733                    "Audit challenge reply not sent: kind=subtree response=dropped \
1734                     source={source} (audit-responder capacity reached)"
1735                );
1736                return Ok(());
1737            };
            // Everything the detached task needs is cloned or copied into owned
            // values here, because the task cannot borrow from this call.
1738            let bootstrapping = *is_bootstrapping.read().await;
1739            let storage = Arc::clone(storage);
1740            let p2p_node = Arc::clone(p2p_node);
1741            let my_commitment_state = Arc::clone(my_commitment_state);
1742            let source = *source;
1743            let request_id = msg.request_id;
1744            let rr_message_id = rr_message_id.map(ToOwned::to_owned);
            // NOTE(review): the `JoinHandle` returned by this spawn is discarded.
            // The task is therefore detached and is not awaited on engine
            // shutdown. Confirm this is intended.
1745            tokio::spawn(async move {
1746                let _guard = guard; // global permit + per-peer slot, held until done
1747                let response = storage_commitment_audit::handle_subtree_challenge(
1748                    &challenge,
1749                    &storage,
1750                    p2p_node.peer_id(),
1751                    bootstrapping,
1752                    Some(&my_commitment_state),
1753                )
1754                .await;
1755                let response_kind = subtree_audit_response_kind(&response);
1756                let sent = send_replication_response_checked(
1757                    &source,
1758                    &p2p_node,
1759                    request_id,
1760                    ReplicationMessageBody::SubtreeAuditResponse(response),
1761                    rr_message_id.as_deref(),
1762                )
1763                .await;
1764                if sent {
1765                    info!(
1766                        "Audit challenge reply sent: kind=subtree response={response_kind} \
1767                         source={source} request_response={}",
1768                        rr_message_id.is_some(),
1769                    );
1770                } else {
1771                    warn!(
1772                        "Audit challenge reply not sent: kind=subtree response={response_kind} \
1773                         source={source} request_response={}",
1774                        rr_message_id.is_some(),
1775                    );
1776                }
1777            });
1778            Ok(())
1779        }
1780        ReplicationMessageBody::SubtreeByteChallenge(challenge) => {
1781            // Round 2 of the storage audit (ADR-0002): serve the original bytes
1782            // for the auditor's spot-check keys, or signal `Absent` for a
1783            // committed key we can no longer produce. Reads chunk bytes from
1784            // disk, so likewise spawned off the serial loop (§5) under the same
1785            // flood-fair admission (codex#1 + codex-r2 A).
1786            info!(
1787                "Audit challenge received: kind=byte source={source} request_response={}",
1788                rr_message_id.is_some(),
1789            );
1790            let Some(guard) =
1791                admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
1792                    .await
1793            else {
1794                warn!(
1795                    "Audit challenge reply not sent: kind=byte response=dropped \
1796                     source={source} (audit-responder capacity reached)"
1797                );
1798                return Ok(());
1799            };
1800            let bootstrapping = *is_bootstrapping.read().await;
1801            let storage = Arc::clone(storage);
1802            let p2p_node = Arc::clone(p2p_node);
1803            let my_commitment_state = Arc::clone(my_commitment_state);
1804            let source = *source;
1805            let request_id = msg.request_id;
1806            let rr_message_id = rr_message_id.map(ToOwned::to_owned);
            // NOTE(review): detached like the subtree branch above. The
            // `JoinHandle` is discarded.
1807            tokio::spawn(async move {
1808                let _guard = guard; // global permit + per-peer slot, held until done
1809                let response = storage_commitment_audit::handle_subtree_byte_challenge(
1810                    &challenge,
1811                    &storage,
1812                    p2p_node.peer_id(),
1813                    bootstrapping,
1814                    Some(&my_commitment_state),
1815                )
1816                .await;
1817                let response_kind = subtree_byte_response_kind(&response);
1818                let sent = send_replication_response_checked(
1819                    &source,
1820                    &p2p_node,
1821                    request_id,
1822                    ReplicationMessageBody::SubtreeByteResponse(response),
1823                    rr_message_id.as_deref(),
1824                )
1825                .await;
1826                if sent {
1827                    info!(
1828                        "Audit challenge reply sent: kind=byte response={response_kind} \
1829                         source={source} request_response={}",
1830                        rr_message_id.is_some(),
1831                    );
1832                } else {
1833                    warn!(
1834                        "Audit challenge reply not sent: kind=byte response={response_kind} \
1835                         source={source} request_response={}",
1836                        rr_message_id.is_some(),
1837                    );
1838                }
1839            });
1840            Ok(())
1841        }
1842        // Response messages are handled by their respective request initiators.
1843        ReplicationMessageBody::FreshReplicationResponse(_)
1844        | ReplicationMessageBody::NeighborSyncResponse(_)
1845        | ReplicationMessageBody::VerificationResponse(_)
1846        | ReplicationMessageBody::FetchResponse(_)
1847        | ReplicationMessageBody::AuditResponse(_)
1848        | ReplicationMessageBody::SubtreeAuditResponse(_)
1849        | ReplicationMessageBody::SubtreeByteResponse(_) => Ok(()),
1850    }
1851}
1852
1853// ---------------------------------------------------------------------------
1854// Per-message-type handlers
1855// ---------------------------------------------------------------------------
1856
1857#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1858async fn handle_fresh_offer(
1859    source: &PeerId,
1860    offer: &protocol::FreshReplicationOffer,
1861    storage: &Arc<LmdbStorage>,
1862    paid_list: &Arc<PaidList>,
1863    payment_verifier: &Arc<PaymentVerifier>,
1864    p2p_node: &Arc<P2PNode>,
1865    config: &ReplicationConfig,
1866    request_id: u64,
1867    rr_message_id: Option<&str>,
1868) -> Result<()> {
1869    let self_id = *p2p_node.peer_id();
1870
1871    // Rule 5: reject if PoP is missing.
1872    if offer.proof_of_payment.is_empty() {
1873        send_replication_response(
1874            source,
1875            p2p_node,
1876            request_id,
1877            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1878                key: offer.key,
1879                reason: "Missing proof of payment".to_string(),
1880            }),
1881            rr_message_id,
1882        )
1883        .await;
1884        return Ok(());
1885    }
1886
1887    // Enforce chunk size invariant: the normal PUT path rejects data larger
1888    // than MAX_CHUNK_SIZE; the replication receive path must do the same to
1889    // prevent peers from pushing oversized records through replication.
1890    if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1891        warn!(
1892            "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1893            hex::encode(offer.key),
1894            offer.data.len(),
1895            crate::ant_protocol::MAX_CHUNK_SIZE,
1896        );
1897        p2p_node
1898            .report_trust_event(
1899                source,
1900                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1901            )
1902            .await;
1903        send_replication_response(
1904            source,
1905            p2p_node,
1906            request_id,
1907            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1908                key: offer.key,
1909                reason: format!(
1910                    "Data size {} exceeds maximum chunk size {}",
1911                    offer.data.len(),
1912                    crate::ant_protocol::MAX_CHUNK_SIZE,
1913                ),
1914            }),
1915            rr_message_id,
1916        )
1917        .await;
1918        return Ok(());
1919    }
1920
1921    // Mirror the normal PUT path: the advertised key must be the content
1922    // address of the supplied bytes before any expensive payment verification.
1923    let computed_key = crate::client::compute_address(&offer.data);
1924    if computed_key != offer.key {
1925        warn!(
1926            "Rejecting fresh offer for key {}: content address mismatch, computed {}",
1927            hex::encode(offer.key),
1928            hex::encode(computed_key),
1929        );
1930        p2p_node
1931            .report_trust_event(
1932                source,
1933                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1934            )
1935            .await;
1936        send_replication_response(
1937            source,
1938            p2p_node,
1939            request_id,
1940            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1941                key: offer.key,
1942                reason: format!(
1943                    "Content address mismatch: expected {}, computed {}",
1944                    hex::encode(offer.key),
1945                    hex::encode(computed_key),
1946                ),
1947            }),
1948            rr_message_id,
1949        )
1950        .await;
1951        return Ok(());
1952    }
1953
1954    // Rule 7: check storage admission. Fresh chunk receivers accept the close
1955    // group plus a small margin to absorb local routing-table disagreement.
1956    if !admission::is_responsible(
1957        &self_id,
1958        &offer.key,
1959        p2p_node,
1960        storage_admission_width(config.close_group_size),
1961    )
1962    .await
1963    {
1964        send_replication_response(
1965            source,
1966            p2p_node,
1967            request_id,
1968            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1969                key: offer.key,
1970                reason: "Not in storage-admission range for this key".to_string(),
1971            }),
1972            rr_message_id,
1973        )
1974        .await;
1975        return Ok(());
1976    }
1977
1978    // Disk-space pre-check — mirror the PUT handler (V2-411). A full node can
1979    // never store this record, so reject it before the expensive payment
1980    // verification (EVM on-chain query / merkle pool work) rather than verifying
1981    // and only then failing at `storage.put` below. Reuses the cached capacity
1982    // check (passing results only, so freed space is detected promptly), and the
1983    // store path keeps its own check as defence-in-depth.
1984    if let Err(e) = storage.check_capacity() {
1985        info!(
1986            target: "ant_node::storage::disk_precheck",
1987            key = %hex::encode(offer.key),
1988            "Rejecting fresh replication offer before payment verification: {e}"
1989        );
1990        send_replication_response(
1991            source,
1992            p2p_node,
1993            request_id,
1994            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1995                key: offer.key,
1996                reason: e.to_string(),
1997            }),
1998            rr_message_id,
1999        )
2000        .await;
2001        return Ok(());
2002    }
2003
2004    // Gap 1: Validate PoP via PaymentVerifier. Fresh replication is still
2005    // part of the immediate write fan-out: this receiver is about to store the
2006    // record as if the client had PUT it here directly. Storage admission
2007    // was checked above before proof work. ClientPut verification applies
2008    // store-strength cache semantics, paid-quote issuer K-closeness and local
2009    // price floor checks for single-node proofs, and merkle candidate
2010    // closeness for merkle proofs.
2011    match payment_verifier
2012        .verify_payment(
2013            &offer.key,
2014            Some(&offer.proof_of_payment),
2015            fresh_offer_payment_context(),
2016        )
2017        .await
2018    {
2019        Ok(status) if status.can_store() => {
2020            debug!(
2021                "PoP validated for fresh offer key {}",
2022                hex::encode(offer.key)
2023            );
2024        }
2025        Ok(_) => {
2026            send_replication_response(
2027                source,
2028                p2p_node,
2029                request_id,
2030                ReplicationMessageBody::FreshReplicationResponse(
2031                    FreshReplicationResponse::Rejected {
2032                        key: offer.key,
2033                        reason: "Payment verification failed: payment required".to_string(),
2034                    },
2035                ),
2036                rr_message_id,
2037            )
2038            .await;
2039            return Ok(());
2040        }
2041        Err(e) => {
2042            warn!(
2043                "PoP verification error for key {}: {e}",
2044                hex::encode(offer.key)
2045            );
2046            send_replication_response(
2047                source,
2048                p2p_node,
2049                request_id,
2050                ReplicationMessageBody::FreshReplicationResponse(
2051                    FreshReplicationResponse::Rejected {
2052                        key: offer.key,
2053                        reason: format!("Payment verification error: {e}"),
2054                    },
2055                ),
2056                rr_message_id,
2057            )
2058            .await;
2059            return Ok(());
2060        }
2061    }
2062
2063    // Rule 6: add to PaidForList.
2064    if let Err(e) = paid_list.insert(&offer.key).await {
2065        warn!("Failed to add key to PaidForList: {e}");
2066    }
2067
2068    // Store the record.
2069    match storage.put(&offer.key, &offer.data).await {
2070        Ok(_) => {
2071            send_replication_response(
2072                source,
2073                p2p_node,
2074                request_id,
2075                ReplicationMessageBody::FreshReplicationResponse(
2076                    FreshReplicationResponse::Accepted { key: offer.key },
2077                ),
2078                rr_message_id,
2079            )
2080            .await;
2081        }
2082        Err(e) => {
2083            send_replication_response(
2084                source,
2085                p2p_node,
2086                request_id,
2087                ReplicationMessageBody::FreshReplicationResponse(
2088                    FreshReplicationResponse::Rejected {
2089                        key: offer.key,
2090                        reason: e.to_string(),
2091                    },
2092                ),
2093                rr_message_id,
2094            )
2095            .await;
2096        }
2097    }
2098
2099    Ok(())
2100}
2101
2102async fn handle_paid_notify(
2103    _source: &PeerId,
2104    notify: &protocol::PaidNotify,
2105    paid_list: &Arc<PaidList>,
2106    payment_verifier: &Arc<PaymentVerifier>,
2107    p2p_node: &Arc<P2PNode>,
2108    config: &ReplicationConfig,
2109) -> Result<()> {
2110    let self_id = *p2p_node.peer_id();
2111
2112    // Rule 3: validate PoP presence before adding.
2113    if notify.proof_of_payment.is_empty() {
2114        return Ok(());
2115    }
2116
2117    // Check if we're in PaidCloseGroup for this key.
2118    if !admission::is_in_paid_close_group(
2119        &self_id,
2120        &notify.key,
2121        p2p_node,
2122        config.paid_list_close_group_size,
2123    )
2124    .await
2125    {
2126        return Ok(());
2127    }
2128
2129    // Gap 1: Validate PoP via PaymentVerifier. PaidNotify admits fresh
2130    // paid-list metadata, so local paid-list close-group membership was checked
2131    // above before proof work. The verifier then runs the same payment proof
2132    // checks as ClientPut while writing a paid-list-strength cache entry.
2133    match payment_verifier
2134        .verify_payment(
2135            &notify.key,
2136            Some(&notify.proof_of_payment),
2137            paid_notify_payment_context(),
2138        )
2139        .await
2140    {
2141        Ok(status) if status.can_store() => {
2142            debug!(
2143                "PoP validated for paid notify key {}",
2144                hex::encode(notify.key)
2145            );
2146        }
2147        Ok(_) => {
2148            warn!(
2149                "Paid notify rejected: payment required for key {}",
2150                hex::encode(notify.key)
2151            );
2152            return Ok(());
2153        }
2154        Err(e) => {
2155            warn!(
2156                "PoP verification error for paid notify key {}: {e}",
2157                hex::encode(notify.key)
2158            );
2159            return Ok(());
2160        }
2161    }
2162
2163    if let Err(e) = paid_list.insert(&notify.key).await {
2164        warn!("Failed to add paid notify key to PaidForList: {e}");
2165    }
2166
2167    Ok(())
2168}
2169
2170#[allow(clippy::too_many_arguments)]
2171async fn handle_neighbor_sync_request(
2172    source: &PeerId,
2173    request: &protocol::NeighborSyncRequest,
2174    p2p_node: &Arc<P2PNode>,
2175    storage: &Arc<LmdbStorage>,
2176    paid_list: &Arc<PaidList>,
2177    queues: &Arc<RwLock<ReplicationQueues>>,
2178    config: &ReplicationConfig,
2179    is_bootstrapping: bool,
2180    bootstrap_state: &Arc<RwLock<BootstrapState>>,
2181    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
2182    sync_cycle_epoch: &Arc<RwLock<u64>>,
2183    repair_proofs: &Arc<RwLock<RepairProofs>>,
2184    my_commitment: Option<StorageCommitment>,
2185    request_id: u64,
2186    rr_message_id: Option<&str>,
2187) -> Result<()> {
2188    let self_id = *p2p_node.peer_id();
2189
2190    // No per-request hint count limit: the wire message size limit
2191    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Unlike audit
2192    // challenges, sync hints don't drive expensive computation — they just
2193    // enter the verification queue. A per-request limit here would break
2194    // bootstrap replication for newly-joined nodes with 0 stored chunks.
2195
2196    // Build response (outbound hints).
2197    let (response, sent_replica_hints, sender_in_rt) =
2198        neighbor_sync::handle_sync_request_with_proofs(
2199            source,
2200            request,
2201            p2p_node,
2202            storage,
2203            paid_list,
2204            config,
2205            is_bootstrapping,
2206            my_commitment.clone(),
2207        )
2208        .await;
2209
2210    // Send response.
2211    let response_sent = send_replication_response_checked(
2212        source,
2213        p2p_node,
2214        request_id,
2215        ReplicationMessageBody::NeighborSyncResponse(response),
2216        rr_message_id,
2217    )
2218    .await;
2219
2220    // Process inbound hints only if sender is in LocalRT (Rule 4-6).
2221    if !sender_in_rt {
2222        return Ok(());
2223    }
2224
2225    // Update sync history for this peer before recording repair proofs so a
2226    // same-tick audit cannot combine a fresh key proof with stale peer maturity.
2227    {
2228        let mut history = sync_history.write().await;
2229        let record = history.entry(*source).or_insert(PeerSyncRecord {
2230            last_sync: None,
2231            cycles_since_sync: 0,
2232        });
2233        record.last_sync = Some(Instant::now());
2234        record.cycles_since_sync = 0;
2235    }
2236
2237    if response_sent && !request.bootstrapping {
2238        record_sent_replica_hints(source, &sent_replica_hints, repair_proofs, sync_cycle_epoch)
2239            .await;
2240    }
2241
2242    // Admit inbound hints and queue for verification.
2243    let outcome = admit_and_queue_hints(
2244        &self_id,
2245        source,
2246        &request.replica_hints,
2247        &request.paid_hints,
2248        p2p_node,
2249        config,
2250        storage,
2251        paid_list,
2252        queues,
2253    )
2254    .await;
2255
2256    // Track discovered keys for bootstrap drain detection so that hints
2257    // admitted via inbound sync requests are not missed. Capacity-rejected
2258    // hints keep this source on the "not yet drained" list until its next
2259    // sync re-admits them; a clean cycle clears the source.
2260    if is_bootstrapping {
2261        if !outcome.discovered.is_empty() {
2262            bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
2263        }
2264        if outcome.capacity_rejected_count > 0 {
2265            bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
2266        } else {
2267            bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
2268        }
2269    }
2270
2271    Ok(())
2272}
2273
2274async fn handle_verification_request(
2275    source: &PeerId,
2276    request: &protocol::VerificationRequest,
2277    storage: &Arc<LmdbStorage>,
2278    paid_list: &Arc<PaidList>,
2279    p2p_node: &Arc<P2PNode>,
2280    request_id: u64,
2281    rr_message_id: Option<&str>,
2282) -> Result<()> {
2283    // No per-request key count limit: the wire message size limit
2284    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Verification
2285    // does cheap storage lookups per key, not expensive computation like
2286    // audit digest generation.
2287
2288    #[allow(clippy::cast_possible_truncation)]
2289    let keys_len = request.keys.len() as u32;
2290    let paid_check_set: HashSet<u32> = request
2291        .paid_list_check_indices
2292        .iter()
2293        .copied()
2294        .filter(|&idx| {
2295            if idx >= keys_len {
2296                warn!(
2297                    "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
2298                    request.keys.len(),
2299                );
2300                false
2301            } else {
2302                true
2303            }
2304        })
2305        .collect();
2306
2307    let mut results = Vec::with_capacity(request.keys.len());
2308    for (i, key) in request.keys.iter().enumerate() {
2309        let present = storage.exists(key).unwrap_or(false);
2310        let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
2311            Some(paid_list.contains(key).unwrap_or(false))
2312        } else {
2313            None
2314        };
2315        results.push(protocol::KeyVerificationResult {
2316            key: *key,
2317            present,
2318            paid,
2319        });
2320    }
2321
2322    send_replication_response(
2323        source,
2324        p2p_node,
2325        request_id,
2326        ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
2327        rr_message_id,
2328    )
2329    .await;
2330
2331    Ok(())
2332}
2333
2334async fn handle_fetch_request(
2335    source: &PeerId,
2336    request: &protocol::FetchRequest,
2337    storage: &Arc<LmdbStorage>,
2338    p2p_node: &Arc<P2PNode>,
2339    request_id: u64,
2340    rr_message_id: Option<&str>,
2341) -> Result<()> {
2342    let response = match storage.get(&request.key).await {
2343        Ok(Some(data)) => protocol::FetchResponse::Success {
2344            key: request.key,
2345            data,
2346        },
2347        Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
2348        Err(e) => protocol::FetchResponse::Error {
2349            key: request.key,
2350            reason: format!("{e}"),
2351        },
2352    };
2353
2354    send_replication_response(
2355        source,
2356        p2p_node,
2357        request_id,
2358        ReplicationMessageBody::FetchResponse(response),
2359        rr_message_id,
2360    )
2361    .await;
2362
2363    Ok(())
2364}
2365
2366/// Responder for an incoming `AuditChallenge` (responsible-chunk audit #2, and
2367/// the prune-confirmation audit, which reuses the same wire message): reply with
2368/// per-key possession digests.
2369async fn handle_audit_challenge_msg(
2370    source: &PeerId,
2371    challenge: &protocol::AuditChallenge,
2372    storage: &Arc<LmdbStorage>,
2373    p2p_node: &Arc<P2PNode>,
2374    is_bootstrapping: bool,
2375    request_id: u64,
2376    rr_message_id: Option<&str>,
2377) -> Result<()> {
2378    #[allow(clippy::cast_possible_truncation)]
2379    let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
2380    info!(
2381        "Audit challenge received: kind=responsible keys={} bootstrapping={} request_response={}",
2382        challenge.keys.len(),
2383        is_bootstrapping,
2384        rr_message_id.is_some(),
2385    );
2386
2387    let response = audit::handle_audit_challenge(
2388        challenge,
2389        storage,
2390        p2p_node.peer_id(),
2391        is_bootstrapping,
2392        stored_chunks,
2393    )
2394    .await;
2395    let response_kind = audit_response_kind(&response);
2396
2397    let sent = send_replication_response_checked(
2398        source,
2399        p2p_node,
2400        request_id,
2401        ReplicationMessageBody::AuditResponse(response),
2402        rr_message_id,
2403    )
2404    .await;
2405    if sent {
2406        info!(
2407            "Audit challenge reply sent: kind=responsible response={} keys={} request_response={}",
2408            response_kind,
2409            challenge.keys.len(),
2410            rr_message_id.is_some(),
2411        );
2412    } else {
2413        warn!(
2414            "Audit challenge reply not sent: kind=responsible response={} keys={} request_response={}",
2415            response_kind,
2416            challenge.keys.len(),
2417            rr_message_id.is_some(),
2418        );
2419    }
2420
2421    Ok(())
2422}
2423
2424fn audit_response_kind(response: &protocol::AuditResponse) -> &'static str {
2425    match response {
2426        protocol::AuditResponse::Digests { .. } => "digests",
2427        protocol::AuditResponse::Bootstrapping { .. } => "bootstrapping",
2428        protocol::AuditResponse::Rejected { .. } => "rejected",
2429    }
2430}
2431
2432fn subtree_audit_response_kind(response: &protocol::SubtreeAuditResponse) -> &'static str {
2433    match response {
2434        protocol::SubtreeAuditResponse::Proof { .. } => "proof",
2435        protocol::SubtreeAuditResponse::Bootstrapping { .. } => "bootstrapping",
2436        protocol::SubtreeAuditResponse::Rejected { .. } => "rejected",
2437    }
2438}
2439
2440fn subtree_byte_response_kind(response: &protocol::SubtreeByteResponse) -> &'static str {
2441    match response {
2442        protocol::SubtreeByteResponse::Items { .. } => "items",
2443        protocol::SubtreeByteResponse::Bootstrapping { .. } => "bootstrapping",
2444        protocol::SubtreeByteResponse::Rejected { .. } => "rejected",
2445    }
2446}
2447
2448// ---------------------------------------------------------------------------
2449// Message sending helper
2450// ---------------------------------------------------------------------------
2451
2452/// Send a replication response message as a best-effort reply.
2453///
2454/// Encode and send failures are logged by the checked helper. Most response
2455/// paths do not need to branch on send success, so this wrapper keeps those
2456/// call sites explicit about their best-effort behavior.
2457async fn send_replication_response(
2458    peer: &PeerId,
2459    p2p_node: &Arc<P2PNode>,
2460    request_id: u64,
2461    body: ReplicationMessageBody,
2462    rr_message_id: Option<&str>,
2463) {
2464    let _ =
2465        send_replication_response_checked(peer, p2p_node, request_id, body, rr_message_id).await;
2466}
2467
2468/// Send a replication response message and report whether it was accepted.
2469///
2470/// Returns `true` after the message is encoded and accepted by the P2P send
2471/// path. Returns `false` after logging an encode or send failure. Repair-proof
2472/// recording uses this to avoid trusting hints that were not actually sent.
2473///
2474/// When `rr_message_id` is `Some`, the response is sent via the `/rr/`
2475/// request-response path so saorsa-core can route it back to the caller's
2476/// `send_request` future. Otherwise it is sent as a plain message.
2477async fn send_replication_response_checked(
2478    peer: &PeerId,
2479    p2p_node: &Arc<P2PNode>,
2480    request_id: u64,
2481    body: ReplicationMessageBody,
2482    rr_message_id: Option<&str>,
2483) -> bool {
2484    let msg = ReplicationMessage { request_id, body };
2485    let encoded = match msg.encode() {
2486        Ok(data) => data,
2487        Err(e) => {
2488            warn!("Failed to encode replication response: {e}");
2489            return false;
2490        }
2491    };
2492    let result = if let Some(msg_id) = rr_message_id {
2493        p2p_node
2494            .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
2495            .await
2496    } else {
2497        p2p_node
2498            .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
2499            .await
2500    };
2501    if let Err(e) = result {
2502        debug!("Failed to send replication response to {peer}: {e}");
2503        return false;
2504    }
2505    true
2506}
2507
2508async fn record_sent_replica_hints(
2509    peer: &PeerId,
2510    hints: &[neighbor_sync::SentReplicaHint],
2511    repair_proofs: &Arc<RwLock<RepairProofs>>,
2512    sync_cycle_epoch: &Arc<RwLock<u64>>,
2513) {
2514    if hints.is_empty() {
2515        return;
2516    }
2517
2518    let hinted_at_epoch = *sync_cycle_epoch.read().await;
2519    let mut proofs = repair_proofs.write().await;
2520    for hint in hints {
2521        if proofs.record_replica_hint_sent(*peer, hint.key, &hint.close_peers, hinted_at_epoch) {
2522            debug!(
2523                "Recorded repair hint proof for peer {peer} and key {}",
2524                hex::encode(hint.key)
2525            );
2526        }
2527    }
2528}
2529
2530// ---------------------------------------------------------------------------
2531// Neighbor sync round
2532// ---------------------------------------------------------------------------
2533
/// Run one neighbor sync round.
///
/// Outline:
/// 1. Read the bootstrapping flag once; the whole round uses that snapshot.
/// 2. If the current sync cycle is complete:
///    - bump every peer's `cycles_since_sync`;
///    - advance `sync_cycle_epoch`;
///    - run the post-cycle prune pass;
///    - take a fresh close-neighbor snapshot;
///    - re-check completion under the `sync_state` write lock and, if still
///      complete, swap in a new cycle. The swap preserves cooldown times,
///      bootstrap-claim tracking and the prune cursor.
/// 3. Select a batch of peers; return early if the batch is empty.
/// 4. Snapshot our gossip commitment once, then build hints for the batch.
/// 5. Sync with each peer in turn. Successes go to `handle_sync_response`.
///    A failure is passed to `handle_sync_failure`, which may nominate one
///    replacement peer; that replacement is synced once.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn run_neighbor_sync_round(
    p2p_node: &Arc<P2PNode>,
    storage: &Arc<LmdbStorage>,
    paid_list: &Arc<PaidList>,
    queues: &Arc<RwLock<ReplicationQueues>>,
    config: &ReplicationConfig,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    sync_cycle_epoch: &Arc<RwLock<u64>>,
    repair_proofs: &Arc<RwLock<RepairProofs>>,
    is_bootstrapping: &Arc<RwLock<bool>>,
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    commitment_state: &Arc<ResponderCommitmentState>,
    last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
    ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
    sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
    gossip_audit: &GossipAuditTrigger,
) {
    let self_id = *p2p_node.peer_id();
    // Snapshot of the bootstrapping flag; it is not re-read during this round.
    let bootstrapping = *is_bootstrapping.read().await;

    // Check if cycle is complete; start new one if needed.
    // We check under a read lock, then release it before the expensive
    // prune pass and DHT snapshot so other tasks are not starved.
    let cycle_complete = sync_state.read().await.is_cycle_complete();
    if cycle_complete {
        // A completed local neighbor-sync cycle advances the epoch component
        // of repair-proof maturity. The per-key wall-clock minimum age is
        // checked when audits are selected.
        {
            let mut history = sync_history.write().await;
            for record in history.values_mut() {
                record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
            }
        }
        // Increment the epoch and keep the new value for the prune pass.
        let current_sync_epoch = {
            let mut epoch = sync_cycle_epoch.write().await;
            *epoch = epoch.saturating_add(1);
            *epoch
        };

        // Post-cycle pruning (Section 11) — runs without holding sync_state.
        // Remote prune-confirmation audits are storage-proof audits and only
        // run after bootstrap has drained.
        let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
        pruning::run_prune_pass_with_context(pruning::PrunePassContext {
            self_id: &self_id,
            storage,
            paid_list,
            p2p_node,
            config,
            sync_state,
            repair_proofs,
            current_sync_epoch,
            #[cfg(any(test, feature = "test-utils"))]
            repair_proof_now: None,
            allow_remote_prune_audits,
            commitment_state: Some(commitment_state),
        })
        .await;

        // Take fresh close-neighbor snapshot (DHT query, no lock held).
        let neighbors =
            neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
                .await;

        // Now re-acquire write lock and re-check before swapping cycle.
        // The re-check matters because the read lock was released above, so
        // `sync_state` may have changed in the meantime.
        let mut state = sync_state.write().await;
        if state.is_cycle_complete() {
            // Preserve cooldown and bootstrap-claim tracking across cycles.
            // Claims have a 24h lifecycle vs 10-20 min cycles — dropping them
            // would reset the abuse detection timer every cycle.
            let old_sync_times = std::mem::take(&mut state.last_sync_times);
            let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
            let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
            let old_prune_cursor = state.prune_cursor;
            *state = NeighborSyncState::new_cycle(neighbors);
            state.last_sync_times = old_sync_times;
            state.bootstrap_claims = old_bootstrap_claims;
            state.bootstrap_claim_history = old_bootstrap_claim_history;
            state.prune_cursor = old_prune_cursor;
        }
    }

    // Select batch of peers.
    let batch = {
        let mut state = sync_state.write().await;
        neighbor_sync::select_sync_batch(
            &mut state,
            config.neighbor_sync_peer_count,
            config.neighbor_sync_cooldown,
        )
    };

    if batch.is_empty() {
        return;
    }

    debug!("Neighbor sync: syncing with {} peers", batch.len());

    // Snapshot our current commitment once per round so all peers in
    // this batch see the same thing (gossip is the responder's attestation;
    // same value across the batch is fine and reduces RwLock churn). Atomically
    // snapshot + mark-gossiped so we stay answerable for exactly what we emit
    // (ADR-0002 retention), with no TOCTOU vs a concurrent retire/rotate.
    let my_commitment = commitment_state
        .current_for_gossip()
        .map(|b| b.commitment().clone());

    // Hints for the whole batch are built up front; each peer's entry is
    // removed (or defaulted) as that peer is synced.
    let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
        &batch,
        storage,
        paid_list,
        p2p_node,
        config.close_group_size,
        config.paid_list_close_group_size,
    )
    .await;

    // Sync with each peer in the batch.
    for peer in &batch {
        let hints = hints_by_peer.remove(peer).unwrap_or_default();
        let outcome = neighbor_sync::sync_with_peer_with_hints(
            peer,
            p2p_node,
            config,
            bootstrapping,
            hints,
            my_commitment.clone(),
        )
        .await;

        if let Some(outcome) = outcome {
            handle_sync_response(
                &self_id,
                peer,
                &outcome.response,
                &outcome.sent_replica_hints,
                p2p_node,
                config,
                bootstrapping,
                bootstrap_state,
                storage,
                paid_list,
                queues,
                sync_state,
                sync_history,
                sync_cycle_epoch,
                repair_proofs,
                last_commitment_by_peer,
                ever_capable_peers,
                sig_verify_attempts,
                gossip_audit,
            )
            .await;
        } else {
            // Sync failed -- remove peer and try to fill slot.
            let replacement = {
                let mut state = sync_state.write().await;
                neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
            };

            // Attempt sync with the replacement peer (if one was found).
            // Hints for the replacement are built on demand because it was
            // not part of the original batch.
            if let Some(replacement_peer) = replacement {
                let mut replacement_hints = neighbor_sync::build_sync_hints_for_peers(
                    std::slice::from_ref(&replacement_peer),
                    storage,
                    paid_list,
                    p2p_node,
                    config.close_group_size,
                    config.paid_list_close_group_size,
                )
                .await;
                let hints = replacement_hints
                    .remove(&replacement_peer)
                    .unwrap_or_default();
                let replacement_outcome = neighbor_sync::sync_with_peer_with_hints(
                    &replacement_peer,
                    p2p_node,
                    config,
                    bootstrapping,
                    hints,
                    my_commitment.clone(),
                )
                .await;

                // NOTE(review): a failed replacement sync is not passed to
                // `handle_sync_failure` here, so no further replacement is
                // attempted this round — confirm this is intended.
                if let Some(outcome) = replacement_outcome {
                    handle_sync_response(
                        &self_id,
                        &replacement_peer,
                        &outcome.response,
                        &outcome.sent_replica_hints,
                        p2p_node,
                        config,
                        bootstrapping,
                        bootstrap_state,
                        storage,
                        paid_list,
                        queues,
                        sync_state,
                        sync_history,
                        sync_cycle_epoch,
                        repair_proofs,
                        last_commitment_by_peer,
                        ever_capable_peers,
                        sig_verify_attempts,
                        gossip_audit,
                    )
                    .await;
                }
            }
        }
    }
}
2750
2751/// Process a successful neighbor sync response: record the sync, check for
2752/// bootstrap claim abuse, and admit inbound hints.
2753#[allow(clippy::too_many_arguments)]
2754async fn handle_sync_response(
2755    self_id: &PeerId,
2756    peer: &PeerId,
2757    resp: &NeighborSyncResponse,
2758    sent_replica_hints: &[neighbor_sync::SentReplicaHint],
2759    p2p_node: &Arc<P2PNode>,
2760    config: &ReplicationConfig,
2761    bootstrapping: bool,
2762    bootstrap_state: &Arc<RwLock<BootstrapState>>,
2763    storage: &Arc<LmdbStorage>,
2764    paid_list: &Arc<PaidList>,
2765    queues: &Arc<RwLock<ReplicationQueues>>,
2766    sync_state: &Arc<RwLock<NeighborSyncState>>,
2767    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
2768    sync_cycle_epoch: &Arc<RwLock<u64>>,
2769    repair_proofs: &Arc<RwLock<RepairProofs>>,
2770    last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
2771    ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
2772    sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
2773    gossip_audit: &GossipAuditTrigger,
2774) {
2775    // Ingest the peer's commitment if they piggybacked one on the response.
2776    // Same verification as the request path (peer-id binding + signature);
2777    // forged commitments are dropped at the edge. A *changed* commitment here
2778    // is a gossip-audit trigger just like on the request path — so a peer that
2779    // only ever answers sync (never initiates) is still audited (ADR-0002).
2780    if let Some(target) = ingest_peer_commitment(
2781        peer,
2782        resp.commitment.as_ref(),
2783        p2p_node,
2784        last_commitment_by_peer,
2785        ever_capable_peers,
2786        sig_verify_attempts,
2787    )
2788    .await
2789    {
2790        maybe_trigger_gossip_audit(gossip_audit, peer, target).await;
2791    }
2792
2793    // Record successful sync.
2794    {
2795        let mut state = sync_state.write().await;
2796        neighbor_sync::record_successful_sync(&mut state, peer);
2797    }
2798    {
2799        let mut history = sync_history.write().await;
2800        let record = history.entry(*peer).or_insert(PeerSyncRecord {
2801            last_sync: None,
2802            cycles_since_sync: 0,
2803        });
2804        record.last_sync = Some(Instant::now());
2805        record.cycles_since_sync = 0;
2806    }
2807
2808    // Process inbound hints from response (skip if peer is bootstrapping).
2809    if resp.bootstrapping {
2810        // Gap 6: BootstrapClaimAbuse grace period enforcement.
2811        // Separate state mutation from network I/O to avoid holding the
2812        // write lock across report_trust_event.
2813        let should_report = {
2814            let now = Instant::now();
2815            let mut state = sync_state.write().await;
2816            match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
2817                BootstrapClaimObservation::WithinGrace { .. } => false,
2818                BootstrapClaimObservation::PastGrace { first_seen } => {
2819                    warn!(
2820                        "Peer {peer} has been claiming bootstrap for {:?}, \
2821                         exceeding grace period of {:?} — reporting abuse",
2822                        now.duration_since(first_seen),
2823                        config.bootstrap_claim_grace_period,
2824                    );
2825                    true
2826                }
2827                BootstrapClaimObservation::Repeated { first_seen } => {
2828                    warn!(
2829                        "Peer {peer} repeated bootstrap claim after previously stopping; \
2830                         first claim was {:?} ago — reporting abuse",
2831                        now.duration_since(first_seen),
2832                    );
2833                    true
2834                }
2835            }
2836        };
2837        if should_report {
2838            p2p_node
2839                .report_trust_event(
2840                    peer,
2841                    TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2842                )
2843                .await;
2844        }
2845    } else {
2846        // Peer is not claiming bootstrap; clear active claim while retaining
2847        // history so the peer cannot start a second grace window later.
2848        {
2849            let mut state = sync_state.write().await;
2850            state.clear_active_bootstrap_claim(peer);
2851        }
2852        record_sent_replica_hints(peer, sent_replica_hints, repair_proofs, sync_cycle_epoch).await;
2853        let outcome = admit_and_queue_hints(
2854            self_id,
2855            peer,
2856            &resp.replica_hints,
2857            &resp.paid_hints,
2858            p2p_node,
2859            config,
2860            storage,
2861            paid_list,
2862            queues,
2863        )
2864        .await;
2865
2866        // Track discovered keys for bootstrap drain detection so that hints
2867        // admitted via regular neighbor sync are not missed. Capacity-
2868        // rejected hints keep this source on the "not yet drained" list
2869        // until its next sync replays them; a clean cycle clears it.
2870        if bootstrapping {
2871            if !outcome.discovered.is_empty() {
2872                bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
2873            }
2874            if outcome.capacity_rejected_count > 0 {
2875                bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
2876            } else {
2877                bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
2878            }
2879        }
2880    }
2881}
2882
2883/// Admit hints and queue them for verification, returning newly-discovered keys.
2884///
2885/// Shared by neighbor-sync request handling, response handling, and bootstrap
2886/// sync so that admission + queueing logic lives in one place.
2887#[allow(clippy::too_many_arguments)]
2888/// Outcome of [`admit_and_queue_hints`].
2889///
2890/// `capacity_rejected_count` is non-zero when one or more legitimately
2891/// admissible hints were dropped because `pending_verify`'s global or
2892/// per-source bound was hit. Callers that care about completeness
2893/// (bootstrap drain accounting) MUST NOT treat their work as complete while
2894/// this is > 0 — the source will need to re-hint after capacity frees up.
2895struct AdmissionOutcome {
2896    discovered: HashSet<XorName>,
2897    capacity_rejected_count: usize,
2898}
2899
2900#[allow(clippy::too_many_arguments)]
2901async fn admit_and_queue_hints(
2902    self_id: &PeerId,
2903    source_peer: &PeerId,
2904    replica_hints: &[XorName],
2905    paid_hints: &[XorName],
2906    p2p_node: &Arc<P2PNode>,
2907    config: &ReplicationConfig,
2908    storage: &Arc<LmdbStorage>,
2909    paid_list: &Arc<PaidList>,
2910    queues: &Arc<RwLock<ReplicationQueues>>,
2911) -> AdmissionOutcome {
2912    let pending_keys: HashSet<XorName> = {
2913        let q = queues.read().await;
2914        q.pending_keys().into_iter().collect()
2915    };
2916
2917    let admitted = admission::admit_hints(
2918        self_id,
2919        replica_hints,
2920        paid_hints,
2921        p2p_node,
2922        config,
2923        storage,
2924        paid_list,
2925        &pending_keys,
2926    )
2927    .await;
2928
2929    let mut discovered = HashSet::new();
2930    let mut capacity_rejected_count: usize = 0;
2931    let mut q = queues.write().await;
2932    let now = Instant::now();
2933
2934    for key in admitted.replica_keys {
2935        if !storage.exists(&key).unwrap_or(false) {
2936            let result = q.add_pending_verify(
2937                key,
2938                VerificationEntry {
2939                    state: VerificationState::PendingVerify,
2940                    pipeline: HintPipeline::Replica,
2941                    verified_sources: Vec::new(),
2942                    tried_sources: HashSet::new(),
2943                    created_at: now,
2944                    hint_sender: *source_peer,
2945                },
2946            );
2947            match result {
2948                crate::replication::scheduling::AdmissionResult::Admitted => {
2949                    discovered.insert(key);
2950                }
2951                crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
2952                crate::replication::scheduling::AdmissionResult::CapacityRejected => {
2953                    capacity_rejected_count += 1;
2954                }
2955            }
2956        }
2957    }
2958
2959    for key in admitted.paid_only_keys {
2960        let result = q.add_pending_verify(
2961            key,
2962            VerificationEntry {
2963                state: VerificationState::PendingVerify,
2964                pipeline: HintPipeline::PaidOnly,
2965                verified_sources: Vec::new(),
2966                tried_sources: HashSet::new(),
2967                created_at: now,
2968                hint_sender: *source_peer,
2969            },
2970        );
2971        match result {
2972            crate::replication::scheduling::AdmissionResult::Admitted => {
2973                discovered.insert(key);
2974            }
2975            crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
2976            crate::replication::scheduling::AdmissionResult::CapacityRejected => {
2977                capacity_rejected_count += 1;
2978            }
2979        }
2980    }
2981
2982    if capacity_rejected_count > 0 {
2983        debug!(
2984            "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
2985             rejected at queue capacity; source will need to re-hint after pending_verify drains"
2986        );
2987    }
2988
2989    AdmissionOutcome {
2990        discovered,
2991        capacity_rejected_count,
2992    }
2993}
2994
2995// ---------------------------------------------------------------------------
2996// Verification cycle
2997// ---------------------------------------------------------------------------
2998
/// Run one verification cycle: process pending keys through quorum checks.
///
/// Stages, in order:
/// 1. Evict pending entries older than `config::PENDING_VERIFY_MAX_AGE`, then
///    snapshot the pending key set. If it is empty, return early without
///    logging.
/// 2. Keys already in the local `PaidList` are marked `PaidListVerified` and
///    are authorized locally. All other keys need network verification.
/// 3. Locally-paid keys that may still need fetching get a presence-only
///    probe to find fetch sources. These are replica-pipeline keys, plus
///    paid-only keys we do not store but are within the storage-admission
///    width for.
/// 4. The remaining keys go through a network verification round. Verified
///    keys are inserted into the `PaidList`. Fetch-eligible keys with
///    responding holders are promoted to the fetch queue.
/// 5. Keys that reached a terminal outcome are removed from the bootstrap
///    pending set, which may complete bootstrap.
///
/// A per-cycle summary is logged at `info` when the cycle took at least
/// `VERIFICATION_CYCLE_SLOW_LOG_MS` milliseconds, and at `debug` otherwise.
#[allow(clippy::too_many_lines)]
async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
    let cycle_started = Instant::now();
    let VerificationCycleContext {
        p2p_node,
        paid_list,
        storage,
        queues,
        config,
        bootstrap_state,
        is_bootstrapping,
        bootstrap_complete_notify,
        last_commitment_by_peer,
        ever_capable_peers,
        recent_provers,
    } = ctx;

    // Evict stale entries that have been pending too long (e.g. unreachable
    // verification targets during a network partition).
    {
        let mut q = queues.write().await;
        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
    }

    // Snapshot the pending keys. Each key is re-looked-up under later locks,
    // since the queue can change between awaits.
    let pending_keys = {
        let q = queues.read().await;
        q.pending_keys()
    };

    if pending_keys.is_empty() {
        return;
    }
    let initial_pending_count = pending_keys.len();

    let self_id = *p2p_node.peer_id();

    // Step 1: Check local PaidForList for fast-path authorization (Section 9,
    // step 4).
    let mut local_paid_presence_probe_keys = Vec::new();
    let mut local_paid_paid_only_keys = Vec::new();
    let mut keys_needing_network = Vec::new();
    // Keys whose verification finished this cycle (success or failure). They
    // are removed from the bootstrap pending set in Step 6.
    let mut terminal_keys: Vec<XorName> = Vec::new();
    {
        let mut q = queues.write().await;
        for key in &pending_keys {
            if paid_list.contains(key).unwrap_or(false) {
                // `None` means the key is no longer pending, so skip it.
                if let Some(pipeline) =
                    q.set_pending_state(key, VerificationState::PaidListVerified)
                {
                    match pipeline {
                        HintPipeline::PaidOnly => {
                            // Paid-only + local paid state needs one more
                            // storage-admission check outside this lock: if we
                            // are also in the close group plus storage margin,
                            // the hint can repair a missing replica.
                            local_paid_paid_only_keys.push(*key);
                        }
                        HintPipeline::Replica => {
                            // Local paid-list membership authorizes the key.
                            // We still need a presence probe to discover fetch
                            // sources, but we must not require remote paid
                            // majority or presence quorum.
                            local_paid_presence_probe_keys.push(*key);
                        }
                    }
                }
            } else {
                keys_needing_network.push(*key);
            }
        }
    }

    // Resolve locally-paid paid-only keys. If already stored, or outside our
    // storage-admission width, they are terminal. Otherwise they are probed
    // for fetch sources exactly like replica keys.
    if !local_paid_paid_only_keys.is_empty() {
        let mut terminal_paid_only = Vec::new();
        for key in local_paid_paid_only_keys {
            if storage.exists(&key).unwrap_or(false) {
                terminal_paid_only.push(key);
            } else if admission::is_responsible(
                &self_id,
                &key,
                p2p_node,
                storage_admission_width(config.close_group_size),
            )
            .await
            {
                local_paid_presence_probe_keys.push(key);
            } else {
                terminal_paid_only.push(key);
            }
        }

        if !terminal_paid_only.is_empty() {
            let mut q = queues.write().await;
            for key in terminal_paid_only {
                q.remove_pending(&key);
                terminal_keys.push(key);
            }
        }
    }

    // Counts for the summary log at the end of the cycle.
    let local_paid_probe_count = local_paid_presence_probe_keys.len();
    let keys_needing_network_count = keys_needing_network.len();

    // Step 1b: Local paid-list hit for fetch-eligible keys. Per Section 9
    // step 4, authorization succeeds immediately; run a presence-only probe
    // to find any holder we can fetch from.
    if !local_paid_presence_probe_keys.is_empty() {
        let targets = quorum::compute_presence_targets(
            &local_paid_presence_probe_keys,
            p2p_node,
            config,
            &self_id,
        )
        .await;
        let evidence = quorum::run_verification_round(
            &local_paid_presence_probe_keys,
            &targets,
            p2p_node,
            config,
        )
        .await;

        let mut q = queues.write().await;
        for key in local_paid_presence_probe_keys {
            // Already stored locally: nothing to fetch, so the key is terminal.
            if storage.exists(&key).unwrap_or(false) {
                q.remove_pending(&key);
                terminal_keys.push(key);
                continue;
            }
            let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
                quorum::present_sources_for_key(&key, ev, &targets)
            });
            if sources.is_empty() {
                // Terminal failure: remove pending and report. No fetch path.
                q.remove_pending(&key);
                warn!(
                    "Locally paid key {} has no responding holders (possible data loss)",
                    hex::encode(key)
                );
                terminal_keys.push(key);
            } else {
                let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                // Atomic remove+enqueue: if fetch_queue is at capacity, the
                // pending entry is preserved and retried next cycle (no
                // silent drop of verified replica-repair work).
                let _ = q.promote_pending_to_fetch(key, distance, sources);
            }
        }
    }

    // Steps 2-5: Network verification (skipped if all keys resolved locally).
    if !keys_needing_network.is_empty() {
        // Step 2: Compute targets and run network verification round.
        let targets =
            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
                .await;

        let evidence =
            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;

        // Step 3: Evaluate results — collect outcomes without holding the write
        // lock across paid-list I/O.
        //
        // v12 §6 holder-eligibility: snapshot the per-peer last-commitment
        // table and recent_provers cache up front so the synchronous
        // evaluate_key_evidence_with_holder_check predicate can consult
        // them without awaiting. The predicate downgrades a Present
        // claim to Unresolved unless the peer is credited for that key.
        //
        // Snapshot per-peer commitment data. We need two views:
        //   - `commitment_by_peer_snapshot`: peers that currently have
        //     a verified commitment record on file (used to look up
        //     their current hash).
        //   - `capable_peer_snapshot`: the sticky "ever v12-capable"
        //     set. Sourced from a separate set rather than the
        //     commitment map so eviction (PeerRemoved cleanup, sybil
        //     cap at `MAX_LAST_COMMITMENT_BY_PEER`) does NOT downgrade
        //     a previously-v12 peer to "legacy" credit-unconditionally.
        //     Legacy / pre-v12 peers that have never sent a commitment
        //     remain absent from the set and are credited via the
        //     legacy path so mixed-version networks stay live.
        let commitment_by_peer_snapshot: HashMap<PeerId, [u8; 32]> = {
            let map = last_commitment_by_peer.read().await;
            map.iter()
                // Read the CACHED hash (§13) — no per-cycle re-serialize/re-hash
                // of every peer's ~5 KiB commitment.
                .filter_map(|(p, rec)| rec.commitment_hash().map(|h| (*p, h)))
                .collect()
        };
        let capable_peer_snapshot: HashSet<PeerId> = ever_capable_peers.read().await.clone();
        // Take a full snapshot of recent_provers under the read lock,
        // then release. The cache is bounded (16/key × keys), so the
        // clone is cheap.
        let provers_snapshot = recent_provers.read().await.clone();
        // For the replica-fetch path, we need to know whether THIS
        // node already holds the key being verified. The v12 §6
        // holder-credit gate is meant to prevent uncredited Present
        // claims from contributing to paid-list / reward quorum for
        // keys we DO hold (and could audit ourselves). For keys we
        // are trying to FETCH (i.e. not in local storage), there is
        // no possible local audit credit, and gating the presence
        // quorum on credit would deadlock replica-repair in a
        // fully v12-capable close group.
        let mut locally_held: HashSet<XorName> = HashSet::new();
        for key in &keys_needing_network {
            if storage.exists(key).unwrap_or(false) {
                locally_held.insert(*key);
            }
        }
        // Predicate: does `peer`'s Present claim for `key` count toward quorum?
        let holder_credit = |peer: &PeerId, key: &XorName| -> bool {
            if !locally_held.contains(key) {
                // Replica-fetch path: we don't hold this key, so we
                // cannot have collected audit credit for it. Trust
                // Present claims to drive fetch-source promotion;
                // chunk-PUT payment_verifier is the security backstop
                // when the bytes actually arrive.
                return true;
            }
            if !capable_peer_snapshot.contains(peer) {
                // Pre-v12 / legacy peer that has never gossiped a
                // commitment. The v12 §6 holder-eligibility check
                // doesn't apply: their Present evidence comes through
                // the legacy path and we credit it unconditionally
                // so a mixed-version network stays live during
                // transition.
                return true;
            }
            let Some(hash) = commitment_by_peer_snapshot.get(peer) else {
                // Peer is commitment_capable (sticky) but currently
                // has no live commitment record on file (e.g. their
                // last gossip was evicted from the LRU cache, or it
                // failed verification). Withhold credit until they
                // re-prove storage under a fresh commitment.
                return false;
            };
            provers_snapshot.is_credited_holder(key, peer, hash)
        };

        // Keys with no evidence, or that are no longer pending, are skipped
        // here. They are left in the queue untouched for a later cycle.
        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
        {
            let q = queues.read().await;
            for key in &keys_needing_network {
                let Some(ev) = evidence.get(key) else {
                    continue;
                };
                let Some(entry) = q.get_pending(key) else {
                    continue;
                };
                let outcome = quorum::evaluate_key_evidence_with_holder_check(
                    key,
                    ev,
                    &targets,
                    config,
                    holder_credit,
                );
                evaluated.push((*key, outcome, entry.pipeline));
            }
        } // read lock released

        // Step 4: Insert verified keys into PaidForList (no lock held).
        // Failures are logged and do not block fetch promotion below.
        let mut paid_insert_keys: Vec<XorName> = Vec::new();
        for (key, outcome, _) in &evaluated {
            if matches!(
                outcome,
                KeyVerificationOutcome::QuorumVerified { .. }
                    | KeyVerificationOutcome::PaidListVerified { .. }
            ) {
                paid_insert_keys.push(*key);
            }
        }
        for key in &paid_insert_keys {
            if let Err(e) = paid_list.insert(key).await {
                warn!("Failed to add verified key to PaidForList: {e}");
            }
        }

        // Paid-only hints normally update PaidForList only. If this node is
        // also within the storage-admission group for the key, a verified
        // paid-only hint can safely repair a missing replica using sources
        // from the same verification round.
        let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
        for (key, outcome, pipeline) in &evaluated {
            if *pipeline == HintPipeline::PaidOnly
                && matches!(
                    outcome,
                    KeyVerificationOutcome::QuorumVerified { .. }
                        | KeyVerificationOutcome::PaidListVerified { .. }
                )
                && !storage.exists(key).unwrap_or(false)
                && admission::is_responsible(
                    &self_id,
                    key,
                    p2p_node,
                    storage_admission_width(config.close_group_size),
                )
                .await
            {
                paid_only_fetch_keys.insert(*key);
            }
        }

        // Step 5: Update queues with the evaluated outcomes.
        // NOTE(review): unlike Step 1b, this path does not re-check
        // `storage.exists` before promoting a Replica-pipeline key. A key in
        // `locally_held` can therefore still be promoted to the fetch queue.
        // Confirm the fetch path tolerates or short-circuits that.
        let mut q = queues.write().await;
        for (key, outcome, pipeline) in evaluated {
            match outcome {
                KeyVerificationOutcome::QuorumVerified { sources }
                | KeyVerificationOutcome::PaidListVerified { sources } => {
                    let fetch_eligible =
                        pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
                    if fetch_eligible && !sources.is_empty() {
                        let distance =
                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                        // Atomic remove+enqueue: on fetch_queue capacity miss
                        // the pending entry is preserved so this verified key
                        // is retried on the next cycle (no silent drop).
                        let _ = q.promote_pending_to_fetch(key, distance, sources);
                        // Not terminal — either moved to fetch queue, or
                        // retained as pending until queue drains.
                    } else if fetch_eligible && sources.is_empty() {
                        warn!(
                            "Verified storage-admitted key {} has no holders (possible data loss)",
                            hex::encode(key)
                        );
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    } else {
                        // Verified but not fetch-eligible: the PaidForList
                        // update in Step 4 was the only work to do.
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    }
                }
                KeyVerificationOutcome::QuorumFailed
                | KeyVerificationOutcome::QuorumInconclusive => {
                    q.remove_pending(&key);
                    terminal_keys.push(key);
                }
            }
        }
    }

    // Step 6: Remove terminal keys from bootstrap pending set and re-check
    // the drain condition.
    update_bootstrap_after_verification(
        &terminal_keys,
        bootstrap_state,
        queues,
        is_bootstrapping,
        bootstrap_complete_notify,
    )
    .await;

    // Post-cycle queue sizes for the summary log.
    let (pending_after, fetch_after, in_flight_after) = {
        let q = queues.read().await;
        (
            q.pending_count(),
            q.fetch_queue_count(),
            q.in_flight_count(),
        )
    };
    let terminal_key_count = terminal_keys.len();
    let elapsed_ms = cycle_started.elapsed().as_millis();

    if elapsed_ms >= VERIFICATION_CYCLE_SLOW_LOG_MS {
        info!(
            target: "ant_node::replication::verification",
            "Slow replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
        );
    } else {
        debug!(
            target: "ant_node::replication::verification",
            "Replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
        );
    }
}
3372
3373/// Post-verification bootstrap bookkeeping: remove terminal keys from the
3374/// bootstrap pending set and transition out of bootstrapping when drained.
3375async fn update_bootstrap_after_verification(
3376    terminal_keys: &[XorName],
3377    bootstrap_state: &Arc<RwLock<BootstrapState>>,
3378    queues: &Arc<RwLock<ReplicationQueues>>,
3379    is_bootstrapping: &Arc<RwLock<bool>>,
3380    bootstrap_complete_notify: &Arc<Notify>,
3381) {
3382    if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
3383        return;
3384    }
3385    {
3386        let mut bs = bootstrap_state.write().await;
3387        for key in terminal_keys {
3388            bs.remove_key(key);
3389        }
3390    }
3391    let q = queues.read().await;
3392    if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
3393        complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
3394    }
3395}
3396
3397/// Set `is_bootstrapping` to `false` and wake all waiters.
3398async fn complete_bootstrap(
3399    is_bootstrapping: &Arc<RwLock<bool>>,
3400    bootstrap_complete_notify: &Arc<Notify>,
3401) {
3402    *is_bootstrapping.write().await = false;
3403    bootstrap_complete_notify.notify_waiters();
3404    info!("Replication bootstrap complete");
3405}
3406
3407// ---------------------------------------------------------------------------
3408// Fetch types and single-fetch executor
3409// ---------------------------------------------------------------------------
3410
/// Result classification for a single fetch attempt made by
/// [`execute_single_fetch`].
///
/// Derives `Debug` so outcomes can be logged. `Copy` and `PartialEq` let the
/// worker loop (and tests) pass and compare results cheaply.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FetchResult {
    /// Data fetched, integrity-checked, and stored successfully.
    Stored,
    /// The source's response failed validation: a response key mismatch,
    /// data larger than `MAX_CHUNK_SIZE`, or a content-address mismatch.
    /// Do not retry.
    IntegrityFailed,
    /// Source failed (network error, undecodable or non-success response) —
    /// retryable. Also used when encoding the request or storing the fetched
    /// record locally fails.
    SourceFailed,
}
3420
3421/// Outcome produced by [`execute_single_fetch`] and consumed by the fetch
3422/// worker loop to update queue state.
3423struct FetchOutcome {
3424    key: XorName,
3425    result: FetchResult,
3426}
3427
3428#[allow(clippy::too_many_lines)]
3429/// Execute a single fetch request against `source` for `key`.
3430///
3431/// Handles encoding, network I/O, integrity checking, storage, and trust
3432/// event reporting.  Returns a [`FetchOutcome`] so the caller can update
3433/// queue state without holding any locks during the network round-trip.
3434async fn execute_single_fetch(
3435    p2p_node: Arc<P2PNode>,
3436    storage: Arc<LmdbStorage>,
3437    config: Arc<ReplicationConfig>,
3438    key: XorName,
3439    source: PeerId,
3440) -> FetchOutcome {
3441    let request = protocol::FetchRequest { key };
3442    let msg = ReplicationMessage {
3443        request_id: rand::thread_rng().gen::<u64>(),
3444        body: ReplicationMessageBody::FetchRequest(request),
3445    };
3446
3447    let encoded = match msg.encode() {
3448        Ok(data) => data,
3449        Err(e) => {
3450            warn!("Failed to encode fetch request: {e}");
3451            return FetchOutcome {
3452                key,
3453                result: FetchResult::SourceFailed,
3454            };
3455        }
3456    };
3457
3458    let result = p2p_node
3459        .send_request(
3460            &source,
3461            REPLICATION_PROTOCOL_ID,
3462            encoded,
3463            config.fetch_request_timeout,
3464        )
3465        .await;
3466
3467    match result {
3468        Ok(response) => {
3469            let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
3470                p2p_node
3471                    .report_trust_event(
3472                        &source,
3473                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3474                    )
3475                    .await;
3476                return FetchOutcome {
3477                    key,
3478                    result: FetchResult::SourceFailed,
3479                };
3480            };
3481
3482            match resp_msg.body {
3483                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
3484                    key: resp_key,
3485                    data,
3486                }) => {
3487                    // Validate the response key matches the requested key.
3488                    // A malicious peer could serve valid data for a different
3489                    // key, passing integrity checks while the requested key
3490                    // is falsely marked as fetched.
3491                    if resp_key != key {
3492                        warn!(
3493                            "Fetch response key mismatch: requested {}, got {}",
3494                            hex::encode(key),
3495                            hex::encode(resp_key)
3496                        );
3497                        p2p_node
3498                            .report_trust_event(
3499                                &source,
3500                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3501                            )
3502                            .await;
3503                        return FetchOutcome {
3504                            key,
3505                            result: FetchResult::IntegrityFailed,
3506                        };
3507                    }
3508
3509                    // Enforce chunk size invariant on fetched data.
3510                    // Checked before the content-address hash to avoid
3511                    // hashing up to 10 MiB of oversized junk data.
3512                    if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
3513                        warn!(
3514                            "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
3515                            hex::encode(resp_key),
3516                            data.len(),
3517                            crate::ant_protocol::MAX_CHUNK_SIZE,
3518                        );
3519                        p2p_node
3520                            .report_trust_event(
3521                                &source,
3522                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3523                            )
3524                            .await;
3525                        return FetchOutcome {
3526                            key,
3527                            result: FetchResult::IntegrityFailed,
3528                        };
3529                    }
3530
3531                    // Content-address integrity check.
3532                    let computed = crate::client::compute_address(&data);
3533                    if computed != resp_key {
3534                        warn!(
3535                            "Fetched record integrity check failed: expected {}, got {}",
3536                            hex::encode(resp_key),
3537                            hex::encode(computed)
3538                        );
3539                        p2p_node
3540                            .report_trust_event(
3541                                &source,
3542                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3543                            )
3544                            .await;
3545                        return FetchOutcome {
3546                            key,
3547                            result: FetchResult::IntegrityFailed,
3548                        };
3549                    }
3550
3551                    if let Err(e) = storage.put(&resp_key, &data).await {
3552                        warn!(
3553                            "Failed to store fetched record {}: {e}",
3554                            hex::encode(resp_key)
3555                        );
3556                        return FetchOutcome {
3557                            key,
3558                            result: FetchResult::SourceFailed,
3559                        };
3560                    }
3561
3562                    FetchOutcome {
3563                        key,
3564                        result: FetchResult::Stored,
3565                    }
3566                }
3567                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
3568                    ..
3569                }) => {
3570                    // This peer was selected as a fetch source because it
3571                    // recently answered `Present` during verification. A
3572                    // subsequent NotFound is evidence of a stale/false claim
3573                    // or chunk wiping, so penalize lightly and try another
3574                    // verified source.
3575                    warn!(
3576                        "Fetch: verified source {source} returned NotFound for {}",
3577                        hex::encode(key)
3578                    );
3579                    p2p_node
3580                        .report_trust_event(
3581                            &source,
3582                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3583                        )
3584                        .await;
3585                    FetchOutcome {
3586                        key,
3587                        result: FetchResult::SourceFailed,
3588                    }
3589                }
3590                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
3591                    reason,
3592                    ..
3593                }) => {
3594                    warn!(
3595                        "Fetch: peer {source} returned error for {}: {reason}",
3596                        hex::encode(key)
3597                    );
3598                    p2p_node
3599                        .report_trust_event(
3600                            &source,
3601                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3602                        )
3603                        .await;
3604                    FetchOutcome {
3605                        key,
3606                        result: FetchResult::SourceFailed,
3607                    }
3608                }
3609                _ => {
3610                    // Unexpected message type — treat as malformed.
3611                    p2p_node
3612                        .report_trust_event(
3613                            &source,
3614                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3615                        )
3616                        .await;
3617                    FetchOutcome {
3618                        key,
3619                        result: FetchResult::SourceFailed,
3620                    }
3621                }
3622            }
3623        }
3624        Err(e) => {
3625            debug!("Fetch request to {source} failed: {e}");
3626            // No ApplicationFailure here — P2PNode::send_request() already
3627            // reports ConnectionTimeout / ConnectionFailed to the TrustEngine.
3628            FetchOutcome {
3629                key,
3630                result: FetchResult::SourceFailed,
3631            }
3632        }
3633    }
3634}
3635
3636// ---------------------------------------------------------------------------
3637// Audit result handler
3638// ---------------------------------------------------------------------------
3639
3640/// Format the first confirmed-failed key as a 16-hex-char label.
3641///
3642/// Pairs with `challenged_peer` to form a stable cross-host correlation
3643/// handle in the audit-failure log line, e.g.
3644///
3645/// ```text
3646/// Audit failure for <peer>: …, `first_failed_key=0x18878f1d2d9e0612`
3647/// ```
3648///
3649/// Falls back to `"0x"` when the list is empty so the log line never
3650/// contains a misleading default.
3651fn first_failed_key_label(confirmed_failed_keys: &[XorName]) -> String {
3652    confirmed_failed_keys.first().map_or_else(
3653        || "0x".to_string(),
3654        |k| format!("0x{}", hex::encode(&k[..8])),
3655    )
3656}
3657
3658/// Execute the side effects for a confirmed storage-commitment audit failure.
3659///
3660/// [`plan_failed_audit`] is the pure decision INCLUDING the strike selection
3661/// (record-a-strike-for-`Timeout` vs leave-untouched for confirmed failures),
3662/// extracted so the whole glue — not just the verdict — is testable without a
3663/// live `P2PNode`. This function is only the resulting I/O. Timeouts are graced
3664/// and rollout-gated (TIMEOUT-EVICTION-DISABLED); confirmed failures penalize on
3665/// the first occurrence and revoke holder credit.
3666async fn handle_failed_audit(
3667    challenged_peer: &PeerId,
3668    confirmed_failed_key_count: usize,
3669    reason: &AuditFailureReason,
3670    p2p_node: &Arc<P2PNode>,
3671    sync_state: &Arc<RwLock<NeighborSyncState>>,
3672    recent_provers: &Arc<RwLock<RecentProvers>>,
3673    audit_timeout_strikes: &Arc<RwLock<HashMap<PeerId, u32>>>,
3674) {
3675    let action = {
3676        let mut strikes = audit_timeout_strikes.write().await;
3677        plan_failed_audit(reason, &mut strikes, challenged_peer)
3678    };
3679    match action {
3680        AuditFailureAction::TimeoutGrace => {
3681            // Honest transient slowness: no penalty, no credit loss, retain the
3682            // bootstrap claim. Only *sustained* timeouts (a peer that always
3683            // has to refetch) survive to the threshold — the per-challenge
3684            // window is never widened.
3685            debug!(
3686                "Audit timeout for {challenged_peer} (under the {}-strike threshold); \
3687                 within grace, retaining bootstrap claim, no penalty",
3688                config::AUDIT_TIMEOUT_STRIKE_THRESHOLD
3689            );
3690        }
3691        AuditFailureAction::TimeoutPenalize => {
3692            // Strikes are tracked/logged so the mechanism stays observable; the
3693            // trust report that drives eviction is gated behind
3694            // `TIMEOUT_EVICTION_ENABLED` (off this release — see its doc for the
3695            // rollout-death-spiral rationale). Confirmed storage-integrity
3696            // failures (ConfirmedPenalize below) are unaffected.
3697            warn!(
3698                "Audit timeout for {challenged_peer}: reached the {}-strike threshold of \
3699                 consecutive timeouts ({})",
3700                config::AUDIT_TIMEOUT_STRIKE_THRESHOLD,
3701                if config::TIMEOUT_EVICTION_ENABLED {
3702                    "penalizing"
3703                } else {
3704                    "eviction disabled this release — not penalizing"
3705                }
3706            );
3707            if config::TIMEOUT_EVICTION_ENABLED {
3708                p2p_node
3709                    .report_trust_event(
3710                        challenged_peer,
3711                        TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
3712                    )
3713                    .await;
3714            }
3715        }
3716        AuditFailureAction::ConfirmedPenalize => {
3717            // The caller (handle_subtree_audit_result) already logged the rich
3718            // failure line with reason + per-category summary; avoid a redundant
3719            // second error log here. `confirmed_failed_key_count` is retained in
3720            // the signature for callers/tests that assert on it.
3721            let _ = confirmed_failed_key_count;
3722            // Peer returned a non-bootstrap response — clear the active claim
3723            // while retaining claim history.
3724            {
3725                let mut state = sync_state.write().await;
3726                state.clear_active_bootstrap_claim(challenged_peer);
3727            }
3728            // Revoke holder credit on a CONFIRMED failure (DigestMismatch /
3729            // KeyAbsent / Rejected / MalformedResponse): the peer no longer
3730            // provably holds what it committed to, so it must not keep §6
3731            // holder credit for the proof TTL. The §5 `forget_commitment` path
3732            // only fires on an "unknown commitment hash" reply; genuine byte
3733            // loss surfaces here.
3734            {
3735                let mut provers_guard = recent_provers.write().await;
3736                apply_audit_failure_credit_revocation(&mut provers_guard, challenged_peer, reason);
3737            }
3738            p2p_node
3739                .report_trust_event(
3740                    challenged_peer,
3741                    TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
3742                )
3743                .await;
3744        }
3745    }
3746}
3747
/// Handle audit result: log findings and emit trust events.
///
/// Per `AuditTickResult` variant:
/// - `Passed`: clears the peer's active bootstrap claim (history retained),
///   removes its consecutive-timeout strike entry, and reports
///   `ApplicationSuccess(REPLICATION_TRUST_WEIGHT)`.
/// - `Failed` carrying `FailureEvidence::AuditFailure`: logs one diagnostic
///   line at `error!` level, then delegates every side effect (strikes,
///   bootstrap claim, holder-credit revocation, trust report) to
///   [`handle_failed_audit`]. Any other evidence variant is ignored here.
/// - `BootstrapClaim`: records the observation and reports
///   `ApplicationFailure(REPLICATION_TRUST_WEIGHT)` only when the claim is past
///   `config.bootstrap_claim_grace_period` or is a repeat after stopping.
/// - `Idle` / `InsufficientKeys`: no-op.
///
/// Within this function each state lock is taken in its own scope and dropped
/// before the `report_trust_event` await. `config` is read only for the
/// bootstrap-claim grace period.
async fn handle_subtree_audit_result(
    result: &AuditTickResult,
    p2p_node: &Arc<P2PNode>,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
    recent_provers: &Arc<RwLock<RecentProvers>>,
    audit_timeout_strikes: &Arc<RwLock<HashMap<PeerId, u32>>>,
    config: &ReplicationConfig,
) {
    match result {
        AuditTickResult::Passed {
            challenged_peer,
            keys_checked,
        } => {
            debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
            // Peer responded normally — clear the active bootstrap claim while
            // retaining history so a later claim is treated as repeated abuse.
            {
                let mut state = sync_state.write().await;
                state.clear_active_bootstrap_claim(challenged_peer);
            }
            // A normal response proves the slowness (if any) was transient, so
            // reset the timeout-strike counter. Only *sustained* timeouts (a
            // peer that must refetch on every audit) survive this reset to
            // accumulate toward the penalty threshold.
            {
                let mut strikes = audit_timeout_strikes.write().await;
                strikes.remove(challenged_peer);
            }
            p2p_node
                .report_trust_event(
                    challenged_peer,
                    TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
                )
                .await;
        }
        AuditTickResult::Failed { evidence } => {
            // Only `AuditFailure` evidence is acted on; other evidence shapes
            // fall through silently.
            if let FailureEvidence::AuditFailure {
                challenged_peer,
                confirmed_failed_keys,
                summary,
                reason,
                ..
            } = evidence
            {
                // Rich diagnostics (from main's audit-failure logging) + the
                // first-failed-key correlation handle.
                // NOTE(review): this is emitted at `error!` level for every
                // reason, including `Timeout` failures that
                // `handle_failed_audit` may treat as within grace (logged there
                // at `debug!`). Confirm the intended log level.
                let first_failed_key = first_failed_key_label(confirmed_failed_keys);
                error!(
                    "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}, first_failed_key={first_failed_key}",
                    confirmed_failed_keys.len(),
                    summary.challenged_keys,
                    summary.absent_keys,
                    summary.digest_mismatch_keys,
                );
                // Route the side effects through the strike-grace path: timeouts
                // are graced (and rollout-gated by TIMEOUT-EVICTION-DISABLED),
                // deterministic failures penalize on the first occurrence and
                // revoke holder credit. Do NOT report ApplicationFailure inline
                // here — that would evict honest not-yet-upgraded peers on a
                // single timeout during the breaking rollout.
                handle_failed_audit(
                    challenged_peer,
                    confirmed_failed_keys.len(),
                    reason,
                    p2p_node,
                    sync_state,
                    recent_provers,
                    audit_timeout_strikes,
                )
                .await;
            }
        }
        AuditTickResult::BootstrapClaim { peer } => {
            // Gap 6: BootstrapClaimAbuse grace period in audit path.
            // Separate state mutation from network I/O to avoid holding the
            // write lock across report_trust_event.
            let should_report = {
                let now = Instant::now();
                let mut state = sync_state.write().await;
                // The observation classifies this claim relative to the peer's
                // claim history; only the two non-grace outcomes are reported.
                match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
                {
                    BootstrapClaimObservation::WithinGrace { .. } => {
                        debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
                        false
                    }
                    BootstrapClaimObservation::PastGrace { first_seen } => {
                        warn!(
                            "Audit: peer {peer} claiming bootstrap past grace period \
                             ({:?} > {:?}), reporting abuse",
                            now.duration_since(first_seen),
                            config.bootstrap_claim_grace_period,
                        );
                        true
                    }
                    BootstrapClaimObservation::Repeated { first_seen } => {
                        warn!(
                            "Audit: peer {peer} repeated bootstrap claim after previously \
                             stopping; first claim was {:?} ago, reporting abuse",
                            now.duration_since(first_seen),
                        );
                        true
                    }
                }
            };
            // The sync_state guard was dropped at the end of the block above.
            if should_report {
                p2p_node
                    .report_trust_event(
                        peer,
                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
                    )
                    .await;
            }
        }
        // Nothing to judge: no audit was run, or too few keys to audit.
        AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
    }
}
3865
/// Predicate: does a confirmed audit failure with this `reason` clear the
/// peer's active bootstrap claim?
///
/// Only `Timeout` keeps the claim, because the peer may still be legitimately
/// bootstrapping. Every storage-integrity reason clears it.
///
/// Production code does not call this. Both audits go through
/// [`handle_failed_audit`], whose clear-vs-retain behaviour follows from the
/// action chosen by [`decide_audit_failure_action`]: only `ConfirmedPenalize`,
/// i.e. a non-`Timeout` reason, clears the claim. This test-only helper states
/// that same `reason != Timeout` rule in one readable place so the audit tests
/// can assert against it.
#[cfg(test)]
fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
    let is_timeout = matches!(reason, AuditFailureReason::Timeout);
    !is_timeout
}
3878
3879/// Handle the result of a responsible-chunk audit tick (audit #2): emit trust
3880/// events and manage bootstrap-claim state.
3881///
3882/// Delegates to [`handle_subtree_audit_result`] so BOTH audits share one
3883/// failure path: timeouts go through the strike/grace logic (graced under the
3884/// threshold, eviction gated off this release via `TIMEOUT-EVICTION-DISABLED`)
3885/// and only confirmed storage-integrity failures penalise on the first
3886/// occurrence and revoke holder credit. Previously this handler reported
3887/// `ApplicationFailure` inline for EVERY failure including `Timeout`, which —
3888/// with the breaking v2 wire change — would false-penalise honest
3889/// not-yet-upgraded peers on a single audit. (Audit #2 cannot credit holders,
3890/// so the shared handler's strike-reset/credit-revocation is a superset of what
3891/// it needs; the responsible-chunk audit never produces `Passed { .. }` with
3892/// holder credit, so nothing is over-credited.)
3893async fn handle_audit_result(
3894    result: &AuditTickResult,
3895    p2p_node: &Arc<P2PNode>,
3896    sync_state: &Arc<RwLock<NeighborSyncState>>,
3897    recent_provers: &Arc<RwLock<RecentProvers>>,
3898    audit_timeout_strikes: &Arc<RwLock<HashMap<PeerId, u32>>>,
3899    config: &ReplicationConfig,
3900) {
3901    handle_subtree_audit_result(
3902        result,
3903        p2p_node,
3904        sync_state,
3905        recent_provers,
3906        audit_timeout_strikes,
3907        config,
3908    )
3909    .await;
3910}
3911
/// What the audit-failure handler should do for a given failure, given the
/// peer's post-increment timeout-strike count. Pure (no I/O) so the whole
/// decision can be exercised end-to-end without a live `P2PNode`.
///
/// Produced by [`decide_audit_failure_action`] (reached via
/// [`plan_failed_audit`]) and executed by [`handle_failed_audit`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AuditFailureAction {
    /// Timeout under the strike threshold: no trust penalty, no credit
    /// revocation, retain the bootstrap claim (honest transient slowness).
    TimeoutGrace,
    /// Timeout at/over the threshold: report `ApplicationFailure`. Bootstrap
    /// claim retained; holder credit NOT revoked (the peer never admitted byte
    /// loss). The non-storing-peer case. The trust report itself is only sent
    /// when `config::TIMEOUT_EVICTION_ENABLED` is set (see `handle_failed_audit`).
    TimeoutPenalize,
    /// Confirmed storage-integrity failure (any non-`Timeout` reason):
    /// penalize immediately, clear the active bootstrap claim, and revoke
    /// holder credit.
    ConfirmedPenalize,
}
3928
/// Upper bound on a peer's consecutive-timeout strike count. Must exceed the
/// largest reachable adaptive threshold (base + `MAX_ADAPTIVE_TIMEOUT_GRACE`) so
/// a genuinely non-responsive peer's count can always catch up to and cross an
/// inflated threshold — otherwise capping at the base would make timeout
/// penalties unreachable once the adaptive threshold rose.
///
/// The largest adaptive threshold is 3 × `config::AUDIT_TIMEOUT_STRIKE_THRESHOLD`,
/// because `MAX_ADAPTIVE_TIMEOUT_GRACE` is twice the base.
/// NOTE(review): this invariant is not checked at compile time; raising the
/// base threshold past ~21 would silently break it. Consider a const assertion.
const AUDIT_TIMEOUT_STRIKE_MAX: u32 = 64;
3935
/// Maximum extra grace the adaptive mechanism may add on top of the base
/// threshold. Bounds how far a (possibly stale) set of timing-out peers can
/// widen the window, so a small persistent failing cohort cannot push the
/// threshold arbitrarily high and shield a bad node indefinitely.
///
/// Set to twice the base threshold, so the adaptive threshold computed by
/// `adaptive_timeout_threshold` never exceeds 3 × the base.
const MAX_ADAPTIVE_TIMEOUT_GRACE: u32 = 2 * config::AUDIT_TIMEOUT_STRIKE_THRESHOLD;
3941
3942/// Record an audit timeout for `peer` and return its new consecutive-timeout
3943/// strike count, saturating at [`AUDIT_TIMEOUT_STRIKE_MAX`] (well above any
3944/// reachable adaptive threshold). A successful audit removes the peer's entry
3945/// (the `Passed` arm of [`handle_subtree_audit_result`]), so only *consecutive*
3946/// timeouts accumulate here.
3947fn record_audit_timeout_strike(strikes: &mut HashMap<PeerId, u32>, peer: &PeerId) -> u32 {
3948    let count = strikes.entry(*peer).or_insert(0);
3949    *count = count.saturating_add(1).min(AUDIT_TIMEOUT_STRIKE_MAX);
3950    *count
3951}
3952
3953/// The adaptive timeout-strike threshold for judging `peer` (ADR-0002 "Network
3954/// Resilience"): `min(median of the OTHER timing-out peers' counts,
3955/// MAX_ADAPTIVE_TIMEOUT_GRACE) + base threshold`.
3956///
3957/// In a healthy network almost no peer carries timeout strikes, so the median
3958/// is 0 and the threshold is the base [`config::AUDIT_TIMEOUT_STRIKE_THRESHOLD`].
3959/// During genuine disruption many *honest* peers time out together, lifting the
3960/// median and widening the grace so the audit system does not pile onto a
3961/// struggling network — but the widening is capped at `MAX_ADAPTIVE_TIMEOUT_GRACE`
3962/// so a stale failing cohort cannot inflate it without bound.
3963///
3964/// `peer` is EXCLUDED from the median so a lone timing-out peer cannot raise its
3965/// own grace bar. Combined with the map being fed ONLY by timeouts (deterministic
3966/// failures never touch it), this closes self-inflation and bounds
3967/// attacker-inflation of the grace window.
3968fn adaptive_timeout_threshold(strikes: &HashMap<PeerId, u32>, peer: &PeerId) -> u32 {
3969    let grace = median_timeout_strikes_excluding(strikes, peer).min(MAX_ADAPTIVE_TIMEOUT_GRACE);
3970    grace.saturating_add(config::AUDIT_TIMEOUT_STRIKE_THRESHOLD)
3971}
3972
3973/// Lower median of the current per-peer consecutive-timeout counts, excluding
3974/// `peer`. No other peers → 0.
3975fn median_timeout_strikes_excluding(strikes: &HashMap<PeerId, u32>, peer: &PeerId) -> u32 {
3976    let mut counts: Vec<u32> = strikes
3977        .iter()
3978        .filter(|(p, _)| *p != peer)
3979        .map(|(_, c)| *c)
3980        .collect();
3981    if counts.is_empty() {
3982        return 0;
3983    }
3984    counts.sort_unstable();
3985    // Lower median: for even-sized inputs take the lower of the two middle
3986    // values ((len-1)/2), so the grace is conservative rather than inflated.
3987    counts.get((counts.len() - 1) / 2).copied().unwrap_or(0)
3988}
3989
/// True once `strikes` has climbed to (or past) the adaptive `threshold`; at
/// that point a timeout is eligible for an `ApplicationFailure` trust event.
fn timeout_strike_reaches_threshold(strikes: u32, threshold: u32) -> bool {
    threshold <= strikes
}
3995
3996/// Decide what to do about a confirmed audit failure. `timeout_strikes_after`
3997/// is the peer's strike count after recording this event and `timeout_threshold`
3998/// the adaptive threshold to compare against (both only meaningful when
3999/// `reason == Timeout`). Pure, so the integration-level decision can be asserted
4000/// in tests with no networking.
4001fn decide_audit_failure_action(
4002    reason: &AuditFailureReason,
4003    timeout_strikes_after: u32,
4004    timeout_threshold: u32,
4005) -> AuditFailureAction {
4006    if matches!(reason, AuditFailureReason::Timeout) {
4007        if timeout_strike_reaches_threshold(timeout_strikes_after, timeout_threshold) {
4008            AuditFailureAction::TimeoutPenalize
4009        } else {
4010            AuditFailureAction::TimeoutGrace
4011        }
4012    } else {
4013        AuditFailureAction::ConfirmedPenalize
4014    }
4015}
4016
4017/// Plan the response to a confirmed audit failure, performing the
4018/// strike-selection glue in-process: a `Timeout` records a strike against
4019/// `peer` (so consecutive timeouts accumulate) and is judged against the
4020/// adaptive threshold; every other reason is a confirmed failure that does NOT
4021/// touch the strike map. The caller owns the lock and performs the resulting I/O.
4022fn plan_failed_audit(
4023    reason: &AuditFailureReason,
4024    strikes: &mut HashMap<PeerId, u32>,
4025    peer: &PeerId,
4026) -> AuditFailureAction {
4027    // Snapshot the adaptive threshold from the *other* peers' counts (excluding
4028    // this peer), so a single peer's own timeouts cannot raise its own grace bar.
4029    let threshold = adaptive_timeout_threshold(strikes, peer);
4030    let strikes_after = if matches!(reason, AuditFailureReason::Timeout) {
4031        record_audit_timeout_strike(strikes, peer)
4032    } else {
4033        0
4034    };
4035    decide_audit_failure_action(reason, strikes_after, threshold)
4036}
4037
4038/// Whether a confirmed audit failure with this reason should revoke the
4039/// peer's `recent_provers` holder credit immediately (v12 §6).
4040///
4041/// `true` for any reason where the peer actually answered (or admitted
4042/// it cannot): `DigestMismatch`, `KeyAbsent`, `Rejected` ("missing
4043/// bytes for committed key"), `MalformedResponse` — these prove the
4044/// peer no longer holds what it committed to, so it must not keep
4045/// holder credit for the proof TTL. `false` for `Timeout`: a single
4046/// dropped packet must not strip an honest peer; the 40-min TTL is the
4047/// deliberate liveness cushion there.
4048fn audit_failure_revokes_holder_credit(reason: &AuditFailureReason) -> bool {
4049    !matches!(reason, AuditFailureReason::Timeout)
4050}
4051
4052/// Apply the holder-credit revocation decision for a confirmed audit
4053/// failure. Pure over `RecentProvers` so the handler wiring is unit-
4054/// testable without a live `P2PNode`: the production `Failed` arm of
4055/// `handle_subtree_audit_result` calls exactly this.
4056fn apply_audit_failure_credit_revocation(
4057    provers: &mut RecentProvers,
4058    challenged_peer: &PeerId,
4059    reason: &AuditFailureReason,
4060) {
4061    if audit_failure_revokes_holder_credit(reason) {
4062        provers.forget_peer(challenged_peer);
4063    }
4064}
4065
4066// `admit_bootstrap_hints` was consolidated into `admit_and_queue_hints`.
4067
4068// ---------------------------------------------------------------------------
4069// Storage-bound audit (ADR-0002) — gossip trigger + auditor-side ingestion
4070// ---------------------------------------------------------------------------
4071
/// State the gossip-audit trigger needs to spawn an audit. Bundled so the
/// message handler passes one value instead of a long argument list; all
/// fields are cheap `Arc` clones.
///
/// Cloned into the detached task spawned by `maybe_trigger_gossip_audit`.
#[derive(Clone)]
struct GossipAuditTrigger {
    /// Used to run the subtree audit and to report trust events on its result.
    p2p_node: Arc<P2PNode>,
    /// Replication configuration passed to the audit runner and result handler.
    config: Arc<ReplicationConfig>,
    /// Holder-credit state: lent to the audit as `AuditCredit`, and revoked
    /// from on a confirmed failure.
    recent_provers: Arc<RwLock<RecentProvers>>,
    /// Bootstrap-claim state updated by the audit result handler.
    sync_state: Arc<RwLock<NeighborSyncState>>,
    /// Per-peer consecutive-timeout strike counts (timeouts only).
    audit_timeout_strikes: Arc<RwLock<HashMap<PeerId, u32>>>,
    /// Per-peer time of the last audit-lottery draw, used by
    /// `audit_launch_decision` to enforce the gossip-audit cooldown.
    cooldown: Arc<RwLock<HashMap<PeerId, Instant>>>,
}
4084
/// What a gossip ingest yields for the audit trigger: the commitment hash to
/// pin and the `key_count` needed to size the response deadline from the actual
/// `ceil(sqrt(N))` subtree (ADR-0002). Returned on every VALID gossip (changed
/// or not) so a stable-keyset node stays auditable — not just on its first
/// commitment.
#[derive(Debug, Clone, Copy)]
struct AuditTarget {
    /// Commitment hash the spawned subtree audit is pinned to.
    pin_hash: [u8; 32],
    /// Key count passed to `run_subtree_audit` (sizes the subtree / deadline).
    key_count: u32,
}
4095
4096/// Per-peer audit cooldown check-and-stamp (ADR-0002 "occasional surprise
4097/// exams, keeps load low"). Returns `true` if `peer` may be audited now (and
4098/// stamps `now`), `false` if it was audited within
4099/// `AUDIT_ON_GOSSIP_COOLDOWN_SECS`. Bounds the map under a flood of distinct
4100/// peers. Pure over the passed map so the flood/cooldown behaviour is testable
4101/// without a live node: a burst of gossips from one peer yields at most one
4102/// `true` per cooldown window.
4103fn cooldown_allows_audit(map: &mut HashMap<PeerId, Instant>, peer: &PeerId, now: Instant) -> bool {
4104    let cooldown = Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS);
4105    let known = match map.get(peer) {
4106        Some(&last) => {
4107            if now.saturating_duration_since(last) < cooldown {
4108                return false;
4109            }
4110            true
4111        }
4112        None => false,
4113    };
4114    // Bound the map under churn like its siblings (drop the oldest stamp) before
4115    // admitting a brand-new peer.
4116    if !known && map.len() >= MAX_LAST_COMMITMENT_BY_PEER {
4117        if let Some(victim) = map.iter().min_by_key(|(_, &ts)| ts).map(|(p, _)| *p) {
4118            map.remove(&victim);
4119        }
4120    }
4121    map.insert(*peer, now);
4122    true
4123}
4124
4125/// The gossip-audit launch decision in ONE place so the ordering is shared
4126/// between production and its test (ADR-0002 "occasional surprise exams").
4127///
4128/// Order matters and is the security-relevant property: the per-peer cooldown is
4129/// checked-and-stamped FIRST, THEN the probability lottery (`lottery_wins`) is
4130/// applied. If the lottery were sampled first, a gossip flood would re-roll it on
4131/// every message until one won, multiplying audits. Because the cooldown is
4132/// stamped before the lottery is consulted, a LOSING ticket still consumes the
4133/// window — so each peer gets at most one audit lottery per cooldown window
4134/// regardless of how often it gossips. Production calls this with
4135/// `lottery_wins = gen_bool(AUDIT_ON_GOSSIP_PROBABILITY)`; the test calls it with
4136/// a deterministic `lottery_wins`, so a reorder regression here fails the test.
4137fn audit_launch_decision(
4138    map: &mut HashMap<PeerId, Instant>,
4139    peer: &PeerId,
4140    now: Instant,
4141    lottery_wins: bool,
4142) -> bool {
4143    // Gate 1: cooldown check-and-stamp (consumes the window even on a loss).
4144    if !cooldown_allows_audit(map, peer, now) {
4145        return false;
4146    }
4147    // Gate 2: the probability lottery.
4148    lottery_wins
4149}
4150
4151/// On a peer's *changed* gossiped commitment, maybe launch a subtree audit
4152/// (ADR-0002): fire with probability `AUDIT_ON_GOSSIP_PROBABILITY`, subject to a
4153/// per-peer cooldown, pinned to the just-ingested root. Detached so gossip
4154/// handling is never blocked on a network round-trip.
4155async fn maybe_trigger_gossip_audit(
4156    trigger: &GossipAuditTrigger,
4157    peer: &PeerId,
4158    target: AuditTarget,
4159) {
4160    // The launch decision (cooldown-then-lottery ordering) lives in the pure
4161    // `audit_launch_decision` so the ordering is shared with its test. Sample
4162    // the lottery here, then let the helper apply it AFTER the cooldown stamp.
4163    let now = Instant::now();
4164    let lottery_wins = rand::thread_rng().gen_bool(config::AUDIT_ON_GOSSIP_PROBABILITY);
4165    {
4166        let mut map = trigger.cooldown.write().await;
4167        if !audit_launch_decision(&mut map, peer, now, lottery_wins) {
4168            return;
4169        }
4170    }
4171
4172    let trigger = trigger.clone();
4173    let peer = *peer;
4174    tokio::spawn(async move {
4175        let credit = storage_commitment_audit::AuditCredit {
4176            recent_provers: &trigger.recent_provers,
4177        };
4178        let result = storage_commitment_audit::run_subtree_audit(
4179            &trigger.p2p_node,
4180            &trigger.config,
4181            &peer,
4182            target.pin_hash,
4183            target.key_count,
4184            Some(&credit),
4185        )
4186        .await;
4187        handle_subtree_audit_result(
4188            &result,
4189            &trigger.p2p_node,
4190            &trigger.sync_state,
4191            &trigger.recent_provers,
4192            &trigger.audit_timeout_strikes,
4193            &trigger.config,
4194        )
4195        .await;
4196    });
4197}
4198
4199/// Atomic check-and-stamp of the per-peer commitment sig-verify rate limit.
4200///
4201/// Returns `true` if a signature verify is allowed now (and stamps the attempt
4202/// time), `false` if the peer is within [`COMMITMENT_SIG_VERIFY_MIN_INTERVAL`]
4203/// of its last attempt. Holds one write lock across the decision so two
4204/// concurrent ingests from the same peer cannot both pass. Stamps BEFORE the
4205/// caller's expensive verify so a slow/failed verify still rate-limits the next
4206/// message. Bounds the map under a flood of distinct peer ids.
4207async fn sig_verify_rate_limit_ok(
4208    sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
4209    source: &PeerId,
4210    now: Instant,
4211) -> bool {
4212    let mut attempts = sig_verify_attempts.write().await;
4213    if let Some(&last) = attempts.get(source) {
4214        if now.saturating_duration_since(last) < COMMITMENT_SIG_VERIFY_MIN_INTERVAL {
4215            return false;
4216        }
4217    }
4218    if attempts.len() >= MAX_LAST_COMMITMENT_BY_PEER && !attempts.contains_key(source) {
4219        if let Some(victim) = attempts.iter().min_by_key(|(_, &ts)| ts).map(|(p, _)| *p) {
4220            attempts.remove(&victim);
4221        }
4222    }
4223    attempts.insert(*source, now);
4224    true
4225}
4226
4227/// Verify + store an inbound commitment from a gossip peer.
4228///
4229/// Called from the inbound `NeighborSyncRequest`/`Response` handlers and
4230/// the bootstrap-sync loop. Drops the commitment unless all five gates
4231/// pass:
4232///   1. `source` is in our DHT routing table (sybil/churn cap).
4233///   2. `commitment.sender_peer_id == source.as_bytes()` (peer-id
4234///      binding to the authenticated transport peer).
4235///   3. `BLAKE3(commitment.sender_public_key) == commitment.sender_peer_id`
4236///      (the embedded pubkey actually belongs to the claimed identity —
4237///      saorsa-core derives `PeerId = BLAKE3(pubkey)`).
4238///   4. `verify_commitment_signature(commitment)` succeeds against the
4239///      embedded public key. The signed payload binds the pubkey, so an
4240///      adversary cannot swap the key while keeping the body.
4241///   5. The cache has room or this is an update for an existing entry
4242///      (sybil cap, `MAX_LAST_COMMITMENT_BY_PEER`).
4243///
4244/// On all-pass, the commitment is stored as the auditor's per-peer
4245/// "last known commitment" for use as `expected_commitment_hash` in
4246/// future audits.
4247///
4248/// Failures (no commitment / mismatched peer id / bad signature) are
4249/// silent drops — gossip is best-effort and a malformed commitment from
4250/// one peer should not affect anything else.
4251///
4252/// Returns `Some(AuditTarget)` whenever a VALID commitment was stored (whether
4253/// or not its root changed), so the caller can run a probabilistic,
4254/// cooldown-gated subtree audit. Returning on *every* valid gossip — not only
4255/// changed ones — is deliberate (ADR-0002): a node with a stable key set keeps
4256/// being auditable, so it cannot pass one audit and then delete data while
4257/// re-gossiping the same root forever. The cooldown + probability bound the
4258/// audit frequency. Returns `None` only if the commitment was dropped (failed a
4259/// gate) or there is nothing to pin.
4260///
4261/// Handle a capable peer gossiping `None` (a commitment downgrade).
4262///
4263/// A capable peer that previously gossiped a commitment but now gossips `None`
4264/// is trying to drop off the audit path. Within the answerability window we keep
4265/// the cached commitment pinned AND return it as an audit target so this gossip
4266/// still schedules a subtree audit against the peer's last known commitment — if
4267/// it genuinely dropped the data, the audit fails (there is no periodic tick, so
4268/// the trigger MUST fire here or the downgrade is never re-challenged).
4269///
4270/// But this only holds within the SAME `GOSSIP_ANSWERABILITY_TTL` the responder
4271/// honours for its own retired commitment: once that elapses since we last
4272/// received the peer's commitment, an honest peer has legitimately retired that
4273/// root (its responder side `retire_current`s and lets it age out) and can no
4274/// longer answer a pin on it. Auditing it past the TTL would manufacture a false
4275/// failure, so we then forget the cached commitment (keeping the sticky
4276/// `commitment_capable` bit) and stop pinning it.
4277async fn handle_commitment_downgrade(
4278    source: &PeerId,
4279    last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
4280) -> Option<AuditTarget> {
4281    let now = Instant::now();
4282    let cached = {
4283        let map = last_commitment_by_peer.read().await;
4284        map.get(source).and_then(|rec| {
4285            if !rec.commitment_capable {
4286                return None;
4287            }
4288            let last = rec.last_commitment()?;
4289            let pin = rec.commitment_hash()?;
4290            let fresh = now.saturating_duration_since(rec.received_at)
4291                < crate::replication::commitment_state::GOSSIP_ANSWERABILITY_TTL;
4292            Some((pin, last.key_count, fresh))
4293        })
4294    };
4295    match cached {
4296        Some((pin, key_count, true)) => {
4297            warn!(
4298                "ingest_peer_commitment: commitment-capable peer {source} sent None \
4299                 (downgrade attempt); auditing against its last cached commitment"
4300            );
4301            Some(AuditTarget {
4302                pin_hash: pin,
4303                key_count,
4304            })
4305        }
4306        Some((_, _, false)) => {
4307            // Cached commitment has aged past the answerability window — forget
4308            // it so we stop pinning a root the peer is no longer obliged to
4309            // answer. Keep `commitment_capable` (sticky). Re-check freshness
4310            // UNDER the write lock (compare-and-clear): a concurrent valid gossip
4311            // from this peer may have refreshed `received_at` in the gap between
4312            // our read and write locks; if so, leave its fresh commitment intact.
4313            if let Some(rec) = last_commitment_by_peer.write().await.get_mut(source) {
4314                let still_stale = now.saturating_duration_since(rec.received_at)
4315                    >= crate::replication::commitment_state::GOSSIP_ANSWERABILITY_TTL;
4316                if still_stale {
4317                    rec.clear_commitment();
4318                    debug!(
4319                        "ingest_peer_commitment: capable peer {source} sent None and its cached \
4320                         commitment aged past the answerability TTL; forgetting it"
4321                    );
4322                }
4323            }
4324            None
4325        }
4326        None => None,
4327    }
4328}
4329
4330async fn ingest_peer_commitment(
4331    source: &PeerId,
4332    commitment: Option<&StorageCommitment>,
4333    p2p_node: &Arc<P2PNode>,
4334    last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
4335    ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
4336    sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
4337) -> Option<AuditTarget> {
4338    let Some(c) = commitment else {
4339        return handle_commitment_downgrade(source, last_commitment_by_peer).await;
4340    };
4341    // RT-membership gate: only accept commitments from peers in our
4342    // routing table. Off-RT senders (sybils, drive-by relays) cannot
4343    // populate the cache, which closes the hole where a flood of
4344    // off-RT identities could fill the cap and evict honest
4345    // peers. The neighbor-sync request handler applies the same gate
4346    // before admitting inbound replication hints (see neighbor_sync.rs
4347    // `sender_in_rt`); we mirror that policy here for the commitment
4348    // piggyback.
4349    if !p2p_node.dht_manager().is_in_routing_table(source).await {
4350        debug!("ingest_peer_commitment: source {source} not in routing table (dropped)");
4351        return None;
4352    }
4353    // Peer-id binding: the commitment's claimed sender must match the
4354    // authenticated transport peer (`source`). Defeats relay/replay
4355    // and also pins which embedded public key we are about to verify
4356    // against — the verify itself trusts the embedded key, so the
4357    // peer-id binding is the link to a real identity.
4358    if &c.sender_peer_id != source.as_bytes() {
4359        warn!(
4360            "ingest_peer_commitment: sender_peer_id mismatch from {source} \
4361             (dropped, possible relay attempt)"
4362        );
4363        return None;
4364    }
4365    // Peer-id to embedded-pubkey binding: saorsa-core derives PeerId as
4366    // BLAKE3(pubkey_bytes). Without this check, a responder could sign
4367    // with a throwaway key they own and lie about which identity it
4368    // belongs to (the embedded-key signature would verify trivially).
4369    let derived_peer_id = *blake3::hash(&c.sender_public_key).as_bytes();
4370    if derived_peer_id != c.sender_peer_id {
4371        warn!(
4372            "ingest_peer_commitment: embedded pubkey does not hash to claimed peer_id for \
4373             {source} (dropped, throwaway-key attack)"
4374        );
4375        return None;
4376    }
4377    // §2 step 3 + §11 DoS: rate-limit per-peer to at most one ML-DSA
4378    // signature verify per `COMMITMENT_SIG_VERIFY_MIN_INTERVAL`. A
4379    // sybil/RT-membership-bypassing peer that flooded valid-looking
4380    // gossip would otherwise burn CPU on every message. The rate
4381    // limit is checked AFTER cheap structural gates (RT, peer-id
4382    // binding, pubkey-binding) and BEFORE the expensive sig verify.
4383    //
4384    // Tracked in `sig_verify_attempts` (separate from
4385    // last_commitment_by_peer) so EVERY attempt — successful or not —
4386    // bumps the rate-limit clock. Reading only from PeerCommitmentRecord
4387    // would skip the cap for peers we've never successfully verified,
4388    // letting a flood of invalid-but-structurally-plausible gossips
4389    // burn CPU.
4390    let now = Instant::now();
4391    if !sig_verify_rate_limit_ok(sig_verify_attempts, source, now).await {
4392        debug!(
4393            "ingest_peer_commitment: rate-limited sig verify from {source} \
4394             (< {COMMITMENT_SIG_VERIFY_MIN_INTERVAL:?} since last attempt); dropped"
4395        );
4396        return None;
4397    }
4398    // Signature verify, using the public key embedded in the commitment
4399    // itself. The pubkey is bound by the signature payload (see
4400    // commitment_signed_payload) so an adversary cannot keep the body
4401    // and swap the key to one they hold the secret for.
4402    if !crate::replication::commitment::verify_commitment_signature(c) {
4403        warn!(
4404            "ingest_peer_commitment: signature did not verify under embedded key for {source} \
4405             (dropped, forged commitment)"
4406        );
4407        return None;
4408    }
4409    // The new commitment's hash, used to store and to pin for the audit target.
4410    let new_hash = commitment_hash(c);
4411    let mut map = last_commitment_by_peer.write().await;
4412    // Sybil/churn cap: if we're at the hard cap AND this is a new peer,
4413    // evict an arbitrary existing entry to make room. Updates for peers
4414    // already in the map are always accepted (they replace, not grow).
4415    if map.len() >= MAX_LAST_COMMITMENT_BY_PEER && !map.contains_key(source) {
4416        // Drop one arbitrary entry. HashMap iter order is random which
4417        // is fine — over time PeerRemoved cleanup keeps the working set
4418        // anchored on the real RT membership; this cap only fires under
4419        // active flooding attempts.
4420        if let Some(victim) = map.keys().next().copied() {
4421            map.remove(&victim);
4422            warn!(
4423                "ingest_peer_commitment: cache full ({MAX_LAST_COMMITMENT_BY_PEER}); \
4424                 evicted {victim} to admit {source}"
4425            );
4426        }
4427    }
4428    // Preserve sticky commitment_capable across updates — once true,
4429    // always true. New entries start with capable = true (we just
4430    // verified a valid commitment from this peer).
4431    map.entry(*source)
4432        .and_modify(|r| {
4433            // set_commitment refreshes the cached hash (§13) alongside the
4434            // commitment + received_at so they never drift.
4435            r.set_commitment(c.clone(), now);
4436            r.last_sig_verify_at = now;
4437            r.commitment_capable = true; // sticky-redundant but explicit
4438        })
4439        .or_insert_with(|| PeerCommitmentRecord::from_verified(c.clone(), now));
4440    drop(map);
4441    // Record the sticky "ever v12-capable" bit in a set independent of
4442    // `last_commitment_by_peer` (whose entries can be evicted by
4443    // `PeerRemoved` and the sybil cap). This is what the §3 audit
4444    // shield and the §6 holder-eligibility closure consult to decide
4445    // whether the peer is expected to speak v12.
4446    //
4447    // Capped at `MAX_EVER_CAPABLE_PEERS` to bound memory under
4448    // identity-rotation attacks: once full, new entries are refused.
4449    // Refusal degrades over-cap peers to the behaviour before this set
4450    // existed (treated as legacy on rejoin), which is not a security
4451    // regression and preserves the historic set stable.
4452    {
4453        let mut set = ever_capable_peers.write().await;
4454        if set.contains(source) || set.len() < MAX_EVER_CAPABLE_PEERS {
4455            set.insert(*source);
4456        } else {
4457            warn!(
4458                "ingest_peer_commitment: ever_capable_peers at cap \
4459                 ({MAX_EVER_CAPABLE_PEERS}); refusing to record {source} as sticky-capable"
4460            );
4461        }
4462    }
4463    // Return an audit target for EVERY valid stored commitment (changed or
4464    // not), so the caller's cooldown+probability-gated trigger keeps a
4465    // stable-keyset peer auditable over time (ADR-0002). Only a serialization
4466    // failure (new_hash == None, unreachable for a real commitment) yields None.
4467    new_hash.map(|pin_hash| AuditTarget {
4468        pin_hash,
4469        key_count: c.key_count,
4470    })
4471}
4472
4473// ---------------------------------------------------------------------------
4474// Storage-bound audit (v12) — responder commitment rotation
4475// ---------------------------------------------------------------------------
4476
4477/// Read the current LMDB key set, build + sign a fresh
4478/// `StorageCommitment`, and rotate it into `state` as the new `current`.
4479/// The prior `current` is demoted to `previous`; the prior `previous` is
4480/// dropped (per `ResponderCommitmentState::rotate`).
4481///
4482/// For content-addressed chunks (Autonomi's chunk store), `address ==
4483/// BLAKE3(content)`, so `bytes_hash := key` and we don't have to
4484/// re-read each chunk's bytes to compute the leaf hash.
4485///
4486/// Skips (returns `Ok(())`) if the key set is empty — no commitment to
4487/// rotate. The auditor side handles "no commitment for this peer" by
4488/// falling back to the legacy plain-digest audit path.
4489async fn rebuild_and_rotate_commitment(
4490    storage: &Arc<LmdbStorage>,
4491    identity: &Arc<NodeIdentity>,
4492    state: &Arc<ResponderCommitmentState>,
4493    p2p: &Arc<P2PNode>,
4494    config: &Arc<ReplicationConfig>,
4495) -> Result<()> {
4496    use saorsa_pqc::api::sig::{MlDsaSecretKey, MlDsaVariant};
4497
4498    let stored_keys = storage
4499        .all_keys()
4500        .await
4501        .map_err(|e| Error::Storage(format!("commitment build: read keys: {e}")))?;
4502
4503    // Commit only to keys we are still RESPONSIBLE for ("want-to-hold"), not
4504    // everything currently on disk ("hold"). This is the half of the retention
4505    // contract that lets out-of-range chunks age out: a key that has left our
4506    // close group is excluded from the NEXT commitment, so within at most
4507    // RETAINED_GOSSIPED_COMMITMENTS gossip rotations it falls out of the
4508    // last-2-gossiped window, `ResponderCommitmentState::is_held` goes false,
4509    // and the pruner (which until then vetoes its deletion) reclaims it. Without
4510    // this filter the pruner's reprieve would keep re-committing stale keys
4511    // forever (the rebuild reads all_keys, so a retained-on-disk key would be
4512    // re-committed and re-gossiped every rotation — a permanent pin).
4513    let storage_empty = stored_keys.is_empty();
4514    let self_id = *p2p.peer_id();
4515    let mut keys = Vec::with_capacity(stored_keys.len());
4516    for k in stored_keys {
4517        if admission::is_responsible(&self_id, &k, p2p, config.close_group_size).await {
4518            keys.push(k);
4519        }
4520    }
4521
4522    if keys.is_empty() {
4523        if storage_empty {
4524            // Storage is genuinely empty — there is nothing to answer for, so
4525            // drop the previously advertised commitment immediately. Keeping it
4526            // would leave remote auditors pinning a hash we can never satisfy
4527            // again (the bytes are gone).
4528            if state.retained_slot_count() > 0 {
4529                debug!("Commitment rotation: storage empty, clearing retained slots");
4530                state.clear_all();
4531            }
4532            return Ok(());
4533        }
4534        // Bytes are still on disk but no key is currently in range. We must NOT
4535        // clear retention here: a peer may still be pinning a root we gossiped
4536        // moments ago and could demand its bytes in a round-2 challenge, which
4537        // we can still answer (the bytes are present). But we must STOP
4538        // advertising the stale commitment: retire it so `current()` returns
4539        // `None` and the gossip-emit sites stop re-emitting and re-stamping it.
4540        // The retired slot then ages out by its gossip-answerability TTL while
4541        // remaining answerable for in-flight pins until then. Once it ages out,
4542        // `is_held` flips false and the pruner reclaims the now-uncommitted,
4543        // out-of-range chunks. (Calling `age_out` alone would leave `current()`
4544        // pointing at the stale root, which the gossip loop would keep
4545        // re-stamping — pinning its keys forever.)
4546        debug!(
4547            "Commitment rotation: no responsible keys to commit to; retiring current commitment \
4548             (stays answerable until its gossip TTL lapses, bytes still on disk)"
4549        );
4550        state.retire_current();
4551        return Ok(());
4552    }
4553
4554    // Cap to MAX_COMMITMENT_KEY_COUNT for v12 (responder must not commit
4555    // to more than the protocol limit; auditor would reject the
4556    // commitment otherwise).
4557    let cap = commitment::MAX_COMMITMENT_KEY_COUNT as usize;
4558    if keys.len() > cap {
4559        warn!(
4560            "Commitment rotation: key set ({}) exceeds MAX_COMMITMENT_KEY_COUNT ({}); \
4561             truncating — investigate as this likely means a misconfiguration",
4562            keys.len(),
4563            cap
4564        );
4565    }
4566
4567    // INVARIANT: this module is only used with CONTENT-ADDRESSED chunks,
4568    // where `key == BLAKE3(content)`, so `bytes_hash := key` and we skip a
4569    // full chunk re-read per rotation.
4570    //
4571    // Consequence to be precise about: because the leaf is `(key, key)`,
4572    // the Merkle root commits to the SET OF KEYS, not to the bytes. The
4573    // commitment therefore binds "which keys I claim to hold"; it does NOT
4574    // by itself prove byte possession. Byte possession is enforced by the
4575    // audit-verify path, which recomputes `bytes_hash == BLAKE3(local_bytes)`
4576    // and the per-key digest against the AUDITOR'S OWN local copy of the
4577    // bytes — so a responder that holds the key list but dropped the bytes
4578    // still fails (`missing bytes for committed key` / digest mismatch).
4579    // This is sound ONLY while keys are content addresses. If this module
4580    // is ever reused for non-content-addressed records (`bytes_hash != key`),
4581    // the `(k, k)` shortcut would let a byte-less node forge a valid root and
4582    // MUST be replaced with `(key, BLAKE3(bytes))` computed from real bytes.
4583    let entries: Vec<_> = keys.into_iter().take(cap).map(|k| (k, k)).collect();
4584
4585    // No-op-rotation guard: compute just the Merkle root from `entries`
4586    // and compare against the currently-advertised commitment's root.
4587    // If they match, the key set is unchanged and a new rotation would
4588    // only swap a randomized ML-DSA signature for a fresh one — same
4589    // content, different commitment_hash. That invalidates every
4590    // outstanding `recent_provers` credit on this node across the
4591    // close group with no security benefit, breaking steady-state
4592    // quorum liveness on large nodes that can't re-audit every key
4593    // every rotation interval. Skip the rotation entirely when the
4594    // tree is unchanged.
4595    // Build the tree ONCE here (moving `entries`): it serves both the no-op
4596    // root check below and, if we proceed, the signed commitment via
4597    // `build_from_tree` (§11 — previously the tree was built here and AGAIN
4598    // inside `BuiltCommitment::build`).
4599    let candidate_tree = commitment::MerkleTree::build(entries)
4600        .map_err(|e| Error::Crypto(format!("commitment tree build: {e}")))?;
4601    let candidate_root = candidate_tree.root();
4602    if let Some(current) = state.current() {
4603        if current.commitment().root == candidate_root {
4604            debug!(
4605                "Commitment rotation: key set unchanged (root={}); skipping no-op re-sign",
4606                hex::encode(candidate_root)
4607            );
4608            // Even though we skip re-signing (to avoid invalidating holder
4609            // credit), retention must still advance on the wall clock: a
4610            // previously-gossiped commitment that holds a now-out-of-range key
4611            // must be able to age out of the answerability window even when the
4612            // committed key set is frozen here for many rotations. Without this,
4613            // the no-op guard would pin a stale slot — and its key — forever.
4614            state.age_out();
4615            return Ok(());
4616        }
4617    }
4618
4619    let sk_bytes = identity.secret_key_bytes().to_vec();
4620    let sk = MlDsaSecretKey::from_bytes(MlDsaVariant::MlDsa65, &sk_bytes)
4621        .map_err(|e| Error::Crypto(format!("commitment build: load sk: {e}")))?;
4622    let pk_bytes = identity.public_key().as_bytes().to_vec();
4623    let peer_id_bytes = *p2p.peer_id().as_bytes();
4624
4625    let built = commitment_state::BuiltCommitment::build_from_tree(
4626        candidate_tree,
4627        &peer_id_bytes,
4628        &sk,
4629        &pk_bytes,
4630    )
4631    .map_err(|e| Error::Crypto(format!("commitment build: {e}")))?;
4632
4633    let hash = hex::encode(built.hash());
4634    let key_count = built.commitment().key_count;
4635    state.rotate(built);
4636    info!("Storage commitment rotated: hash={hash} key_count={key_count}");
4637    Ok(())
4638}
4639
4640#[cfg(test)]
4641#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
4642mod tests {
4643    use super::{
4644        adaptive_timeout_threshold, apply_audit_failure_credit_revocation,
4645        audit_failure_clears_bootstrap_claim, audit_failure_revokes_holder_credit,
4646        audit_launch_decision, config, cooldown_allows_audit, decide_audit_failure_action,
4647        first_failed_key_label, fresh_offer_payment_context, median_timeout_strikes_excluding,
4648        paid_notify_payment_context, plan_failed_audit, record_audit_timeout_strike,
4649        timeout_strike_reaches_threshold, AuditFailureAction, AUDIT_TIMEOUT_STRIKE_MAX,
4650    };
4651    use crate::payment::VerificationContext;
4652    use crate::replication::recent_provers::RecentProvers;
4653    use crate::replication::types::AuditFailureReason;
4654    use saorsa_core::identity::PeerId;
4655    use std::collections::HashMap;
4656    use std::time::Duration;
4657    use std::time::Instant;
4658
4659    fn test_peer(b: u8) -> PeerId {
4660        let mut bytes = [0u8; 32];
4661        bytes[0] = b;
4662        PeerId::from_bytes(bytes)
4663    }
4664
4665    fn test_key(b: u8) -> crate::ant_protocol::XorName {
4666        let mut k = [0u8; 32];
4667        k[0] = b;
4668        k
4669    }
4670
4671    #[test]
4672    fn fresh_offer_runs_client_put_payment_checks() {
4673        assert_eq!(
4674            fresh_offer_payment_context(),
4675            VerificationContext::ClientPut
4676        );
4677    }
4678
4679    #[test]
4680    fn paid_notify_uses_paid_list_admission_payment_checks() {
4681        assert_eq!(
4682            paid_notify_payment_context(),
4683            VerificationContext::PaidListAdmission
4684        );
4685    }
4686
4687    #[test]
4688    fn audit_timeout_preserves_active_bootstrap_claim() {
4689        assert!(!audit_failure_clears_bootstrap_claim(
4690            &AuditFailureReason::Timeout
4691        ));
4692    }
4693
4694    fn strike_peer(b: u8) -> PeerId {
4695        let mut bytes = [0u8; 32];
4696        bytes[0] = b;
4697        PeerId::from_bytes(bytes)
4698    }
4699
4700    // HELPER-LEVEL: counter arithmetic + threshold predicate. The reset is
4701    // simulated by an in-test `strikes.remove`; the real reset path (the
4702    // `Passed` arm) is covered at the glue level below.
4703    #[test]
4704    fn single_timeout_then_success_emits_no_failure_and_resets() {
4705        let peer = strike_peer(1);
4706        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4707        let base = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD;
4708        let after_one = record_audit_timeout_strike(&mut strikes, &peer);
4709        assert_eq!(after_one, 1);
4710        assert!(!timeout_strike_reaches_threshold(after_one, base));
4711        strikes.remove(&peer);
4712        assert!(!strikes.contains_key(&peer));
4713    }
4714
4715    #[test]
4716    fn consecutive_timeouts_cross_threshold_at_n() {
4717        let peer = strike_peer(2);
4718        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4719        let n = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD;
4720        let mut last = 0;
4721        for i in 1..=n {
4722            last = record_audit_timeout_strike(&mut strikes, &peer);
4723            if i < n {
4724                assert!(!timeout_strike_reaches_threshold(last, n));
4725            }
4726        }
4727        assert!(timeout_strike_reaches_threshold(last, n));
4728        // The count keeps climbing past the base threshold (so it can also
4729        // cross a higher *adaptive* threshold), but is bounded by the strike
4730        // cap — no unbounded growth.
4731        let mut c = last;
4732        for _ in 0..200 {
4733            c = record_audit_timeout_strike(&mut strikes, &peer);
4734        }
4735        assert_eq!(
4736            c,
4737            super::AUDIT_TIMEOUT_STRIKE_MAX,
4738            "count saturates at the max cap"
4739        );
4740        assert!(c > n, "count must be able to exceed the base threshold");
4741    }
4742
4743    // ADR-0002 Network Resilience: adaptive timeout threshold.
4744
4745    #[test]
4746    fn median_timeout_strikes_basics() {
4747        let target = strike_peer(99);
4748        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4749        // No other peers → 0 (healthy network, threshold == base).
4750        assert_eq!(median_timeout_strikes_excluding(&strikes, &target), 0);
4751        strikes.insert(strike_peer(1), 1);
4752        strikes.insert(strike_peer(2), 3);
4753        strikes.insert(strike_peer(3), 5);
4754        // Sorted [1,3,5], lower-median index 1 → 3.
4755        assert_eq!(median_timeout_strikes_excluding(&strikes, &target), 3);
4756    }
4757
4758    // ADVERSARIAL (ADR point e + sybil-inflation bound). Two invariants the
4759    // existing suite leaves unpinned:
4760    //  1. EVEN-count inputs must take the LOWER of the two middle values. The
4761    //     existing basics test only feeds an odd-length cohort, so an
4762    //     implementation that used `len/2` (upper median) would still pass it.
4763    //     Here [1,4] -> lower median 1 (not 4) and [2,4,6,8] -> 4 (not 6).
4764    //  2. A sybil cohort pinned at the *strike cap* (the most an attacker could
4765    //     ever drive fabricated peers to) STILL cannot push the grace past
4766    //     MAX_ADAPTIVE_TIMEOUT_GRACE: the threshold saturates at base + max
4767    //     grace regardless of how high or how numerous the cohort is.
4768    // FLIPS IF: median switches to the upper element on even input, or the
4769    // grace clamp (`.min(MAX_ADAPTIVE_TIMEOUT_GRACE)`) is removed.
4770    #[test]
4771    fn even_count_takes_lower_median_and_sybil_cohort_cannot_exceed_grace_bound() {
4772        let target = strike_peer(150);
4773
4774        // Even count == 2: lower of [1, 4] is 1.
4775        let mut two: HashMap<PeerId, u32> = HashMap::new();
4776        two.insert(strike_peer(1), 1);
4777        two.insert(strike_peer(2), 4);
4778        assert_eq!(
4779            median_timeout_strikes_excluding(&two, &target),
4780            1,
4781            "even-count median must take the LOWER middle value (1), not the upper (4)"
4782        );
4783
4784        // Even count == 4: sorted [2,4,6,8], lower median index (4-1)/2 = 1 → 4.
4785        let mut four: HashMap<PeerId, u32> = HashMap::new();
4786        for (i, v) in (10u8..).zip([2u32, 4, 6, 8]) {
4787            four.insert(strike_peer(i), v);
4788        }
4789        assert_eq!(
4790            median_timeout_strikes_excluding(&four, &target),
4791            4,
4792            "even-count median must be the lower middle (4), not the upper (6)"
4793        );
4794
4795        // Sybil cohort pinned at the strike CAP — the strongest inflation an
4796        // attacker could mount — must not lift the threshold past base + max
4797        // grace. Try several cohort sizes (odd and even) to be sure.
4798        for cohort in [2u8, 5, 8, 20] {
4799            let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4800            for i in 0..cohort {
4801                strikes.insert(strike_peer(50 + i), super::AUDIT_TIMEOUT_STRIKE_MAX);
4802            }
4803            let threshold = adaptive_timeout_threshold(&strikes, &target);
4804            assert_eq!(
4805                threshold,
4806                config::AUDIT_TIMEOUT_STRIKE_THRESHOLD + super::MAX_ADAPTIVE_TIMEOUT_GRACE,
4807                "a sybil cohort at the strike cap (size {cohort}) must saturate the grace at \
4808                 the bound, never exceed it"
4809            );
4810        }
4811
4812        // And even at the bounded-but-inflated threshold, a genuinely
4813        // non-responsive target can still cross it (cap > max reachable
4814        // threshold), so the bound never shields a bad node forever.
4815        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4816        for i in 0..8u8 {
4817            strikes.insert(strike_peer(80 + i), super::AUDIT_TIMEOUT_STRIKE_MAX);
4818        }
4819        let threshold = adaptive_timeout_threshold(&strikes, &target);
4820        let mut c = 0;
4821        for _ in 0..(threshold + 5) {
4822            c = record_audit_timeout_strike(&mut strikes, &target);
4823        }
4824        assert!(
4825            timeout_strike_reaches_threshold(c, threshold),
4826            "target must still cross the bounded inflated threshold ({c} vs {threshold})"
4827        );
4828    }
4829
4830    #[test]
4831    fn lone_timing_out_peer_does_not_inflate_its_own_grace() {
4832        // The peer under judgement is excluded from the median, so a single bad
4833        // peer (the common case) is judged against the base threshold and caught
4834        // — it cannot raise its own bar as its strike count climbs.
4835        let bad = strike_peer(7);
4836        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4837        strikes.insert(bad, 5); // its own large count must not count
4838        assert_eq!(
4839            adaptive_timeout_threshold(&strikes, &bad),
4840            config::AUDIT_TIMEOUT_STRIKE_THRESHOLD
4841        );
4842    }
4843
4844    #[test]
4845    fn widespread_timeouts_widen_the_grace() {
4846        // Genuine disruption: many OTHER honest peers carry timeout strikes. The
4847        // median rises, so the threshold for any given peer widens beyond the
4848        // base — the audit system does not pile onto a struggling network.
4849        let target = strike_peer(100);
4850        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4851        for i in 0..9u8 {
4852            strikes.insert(strike_peer(i), 4);
4853        }
4854        assert_eq!(
4855            adaptive_timeout_threshold(&strikes, &target),
4856            4 + config::AUDIT_TIMEOUT_STRIKE_THRESHOLD
4857        );
4858        assert!(
4859            adaptive_timeout_threshold(&strikes, &target) > config::AUDIT_TIMEOUT_STRIKE_THRESHOLD
4860        );
4861    }
4862
4863    #[test]
4864    fn adaptive_grace_only_responds_to_timeouts_not_deterministic_failures() {
4865        // The strike map is fed ONLY by timeouts (plan_failed_audit records a
4866        // strike for Timeout and never for confirmed failures). So a flood of
4867        // deterministic failures cannot inflate the median to buy grace.
4868        let target = strike_peer(101);
4869        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4870        // Many confirmed (non-timeout) failures: these must NOT touch the map.
4871        for i in 0..9u8 {
4872            let action = plan_failed_audit(
4873                &AuditFailureReason::DigestMismatch,
4874                &mut strikes,
4875                &strike_peer(i),
4876            );
4877            assert_eq!(action, AuditFailureAction::ConfirmedPenalize);
4878        }
4879        assert!(
4880            strikes.is_empty(),
4881            "deterministic failures must not record strikes"
4882        );
4883        // Threshold stays at the base — an attacker cannot widen grace by
4884        // failing audits on purpose.
4885        assert_eq!(
4886            adaptive_timeout_threshold(&strikes, &target),
4887            config::AUDIT_TIMEOUT_STRIKE_THRESHOLD
4888        );
4889    }
4890
4891    // ADR-0002: "occasional surprise exams, keeps load low" — the per-peer
4892    // cooldown must collapse a gossip flood into at most one audit per window.
4893
4894    #[test]
4895    fn gossip_flood_yields_at_most_one_audit_per_cooldown_window() {
4896        let peer = strike_peer(1);
4897        let mut map: HashMap<PeerId, Instant> = HashMap::new();
4898        let t0 = Instant::now();
4899        // First gossip in the window passes; a burst of further gossips at the
4900        // same instant are all suppressed.
4901        assert!(cooldown_allows_audit(&mut map, &peer, t0));
4902        let mut passed = 1;
4903        for _ in 0..100 {
4904            if cooldown_allows_audit(&mut map, &peer, t0) {
4905                passed += 1;
4906            }
4907        }
4908        assert_eq!(
4909            passed, 1,
4910            "a flood at one instant must trigger exactly one audit"
4911        );
4912    }
4913
4914    // ADR-0002 ordering invariant: `maybe_trigger_gossip_audit` stamps the
4915    // per-peer cooldown BEFORE the probability lottery, so a LOSING ticket still
4916    // consumes the window. This is the property the isolated cooldown tests above
4917    // cannot see: they never sample the lottery, so a regression that reordered
4918    // the gates (sample probability first, only stamp the cooldown on a win)
4919    // would still pass them while breaking flood-resistance: a flood would then
4920    // re-roll the lottery on EVERY message until one won, multiplying audits.
4921    //
4922    // We model the exact production gate order (cooldown-then-lottery) with a
4923    // lottery driven by a fixed outcome instead of `gen_bool(..)`. The first
4924    // message LOSES the lottery; the remaining flood messages all WIN. With the
4925    // production order, the losing first ticket burns the window and every later
4926    // winner in the same window is blocked, so there are 0 audits this window. If
4927    // the gates were flipped, the second message's winning ticket would slip
4928    // through. The window only reopens after the cooldown elapses.
4929    //
4930    // FLIPS IF: the lottery is sampled before `cooldown_allows_audit` (a losing
4931    // ticket no longer consumes the window), re-enabling a flood-amplified audit
4932    // storm.
4933    #[test]
4934    fn losing_lottery_still_consumes_cooldown_window() {
4935        // Faithful re-implementation of the two gates in
4936        // `maybe_trigger_gossip_audit`, with the lottery outcome made
4937        // deterministic instead of `rand::thread_rng().gen_bool(..)`.
4938        // Calls the SHIPPED `audit_launch_decision` (the same function
4939        // `maybe_trigger_gossip_audit` uses), so a reorder of the two gates in
4940        // production fails this test — not a local reimplementation.
4941        let peer = strike_peer(3);
4942        let mut map: HashMap<PeerId, Instant> = HashMap::new();
4943        let t0 = Instant::now();
4944
4945        // First flooded message at t0 LOSES the lottery, but the cooldown is
4946        // stamped BEFORE the lottery is consulted, so the window is now consumed.
4947        assert!(
4948            !audit_launch_decision(&mut map, &peer, t0, false),
4949            "a losing ticket launches no audit"
4950        );
4951
4952        // 99 more flooded messages at the same instant would all WIN the lottery,
4953        // yet every one must be blocked by the cooldown the loser already stamped.
4954        // (If production sampled the lottery FIRST, these would each get a fresh
4955        // roll and audits would multiply — this assertion catches that reorder.)
4956        let mut audits = 0;
4957        for _ in 0..99 {
4958            if audit_launch_decision(&mut map, &peer, t0, true) {
4959                audits += 1;
4960            }
4961        }
4962        assert_eq!(
4963            audits, 0,
4964            "a losing first ticket must consume the window so no later flooded \
4965             message in the same window can audit"
4966        );
4967
4968        // The window only reopens after the cooldown elapses; the next winning
4969        // ticket then launches exactly one audit.
4970        let after = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS + 1);
4971        assert!(
4972            audit_launch_decision(&mut map, &peer, after, true),
4973            "after the cooldown a winning ticket audits again"
4974        );
4975    }
4976
4977    #[test]
4978    fn cooldown_lets_audit_through_after_the_window() {
4979        let peer = strike_peer(2);
4980        let mut map: HashMap<PeerId, Instant> = HashMap::new();
4981        let t0 = Instant::now();
4982        assert!(cooldown_allows_audit(&mut map, &peer, t0));
4983        // Within the window: suppressed.
4984        let within = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS - 1);
4985        assert!(!cooldown_allows_audit(&mut map, &peer, within));
4986        // Past the window: allowed again.
4987        let after = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS + 1);
4988        assert!(cooldown_allows_audit(&mut map, &peer, after));
4989    }
4990
4991    #[test]
4992    fn cooldown_is_per_peer_independent() {
4993        let mut map: HashMap<PeerId, Instant> = HashMap::new();
4994        let t0 = Instant::now();
4995        // Different peers each get their own first-audit pass at the same instant.
4996        for i in 0..20u8 {
4997            assert!(
4998                cooldown_allows_audit(&mut map, &strike_peer(i), t0),
4999                "peer {i} should be auditable independently"
5000            );
5001        }
5002    }
5003
5004    #[test]
5005    fn inflated_adaptive_threshold_is_still_reachable_and_bounded() {
5006        // When the median lifts the threshold above the base, a genuinely
5007        // non-responsive peer's strike count must still be able to
5008        // reach it (the count is no longer capped at the base). And the grace
5009        // widening itself is bounded so it can't shield a bad node forever.
5010        let target = strike_peer(200);
5011        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
5012        // A cohort of other peers each at a high strike count.
5013        for i in 0..9u8 {
5014            strikes.insert(strike_peer(i), 10);
5015        }
5016        let threshold = adaptive_timeout_threshold(&strikes, &target);
5017        // Grace is capped, so the threshold cannot exceed base + max grace.
5018        assert!(
5019            threshold <= config::AUDIT_TIMEOUT_STRIKE_THRESHOLD + super::MAX_ADAPTIVE_TIMEOUT_GRACE
5020        );
5021        assert!(threshold > config::AUDIT_TIMEOUT_STRIKE_THRESHOLD);
5022        // The target peer can accumulate strikes past that inflated threshold.
5023        let mut c = 0;
5024        for _ in 0..threshold + 5 {
5025            c = record_audit_timeout_strike(&mut strikes, &target);
5026        }
5027        assert!(
5028            timeout_strike_reaches_threshold(c, threshold),
5029            "a persistent peer must be able to cross the inflated threshold ({c} vs {threshold})"
5030        );
5031    }
5032
5033    #[test]
5034    fn audit_on_gossip_constants_match_adr() {
5035        // Tripwire on the ADR-locked tunables. The spot-check count sits at the
5036        // top of the auditor's 3..=5 band (the auditor clamps to that band, so
5037        // values above 5 would silently never be requested).
5038        assert_eq!(config::AUDIT_SPOTCHECK_COUNT, 5);
5039        assert!((config::AUDIT_ON_GOSSIP_PROBABILITY - 0.2).abs() < f64::EPSILON);
5040        assert_eq!(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS, 30 * 60);
5041    }
5042
5043    // (d) A confirmed storage-integrity failure penalizes immediately and
5044    // revokes credit; it is not a timeout.
5045    #[test]
5046    fn digest_mismatch_is_not_a_timeout_and_penalizes_immediately() {
5047        assert!(audit_failure_clears_bootstrap_claim(
5048            &AuditFailureReason::DigestMismatch
5049        ));
5050        assert!(audit_failure_revokes_holder_credit(
5051            &AuditFailureReason::DigestMismatch
5052        ));
5053    }
5054
5055    // E2E (pure decision): an honest peer that times out once, recovers,
5056    // repeatedly, never reaches a penalty because each success resets strikes.
5057    // FLIPS IF: the strike threshold is removed or success stops resetting.
5058    #[test]
5059    fn e2e_honest_intermittent_timeouts_never_penalized() {
5060        let peer = strike_peer(10);
5061        let base = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD;
5062        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
5063        for _ in 0..10 {
5064            let after = record_audit_timeout_strike(&mut strikes, &peer);
5065            assert_eq!(
5066                decide_audit_failure_action(&AuditFailureReason::Timeout, after, base),
5067                AuditFailureAction::TimeoutGrace
5068            );
5069            strikes.remove(&peer);
5070        }
5071        assert!(!strikes.contains_key(&peer));
5072    }
5073
5074    // E2E: a peer that times out on EVERY audit (never reset) crosses the
5075    // threshold and is penalized — the deterrent against non-storing peers.
5076    // FLIPS IF: per-challenge window widened so it answers in time, or strikes
5077    // reset without a success.
5078    #[test]
5079    fn e2e_persistent_timeouts_get_penalized() {
5080        let peer = strike_peer(11);
5081        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
5082        let threshold = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD;
5083        let mut penalized_at = None;
5084        for tick in 1..=(threshold + 2) {
5085            let after = record_audit_timeout_strike(&mut strikes, &peer);
5086            if decide_audit_failure_action(&AuditFailureReason::Timeout, after, threshold)
5087                == AuditFailureAction::TimeoutPenalize
5088                && penalized_at.is_none()
5089            {
5090                penalized_at = Some(tick);
5091            }
5092        }
5093        assert_eq!(penalized_at, Some(threshold));
5094    }
5095
5096    // Glue: a Timeout through the real plan_failed_audit MUST record a strike on
5097    // the map AND penalize once enough accumulate.
5098    // FLIPS IF: the handler stops feeding Timeout through the strike counter
5099    // (e.g. strikes_after hard-coded to 0). (Mutation-verified.)
5100    #[test]
5101    fn e2e_glue_timeout_records_strike_and_penalizes_at_threshold() {
5102        let peer = strike_peer(20);
5103        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
5104        let threshold = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD;
5105        let mut action = AuditFailureAction::TimeoutGrace;
5106        for tick in 1..=threshold {
5107            action = plan_failed_audit(&AuditFailureReason::Timeout, &mut strikes, &peer);
5108            assert_eq!(strikes.get(&peer).copied(), Some(tick));
5109        }
5110        assert_eq!(action, AuditFailureAction::TimeoutPenalize);
5111    }
5112
5113    // Glue: a confirmed failure through plan_failed_audit must NOT touch the
5114    // strike map and must return ConfirmedPenalize.
5115    #[test]
5116    fn e2e_glue_confirmed_failure_leaves_strike_map_untouched() {
5117        let peer = strike_peer(21);
5118        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
5119        for reason in [
5120            AuditFailureReason::DigestMismatch,
5121            AuditFailureReason::KeyAbsent,
5122            AuditFailureReason::Rejected,
5123            AuditFailureReason::MalformedResponse,
5124        ] {
5125            assert_eq!(
5126                plan_failed_audit(&reason, &mut strikes, &peer),
5127                AuditFailureAction::ConfirmedPenalize
5128            );
5129        }
5130        assert!(strikes.is_empty());
5131    }
5132
5133    // ADR-0002 "Accounting and False Positives", adversarial: a DETERMINISTIC
5134    // failure is acted on the FIRST time it occurs, "regardless of network
5135    // conditions". Here the strike map is pre-loaded with many *other* peers
5136    // timing out, which inflates the adaptive timeout grace to its cap — the
5137    // most forgiving the network ever gets. Under that maximally-relaxed
5138    // window:
5139    //   - a brand-new peer's FIRST deterministic failure (DigestMismatch /
5140    //     Rejected / MalformedResponse) STILL returns ConfirmedPenalize, never
5141    //     a grace lane, and never touches the strike map; while
5142    //   - that same peer's FIRST timeout is only TimeoutGrace.
5143    // This proves the inflated grace is the timeout-only lane and can NEVER be
5144    // weaponized to buy a deterministic failure even one round of delay.
5145    // FLIPS IF: deterministic failures start consulting the strike threshold,
5146    // or ConfirmedPenalize is collapsed into a timeout action.
5147    #[test]
5148    fn deterministic_failure_penalizes_first_time_under_inflated_grace() {
5149        let mut strikes: HashMap<PeerId, u32> = HashMap::new();
5150        // Saturate the adaptive grace: many other peers each carrying a high
5151        // consecutive-timeout count, so the median (and thus the grace) is
5152        // pushed to its MAX cap for any newly-judged peer.
5153        for b in 100..150u8 {
5154            let other = strike_peer(b);
5155            for _ in 0..AUDIT_TIMEOUT_STRIKE_MAX {
5156                record_audit_timeout_strike(&mut strikes, &other);
5157            }
5158        }
5159        let victim = strike_peer(7);
5160        // Sanity: the grace seen by the victim is genuinely inflated above base.
5161        let inflated = adaptive_timeout_threshold(&strikes, &victim);
5162        assert!(
5163            inflated > config::AUDIT_TIMEOUT_STRIKE_THRESHOLD,
5164            "test precondition: grace must be inflated, got {inflated}"
5165        );
5166
5167        // First deterministic failure of each kind -> ConfirmedPenalize on
5168        // occurrence #1, and the victim is never inserted into the strike map.
5169        for reason in [
5170            AuditFailureReason::DigestMismatch,
5171            AuditFailureReason::Rejected,
5172            AuditFailureReason::MalformedResponse,
5173        ] {
5174            let action = plan_failed_audit(&reason, &mut strikes, &victim);
5175            assert_eq!(
5176                action,
5177                AuditFailureAction::ConfirmedPenalize,
5178                "{reason:?} must penalize on the first occurrence regardless of grace"
5179            );
5180            assert_ne!(
5181                action,
5182                AuditFailureAction::TimeoutPenalize,
5183                "a deterministic failure must NOT be routed through the (eviction-gated) \
5184                 timeout-penalize lane"
5185            );
5186            assert!(
5187                !strikes.contains_key(&victim),
5188                "deterministic failure must not touch the timeout strike map"
5189            );
5190            // And it always revokes holder credit / clears the claim.
5191            assert!(audit_failure_revokes_holder_credit(&reason));
5192            assert!(audit_failure_clears_bootstrap_claim(&reason));
5193        }
5194
5195        // The SAME victim's first timeout, under the same inflated grace, is
5196        // only TimeoutGrace (no penalty, no revocation, claim retained).
5197        let timeout_action = plan_failed_audit(&AuditFailureReason::Timeout, &mut strikes, &victim);
5198        assert_eq!(timeout_action, AuditFailureAction::TimeoutGrace);
5199        assert_eq!(strikes.get(&victim).copied(), Some(1));
5200        assert!(!audit_failure_revokes_holder_credit(
5201            &AuditFailureReason::Timeout
5202        ));
5203        assert!(!audit_failure_clears_bootstrap_claim(
5204            &AuditFailureReason::Timeout
5205        ));
5206    }
5207
5208    /// The exact decision the `Failed` arm of `handle_subtree_audit_result`
5209    /// uses: confirmed failures revoke credit, `Timeout` does not.
5210    #[test]
5211    fn confirmed_failures_revoke_credit_timeout_does_not() {
5212        for reason in [
5213            AuditFailureReason::MalformedResponse,
5214            AuditFailureReason::DigestMismatch,
5215            AuditFailureReason::KeyAbsent,
5216            AuditFailureReason::Rejected,
5217        ] {
5218            assert!(
5219                audit_failure_revokes_holder_credit(&reason),
5220                "confirmed failure {reason:?} must revoke holder credit"
5221            );
5222        }
5223        assert!(
5224            !audit_failure_revokes_holder_credit(&AuditFailureReason::Timeout),
5225            "Timeout must NOT revoke credit (single dropped packet != storage loss)"
5226        );
5227    }
5228
5229    /// Wiring test for the security fix: the helper the handler calls
5230    /// actually strips a credited peer on a confirmed failure
5231    /// (`DigestMismatch`), and actually RETAINS credit on `Timeout`.
5232    /// Records genuine credit first so neither assertion is vacuous;
5233    /// this fails if `forget_peer` stops being called, or if the
5234    /// `Timeout` exclusion is dropped (both verified by mutation).
5235    #[test]
5236    fn apply_revocation_strips_on_digest_mismatch_retains_on_timeout() {
5237        let peer = test_peer(0xAB);
5238        let key = test_key(1);
5239        let hash = [0xCD; 32];
5240
5241        // Confirmed failure -> credit revoked.
5242        let mut provers = RecentProvers::new();
5243        provers.record_proof(key, peer, hash, Instant::now());
5244        assert!(
5245            provers.is_credited_holder(&key, &peer, &hash),
5246            "precondition: peer credited before failure"
5247        );
5248        apply_audit_failure_credit_revocation(
5249            &mut provers,
5250            &peer,
5251            &AuditFailureReason::DigestMismatch,
5252        );
5253        assert!(
5254            !provers.is_credited_holder(&key, &peer, &hash),
5255            "DigestMismatch must strip the peer's holder credit"
5256        );
5257
5258        // Timeout -> credit retained.
5259        let mut provers_timeout = RecentProvers::new();
5260        provers_timeout.record_proof(key, peer, hash, Instant::now());
5261        apply_audit_failure_credit_revocation(
5262            &mut provers_timeout,
5263            &peer,
5264            &AuditFailureReason::Timeout,
5265        );
5266        assert!(
5267            provers_timeout.is_credited_holder(&key, &peer, &hash),
5268            "Timeout must retain holder credit (deliberate liveness cushion)"
5269        );
5270    }
5271
5272    #[test]
5273    fn decoded_audit_failures_clear_active_bootstrap_claim() {
5274        for reason in [
5275            AuditFailureReason::MalformedResponse,
5276            AuditFailureReason::DigestMismatch,
5277            AuditFailureReason::KeyAbsent,
5278            AuditFailureReason::Rejected,
5279        ] {
5280            assert!(
5281                audit_failure_clears_bootstrap_claim(&reason),
5282                "decoded non-bootstrap failure {reason:?} should clear active claim"
5283            );
5284        }
5285    }
5286
5287    #[test]
5288    fn first_failed_key_label_truncates_to_16_hex_chars() {
5289        // The high-order 8 bytes of the XorName determine the label so an
5290        // operator can group audit-failures on the same chunk prefix.
5291        let mut key = [0u8; 32];
5292        key[0] = 0x18;
5293        key[7] = 0xff;
5294        // Low-order bytes (positions 8..32) are deliberately set to 0xAA
5295        // to verify they are NOT included in the label.
5296        for byte in &mut key[8..] {
5297            *byte = 0xAA;
5298        }
5299        let label = first_failed_key_label(&[key]);
5300        // Only the first 8 bytes are encoded, low-order bytes are dropped.
5301        assert_eq!(label, "0x18000000000000ff");
5302        assert_eq!(label.len(), "0x".len() + 16);
5303    }
5304
5305    #[test]
5306    fn first_failed_key_label_falls_back_when_empty() {
5307        // Should never happen in production (handle_audit_failure rejects
5308        // empty sets), but the formatter must still produce a valid label
5309        // so the log line doesn't contain a misleading default.
5310        assert_eq!(first_failed_key_label(&[]), "0x");
5311    }
5312
5313    #[test]
5314    fn first_failed_key_label_uses_first_key_only() {
5315        let first = [0x11u8; 32];
5316        let second = [0x22u8; 32];
5317        assert_eq!(
5318            first_failed_key_label(&[first, second]),
5319            format!("0x{}", hex::encode(&first[..8]))
5320        );
5321    }
5322}