ant_node/replication/mod.rs
1//! Replication subsystem for the Autonomi network.
2//!
3//! Implements Kademlia-style replication with:
4//! - Fresh replication with `PoP` verification
5//! - Neighbor sync with round-robin cycle management
6//! - Batched quorum verification
7//! - Storage audit protocol (anti-outsourcing)
8//! - `PaidForList` persistence and convergence
9//! - Responsibility pruning with hysteresis
10
11// The replication engine intentionally holds `RwLock` read guards across await
12// boundaries (e.g. reading sync_history while calling audit_tick). Clippy's
13// nursery lint `significant_drop_tightening` flags these, but the guards must
14// remain live for the duration of the call.
15#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod commitment;
21pub mod commitment_state;
22pub mod config;
23pub mod fresh;
24pub mod neighbor_sync;
25pub mod paid_list;
26pub mod protocol;
27pub mod pruning;
28pub mod quorum;
29pub mod recent_provers;
30pub mod scheduling;
31pub mod storage_commitment_audit;
32pub mod subtree;
33pub mod types;
34
35use std::collections::{HashMap, HashSet};
36use std::path::Path;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39
40use std::pin::Pin;
41
42use crate::logging::{debug, error, info, warn};
43use futures::stream::FuturesUnordered;
44use futures::{Future, StreamExt};
45use rand::Rng;
46use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
47use tokio::task::JoinHandle;
48use tokio_util::sync::CancellationToken;
49
50use crate::ant_protocol::XorName;
51use crate::error::{Error, Result};
52use crate::payment::{PaymentVerifier, VerificationContext};
53use crate::replication::audit::AuditTickResult;
54use crate::replication::commitment::{commitment_hash, StorageCommitment};
55use crate::replication::commitment_state::{PeerCommitmentRecord, ResponderCommitmentState};
56use crate::replication::config::{
57 max_parallel_fetch, storage_admission_width, ReplicationConfig, MAX_AUDIT_RESPONSES_PER_PEER,
58 MAX_CONCURRENT_AUDIT_RESPONSES, MAX_CONCURRENT_REPLICATION_SENDS, REPLICATION_PROTOCOL_ID,
59};
60use crate::replication::paid_list::PaidList;
61use crate::replication::protocol::{
62 FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
63 VerificationResponse,
64};
65use crate::replication::quorum::KeyVerificationOutcome;
66use crate::replication::recent_provers::RecentProvers;
67use crate::replication::scheduling::ReplicationQueues;
68use crate::replication::types::{
69 AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
70 NeighborSyncState, PeerSyncRecord, RepairProofs, VerificationEntry, VerificationState,
71};
72use crate::storage::LmdbStorage;
73use saorsa_core::identity::{NodeIdentity, PeerId};
74use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
75
76// ---------------------------------------------------------------------------
77// Constants
78// ---------------------------------------------------------------------------
79
/// Topic prefix that saorsa-core's request-response mechanism places in
/// front of a protocol id (the message handler strips it to recognise
/// replication traffic arriving over the `/rr/` path).
const RR_PREFIX: &str = "/rr/";
82
83fn fresh_offer_payment_context() -> VerificationContext {
84 VerificationContext::ClientPut
85}
86
87fn paid_notify_payment_context() -> VerificationContext {
88 VerificationContext::PaidListAdmission
89}
90
91/// Boxed future type for in-flight fetch tasks.
92type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
93
94/// Shared dependencies for one verification worker cycle.
95struct VerificationCycleContext<'a> {
96 p2p_node: &'a Arc<P2PNode>,
97 paid_list: &'a Arc<PaidList>,
98 storage: &'a Arc<LmdbStorage>,
99 queues: &'a Arc<RwLock<ReplicationQueues>>,
100 config: &'a ReplicationConfig,
101 bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
102 is_bootstrapping: &'a Arc<RwLock<bool>>,
103 bootstrap_complete_notify: &'a Arc<Notify>,
104 /// v12 §6 holder-eligibility inputs. The verifier downgrades a
105 /// peer's Present claim to Unresolved unless they're a credited
106 /// holder of the key (i.e. they recently passed a commitment-bound
107 /// audit on it under their currently-credited commitment hash).
108 last_commitment_by_peer: &'a Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
109 ever_capable_peers: &'a Arc<RwLock<HashSet<PeerId>>>,
110 recent_provers: &'a Arc<RwLock<RecentProvers>>,
111}
112
/// Milliseconds between fetch worker polls.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Milliseconds between verification worker polls.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// A verification cycle lasting at least this many milliseconds is worth
/// surfacing at info level.
const VERIFICATION_CYCLE_SLOW_LOG_MS: u128 = 500;

/// Default trust-event weight for per-operation success/failure signals.
///
/// Applies to individual replication fetch outcomes, integrity check
/// failures, and bootstrap claim abuse. It is deliberately separate from
/// `AUDIT_FAILURE_TRUST_WEIGHT`, which is reserved for confirmed audit
/// failures.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;

/// Seconds between bootstrap drain checks.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;
131
/// Cadence, in seconds, at which the responder rebuilds and rotates its
/// storage commitment.
///
/// A rebuild scans LMDB to compute leaf hashes; for ~10k keys that costs
/// under 100 ms (BLAKE3 + tree build). Retention is gossip-anchored, NOT
/// rotation-anchored: the responder stays answerable for the current
/// commitment plus the last `RETAINED_GOSSIPED_COMMITMENTS` (= 2) it
/// actually gossiped, each kept for `GOSSIP_ANSWERABILITY_TTL` (3 h) after
/// its last emission (see `commitment_state`). The rotation cadence therefore
/// does not, on its own, bound answerability — a gossiped commitment remains
/// answerable across rotations until its gossip TTL lapses.
///
/// Default: 1 hour, matching the worst-case neighbor-sync cooldown
/// (`NEIGHBOR_SYNC_COOLDOWN_SECS = 3600`). Since the gossip TTL (3 h)
/// comfortably exceeds the gap between our rotation and the next gossip
/// arrival at a remote peer, the "unknown commitment hash" -> Idle
/// audit-skip pattern does not become the common case.
///
/// Why not faster: the v12 pin is bound to a specific point-in-time
/// commitment, so rotation is not security-critical for pin freshness — it
/// only keeps the committed key set current as the responder writes new
/// keys. One hour is plenty for that, and slow enough that honest auditors
/// mostly hit `current` or `previous` rather than the "rotated past" case.
const COMMITMENT_ROTATION_INTERVAL_SECS: u64 = 3600;

/// Minimum spacing between commitment signature verifications for any one
/// peer (v10/v12 §2 step 3 + §11 `DoS`).
///
/// Without it, a sybil that slips past the routing-table gate (e.g. through
/// transient bucket pollution) could force one ML-DSA-65 verify (~1 ms) per
/// gossip message. The limit caps verification at one per peer per minute,
/// which still leaves ample headroom over the legitimate gossip cadence
/// (the 10-20 min neighbor-sync round on each peer).
const COMMITMENT_SIG_VERIFY_MIN_INTERVAL: Duration = Duration::from_secs(60);

/// Hard upper bound on the number of entries in `last_commitment_by_peer`.
///
/// Limits the per-process memory used by the auditor's per-peer commitment
/// cache. One entry holds a `StorageCommitment` (~5 KiB: 1952-byte pubkey +
/// 3293-byte signature + small fields), so 4096 entries is ~20 MiB — enough
/// to comfortably cover a realistic close-group neighborhood. When the cap
/// is hit, an insert evicts one arbitrary existing entry (`HashMap`
/// iteration order is unspecified; insertion order is not tracked). The
/// `PeerRemoved` handler proactively drops entries as the DHT detects
/// departures, and `ingest_peer_commitment` only admits commitments from
/// peers currently in the routing table — which makes this cap the third
/// line of defence against sybil/churn flooding.
const MAX_LAST_COMMITMENT_BY_PEER: usize = 4096;

/// Upper bound on the sticky `ever_capable_peers` set, so that a
/// long-running bootstrap node cannot see the set grow without limit from
/// peer-id churn.
///
/// Sized at 4x `MAX_LAST_COMMITMENT_BY_PEER`: large enough to comfortably
/// outlive normal eviction churn in that cache, yet still capping the blast
/// radius of identity-rotation attacks. Once full, new inserts are refused
/// (no eviction), which keeps the historic set stable. New v12 peers above
/// the cap are treated as legacy on rejoin — the same behaviour as before
/// this set existed, so not a security regression.
const MAX_EVER_CAPABLE_PEERS: usize = 4 * MAX_LAST_COMMITMENT_BY_PEER;
191
192// ---------------------------------------------------------------------------
193// ReplicationEngine
194// ---------------------------------------------------------------------------
195
196/// The replication engine manages all replication background tasks and state.
197pub struct ReplicationEngine {
198 /// Replication configuration (shared across spawned tasks).
199 config: Arc<ReplicationConfig>,
200 /// P2P networking node.
201 p2p_node: Arc<P2PNode>,
202 /// Local chunk storage.
203 storage: Arc<LmdbStorage>,
204 /// Persistent paid-for-list.
205 paid_list: Arc<PaidList>,
206 /// Payment verifier for `PoP` validation.
207 payment_verifier: Arc<PaymentVerifier>,
208 /// Replication pipeline queues.
209 queues: Arc<RwLock<ReplicationQueues>>,
210 /// Neighbor sync cycle state.
211 sync_state: Arc<RwLock<NeighborSyncState>>,
212 /// Per-peer sync history (for `RepairOpportunity`).
213 ///
214 /// This map grows with peer churn and is intentionally unbounded: entries
215 /// are lightweight (`PeerSyncRecord` is two fields) and peer IDs are
216 /// naturally bounded by the routing table's k-bucket capacity.
217 sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
218 /// Per-peer consecutive audit-timeout strike counter.
219 ///
220 /// A timeout increments the peer's strike count; a successful audit
221 /// response resets it to zero. Only when a peer reaches
222 /// [`config::AUDIT_TIMEOUT_STRIKE_THRESHOLD`] consecutive timeouts is a
223 /// timeout reported as an `ApplicationFailure` trust event. This separates
224 /// honest transient slowness (resets on the next normal response) from a
225 /// peer that does not store the data and is slow on every audit. Lives
226 /// outside `NeighborSyncState` so it is never wiped by a neighbor-sync
227 /// cycle reset. Grows with peer churn like `sync_history`; entries are a
228 /// single `u32` and peer IDs are bounded by k-bucket capacity.
229 audit_timeout_strikes: Arc<RwLock<HashMap<PeerId, u32>>>,
230 /// Per-peer cooldown for gossip-triggered subtree audits (ADR-0002).
231 ///
232 /// Records when each peer was last audited so a burst of gossiped
233 /// commitment changes cannot spawn back-to-back audits of the same peer.
234 /// Bounded by routing-table membership and cleaned on `PeerRemoved`.
235 audit_on_gossip_cooldown: Arc<RwLock<HashMap<PeerId, Instant>>>,
236 /// Completed local neighbor-sync cycle epoch for proof maturity.
237 sync_cycle_epoch: Arc<RwLock<u64>>,
238 /// Per-key repair proof tracking for audit eligibility.
239 repair_proofs: Arc<RwLock<RepairProofs>>,
240 /// Bootstrap state tracking.
241 bootstrap_state: Arc<RwLock<BootstrapState>>,
242 /// Whether this node is currently bootstrapping.
243 is_bootstrapping: Arc<RwLock<bool>>,
244 /// Trigger for early neighbor sync (signalled on topology changes).
245 sync_trigger: Arc<Notify>,
246 /// Notified when `is_bootstrapping` transitions from `true` to `false`.
247 bootstrap_complete_notify: Arc<Notify>,
248 /// Node identity (for signing storage commitments).
249 ///
250 /// Phase 3 of the v12 storage-bound audit design. The responder
251 /// uses this to sign its periodically-built `StorageCommitment`.
252 identity: Arc<NodeIdentity>,
253 /// Responder-side commitment state (two-slot atomic rotation).
254 ///
255 /// Periodically rebuilt from the live LMDB key set; gossiped on
256 /// outbound `NeighborSyncRequest`/`Response`; consulted by the
257 /// commitment-bound audit handler.
258 commitment_state: Arc<ResponderCommitmentState>,
259 /// Auditor-side per-peer commitment record (last known commitment +
260 /// sticky `commitment_capable` flag).
261 ///
262 /// Populated whenever an inbound gossip carries a verified
263 /// commitment from the sender. Used by `audit_tick` to snapshot
264 /// `expected_commitment_hash` into outbound challenges, and by
265 /// holder-eligibility (§6) to decide whether a peer's `recent_provers`
266 /// proof should be honoured. The sticky `commitment_capable` flag
267 /// flips true on first successful ingest and never reverts (§2
268 /// step 5).
269 last_commitment_by_peer: Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
270 /// Sticky set of peer IDs we have EVER seen carrying a v12
271 /// commitment, independent of whether their commitment bytes are
272 /// still in `last_commitment_by_peer`. The §6 holder-eligibility
273 /// closure consults this set to keep treating churned-out
274 /// previously-v12 peers as v12-capable (rather than degrading them
275 /// to "legacy" credit-unconditionally) when they re-appear on the
276 /// network before their next gossip arrives. Bounded growth: even
277 /// at one million peers seen over the node's lifetime, the set is
278 /// 32 MB.
279 ever_capable_peers: Arc<RwLock<HashSet<PeerId>>>,
280 /// Auditor-side holder-eligibility cache (v12 §6).
281 ///
282 /// Recorded on successful commitment-bound audit; read by future
283 /// quorum / paid-list eligibility checks (phase-3 stretch).
284 recent_provers: Arc<RwLock<RecentProvers>>,
285 /// Per-peer last sig-verify attempt timestamp for the §2 step 3 /
286 /// §11 `DoS` rate limit. Bumped on EVERY verify attempt (success or
287 /// failure) so a peer we've never successfully verified can't burn
288 /// CPU on a flood of structurally-plausible-but-invalid gossips.
289 /// Lives separately from `last_commitment_by_peer` because that
290 /// map's records only exist after a successful verify.
291 sig_verify_attempts: Arc<RwLock<HashMap<PeerId, Instant>>>,
292 /// Limits concurrent outbound replication sends to prevent bandwidth
293 /// saturation on home broadband connections.
294 send_semaphore: Arc<Semaphore>,
295 /// Bounds concurrent IN-FLIGHT audit-responder tasks (subtree round 1 +
296 /// byte round 2). Those are spawned off the serial message loop so disk
297 /// reads don't block replication; the semaphore restores a global
298 /// backpressure ceiling so the node can't fan out unbounded `get_raw` reads
299 /// / multi-MiB byte serves.
300 audit_responder_semaphore: Arc<Semaphore>,
301 /// Per-source in-flight audit-responder counts, capped at
302 /// [`MAX_AUDIT_RESPONSES_PER_PEER`]. The GLOBAL semaphore alone is not
303 /// flood-fair: one peer spamming challenges could occupy every slot and
304 /// starve honest auditors, whose dropped challenges then convert to
305 /// timeouts and record strikes on the HONEST peers (codex-r2 A). This
306 /// per-peer cap guarantees no single source can hold more than its share,
307 /// so a flood self-throttles without denying service to everyone else.
308 audit_responder_inflight: Arc<RwLock<HashMap<PeerId, u32>>>,
309 /// Receiver for fresh-write events from the chunk PUT handler.
310 ///
311 /// When present, `start()` spawns a drainer task that calls
312 /// `replicate_fresh` for each event.
313 fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
314 /// Shutdown token.
315 shutdown: CancellationToken,
316 /// Background task handles.
317 task_handles: Vec<JoinHandle<()>>,
318}
319
320impl ReplicationEngine {
321 /// Create a new replication engine.
322 ///
323 /// # Errors
324 ///
325 /// Returns an error if the `PaidList` LMDB environment cannot be opened
326 /// or if the configuration fails validation.
327 #[allow(clippy::too_many_arguments)]
328 pub async fn new(
329 config: ReplicationConfig,
330 p2p_node: Arc<P2PNode>,
331 storage: Arc<LmdbStorage>,
332 payment_verifier: Arc<PaymentVerifier>,
333 identity: Arc<NodeIdentity>,
334 root_dir: &Path,
335 fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
336 shutdown: CancellationToken,
337 ) -> Result<Self> {
338 config.validate().map_err(Error::Config)?;
339
340 let paid_list = Arc::new(
341 PaidList::new(root_dir)
342 .await
343 .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
344 );
345
346 let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
347 let config = Arc::new(config);
348
349 Ok(Self {
350 config: Arc::clone(&config),
351 p2p_node,
352 storage,
353 paid_list,
354 payment_verifier,
355 queues: Arc::new(RwLock::new(ReplicationQueues::new())),
356 sync_state: Arc::new(RwLock::new(initial_neighbors)),
357 sync_history: Arc::new(RwLock::new(HashMap::new())),
358 audit_timeout_strikes: Arc::new(RwLock::new(HashMap::new())),
359 audit_on_gossip_cooldown: Arc::new(RwLock::new(HashMap::new())),
360 sync_cycle_epoch: Arc::new(RwLock::new(0)),
361 repair_proofs: Arc::new(RwLock::new(RepairProofs::new())),
362 bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
363 is_bootstrapping: Arc::new(RwLock::new(true)),
364 sync_trigger: Arc::new(Notify::new()),
365 bootstrap_complete_notify: Arc::new(Notify::new()),
366 identity,
367 commitment_state: Arc::new(ResponderCommitmentState::new()),
368 last_commitment_by_peer: Arc::new(RwLock::new(HashMap::new())),
369 ever_capable_peers: Arc::new(RwLock::new(HashSet::new())),
370 recent_provers: Arc::new(RwLock::new(RecentProvers::new())),
371 sig_verify_attempts: Arc::new(RwLock::new(HashMap::new())),
372 send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
373 audit_responder_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_AUDIT_RESPONSES)),
374 audit_responder_inflight: Arc::new(RwLock::new(HashMap::new())),
375 fresh_write_rx: Some(fresh_write_rx),
376 shutdown,
377 task_handles: Vec::new(),
378 })
379 }
380
381 /// Get a reference to the `PaidList`.
382 #[must_use]
383 pub fn paid_list(&self) -> &Arc<PaidList> {
384 &self.paid_list
385 }
386
387 /// Get a reference to the responder's commitment state. Used by audit
388 /// handlers to look up commitments by hash; used by the rotation tick
389 /// to install fresh ones.
390 #[must_use]
391 pub fn commitment_state(&self) -> &Arc<ResponderCommitmentState> {
392 &self.commitment_state
393 }
394
395 /// Get a reference to the auditor's last-commitment-by-peer table.
396 #[must_use]
397 pub fn last_commitment_by_peer(&self) -> &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>> {
398 &self.last_commitment_by_peer
399 }
400
401 /// Get a reference to the holder-eligibility cache. Phase-3 stretch:
402 /// will be read by quorum / paid-list eligibility checks.
403 #[must_use]
404 pub fn recent_provers(&self) -> &Arc<RwLock<RecentProvers>> {
405 &self.recent_provers
406 }
407
408 /// Test-only: rebuild + rotate this node's storage commitment now over its
409 /// current key set (normally on a 1h timer). Lets a test commit to chunks it
410 /// just stored without waiting for the rotation cadence.
411 ///
412 /// # Errors
413 ///
414 /// Propagates any error from reading the local key set or building/signing
415 /// the commitment.
416 #[cfg(any(test, feature = "test-utils"))]
417 pub async fn rebuild_commitment_now(&self) -> Result<()> {
418 rebuild_and_rotate_commitment(
419 &self.storage,
420 &self.identity,
421 &self.commitment_state,
422 &self.p2p_node,
423 &self.config,
424 )
425 .await
426 }
427
428 /// Test-only: directly seed this node's cached commitment for `peer`,
429 /// simulating "we received `peer`'s gossiped commitment" without depending
430 /// on neighbor-sync propagation timing. Lets a two-node audit test pin the
431 /// peer's commitment deterministically.
432 #[cfg(any(feature = "test-utils", test))]
433 pub async fn inject_peer_commitment_for_test(
434 &self,
435 peer: &PeerId,
436 commitment: StorageCommitment,
437 ) {
438 let now = Instant::now();
439 self.last_commitment_by_peer
440 .write()
441 .await
442 .insert(*peer, PeerCommitmentRecord::from_verified(commitment, now));
443 self.ever_capable_peers.write().await.insert(*peer);
444 }
445
446 /// Test-only: run ONE subtree audit against `peer` right now, pinned to the
447 /// commitment this node has cached for it (from gossip), over the live wire.
448 /// Returns the audit outcome so tests can assert honest-pass / adversary-fail
449 /// in a real two-node setting without waiting for the gossip cadence.
450 ///
451 /// Returns `AuditTickResult::Idle` if we have no cached commitment for the
452 /// peer yet (gossip hasn't reached us). Gated to test builds.
453 #[cfg(any(test, feature = "test-utils"))]
454 pub async fn audit_peer_now(&self, peer: &PeerId) -> audit::AuditTickResult {
455 let target = {
456 let map = self.last_commitment_by_peer.read().await;
457 map.get(peer)
458 .and_then(PeerCommitmentRecord::last_commitment)
459 .and_then(|c| commitment_hash(c).map(|h| (h, c.key_count)))
460 };
461 let Some((pin, key_count)) = target else {
462 return audit::AuditTickResult::Idle;
463 };
464 let credit = storage_commitment_audit::AuditCredit {
465 recent_provers: &self.recent_provers,
466 };
467 storage_commitment_audit::run_subtree_audit(
468 &self.p2p_node,
469 &self.config,
470 peer,
471 pin,
472 key_count,
473 Some(&credit),
474 )
475 .await
476 }
477
478 /// Start all background tasks.
479 ///
480 /// `dht_events` must be subscribed **before** `P2PNode::start()` so that
481 /// the `BootstrapComplete` event emitted during DHT bootstrap is not
482 /// missed by the bootstrap-sync gate.
483 pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
484 if !self.task_handles.is_empty() {
485 error!("ReplicationEngine::start() called while already running — ignoring");
486 return;
487 }
488 info!("Starting replication engine");
489
490 self.start_message_handler();
491 self.start_neighbor_sync_loop();
492 self.start_self_lookup_loop();
493 // Audit #2 (responsible-chunk): periodic tick auditing peers for the
494 // chunks they SHOULD store (responsibility + prior hint).
495 self.start_audit_loop();
496 // Audit #1 (storage-commitment) is gossip-triggered in the message
497 // handler when a peer's commitment is ingested, not on a periodic tick.
498 self.start_commitment_rotation_loop();
499 self.start_fetch_worker();
500 self.start_verification_worker();
501 self.start_bootstrap_sync(dht_events);
502 self.start_fresh_write_drainer();
503
504 info!(
505 "Replication engine started with {} background tasks",
506 self.task_handles.len()
507 );
508 }
509
510 /// Returns `true` if the node is still in the replication bootstrap phase.
511 ///
512 /// During bootstrap, audit challenges return `Bootstrapping` instead of
513 /// digests, and neighbor sync responses carry `bootstrapping: true`.
514 pub async fn is_bootstrapping(&self) -> bool {
515 *self.is_bootstrapping.read().await
516 }
517
518 /// Wait until the replication bootstrap phase completes.
519 ///
520 /// Returns immediately if bootstrap has already completed. Useful for
521 /// readiness probes, health checks, and test harnesses that need the
522 /// node to be fully operational before proceeding.
523 ///
524 /// Returns `true` if bootstrap completed within the timeout, `false`
525 /// if the timeout elapsed first.
526 pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
527 // Register the notification future *before* checking the flag so that
528 // a transition between the read and the await is not missed.
529 let notified = self.bootstrap_complete_notify.notified();
530 tokio::pin!(notified);
531 notified.as_mut().enable();
532
533 if !*self.is_bootstrapping.read().await {
534 return true;
535 }
536
537 tokio::time::timeout(timeout, notified).await.is_ok()
538 }
539
540 /// Cancel all background tasks and wait for them to terminate.
541 ///
542 /// This must be awaited before dropping the engine when the caller needs
543 /// the `Arc<LmdbStorage>` references held by background tasks to be
544 /// released (e.g. before reopening the same LMDB environment).
545 pub async fn shutdown(&mut self) {
546 self.shutdown.cancel();
547 for (i, mut handle) in self.task_handles.drain(..).enumerate() {
548 match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
549 Ok(Ok(())) => {}
550 Ok(Err(e)) if e.is_cancelled() => {}
551 Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
552 Err(_) => {
553 warn!("Replication task {i} did not stop within 10s, aborting");
554 handle.abort();
555 }
556 }
557 }
558 }
559
560 /// Trigger an early neighbor sync round.
561 ///
562 /// Useful after topology changes (new nodes joining, network heal after
563 /// partition) when the caller wants replication to converge faster than
564 /// the regular 10-20 minute cadence.
565 pub fn trigger_neighbor_sync(&self) {
566 self.sync_trigger.notify_one();
567 }
568
569 /// Execute fresh replication for a newly stored record.
570 pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
571 fresh::replicate_fresh(
572 key,
573 data,
574 proof_of_payment,
575 &self.p2p_node,
576 &self.paid_list,
577 &self.config,
578 &self.send_semaphore,
579 )
580 .await;
581 }
582
583 // =======================================================================
584 // Background task launchers
585 // =======================================================================
586
587 /// Spawn a task that drains the fresh-write channel and triggers
588 /// replication for each newly-stored chunk.
589 fn start_fresh_write_drainer(&mut self) {
590 let Some(mut rx) = self.fresh_write_rx.take() else {
591 return;
592 };
593 let p2p = Arc::clone(&self.p2p_node);
594 let paid_list = Arc::clone(&self.paid_list);
595 let config = Arc::clone(&self.config);
596 let send_semaphore = Arc::clone(&self.send_semaphore);
597 let shutdown = self.shutdown.clone();
598
599 let handle = tokio::spawn(async move {
600 loop {
601 tokio::select! {
602 () = shutdown.cancelled() => break,
603 event = rx.recv() => {
604 let Some(event) = event else { break };
605 fresh::replicate_fresh(
606 &event.key,
607 &event.data,
608 &event.payment_proof,
609 &p2p,
610 &paid_list,
611 &config,
612 &send_semaphore,
613 )
614 .await;
615 }
616 }
617 }
618 debug!("Fresh-write drainer shut down");
619 });
620 self.task_handles.push(handle);
621 }
622
623 #[allow(clippy::too_many_lines)]
624 fn start_message_handler(&mut self) {
625 let mut p2p_events = self.p2p_node.subscribe_events();
626 let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
627 let p2p = Arc::clone(&self.p2p_node);
628 let storage = Arc::clone(&self.storage);
629 let paid_list = Arc::clone(&self.paid_list);
630 let payment_verifier = Arc::clone(&self.payment_verifier);
631 let queues = Arc::clone(&self.queues);
632 let config = Arc::clone(&self.config);
633 let shutdown = self.shutdown.clone();
634 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
635 let bootstrap_state = Arc::clone(&self.bootstrap_state);
636 let sync_history = Arc::clone(&self.sync_history);
637 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
638 let repair_proofs = Arc::clone(&self.repair_proofs);
639 let sync_trigger = Arc::clone(&self.sync_trigger);
640 let my_commitment_state = Arc::clone(&self.commitment_state);
641 let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
642 let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
643 let recent_provers = Arc::clone(&self.recent_provers);
644 let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
645 let audit_timeout_strikes = Arc::clone(&self.audit_timeout_strikes);
646 let audit_on_gossip_cooldown = Arc::clone(&self.audit_on_gossip_cooldown);
647 let sync_state = Arc::clone(&self.sync_state);
648 let audit_responder_semaphore = Arc::clone(&self.audit_responder_semaphore);
649 let audit_responder_inflight = Arc::clone(&self.audit_responder_inflight);
650
651 // ADR-0002 gossip-audit trigger: bundled state so an ingested *changed*
652 // commitment can spawn a probabilistic, cooldown-gated subtree audit.
653 let gossip_audit = GossipAuditTrigger {
654 p2p_node: Arc::clone(&p2p),
655 config: Arc::clone(&config),
656 recent_provers: Arc::clone(&recent_provers),
657 sync_state: Arc::clone(&sync_state),
658 audit_timeout_strikes: Arc::clone(&audit_timeout_strikes),
659 cooldown: Arc::clone(&audit_on_gossip_cooldown),
660 };
661
662 let handle = tokio::spawn(async move {
663 loop {
664 tokio::select! {
665 () = shutdown.cancelled() => break,
666 event = p2p_events.recv() => {
667 let Ok(event) = event else { continue };
668 if let P2PEvent::Message {
669 topic,
670 source: Some(source),
671 data,
672 ..
673 } = event {
674 // Determine if this is a replication message
675 // and whether it arrived via the /rr/ request-response
676 // path (which wraps payloads in RequestResponseEnvelope).
677 let rr_info = if topic == REPLICATION_PROTOCOL_ID {
678 Some((data.clone(), None))
679 } else if topic.starts_with(RR_PREFIX)
680 && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
681 {
682 P2PNode::parse_request_envelope(&data)
683 .filter(|(_, is_resp, _)| !is_resp)
684 .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
685 } else {
686 None
687 };
688 if let Some((payload, rr_message_id)) = rr_info {
689 match handle_replication_message(
690 &source,
691 &payload,
692 &p2p,
693 &storage,
694 &paid_list,
695 &payment_verifier,
696 &queues,
697 &config,
698 &is_bootstrapping,
699 &bootstrap_state,
700 &sync_history,
701 &sync_cycle_epoch,
702 &repair_proofs,
703 &last_commitment_by_peer,
704 &ever_capable_peers,
705 &sig_verify_attempts,
706 &my_commitment_state,
707 &gossip_audit,
708 &audit_responder_semaphore,
709 &audit_responder_inflight,
710 rr_message_id.as_deref(),
711 ).await {
712 Ok(()) => {}
713 Err(e) => {
714 debug!(
715 "Replication message from {source} error: {e}"
716 );
717 }
718 }
719 }
720 }
721 }
722 // Gap 4: Topology churn handling (Section 13).
723 //
724 // The DHT routing table emits KClosestPeersChanged when the
725 // K-closest peer set actually changes, which is the precise
726 // signal for triggering neighbor sync. This replaces the
727 // previous approach of checking every PeerConnected /
728 // PeerDisconnected event against the close group.
729 dht_event = dht_events.recv() => {
730 let Ok(dht_event) = dht_event else { continue };
731 match dht_event {
732 DhtNetworkEvent::KClosestPeersChanged { old, new } => {
733 let old_peers = old
734 .iter()
735 .take(config.neighbor_sync_scope)
736 .copied()
737 .collect::<HashSet<_>>();
738 let new_scoped = new
739 .iter()
740 .take(config.neighbor_sync_scope)
741 .copied()
742 .collect::<Vec<_>>();
743 let new_peers =
744 new_scoped.iter().copied().collect::<HashSet<_>>();
745 let entrants = new_scoped
746 .iter()
747 .copied()
748 .filter(|peer| !old_peers.contains(peer))
749 .collect::<Vec<_>>();
750 let entrant_count = entrants.len();
751 let (priority_insertions, sync_removals) = {
752 let mut state = sync_state.write().await;
753 let sync_removals = state.retain_sync_peers(&new_peers);
754 let priority_insertions = state.queue_priority_peers(entrants);
755 (priority_insertions, sync_removals)
756 };
757 if priority_insertions > 0 {
758 debug!(
759 "K-closest peers changed, queued {priority_insertions}/{entrant_count} new close peers for priority neighbor sync and pruned {sync_removals} departed pending sync entries"
760 );
761 } else {
762 debug!(
763 "K-closest peers changed, no additional close peers queued, pruned {sync_removals} departed pending sync entries, triggering early neighbor sync"
764 );
765 }
766 sync_trigger.notify_one();
767 }
768 DhtNetworkEvent::PeerRemoved { peer_id } => {
769 sync_state.write().await.remove_peer(&peer_id);
770 repair_proofs.write().await.remove_peer(&peer_id);
771 // v12: drop the commitment bytes and the
772 // recent-prover credit so a churn / sybil
773 // attacker cannot leave behind one
774 // StorageCommitment per identity in
775 // `last_commitment_by_peer`. Also drop the
776 // sig-verify rate-limit timestamp.
777 last_commitment_by_peer.write().await.remove(&peer_id);
778 recent_provers.write().await.forget_peer(&peer_id);
779 sig_verify_attempts.write().await.remove(&peer_id);
780 // Drop the timeout-strike entry too, so a
781 // departed peer leaves no residual (keeps this
782 // map bounded under churn, like its siblings).
783 audit_timeout_strikes.write().await.remove(&peer_id);
784 // Same for the gossip-audit cooldown (ADR-0002).
785 audit_on_gossip_cooldown.write().await.remove(&peer_id);
786 // The sticky `commitment_capable` flag is
787 // preserved orthogonally via
788 // `ever_capable_peers` — even after this
789 // removal, a re-joining peer continues to
790 // be treated as v12-capable rather than
791 // legacy (§3 shield).
792 }
793 _ => {}
794 }
795 }
796 }
797 }
798 debug!("Replication message handler shut down");
799 });
800 self.task_handles.push(handle);
801 }
802
803 fn start_neighbor_sync_loop(&mut self) {
804 let p2p = Arc::clone(&self.p2p_node);
805 let storage = Arc::clone(&self.storage);
806 let paid_list = Arc::clone(&self.paid_list);
807 let queues = Arc::clone(&self.queues);
808 let config = Arc::clone(&self.config);
809 let shutdown = self.shutdown.clone();
810 let sync_state = Arc::clone(&self.sync_state);
811 let sync_history = Arc::clone(&self.sync_history);
812 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
813 let repair_proofs = Arc::clone(&self.repair_proofs);
814 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
815 let bootstrap_state = Arc::clone(&self.bootstrap_state);
816 let sync_trigger = Arc::clone(&self.sync_trigger);
817 let commitment_state = Arc::clone(&self.commitment_state);
818 let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
819 let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
820 let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
821 // ADR-0002: a peer's commitment also arrives on the sync RESPONSE path
822 // (we initiated, they piggybacked theirs). Carry a gossip-audit trigger
823 // here too so a peer that only ever answers — never initiates sync —
824 // is still audited; otherwise it could fully evade auditing.
825 let gossip_audit = GossipAuditTrigger {
826 p2p_node: Arc::clone(&p2p),
827 config: Arc::clone(&config),
828 recent_provers: Arc::clone(&self.recent_provers),
829 sync_state: Arc::clone(&sync_state),
830 audit_timeout_strikes: Arc::clone(&self.audit_timeout_strikes),
831 cooldown: Arc::clone(&self.audit_on_gossip_cooldown),
832 };
833
834 let handle = tokio::spawn(async move {
835 loop {
836 let interval = config.random_neighbor_sync_interval();
837 tokio::select! {
838 () = shutdown.cancelled() => break,
839 () = tokio::time::sleep(interval) => {}
840 () = sync_trigger.notified() => {
841 debug!("Neighbor sync triggered by topology change");
842 }
843 }
844 // Wrap the sync round in a select so shutdown cancels
845 // in-progress network operations rather than waiting for
846 // the full round to complete.
847 tokio::select! {
848 () = shutdown.cancelled() => break,
849 () = run_neighbor_sync_round(
850 &p2p,
851 &storage,
852 &paid_list,
853 &queues,
854 &config,
855 &sync_state,
856 &sync_history,
857 &sync_cycle_epoch,
858 &repair_proofs,
859 &is_bootstrapping,
860 &bootstrap_state,
861 &commitment_state,
862 &last_commitment_by_peer,
863 &ever_capable_peers,
864 &sig_verify_attempts,
865 &gossip_audit,
866 ) => {}
867 }
868 }
869 debug!("Neighbor sync loop shut down");
870 });
871 self.task_handles.push(handle);
872 }
873
874 fn start_self_lookup_loop(&mut self) {
875 let p2p = Arc::clone(&self.p2p_node);
876 let config = Arc::clone(&self.config);
877 let shutdown = self.shutdown.clone();
878
879 let handle = tokio::spawn(async move {
880 loop {
881 let interval = config.random_self_lookup_interval();
882 tokio::select! {
883 () = shutdown.cancelled() => break,
884 () = tokio::time::sleep(interval) => {
885 if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
886 debug!("Self-lookup failed: {e}");
887 }
888 }
889 }
890 }
891 debug!("Self-lookup loop shut down");
892 });
893 self.task_handles.push(handle);
894 }
895
896 /// Periodic responsible-chunk audit loop (audit #2): every
897 /// [`ReplicationConfig::random_audit_tick_interval`] (~10-20 min), audit one
898 /// eligible close peer for the chunks it *should* be storing (by
899 /// responsibility and prior repair hint), independent of the gossip-triggered
900 /// storage-commitment audit. Waits for bootstrap to drain, then runs one tick
901 /// immediately and periodically thereafter.
902 fn start_audit_loop(&mut self) {
903 let p2p = Arc::clone(&self.p2p_node);
904 let storage = Arc::clone(&self.storage);
905 let config = Arc::clone(&self.config);
906 let shutdown = self.shutdown.clone();
907 let sync_history = Arc::clone(&self.sync_history);
908 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
909 let repair_proofs = Arc::clone(&self.repair_proofs);
910 let bootstrap_state = Arc::clone(&self.bootstrap_state);
911 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
912 let sync_state = Arc::clone(&self.sync_state);
913 // Needed so the responsible-chunk audit routes failures through the same
914 // strike/grace path as the storage-commitment audit (timeouts graced,
915 // not penalised on the first occurrence) and can revoke holder credit on
916 // a confirmed failure.
917 let recent_provers = Arc::clone(&self.recent_provers);
918 let audit_timeout_strikes = Arc::clone(&self.audit_timeout_strikes);
919
920 let handle = tokio::spawn(async move {
921 // Invariant 19: wait for bootstrap to drain before starting audits.
922 loop {
923 tokio::select! {
924 () = shutdown.cancelled() => return,
925 () = tokio::time::sleep(
926 std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
927 ) => {
928 if bootstrap_state.read().await.is_drained() {
929 break;
930 }
931 }
932 }
933 }
934
935 // Run one audit tick immediately after bootstrap drain.
936 {
937 let bootstrapping = *is_bootstrapping.read().await;
938 let result = {
939 let history = sync_history.read().await;
940 let current_sync_epoch = *sync_cycle_epoch.read().await;
941 audit::audit_tick_with_repair_proofs(
942 &p2p,
943 &storage,
944 &config,
945 &history,
946 &repair_proofs,
947 current_sync_epoch,
948 bootstrapping,
949 )
950 .await
951 };
952 handle_audit_result(
953 &result,
954 &p2p,
955 &sync_state,
956 &recent_provers,
957 &audit_timeout_strikes,
958 &config,
959 )
960 .await;
961 }
962
963 // Then run periodically.
964 loop {
965 let interval = config.random_audit_tick_interval();
966 tokio::select! {
967 () = shutdown.cancelled() => break,
968 () = tokio::time::sleep(interval) => {
969 let bootstrapping = *is_bootstrapping.read().await;
970 let result = {
971 let history = sync_history.read().await;
972 let current_sync_epoch = *sync_cycle_epoch.read().await;
973 audit::audit_tick_with_repair_proofs(
974 &p2p,
975 &storage,
976 &config,
977 &history,
978 &repair_proofs,
979 current_sync_epoch,
980 bootstrapping,
981 )
982 .await
983 };
984 handle_audit_result(
985 &result,
986 &p2p,
987 &sync_state,
988 &recent_provers,
989 &audit_timeout_strikes,
990 &config,
991 )
992 .await;
993 }
994 }
995 }
996 debug!("Audit loop shut down");
997 });
998 self.task_handles.push(handle);
999 }
1000
1001 /// Periodically rebuild + sign + rotate the responder's storage
1002 /// commitment.
1003 ///
1004 /// Phase 3 of the v12 storage-bound audit. Once per
1005 /// [`COMMITMENT_ROTATION_INTERVAL_SECS`], the responder reads the
1006 /// current LMDB key set, builds a Merkle tree (for content-addressed
1007 /// chunks `bytes_hash == key`, so no chunk re-read is needed), signs
1008 /// the root with the node's `MlDsaSecretKey`, and rotates the result
1009 /// into `commitment_state`. Old `previous` slot is dropped by the
1010 /// rotate (per `ResponderCommitmentState::rotate`).
1011 ///
1012 /// Skips if the key set is empty (no commitment to make) — the
1013 /// auditor side falls back to the legacy plain-digest path for
1014 /// peers that have never gossiped a commitment.
1015 fn start_commitment_rotation_loop(&mut self) {
1016 let storage = Arc::clone(&self.storage);
1017 let identity = Arc::clone(&self.identity);
1018 let commitment_state = Arc::clone(&self.commitment_state);
1019 let shutdown = self.shutdown.clone();
1020 let p2p = Arc::clone(&self.p2p_node);
1021 let config = Arc::clone(&self.config);
1022 let sync_trigger = Arc::clone(&self.sync_trigger);
1023 let recent_provers = Arc::clone(&self.recent_provers);
1024
1025 let handle = tokio::spawn(async move {
1026 // Build the first commitment immediately on startup so a
1027 // restarted node can answer commitment-bound audits right
1028 // away — otherwise current() stays None for a full rotation
1029 // interval and audits silently fall back to legacy.
1030 //
1031 // After the first build, trigger an immediate neighbor-sync
1032 // round so the new commitment gossips out within seconds.
1033 // Without this, after a restart remote auditors keep pinning
1034 // the pre-restart (rotated-away) hash until their normal
1035 // sync cadence elapses — up to 1 h in the worst case,
1036 // during which time commitment-bound audits hit "unknown
1037 // commitment hash" -> Idle no-ops.
1038 // ML-DSA signatures are randomized so we cannot reproduce
1039 // the pre-restart hash; the only honest path to recovery
1040 // is fast re-gossip.
1041 if let Err(e) =
1042 rebuild_and_rotate_commitment(&storage, &identity, &commitment_state, &p2p, &config)
1043 .await
1044 {
1045 warn!("Initial commitment build failed: {e}");
1046 } else {
1047 sync_trigger.notify_one();
1048 }
1049 loop {
1050 tokio::select! {
1051 () = shutdown.cancelled() => break,
1052 () = tokio::time::sleep(
1053 std::time::Duration::from_secs(COMMITMENT_ROTATION_INTERVAL_SECS)
1054 ) => {
1055 if let Err(e) = rebuild_and_rotate_commitment(
1056 &storage,
1057 &identity,
1058 &commitment_state,
1059 &p2p,
1060 &config,
1061 ).await {
1062 warn!("Commitment rotation failed: {e}");
1063 }
1064 // Piggyback a sweep of expired recent_provers
1065 // entries on the rotation tick (same cadence,
1066 // 1 h). is_credited_holder already honours the
1067 // TTL on read, but the sweep reclaims memory
1068 // for entries we'll never re-read.
1069 let dropped = recent_provers.write().await.sweep_expired(
1070 std::time::Instant::now()
1071 );
1072 if dropped > 0 {
1073 debug!("recent_provers: swept {dropped} expired entries");
1074 }
1075 }
1076 }
1077 }
1078 debug!("Commitment rotation loop shut down");
1079 });
1080 self.task_handles.push(handle);
1081 }
1082
1083 #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
1084 fn start_fetch_worker(&mut self) {
1085 let p2p = Arc::clone(&self.p2p_node);
1086 let storage = Arc::clone(&self.storage);
1087 let queues = Arc::clone(&self.queues);
1088 let config = Arc::clone(&self.config);
1089 let shutdown = self.shutdown.clone();
1090 let bootstrap_state = Arc::clone(&self.bootstrap_state);
1091 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1092 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
1093 let concurrency = max_parallel_fetch();
1094
1095 info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
1096
1097 let handle = tokio::spawn(async move {
1098 // Each in-flight future yields (key, Option<FetchOutcome>) so we
1099 // always recover the key — even if the inner task panics.
1100 let mut in_flight = FuturesUnordered::<FetchFuture>::new();
1101
1102 loop {
1103 // Fill up to `concurrency` slots from the queue.
1104 {
1105 let mut q = queues.write().await;
1106 while in_flight.len() < concurrency {
1107 let Some(candidate) = q.dequeue_fetch() else {
1108 break;
1109 };
1110 let Some(&source) = candidate.sources.first() else {
1111 warn!(
1112 "Fetch candidate {} has no sources — dropping",
1113 hex::encode(candidate.key)
1114 );
1115 continue;
1116 };
1117 q.start_fetch(candidate.key, source, candidate.sources.clone());
1118
1119 let p2p = Arc::clone(&p2p);
1120 let storage = Arc::clone(&storage);
1121 let config = Arc::clone(&config);
1122 let token = shutdown.clone();
1123 let fetch_key = candidate.key;
1124 in_flight.push(Box::pin(async move {
1125 let handle = tokio::spawn(async move {
1126 // Cancel-aware: abort when the engine shuts down.
1127 tokio::select! {
1128 () = token.cancelled() => FetchOutcome {
1129 key: fetch_key,
1130 result: FetchResult::SourceFailed,
1131 },
1132 outcome = execute_single_fetch(
1133 p2p, storage, config, fetch_key, source,
1134 ) => outcome,
1135 }
1136 });
1137 match handle.await {
1138 Ok(outcome) => (outcome.key, Some(outcome)),
1139 Err(e) => {
1140 error!(
1141 "Fetch task for {} panicked: {e}",
1142 hex::encode(fetch_key)
1143 );
1144 (fetch_key, None)
1145 }
1146 }
1147 }));
1148 }
1149 } // release queues write lock
1150
1151 if in_flight.is_empty() {
1152 // No work — wait for new items or shutdown.
1153 tokio::select! {
1154 () = shutdown.cancelled() => break,
1155 () = tokio::time::sleep(
1156 std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
1157 ) => continue,
1158 }
1159 }
1160
1161 // Wait for the next fetch to complete and process the result.
1162 tokio::select! {
1163 () = shutdown.cancelled() => break,
1164 Some((key, maybe_outcome)) = in_flight.next() => {
1165 let mut q = queues.write().await;
1166 let terminal = if let Some(outcome) = maybe_outcome {
1167 match outcome.result {
1168 FetchResult::Stored => {
1169 q.complete_fetch(&key);
1170 true
1171 }
1172 FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
1173 if let Some(next_peer) = q.retry_fetch(&key) {
1174 // Spawn a new fetch task for the next source.
1175 let p2p = Arc::clone(&p2p);
1176 let storage = Arc::clone(&storage);
1177 let config = Arc::clone(&config);
1178 let token = shutdown.clone();
1179 let fetch_key = key;
1180 in_flight.push(Box::pin(async move {
1181 let handle = tokio::spawn(async move {
1182 tokio::select! {
1183 () = token.cancelled() => FetchOutcome {
1184 key: fetch_key,
1185 result: FetchResult::SourceFailed,
1186 },
1187 outcome = execute_single_fetch(
1188 p2p, storage, config, fetch_key, next_peer,
1189 ) => outcome,
1190 }
1191 });
1192 match handle.await {
1193 Ok(outcome) => (outcome.key, Some(outcome)),
1194 Err(e) => {
1195 error!(
1196 "Fetch task for {} panicked: {e}",
1197 hex::encode(fetch_key)
1198 );
1199 (fetch_key, None)
1200 }
1201 }
1202 }));
1203 false
1204 } else {
1205 q.complete_fetch(&key);
1206 true
1207 }
1208 }
1209 }
1210 } else {
1211 // Task panicked — reclaim the in-flight slot.
1212 q.complete_fetch(&key);
1213 true
1214 };
1215
1216 // Shrink bootstrap pending set on terminal exit.
1217 if terminal {
1218 drop(q); // release queues lock before acquiring bootstrap_state
1219 if !bootstrap_state.read().await.is_drained() {
1220 bootstrap_state.write().await.remove_key(&key);
1221 let q = queues.read().await;
1222 if bootstrap::check_bootstrap_drained(
1223 &bootstrap_state,
1224 &q,
1225 )
1226 .await
1227 {
1228 complete_bootstrap(
1229 &is_bootstrapping,
1230 &bootstrap_complete_notify,
1231 ).await;
1232 }
1233 }
1234 }
1235 }
1236 }
1237 }
1238
1239 // Cancel and drain remaining in-flight fetches on shutdown.
1240 // The CancellationToken is already cancelled by this point, so
1241 // spawned tasks will see cancellation via their select! branches.
1242 while in_flight.next().await.is_some() {}
1243 debug!("Fetch worker shut down");
1244 });
1245 self.task_handles.push(handle);
1246 }
1247
1248 fn start_verification_worker(&mut self) {
1249 let p2p = Arc::clone(&self.p2p_node);
1250 let storage = Arc::clone(&self.storage);
1251 let queues = Arc::clone(&self.queues);
1252 let paid_list = Arc::clone(&self.paid_list);
1253 let config = Arc::clone(&self.config);
1254 let shutdown = self.shutdown.clone();
1255 let bootstrap_state = Arc::clone(&self.bootstrap_state);
1256 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1257 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
1258 let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
1259 let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
1260 let recent_provers = Arc::clone(&self.recent_provers);
1261
1262 let handle = tokio::spawn(async move {
1263 loop {
1264 tokio::select! {
1265 () = shutdown.cancelled() => break,
1266 () = tokio::time::sleep(
1267 std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
1268 ) => {
1269 let ctx = VerificationCycleContext {
1270 p2p_node: &p2p,
1271 paid_list: &paid_list,
1272 storage: &storage,
1273 queues: &queues,
1274 config: &config,
1275 bootstrap_state: &bootstrap_state,
1276 is_bootstrapping: &is_bootstrapping,
1277 bootstrap_complete_notify: &bootstrap_complete_notify,
1278 last_commitment_by_peer: &last_commitment_by_peer,
1279 ever_capable_peers: &ever_capable_peers,
1280 recent_provers: &recent_provers,
1281 };
1282 run_verification_cycle(ctx).await;
1283 }
1284 }
1285 }
1286 debug!("Verification worker shut down");
1287 });
1288 self.task_handles.push(handle);
1289 }
1290
1291 /// Gap 3: Run a one-shot bootstrap sync on startup.
1292 ///
1293 /// Waits for saorsa-core to emit `DhtNetworkEvent::BootstrapComplete`
1294 /// (indicating the routing table is populated) before snapshotting
1295 /// close neighbors. Falls back after a timeout so bootstrap nodes
1296 /// (which have no peers and therefore never receive the event) still
1297 /// proceed.
1298 ///
1299 /// After the gate, finds close neighbors, syncs with each in
1300 /// round-robin batches, admits returned hints into the verification
1301 /// pipeline, and tracks discovered keys for bootstrap drain detection.
1302 #[allow(clippy::too_many_lines)]
1303 fn start_bootstrap_sync(
1304 &mut self,
1305 dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
1306 ) {
1307 let p2p = Arc::clone(&self.p2p_node);
1308 let storage = Arc::clone(&self.storage);
1309 let paid_list = Arc::clone(&self.paid_list);
1310 let queues = Arc::clone(&self.queues);
1311 let config = Arc::clone(&self.config);
1312 let shutdown = self.shutdown.clone();
1313 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1314 let bootstrap_state = Arc::clone(&self.bootstrap_state);
1315 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
1316 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
1317 let repair_proofs = Arc::clone(&self.repair_proofs);
1318 let my_commitment_state = Arc::clone(&self.commitment_state);
1319 let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
1320 let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
1321 let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
1322
1323 let handle = tokio::spawn(async move {
1324 // Wait for DHT bootstrap to complete before snapshotting
1325 // neighbors. The routing table is empty until saorsa-core
1326 // finishes its FIND_NODE rounds and bucket refreshes.
1327 let gate = bootstrap::wait_for_bootstrap_complete(
1328 dht_events,
1329 config.bootstrap_complete_timeout_secs,
1330 &shutdown,
1331 )
1332 .await;
1333
1334 if gate == bootstrap::BootstrapGateResult::Shutdown {
1335 return;
1336 }
1337
1338 let self_id = *p2p.peer_id();
1339 let neighbors =
1340 neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
1341 .await;
1342
1343 if neighbors.is_empty() {
1344 info!("Bootstrap sync: no close neighbors found, marking drained");
1345 bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
1346 complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
1347 return;
1348 }
1349
1350 let neighbor_count = neighbors.len();
1351 info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
1352
1353 // Process neighbors in batches of NEIGHBOR_SYNC_PEER_COUNT.
1354 for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
1355 if shutdown.is_cancelled() {
1356 break;
1357 }
1358
1359 let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
1360 batch,
1361 &storage,
1362 &paid_list,
1363 &p2p,
1364 config.close_group_size,
1365 config.paid_list_close_group_size,
1366 )
1367 .await;
1368
1369 for peer in batch {
1370 if shutdown.is_cancelled() {
1371 break;
1372 }
1373
1374 // Re-read on each iteration so peers see current state.
1375 let bootstrapping = *is_bootstrapping.read().await;
1376
1377 bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
1378
1379 let hints = hints_by_peer.remove(peer).unwrap_or_default();
1380 let outcome = neighbor_sync::sync_with_peer_with_hints(
1381 peer,
1382 &p2p,
1383 &config,
1384 bootstrapping,
1385 hints,
1386 // Atomically snapshot + mark-gossiped: emitted in the
1387 // bootstrap-sync request, so we stay answerable for it
1388 // (ADR-0002). One critical section avoids a TOCTOU where a
1389 // concurrent retire/rotate drops the slot between read and
1390 // mark.
1391 my_commitment_state
1392 .current_for_gossip()
1393 .map(|b| b.commitment().clone()),
1394 )
1395 .await;
1396
1397 bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
1398
1399 if let Some(outcome) = outcome {
1400 // Ingest the peer's piggybacked commitment from the
1401 // response (same verification as the request path).
1402 // Bootstrap is the FIRST gossip we receive from most
1403 // peers, so this populates last_commitment_by_peer.
1404 //
1405 // We intentionally do NOT trigger a gossip-audit here:
1406 // during bootstrap this node may itself still be
1407 // bootstrapping (audits are gated on that), and the
1408 // close-group/RT view is not yet stable. The peer is
1409 // audited on the first STEADY-STATE neighbor-sync round
1410 // after bootstrap drains (request + response paths both
1411 // trigger), which is within one sync cycle — so caching
1412 // the commitment here is sufficient and there is no
1413 // coverage gap (ADR-0002).
1414 ingest_peer_commitment(
1415 peer,
1416 outcome.response.commitment.as_ref(),
1417 &p2p,
1418 &last_commitment_by_peer,
1419 &ever_capable_peers,
1420 &sig_verify_attempts,
1421 )
1422 .await; // sig_verify_attempts in scope from line ~1080
1423
1424 if !outcome.response.bootstrapping {
1425 record_sent_replica_hints(
1426 peer,
1427 &outcome.sent_replica_hints,
1428 &repair_proofs,
1429 &sync_cycle_epoch,
1430 )
1431 .await;
1432 // Admit hints into verification pipeline.
1433 let outcome = admit_and_queue_hints(
1434 &self_id,
1435 peer,
1436 &outcome.response.replica_hints,
1437 &outcome.response.paid_hints,
1438 &p2p,
1439 &config,
1440 &storage,
1441 &paid_list,
1442 &queues,
1443 )
1444 .await;
1445
1446 // Track discovered keys for drain detection.
1447 if !outcome.discovered.is_empty() {
1448 bootstrap::track_discovered_keys(
1449 &bootstrap_state,
1450 &outcome.discovered,
1451 )
1452 .await;
1453 }
1454
1455 // Record / retire capacity rejections so the
1456 // drain check correctly reflects whether each
1457 // source still owes us re-hinted work after
1458 // queue overflow.
1459 if outcome.capacity_rejected_count > 0 {
1460 bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
1461 } else {
1462 bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
1463 }
1464 }
1465 }
1466 }
1467 }
1468
1469 // Check drain condition.
1470 {
1471 let q = queues.read().await;
1472 if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
1473 complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
1474 }
1475 }
1476
1477 info!("Bootstrap sync completed");
1478 });
1479 self.task_handles.push(handle);
1480 }
1481}
1482
1483// ===========================================================================
1484// Free functions for background tasks
1485// ===========================================================================
1486
1487/// RAII admission for one audit-responder task: holds the GLOBAL permit and,
1488/// on drop, decrements the PER-PEER in-flight count. Moving this into the
1489/// spawned task ties both bounds to the task's exact lifetime — no manual
1490/// decrement to forget on an early return or panic.
1491struct AuditResponderGuard {
1492 _permit: tokio::sync::OwnedSemaphorePermit,
1493 inflight: Arc<RwLock<HashMap<PeerId, u32>>>,
1494 peer: PeerId,
1495}
1496
1497impl Drop for AuditResponderGuard {
1498 fn drop(&mut self) {
1499 // Decrement (and prune to keep the map bounded) without blocking the
1500 // async runtime: a short lock on a tiny map.
1501 //
1502 // Fast path: if the (uncontended, tiny) lock is free, decrement inline
1503 // with no spawn. Otherwise defer to a task — but only if a runtime is
1504 // actually current, so `Drop` during shutdown (no runtime) can never
1505 // panic. A missed decrement at shutdown is harmless: the whole map is
1506 // being dropped with the engine.
1507 let peer = self.peer;
1508 if let Ok(mut map) = self.inflight.try_write() {
1509 if let Some(n) = map.get_mut(&peer) {
1510 *n = n.saturating_sub(1);
1511 if *n == 0 {
1512 map.remove(&peer);
1513 }
1514 }
1515 return;
1516 }
1517 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1518 let inflight = Arc::clone(&self.inflight);
1519 handle.spawn(async move {
1520 let mut map = inflight.write().await;
1521 if let Some(n) = map.get_mut(&peer) {
1522 *n = n.saturating_sub(1);
1523 if *n == 0 {
1524 map.remove(&peer);
1525 }
1526 }
1527 });
1528 }
1529 }
1530}
1531
1532/// Try to admit one audit-responder task for `source`: take a global permit AND
1533/// a per-peer slot (both bounded). Returns `None` (caller drops the challenge,
1534/// which the auditor graces as a timeout) if either ceiling is hit, so one
1535/// flooder can neither exhaust the global pool's effect on others nor exceed
1536/// its own per-peer share (codex-r2 A).
1537async fn admit_audit_responder(
1538 semaphore: &Arc<Semaphore>,
1539 inflight: &Arc<RwLock<HashMap<PeerId, u32>>>,
1540 source: &PeerId,
1541) -> Option<AuditResponderGuard> {
1542 // Per-peer cap first (cheap, and the fairness-critical bound), committed
1543 // under the write lock so concurrent challenges from the same peer can't
1544 // both slip past the cap.
1545 {
1546 let mut map = inflight.write().await;
1547 let entry = map.entry(*source).or_insert(0);
1548 if *entry >= MAX_AUDIT_RESPONSES_PER_PEER {
1549 return None;
1550 }
1551 *entry += 1;
1552 }
1553 // Then the global ceiling. If it's exhausted, give back the per-peer slot we
1554 // just claimed so it isn't leaked.
1555 let Ok(permit) = Arc::clone(semaphore).try_acquire_owned() else {
1556 let mut map = inflight.write().await;
1557 if let Some(n) = map.get_mut(source) {
1558 *n = n.saturating_sub(1);
1559 if *n == 0 {
1560 map.remove(source);
1561 }
1562 }
1563 return None;
1564 };
1565 Some(AuditResponderGuard {
1566 _permit: permit,
1567 inflight: Arc::clone(inflight),
1568 peer: *source,
1569 })
1570}
1571
1572/// Handle an incoming replication protocol message.
1573///
1574/// When `rr_message_id` is `Some`, the request arrived via the `/rr/`
1575/// request-response path and the response must be sent via `send_response`
1576/// so saorsa-core can route it back to the waiting `send_request` caller.
1577#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1578async fn handle_replication_message(
1579 source: &PeerId,
1580 data: &[u8],
1581 p2p_node: &Arc<P2PNode>,
1582 storage: &Arc<LmdbStorage>,
1583 paid_list: &Arc<PaidList>,
1584 payment_verifier: &Arc<PaymentVerifier>,
1585 queues: &Arc<RwLock<ReplicationQueues>>,
1586 config: &ReplicationConfig,
1587 is_bootstrapping: &Arc<RwLock<bool>>,
1588 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1589 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1590 sync_cycle_epoch: &Arc<RwLock<u64>>,
1591 repair_proofs: &Arc<RwLock<RepairProofs>>,
1592 last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
1593 ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
1594 sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
1595 my_commitment_state: &Arc<ResponderCommitmentState>,
1596 gossip_audit: &GossipAuditTrigger,
1597 audit_responder_semaphore: &Arc<Semaphore>,
1598 audit_responder_inflight: &Arc<RwLock<HashMap<PeerId, u32>>>,
1599 rr_message_id: Option<&str>,
1600) -> Result<()> {
1601 let msg = ReplicationMessage::decode(data)
1602 .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
1603
1604 match msg.body {
1605 ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
1606 handle_fresh_offer(
1607 source,
1608 offer,
1609 storage,
1610 paid_list,
1611 payment_verifier,
1612 p2p_node,
1613 config,
1614 msg.request_id,
1615 rr_message_id,
1616 )
1617 .await
1618 }
1619 ReplicationMessageBody::PaidNotify(ref notify) => {
1620 handle_paid_notify(
1621 source,
1622 notify,
1623 paid_list,
1624 payment_verifier,
1625 p2p_node,
1626 config,
1627 )
1628 .await
1629 }
1630 ReplicationMessageBody::NeighborSyncRequest(ref request) => {
1631 let bootstrapping = *is_bootstrapping.read().await;
1632 // Phase-3 storage-bound audit: store the sender's
1633 // commitment for use as `expected_commitment_hash` in
1634 // future audits. Verify signature before storing so a peer
1635 // cannot inject a forged commitment for someone else.
1636 if let Some(target) = ingest_peer_commitment(
1637 source,
1638 request.commitment.as_ref(),
1639 p2p_node,
1640 last_commitment_by_peer,
1641 ever_capable_peers,
1642 sig_verify_attempts,
1643 )
1644 .await
1645 {
1646 maybe_trigger_gossip_audit(gossip_audit, source, target).await;
1647 }
1648 handle_neighbor_sync_request(
1649 source,
1650 request,
1651 p2p_node,
1652 storage,
1653 paid_list,
1654 queues,
1655 config,
1656 bootstrapping,
1657 bootstrap_state,
1658 sync_history,
1659 sync_cycle_epoch,
1660 repair_proofs,
1661 // Atomically snapshot + mark-gossiped: emitted in the sync
1662 // response, so we must stay answerable for it (ADR-0002).
1663 my_commitment_state
1664 .current_for_gossip()
1665 .map(|b| b.commitment().clone()),
1666 msg.request_id,
1667 rr_message_id,
1668 )
1669 .await
1670 }
1671 ReplicationMessageBody::VerificationRequest(ref request) => {
1672 handle_verification_request(
1673 source,
1674 request,
1675 storage,
1676 paid_list,
1677 p2p_node,
1678 msg.request_id,
1679 rr_message_id,
1680 )
1681 .await
1682 }
1683 ReplicationMessageBody::FetchRequest(ref request) => {
1684 handle_fetch_request(
1685 source,
1686 request,
1687 storage,
1688 p2p_node,
1689 msg.request_id,
1690 rr_message_id,
1691 )
1692 .await
1693 }
1694 ReplicationMessageBody::AuditChallenge(ref challenge) => {
1695 // Responsible-chunk audit (audit #2) responder: answer with per-key
1696 // possession digests. This same handler also answers the
1697 // prune-confirmation audit, which sends the same `AuditChallenge`
1698 // wire message.
1699 let bootstrapping = *is_bootstrapping.read().await;
1700 handle_audit_challenge_msg(
1701 source,
1702 challenge,
1703 storage,
1704 p2p_node,
1705 bootstrapping,
1706 msg.request_id,
1707 rr_message_id,
1708 )
1709 .await
1710 }
1711 ReplicationMessageBody::SubtreeAuditChallenge(challenge) => {
1712 // Gossip-triggered storage-bound subtree audit (ADR-0002). The
1713 // responder rebuilds the WHOLE nonce-selected subtree, reading every
1714 // leaf's bytes from disk (`get_raw` × ~sqrt(N) leaves). Run it on a
1715 // detached task so this serial message loop is never blocked on disk
1716 // I/O — otherwise one audit stalls all replication traffic (§5).
1717 //
1718 // A bounded, flood-fair admission restores backpressure (codex#1 +
1719 // codex-r2 A): a global ceiling AND a per-peer cap. If either is hit
1720 // we drop this challenge — the auditor graces a non-response as a
1721 // timeout, so an honest auditor is unaffected and only a flooder is
1722 // throttled (and it cannot starve other peers, since its share is
1723 // capped per-peer).
1724 info!(
1725 "Audit challenge received: kind=subtree source={source} request_response={}",
1726 rr_message_id.is_some(),
1727 );
1728 let Some(guard) =
1729 admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
1730 .await
1731 else {
1732 warn!(
1733 "Audit challenge reply not sent: kind=subtree response=dropped \
1734 source={source} (audit-responder capacity reached)"
1735 );
1736 return Ok(());
1737 };
1738 let bootstrapping = *is_bootstrapping.read().await;
1739 let storage = Arc::clone(storage);
1740 let p2p_node = Arc::clone(p2p_node);
1741 let my_commitment_state = Arc::clone(my_commitment_state);
1742 let source = *source;
1743 let request_id = msg.request_id;
1744 let rr_message_id = rr_message_id.map(ToOwned::to_owned);
1745 tokio::spawn(async move {
1746 let _guard = guard; // global permit + per-peer slot, held until done
1747 let response = storage_commitment_audit::handle_subtree_challenge(
1748 &challenge,
1749 &storage,
1750 p2p_node.peer_id(),
1751 bootstrapping,
1752 Some(&my_commitment_state),
1753 )
1754 .await;
1755 let response_kind = subtree_audit_response_kind(&response);
1756 let sent = send_replication_response_checked(
1757 &source,
1758 &p2p_node,
1759 request_id,
1760 ReplicationMessageBody::SubtreeAuditResponse(response),
1761 rr_message_id.as_deref(),
1762 )
1763 .await;
1764 if sent {
1765 info!(
1766 "Audit challenge reply sent: kind=subtree response={response_kind} \
1767 source={source} request_response={}",
1768 rr_message_id.is_some(),
1769 );
1770 } else {
1771 warn!(
1772 "Audit challenge reply not sent: kind=subtree response={response_kind} \
1773 source={source} request_response={}",
1774 rr_message_id.is_some(),
1775 );
1776 }
1777 });
1778 Ok(())
1779 }
1780 ReplicationMessageBody::SubtreeByteChallenge(challenge) => {
1781 // Round 2 of the storage audit (ADR-0002): serve the original bytes
1782 // for the auditor's spot-check keys, or signal `Absent` for a
1783 // committed key we can no longer produce. Reads chunk bytes from
1784 // disk, so likewise spawned off the serial loop (§5) under the same
1785 // flood-fair admission (codex#1 + codex-r2 A).
1786 info!(
1787 "Audit challenge received: kind=byte source={source} request_response={}",
1788 rr_message_id.is_some(),
1789 );
1790 let Some(guard) =
1791 admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
1792 .await
1793 else {
1794 warn!(
1795 "Audit challenge reply not sent: kind=byte response=dropped \
1796 source={source} (audit-responder capacity reached)"
1797 );
1798 return Ok(());
1799 };
1800 let bootstrapping = *is_bootstrapping.read().await;
1801 let storage = Arc::clone(storage);
1802 let p2p_node = Arc::clone(p2p_node);
1803 let my_commitment_state = Arc::clone(my_commitment_state);
1804 let source = *source;
1805 let request_id = msg.request_id;
1806 let rr_message_id = rr_message_id.map(ToOwned::to_owned);
1807 tokio::spawn(async move {
1808 let _guard = guard; // global permit + per-peer slot, held until done
1809 let response = storage_commitment_audit::handle_subtree_byte_challenge(
1810 &challenge,
1811 &storage,
1812 p2p_node.peer_id(),
1813 bootstrapping,
1814 Some(&my_commitment_state),
1815 )
1816 .await;
1817 let response_kind = subtree_byte_response_kind(&response);
1818 let sent = send_replication_response_checked(
1819 &source,
1820 &p2p_node,
1821 request_id,
1822 ReplicationMessageBody::SubtreeByteResponse(response),
1823 rr_message_id.as_deref(),
1824 )
1825 .await;
1826 if sent {
1827 info!(
1828 "Audit challenge reply sent: kind=byte response={response_kind} \
1829 source={source} request_response={}",
1830 rr_message_id.is_some(),
1831 );
1832 } else {
1833 warn!(
1834 "Audit challenge reply not sent: kind=byte response={response_kind} \
1835 source={source} request_response={}",
1836 rr_message_id.is_some(),
1837 );
1838 }
1839 });
1840 Ok(())
1841 }
1842 // Response messages are handled by their respective request initiators.
1843 ReplicationMessageBody::FreshReplicationResponse(_)
1844 | ReplicationMessageBody::NeighborSyncResponse(_)
1845 | ReplicationMessageBody::VerificationResponse(_)
1846 | ReplicationMessageBody::FetchResponse(_)
1847 | ReplicationMessageBody::AuditResponse(_)
1848 | ReplicationMessageBody::SubtreeAuditResponse(_)
1849 | ReplicationMessageBody::SubtreeByteResponse(_) => Ok(()),
1850 }
1851}
1852
1853// ---------------------------------------------------------------------------
1854// Per-message-type handlers
1855// ---------------------------------------------------------------------------
1856
1857#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1858async fn handle_fresh_offer(
1859 source: &PeerId,
1860 offer: &protocol::FreshReplicationOffer,
1861 storage: &Arc<LmdbStorage>,
1862 paid_list: &Arc<PaidList>,
1863 payment_verifier: &Arc<PaymentVerifier>,
1864 p2p_node: &Arc<P2PNode>,
1865 config: &ReplicationConfig,
1866 request_id: u64,
1867 rr_message_id: Option<&str>,
1868) -> Result<()> {
1869 let self_id = *p2p_node.peer_id();
1870
1871 // Rule 5: reject if PoP is missing.
1872 if offer.proof_of_payment.is_empty() {
1873 send_replication_response(
1874 source,
1875 p2p_node,
1876 request_id,
1877 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1878 key: offer.key,
1879 reason: "Missing proof of payment".to_string(),
1880 }),
1881 rr_message_id,
1882 )
1883 .await;
1884 return Ok(());
1885 }
1886
1887 // Enforce chunk size invariant: the normal PUT path rejects data larger
1888 // than MAX_CHUNK_SIZE; the replication receive path must do the same to
1889 // prevent peers from pushing oversized records through replication.
1890 if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1891 warn!(
1892 "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1893 hex::encode(offer.key),
1894 offer.data.len(),
1895 crate::ant_protocol::MAX_CHUNK_SIZE,
1896 );
1897 p2p_node
1898 .report_trust_event(
1899 source,
1900 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1901 )
1902 .await;
1903 send_replication_response(
1904 source,
1905 p2p_node,
1906 request_id,
1907 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1908 key: offer.key,
1909 reason: format!(
1910 "Data size {} exceeds maximum chunk size {}",
1911 offer.data.len(),
1912 crate::ant_protocol::MAX_CHUNK_SIZE,
1913 ),
1914 }),
1915 rr_message_id,
1916 )
1917 .await;
1918 return Ok(());
1919 }
1920
1921 // Mirror the normal PUT path: the advertised key must be the content
1922 // address of the supplied bytes before any expensive payment verification.
1923 let computed_key = crate::client::compute_address(&offer.data);
1924 if computed_key != offer.key {
1925 warn!(
1926 "Rejecting fresh offer for key {}: content address mismatch, computed {}",
1927 hex::encode(offer.key),
1928 hex::encode(computed_key),
1929 );
1930 p2p_node
1931 .report_trust_event(
1932 source,
1933 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1934 )
1935 .await;
1936 send_replication_response(
1937 source,
1938 p2p_node,
1939 request_id,
1940 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1941 key: offer.key,
1942 reason: format!(
1943 "Content address mismatch: expected {}, computed {}",
1944 hex::encode(offer.key),
1945 hex::encode(computed_key),
1946 ),
1947 }),
1948 rr_message_id,
1949 )
1950 .await;
1951 return Ok(());
1952 }
1953
1954 // Rule 7: check storage admission. Fresh chunk receivers accept the close
1955 // group plus a small margin to absorb local routing-table disagreement.
1956 if !admission::is_responsible(
1957 &self_id,
1958 &offer.key,
1959 p2p_node,
1960 storage_admission_width(config.close_group_size),
1961 )
1962 .await
1963 {
1964 send_replication_response(
1965 source,
1966 p2p_node,
1967 request_id,
1968 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1969 key: offer.key,
1970 reason: "Not in storage-admission range for this key".to_string(),
1971 }),
1972 rr_message_id,
1973 )
1974 .await;
1975 return Ok(());
1976 }
1977
1978 // Disk-space pre-check — mirror the PUT handler (V2-411). A full node can
1979 // never store this record, so reject it before the expensive payment
1980 // verification (EVM on-chain query / merkle pool work) rather than verifying
1981 // and only then failing at `storage.put` below. Reuses the cached capacity
1982 // check (passing results only, so freed space is detected promptly), and the
1983 // store path keeps its own check as defence-in-depth.
1984 if let Err(e) = storage.check_capacity() {
1985 info!(
1986 target: "ant_node::storage::disk_precheck",
1987 key = %hex::encode(offer.key),
1988 "Rejecting fresh replication offer before payment verification: {e}"
1989 );
1990 send_replication_response(
1991 source,
1992 p2p_node,
1993 request_id,
1994 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1995 key: offer.key,
1996 reason: e.to_string(),
1997 }),
1998 rr_message_id,
1999 )
2000 .await;
2001 return Ok(());
2002 }
2003
2004 // Gap 1: Validate PoP via PaymentVerifier. Fresh replication is still
2005 // part of the immediate write fan-out: this receiver is about to store the
2006 // record as if the client had PUT it here directly. Storage admission
2007 // was checked above before proof work. ClientPut verification applies
2008 // store-strength cache semantics, paid-quote issuer K-closeness and local
2009 // price floor checks for single-node proofs, and merkle candidate
2010 // closeness for merkle proofs.
2011 match payment_verifier
2012 .verify_payment(
2013 &offer.key,
2014 Some(&offer.proof_of_payment),
2015 fresh_offer_payment_context(),
2016 )
2017 .await
2018 {
2019 Ok(status) if status.can_store() => {
2020 debug!(
2021 "PoP validated for fresh offer key {}",
2022 hex::encode(offer.key)
2023 );
2024 }
2025 Ok(_) => {
2026 send_replication_response(
2027 source,
2028 p2p_node,
2029 request_id,
2030 ReplicationMessageBody::FreshReplicationResponse(
2031 FreshReplicationResponse::Rejected {
2032 key: offer.key,
2033 reason: "Payment verification failed: payment required".to_string(),
2034 },
2035 ),
2036 rr_message_id,
2037 )
2038 .await;
2039 return Ok(());
2040 }
2041 Err(e) => {
2042 warn!(
2043 "PoP verification error for key {}: {e}",
2044 hex::encode(offer.key)
2045 );
2046 send_replication_response(
2047 source,
2048 p2p_node,
2049 request_id,
2050 ReplicationMessageBody::FreshReplicationResponse(
2051 FreshReplicationResponse::Rejected {
2052 key: offer.key,
2053 reason: format!("Payment verification error: {e}"),
2054 },
2055 ),
2056 rr_message_id,
2057 )
2058 .await;
2059 return Ok(());
2060 }
2061 }
2062
2063 // Rule 6: add to PaidForList.
2064 if let Err(e) = paid_list.insert(&offer.key).await {
2065 warn!("Failed to add key to PaidForList: {e}");
2066 }
2067
2068 // Store the record.
2069 match storage.put(&offer.key, &offer.data).await {
2070 Ok(_) => {
2071 send_replication_response(
2072 source,
2073 p2p_node,
2074 request_id,
2075 ReplicationMessageBody::FreshReplicationResponse(
2076 FreshReplicationResponse::Accepted { key: offer.key },
2077 ),
2078 rr_message_id,
2079 )
2080 .await;
2081 }
2082 Err(e) => {
2083 send_replication_response(
2084 source,
2085 p2p_node,
2086 request_id,
2087 ReplicationMessageBody::FreshReplicationResponse(
2088 FreshReplicationResponse::Rejected {
2089 key: offer.key,
2090 reason: e.to_string(),
2091 },
2092 ),
2093 rr_message_id,
2094 )
2095 .await;
2096 }
2097 }
2098
2099 Ok(())
2100}
2101
2102async fn handle_paid_notify(
2103 _source: &PeerId,
2104 notify: &protocol::PaidNotify,
2105 paid_list: &Arc<PaidList>,
2106 payment_verifier: &Arc<PaymentVerifier>,
2107 p2p_node: &Arc<P2PNode>,
2108 config: &ReplicationConfig,
2109) -> Result<()> {
2110 let self_id = *p2p_node.peer_id();
2111
2112 // Rule 3: validate PoP presence before adding.
2113 if notify.proof_of_payment.is_empty() {
2114 return Ok(());
2115 }
2116
2117 // Check if we're in PaidCloseGroup for this key.
2118 if !admission::is_in_paid_close_group(
2119 &self_id,
2120 ¬ify.key,
2121 p2p_node,
2122 config.paid_list_close_group_size,
2123 )
2124 .await
2125 {
2126 return Ok(());
2127 }
2128
2129 // Gap 1: Validate PoP via PaymentVerifier. PaidNotify admits fresh
2130 // paid-list metadata, so local paid-list close-group membership was checked
2131 // above before proof work. The verifier then runs the same payment proof
2132 // checks as ClientPut while writing a paid-list-strength cache entry.
2133 match payment_verifier
2134 .verify_payment(
2135 ¬ify.key,
2136 Some(¬ify.proof_of_payment),
2137 paid_notify_payment_context(),
2138 )
2139 .await
2140 {
2141 Ok(status) if status.can_store() => {
2142 debug!(
2143 "PoP validated for paid notify key {}",
2144 hex::encode(notify.key)
2145 );
2146 }
2147 Ok(_) => {
2148 warn!(
2149 "Paid notify rejected: payment required for key {}",
2150 hex::encode(notify.key)
2151 );
2152 return Ok(());
2153 }
2154 Err(e) => {
2155 warn!(
2156 "PoP verification error for paid notify key {}: {e}",
2157 hex::encode(notify.key)
2158 );
2159 return Ok(());
2160 }
2161 }
2162
2163 if let Err(e) = paid_list.insert(¬ify.key).await {
2164 warn!("Failed to add paid notify key to PaidForList: {e}");
2165 }
2166
2167 Ok(())
2168}
2169
2170#[allow(clippy::too_many_arguments)]
2171async fn handle_neighbor_sync_request(
2172 source: &PeerId,
2173 request: &protocol::NeighborSyncRequest,
2174 p2p_node: &Arc<P2PNode>,
2175 storage: &Arc<LmdbStorage>,
2176 paid_list: &Arc<PaidList>,
2177 queues: &Arc<RwLock<ReplicationQueues>>,
2178 config: &ReplicationConfig,
2179 is_bootstrapping: bool,
2180 bootstrap_state: &Arc<RwLock<BootstrapState>>,
2181 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
2182 sync_cycle_epoch: &Arc<RwLock<u64>>,
2183 repair_proofs: &Arc<RwLock<RepairProofs>>,
2184 my_commitment: Option<StorageCommitment>,
2185 request_id: u64,
2186 rr_message_id: Option<&str>,
2187) -> Result<()> {
2188 let self_id = *p2p_node.peer_id();
2189
2190 // No per-request hint count limit: the wire message size limit
2191 // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Unlike audit
2192 // challenges, sync hints don't drive expensive computation — they just
2193 // enter the verification queue. A per-request limit here would break
2194 // bootstrap replication for newly-joined nodes with 0 stored chunks.
2195
2196 // Build response (outbound hints).
2197 let (response, sent_replica_hints, sender_in_rt) =
2198 neighbor_sync::handle_sync_request_with_proofs(
2199 source,
2200 request,
2201 p2p_node,
2202 storage,
2203 paid_list,
2204 config,
2205 is_bootstrapping,
2206 my_commitment.clone(),
2207 )
2208 .await;
2209
2210 // Send response.
2211 let response_sent = send_replication_response_checked(
2212 source,
2213 p2p_node,
2214 request_id,
2215 ReplicationMessageBody::NeighborSyncResponse(response),
2216 rr_message_id,
2217 )
2218 .await;
2219
2220 // Process inbound hints only if sender is in LocalRT (Rule 4-6).
2221 if !sender_in_rt {
2222 return Ok(());
2223 }
2224
2225 // Update sync history for this peer before recording repair proofs so a
2226 // same-tick audit cannot combine a fresh key proof with stale peer maturity.
2227 {
2228 let mut history = sync_history.write().await;
2229 let record = history.entry(*source).or_insert(PeerSyncRecord {
2230 last_sync: None,
2231 cycles_since_sync: 0,
2232 });
2233 record.last_sync = Some(Instant::now());
2234 record.cycles_since_sync = 0;
2235 }
2236
2237 if response_sent && !request.bootstrapping {
2238 record_sent_replica_hints(source, &sent_replica_hints, repair_proofs, sync_cycle_epoch)
2239 .await;
2240 }
2241
2242 // Admit inbound hints and queue for verification.
2243 let outcome = admit_and_queue_hints(
2244 &self_id,
2245 source,
2246 &request.replica_hints,
2247 &request.paid_hints,
2248 p2p_node,
2249 config,
2250 storage,
2251 paid_list,
2252 queues,
2253 )
2254 .await;
2255
2256 // Track discovered keys for bootstrap drain detection so that hints
2257 // admitted via inbound sync requests are not missed. Capacity-rejected
2258 // hints keep this source on the "not yet drained" list until its next
2259 // sync re-admits them; a clean cycle clears the source.
2260 if is_bootstrapping {
2261 if !outcome.discovered.is_empty() {
2262 bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
2263 }
2264 if outcome.capacity_rejected_count > 0 {
2265 bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
2266 } else {
2267 bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
2268 }
2269 }
2270
2271 Ok(())
2272}
2273
2274async fn handle_verification_request(
2275 source: &PeerId,
2276 request: &protocol::VerificationRequest,
2277 storage: &Arc<LmdbStorage>,
2278 paid_list: &Arc<PaidList>,
2279 p2p_node: &Arc<P2PNode>,
2280 request_id: u64,
2281 rr_message_id: Option<&str>,
2282) -> Result<()> {
2283 // No per-request key count limit: the wire message size limit
2284 // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Verification
2285 // does cheap storage lookups per key, not expensive computation like
2286 // audit digest generation.
2287
2288 #[allow(clippy::cast_possible_truncation)]
2289 let keys_len = request.keys.len() as u32;
2290 let paid_check_set: HashSet<u32> = request
2291 .paid_list_check_indices
2292 .iter()
2293 .copied()
2294 .filter(|&idx| {
2295 if idx >= keys_len {
2296 warn!(
2297 "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
2298 request.keys.len(),
2299 );
2300 false
2301 } else {
2302 true
2303 }
2304 })
2305 .collect();
2306
2307 let mut results = Vec::with_capacity(request.keys.len());
2308 for (i, key) in request.keys.iter().enumerate() {
2309 let present = storage.exists(key).unwrap_or(false);
2310 let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
2311 Some(paid_list.contains(key).unwrap_or(false))
2312 } else {
2313 None
2314 };
2315 results.push(protocol::KeyVerificationResult {
2316 key: *key,
2317 present,
2318 paid,
2319 });
2320 }
2321
2322 send_replication_response(
2323 source,
2324 p2p_node,
2325 request_id,
2326 ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
2327 rr_message_id,
2328 )
2329 .await;
2330
2331 Ok(())
2332}
2333
2334async fn handle_fetch_request(
2335 source: &PeerId,
2336 request: &protocol::FetchRequest,
2337 storage: &Arc<LmdbStorage>,
2338 p2p_node: &Arc<P2PNode>,
2339 request_id: u64,
2340 rr_message_id: Option<&str>,
2341) -> Result<()> {
2342 let response = match storage.get(&request.key).await {
2343 Ok(Some(data)) => protocol::FetchResponse::Success {
2344 key: request.key,
2345 data,
2346 },
2347 Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
2348 Err(e) => protocol::FetchResponse::Error {
2349 key: request.key,
2350 reason: format!("{e}"),
2351 },
2352 };
2353
2354 send_replication_response(
2355 source,
2356 p2p_node,
2357 request_id,
2358 ReplicationMessageBody::FetchResponse(response),
2359 rr_message_id,
2360 )
2361 .await;
2362
2363 Ok(())
2364}
2365
2366/// Responder for an incoming `AuditChallenge` (responsible-chunk audit #2, and
2367/// the prune-confirmation audit, which reuses the same wire message): reply with
2368/// per-key possession digests.
2369async fn handle_audit_challenge_msg(
2370 source: &PeerId,
2371 challenge: &protocol::AuditChallenge,
2372 storage: &Arc<LmdbStorage>,
2373 p2p_node: &Arc<P2PNode>,
2374 is_bootstrapping: bool,
2375 request_id: u64,
2376 rr_message_id: Option<&str>,
2377) -> Result<()> {
2378 #[allow(clippy::cast_possible_truncation)]
2379 let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
2380 info!(
2381 "Audit challenge received: kind=responsible keys={} bootstrapping={} request_response={}",
2382 challenge.keys.len(),
2383 is_bootstrapping,
2384 rr_message_id.is_some(),
2385 );
2386
2387 let response = audit::handle_audit_challenge(
2388 challenge,
2389 storage,
2390 p2p_node.peer_id(),
2391 is_bootstrapping,
2392 stored_chunks,
2393 )
2394 .await;
2395 let response_kind = audit_response_kind(&response);
2396
2397 let sent = send_replication_response_checked(
2398 source,
2399 p2p_node,
2400 request_id,
2401 ReplicationMessageBody::AuditResponse(response),
2402 rr_message_id,
2403 )
2404 .await;
2405 if sent {
2406 info!(
2407 "Audit challenge reply sent: kind=responsible response={} keys={} request_response={}",
2408 response_kind,
2409 challenge.keys.len(),
2410 rr_message_id.is_some(),
2411 );
2412 } else {
2413 warn!(
2414 "Audit challenge reply not sent: kind=responsible response={} keys={} request_response={}",
2415 response_kind,
2416 challenge.keys.len(),
2417 rr_message_id.is_some(),
2418 );
2419 }
2420
2421 Ok(())
2422}
2423
2424fn audit_response_kind(response: &protocol::AuditResponse) -> &'static str {
2425 match response {
2426 protocol::AuditResponse::Digests { .. } => "digests",
2427 protocol::AuditResponse::Bootstrapping { .. } => "bootstrapping",
2428 protocol::AuditResponse::Rejected { .. } => "rejected",
2429 }
2430}
2431
2432fn subtree_audit_response_kind(response: &protocol::SubtreeAuditResponse) -> &'static str {
2433 match response {
2434 protocol::SubtreeAuditResponse::Proof { .. } => "proof",
2435 protocol::SubtreeAuditResponse::Bootstrapping { .. } => "bootstrapping",
2436 protocol::SubtreeAuditResponse::Rejected { .. } => "rejected",
2437 }
2438}
2439
2440fn subtree_byte_response_kind(response: &protocol::SubtreeByteResponse) -> &'static str {
2441 match response {
2442 protocol::SubtreeByteResponse::Items { .. } => "items",
2443 protocol::SubtreeByteResponse::Bootstrapping { .. } => "bootstrapping",
2444 protocol::SubtreeByteResponse::Rejected { .. } => "rejected",
2445 }
2446}
2447
2448// ---------------------------------------------------------------------------
2449// Message sending helper
2450// ---------------------------------------------------------------------------
2451
2452/// Send a replication response message as a best-effort reply.
2453///
2454/// Encode and send failures are logged by the checked helper. Most response
2455/// paths do not need to branch on send success, so this wrapper keeps those
2456/// call sites explicit about their best-effort behavior.
2457async fn send_replication_response(
2458 peer: &PeerId,
2459 p2p_node: &Arc<P2PNode>,
2460 request_id: u64,
2461 body: ReplicationMessageBody,
2462 rr_message_id: Option<&str>,
2463) {
2464 let _ =
2465 send_replication_response_checked(peer, p2p_node, request_id, body, rr_message_id).await;
2466}
2467
2468/// Send a replication response message and report whether it was accepted.
2469///
2470/// Returns `true` after the message is encoded and accepted by the P2P send
2471/// path. Returns `false` after logging an encode or send failure. Repair-proof
2472/// recording uses this to avoid trusting hints that were not actually sent.
2473///
2474/// When `rr_message_id` is `Some`, the response is sent via the `/rr/`
2475/// request-response path so saorsa-core can route it back to the caller's
2476/// `send_request` future. Otherwise it is sent as a plain message.
2477async fn send_replication_response_checked(
2478 peer: &PeerId,
2479 p2p_node: &Arc<P2PNode>,
2480 request_id: u64,
2481 body: ReplicationMessageBody,
2482 rr_message_id: Option<&str>,
2483) -> bool {
2484 let msg = ReplicationMessage { request_id, body };
2485 let encoded = match msg.encode() {
2486 Ok(data) => data,
2487 Err(e) => {
2488 warn!("Failed to encode replication response: {e}");
2489 return false;
2490 }
2491 };
2492 let result = if let Some(msg_id) = rr_message_id {
2493 p2p_node
2494 .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
2495 .await
2496 } else {
2497 p2p_node
2498 .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
2499 .await
2500 };
2501 if let Err(e) = result {
2502 debug!("Failed to send replication response to {peer}: {e}");
2503 return false;
2504 }
2505 true
2506}
2507
2508async fn record_sent_replica_hints(
2509 peer: &PeerId,
2510 hints: &[neighbor_sync::SentReplicaHint],
2511 repair_proofs: &Arc<RwLock<RepairProofs>>,
2512 sync_cycle_epoch: &Arc<RwLock<u64>>,
2513) {
2514 if hints.is_empty() {
2515 return;
2516 }
2517
2518 let hinted_at_epoch = *sync_cycle_epoch.read().await;
2519 let mut proofs = repair_proofs.write().await;
2520 for hint in hints {
2521 if proofs.record_replica_hint_sent(*peer, hint.key, &hint.close_peers, hinted_at_epoch) {
2522 debug!(
2523 "Recorded repair hint proof for peer {peer} and key {}",
2524 hex::encode(hint.key)
2525 );
2526 }
2527 }
2528}
2529
2530// ---------------------------------------------------------------------------
2531// Neighbor sync round
2532// ---------------------------------------------------------------------------
2533
2534/// Run one neighbor sync round.
2535#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
2536async fn run_neighbor_sync_round(
2537 p2p_node: &Arc<P2PNode>,
2538 storage: &Arc<LmdbStorage>,
2539 paid_list: &Arc<PaidList>,
2540 queues: &Arc<RwLock<ReplicationQueues>>,
2541 config: &ReplicationConfig,
2542 sync_state: &Arc<RwLock<NeighborSyncState>>,
2543 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
2544 sync_cycle_epoch: &Arc<RwLock<u64>>,
2545 repair_proofs: &Arc<RwLock<RepairProofs>>,
2546 is_bootstrapping: &Arc<RwLock<bool>>,
2547 bootstrap_state: &Arc<RwLock<BootstrapState>>,
2548 commitment_state: &Arc<ResponderCommitmentState>,
2549 last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
2550 ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
2551 sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
2552 gossip_audit: &GossipAuditTrigger,
2553) {
2554 let self_id = *p2p_node.peer_id();
2555 let bootstrapping = *is_bootstrapping.read().await;
2556
2557 // Check if cycle is complete; start new one if needed.
2558 // We check under a read lock, then release it before the expensive
2559 // prune pass and DHT snapshot so other tasks are not starved.
2560 let cycle_complete = sync_state.read().await.is_cycle_complete();
2561 if cycle_complete {
2562 // A completed local neighbor-sync cycle advances the epoch component
2563 // of repair-proof maturity. The per-key wall-clock minimum age is
2564 // checked when audits are selected.
2565 {
2566 let mut history = sync_history.write().await;
2567 for record in history.values_mut() {
2568 record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
2569 }
2570 }
2571 let current_sync_epoch = {
2572 let mut epoch = sync_cycle_epoch.write().await;
2573 *epoch = epoch.saturating_add(1);
2574 *epoch
2575 };
2576
2577 // Post-cycle pruning (Section 11) — runs without holding sync_state.
2578 // Remote prune-confirmation audits are storage-proof audits and only
2579 // run after bootstrap has drained.
2580 let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
2581 pruning::run_prune_pass_with_context(pruning::PrunePassContext {
2582 self_id: &self_id,
2583 storage,
2584 paid_list,
2585 p2p_node,
2586 config,
2587 sync_state,
2588 repair_proofs,
2589 current_sync_epoch,
2590 #[cfg(any(test, feature = "test-utils"))]
2591 repair_proof_now: None,
2592 allow_remote_prune_audits,
2593 commitment_state: Some(commitment_state),
2594 })
2595 .await;
2596
2597 // Take fresh close-neighbor snapshot (DHT query, no lock held).
2598 let neighbors =
2599 neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
2600 .await;
2601
2602 // Now re-acquire write lock and re-check before swapping cycle.
2603 let mut state = sync_state.write().await;
2604 if state.is_cycle_complete() {
2605 // Preserve cooldown and bootstrap-claim tracking across cycles.
2606 // Claims have a 24h lifecycle vs 10-20 min cycles — dropping them
2607 // would reset the abuse detection timer every cycle.
2608 let old_sync_times = std::mem::take(&mut state.last_sync_times);
2609 let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
2610 let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
2611 let old_prune_cursor = state.prune_cursor;
2612 *state = NeighborSyncState::new_cycle(neighbors);
2613 state.last_sync_times = old_sync_times;
2614 state.bootstrap_claims = old_bootstrap_claims;
2615 state.bootstrap_claim_history = old_bootstrap_claim_history;
2616 state.prune_cursor = old_prune_cursor;
2617 }
2618 }
2619
2620 // Select batch of peers.
2621 let batch = {
2622 let mut state = sync_state.write().await;
2623 neighbor_sync::select_sync_batch(
2624 &mut state,
2625 config.neighbor_sync_peer_count,
2626 config.neighbor_sync_cooldown,
2627 )
2628 };
2629
2630 if batch.is_empty() {
2631 return;
2632 }
2633
2634 debug!("Neighbor sync: syncing with {} peers", batch.len());
2635
2636 // Snapshot our current commitment once per round so all peers in
2637 // this batch see the same thing (gossip is the responder's attestation;
2638 // same value across the batch is fine and reduces RwLock churn). Atomically
2639 // snapshot + mark-gossiped so we stay answerable for exactly what we emit
2640 // (ADR-0002 retention), with no TOCTOU vs a concurrent retire/rotate.
2641 let my_commitment = commitment_state
2642 .current_for_gossip()
2643 .map(|b| b.commitment().clone());
2644
2645 let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
2646 &batch,
2647 storage,
2648 paid_list,
2649 p2p_node,
2650 config.close_group_size,
2651 config.paid_list_close_group_size,
2652 )
2653 .await;
2654
2655 // Sync with each peer in the batch.
2656 for peer in &batch {
2657 let hints = hints_by_peer.remove(peer).unwrap_or_default();
2658 let outcome = neighbor_sync::sync_with_peer_with_hints(
2659 peer,
2660 p2p_node,
2661 config,
2662 bootstrapping,
2663 hints,
2664 my_commitment.clone(),
2665 )
2666 .await;
2667
2668 if let Some(outcome) = outcome {
2669 handle_sync_response(
2670 &self_id,
2671 peer,
2672 &outcome.response,
2673 &outcome.sent_replica_hints,
2674 p2p_node,
2675 config,
2676 bootstrapping,
2677 bootstrap_state,
2678 storage,
2679 paid_list,
2680 queues,
2681 sync_state,
2682 sync_history,
2683 sync_cycle_epoch,
2684 repair_proofs,
2685 last_commitment_by_peer,
2686 ever_capable_peers,
2687 sig_verify_attempts,
2688 gossip_audit,
2689 )
2690 .await;
2691 } else {
2692 // Sync failed -- remove peer and try to fill slot.
2693 let replacement = {
2694 let mut state = sync_state.write().await;
2695 neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
2696 };
2697
2698 // Attempt sync with the replacement peer (if one was found).
2699 if let Some(replacement_peer) = replacement {
2700 let mut replacement_hints = neighbor_sync::build_sync_hints_for_peers(
2701 std::slice::from_ref(&replacement_peer),
2702 storage,
2703 paid_list,
2704 p2p_node,
2705 config.close_group_size,
2706 config.paid_list_close_group_size,
2707 )
2708 .await;
2709 let hints = replacement_hints
2710 .remove(&replacement_peer)
2711 .unwrap_or_default();
2712 let replacement_outcome = neighbor_sync::sync_with_peer_with_hints(
2713 &replacement_peer,
2714 p2p_node,
2715 config,
2716 bootstrapping,
2717 hints,
2718 my_commitment.clone(),
2719 )
2720 .await;
2721
2722 if let Some(outcome) = replacement_outcome {
2723 handle_sync_response(
2724 &self_id,
2725 &replacement_peer,
2726 &outcome.response,
2727 &outcome.sent_replica_hints,
2728 p2p_node,
2729 config,
2730 bootstrapping,
2731 bootstrap_state,
2732 storage,
2733 paid_list,
2734 queues,
2735 sync_state,
2736 sync_history,
2737 sync_cycle_epoch,
2738 repair_proofs,
2739 last_commitment_by_peer,
2740 ever_capable_peers,
2741 sig_verify_attempts,
2742 gossip_audit,
2743 )
2744 .await;
2745 }
2746 }
2747 }
2748 }
2749}
2750
2751/// Process a successful neighbor sync response: record the sync, check for
2752/// bootstrap claim abuse, and admit inbound hints.
2753#[allow(clippy::too_many_arguments)]
2754async fn handle_sync_response(
2755 self_id: &PeerId,
2756 peer: &PeerId,
2757 resp: &NeighborSyncResponse,
2758 sent_replica_hints: &[neighbor_sync::SentReplicaHint],
2759 p2p_node: &Arc<P2PNode>,
2760 config: &ReplicationConfig,
2761 bootstrapping: bool,
2762 bootstrap_state: &Arc<RwLock<BootstrapState>>,
2763 storage: &Arc<LmdbStorage>,
2764 paid_list: &Arc<PaidList>,
2765 queues: &Arc<RwLock<ReplicationQueues>>,
2766 sync_state: &Arc<RwLock<NeighborSyncState>>,
2767 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
2768 sync_cycle_epoch: &Arc<RwLock<u64>>,
2769 repair_proofs: &Arc<RwLock<RepairProofs>>,
2770 last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
2771 ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
2772 sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
2773 gossip_audit: &GossipAuditTrigger,
2774) {
2775 // Ingest the peer's commitment if they piggybacked one on the response.
2776 // Same verification as the request path (peer-id binding + signature);
2777 // forged commitments are dropped at the edge. A *changed* commitment here
2778 // is a gossip-audit trigger just like on the request path — so a peer that
2779 // only ever answers sync (never initiates) is still audited (ADR-0002).
2780 if let Some(target) = ingest_peer_commitment(
2781 peer,
2782 resp.commitment.as_ref(),
2783 p2p_node,
2784 last_commitment_by_peer,
2785 ever_capable_peers,
2786 sig_verify_attempts,
2787 )
2788 .await
2789 {
2790 maybe_trigger_gossip_audit(gossip_audit, peer, target).await;
2791 }
2792
2793 // Record successful sync.
2794 {
2795 let mut state = sync_state.write().await;
2796 neighbor_sync::record_successful_sync(&mut state, peer);
2797 }
2798 {
2799 let mut history = sync_history.write().await;
2800 let record = history.entry(*peer).or_insert(PeerSyncRecord {
2801 last_sync: None,
2802 cycles_since_sync: 0,
2803 });
2804 record.last_sync = Some(Instant::now());
2805 record.cycles_since_sync = 0;
2806 }
2807
2808 // Process inbound hints from response (skip if peer is bootstrapping).
2809 if resp.bootstrapping {
2810 // Gap 6: BootstrapClaimAbuse grace period enforcement.
2811 // Separate state mutation from network I/O to avoid holding the
2812 // write lock across report_trust_event.
2813 let should_report = {
2814 let now = Instant::now();
2815 let mut state = sync_state.write().await;
2816 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
2817 BootstrapClaimObservation::WithinGrace { .. } => false,
2818 BootstrapClaimObservation::PastGrace { first_seen } => {
2819 warn!(
2820 "Peer {peer} has been claiming bootstrap for {:?}, \
2821 exceeding grace period of {:?} — reporting abuse",
2822 now.duration_since(first_seen),
2823 config.bootstrap_claim_grace_period,
2824 );
2825 true
2826 }
2827 BootstrapClaimObservation::Repeated { first_seen } => {
2828 warn!(
2829 "Peer {peer} repeated bootstrap claim after previously stopping; \
2830 first claim was {:?} ago — reporting abuse",
2831 now.duration_since(first_seen),
2832 );
2833 true
2834 }
2835 }
2836 };
2837 if should_report {
2838 p2p_node
2839 .report_trust_event(
2840 peer,
2841 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2842 )
2843 .await;
2844 }
2845 } else {
2846 // Peer is not claiming bootstrap; clear active claim while retaining
2847 // history so the peer cannot start a second grace window later.
2848 {
2849 let mut state = sync_state.write().await;
2850 state.clear_active_bootstrap_claim(peer);
2851 }
2852 record_sent_replica_hints(peer, sent_replica_hints, repair_proofs, sync_cycle_epoch).await;
2853 let outcome = admit_and_queue_hints(
2854 self_id,
2855 peer,
2856 &resp.replica_hints,
2857 &resp.paid_hints,
2858 p2p_node,
2859 config,
2860 storage,
2861 paid_list,
2862 queues,
2863 )
2864 .await;
2865
2866 // Track discovered keys for bootstrap drain detection so that hints
2867 // admitted via regular neighbor sync are not missed. Capacity-
2868 // rejected hints keep this source on the "not yet drained" list
2869 // until its next sync replays them; a clean cycle clears it.
2870 if bootstrapping {
2871 if !outcome.discovered.is_empty() {
2872 bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
2873 }
2874 if outcome.capacity_rejected_count > 0 {
2875 bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
2876 } else {
2877 bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
2878 }
2879 }
2880 }
2881}
2882
2883/// Admit hints and queue them for verification, returning newly-discovered keys.
2884///
2885/// Shared by neighbor-sync request handling, response handling, and bootstrap
2886/// sync so that admission + queueing logic lives in one place.
2887#[allow(clippy::too_many_arguments)]
2888/// Outcome of [`admit_and_queue_hints`].
2889///
2890/// `capacity_rejected_count` is non-zero when one or more legitimately
2891/// admissible hints were dropped because `pending_verify`'s global or
2892/// per-source bound was hit. Callers that care about completeness
2893/// (bootstrap drain accounting) MUST NOT treat their work as complete while
2894/// this is > 0 — the source will need to re-hint after capacity frees up.
2895struct AdmissionOutcome {
2896 discovered: HashSet<XorName>,
2897 capacity_rejected_count: usize,
2898}
2899
2900#[allow(clippy::too_many_arguments)]
2901async fn admit_and_queue_hints(
2902 self_id: &PeerId,
2903 source_peer: &PeerId,
2904 replica_hints: &[XorName],
2905 paid_hints: &[XorName],
2906 p2p_node: &Arc<P2PNode>,
2907 config: &ReplicationConfig,
2908 storage: &Arc<LmdbStorage>,
2909 paid_list: &Arc<PaidList>,
2910 queues: &Arc<RwLock<ReplicationQueues>>,
2911) -> AdmissionOutcome {
2912 let pending_keys: HashSet<XorName> = {
2913 let q = queues.read().await;
2914 q.pending_keys().into_iter().collect()
2915 };
2916
2917 let admitted = admission::admit_hints(
2918 self_id,
2919 replica_hints,
2920 paid_hints,
2921 p2p_node,
2922 config,
2923 storage,
2924 paid_list,
2925 &pending_keys,
2926 )
2927 .await;
2928
2929 let mut discovered = HashSet::new();
2930 let mut capacity_rejected_count: usize = 0;
2931 let mut q = queues.write().await;
2932 let now = Instant::now();
2933
2934 for key in admitted.replica_keys {
2935 if !storage.exists(&key).unwrap_or(false) {
2936 let result = q.add_pending_verify(
2937 key,
2938 VerificationEntry {
2939 state: VerificationState::PendingVerify,
2940 pipeline: HintPipeline::Replica,
2941 verified_sources: Vec::new(),
2942 tried_sources: HashSet::new(),
2943 created_at: now,
2944 hint_sender: *source_peer,
2945 },
2946 );
2947 match result {
2948 crate::replication::scheduling::AdmissionResult::Admitted => {
2949 discovered.insert(key);
2950 }
2951 crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
2952 crate::replication::scheduling::AdmissionResult::CapacityRejected => {
2953 capacity_rejected_count += 1;
2954 }
2955 }
2956 }
2957 }
2958
2959 for key in admitted.paid_only_keys {
2960 let result = q.add_pending_verify(
2961 key,
2962 VerificationEntry {
2963 state: VerificationState::PendingVerify,
2964 pipeline: HintPipeline::PaidOnly,
2965 verified_sources: Vec::new(),
2966 tried_sources: HashSet::new(),
2967 created_at: now,
2968 hint_sender: *source_peer,
2969 },
2970 );
2971 match result {
2972 crate::replication::scheduling::AdmissionResult::Admitted => {
2973 discovered.insert(key);
2974 }
2975 crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
2976 crate::replication::scheduling::AdmissionResult::CapacityRejected => {
2977 capacity_rejected_count += 1;
2978 }
2979 }
2980 }
2981
2982 if capacity_rejected_count > 0 {
2983 debug!(
2984 "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
2985 rejected at queue capacity; source will need to re-hint after pending_verify drains"
2986 );
2987 }
2988
2989 AdmissionOutcome {
2990 discovered,
2991 capacity_rejected_count,
2992 }
2993}
2994
2995// ---------------------------------------------------------------------------
2996// Verification cycle
2997// ---------------------------------------------------------------------------
2998
/// Run one verification cycle: process pending keys through quorum checks.
///
/// The cycle proceeds in this order:
/// - evict pending entries older than `config::PENDING_VERIFY_MAX_AGE`, then
///   return early if nothing is pending;
/// - Step 1: keys already in the local `PaidForList` are marked
///   `PaidListVerified` and split by hint pipeline; every other key is routed
///   to network verification;
/// - Step 1b: locally-paid, fetch-eligible keys get a presence-only probe and
///   are promoted to the fetch queue when at least one source responds;
/// - Steps 2-5: remaining keys go through a network verification round, the
///   verified ones are inserted into the `PaidForList`, and the queues are
///   updated with the outcomes;
/// - Step 6: keys that reached a terminal state are handed to
///   [`update_bootstrap_after_verification`], and a summary line is logged
///   (at `info` when the cycle took at least `VERIFICATION_CYCLE_SLOW_LOG_MS`,
///   otherwise at `debug`).
///
/// The queue lock is taken and released around each phase rather than held for
/// the whole cycle.
#[allow(clippy::too_many_lines)]
async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
    let cycle_started = Instant::now();
    let VerificationCycleContext {
        p2p_node,
        paid_list,
        storage,
        queues,
        config,
        bootstrap_state,
        is_bootstrapping,
        bootstrap_complete_notify,
        last_commitment_by_peer,
        ever_capable_peers,
        recent_provers,
    } = ctx;

    // Evict stale entries that have been pending too long (e.g. unreachable
    // verification targets during a network partition).
    {
        let mut q = queues.write().await;
        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
    }

    // Snapshot the pending keys; the rest of the cycle works from this list.
    let pending_keys = {
        let q = queues.read().await;
        q.pending_keys()
    };

    if pending_keys.is_empty() {
        return;
    }
    // Kept for the summary log line at the end of the cycle.
    let initial_pending_count = pending_keys.len();

    let self_id = *p2p_node.peer_id();

    // Step 1: Check local PaidForList for fast-path authorization (Section 9,
    // step 4).
    //
    // `terminal_keys` accumulates, across all steps, every key that is removed
    // from the pending set without being promoted to the fetch queue.
    let mut local_paid_presence_probe_keys = Vec::new();
    let mut local_paid_paid_only_keys = Vec::new();
    let mut keys_needing_network = Vec::new();
    let mut terminal_keys: Vec<XorName> = Vec::new();
    {
        let mut q = queues.write().await;
        for key in &pending_keys {
            // A failed paid-list lookup is treated as "not paid", which sends
            // the key down the network verification path.
            if paid_list.contains(key).unwrap_or(false) {
                // `set_pending_state` yields the entry's pipeline; when it
                // yields `None` the key is not routed anywhere this cycle.
                if let Some(pipeline) =
                    q.set_pending_state(key, VerificationState::PaidListVerified)
                {
                    match pipeline {
                        HintPipeline::PaidOnly => {
                            // Paid-only + local paid state needs one more
                            // storage-admission check outside this lock: if we
                            // are also in the close group plus storage margin,
                            // the hint can repair a missing replica.
                            local_paid_paid_only_keys.push(*key);
                        }
                        HintPipeline::Replica => {
                            // Local paid-list membership authorizes the key.
                            // We still need a presence probe to discover fetch
                            // sources, but we must not require remote paid
                            // majority or presence quorum.
                            local_paid_presence_probe_keys.push(*key);
                        }
                    }
                }
            } else {
                keys_needing_network.push(*key);
            }
        }
    }

    // Locally-paid paid-only keys: a key is terminal when the record is
    // already stored locally or this node is not responsible for storing it;
    // otherwise it joins the presence probe so the missing replica can be
    // fetched.
    if !local_paid_paid_only_keys.is_empty() {
        let mut terminal_paid_only = Vec::new();
        for key in local_paid_paid_only_keys {
            if storage.exists(&key).unwrap_or(false) {
                terminal_paid_only.push(key);
            } else if admission::is_responsible(
                &self_id,
                &key,
                p2p_node,
                storage_admission_width(config.close_group_size),
            )
            .await
            {
                local_paid_presence_probe_keys.push(key);
            } else {
                terminal_paid_only.push(key);
            }
        }

        // The write lock is taken only after the responsibility checks above
        // have finished awaiting.
        if !terminal_paid_only.is_empty() {
            let mut q = queues.write().await;
            for key in terminal_paid_only {
                q.remove_pending(&key);
                terminal_keys.push(key);
            }
        }
    }

    // Counts captured for the summary log before the vectors are consumed.
    let local_paid_probe_count = local_paid_presence_probe_keys.len();
    let keys_needing_network_count = keys_needing_network.len();

    // Step 1b: Local paid-list hit for fetch-eligible keys. Per Section 9
    // step 4, authorization succeeds immediately; run a presence-only probe
    // to find any holder we can fetch from.
    if !local_paid_presence_probe_keys.is_empty() {
        let targets = quorum::compute_presence_targets(
            &local_paid_presence_probe_keys,
            p2p_node,
            config,
            &self_id,
        )
        .await;
        let evidence = quorum::run_verification_round(
            &local_paid_presence_probe_keys,
            &targets,
            p2p_node,
            config,
        )
        .await;

        let mut q = queues.write().await;
        for key in local_paid_presence_probe_keys {
            // Storage is re-checked after the probe round: if the record is
            // stored locally by now there is nothing left to fetch.
            if storage.exists(&key).unwrap_or(false) {
                q.remove_pending(&key);
                terminal_keys.push(key);
                continue;
            }
            // A key with no evidence entry is treated as having no sources.
            let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
                quorum::present_sources_for_key(&key, ev, &targets)
            });
            if sources.is_empty() {
                // Terminal failure: remove pending and report. No fetch path.
                q.remove_pending(&key);
                warn!(
                    "Locally paid key {} has no responding holders (possible data loss)",
                    hex::encode(key)
                );
                terminal_keys.push(key);
            } else {
                let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                // Atomic remove+enqueue: if fetch_queue is at capacity, the
                // pending entry is preserved and retried next cycle (no
                // silent drop of verified replica-repair work).
                let _ = q.promote_pending_to_fetch(key, distance, sources);
            }
        }
    }

    // Steps 2-5: Network verification (skipped if all keys resolved locally).
    if !keys_needing_network.is_empty() {
        // Step 2: Compute targets and run network verification round.
        let targets =
            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
                .await;

        let evidence =
            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;

        // Step 3: Evaluate results — collect outcomes without holding the write
        // lock across paid-list I/O.
        //
        // v12 §6 holder-eligibility: snapshot the per-peer last-commitment
        // table and recent_provers cache up front so the synchronous
        // evaluate_key_evidence_with_holder_check predicate can consult
        // them without awaiting. The predicate downgrades a Present
        // claim to Unresolved unless the peer is credited for that key.
        //
        // Snapshot per-peer commitment data. We need two views:
        //   - `commitment_by_peer_snapshot`: peers that currently have
        //     a verified commitment record on file (used to look up
        //     their current hash).
        //   - `capable_peer_snapshot`: the sticky "ever v12-capable"
        //     set. Sourced from a separate set rather than the
        //     commitment map so eviction (PeerRemoved cleanup, sybil
        //     cap at `MAX_LAST_COMMITMENT_BY_PEER`) does NOT downgrade
        //     a previously-v12 peer to "legacy" credit-unconditionally.
        //     Legacy / pre-v12 peers that have never sent a commitment
        //     remain absent from the set and are credited via the
        //     legacy path so mixed-version networks stay live.
        let commitment_by_peer_snapshot: HashMap<PeerId, [u8; 32]> = {
            let map = last_commitment_by_peer.read().await;
            map.iter()
                // Read the CACHED hash (§13) — no per-cycle re-serialize/re-hash
                // of every peer's ~5 KiB commitment.
                .filter_map(|(p, rec)| rec.commitment_hash().map(|h| (*p, h)))
                .collect()
        };
        let capable_peer_snapshot: HashSet<PeerId> = ever_capable_peers.read().await.clone();
        // Take a full snapshot of recent_provers under the read lock,
        // then release. The cache is bounded (16/key × keys), so the
        // clone is cheap.
        let provers_snapshot = recent_provers.read().await.clone();
        // For the replica-fetch path, we need to know whether THIS
        // node already holds the key being verified. The v12 §6
        // holder-credit gate is meant to prevent uncredited Present
        // claims from contributing to paid-list / reward quorum for
        // keys we DO hold (and could audit ourselves). For keys we
        // are trying to FETCH (i.e. not in local storage), there is
        // no possible local audit credit, and gating the presence
        // quorum on credit would deadlock replica-repair in a
        // fully v12-capable close group.
        let mut locally_held: HashSet<XorName> = HashSet::new();
        for key in &keys_needing_network {
            if storage.exists(key).unwrap_or(false) {
                locally_held.insert(*key);
            }
        }
        // Synchronous predicate over the snapshots taken above: decides
        // whether `peer`'s Present claim for `key` may be credited.
        let holder_credit = |peer: &PeerId, key: &XorName| -> bool {
            if !locally_held.contains(key) {
                // Replica-fetch path: we don't hold this key, so we
                // cannot have collected audit credit for it. Trust
                // Present claims to drive fetch-source promotion;
                // chunk-PUT payment_verifier is the security backstop
                // when the bytes actually arrive.
                return true;
            }
            if !capable_peer_snapshot.contains(peer) {
                // Pre-v12 / legacy peer that has never gossiped a
                // commitment. The v12 §6 holder-eligibility check
                // doesn't apply: their Present evidence comes through
                // the legacy path and we credit it unconditionally
                // so a mixed-version network stays live during
                // transition.
                return true;
            }
            let Some(hash) = commitment_by_peer_snapshot.get(peer) else {
                // Peer is commitment_capable (sticky) but currently
                // has no live commitment record on file (e.g. their
                // last gossip was evicted from the LRU cache, or it
                // failed verification). Withhold credit until they
                // re-prove storage under a fresh commitment.
                return false;
            };
            provers_snapshot.is_credited_holder(key, peer, hash)
        };

        // Evaluate under a read lock only. Keys without evidence, or no
        // longer pending, are skipped and left untouched for this cycle.
        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
        {
            let q = queues.read().await;
            for key in &keys_needing_network {
                let Some(ev) = evidence.get(key) else {
                    continue;
                };
                let Some(entry) = q.get_pending(key) else {
                    continue;
                };
                let outcome = quorum::evaluate_key_evidence_with_holder_check(
                    key,
                    ev,
                    &targets,
                    config,
                    holder_credit,
                );
                evaluated.push((*key, outcome, entry.pipeline));
            }
        } // read lock released

        // Step 4: Insert verified keys into PaidForList (no lock held).
        let mut paid_insert_keys: Vec<XorName> = Vec::new();
        for (key, outcome, _) in &evaluated {
            if matches!(
                outcome,
                KeyVerificationOutcome::QuorumVerified { .. }
                    | KeyVerificationOutcome::PaidListVerified { .. }
            ) {
                paid_insert_keys.push(*key);
            }
        }
        // An insert failure is logged and does not abort the cycle.
        for key in &paid_insert_keys {
            if let Err(e) = paid_list.insert(key).await {
                warn!("Failed to add verified key to PaidForList: {e}");
            }
        }

        // Paid-only hints normally update PaidForList only. If this node is
        // also within the storage-admission group for the key, a verified
        // paid-only hint can safely repair a missing replica using sources
        // from the same verification round.
        let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
        for (key, outcome, pipeline) in &evaluated {
            if *pipeline == HintPipeline::PaidOnly
                && matches!(
                    outcome,
                    KeyVerificationOutcome::QuorumVerified { .. }
                        | KeyVerificationOutcome::PaidListVerified { .. }
                )
                && !storage.exists(key).unwrap_or(false)
                && admission::is_responsible(
                    &self_id,
                    key,
                    p2p_node,
                    storage_admission_width(config.close_group_size),
                )
                .await
            {
                paid_only_fetch_keys.insert(*key);
            }
        }

        // Step 5: Update queues with the evaluated outcomes.
        let mut q = queues.write().await;
        for (key, outcome, pipeline) in evaluated {
            match outcome {
                KeyVerificationOutcome::QuorumVerified { sources }
                | KeyVerificationOutcome::PaidListVerified { sources } => {
                    // Replica hints are always fetch-eligible; paid-only hints
                    // only when selected for replica repair above.
                    let fetch_eligible =
                        pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
                    if fetch_eligible && !sources.is_empty() {
                        let distance =
                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                        // Atomic remove+enqueue: on fetch_queue capacity miss
                        // the pending entry is preserved so this verified key
                        // is retried on the next cycle (no silent drop).
                        let _ = q.promote_pending_to_fetch(key, distance, sources);
                        // Not terminal — either moved to fetch queue, or
                        // retained as pending until queue drains.
                    } else if fetch_eligible && sources.is_empty() {
                        warn!(
                            "Verified storage-admitted key {} has no holders (possible data loss)",
                            hex::encode(key)
                        );
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    } else {
                        // Verified but not fetch-eligible: the paid-list
                        // insert above was the only work to do for this key.
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    }
                }
                // Failed and inconclusive outcomes are both dropped from the
                // pending set here.
                KeyVerificationOutcome::QuorumFailed
                | KeyVerificationOutcome::QuorumInconclusive => {
                    q.remove_pending(&key);
                    terminal_keys.push(key);
                }
            }
        }
    }

    // Step 6: Remove terminal keys from bootstrap pending set and re-check
    // the drain condition.
    update_bootstrap_after_verification(
        &terminal_keys,
        bootstrap_state,
        queues,
        is_bootstrapping,
        bootstrap_complete_notify,
    )
    .await;

    // Gather queue depths for the summary log under a short read lock.
    let (pending_after, fetch_after, in_flight_after) = {
        let q = queues.read().await;
        (
            q.pending_count(),
            q.fetch_queue_count(),
            q.in_flight_count(),
        )
    };
    let terminal_key_count = terminal_keys.len();
    let elapsed_ms = cycle_started.elapsed().as_millis();

    // Same summary either way; slow cycles are raised to `info`.
    if elapsed_ms >= VERIFICATION_CYCLE_SLOW_LOG_MS {
        info!(
            target: "ant_node::replication::verification",
            "Slow replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
        );
    } else {
        debug!(
            target: "ant_node::replication::verification",
            "Replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
        );
    }
}
3372
3373/// Post-verification bootstrap bookkeeping: remove terminal keys from the
3374/// bootstrap pending set and transition out of bootstrapping when drained.
3375async fn update_bootstrap_after_verification(
3376 terminal_keys: &[XorName],
3377 bootstrap_state: &Arc<RwLock<BootstrapState>>,
3378 queues: &Arc<RwLock<ReplicationQueues>>,
3379 is_bootstrapping: &Arc<RwLock<bool>>,
3380 bootstrap_complete_notify: &Arc<Notify>,
3381) {
3382 if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
3383 return;
3384 }
3385 {
3386 let mut bs = bootstrap_state.write().await;
3387 for key in terminal_keys {
3388 bs.remove_key(key);
3389 }
3390 }
3391 let q = queues.read().await;
3392 if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
3393 complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
3394 }
3395}
3396
3397/// Set `is_bootstrapping` to `false` and wake all waiters.
3398async fn complete_bootstrap(
3399 is_bootstrapping: &Arc<RwLock<bool>>,
3400 bootstrap_complete_notify: &Arc<Notify>,
3401) {
3402 *is_bootstrapping.write().await = false;
3403 bootstrap_complete_notify.notify_waiters();
3404 info!("Replication bootstrap complete");
3405}
3406
3407// ---------------------------------------------------------------------------
3408// Fetch types and single-fetch executor
3409// ---------------------------------------------------------------------------
3410
/// Result classification for a single fetch attempt.
///
/// Produced by [`execute_single_fetch`] and carried to the fetch worker inside
/// a [`FetchOutcome`].
enum FetchResult {
    /// Data fetched, integrity-checked, and stored successfully.
    Stored,
    /// Content-address integrity check failed — do not retry.
    ///
    /// Also used when the response names a different key than the one
    /// requested, or when the payload exceeds `MAX_CHUNK_SIZE`.
    IntegrityFailed,
    /// Source failed (network error or non-success response) — retryable.
    ///
    /// Also covers a request that could not be encoded, a response that could
    /// not be decoded or had an unexpected message type, and a failure to
    /// write the fetched record to local storage.
    SourceFailed,
}
3420
/// Outcome produced by [`execute_single_fetch`] and consumed by the fetch
/// worker loop to update queue state.
struct FetchOutcome {
    /// The key that was requested (not the key echoed back by the source).
    key: XorName,
    /// How the attempt ended.
    result: FetchResult,
}
3428#[allow(clippy::too_many_lines)]
3429/// Execute a single fetch request against `source` for `key`.
3430///
3431/// Handles encoding, network I/O, integrity checking, storage, and trust
3432/// event reporting. Returns a [`FetchOutcome`] so the caller can update
3433/// queue state without holding any locks during the network round-trip.
3434async fn execute_single_fetch(
3435 p2p_node: Arc<P2PNode>,
3436 storage: Arc<LmdbStorage>,
3437 config: Arc<ReplicationConfig>,
3438 key: XorName,
3439 source: PeerId,
3440) -> FetchOutcome {
3441 let request = protocol::FetchRequest { key };
3442 let msg = ReplicationMessage {
3443 request_id: rand::thread_rng().gen::<u64>(),
3444 body: ReplicationMessageBody::FetchRequest(request),
3445 };
3446
3447 let encoded = match msg.encode() {
3448 Ok(data) => data,
3449 Err(e) => {
3450 warn!("Failed to encode fetch request: {e}");
3451 return FetchOutcome {
3452 key,
3453 result: FetchResult::SourceFailed,
3454 };
3455 }
3456 };
3457
3458 let result = p2p_node
3459 .send_request(
3460 &source,
3461 REPLICATION_PROTOCOL_ID,
3462 encoded,
3463 config.fetch_request_timeout,
3464 )
3465 .await;
3466
3467 match result {
3468 Ok(response) => {
3469 let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
3470 p2p_node
3471 .report_trust_event(
3472 &source,
3473 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3474 )
3475 .await;
3476 return FetchOutcome {
3477 key,
3478 result: FetchResult::SourceFailed,
3479 };
3480 };
3481
3482 match resp_msg.body {
3483 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
3484 key: resp_key,
3485 data,
3486 }) => {
3487 // Validate the response key matches the requested key.
3488 // A malicious peer could serve valid data for a different
3489 // key, passing integrity checks while the requested key
3490 // is falsely marked as fetched.
3491 if resp_key != key {
3492 warn!(
3493 "Fetch response key mismatch: requested {}, got {}",
3494 hex::encode(key),
3495 hex::encode(resp_key)
3496 );
3497 p2p_node
3498 .report_trust_event(
3499 &source,
3500 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3501 )
3502 .await;
3503 return FetchOutcome {
3504 key,
3505 result: FetchResult::IntegrityFailed,
3506 };
3507 }
3508
3509 // Enforce chunk size invariant on fetched data.
3510 // Checked before the content-address hash to avoid
3511 // hashing up to 10 MiB of oversized junk data.
3512 if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
3513 warn!(
3514 "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
3515 hex::encode(resp_key),
3516 data.len(),
3517 crate::ant_protocol::MAX_CHUNK_SIZE,
3518 );
3519 p2p_node
3520 .report_trust_event(
3521 &source,
3522 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3523 )
3524 .await;
3525 return FetchOutcome {
3526 key,
3527 result: FetchResult::IntegrityFailed,
3528 };
3529 }
3530
3531 // Content-address integrity check.
3532 let computed = crate::client::compute_address(&data);
3533 if computed != resp_key {
3534 warn!(
3535 "Fetched record integrity check failed: expected {}, got {}",
3536 hex::encode(resp_key),
3537 hex::encode(computed)
3538 );
3539 p2p_node
3540 .report_trust_event(
3541 &source,
3542 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3543 )
3544 .await;
3545 return FetchOutcome {
3546 key,
3547 result: FetchResult::IntegrityFailed,
3548 };
3549 }
3550
3551 if let Err(e) = storage.put(&resp_key, &data).await {
3552 warn!(
3553 "Failed to store fetched record {}: {e}",
3554 hex::encode(resp_key)
3555 );
3556 return FetchOutcome {
3557 key,
3558 result: FetchResult::SourceFailed,
3559 };
3560 }
3561
3562 FetchOutcome {
3563 key,
3564 result: FetchResult::Stored,
3565 }
3566 }
3567 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
3568 ..
3569 }) => {
3570 // This peer was selected as a fetch source because it
3571 // recently answered `Present` during verification. A
3572 // subsequent NotFound is evidence of a stale/false claim
3573 // or chunk wiping, so penalize lightly and try another
3574 // verified source.
3575 warn!(
3576 "Fetch: verified source {source} returned NotFound for {}",
3577 hex::encode(key)
3578 );
3579 p2p_node
3580 .report_trust_event(
3581 &source,
3582 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3583 )
3584 .await;
3585 FetchOutcome {
3586 key,
3587 result: FetchResult::SourceFailed,
3588 }
3589 }
3590 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
3591 reason,
3592 ..
3593 }) => {
3594 warn!(
3595 "Fetch: peer {source} returned error for {}: {reason}",
3596 hex::encode(key)
3597 );
3598 p2p_node
3599 .report_trust_event(
3600 &source,
3601 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3602 )
3603 .await;
3604 FetchOutcome {
3605 key,
3606 result: FetchResult::SourceFailed,
3607 }
3608 }
3609 _ => {
3610 // Unexpected message type — treat as malformed.
3611 p2p_node
3612 .report_trust_event(
3613 &source,
3614 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3615 )
3616 .await;
3617 FetchOutcome {
3618 key,
3619 result: FetchResult::SourceFailed,
3620 }
3621 }
3622 }
3623 }
3624 Err(e) => {
3625 debug!("Fetch request to {source} failed: {e}");
3626 // No ApplicationFailure here — P2PNode::send_request() already
3627 // reports ConnectionTimeout / ConnectionFailed to the TrustEngine.
3628 FetchOutcome {
3629 key,
3630 result: FetchResult::SourceFailed,
3631 }
3632 }
3633 }
3634}
3635
3636// ---------------------------------------------------------------------------
3637// Audit result handler
3638// ---------------------------------------------------------------------------
3639
3640/// Format the first confirmed-failed key as a 16-hex-char label.
3641///
3642/// Pairs with `challenged_peer` to form a stable cross-host correlation
3643/// handle in the audit-failure log line, e.g.
3644///
3645/// ```text
3646/// Audit failure for <peer>: …, `first_failed_key=0x18878f1d2d9e0612`
3647/// ```
3648///
3649/// Falls back to `"0x"` when the list is empty so the log line never
3650/// contains a misleading default.
3651fn first_failed_key_label(confirmed_failed_keys: &[XorName]) -> String {
3652 confirmed_failed_keys.first().map_or_else(
3653 || "0x".to_string(),
3654 |k| format!("0x{}", hex::encode(&k[..8])),
3655 )
3656}
3657
3658/// Execute the side effects for a confirmed storage-commitment audit failure.
3659///
3660/// [`plan_failed_audit`] is the pure decision INCLUDING the strike selection
3661/// (record-a-strike-for-`Timeout` vs leave-untouched for confirmed failures),
3662/// extracted so the whole glue — not just the verdict — is testable without a
3663/// live `P2PNode`. This function is only the resulting I/O. Timeouts are graced
3664/// and rollout-gated (TIMEOUT-EVICTION-DISABLED); confirmed failures penalize on
3665/// the first occurrence and revoke holder credit.
3666async fn handle_failed_audit(
3667 challenged_peer: &PeerId,
3668 confirmed_failed_key_count: usize,
3669 reason: &AuditFailureReason,
3670 p2p_node: &Arc<P2PNode>,
3671 sync_state: &Arc<RwLock<NeighborSyncState>>,
3672 recent_provers: &Arc<RwLock<RecentProvers>>,
3673 audit_timeout_strikes: &Arc<RwLock<HashMap<PeerId, u32>>>,
3674) {
3675 let action = {
3676 let mut strikes = audit_timeout_strikes.write().await;
3677 plan_failed_audit(reason, &mut strikes, challenged_peer)
3678 };
3679 match action {
3680 AuditFailureAction::TimeoutGrace => {
3681 // Honest transient slowness: no penalty, no credit loss, retain the
3682 // bootstrap claim. Only *sustained* timeouts (a peer that always
3683 // has to refetch) survive to the threshold — the per-challenge
3684 // window is never widened.
3685 debug!(
3686 "Audit timeout for {challenged_peer} (under the {}-strike threshold); \
3687 within grace, retaining bootstrap claim, no penalty",
3688 config::AUDIT_TIMEOUT_STRIKE_THRESHOLD
3689 );
3690 }
3691 AuditFailureAction::TimeoutPenalize => {
3692 // Strikes are tracked/logged so the mechanism stays observable; the
3693 // trust report that drives eviction is gated behind
3694 // `TIMEOUT_EVICTION_ENABLED` (off this release — see its doc for the
3695 // rollout-death-spiral rationale). Confirmed storage-integrity
3696 // failures (ConfirmedPenalize below) are unaffected.
3697 warn!(
3698 "Audit timeout for {challenged_peer}: reached the {}-strike threshold of \
3699 consecutive timeouts ({})",
3700 config::AUDIT_TIMEOUT_STRIKE_THRESHOLD,
3701 if config::TIMEOUT_EVICTION_ENABLED {
3702 "penalizing"
3703 } else {
3704 "eviction disabled this release — not penalizing"
3705 }
3706 );
3707 if config::TIMEOUT_EVICTION_ENABLED {
3708 p2p_node
3709 .report_trust_event(
3710 challenged_peer,
3711 TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
3712 )
3713 .await;
3714 }
3715 }
3716 AuditFailureAction::ConfirmedPenalize => {
3717 // The caller (handle_subtree_audit_result) already logged the rich
3718 // failure line with reason + per-category summary; avoid a redundant
3719 // second error log here. `confirmed_failed_key_count` is retained in
3720 // the signature for callers/tests that assert on it.
3721 let _ = confirmed_failed_key_count;
3722 // Peer returned a non-bootstrap response — clear the active claim
3723 // while retaining claim history.
3724 {
3725 let mut state = sync_state.write().await;
3726 state.clear_active_bootstrap_claim(challenged_peer);
3727 }
3728 // Revoke holder credit on a CONFIRMED failure (DigestMismatch /
3729 // KeyAbsent / Rejected / MalformedResponse): the peer no longer
3730 // provably holds what it committed to, so it must not keep §6
3731 // holder credit for the proof TTL. The §5 `forget_commitment` path
3732 // only fires on an "unknown commitment hash" reply; genuine byte
3733 // loss surfaces here.
3734 {
3735 let mut provers_guard = recent_provers.write().await;
3736 apply_audit_failure_credit_revocation(&mut provers_guard, challenged_peer, reason);
3737 }
3738 p2p_node
3739 .report_trust_event(
3740 challenged_peer,
3741 TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
3742 )
3743 .await;
3744 }
3745 }
3746}
3747
3748/// Handle audit result: log findings and emit trust events.
3749async fn handle_subtree_audit_result(
3750 result: &AuditTickResult,
3751 p2p_node: &Arc<P2PNode>,
3752 sync_state: &Arc<RwLock<NeighborSyncState>>,
3753 recent_provers: &Arc<RwLock<RecentProvers>>,
3754 audit_timeout_strikes: &Arc<RwLock<HashMap<PeerId, u32>>>,
3755 config: &ReplicationConfig,
3756) {
3757 match result {
3758 AuditTickResult::Passed {
3759 challenged_peer,
3760 keys_checked,
3761 } => {
3762 debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
3763 // Peer responded normally — clear the active bootstrap claim while
3764 // retaining history so a later claim is treated as repeated abuse.
3765 {
3766 let mut state = sync_state.write().await;
3767 state.clear_active_bootstrap_claim(challenged_peer);
3768 }
3769 // A normal response proves the slowness (if any) was transient, so
3770 // reset the timeout-strike counter. Only *sustained* timeouts (a
3771 // peer that must refetch on every audit) survive this reset to
3772 // accumulate toward the penalty threshold.
3773 {
3774 let mut strikes = audit_timeout_strikes.write().await;
3775 strikes.remove(challenged_peer);
3776 }
3777 p2p_node
3778 .report_trust_event(
3779 challenged_peer,
3780 TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
3781 )
3782 .await;
3783 }
3784 AuditTickResult::Failed { evidence } => {
3785 if let FailureEvidence::AuditFailure {
3786 challenged_peer,
3787 confirmed_failed_keys,
3788 summary,
3789 reason,
3790 ..
3791 } = evidence
3792 {
3793 // Rich diagnostics (from main's audit-failure logging) + the
3794 // first-failed-key correlation handle.
3795 let first_failed_key = first_failed_key_label(confirmed_failed_keys);
3796 error!(
3797 "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}, first_failed_key={first_failed_key}",
3798 confirmed_failed_keys.len(),
3799 summary.challenged_keys,
3800 summary.absent_keys,
3801 summary.digest_mismatch_keys,
3802 );
3803 // Route the side effects through the strike-grace path: timeouts
3804 // are graced (and rollout-gated by TIMEOUT-EVICTION-DISABLED),
3805 // deterministic failures penalize on the first occurrence and
3806 // revoke holder credit. Do NOT report ApplicationFailure inline
3807 // here — that would evict honest not-yet-upgraded peers on a
3808 // single timeout during the breaking rollout.
3809 handle_failed_audit(
3810 challenged_peer,
3811 confirmed_failed_keys.len(),
3812 reason,
3813 p2p_node,
3814 sync_state,
3815 recent_provers,
3816 audit_timeout_strikes,
3817 )
3818 .await;
3819 }
3820 }
3821 AuditTickResult::BootstrapClaim { peer } => {
3822 // Gap 6: BootstrapClaimAbuse grace period in audit path.
3823 // Separate state mutation from network I/O to avoid holding the
3824 // write lock across report_trust_event.
3825 let should_report = {
3826 let now = Instant::now();
3827 let mut state = sync_state.write().await;
3828 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
3829 {
3830 BootstrapClaimObservation::WithinGrace { .. } => {
3831 debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
3832 false
3833 }
3834 BootstrapClaimObservation::PastGrace { first_seen } => {
3835 warn!(
3836 "Audit: peer {peer} claiming bootstrap past grace period \
3837 ({:?} > {:?}), reporting abuse",
3838 now.duration_since(first_seen),
3839 config.bootstrap_claim_grace_period,
3840 );
3841 true
3842 }
3843 BootstrapClaimObservation::Repeated { first_seen } => {
3844 warn!(
3845 "Audit: peer {peer} repeated bootstrap claim after previously \
3846 stopping; first claim was {:?} ago, reporting abuse",
3847 now.duration_since(first_seen),
3848 );
3849 true
3850 }
3851 }
3852 };
3853 if should_report {
3854 p2p_node
3855 .report_trust_event(
3856 peer,
3857 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3858 )
3859 .await;
3860 }
3861 }
3862 AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
3863 }
3864}
3865
/// Test-only predicate: does an audit failure with this reason clear the
/// peer's active bootstrap claim?
///
/// A `Timeout` does not (the peer may still be legitimately bootstrapping);
/// every other reason does. This mirrors the `reason != Timeout` split that
/// [`decide_audit_failure_action`] applies when [`handle_failed_audit`] acts
/// on a failure, and is kept as the readable rule for tests to assert against.
#[cfg(test)]
fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
    let is_timeout = matches!(reason, AuditFailureReason::Timeout);
    !is_timeout
}
3878
3879/// Handle the result of a responsible-chunk audit tick (audit #2): emit trust
3880/// events and manage bootstrap-claim state.
3881///
3882/// Delegates to [`handle_subtree_audit_result`] so BOTH audits share one
3883/// failure path: timeouts go through the strike/grace logic (graced under the
3884/// threshold, eviction gated off this release via `TIMEOUT-EVICTION-DISABLED`)
3885/// and only confirmed storage-integrity failures penalise on the first
3886/// occurrence and revoke holder credit. Previously this handler reported
3887/// `ApplicationFailure` inline for EVERY failure including `Timeout`, which —
3888/// with the breaking v2 wire change — would false-penalise honest
3889/// not-yet-upgraded peers on a single audit. (Audit #2 cannot credit holders,
3890/// so the shared handler's strike-reset/credit-revocation is a superset of what
3891/// it needs; the responsible-chunk audit never produces `Passed { .. }` with
3892/// holder credit, so nothing is over-credited.)
3893async fn handle_audit_result(
3894 result: &AuditTickResult,
3895 p2p_node: &Arc<P2PNode>,
3896 sync_state: &Arc<RwLock<NeighborSyncState>>,
3897 recent_provers: &Arc<RwLock<RecentProvers>>,
3898 audit_timeout_strikes: &Arc<RwLock<HashMap<PeerId, u32>>>,
3899 config: &ReplicationConfig,
3900) {
3901 handle_subtree_audit_result(
3902 result,
3903 p2p_node,
3904 sync_state,
3905 recent_provers,
3906 audit_timeout_strikes,
3907 config,
3908 )
3909 .await;
3910}
3911
/// The action the audit-failure handler takes for one failure.
///
/// Produced by the pure planner from the failure reason and, for timeouts,
/// the peer's strike count after the current timeout has been recorded. It
/// carries no I/O, so the whole decision can be exercised without a live
/// `P2PNode`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum AuditFailureAction {
    /// A timeout below the strike threshold: honest transient slowness. No
    /// trust penalty, no credit revocation, and the bootstrap claim is kept.
    TimeoutGrace,
    /// A timeout at or above the threshold (the non-storing-peer case):
    /// report `ApplicationFailure`. The bootstrap claim is kept and holder
    /// credit is NOT revoked, since the peer never admitted byte loss.
    TimeoutPenalize,
    /// A confirmed storage-integrity failure: penalize immediately, clear the
    /// active bootstrap claim and revoke holder credit.
    ConfirmedPenalize,
}
3928
3929/// Upper bound on a peer's consecutive-timeout strike count. Must exceed the
3930/// largest reachable adaptive threshold (base + `MAX_ADAPTIVE_TIMEOUT_GRACE`) so
3931/// a genuinely non-responsive peer's count can always catch up to and cross an
3932/// inflated threshold — otherwise capping at the base would make timeout
3933/// penalties unreachable once the adaptive threshold rose.
3934const AUDIT_TIMEOUT_STRIKE_MAX: u32 = 64;
3935
3936/// Maximum extra grace the adaptive mechanism may add on top of the base
3937/// threshold. Bounds how far a (possibly stale) set of timing-out peers can
3938/// widen the window, so a small persistent failing cohort cannot push the
3939/// threshold arbitrarily high and shield a bad node indefinitely.
3940const MAX_ADAPTIVE_TIMEOUT_GRACE: u32 = 2 * config::AUDIT_TIMEOUT_STRIKE_THRESHOLD;
3941
3942/// Record an audit timeout for `peer` and return its new consecutive-timeout
3943/// strike count, saturating at [`AUDIT_TIMEOUT_STRIKE_MAX`] (well above any
3944/// reachable adaptive threshold). A successful audit removes the peer's entry
3945/// (the `Passed` arm of [`handle_subtree_audit_result`]), so only *consecutive*
3946/// timeouts accumulate here.
3947fn record_audit_timeout_strike(strikes: &mut HashMap<PeerId, u32>, peer: &PeerId) -> u32 {
3948 let count = strikes.entry(*peer).or_insert(0);
3949 *count = count.saturating_add(1).min(AUDIT_TIMEOUT_STRIKE_MAX);
3950 *count
3951}
3952
3953/// The adaptive timeout-strike threshold for judging `peer` (ADR-0002 "Network
3954/// Resilience"): `min(median of the OTHER timing-out peers' counts,
3955/// MAX_ADAPTIVE_TIMEOUT_GRACE) + base threshold`.
3956///
3957/// In a healthy network almost no peer carries timeout strikes, so the median
3958/// is 0 and the threshold is the base [`config::AUDIT_TIMEOUT_STRIKE_THRESHOLD`].
3959/// During genuine disruption many *honest* peers time out together, lifting the
3960/// median and widening the grace so the audit system does not pile onto a
3961/// struggling network — but the widening is capped at `MAX_ADAPTIVE_TIMEOUT_GRACE`
3962/// so a stale failing cohort cannot inflate it without bound.
3963///
3964/// `peer` is EXCLUDED from the median so a lone timing-out peer cannot raise its
3965/// own grace bar. Combined with the map being fed ONLY by timeouts (deterministic
3966/// failures never touch it), this closes self-inflation and bounds
3967/// attacker-inflation of the grace window.
3968fn adaptive_timeout_threshold(strikes: &HashMap<PeerId, u32>, peer: &PeerId) -> u32 {
3969 let grace = median_timeout_strikes_excluding(strikes, peer).min(MAX_ADAPTIVE_TIMEOUT_GRACE);
3970 grace.saturating_add(config::AUDIT_TIMEOUT_STRIKE_THRESHOLD)
3971}
3972
3973/// Lower median of the current per-peer consecutive-timeout counts, excluding
3974/// `peer`. No other peers → 0.
3975fn median_timeout_strikes_excluding(strikes: &HashMap<PeerId, u32>, peer: &PeerId) -> u32 {
3976 let mut counts: Vec<u32> = strikes
3977 .iter()
3978 .filter(|(p, _)| *p != peer)
3979 .map(|(_, c)| *c)
3980 .collect();
3981 if counts.is_empty() {
3982 return 0;
3983 }
3984 counts.sort_unstable();
3985 // Lower median: for even-sized inputs take the lower of the two middle
3986 // values ((len-1)/2), so the grace is conservative rather than inflated.
3987 counts.get((counts.len() - 1) / 2).copied().unwrap_or(0)
3988}
3989
/// Returns `true` once a peer's consecutive-timeout strike count has reached
/// (or passed) the given threshold, i.e. when an `ApplicationFailure` trust
/// event is due for timeouts. The threshold passed in is the adaptive one.
fn timeout_strike_reaches_threshold(strikes: u32, threshold: u32) -> bool {
    threshold <= strikes
}
3995
3996/// Decide what to do about a confirmed audit failure. `timeout_strikes_after`
3997/// is the peer's strike count after recording this event and `timeout_threshold`
3998/// the adaptive threshold to compare against (both only meaningful when
3999/// `reason == Timeout`). Pure, so the integration-level decision can be asserted
4000/// in tests with no networking.
4001fn decide_audit_failure_action(
4002 reason: &AuditFailureReason,
4003 timeout_strikes_after: u32,
4004 timeout_threshold: u32,
4005) -> AuditFailureAction {
4006 if matches!(reason, AuditFailureReason::Timeout) {
4007 if timeout_strike_reaches_threshold(timeout_strikes_after, timeout_threshold) {
4008 AuditFailureAction::TimeoutPenalize
4009 } else {
4010 AuditFailureAction::TimeoutGrace
4011 }
4012 } else {
4013 AuditFailureAction::ConfirmedPenalize
4014 }
4015}
4016
4017/// Plan the response to a confirmed audit failure, performing the
4018/// strike-selection glue in-process: a `Timeout` records a strike against
4019/// `peer` (so consecutive timeouts accumulate) and is judged against the
4020/// adaptive threshold; every other reason is a confirmed failure that does NOT
4021/// touch the strike map. The caller owns the lock and performs the resulting I/O.
4022fn plan_failed_audit(
4023 reason: &AuditFailureReason,
4024 strikes: &mut HashMap<PeerId, u32>,
4025 peer: &PeerId,
4026) -> AuditFailureAction {
4027 // Snapshot the adaptive threshold from the *other* peers' counts (excluding
4028 // this peer), so a single peer's own timeouts cannot raise its own grace bar.
4029 let threshold = adaptive_timeout_threshold(strikes, peer);
4030 let strikes_after = if matches!(reason, AuditFailureReason::Timeout) {
4031 record_audit_timeout_strike(strikes, peer)
4032 } else {
4033 0
4034 };
4035 decide_audit_failure_action(reason, strikes_after, threshold)
4036}
4037
4038/// Whether a confirmed audit failure with this reason should revoke the
4039/// peer's `recent_provers` holder credit immediately (v12 §6).
4040///
4041/// `true` for any reason where the peer actually answered (or admitted
4042/// it cannot): `DigestMismatch`, `KeyAbsent`, `Rejected` ("missing
4043/// bytes for committed key"), `MalformedResponse` — these prove the
4044/// peer no longer holds what it committed to, so it must not keep
4045/// holder credit for the proof TTL. `false` for `Timeout`: a single
4046/// dropped packet must not strip an honest peer; the 40-min TTL is the
4047/// deliberate liveness cushion there.
4048fn audit_failure_revokes_holder_credit(reason: &AuditFailureReason) -> bool {
4049 !matches!(reason, AuditFailureReason::Timeout)
4050}
4051
4052/// Apply the holder-credit revocation decision for a confirmed audit
4053/// failure. Pure over `RecentProvers` so the handler wiring is unit-
4054/// testable without a live `P2PNode`: the production `Failed` arm of
4055/// `handle_subtree_audit_result` calls exactly this.
4056fn apply_audit_failure_credit_revocation(
4057 provers: &mut RecentProvers,
4058 challenged_peer: &PeerId,
4059 reason: &AuditFailureReason,
4060) {
4061 if audit_failure_revokes_holder_credit(reason) {
4062 provers.forget_peer(challenged_peer);
4063 }
4064}
4065
4066// `admit_bootstrap_hints` was consolidated into `admit_and_queue_hints`.
4067
4068// ---------------------------------------------------------------------------
4069// Storage-bound audit (ADR-0002) — gossip trigger + auditor-side ingestion
4070// ---------------------------------------------------------------------------
4071
4072/// State the gossip-audit trigger needs to spawn an audit. Bundled so the
4073/// message handler passes one value instead of a long argument list; all
4074/// fields are cheap `Arc` clones.
4075#[derive(Clone)]
4076struct GossipAuditTrigger {
4077 p2p_node: Arc<P2PNode>,
4078 config: Arc<ReplicationConfig>,
4079 recent_provers: Arc<RwLock<RecentProvers>>,
4080 sync_state: Arc<RwLock<NeighborSyncState>>,
4081 audit_timeout_strikes: Arc<RwLock<HashMap<PeerId, u32>>>,
4082 cooldown: Arc<RwLock<HashMap<PeerId, Instant>>>,
4083}
4084
/// What a gossip ingest hands to the audit trigger.
///
/// Returned on every VALID gossip (whether or not the commitment changed), so
/// a node with a stable key set stays auditable — not only on its first
/// commitment.
#[derive(Clone, Copy, Debug)]
struct AuditTarget {
    /// Commitment hash the audit is pinned to.
    pin_hash: [u8; 32],
    /// Key count of the commitment, used to size the response deadline from
    /// the actual `ceil(sqrt(N))` subtree (ADR-0002).
    key_count: u32,
}
4095
4096/// Per-peer audit cooldown check-and-stamp (ADR-0002 "occasional surprise
4097/// exams, keeps load low"). Returns `true` if `peer` may be audited now (and
4098/// stamps `now`), `false` if it was audited within
4099/// `AUDIT_ON_GOSSIP_COOLDOWN_SECS`. Bounds the map under a flood of distinct
4100/// peers. Pure over the passed map so the flood/cooldown behaviour is testable
4101/// without a live node: a burst of gossips from one peer yields at most one
4102/// `true` per cooldown window.
4103fn cooldown_allows_audit(map: &mut HashMap<PeerId, Instant>, peer: &PeerId, now: Instant) -> bool {
4104 let cooldown = Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS);
4105 let known = match map.get(peer) {
4106 Some(&last) => {
4107 if now.saturating_duration_since(last) < cooldown {
4108 return false;
4109 }
4110 true
4111 }
4112 None => false,
4113 };
4114 // Bound the map under churn like its siblings (drop the oldest stamp) before
4115 // admitting a brand-new peer.
4116 if !known && map.len() >= MAX_LAST_COMMITMENT_BY_PEER {
4117 if let Some(victim) = map.iter().min_by_key(|(_, &ts)| ts).map(|(p, _)| *p) {
4118 map.remove(&victim);
4119 }
4120 }
4121 map.insert(*peer, now);
4122 true
4123}
4124
4125/// The gossip-audit launch decision in ONE place so the ordering is shared
4126/// between production and its test (ADR-0002 "occasional surprise exams").
4127///
4128/// Order matters and is the security-relevant property: the per-peer cooldown is
4129/// checked-and-stamped FIRST, THEN the probability lottery (`lottery_wins`) is
4130/// applied. If the lottery were sampled first, a gossip flood would re-roll it on
4131/// every message until one won, multiplying audits. Because the cooldown is
4132/// stamped before the lottery is consulted, a LOSING ticket still consumes the
4133/// window — so each peer gets at most one audit lottery per cooldown window
4134/// regardless of how often it gossips. Production calls this with
4135/// `lottery_wins = gen_bool(AUDIT_ON_GOSSIP_PROBABILITY)`; the test calls it with
4136/// a deterministic `lottery_wins`, so a reorder regression here fails the test.
4137fn audit_launch_decision(
4138 map: &mut HashMap<PeerId, Instant>,
4139 peer: &PeerId,
4140 now: Instant,
4141 lottery_wins: bool,
4142) -> bool {
4143 // Gate 1: cooldown check-and-stamp (consumes the window even on a loss).
4144 if !cooldown_allows_audit(map, peer, now) {
4145 return false;
4146 }
4147 // Gate 2: the probability lottery.
4148 lottery_wins
4149}
4150
4151/// On a peer's *changed* gossiped commitment, maybe launch a subtree audit
4152/// (ADR-0002): fire with probability `AUDIT_ON_GOSSIP_PROBABILITY`, subject to a
4153/// per-peer cooldown, pinned to the just-ingested root. Detached so gossip
4154/// handling is never blocked on a network round-trip.
4155async fn maybe_trigger_gossip_audit(
4156 trigger: &GossipAuditTrigger,
4157 peer: &PeerId,
4158 target: AuditTarget,
4159) {
4160 // The launch decision (cooldown-then-lottery ordering) lives in the pure
4161 // `audit_launch_decision` so the ordering is shared with its test. Sample
4162 // the lottery here, then let the helper apply it AFTER the cooldown stamp.
4163 let now = Instant::now();
4164 let lottery_wins = rand::thread_rng().gen_bool(config::AUDIT_ON_GOSSIP_PROBABILITY);
4165 {
4166 let mut map = trigger.cooldown.write().await;
4167 if !audit_launch_decision(&mut map, peer, now, lottery_wins) {
4168 return;
4169 }
4170 }
4171
4172 let trigger = trigger.clone();
4173 let peer = *peer;
4174 tokio::spawn(async move {
4175 let credit = storage_commitment_audit::AuditCredit {
4176 recent_provers: &trigger.recent_provers,
4177 };
4178 let result = storage_commitment_audit::run_subtree_audit(
4179 &trigger.p2p_node,
4180 &trigger.config,
4181 &peer,
4182 target.pin_hash,
4183 target.key_count,
4184 Some(&credit),
4185 )
4186 .await;
4187 handle_subtree_audit_result(
4188 &result,
4189 &trigger.p2p_node,
4190 &trigger.sync_state,
4191 &trigger.recent_provers,
4192 &trigger.audit_timeout_strikes,
4193 &trigger.config,
4194 )
4195 .await;
4196 });
4197}
4198
4199/// Atomic check-and-stamp of the per-peer commitment sig-verify rate limit.
4200///
4201/// Returns `true` if a signature verify is allowed now (and stamps the attempt
4202/// time), `false` if the peer is within [`COMMITMENT_SIG_VERIFY_MIN_INTERVAL`]
4203/// of its last attempt. Holds one write lock across the decision so two
4204/// concurrent ingests from the same peer cannot both pass. Stamps BEFORE the
4205/// caller's expensive verify so a slow/failed verify still rate-limits the next
4206/// message. Bounds the map under a flood of distinct peer ids.
4207async fn sig_verify_rate_limit_ok(
4208 sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
4209 source: &PeerId,
4210 now: Instant,
4211) -> bool {
4212 let mut attempts = sig_verify_attempts.write().await;
4213 if let Some(&last) = attempts.get(source) {
4214 if now.saturating_duration_since(last) < COMMITMENT_SIG_VERIFY_MIN_INTERVAL {
4215 return false;
4216 }
4217 }
4218 if attempts.len() >= MAX_LAST_COMMITMENT_BY_PEER && !attempts.contains_key(source) {
4219 if let Some(victim) = attempts.iter().min_by_key(|(_, &ts)| ts).map(|(p, _)| *p) {
4220 attempts.remove(&victim);
4221 }
4222 }
4223 attempts.insert(*source, now);
4224 true
4225}
4226
// NOTE: the following describes `ingest_peer_commitment`, which is defined
// after `handle_commitment_downgrade` below. It is kept as a plain comment so
// it does not attach to `handle_commitment_downgrade` as rustdoc.
//
// Verify + store an inbound commitment from a gossip peer.
//
// Called from the inbound `NeighborSyncRequest`/`Response` handlers and
// the bootstrap-sync loop. Drops the commitment unless all of these gates
// pass:
// 1. `source` is in our DHT routing table (sybil/churn cap).
// 2. `commitment.sender_peer_id == source.as_bytes()` (peer-id
//    binding to the authenticated transport peer).
// 3. `BLAKE3(commitment.sender_public_key) == commitment.sender_peer_id`
//    (the embedded pubkey actually belongs to the claimed identity —
//    saorsa-core derives `PeerId = BLAKE3(pubkey)`).
// 4. The per-peer signature-verify rate limit allows an attempt now
//    (`sig_verify_rate_limit_ok`); checked after the cheap structural
//    gates and before the expensive verify.
// 5. `verify_commitment_signature(commitment)` succeeds against the
//    embedded public key. The signed payload binds the pubkey, so an
//    adversary cannot swap the key while keeping the body.
//
// The cache is bounded by `MAX_LAST_COMMITMENT_BY_PEER` (sybil cap): an
// update for an existing entry always replaces it, while a new peer
// arriving at the cap evicts one arbitrary existing entry to make room.
//
// On all-pass, the commitment is stored as the auditor's per-peer
// "last known commitment" for use as `expected_commitment_hash` in
// future audits.
//
// Failures (no commitment / mismatched peer id / bad signature) are
// silent drops — gossip is best-effort and a malformed commitment from
// one peer should not affect anything else.
//
// Returns `Some(AuditTarget)` whenever a VALID commitment was stored (whether
// or not its root changed), so the caller can run a probabilistic,
// cooldown-gated subtree audit. Returning on *every* valid gossip — not only
// changed ones — is deliberate (ADR-0002): a node with a stable key set keeps
// being auditable, so it cannot pass one audit and then delete data while
// re-gossiping the same root forever. The cooldown + probability bound the
// audit frequency. Returns `None` only if the commitment was dropped (failed a
// gate) or there is nothing to pin.

4261/// Handle a capable peer gossiping `None` (a commitment downgrade).
4262///
4263/// A capable peer that previously gossiped a commitment but now gossips `None`
4264/// is trying to drop off the audit path. Within the answerability window we keep
4265/// the cached commitment pinned AND return it as an audit target so this gossip
4266/// still schedules a subtree audit against the peer's last known commitment — if
4267/// it genuinely dropped the data, the audit fails (there is no periodic tick, so
4268/// the trigger MUST fire here or the downgrade is never re-challenged).
4269///
4270/// But this only holds within the SAME `GOSSIP_ANSWERABILITY_TTL` the responder
4271/// honours for its own retired commitment: once that elapses since we last
4272/// received the peer's commitment, an honest peer has legitimately retired that
4273/// root (its responder side `retire_current`s and lets it age out) and can no
4274/// longer answer a pin on it. Auditing it past the TTL would manufacture a false
4275/// failure, so we then forget the cached commitment (keeping the sticky
4276/// `commitment_capable` bit) and stop pinning it.
4277async fn handle_commitment_downgrade(
4278 source: &PeerId,
4279 last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
4280) -> Option<AuditTarget> {
4281 let now = Instant::now();
4282 let cached = {
4283 let map = last_commitment_by_peer.read().await;
4284 map.get(source).and_then(|rec| {
4285 if !rec.commitment_capable {
4286 return None;
4287 }
4288 let last = rec.last_commitment()?;
4289 let pin = rec.commitment_hash()?;
4290 let fresh = now.saturating_duration_since(rec.received_at)
4291 < crate::replication::commitment_state::GOSSIP_ANSWERABILITY_TTL;
4292 Some((pin, last.key_count, fresh))
4293 })
4294 };
4295 match cached {
4296 Some((pin, key_count, true)) => {
4297 warn!(
4298 "ingest_peer_commitment: commitment-capable peer {source} sent None \
4299 (downgrade attempt); auditing against its last cached commitment"
4300 );
4301 Some(AuditTarget {
4302 pin_hash: pin,
4303 key_count,
4304 })
4305 }
4306 Some((_, _, false)) => {
4307 // Cached commitment has aged past the answerability window — forget
4308 // it so we stop pinning a root the peer is no longer obliged to
4309 // answer. Keep `commitment_capable` (sticky). Re-check freshness
4310 // UNDER the write lock (compare-and-clear): a concurrent valid gossip
4311 // from this peer may have refreshed `received_at` in the gap between
4312 // our read and write locks; if so, leave its fresh commitment intact.
4313 if let Some(rec) = last_commitment_by_peer.write().await.get_mut(source) {
4314 let still_stale = now.saturating_duration_since(rec.received_at)
4315 >= crate::replication::commitment_state::GOSSIP_ANSWERABILITY_TTL;
4316 if still_stale {
4317 rec.clear_commitment();
4318 debug!(
4319 "ingest_peer_commitment: capable peer {source} sent None and its cached \
4320 commitment aged past the answerability TTL; forgetting it"
4321 );
4322 }
4323 }
4324 None
4325 }
4326 None => None,
4327 }
4328}
4329
4330async fn ingest_peer_commitment(
4331 source: &PeerId,
4332 commitment: Option<&StorageCommitment>,
4333 p2p_node: &Arc<P2PNode>,
4334 last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
4335 ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
4336 sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
4337) -> Option<AuditTarget> {
4338 let Some(c) = commitment else {
4339 return handle_commitment_downgrade(source, last_commitment_by_peer).await;
4340 };
4341 // RT-membership gate: only accept commitments from peers in our
4342 // routing table. Off-RT senders (sybils, drive-by relays) cannot
4343 // populate the cache, which closes the hole where a flood of
4344 // off-RT identities could fill the cap and evict honest
4345 // peers. The neighbor-sync request handler applies the same gate
4346 // before admitting inbound replication hints (see neighbor_sync.rs
4347 // `sender_in_rt`); we mirror that policy here for the commitment
4348 // piggyback.
4349 if !p2p_node.dht_manager().is_in_routing_table(source).await {
4350 debug!("ingest_peer_commitment: source {source} not in routing table (dropped)");
4351 return None;
4352 }
4353 // Peer-id binding: the commitment's claimed sender must match the
4354 // authenticated transport peer (`source`). Defeats relay/replay
4355 // and also pins which embedded public key we are about to verify
4356 // against — the verify itself trusts the embedded key, so the
4357 // peer-id binding is the link to a real identity.
4358 if &c.sender_peer_id != source.as_bytes() {
4359 warn!(
4360 "ingest_peer_commitment: sender_peer_id mismatch from {source} \
4361 (dropped, possible relay attempt)"
4362 );
4363 return None;
4364 }
4365 // Peer-id to embedded-pubkey binding: saorsa-core derives PeerId as
4366 // BLAKE3(pubkey_bytes). Without this check, a responder could sign
4367 // with a throwaway key they own and lie about which identity it
4368 // belongs to (the embedded-key signature would verify trivially).
4369 let derived_peer_id = *blake3::hash(&c.sender_public_key).as_bytes();
4370 if derived_peer_id != c.sender_peer_id {
4371 warn!(
4372 "ingest_peer_commitment: embedded pubkey does not hash to claimed peer_id for \
4373 {source} (dropped, throwaway-key attack)"
4374 );
4375 return None;
4376 }
4377 // §2 step 3 + §11 DoS: rate-limit per-peer to at most one ML-DSA
4378 // signature verify per `COMMITMENT_SIG_VERIFY_MIN_INTERVAL`. A
4379 // sybil/RT-membership-bypassing peer that flooded valid-looking
4380 // gossip would otherwise burn CPU on every message. The rate
4381 // limit is checked AFTER cheap structural gates (RT, peer-id
4382 // binding, pubkey-binding) and BEFORE the expensive sig verify.
4383 //
4384 // Tracked in `sig_verify_attempts` (separate from
4385 // last_commitment_by_peer) so EVERY attempt — successful or not —
4386 // bumps the rate-limit clock. Reading only from PeerCommitmentRecord
4387 // would skip the cap for peers we've never successfully verified,
4388 // letting a flood of invalid-but-structurally-plausible gossips
4389 // burn CPU.
4390 let now = Instant::now();
4391 if !sig_verify_rate_limit_ok(sig_verify_attempts, source, now).await {
4392 debug!(
4393 "ingest_peer_commitment: rate-limited sig verify from {source} \
4394 (< {COMMITMENT_SIG_VERIFY_MIN_INTERVAL:?} since last attempt); dropped"
4395 );
4396 return None;
4397 }
4398 // Signature verify, using the public key embedded in the commitment
4399 // itself. The pubkey is bound by the signature payload (see
4400 // commitment_signed_payload) so an adversary cannot keep the body
4401 // and swap the key to one they hold the secret for.
4402 if !crate::replication::commitment::verify_commitment_signature(c) {
4403 warn!(
4404 "ingest_peer_commitment: signature did not verify under embedded key for {source} \
4405 (dropped, forged commitment)"
4406 );
4407 return None;
4408 }
4409 // The new commitment's hash, used to store and to pin for the audit target.
4410 let new_hash = commitment_hash(c);
4411 let mut map = last_commitment_by_peer.write().await;
4412 // Sybil/churn cap: if we're at the hard cap AND this is a new peer,
4413 // evict an arbitrary existing entry to make room. Updates for peers
4414 // already in the map are always accepted (they replace, not grow).
4415 if map.len() >= MAX_LAST_COMMITMENT_BY_PEER && !map.contains_key(source) {
4416 // Drop one arbitrary entry. HashMap iter order is random which
4417 // is fine — over time PeerRemoved cleanup keeps the working set
4418 // anchored on the real RT membership; this cap only fires under
4419 // active flooding attempts.
4420 if let Some(victim) = map.keys().next().copied() {
4421 map.remove(&victim);
4422 warn!(
4423 "ingest_peer_commitment: cache full ({MAX_LAST_COMMITMENT_BY_PEER}); \
4424 evicted {victim} to admit {source}"
4425 );
4426 }
4427 }
4428 // Preserve sticky commitment_capable across updates — once true,
4429 // always true. New entries start with capable = true (we just
4430 // verified a valid commitment from this peer).
4431 map.entry(*source)
4432 .and_modify(|r| {
4433 // set_commitment refreshes the cached hash (§13) alongside the
4434 // commitment + received_at so they never drift.
4435 r.set_commitment(c.clone(), now);
4436 r.last_sig_verify_at = now;
4437 r.commitment_capable = true; // sticky-redundant but explicit
4438 })
4439 .or_insert_with(|| PeerCommitmentRecord::from_verified(c.clone(), now));
4440 drop(map);
4441 // Record the sticky "ever v12-capable" bit in a set independent of
4442 // `last_commitment_by_peer` (whose entries can be evicted by
4443 // `PeerRemoved` and the sybil cap). This is what the §3 audit
4444 // shield and the §6 holder-eligibility closure consult to decide
4445 // whether the peer is expected to speak v12.
4446 //
4447 // Capped at `MAX_EVER_CAPABLE_PEERS` to bound memory under
4448 // identity-rotation attacks: once full, new entries are refused.
4449 // Refusal degrades over-cap peers to the behaviour before this set
4450 // existed (treated as legacy on rejoin), which is not a security
4451 // regression and preserves the historic set stable.
4452 {
4453 let mut set = ever_capable_peers.write().await;
4454 if set.contains(source) || set.len() < MAX_EVER_CAPABLE_PEERS {
4455 set.insert(*source);
4456 } else {
4457 warn!(
4458 "ingest_peer_commitment: ever_capable_peers at cap \
4459 ({MAX_EVER_CAPABLE_PEERS}); refusing to record {source} as sticky-capable"
4460 );
4461 }
4462 }
4463 // Return an audit target for EVERY valid stored commitment (changed or
4464 // not), so the caller's cooldown+probability-gated trigger keeps a
4465 // stable-keyset peer auditable over time (ADR-0002). Only a serialization
4466 // failure (new_hash == None, unreachable for a real commitment) yields None.
4467 new_hash.map(|pin_hash| AuditTarget {
4468 pin_hash,
4469 key_count: c.key_count,
4470 })
4471}
4472
4473// ---------------------------------------------------------------------------
4474// Storage-bound audit (v12) — responder commitment rotation
4475// ---------------------------------------------------------------------------
4476
4477/// Read the current LMDB key set, build + sign a fresh
4478/// `StorageCommitment`, and rotate it into `state` as the new `current`.
4479/// The prior `current` is demoted to `previous`; the prior `previous` is
4480/// dropped (per `ResponderCommitmentState::rotate`).
4481///
4482/// For content-addressed chunks (Autonomi's chunk store), `address ==
4483/// BLAKE3(content)`, so `bytes_hash := key` and we don't have to
4484/// re-read each chunk's bytes to compute the leaf hash.
4485///
4486/// Skips (returns `Ok(())`) if the key set is empty — no commitment to
4487/// rotate. The auditor side handles "no commitment for this peer" by
4488/// falling back to the legacy plain-digest audit path.
4489async fn rebuild_and_rotate_commitment(
4490 storage: &Arc<LmdbStorage>,
4491 identity: &Arc<NodeIdentity>,
4492 state: &Arc<ResponderCommitmentState>,
4493 p2p: &Arc<P2PNode>,
4494 config: &Arc<ReplicationConfig>,
4495) -> Result<()> {
4496 use saorsa_pqc::api::sig::{MlDsaSecretKey, MlDsaVariant};
4497
4498 let stored_keys = storage
4499 .all_keys()
4500 .await
4501 .map_err(|e| Error::Storage(format!("commitment build: read keys: {e}")))?;
4502
4503 // Commit only to keys we are still RESPONSIBLE for ("want-to-hold"), not
4504 // everything currently on disk ("hold"). This is the half of the retention
4505 // contract that lets out-of-range chunks age out: a key that has left our
4506 // close group is excluded from the NEXT commitment, so within at most
4507 // RETAINED_GOSSIPED_COMMITMENTS gossip rotations it falls out of the
4508 // last-2-gossiped window, `ResponderCommitmentState::is_held` goes false,
4509 // and the pruner (which until then vetoes its deletion) reclaims it. Without
4510 // this filter the pruner's reprieve would keep re-committing stale keys
4511 // forever (the rebuild reads all_keys, so a retained-on-disk key would be
4512 // re-committed and re-gossiped every rotation — a permanent pin).
4513 let storage_empty = stored_keys.is_empty();
4514 let self_id = *p2p.peer_id();
4515 let mut keys = Vec::with_capacity(stored_keys.len());
4516 for k in stored_keys {
4517 if admission::is_responsible(&self_id, &k, p2p, config.close_group_size).await {
4518 keys.push(k);
4519 }
4520 }
4521
4522 if keys.is_empty() {
4523 if storage_empty {
4524 // Storage is genuinely empty — there is nothing to answer for, so
4525 // drop the previously advertised commitment immediately. Keeping it
4526 // would leave remote auditors pinning a hash we can never satisfy
4527 // again (the bytes are gone).
4528 if state.retained_slot_count() > 0 {
4529 debug!("Commitment rotation: storage empty, clearing retained slots");
4530 state.clear_all();
4531 }
4532 return Ok(());
4533 }
4534 // Bytes are still on disk but no key is currently in range. We must NOT
4535 // clear retention here: a peer may still be pinning a root we gossiped
4536 // moments ago and could demand its bytes in a round-2 challenge, which
4537 // we can still answer (the bytes are present). But we must STOP
4538 // advertising the stale commitment: retire it so `current()` returns
4539 // `None` and the gossip-emit sites stop re-emitting and re-stamping it.
4540 // The retired slot then ages out by its gossip-answerability TTL while
4541 // remaining answerable for in-flight pins until then. Once it ages out,
4542 // `is_held` flips false and the pruner reclaims the now-uncommitted,
4543 // out-of-range chunks. (Calling `age_out` alone would leave `current()`
4544 // pointing at the stale root, which the gossip loop would keep
4545 // re-stamping — pinning its keys forever.)
4546 debug!(
4547 "Commitment rotation: no responsible keys to commit to; retiring current commitment \
4548 (stays answerable until its gossip TTL lapses, bytes still on disk)"
4549 );
4550 state.retire_current();
4551 return Ok(());
4552 }
4553
4554 // Cap to MAX_COMMITMENT_KEY_COUNT for v12 (responder must not commit
4555 // to more than the protocol limit; auditor would reject the
4556 // commitment otherwise).
4557 let cap = commitment::MAX_COMMITMENT_KEY_COUNT as usize;
4558 if keys.len() > cap {
4559 warn!(
4560 "Commitment rotation: key set ({}) exceeds MAX_COMMITMENT_KEY_COUNT ({}); \
4561 truncating — investigate as this likely means a misconfiguration",
4562 keys.len(),
4563 cap
4564 );
4565 }
4566
4567 // INVARIANT: this module is only used with CONTENT-ADDRESSED chunks,
4568 // where `key == BLAKE3(content)`, so `bytes_hash := key` and we skip a
4569 // full chunk re-read per rotation.
4570 //
4571 // Consequence to be precise about: because the leaf is `(key, key)`,
4572 // the Merkle root commits to the SET OF KEYS, not to the bytes. The
4573 // commitment therefore binds "which keys I claim to hold"; it does NOT
4574 // by itself prove byte possession. Byte possession is enforced by the
4575 // audit-verify path, which recomputes `bytes_hash == BLAKE3(local_bytes)`
4576 // and the per-key digest against the AUDITOR'S OWN local copy of the
4577 // bytes — so a responder that holds the key list but dropped the bytes
4578 // still fails (`missing bytes for committed key` / digest mismatch).
4579 // This is sound ONLY while keys are content addresses. If this module
4580 // is ever reused for non-content-addressed records (`bytes_hash != key`),
4581 // the `(k, k)` shortcut would let a byte-less node forge a valid root and
4582 // MUST be replaced with `(key, BLAKE3(bytes))` computed from real bytes.
4583 let entries: Vec<_> = keys.into_iter().take(cap).map(|k| (k, k)).collect();
4584
4585 // No-op-rotation guard: compute just the Merkle root from `entries`
4586 // and compare against the currently-advertised commitment's root.
4587 // If they match, the key set is unchanged and a new rotation would
4588 // only swap a randomized ML-DSA signature for a fresh one — same
4589 // content, different commitment_hash. That invalidates every
4590 // outstanding `recent_provers` credit on this node across the
4591 // close group with no security benefit, breaking steady-state
4592 // quorum liveness on large nodes that can't re-audit every key
4593 // every rotation interval. Skip the rotation entirely when the
4594 // tree is unchanged.
4595 // Build the tree ONCE here (moving `entries`): it serves both the no-op
4596 // root check below and, if we proceed, the signed commitment via
4597 // `build_from_tree` (§11 — previously the tree was built here and AGAIN
4598 // inside `BuiltCommitment::build`).
4599 let candidate_tree = commitment::MerkleTree::build(entries)
4600 .map_err(|e| Error::Crypto(format!("commitment tree build: {e}")))?;
4601 let candidate_root = candidate_tree.root();
4602 if let Some(current) = state.current() {
4603 if current.commitment().root == candidate_root {
4604 debug!(
4605 "Commitment rotation: key set unchanged (root={}); skipping no-op re-sign",
4606 hex::encode(candidate_root)
4607 );
4608 // Even though we skip re-signing (to avoid invalidating holder
4609 // credit), retention must still advance on the wall clock: a
4610 // previously-gossiped commitment that holds a now-out-of-range key
4611 // must be able to age out of the answerability window even when the
4612 // committed key set is frozen here for many rotations. Without this,
4613 // the no-op guard would pin a stale slot — and its key — forever.
4614 state.age_out();
4615 return Ok(());
4616 }
4617 }
4618
4619 let sk_bytes = identity.secret_key_bytes().to_vec();
4620 let sk = MlDsaSecretKey::from_bytes(MlDsaVariant::MlDsa65, &sk_bytes)
4621 .map_err(|e| Error::Crypto(format!("commitment build: load sk: {e}")))?;
4622 let pk_bytes = identity.public_key().as_bytes().to_vec();
4623 let peer_id_bytes = *p2p.peer_id().as_bytes();
4624
4625 let built = commitment_state::BuiltCommitment::build_from_tree(
4626 candidate_tree,
4627 &peer_id_bytes,
4628 &sk,
4629 &pk_bytes,
4630 )
4631 .map_err(|e| Error::Crypto(format!("commitment build: {e}")))?;
4632
4633 let hash = hex::encode(built.hash());
4634 let key_count = built.commitment().key_count;
4635 state.rotate(built);
4636 info!("Storage commitment rotated: hash={hash} key_count={key_count}");
4637 Ok(())
4638}
4639
4640#[cfg(test)]
4641#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
4642mod tests {
4643 use super::{
4644 adaptive_timeout_threshold, apply_audit_failure_credit_revocation,
4645 audit_failure_clears_bootstrap_claim, audit_failure_revokes_holder_credit,
4646 audit_launch_decision, config, cooldown_allows_audit, decide_audit_failure_action,
4647 first_failed_key_label, fresh_offer_payment_context, median_timeout_strikes_excluding,
4648 paid_notify_payment_context, plan_failed_audit, record_audit_timeout_strike,
4649 timeout_strike_reaches_threshold, AuditFailureAction, AUDIT_TIMEOUT_STRIKE_MAX,
4650 };
4651 use crate::payment::VerificationContext;
4652 use crate::replication::recent_provers::RecentProvers;
4653 use crate::replication::types::AuditFailureReason;
4654 use saorsa_core::identity::PeerId;
4655 use std::collections::HashMap;
4656 use std::time::Duration;
4657 use std::time::Instant;
4658
4659 fn test_peer(b: u8) -> PeerId {
4660 let mut bytes = [0u8; 32];
4661 bytes[0] = b;
4662 PeerId::from_bytes(bytes)
4663 }
4664
4665 fn test_key(b: u8) -> crate::ant_protocol::XorName {
4666 let mut k = [0u8; 32];
4667 k[0] = b;
4668 k
4669 }
4670
4671 #[test]
4672 fn fresh_offer_runs_client_put_payment_checks() {
4673 assert_eq!(
4674 fresh_offer_payment_context(),
4675 VerificationContext::ClientPut
4676 );
4677 }
4678
4679 #[test]
4680 fn paid_notify_uses_paid_list_admission_payment_checks() {
4681 assert_eq!(
4682 paid_notify_payment_context(),
4683 VerificationContext::PaidListAdmission
4684 );
4685 }
4686
4687 #[test]
4688 fn audit_timeout_preserves_active_bootstrap_claim() {
4689 assert!(!audit_failure_clears_bootstrap_claim(
4690 &AuditFailureReason::Timeout
4691 ));
4692 }
4693
4694 fn strike_peer(b: u8) -> PeerId {
4695 let mut bytes = [0u8; 32];
4696 bytes[0] = b;
4697 PeerId::from_bytes(bytes)
4698 }
4699
4700 // HELPER-LEVEL: counter arithmetic + threshold predicate. The reset is
4701 // simulated by an in-test `strikes.remove`; the real reset path (the
4702 // `Passed` arm) is covered at the glue level below.
4703 #[test]
4704 fn single_timeout_then_success_emits_no_failure_and_resets() {
4705 let peer = strike_peer(1);
4706 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4707 let base = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD;
4708 let after_one = record_audit_timeout_strike(&mut strikes, &peer);
4709 assert_eq!(after_one, 1);
4710 assert!(!timeout_strike_reaches_threshold(after_one, base));
4711 strikes.remove(&peer);
4712 assert!(!strikes.contains_key(&peer));
4713 }
4714
4715 #[test]
4716 fn consecutive_timeouts_cross_threshold_at_n() {
4717 let peer = strike_peer(2);
4718 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4719 let n = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD;
4720 let mut last = 0;
4721 for i in 1..=n {
4722 last = record_audit_timeout_strike(&mut strikes, &peer);
4723 if i < n {
4724 assert!(!timeout_strike_reaches_threshold(last, n));
4725 }
4726 }
4727 assert!(timeout_strike_reaches_threshold(last, n));
4728 // The count keeps climbing past the base threshold (so it can also
4729 // cross a higher *adaptive* threshold), but is bounded by the strike
4730 // cap — no unbounded growth.
4731 let mut c = last;
4732 for _ in 0..200 {
4733 c = record_audit_timeout_strike(&mut strikes, &peer);
4734 }
4735 assert_eq!(
4736 c,
4737 super::AUDIT_TIMEOUT_STRIKE_MAX,
4738 "count saturates at the max cap"
4739 );
4740 assert!(c > n, "count must be able to exceed the base threshold");
4741 }
4742
4743 // ADR-0002 Network Resilience: adaptive timeout threshold.
4744
4745 #[test]
4746 fn median_timeout_strikes_basics() {
4747 let target = strike_peer(99);
4748 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4749 // No other peers → 0 (healthy network, threshold == base).
4750 assert_eq!(median_timeout_strikes_excluding(&strikes, &target), 0);
4751 strikes.insert(strike_peer(1), 1);
4752 strikes.insert(strike_peer(2), 3);
4753 strikes.insert(strike_peer(3), 5);
4754 // Sorted [1,3,5], lower-median index 1 → 3.
4755 assert_eq!(median_timeout_strikes_excluding(&strikes, &target), 3);
4756 }
4757
4758 // ADVERSARIAL (ADR point e + sybil-inflation bound). Two invariants the
4759 // existing suite leaves unpinned:
4760 // 1. EVEN-count inputs must take the LOWER of the two middle values. The
4761 // existing basics test only feeds an odd-length cohort, so an
4762 // implementation that used `len/2` (upper median) would still pass it.
4763 // Here [1,4] -> lower median 1 (not 4) and [2,4,6,8] -> 4 (not 6).
4764 // 2. A sybil cohort pinned at the *strike cap* (the most an attacker could
4765 // ever drive fabricated peers to) STILL cannot push the grace past
4766 // MAX_ADAPTIVE_TIMEOUT_GRACE: the threshold saturates at base + max
4767 // grace regardless of how high or how numerous the cohort is.
4768 // FLIPS IF: median switches to the upper element on even input, or the
4769 // grace clamp (`.min(MAX_ADAPTIVE_TIMEOUT_GRACE)`) is removed.
4770 #[test]
4771 fn even_count_takes_lower_median_and_sybil_cohort_cannot_exceed_grace_bound() {
4772 let target = strike_peer(150);
4773
4774 // Even count == 2: lower of [1, 4] is 1.
4775 let mut two: HashMap<PeerId, u32> = HashMap::new();
4776 two.insert(strike_peer(1), 1);
4777 two.insert(strike_peer(2), 4);
4778 assert_eq!(
4779 median_timeout_strikes_excluding(&two, &target),
4780 1,
4781 "even-count median must take the LOWER middle value (1), not the upper (4)"
4782 );
4783
4784 // Even count == 4: sorted [2,4,6,8], lower median index (4-1)/2 = 1 → 4.
4785 let mut four: HashMap<PeerId, u32> = HashMap::new();
4786 for (i, v) in (10u8..).zip([2u32, 4, 6, 8]) {
4787 four.insert(strike_peer(i), v);
4788 }
4789 assert_eq!(
4790 median_timeout_strikes_excluding(&four, &target),
4791 4,
4792 "even-count median must be the lower middle (4), not the upper (6)"
4793 );
4794
4795 // Sybil cohort pinned at the strike CAP — the strongest inflation an
4796 // attacker could mount — must not lift the threshold past base + max
4797 // grace. Try several cohort sizes (odd and even) to be sure.
4798 for cohort in [2u8, 5, 8, 20] {
4799 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4800 for i in 0..cohort {
4801 strikes.insert(strike_peer(50 + i), super::AUDIT_TIMEOUT_STRIKE_MAX);
4802 }
4803 let threshold = adaptive_timeout_threshold(&strikes, &target);
4804 assert_eq!(
4805 threshold,
4806 config::AUDIT_TIMEOUT_STRIKE_THRESHOLD + super::MAX_ADAPTIVE_TIMEOUT_GRACE,
4807 "a sybil cohort at the strike cap (size {cohort}) must saturate the grace at \
4808 the bound, never exceed it"
4809 );
4810 }
4811
4812 // And even at the bounded-but-inflated threshold, a genuinely
4813 // non-responsive target can still cross it (cap > max reachable
4814 // threshold), so the bound never shields a bad node forever.
4815 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4816 for i in 0..8u8 {
4817 strikes.insert(strike_peer(80 + i), super::AUDIT_TIMEOUT_STRIKE_MAX);
4818 }
4819 let threshold = adaptive_timeout_threshold(&strikes, &target);
4820 let mut c = 0;
4821 for _ in 0..(threshold + 5) {
4822 c = record_audit_timeout_strike(&mut strikes, &target);
4823 }
4824 assert!(
4825 timeout_strike_reaches_threshold(c, threshold),
4826 "target must still cross the bounded inflated threshold ({c} vs {threshold})"
4827 );
4828 }
4829
4830 #[test]
4831 fn lone_timing_out_peer_does_not_inflate_its_own_grace() {
4832 // The peer under judgement is excluded from the median, so a single bad
4833 // peer (the common case) is judged against the base threshold and caught
4834 // — it cannot raise its own bar as its strike count climbs.
4835 let bad = strike_peer(7);
4836 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4837 strikes.insert(bad, 5); // its own large count must not count
4838 assert_eq!(
4839 adaptive_timeout_threshold(&strikes, &bad),
4840 config::AUDIT_TIMEOUT_STRIKE_THRESHOLD
4841 );
4842 }
4843
4844 #[test]
4845 fn widespread_timeouts_widen_the_grace() {
4846 // Genuine disruption: many OTHER honest peers carry timeout strikes. The
4847 // median rises, so the threshold for any given peer widens beyond the
4848 // base — the audit system does not pile onto a struggling network.
4849 let target = strike_peer(100);
4850 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4851 for i in 0..9u8 {
4852 strikes.insert(strike_peer(i), 4);
4853 }
4854 assert_eq!(
4855 adaptive_timeout_threshold(&strikes, &target),
4856 4 + config::AUDIT_TIMEOUT_STRIKE_THRESHOLD
4857 );
4858 assert!(
4859 adaptive_timeout_threshold(&strikes, &target) > config::AUDIT_TIMEOUT_STRIKE_THRESHOLD
4860 );
4861 }
4862
4863 #[test]
4864 fn adaptive_grace_only_responds_to_timeouts_not_deterministic_failures() {
4865 // The strike map is fed ONLY by timeouts (plan_failed_audit records a
4866 // strike for Timeout and never for confirmed failures). So a flood of
4867 // deterministic failures cannot inflate the median to buy grace.
4868 let target = strike_peer(101);
4869 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
4870 // Many confirmed (non-timeout) failures: these must NOT touch the map.
4871 for i in 0..9u8 {
4872 let action = plan_failed_audit(
4873 &AuditFailureReason::DigestMismatch,
4874 &mut strikes,
4875 &strike_peer(i),
4876 );
4877 assert_eq!(action, AuditFailureAction::ConfirmedPenalize);
4878 }
4879 assert!(
4880 strikes.is_empty(),
4881 "deterministic failures must not record strikes"
4882 );
4883 // Threshold stays at the base — an attacker cannot widen grace by
4884 // failing audits on purpose.
4885 assert_eq!(
4886 adaptive_timeout_threshold(&strikes, &target),
4887 config::AUDIT_TIMEOUT_STRIKE_THRESHOLD
4888 );
4889 }
4890
4891 // ADR-0002: "occasional surprise exams, keeps load low" — the per-peer
4892 // cooldown must collapse a gossip flood into at most one audit per window.
4893
4894 #[test]
4895 fn gossip_flood_yields_at_most_one_audit_per_cooldown_window() {
4896 let peer = strike_peer(1);
4897 let mut map: HashMap<PeerId, Instant> = HashMap::new();
4898 let t0 = Instant::now();
4899 // First gossip in the window passes; a burst of further gossips at the
4900 // same instant are all suppressed.
4901 assert!(cooldown_allows_audit(&mut map, &peer, t0));
4902 let mut passed = 1;
4903 for _ in 0..100 {
4904 if cooldown_allows_audit(&mut map, &peer, t0) {
4905 passed += 1;
4906 }
4907 }
4908 assert_eq!(
4909 passed, 1,
4910 "a flood at one instant must trigger exactly one audit"
4911 );
4912 }
4913
4914 // ADR-0002 ordering invariant: `maybe_trigger_gossip_audit` stamps the
4915 // per-peer cooldown BEFORE the probability lottery, so a LOSING ticket still
4916 // consumes the window. This is the property the isolated cooldown tests above
4917 // cannot see: they never sample the lottery, so a regression that reordered
4918 // the gates (sample probability first, only stamp the cooldown on a win)
4919 // would still pass them while breaking flood-resistance: a flood would then
4920 // re-roll the lottery on EVERY message until one won, multiplying audits.
4921 //
4922 // We model the exact production gate order (cooldown-then-lottery) with a
4923 // lottery driven by a fixed outcome instead of `gen_bool(..)`. The first
4924 // message LOSES the lottery; the remaining flood messages all WIN. With the
4925 // production order, the losing first ticket burns the window and every later
4926 // winner in the same window is blocked, so there are 0 audits this window. If
4927 // the gates were flipped, the second message's winning ticket would slip
4928 // through. The window only reopens after the cooldown elapses.
4929 //
4930 // FLIPS IF: the lottery is sampled before `cooldown_allows_audit` (a losing
4931 // ticket no longer consumes the window), re-enabling a flood-amplified audit
4932 // storm.
4933 #[test]
4934 fn losing_lottery_still_consumes_cooldown_window() {
4935 // Faithful re-implementation of the two gates in
4936 // `maybe_trigger_gossip_audit`, with the lottery outcome made
4937 // deterministic instead of `rand::thread_rng().gen_bool(..)`.
4938 // Calls the SHIPPED `audit_launch_decision` (the same function
4939 // `maybe_trigger_gossip_audit` uses), so a reorder of the two gates in
4940 // production fails this test — not a local reimplementation.
4941 let peer = strike_peer(3);
4942 let mut map: HashMap<PeerId, Instant> = HashMap::new();
4943 let t0 = Instant::now();
4944
4945 // First flooded message at t0 LOSES the lottery, but the cooldown is
4946 // stamped BEFORE the lottery is consulted, so the window is now consumed.
4947 assert!(
4948 !audit_launch_decision(&mut map, &peer, t0, false),
4949 "a losing ticket launches no audit"
4950 );
4951
4952 // 99 more flooded messages at the same instant would all WIN the lottery,
4953 // yet every one must be blocked by the cooldown the loser already stamped.
4954 // (If production sampled the lottery FIRST, these would each get a fresh
4955 // roll and audits would multiply — this assertion catches that reorder.)
4956 let mut audits = 0;
4957 for _ in 0..99 {
4958 if audit_launch_decision(&mut map, &peer, t0, true) {
4959 audits += 1;
4960 }
4961 }
4962 assert_eq!(
4963 audits, 0,
4964 "a losing first ticket must consume the window so no later flooded \
4965 message in the same window can audit"
4966 );
4967
4968 // The window only reopens after the cooldown elapses; the next winning
4969 // ticket then launches exactly one audit.
4970 let after = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS + 1);
4971 assert!(
4972 audit_launch_decision(&mut map, &peer, after, true),
4973 "after the cooldown a winning ticket audits again"
4974 );
4975 }
4976
4977 #[test]
4978 fn cooldown_lets_audit_through_after_the_window() {
4979 let peer = strike_peer(2);
4980 let mut map: HashMap<PeerId, Instant> = HashMap::new();
4981 let t0 = Instant::now();
4982 assert!(cooldown_allows_audit(&mut map, &peer, t0));
4983 // Within the window: suppressed.
4984 let within = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS - 1);
4985 assert!(!cooldown_allows_audit(&mut map, &peer, within));
4986 // Past the window: allowed again.
4987 let after = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS + 1);
4988 assert!(cooldown_allows_audit(&mut map, &peer, after));
4989 }
4990
4991 #[test]
4992 fn cooldown_is_per_peer_independent() {
4993 let mut map: HashMap<PeerId, Instant> = HashMap::new();
4994 let t0 = Instant::now();
4995 // Different peers each get their own first-audit pass at the same instant.
4996 for i in 0..20u8 {
4997 assert!(
4998 cooldown_allows_audit(&mut map, &strike_peer(i), t0),
4999 "peer {i} should be auditable independently"
5000 );
5001 }
5002 }
5003
5004 #[test]
5005 fn inflated_adaptive_threshold_is_still_reachable_and_bounded() {
5006 // When the median lifts the threshold above the base, a genuinely
5007 // non-responsive peer's strike count must still be able to
5008 // reach it (the count is no longer capped at the base). And the grace
5009 // widening itself is bounded so it can't shield a bad node forever.
5010 let target = strike_peer(200);
5011 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
5012 // A cohort of other peers each at a high strike count.
5013 for i in 0..9u8 {
5014 strikes.insert(strike_peer(i), 10);
5015 }
5016 let threshold = adaptive_timeout_threshold(&strikes, &target);
5017 // Grace is capped, so the threshold cannot exceed base + max grace.
5018 assert!(
5019 threshold <= config::AUDIT_TIMEOUT_STRIKE_THRESHOLD + super::MAX_ADAPTIVE_TIMEOUT_GRACE
5020 );
5021 assert!(threshold > config::AUDIT_TIMEOUT_STRIKE_THRESHOLD);
5022 // The target peer can accumulate strikes past that inflated threshold.
5023 let mut c = 0;
5024 for _ in 0..threshold + 5 {
5025 c = record_audit_timeout_strike(&mut strikes, &target);
5026 }
5027 assert!(
5028 timeout_strike_reaches_threshold(c, threshold),
5029 "a persistent peer must be able to cross the inflated threshold ({c} vs {threshold})"
5030 );
5031 }
5032
5033 #[test]
5034 fn audit_on_gossip_constants_match_adr() {
5035 // Tripwire on the ADR-locked tunables. The spot-check count sits at the
5036 // top of the auditor's 3..=5 band (the auditor clamps to that band, so
5037 // values above 5 would silently never be requested).
5038 assert_eq!(config::AUDIT_SPOTCHECK_COUNT, 5);
5039 assert!((config::AUDIT_ON_GOSSIP_PROBABILITY - 0.2).abs() < f64::EPSILON);
5040 assert_eq!(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS, 30 * 60);
5041 }
5042
5043 // (d) A confirmed storage-integrity failure penalizes immediately and
5044 // revokes credit; it is not a timeout.
5045 #[test]
5046 fn digest_mismatch_is_not_a_timeout_and_penalizes_immediately() {
5047 assert!(audit_failure_clears_bootstrap_claim(
5048 &AuditFailureReason::DigestMismatch
5049 ));
5050 assert!(audit_failure_revokes_holder_credit(
5051 &AuditFailureReason::DigestMismatch
5052 ));
5053 }
5054
5055 // E2E (pure decision): an honest peer that times out once, recovers,
5056 // repeatedly, never reaches a penalty because each success resets strikes.
5057 // FLIPS IF: the strike threshold is removed or success stops resetting.
5058 #[test]
5059 fn e2e_honest_intermittent_timeouts_never_penalized() {
5060 let peer = strike_peer(10);
5061 let base = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD;
5062 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
5063 for _ in 0..10 {
5064 let after = record_audit_timeout_strike(&mut strikes, &peer);
5065 assert_eq!(
5066 decide_audit_failure_action(&AuditFailureReason::Timeout, after, base),
5067 AuditFailureAction::TimeoutGrace
5068 );
5069 strikes.remove(&peer);
5070 }
5071 assert!(!strikes.contains_key(&peer));
5072 }
5073
5074 // E2E: a peer that times out on EVERY audit (never reset) crosses the
5075 // threshold and is penalized — the deterrent against non-storing peers.
5076 // FLIPS IF: per-challenge window widened so it answers in time, or strikes
5077 // reset without a success.
5078 #[test]
5079 fn e2e_persistent_timeouts_get_penalized() {
5080 let peer = strike_peer(11);
5081 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
5082 let threshold = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD;
5083 let mut penalized_at = None;
5084 for tick in 1..=(threshold + 2) {
5085 let after = record_audit_timeout_strike(&mut strikes, &peer);
5086 if decide_audit_failure_action(&AuditFailureReason::Timeout, after, threshold)
5087 == AuditFailureAction::TimeoutPenalize
5088 && penalized_at.is_none()
5089 {
5090 penalized_at = Some(tick);
5091 }
5092 }
5093 assert_eq!(penalized_at, Some(threshold));
5094 }
5095
5096 // Glue: a Timeout through the real plan_failed_audit MUST record a strike on
5097 // the map AND penalize once enough accumulate.
5098 // FLIPS IF: the handler stops feeding Timeout through the strike counter
5099 // (e.g. strikes_after hard-coded to 0). (Mutation-verified.)
5100 #[test]
5101 fn e2e_glue_timeout_records_strike_and_penalizes_at_threshold() {
5102 let peer = strike_peer(20);
5103 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
5104 let threshold = config::AUDIT_TIMEOUT_STRIKE_THRESHOLD;
5105 let mut action = AuditFailureAction::TimeoutGrace;
5106 for tick in 1..=threshold {
5107 action = plan_failed_audit(&AuditFailureReason::Timeout, &mut strikes, &peer);
5108 assert_eq!(strikes.get(&peer).copied(), Some(tick));
5109 }
5110 assert_eq!(action, AuditFailureAction::TimeoutPenalize);
5111 }
5112
5113 // Glue: a confirmed failure through plan_failed_audit must NOT touch the
5114 // strike map and must return ConfirmedPenalize.
5115 #[test]
5116 fn e2e_glue_confirmed_failure_leaves_strike_map_untouched() {
5117 let peer = strike_peer(21);
5118 let mut strikes: HashMap<PeerId, u32> = HashMap::new();
5119 for reason in [
5120 AuditFailureReason::DigestMismatch,
5121 AuditFailureReason::KeyAbsent,
5122 AuditFailureReason::Rejected,
5123 AuditFailureReason::MalformedResponse,
5124 ] {
5125 assert_eq!(
5126 plan_failed_audit(&reason, &mut strikes, &peer),
5127 AuditFailureAction::ConfirmedPenalize
5128 );
5129 }
5130 assert!(strikes.is_empty());
5131 }
5132
// ADR-0002 "Accounting and False Positives", adversarial case: a
// DETERMINISTIC failure is acted on the FIRST time it occurs, "regardless of
// network conditions".
//
// Setup: the strike map is pre-loaded with many *other* peers that have each
// timed out repeatedly. This is intended to push the adaptive timeout grace
// towards its cap; the test itself only asserts that the grace seen by the
// victim exceeds the base threshold.
//
// Under that relaxed window the test checks that:
//   - a brand-new peer's FIRST deterministic failure (DigestMismatch /
//     Rejected / MalformedResponse) STILL returns ConfirmedPenalize, never a
//     grace lane, and never touches the strike map; while
//   - that same peer's FIRST timeout is only TimeoutGrace.
// Together these show the inflated grace applies to the timeout lane only and
// cannot buy a deterministic failure even one round of delay.
// FLIPS IF: deterministic failures start consulting the strike threshold,
// or ConfirmedPenalize is collapsed into a timeout action.
#[test]
fn deterministic_failure_penalizes_first_time_under_inflated_grace() {
    let mut strikes: HashMap<PeerId, u32> = HashMap::new();

    // Saturate the adaptive grace: fifty bystander peers, each carrying
    // AUDIT_TIMEOUT_STRIKE_MAX recorded timeouts.
    for seed in 100..150u8 {
        let bystander = strike_peer(seed);
        for _ in 0..AUDIT_TIMEOUT_STRIKE_MAX {
            record_audit_timeout_strike(&mut strikes, &bystander);
        }
    }

    let victim = strike_peer(7);

    // Sanity: the grace applied to the victim really is above the base value,
    // otherwise the rest of the test proves nothing about inflated grace.
    let inflated = adaptive_timeout_threshold(&strikes, &victim);
    assert!(
        inflated > config::AUDIT_TIMEOUT_STRIKE_THRESHOLD,
        "test precondition: grace must be inflated, got {inflated}"
    );

    // Occurrence #1 of each deterministic failure kind must penalize
    // immediately and must not insert the victim into the strike map.
    let deterministic_reasons = [
        AuditFailureReason::DigestMismatch,
        AuditFailureReason::Rejected,
        AuditFailureReason::MalformedResponse,
    ];
    for reason in &deterministic_reasons {
        let action = plan_failed_audit(reason, &mut strikes, &victim);
        assert_eq!(
            action,
            AuditFailureAction::ConfirmedPenalize,
            "{reason:?} must penalize on the first occurrence regardless of grace"
        );
        assert_ne!(
            action,
            AuditFailureAction::TimeoutPenalize,
            "a deterministic failure must NOT be routed through the (eviction-gated) \
             timeout-penalize lane"
        );
        assert!(
            !strikes.contains_key(&victim),
            "deterministic failure must not touch the timeout strike map"
        );
        // The companion predicates must agree: credit is revoked and the
        // bootstrap claim is cleared for this reason.
        assert!(audit_failure_revokes_holder_credit(reason));
        assert!(audit_failure_clears_bootstrap_claim(reason));
    }

    // The SAME victim's first timeout, under the same inflated grace, is only
    // TimeoutGrace: one strike recorded, no revocation, claim retained.
    let timeout = AuditFailureReason::Timeout;
    let timeout_action = plan_failed_audit(&timeout, &mut strikes, &victim);
    assert_eq!(timeout_action, AuditFailureAction::TimeoutGrace);
    assert_eq!(strikes.get(&victim).copied(), Some(1));
    assert!(!audit_failure_revokes_holder_credit(&timeout));
    assert!(!audit_failure_clears_bootstrap_claim(&timeout));
}
5207
/// Pins the credit-revocation predicate used by the `Failed` arm of
/// `handle_subtree_audit_result`: every confirmed failure reason revokes
/// holder credit, while `Timeout` does not.
#[test]
fn confirmed_failures_revoke_credit_timeout_does_not() {
    let confirmed_reasons = [
        AuditFailureReason::MalformedResponse,
        AuditFailureReason::DigestMismatch,
        AuditFailureReason::KeyAbsent,
        AuditFailureReason::Rejected,
    ];
    for reason in &confirmed_reasons {
        assert!(
            audit_failure_revokes_holder_credit(reason),
            "confirmed failure {reason:?} must revoke holder credit"
        );
    }

    let timeout = AuditFailureReason::Timeout;
    assert!(
        !audit_failure_revokes_holder_credit(&timeout),
        "Timeout must NOT revoke credit (single dropped packet != storage loss)"
    );
}
5228
/// Wiring test for the security fix: `apply_audit_failure_credit_revocation`
/// must strip a credited peer on a confirmed failure (`DigestMismatch`) and
/// must RETAIN the credit on `Timeout`.
///
/// Genuine credit is recorded before each call so neither assertion is
/// vacuous. Per the original note, this fails if `forget_peer` stops being
/// called or if the `Timeout` exclusion is dropped (both verified by
/// mutation).
#[test]
fn apply_revocation_strips_on_digest_mismatch_retains_on_timeout() {
    let peer = test_peer(0xAB);
    let key = test_key(1);
    let hash = [0xCD; 32];

    // Builds a fresh tracker in which `peer` already holds credit for `key`.
    let credited_tracker = || {
        let mut tracker = RecentProvers::new();
        tracker.record_proof(key, peer, hash, Instant::now());
        tracker
    };

    // Confirmed failure -> credit revoked.
    let mut provers = credited_tracker();
    assert!(
        provers.is_credited_holder(&key, &peer, &hash),
        "precondition: peer credited before failure"
    );
    let mismatch = AuditFailureReason::DigestMismatch;
    apply_audit_failure_credit_revocation(&mut provers, &peer, &mismatch);
    assert!(
        !provers.is_credited_holder(&key, &peer, &hash),
        "DigestMismatch must strip the peer's holder credit"
    );

    // Timeout -> credit retained.
    let mut provers_timeout = credited_tracker();
    let timeout = AuditFailureReason::Timeout;
    apply_audit_failure_credit_revocation(&mut provers_timeout, &peer, &timeout);
    assert!(
        provers_timeout.is_credited_holder(&key, &peer, &hash),
        "Timeout must retain holder credit (deliberate liveness cushion)"
    );
}
5271
// Each of the four listed failure reasons must report that it clears an
// active bootstrap claim.
#[test]
fn decoded_audit_failures_clear_active_bootstrap_claim() {
    let clearing_reasons = [
        AuditFailureReason::MalformedResponse,
        AuditFailureReason::DigestMismatch,
        AuditFailureReason::KeyAbsent,
        AuditFailureReason::Rejected,
    ];
    for reason in &clearing_reasons {
        assert!(
            audit_failure_clears_bootstrap_claim(reason),
            "decoded non-bootstrap failure {reason:?} should clear active claim"
        );
    }
}
5286
#[test]
fn first_failed_key_label_truncates_to_16_hex_chars() {
    // Start with every byte set to 0xAA, then overwrite the leading 8 bytes
    // with the prefix under test. Bytes 8..32 therefore stay 0xAA, so any
    // leak of the low-order bytes into the label would be visible.
    let mut key = [0xAAu8; 32];
    key[..8].copy_from_slice(&[0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff]);

    let label = first_failed_key_label(&[key]);

    // The label is "0x" followed by the hex of the first 8 bytes only, which
    // lets an operator group audit failures by chunk prefix.
    assert_eq!(label, "0x18000000000000ff");
    assert_eq!(label.len(), "0x".len() + 16);
}
5304
#[test]
fn first_failed_key_label_falls_back_when_empty() {
    // An empty key set is not expected in production (the original note says
    // handle_audit_failure rejects empty sets), but the formatter must still
    // return a well-formed label: the bare "0x" prefix rather than some
    // misleading default value.
    let label = first_failed_key_label(&[]);
    assert_eq!(label, "0x");
}
5312
#[test]
fn first_failed_key_label_uses_first_key_only() {
    let first = [0x11u8; 32];
    let second = [0x22u8; 32];

    // Expected label is derived from the first key's leading 8 bytes; the
    // second key must not influence the result.
    let expected = format!("0x{}", hex::encode(&first[..8]));
    let label = first_failed_key_label(&[first, second]);

    assert_eq!(label, expected);
}
5322}