ant_node/replication/mod.rs
1//! Replication subsystem for the Autonomi network.
2//!
3//! Implements Kademlia-style replication with:
4//! - Fresh replication with `PoP` verification
5//! - Neighbor sync with round-robin cycle management
6//! - Batched quorum verification
7//! - Storage audit protocol (anti-outsourcing)
8//! - `PaidForList` persistence and convergence
9//! - Responsibility pruning with hysteresis
10
11// The replication engine intentionally holds `RwLock` read guards across await
12// boundaries (e.g. reading sync_history while calling audit_tick). Clippy's
13// nursery lint `significant_drop_tightening` flags these, but the guards must
14// remain live for the duration of the call.
15#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod commitment;
21pub mod commitment_state;
22pub mod config;
23pub mod fresh;
24pub mod neighbor_sync;
25pub mod paid_list;
26pub mod possession;
27pub mod protocol;
28pub mod pruning;
29pub mod quorum;
30pub mod recent_provers;
31pub mod scheduling;
32pub mod storage_commitment_audit;
33pub mod subtree;
34pub mod types;
35
36use std::collections::{HashMap, HashSet};
37use std::path::Path;
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
41use std::pin::Pin;
42
43use crate::logging::{debug, error, info, warn};
44use futures::stream::FuturesUnordered;
45use futures::{Future, StreamExt};
46use rand::Rng;
47use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
48use tokio::task::JoinHandle;
49use tokio_util::sync::CancellationToken;
50
51use crate::ant_protocol::XorName;
52use crate::error::{Error, Result};
53use crate::payment::{PaymentVerifier, VerificationContext};
54use crate::replication::audit::AuditTickResult;
55use crate::replication::commitment::{commitment_hash, StorageCommitment};
56use crate::replication::commitment_state::{PeerCommitmentRecord, ResponderCommitmentState};
57use crate::replication::config::{
58 max_parallel_fetch, storage_admission_width, ReplicationConfig, MAX_AUDIT_RESPONSES_PER_PEER,
59 MAX_CONCURRENT_AUDIT_RESPONSES, MAX_CONCURRENT_REPLICATION_SENDS, REPLICATION_PROTOCOL_ID,
60};
61use crate::replication::paid_list::PaidList;
62use crate::replication::protocol::{
63 FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
64 VerificationResponse,
65};
66use crate::replication::quorum::KeyVerificationOutcome;
67use crate::replication::recent_provers::RecentProvers;
68use crate::replication::scheduling::ReplicationQueues;
69use crate::replication::types::{
70 AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
71 NeighborSyncState, PeerSyncRecord, RepairProofs, VerificationEntry, VerificationState,
72};
73use crate::storage::LmdbStorage;
74use saorsa_core::identity::{NodeIdentity, PeerId};
75use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
76
77// ---------------------------------------------------------------------------
78// Constants
79// ---------------------------------------------------------------------------
80
/// Topic prefix used by saorsa-core's request-response mechanism.
///
/// The message handler treats a topic equal to this prefix followed by
/// `REPLICATION_PROTOCOL_ID` as a replication *request* that arrived wrapped
/// in a request-response envelope, and unwraps it with
/// `P2PNode::parse_request_envelope` before dispatch.
const RR_PREFIX: &str = "/rr/";
83
/// Payment-verification context used for fresh-offer proof checks:
/// `VerificationContext::ClientPut`.
fn fresh_offer_payment_context() -> VerificationContext {
    VerificationContext::ClientPut
}

/// Payment-verification context used for paid-notify proof checks:
/// `VerificationContext::PaidListAdmission`.
fn paid_notify_payment_context() -> VerificationContext {
    VerificationContext::PaidListAdmission
}
91
92/// Boxed future type for in-flight fetch tasks.
93type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
94
95/// Shared dependencies for one verification worker cycle.
96struct VerificationCycleContext<'a> {
97 p2p_node: &'a Arc<P2PNode>,
98 paid_list: &'a Arc<PaidList>,
99 storage: &'a Arc<LmdbStorage>,
100 queues: &'a Arc<RwLock<ReplicationQueues>>,
101 config: &'a ReplicationConfig,
102 bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
103 is_bootstrapping: &'a Arc<RwLock<bool>>,
104 bootstrap_complete_notify: &'a Arc<Notify>,
105 /// v12 §6 holder-eligibility inputs. The verifier downgrades a
106 /// peer's Present claim to Unresolved unless they're a credited
107 /// holder of the key (i.e. they recently passed a commitment-bound
108 /// audit on it under their currently-credited commitment hash).
109 last_commitment_by_peer: &'a Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
110 ever_capable_peers: &'a Arc<RwLock<HashSet<PeerId>>>,
111 recent_provers: &'a Arc<RwLock<RecentProvers>>,
112}
113
/// How often, in milliseconds, the fetch worker polls for work.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// How often, in milliseconds, the verification worker polls for work.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Verification-cycle duration, in milliseconds, at which a cycle is worth
/// reporting at info level. Typed `u128` (the return type of
/// `Duration::as_millis`), presumably so the comparison needs no cast.
const VERIFICATION_CYCLE_SLOW_LOG_MS: u128 = 500;

/// Baseline trust-event weight for ordinary per-operation signals.
///
/// Applied to individual replication fetch outcomes, integrity-check
/// failures and bootstrap-claim abuse. Confirmed audit failures use the
/// separate `AUDIT_FAILURE_TRUST_WEIGHT` instead.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0_f64;

/// Interval, in seconds, between bootstrap drain checks.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;
132
/// Interval, in seconds, between rebuilds + rotations of the responder's
/// storage commitment: one hour.
///
/// Cost: each rebuild scans LMDB to compute leaf hashes; for ~10k keys this
/// is sub-100ms (BLAKE3 + tree build).
///
/// Answerability is anchored to gossip, not to rotation. The responder stays
/// answerable for its current commitment plus the last
/// `RETAINED_GOSSIPED_COMMITMENTS` (= 2) commitments it actually gossiped,
/// each for `GOSSIP_ANSWERABILITY_TTL` (3 h) after it was last emitted (see
/// `commitment_state`). A gossiped commitment therefore stays answerable
/// across rotations until its gossip TTL lapses; the rotation cadence alone
/// does not bound answerability.
///
/// Why one hour: it matches the worst-case neighbor-sync cooldown
/// (`NEIGHBOR_SYNC_COOLDOWN_SECS = 3600`). The 3 h gossip TTL comfortably
/// exceeds the gap between a rotation here and the next gossip arriving at a
/// remote peer. That keeps the "unknown commitment hash" -> Idle audit-skip
/// pattern from becoming the common case.
///
/// Why not faster: the v12 pin is bound to a specific point-in-time
/// commitment, so rotation is not security-critical for pin freshness. It
/// only keeps the committed key set current as the responder writes new
/// keys. An hour is ample for that, and slow enough that honest auditors
/// mostly hit a still-answerable commitment rather than one rotated past.
const COMMITMENT_ROTATION_INTERVAL_SECS: u64 = 60 * 60;
157
/// Per-peer floor on how often a commitment signature is verified
/// (v10/v12 §2 step 3 + §11 `DoS`).
///
/// Threat: a sybil that slips past the routing-table gate (for example via
/// transient bucket pollution) could otherwise force one ML-DSA-65 verify
/// (~1 ms) per gossip message.
///
/// Effect: at most one verify per peer per minute. That is comfortably above
/// the legitimate gossip cadence, which is one neighbor-sync round every
/// 10-20 min per peer.
const COMMITMENT_SIG_VERIFY_MIN_INTERVAL: Duration = Duration::from_secs(60);
167
/// Maximum number of entries in `last_commitment_by_peer`: 4096 (2^12).
///
/// This bounds the memory of the auditor's per-peer commitment cache. Each
/// entry holds a `StorageCommitment` of roughly 5 KiB (1952-byte pubkey +
/// 3293-byte signature + small fields), so a full cache is about 20 MiB.
/// That comfortably covers a realistic close-group neighborhood.
///
/// Eviction: inserting into a full cache evicts one arbitrary existing entry.
/// `HashMap` iteration order is unspecified and insertion order is not
/// tracked, so this is not LRU.
///
/// The cap is the third line of defence against sybil/churn flooding. The
/// first is the `PeerRemoved` handler, which drops entries as the DHT detects
/// departures. The second is `ingest_peer_commitment`, which only admits
/// commitments from peers currently in the routing table.
const MAX_LAST_COMMITMENT_BY_PEER: usize = 1 << 12;
182
183/// Cap on the sticky `ever_capable_peers` set. Bounds memory so a
184/// long-running bootstrap node cannot have the set grow without limit
185/// from peer-id churn. Sized at 4x `MAX_LAST_COMMITMENT_BY_PEER` so
186/// the set comfortably outlives normal LRU churn but still caps the
187/// blast radius of identity-rotation attacks. Once full we refuse new
188/// inserts (no eviction) — keeps the historic set stable; new v12
189/// peers above the cap are treated as legacy on rejoin, which matches
190/// the behaviour before this set existed, not a security regression.
191const MAX_EVER_CAPABLE_PEERS: usize = 4 * MAX_LAST_COMMITMENT_BY_PEER;
192
193// ---------------------------------------------------------------------------
194// ReplicationEngine
195// ---------------------------------------------------------------------------
196
197/// The replication engine manages all replication background tasks and state.
198pub struct ReplicationEngine {
199 /// Replication configuration (shared across spawned tasks).
200 config: Arc<ReplicationConfig>,
201 /// P2P networking node.
202 p2p_node: Arc<P2PNode>,
203 /// Local chunk storage.
204 storage: Arc<LmdbStorage>,
205 /// Persistent paid-for-list.
206 paid_list: Arc<PaidList>,
207 /// Payment verifier for `PoP` validation.
208 payment_verifier: Arc<PaymentVerifier>,
209 /// Replication pipeline queues.
210 queues: Arc<RwLock<ReplicationQueues>>,
211 /// Neighbor sync cycle state.
212 sync_state: Arc<RwLock<NeighborSyncState>>,
213 /// Per-peer sync history (for `RepairOpportunity`).
214 ///
215 /// This map grows with peer churn and is intentionally unbounded: entries
216 /// are lightweight (`PeerSyncRecord` is two fields) and peer IDs are
217 /// naturally bounded by the routing table's k-bucket capacity.
218 sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
219 /// Per-peer cooldown for gossip-triggered subtree audits (ADR-0002).
220 ///
221 /// Records when each peer was last audited so a burst of gossiped
222 /// commitment changes cannot spawn back-to-back audits of the same peer.
223 /// Bounded by routing-table membership and cleaned on `PeerRemoved`.
224 audit_on_gossip_cooldown: Arc<RwLock<HashMap<PeerId, Instant>>>,
225 /// Completed local neighbor-sync cycle epoch for proof maturity.
226 sync_cycle_epoch: Arc<RwLock<u64>>,
227 /// Per-key repair proof tracking for audit eligibility.
228 repair_proofs: Arc<RwLock<RepairProofs>>,
229 /// Bootstrap state tracking.
230 bootstrap_state: Arc<RwLock<BootstrapState>>,
231 /// Whether this node is currently bootstrapping.
232 is_bootstrapping: Arc<RwLock<bool>>,
233 /// Trigger for early neighbor sync (signalled on topology changes).
234 sync_trigger: Arc<Notify>,
235 /// Notified when `is_bootstrapping` transitions from `true` to `false`.
236 bootstrap_complete_notify: Arc<Notify>,
237 /// Node identity (for signing storage commitments).
238 ///
239 /// Phase 3 of the v12 storage-bound audit design. The responder
240 /// uses this to sign its periodically-built `StorageCommitment`.
241 identity: Arc<NodeIdentity>,
242 /// Responder-side commitment state (two-slot atomic rotation).
243 ///
244 /// Periodically rebuilt from the live LMDB key set; gossiped on
245 /// outbound `NeighborSyncRequest`/`Response`; consulted by the
246 /// commitment-bound audit handler.
247 commitment_state: Arc<ResponderCommitmentState>,
248 /// Auditor-side per-peer commitment record (last known commitment +
249 /// sticky `commitment_capable` flag).
250 ///
251 /// Populated whenever an inbound gossip carries a verified
252 /// commitment from the sender. Used by `audit_tick` to snapshot
253 /// `expected_commitment_hash` into outbound challenges, and by
254 /// holder-eligibility (§6) to decide whether a peer's `recent_provers`
255 /// proof should be honoured. The sticky `commitment_capable` flag
256 /// flips true on first successful ingest and never reverts (§2
257 /// step 5).
258 last_commitment_by_peer: Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
259 /// Sticky set of peer IDs we have EVER seen carrying a v12
260 /// commitment, independent of whether their commitment bytes are
261 /// still in `last_commitment_by_peer`. The §6 holder-eligibility
262 /// closure consults this set to keep treating churned-out
263 /// previously-v12 peers as v12-capable (rather than degrading them
264 /// to "legacy" credit-unconditionally) when they re-appear on the
265 /// network before their next gossip arrives. Bounded growth: even
266 /// at one million peers seen over the node's lifetime, the set is
267 /// 32 MB.
268 ever_capable_peers: Arc<RwLock<HashSet<PeerId>>>,
269 /// Auditor-side holder-eligibility cache (v12 §6).
270 ///
271 /// Recorded on successful commitment-bound audit; read by future
272 /// quorum / paid-list eligibility checks (phase-3 stretch).
273 recent_provers: Arc<RwLock<RecentProvers>>,
274 /// Per-peer last sig-verify attempt timestamp for the §2 step 3 /
275 /// §11 `DoS` rate limit. Bumped on EVERY verify attempt (success or
276 /// failure) so a peer we've never successfully verified can't burn
277 /// CPU on a flood of structurally-plausible-but-invalid gossips.
278 /// Lives separately from `last_commitment_by_peer` because that
279 /// map's records only exist after a successful verify.
280 sig_verify_attempts: Arc<RwLock<HashMap<PeerId, Instant>>>,
281 /// Limits concurrent outbound replication sends to prevent bandwidth
282 /// saturation on home broadband connections.
283 send_semaphore: Arc<Semaphore>,
284 /// Bounds concurrent IN-FLIGHT audit-responder tasks (subtree round 1 +
285 /// byte round 2). Those are spawned off the serial message loop so disk
286 /// reads don't block replication; the semaphore restores a global
287 /// backpressure ceiling so the node can't fan out unbounded `get_raw` reads
288 /// / multi-MiB byte serves.
289 audit_responder_semaphore: Arc<Semaphore>,
290 /// Per-source in-flight audit-responder counts, capped at
291 /// [`MAX_AUDIT_RESPONSES_PER_PEER`]. The GLOBAL semaphore alone is not
292 /// flood-fair: one peer spamming challenges could occupy every slot and
293 /// starve honest auditors, whose dropped challenges then convert to
294 /// audit timeouts against HONEST peers (codex-r2 A). This
295 /// per-peer cap guarantees no single source can hold more than its share,
296 /// so a flood self-throttles without denying service to everyone else.
297 audit_responder_inflight: Arc<RwLock<HashMap<PeerId, u32>>>,
298 /// Receiver for fresh-write events from the chunk PUT handler.
299 ///
300 /// When present, `start()` spawns a drainer task that calls
301 /// `replicate_fresh` for each event.
302 fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
303 /// Sender for delayed possession-check events (ADR-0003). The fresh-write
304 /// drainer pushes the responsible close-group peers here after each fresh
305 /// replication; the possession-check scheduler drains the paired receiver.
306 possession_check_tx: mpsc::UnboundedSender<possession::PossessionCheckEvent>,
307 /// Receiver paired with `possession_check_tx`; taken by the scheduler task.
308 possession_check_rx: Option<mpsc::UnboundedReceiver<possession::PossessionCheckEvent>>,
309 /// Shutdown token.
310 shutdown: CancellationToken,
311 /// Background task handles.
312 task_handles: Vec<JoinHandle<()>>,
313}
314
315impl ReplicationEngine {
316 /// Create a new replication engine.
317 ///
318 /// # Errors
319 ///
320 /// Returns an error if the `PaidList` LMDB environment cannot be opened
321 /// or if the configuration fails validation.
322 #[allow(clippy::too_many_arguments)]
323 pub async fn new(
324 config: ReplicationConfig,
325 p2p_node: Arc<P2PNode>,
326 storage: Arc<LmdbStorage>,
327 payment_verifier: Arc<PaymentVerifier>,
328 identity: Arc<NodeIdentity>,
329 root_dir: &Path,
330 fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
331 shutdown: CancellationToken,
332 ) -> Result<Self> {
333 config.validate().map_err(Error::Config)?;
334
335 let paid_list = Arc::new(
336 PaidList::new(root_dir)
337 .await
338 .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
339 );
340
341 let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
342 let config = Arc::new(config);
343 let (possession_check_tx, possession_check_rx) = mpsc::unbounded_channel();
344
345 Ok(Self {
346 config: Arc::clone(&config),
347 p2p_node,
348 storage,
349 paid_list,
350 payment_verifier,
351 queues: Arc::new(RwLock::new(ReplicationQueues::new())),
352 sync_state: Arc::new(RwLock::new(initial_neighbors)),
353 sync_history: Arc::new(RwLock::new(HashMap::new())),
354 audit_on_gossip_cooldown: Arc::new(RwLock::new(HashMap::new())),
355 sync_cycle_epoch: Arc::new(RwLock::new(0)),
356 repair_proofs: Arc::new(RwLock::new(RepairProofs::new())),
357 bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
358 is_bootstrapping: Arc::new(RwLock::new(true)),
359 sync_trigger: Arc::new(Notify::new()),
360 bootstrap_complete_notify: Arc::new(Notify::new()),
361 identity,
362 commitment_state: Arc::new(ResponderCommitmentState::new()),
363 last_commitment_by_peer: Arc::new(RwLock::new(HashMap::new())),
364 ever_capable_peers: Arc::new(RwLock::new(HashSet::new())),
365 recent_provers: Arc::new(RwLock::new(RecentProvers::new())),
366 sig_verify_attempts: Arc::new(RwLock::new(HashMap::new())),
367 send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
368 audit_responder_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_AUDIT_RESPONSES)),
369 audit_responder_inflight: Arc::new(RwLock::new(HashMap::new())),
370 fresh_write_rx: Some(fresh_write_rx),
371 possession_check_tx,
372 possession_check_rx: Some(possession_check_rx),
373 shutdown,
374 task_handles: Vec::new(),
375 })
376 }
377
378 /// Get a reference to the `PaidList`.
379 #[must_use]
380 pub fn paid_list(&self) -> &Arc<PaidList> {
381 &self.paid_list
382 }
383
384 /// Get a reference to the responder's commitment state. Used by audit
385 /// handlers to look up commitments by hash; used by the rotation tick
386 /// to install fresh ones.
387 #[must_use]
388 pub fn commitment_state(&self) -> &Arc<ResponderCommitmentState> {
389 &self.commitment_state
390 }
391
392 /// Get a reference to the auditor's last-commitment-by-peer table.
393 #[must_use]
394 pub fn last_commitment_by_peer(&self) -> &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>> {
395 &self.last_commitment_by_peer
396 }
397
398 /// Get a reference to the holder-eligibility cache. Phase-3 stretch:
399 /// will be read by quorum / paid-list eligibility checks.
400 #[must_use]
401 pub fn recent_provers(&self) -> &Arc<RwLock<RecentProvers>> {
402 &self.recent_provers
403 }
404
/// Test-only: rebuild this node's storage commitment from its current key
/// set and rotate it in immediately.
///
/// Production does this on the hourly rotation timer; calling it directly
/// lets a test obtain a commitment that covers chunks it has just stored.
///
/// # Errors
///
/// Returns any error `rebuild_and_rotate_commitment` reports while reading
/// the local key set or building/signing the commitment.
#[cfg(any(test, feature = "test-utils"))]
pub async fn rebuild_commitment_now(&self) -> Result<()> {
    let Self {
        storage,
        identity,
        commitment_state,
        p2p_node,
        config,
        ..
    } = self;
    rebuild_and_rotate_commitment(storage, identity, commitment_state, p2p_node, config).await
}
424
/// Test-only: seed this node's cached commitment for `peer` directly, as if
/// `peer`'s gossiped commitment had just been received and verified.
///
/// The peer is also marked as ever-capable. This lets a two-node audit test
/// pin the peer's commitment deterministically instead of depending on
/// neighbor-sync propagation timing. Any existing record for `peer` is
/// replaced, and neither insert checks the size caps of the two tables.
#[cfg(any(feature = "test-utils", test))]
pub async fn inject_peer_commitment_for_test(
    &self,
    peer: &PeerId,
    commitment: StorageCommitment,
) {
    let seen_at = Instant::now();
    let record = PeerCommitmentRecord::from_verified(commitment, seen_at);
    {
        let mut records = self.last_commitment_by_peer.write().await;
        records.insert(*peer, record);
    }
    let mut capable = self.ever_capable_peers.write().await;
    capable.insert(*peer);
}
442
/// Test-only: run a single subtree audit against `peer` right now, over the
/// live wire.
///
/// The audit is pinned to the commitment this node has cached for `peer`
/// from gossip. The outcome is returned so tests can assert honest-pass /
/// adversary-fail in a real two-node setting without waiting for the gossip
/// cadence. Audit credit is recorded into `recent_provers`.
///
/// Yields `AuditTickResult::Idle` when no cached commitment exists for the
/// peer yet, or when its hash cannot be computed.
#[cfg(any(test, feature = "test-utils"))]
pub async fn audit_peer_now(&self, peer: &PeerId) -> audit::AuditTickResult {
    // Snapshot (pin, key_count) under the read lock; the guard is released
    // at the end of this block, before the network round-trip.
    let (pin, key_count) = {
        let records = self.last_commitment_by_peer.read().await;
        let Some(commitment) = records
            .get(peer)
            .and_then(PeerCommitmentRecord::last_commitment)
        else {
            return audit::AuditTickResult::Idle;
        };
        let Some(pin) = commitment_hash(commitment) else {
            return audit::AuditTickResult::Idle;
        };
        (pin, commitment.key_count)
    };

    let credit_sink = storage_commitment_audit::AuditCredit {
        recent_provers: &self.recent_provers,
    };
    storage_commitment_audit::run_subtree_audit(
        &self.p2p_node,
        &self.config,
        peer,
        pin,
        key_count,
        Some(&credit_sink),
    )
    .await
}
474
/// Test-only: probe `peers` for possession of `key` immediately.
///
/// This skips the scheduler's randomised 5-15 minute settle delay. Any peer
/// that does not hold `key` is penalised at `AuditChallenge` severity
/// (ADR-0003). E2e tests use this to assert the detection + penalty path
/// deterministically instead of waiting for the scheduled check.
#[cfg(any(test, feature = "test-utils"))]
pub async fn run_possession_check_now(&self, key: XorName, peers: Vec<PeerId>) {
    let Self {
        p2p_node,
        storage,
        config,
        sync_state,
        shutdown,
        ..
    } = self;
    possession::run_possession_check(key, peers, p2p_node, storage, config, sync_state, shutdown)
        .await;
}
494
495 /// Start all background tasks.
496 ///
497 /// `dht_events` must be subscribed **before** `P2PNode::start()` so that
498 /// the `BootstrapComplete` event emitted during DHT bootstrap is not
499 /// missed by the bootstrap-sync gate.
500 pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
501 if !self.task_handles.is_empty() {
502 error!("ReplicationEngine::start() called while already running — ignoring");
503 return;
504 }
505 info!("Starting replication engine");
506
507 self.start_message_handler();
508 self.start_neighbor_sync_loop();
509 self.start_self_lookup_loop();
510 // Audit #2 (responsible-chunk): periodic tick auditing peers for the
511 // chunks they SHOULD store (responsibility + prior hint).
512 self.start_audit_loop();
513 // Audit #1 (storage-commitment) is gossip-triggered in the message
514 // handler when a peer's commitment is ingested, not on a periodic tick.
515 self.start_commitment_rotation_loop();
516 self.start_fetch_worker();
517 self.start_verification_worker();
518 self.start_bootstrap_sync(dht_events);
519 self.start_fresh_write_drainer();
520 self.start_possession_check_scheduler();
521
522 info!(
523 "Replication engine started with {} background tasks",
524 self.task_handles.len()
525 );
526 }
527
528 /// Returns `true` if the node is still in the replication bootstrap phase.
529 ///
530 /// During bootstrap, audit challenges return `Bootstrapping` instead of
531 /// digests, and neighbor sync responses carry `bootstrapping: true`.
532 pub async fn is_bootstrapping(&self) -> bool {
533 *self.is_bootstrapping.read().await
534 }
535
536 /// Wait until the replication bootstrap phase completes.
537 ///
538 /// Returns immediately if bootstrap has already completed. Useful for
539 /// readiness probes, health checks, and test harnesses that need the
540 /// node to be fully operational before proceeding.
541 ///
542 /// Returns `true` if bootstrap completed within the timeout, `false`
543 /// if the timeout elapsed first.
544 pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
545 // Register the notification future *before* checking the flag so that
546 // a transition between the read and the await is not missed.
547 let notified = self.bootstrap_complete_notify.notified();
548 tokio::pin!(notified);
549 notified.as_mut().enable();
550
551 if !*self.is_bootstrapping.read().await {
552 return true;
553 }
554
555 tokio::time::timeout(timeout, notified).await.is_ok()
556 }
557
558 /// Cancel all background tasks and wait for them to terminate.
559 ///
560 /// This must be awaited before dropping the engine when the caller needs
561 /// the `Arc<LmdbStorage>` references held by background tasks to be
562 /// released (e.g. before reopening the same LMDB environment).
563 pub async fn shutdown(&mut self) {
564 self.shutdown.cancel();
565 for (i, mut handle) in self.task_handles.drain(..).enumerate() {
566 match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
567 Ok(Ok(())) => {}
568 Ok(Err(e)) if e.is_cancelled() => {}
569 Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
570 Err(_) => {
571 warn!("Replication task {i} did not stop within 10s, aborting");
572 handle.abort();
573 }
574 }
575 }
576 }
577
578 /// Trigger an early neighbor sync round.
579 ///
580 /// Useful after topology changes (new nodes joining, network heal after
581 /// partition) when the caller wants replication to converge faster than
582 /// the regular 10-20 minute cadence.
583 pub fn trigger_neighbor_sync(&self) {
584 self.sync_trigger.notify_one();
585 }
586
587 /// Execute fresh replication for a newly stored record, then schedule the
588 /// delayed possession check for the responsible close-group peers
589 /// (ADR-0003). The production PUT path schedules via the fresh-write
590 /// drainer; this direct entry point schedules here so callers (and tests)
591 /// that drive replication directly still get the possession check.
592 pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
593 let peers = fresh::replicate_fresh(
594 key,
595 data,
596 proof_of_payment,
597 &self.p2p_node,
598 &self.paid_list,
599 &self.config,
600 &self.send_semaphore,
601 )
602 .await;
603 if !peers.is_empty() {
604 let _ = self
605 .possession_check_tx
606 .send(possession::PossessionCheckEvent { key: *key, peers });
607 }
608 }
609
610 // =======================================================================
611 // Background task launchers
612 // =======================================================================
613
614 /// Spawn a task that drains the fresh-write channel and triggers
615 /// replication for each newly-stored chunk.
616 fn start_fresh_write_drainer(&mut self) {
617 let Some(mut rx) = self.fresh_write_rx.take() else {
618 return;
619 };
620 let p2p = Arc::clone(&self.p2p_node);
621 let paid_list = Arc::clone(&self.paid_list);
622 let config = Arc::clone(&self.config);
623 let send_semaphore = Arc::clone(&self.send_semaphore);
624 let possession_tx = self.possession_check_tx.clone();
625 let shutdown = self.shutdown.clone();
626
627 let handle = tokio::spawn(async move {
628 loop {
629 tokio::select! {
630 () = shutdown.cancelled() => break,
631 event = rx.recv() => {
632 let Some(event) = event else { break };
633 let peers = fresh::replicate_fresh(
634 &event.key,
635 &event.data,
636 &event.payment_proof,
637 &p2p,
638 &paid_list,
639 &config,
640 &send_semaphore,
641 )
642 .await;
643 // Schedule the delayed possession check (ADR-0003) for
644 // the responsible close-group peers. A closed receiver
645 // (engine shutting down) is ignored.
646 if !peers.is_empty() {
647 let _ = possession_tx.send(possession::PossessionCheckEvent {
648 key: event.key,
649 peers,
650 });
651 }
652 }
653 }
654 }
655 debug!("Fresh-write drainer shut down");
656 });
657 self.task_handles.push(handle);
658 }
659
660 /// Spawn the possession-check scheduler (ADR-0003).
661 ///
662 /// Drains scheduled possession-check events and, for each, waits a
663 /// randomised 5-15 minute settle delay before probing every responsible
664 /// peer for actual possession. A peer that cryptographically fails to prove
665 /// possession, including by timeout, is penalised at `AuditChallenge`
666 /// severity.
667 fn start_possession_check_scheduler(&mut self) {
668 let Some(mut rx) = self.possession_check_rx.take() else {
669 return;
670 };
671 let p2p = Arc::clone(&self.p2p_node);
672 let storage = Arc::clone(&self.storage);
673 let config = Arc::clone(&self.config);
674 let sync_state = Arc::clone(&self.sync_state);
675 let shutdown = self.shutdown.clone();
676
677 let handle = tokio::spawn(async move {
678 loop {
679 tokio::select! {
680 () = shutdown.cancelled() => break,
681 event = rx.recv() => {
682 let Some(event) = event else { break };
683 // Spawn a per-chunk delayed check so the drain loop
684 // keeps pace with the write rate. Each check sleeps the
685 // randomised settle delay, then probes every peer.
686 let p2p = Arc::clone(&p2p);
687 let storage = Arc::clone(&storage);
688 let config = Arc::clone(&config);
689 let sync_state = Arc::clone(&sync_state);
690 let shutdown = shutdown.clone();
691 let delay_min = config.possession_check_delay_min;
692 let delay_max = config.possession_check_delay_max;
693 tokio::spawn(async move {
694 let delay = possession::random_delay(delay_min, delay_max);
695 tokio::select! {
696 () = shutdown.cancelled() => {}
697 () = tokio::time::sleep(delay) => {
698 possession::run_possession_check(
699 event.key,
700 event.peers,
701 &p2p,
702 &storage,
703 &config,
704 &sync_state,
705 &shutdown,
706 )
707 .await;
708 }
709 }
710 });
711 }
712 }
713 }
714 debug!("Possession-check scheduler shut down");
715 });
716 self.task_handles.push(handle);
717 }
718
719 #[allow(clippy::too_many_lines)]
720 fn start_message_handler(&mut self) {
721 let mut p2p_events = self.p2p_node.subscribe_events();
722 let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
723 let p2p = Arc::clone(&self.p2p_node);
724 let storage = Arc::clone(&self.storage);
725 let paid_list = Arc::clone(&self.paid_list);
726 let payment_verifier = Arc::clone(&self.payment_verifier);
727 let queues = Arc::clone(&self.queues);
728 let config = Arc::clone(&self.config);
729 let shutdown = self.shutdown.clone();
730 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
731 let bootstrap_state = Arc::clone(&self.bootstrap_state);
732 let sync_history = Arc::clone(&self.sync_history);
733 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
734 let repair_proofs = Arc::clone(&self.repair_proofs);
735 let sync_trigger = Arc::clone(&self.sync_trigger);
736 let my_commitment_state = Arc::clone(&self.commitment_state);
737 let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
738 let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
739 let recent_provers = Arc::clone(&self.recent_provers);
740 let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
741 let audit_on_gossip_cooldown = Arc::clone(&self.audit_on_gossip_cooldown);
742 let sync_state = Arc::clone(&self.sync_state);
743 let audit_responder_semaphore = Arc::clone(&self.audit_responder_semaphore);
744 let audit_responder_inflight = Arc::clone(&self.audit_responder_inflight);
745
746 // ADR-0002 gossip-audit trigger: bundled state so an ingested *changed*
747 // commitment can spawn a probabilistic, cooldown-gated subtree audit.
748 let gossip_audit = GossipAuditTrigger {
749 p2p_node: Arc::clone(&p2p),
750 config: Arc::clone(&config),
751 recent_provers: Arc::clone(&recent_provers),
752 sync_state: Arc::clone(&sync_state),
753 cooldown: Arc::clone(&audit_on_gossip_cooldown),
754 };
755
756 let handle = tokio::spawn(async move {
757 loop {
758 tokio::select! {
759 () = shutdown.cancelled() => break,
760 event = p2p_events.recv() => {
761 let Ok(event) = event else { continue };
762 if let P2PEvent::Message {
763 topic,
764 source: Some(source),
765 data,
766 ..
767 } = event {
768 // Determine if this is a replication message
769 // and whether it arrived via the /rr/ request-response
770 // path (which wraps payloads in RequestResponseEnvelope).
771 let rr_info = if topic == REPLICATION_PROTOCOL_ID {
772 Some((data.clone(), None))
773 } else if topic.starts_with(RR_PREFIX)
774 && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
775 {
776 P2PNode::parse_request_envelope(&data)
777 .filter(|(_, is_resp, _)| !is_resp)
778 .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
779 } else {
780 None
781 };
782 if let Some((payload, rr_message_id)) = rr_info {
783 match handle_replication_message(
784 &source,
785 &payload,
786 &p2p,
787 &storage,
788 &paid_list,
789 &payment_verifier,
790 &queues,
791 &config,
792 &is_bootstrapping,
793 &bootstrap_state,
794 &sync_history,
795 &sync_cycle_epoch,
796 &repair_proofs,
797 &last_commitment_by_peer,
798 &ever_capable_peers,
799 &sig_verify_attempts,
800 &my_commitment_state,
801 &gossip_audit,
802 &audit_responder_semaphore,
803 &audit_responder_inflight,
804 rr_message_id.as_deref(),
805 ).await {
806 Ok(()) => {}
807 Err(e) => {
808 debug!(
809 "Replication message from {source} error: {e}"
810 );
811 }
812 }
813 }
814 }
815 }
816 // Gap 4: Topology churn handling (Section 13).
817 //
818 // The DHT routing table emits KClosestPeersChanged when the
819 // K-closest peer set actually changes, which is the precise
820 // signal for triggering neighbor sync. This replaces the
821 // previous approach of checking every PeerConnected /
822 // PeerDisconnected event against the close group.
823 dht_event = dht_events.recv() => {
824 let Ok(dht_event) = dht_event else { continue };
825 match dht_event {
826 DhtNetworkEvent::KClosestPeersChanged { old, new } => {
827 let old_peers = old
828 .iter()
829 .take(config.neighbor_sync_scope)
830 .copied()
831 .collect::<HashSet<_>>();
832 let new_scoped = new
833 .iter()
834 .take(config.neighbor_sync_scope)
835 .copied()
836 .collect::<Vec<_>>();
837 let new_peers =
838 new_scoped.iter().copied().collect::<HashSet<_>>();
839 let entrants = new_scoped
840 .iter()
841 .copied()
842 .filter(|peer| !old_peers.contains(peer))
843 .collect::<Vec<_>>();
844 let entrant_count = entrants.len();
845 let (priority_insertions, sync_removals) = {
846 let mut state = sync_state.write().await;
847 let sync_removals = state.retain_sync_peers(&new_peers);
848 let priority_insertions = state.queue_priority_peers(entrants);
849 (priority_insertions, sync_removals)
850 };
851 if priority_insertions > 0 {
852 debug!(
853 "K-closest peers changed, queued {priority_insertions}/{entrant_count} new close peers for priority neighbor sync and pruned {sync_removals} departed pending sync entries"
854 );
855 } else {
856 debug!(
857 "K-closest peers changed, no additional close peers queued, pruned {sync_removals} departed pending sync entries, triggering early neighbor sync"
858 );
859 }
860 sync_trigger.notify_one();
861 }
862 DhtNetworkEvent::PeerRemoved { peer_id } => {
863 sync_state.write().await.remove_peer(&peer_id);
864 repair_proofs.write().await.remove_peer(&peer_id);
865 // v12: drop the commitment bytes and the
866 // recent-prover credit so a churn / sybil
867 // attacker cannot leave behind one
868 // StorageCommitment per identity in
869 // `last_commitment_by_peer`. Also drop the
870 // sig-verify rate-limit timestamp.
871 last_commitment_by_peer.write().await.remove(&peer_id);
872 recent_provers.write().await.forget_peer(&peer_id);
873 sig_verify_attempts.write().await.remove(&peer_id);
874 // Same for the gossip-audit cooldown (ADR-0002).
875 audit_on_gossip_cooldown.write().await.remove(&peer_id);
876 // The sticky `commitment_capable` flag is
877 // preserved orthogonally via
878 // `ever_capable_peers` — even after this
879 // removal, a re-joining peer continues to
880 // be treated as v12-capable rather than
881 // legacy (§3 shield).
882 }
883 _ => {}
884 }
885 }
886 }
887 }
888 debug!("Replication message handler shut down");
889 });
890 self.task_handles.push(handle);
891 }
892
893 fn start_neighbor_sync_loop(&mut self) {
894 let p2p = Arc::clone(&self.p2p_node);
895 let storage = Arc::clone(&self.storage);
896 let paid_list = Arc::clone(&self.paid_list);
897 let queues = Arc::clone(&self.queues);
898 let config = Arc::clone(&self.config);
899 let shutdown = self.shutdown.clone();
900 let sync_state = Arc::clone(&self.sync_state);
901 let sync_history = Arc::clone(&self.sync_history);
902 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
903 let repair_proofs = Arc::clone(&self.repair_proofs);
904 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
905 let bootstrap_state = Arc::clone(&self.bootstrap_state);
906 let sync_trigger = Arc::clone(&self.sync_trigger);
907 let commitment_state = Arc::clone(&self.commitment_state);
908 let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
909 let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
910 let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
911 // ADR-0002: a peer's commitment also arrives on the sync RESPONSE path
912 // (we initiated, they piggybacked theirs). Carry a gossip-audit trigger
913 // here too so a peer that only ever answers — never initiates sync —
914 // is still audited; otherwise it could fully evade auditing.
915 let gossip_audit = GossipAuditTrigger {
916 p2p_node: Arc::clone(&p2p),
917 config: Arc::clone(&config),
918 recent_provers: Arc::clone(&self.recent_provers),
919 sync_state: Arc::clone(&sync_state),
920 cooldown: Arc::clone(&self.audit_on_gossip_cooldown),
921 };
922
923 let handle = tokio::spawn(async move {
924 loop {
925 let interval = config.random_neighbor_sync_interval();
926 tokio::select! {
927 () = shutdown.cancelled() => break,
928 () = tokio::time::sleep(interval) => {}
929 () = sync_trigger.notified() => {
930 debug!("Neighbor sync triggered by topology change");
931 }
932 }
933 // Wrap the sync round in a select so shutdown cancels
934 // in-progress network operations rather than waiting for
935 // the full round to complete.
936 tokio::select! {
937 () = shutdown.cancelled() => break,
938 () = run_neighbor_sync_round(
939 &p2p,
940 &storage,
941 &paid_list,
942 &queues,
943 &config,
944 &sync_state,
945 &sync_history,
946 &sync_cycle_epoch,
947 &repair_proofs,
948 &is_bootstrapping,
949 &bootstrap_state,
950 &commitment_state,
951 &last_commitment_by_peer,
952 &ever_capable_peers,
953 &sig_verify_attempts,
954 &gossip_audit,
955 ) => {}
956 }
957 }
958 debug!("Neighbor sync loop shut down");
959 });
960 self.task_handles.push(handle);
961 }
962
963 fn start_self_lookup_loop(&mut self) {
964 let p2p = Arc::clone(&self.p2p_node);
965 let config = Arc::clone(&self.config);
966 let shutdown = self.shutdown.clone();
967
968 let handle = tokio::spawn(async move {
969 loop {
970 let interval = config.random_self_lookup_interval();
971 tokio::select! {
972 () = shutdown.cancelled() => break,
973 () = tokio::time::sleep(interval) => {
974 if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
975 debug!("Self-lookup failed: {e}");
976 }
977 }
978 }
979 }
980 debug!("Self-lookup loop shut down");
981 });
982 self.task_handles.push(handle);
983 }
984
985 /// Periodic responsible-chunk audit loop (audit #2): every
986 /// [`ReplicationConfig::random_audit_tick_interval`] (~10-20 min), audit one
987 /// eligible close peer for the chunks it *should* be storing (by
988 /// responsibility and prior repair hint), independent of the gossip-triggered
989 /// storage-commitment audit. Waits for bootstrap to drain, then runs one tick
990 /// immediately and periodically thereafter.
991 fn start_audit_loop(&mut self) {
992 let p2p = Arc::clone(&self.p2p_node);
993 let storage = Arc::clone(&self.storage);
994 let config = Arc::clone(&self.config);
995 let shutdown = self.shutdown.clone();
996 let sync_history = Arc::clone(&self.sync_history);
997 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
998 let repair_proofs = Arc::clone(&self.repair_proofs);
999 let bootstrap_state = Arc::clone(&self.bootstrap_state);
1000 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1001 let sync_state = Arc::clone(&self.sync_state);
1002
1003 let handle = tokio::spawn(async move {
1004 // Invariant 19: wait for bootstrap to drain before starting audits.
1005 loop {
1006 tokio::select! {
1007 () = shutdown.cancelled() => return,
1008 () = tokio::time::sleep(
1009 std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
1010 ) => {
1011 if bootstrap_state.read().await.is_drained() {
1012 break;
1013 }
1014 }
1015 }
1016 }
1017
1018 // Run one audit tick immediately after bootstrap drain.
1019 {
1020 let bootstrapping = *is_bootstrapping.read().await;
1021 let result = {
1022 let history = sync_history.read().await;
1023 let current_sync_epoch = *sync_cycle_epoch.read().await;
1024 audit::audit_tick_with_repair_proofs(
1025 &p2p,
1026 &storage,
1027 &config,
1028 &history,
1029 &repair_proofs,
1030 current_sync_epoch,
1031 bootstrapping,
1032 )
1033 .await
1034 };
1035 handle_audit_result(&result, &p2p, &sync_state, &config).await;
1036 }
1037
1038 // Then run periodically.
1039 loop {
1040 let interval = config.random_audit_tick_interval();
1041 tokio::select! {
1042 () = shutdown.cancelled() => break,
1043 () = tokio::time::sleep(interval) => {
1044 let bootstrapping = *is_bootstrapping.read().await;
1045 let result = {
1046 let history = sync_history.read().await;
1047 let current_sync_epoch = *sync_cycle_epoch.read().await;
1048 audit::audit_tick_with_repair_proofs(
1049 &p2p,
1050 &storage,
1051 &config,
1052 &history,
1053 &repair_proofs,
1054 current_sync_epoch,
1055 bootstrapping,
1056 )
1057 .await
1058 };
1059 handle_audit_result(&result, &p2p, &sync_state, &config).await;
1060 }
1061 }
1062 }
1063 debug!("Audit loop shut down");
1064 });
1065 self.task_handles.push(handle);
1066 }
1067
1068 /// Periodically rebuild + sign + rotate the responder's storage
1069 /// commitment.
1070 ///
1071 /// Phase 3 of the v12 storage-bound audit. Once per
1072 /// [`COMMITMENT_ROTATION_INTERVAL_SECS`], the responder reads the
1073 /// current LMDB key set, builds a Merkle tree (for content-addressed
1074 /// chunks `bytes_hash == key`, so no chunk re-read is needed), signs
1075 /// the root with the node's `MlDsaSecretKey`, and rotates the result
1076 /// into `commitment_state`. Old `previous` slot is dropped by the
1077 /// rotate (per `ResponderCommitmentState::rotate`).
1078 ///
1079 /// Skips if the key set is empty (no commitment to make) — the
1080 /// auditor side falls back to the legacy plain-digest path for
1081 /// peers that have never gossiped a commitment.
1082 fn start_commitment_rotation_loop(&mut self) {
1083 let storage = Arc::clone(&self.storage);
1084 let identity = Arc::clone(&self.identity);
1085 let commitment_state = Arc::clone(&self.commitment_state);
1086 let shutdown = self.shutdown.clone();
1087 let p2p = Arc::clone(&self.p2p_node);
1088 let config = Arc::clone(&self.config);
1089 let sync_trigger = Arc::clone(&self.sync_trigger);
1090 let recent_provers = Arc::clone(&self.recent_provers);
1091
1092 let handle = tokio::spawn(async move {
1093 // Build the first commitment immediately on startup so a
1094 // restarted node can answer commitment-bound audits right
1095 // away — otherwise current() stays None for a full rotation
1096 // interval and audits silently fall back to legacy.
1097 //
1098 // After the first build, trigger an immediate neighbor-sync
1099 // round so the new commitment gossips out within seconds.
1100 // Without this, after a restart remote auditors keep pinning
1101 // the pre-restart (rotated-away) hash until their normal
1102 // sync cadence elapses — up to 1 h in the worst case,
1103 // during which time commitment-bound audits hit "unknown
1104 // commitment hash" -> Idle no-ops.
1105 // ML-DSA signatures are randomized so we cannot reproduce
1106 // the pre-restart hash; the only honest path to recovery
1107 // is fast re-gossip.
1108 if let Err(e) =
1109 rebuild_and_rotate_commitment(&storage, &identity, &commitment_state, &p2p, &config)
1110 .await
1111 {
1112 warn!("Initial commitment build failed: {e}");
1113 } else {
1114 sync_trigger.notify_one();
1115 }
1116 loop {
1117 tokio::select! {
1118 () = shutdown.cancelled() => break,
1119 () = tokio::time::sleep(
1120 std::time::Duration::from_secs(COMMITMENT_ROTATION_INTERVAL_SECS)
1121 ) => {
1122 if let Err(e) = rebuild_and_rotate_commitment(
1123 &storage,
1124 &identity,
1125 &commitment_state,
1126 &p2p,
1127 &config,
1128 ).await {
1129 warn!("Commitment rotation failed: {e}");
1130 }
1131 // Piggyback a sweep of expired recent_provers
1132 // entries on the rotation tick (same cadence,
1133 // 1 h). is_credited_holder already honours the
1134 // TTL on read, but the sweep reclaims memory
1135 // for entries we'll never re-read.
1136 let dropped = recent_provers.write().await.sweep_expired(
1137 std::time::Instant::now()
1138 );
1139 if dropped > 0 {
1140 debug!("recent_provers: swept {dropped} expired entries");
1141 }
1142 }
1143 }
1144 }
1145 debug!("Commitment rotation loop shut down");
1146 });
1147 self.task_handles.push(handle);
1148 }
1149
1150 #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
1151 fn start_fetch_worker(&mut self) {
1152 let p2p = Arc::clone(&self.p2p_node);
1153 let storage = Arc::clone(&self.storage);
1154 let queues = Arc::clone(&self.queues);
1155 let config = Arc::clone(&self.config);
1156 let shutdown = self.shutdown.clone();
1157 let bootstrap_state = Arc::clone(&self.bootstrap_state);
1158 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1159 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
1160 let concurrency = max_parallel_fetch();
1161
1162 info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
1163
1164 let handle = tokio::spawn(async move {
1165 // Each in-flight future yields (key, Option<FetchOutcome>) so we
1166 // always recover the key — even if the inner task panics.
1167 let mut in_flight = FuturesUnordered::<FetchFuture>::new();
1168
1169 loop {
1170 // Fill up to `concurrency` slots from the queue.
1171 {
1172 let mut q = queues.write().await;
1173 while in_flight.len() < concurrency {
1174 let Some(candidate) = q.dequeue_fetch() else {
1175 break;
1176 };
1177 let Some(&source) = candidate.sources.first() else {
1178 warn!(
1179 "Fetch candidate {} has no sources — dropping",
1180 hex::encode(candidate.key)
1181 );
1182 continue;
1183 };
1184 q.start_fetch(candidate.key, source, candidate.sources.clone());
1185
1186 let p2p = Arc::clone(&p2p);
1187 let storage = Arc::clone(&storage);
1188 let config = Arc::clone(&config);
1189 let token = shutdown.clone();
1190 let fetch_key = candidate.key;
1191 in_flight.push(Box::pin(async move {
1192 let handle = tokio::spawn(async move {
1193 // Cancel-aware: abort when the engine shuts down.
1194 tokio::select! {
1195 () = token.cancelled() => FetchOutcome {
1196 key: fetch_key,
1197 result: FetchResult::SourceFailed,
1198 },
1199 outcome = execute_single_fetch(
1200 p2p, storage, config, fetch_key, source,
1201 ) => outcome,
1202 }
1203 });
1204 match handle.await {
1205 Ok(outcome) => (outcome.key, Some(outcome)),
1206 Err(e) => {
1207 error!(
1208 "Fetch task for {} panicked: {e}",
1209 hex::encode(fetch_key)
1210 );
1211 (fetch_key, None)
1212 }
1213 }
1214 }));
1215 }
1216 } // release queues write lock
1217
1218 if in_flight.is_empty() {
1219 // No work — wait for new items or shutdown.
1220 tokio::select! {
1221 () = shutdown.cancelled() => break,
1222 () = tokio::time::sleep(
1223 std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
1224 ) => continue,
1225 }
1226 }
1227
1228 // Wait for the next fetch to complete and process the result.
1229 tokio::select! {
1230 () = shutdown.cancelled() => break,
1231 Some((key, maybe_outcome)) = in_flight.next() => {
1232 let mut q = queues.write().await;
1233 let terminal = if let Some(outcome) = maybe_outcome {
1234 match outcome.result {
1235 FetchResult::Stored => {
1236 q.complete_fetch(&key);
1237 true
1238 }
1239 FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
1240 if let Some(next_peer) = q.retry_fetch(&key) {
1241 // Spawn a new fetch task for the next source.
1242 let p2p = Arc::clone(&p2p);
1243 let storage = Arc::clone(&storage);
1244 let config = Arc::clone(&config);
1245 let token = shutdown.clone();
1246 let fetch_key = key;
1247 in_flight.push(Box::pin(async move {
1248 let handle = tokio::spawn(async move {
1249 tokio::select! {
1250 () = token.cancelled() => FetchOutcome {
1251 key: fetch_key,
1252 result: FetchResult::SourceFailed,
1253 },
1254 outcome = execute_single_fetch(
1255 p2p, storage, config, fetch_key, next_peer,
1256 ) => outcome,
1257 }
1258 });
1259 match handle.await {
1260 Ok(outcome) => (outcome.key, Some(outcome)),
1261 Err(e) => {
1262 error!(
1263 "Fetch task for {} panicked: {e}",
1264 hex::encode(fetch_key)
1265 );
1266 (fetch_key, None)
1267 }
1268 }
1269 }));
1270 false
1271 } else {
1272 q.complete_fetch(&key);
1273 true
1274 }
1275 }
1276 }
1277 } else {
1278 // Task panicked — reclaim the in-flight slot.
1279 q.complete_fetch(&key);
1280 true
1281 };
1282
1283 // Shrink bootstrap pending set on terminal exit.
1284 if terminal {
1285 drop(q); // release queues lock before acquiring bootstrap_state
1286 if !bootstrap_state.read().await.is_drained() {
1287 bootstrap_state.write().await.remove_key(&key);
1288 let q = queues.read().await;
1289 if bootstrap::check_bootstrap_drained(
1290 &bootstrap_state,
1291 &q,
1292 )
1293 .await
1294 {
1295 complete_bootstrap(
1296 &is_bootstrapping,
1297 &bootstrap_complete_notify,
1298 ).await;
1299 }
1300 }
1301 }
1302 }
1303 }
1304 }
1305
1306 // Cancel and drain remaining in-flight fetches on shutdown.
1307 // The CancellationToken is already cancelled by this point, so
1308 // spawned tasks will see cancellation via their select! branches.
1309 while in_flight.next().await.is_some() {}
1310 debug!("Fetch worker shut down");
1311 });
1312 self.task_handles.push(handle);
1313 }
1314
1315 fn start_verification_worker(&mut self) {
1316 let p2p = Arc::clone(&self.p2p_node);
1317 let storage = Arc::clone(&self.storage);
1318 let queues = Arc::clone(&self.queues);
1319 let paid_list = Arc::clone(&self.paid_list);
1320 let config = Arc::clone(&self.config);
1321 let shutdown = self.shutdown.clone();
1322 let bootstrap_state = Arc::clone(&self.bootstrap_state);
1323 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1324 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
1325 let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
1326 let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
1327 let recent_provers = Arc::clone(&self.recent_provers);
1328
1329 let handle = tokio::spawn(async move {
1330 loop {
1331 tokio::select! {
1332 () = shutdown.cancelled() => break,
1333 () = tokio::time::sleep(
1334 std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
1335 ) => {
1336 let ctx = VerificationCycleContext {
1337 p2p_node: &p2p,
1338 paid_list: &paid_list,
1339 storage: &storage,
1340 queues: &queues,
1341 config: &config,
1342 bootstrap_state: &bootstrap_state,
1343 is_bootstrapping: &is_bootstrapping,
1344 bootstrap_complete_notify: &bootstrap_complete_notify,
1345 last_commitment_by_peer: &last_commitment_by_peer,
1346 ever_capable_peers: &ever_capable_peers,
1347 recent_provers: &recent_provers,
1348 };
1349 run_verification_cycle(ctx).await;
1350 }
1351 }
1352 }
1353 debug!("Verification worker shut down");
1354 });
1355 self.task_handles.push(handle);
1356 }
1357
1358 /// Gap 3: Run a one-shot bootstrap sync on startup.
1359 ///
1360 /// Waits for saorsa-core to emit `DhtNetworkEvent::BootstrapComplete`
1361 /// (indicating the routing table is populated) before snapshotting
1362 /// close neighbors. Falls back after a timeout so bootstrap nodes
1363 /// (which have no peers and therefore never receive the event) still
1364 /// proceed.
1365 ///
1366 /// After the gate, finds close neighbors, syncs with each in
1367 /// round-robin batches, admits returned hints into the verification
1368 /// pipeline, and tracks discovered keys for bootstrap drain detection.
1369 #[allow(clippy::too_many_lines)]
1370 fn start_bootstrap_sync(
1371 &mut self,
1372 dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
1373 ) {
1374 let p2p = Arc::clone(&self.p2p_node);
1375 let storage = Arc::clone(&self.storage);
1376 let paid_list = Arc::clone(&self.paid_list);
1377 let queues = Arc::clone(&self.queues);
1378 let config = Arc::clone(&self.config);
1379 let shutdown = self.shutdown.clone();
1380 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
1381 let bootstrap_state = Arc::clone(&self.bootstrap_state);
1382 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
1383 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
1384 let repair_proofs = Arc::clone(&self.repair_proofs);
1385 let my_commitment_state = Arc::clone(&self.commitment_state);
1386 let last_commitment_by_peer = Arc::clone(&self.last_commitment_by_peer);
1387 let ever_capable_peers = Arc::clone(&self.ever_capable_peers);
1388 let sig_verify_attempts = Arc::clone(&self.sig_verify_attempts);
1389
1390 let handle = tokio::spawn(async move {
1391 // Wait for DHT bootstrap to complete before snapshotting
1392 // neighbors. The routing table is empty until saorsa-core
1393 // finishes its FIND_NODE rounds and bucket refreshes.
1394 let gate = bootstrap::wait_for_bootstrap_complete(
1395 dht_events,
1396 config.bootstrap_complete_timeout_secs,
1397 &shutdown,
1398 )
1399 .await;
1400
1401 if gate == bootstrap::BootstrapGateResult::Shutdown {
1402 return;
1403 }
1404
1405 let self_id = *p2p.peer_id();
1406 let neighbors =
1407 neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
1408 .await;
1409
1410 if neighbors.is_empty() {
1411 info!("Bootstrap sync: no close neighbors found, marking drained");
1412 bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
1413 complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
1414 return;
1415 }
1416
1417 let neighbor_count = neighbors.len();
1418 info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
1419
1420 // Process neighbors in batches of NEIGHBOR_SYNC_PEER_COUNT.
1421 for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
1422 if shutdown.is_cancelled() {
1423 break;
1424 }
1425
1426 let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
1427 batch,
1428 &storage,
1429 &paid_list,
1430 &p2p,
1431 config.close_group_size,
1432 config.paid_list_close_group_size,
1433 )
1434 .await;
1435
1436 for peer in batch {
1437 if shutdown.is_cancelled() {
1438 break;
1439 }
1440
1441 // Re-read on each iteration so peers see current state.
1442 let bootstrapping = *is_bootstrapping.read().await;
1443
1444 bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
1445
1446 let hints = hints_by_peer.remove(peer).unwrap_or_default();
1447 let outcome = neighbor_sync::sync_with_peer_with_hints(
1448 peer,
1449 &p2p,
1450 &config,
1451 bootstrapping,
1452 hints,
1453 // Atomically snapshot + mark-gossiped: emitted in the
1454 // bootstrap-sync request, so we stay answerable for it
1455 // (ADR-0002). One critical section avoids a TOCTOU where a
1456 // concurrent retire/rotate drops the slot between read and
1457 // mark.
1458 my_commitment_state
1459 .current_for_gossip()
1460 .map(|b| b.commitment().clone()),
1461 )
1462 .await;
1463
1464 bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
1465
1466 if let Some(outcome) = outcome {
1467 // Ingest the peer's piggybacked commitment from the
1468 // response (same verification as the request path).
1469 // Bootstrap is the FIRST gossip we receive from most
1470 // peers, so this populates last_commitment_by_peer.
1471 //
1472 // We intentionally do NOT trigger a gossip-audit here:
1473 // during bootstrap this node may itself still be
1474 // bootstrapping (audits are gated on that), and the
1475 // close-group/RT view is not yet stable. The peer is
1476 // audited on the first STEADY-STATE neighbor-sync round
1477 // after bootstrap drains (request + response paths both
1478 // trigger), which is within one sync cycle — so caching
1479 // the commitment here is sufficient and there is no
1480 // coverage gap (ADR-0002).
1481 ingest_peer_commitment(
1482 peer,
1483 outcome.response.commitment.as_ref(),
1484 &p2p,
1485 &last_commitment_by_peer,
1486 &ever_capable_peers,
1487 &sig_verify_attempts,
1488 )
1489 .await; // sig_verify_attempts in scope from line ~1080
1490
1491 if !outcome.response.bootstrapping {
1492 record_sent_replica_hints(
1493 peer,
1494 &outcome.sent_replica_hints,
1495 &repair_proofs,
1496 &sync_cycle_epoch,
1497 )
1498 .await;
1499 // Admit hints into verification pipeline.
1500 let outcome = admit_and_queue_hints(
1501 &self_id,
1502 peer,
1503 &outcome.response.replica_hints,
1504 &outcome.response.paid_hints,
1505 &p2p,
1506 &config,
1507 &storage,
1508 &paid_list,
1509 &queues,
1510 )
1511 .await;
1512
1513 // Track discovered keys for drain detection.
1514 if !outcome.discovered.is_empty() {
1515 bootstrap::track_discovered_keys(
1516 &bootstrap_state,
1517 &outcome.discovered,
1518 )
1519 .await;
1520 }
1521
1522 // Record / retire capacity rejections so the
1523 // drain check correctly reflects whether each
1524 // source still owes us re-hinted work after
1525 // queue overflow.
1526 if outcome.capacity_rejected_count > 0 {
1527 bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
1528 } else {
1529 bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
1530 }
1531 }
1532 }
1533 }
1534 }
1535
1536 // Check drain condition.
1537 {
1538 let q = queues.read().await;
1539 if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
1540 complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
1541 }
1542 }
1543
1544 info!("Bootstrap sync completed");
1545 });
1546 self.task_handles.push(handle);
1547 }
1548}
1549
1550// ===========================================================================
1551// Free functions for background tasks
1552// ===========================================================================
1553
1554/// RAII admission for one audit-responder task: holds the GLOBAL permit and,
1555/// on drop, decrements the PER-PEER in-flight count. Moving this into the
1556/// spawned task ties both bounds to the task's exact lifetime — no manual
1557/// decrement to forget on an early return or panic.
1558struct AuditResponderGuard {
1559 _permit: tokio::sync::OwnedSemaphorePermit,
1560 inflight: Arc<RwLock<HashMap<PeerId, u32>>>,
1561 peer: PeerId,
1562}
1563
1564impl Drop for AuditResponderGuard {
1565 fn drop(&mut self) {
1566 // Decrement (and prune to keep the map bounded) without blocking the
1567 // async runtime: a short lock on a tiny map.
1568 //
1569 // Fast path: if the (uncontended, tiny) lock is free, decrement inline
1570 // with no spawn. Otherwise defer to a task — but only if a runtime is
1571 // actually current, so `Drop` during shutdown (no runtime) can never
1572 // panic. A missed decrement at shutdown is harmless: the whole map is
1573 // being dropped with the engine.
1574 let peer = self.peer;
1575 if let Ok(mut map) = self.inflight.try_write() {
1576 if let Some(n) = map.get_mut(&peer) {
1577 *n = n.saturating_sub(1);
1578 if *n == 0 {
1579 map.remove(&peer);
1580 }
1581 }
1582 return;
1583 }
1584 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1585 let inflight = Arc::clone(&self.inflight);
1586 handle.spawn(async move {
1587 let mut map = inflight.write().await;
1588 if let Some(n) = map.get_mut(&peer) {
1589 *n = n.saturating_sub(1);
1590 if *n == 0 {
1591 map.remove(&peer);
1592 }
1593 }
1594 });
1595 }
1596 }
1597}
1598
1599/// Try to admit one audit-responder task for `source`: take a global permit AND
1600/// a per-peer slot (both bounded). Returns `None` (caller drops the challenge,
1601/// leaving the remote auditor to apply that audit path's timeout policy) if
1602/// either ceiling is hit, so one flooder can neither exhaust the global pool's
1603/// effect on others nor exceed its own per-peer share (codex-r2 A).
1604async fn admit_audit_responder(
1605 semaphore: &Arc<Semaphore>,
1606 inflight: &Arc<RwLock<HashMap<PeerId, u32>>>,
1607 source: &PeerId,
1608) -> Option<AuditResponderGuard> {
1609 // Per-peer cap first (cheap, and the fairness-critical bound), committed
1610 // under the write lock so concurrent challenges from the same peer can't
1611 // both slip past the cap.
1612 {
1613 let mut map = inflight.write().await;
1614 let entry = map.entry(*source).or_insert(0);
1615 if *entry >= MAX_AUDIT_RESPONSES_PER_PEER {
1616 return None;
1617 }
1618 *entry += 1;
1619 }
1620 // Then the global ceiling. If it's exhausted, give back the per-peer slot we
1621 // just claimed so it isn't leaked.
1622 let Ok(permit) = Arc::clone(semaphore).try_acquire_owned() else {
1623 let mut map = inflight.write().await;
1624 if let Some(n) = map.get_mut(source) {
1625 *n = n.saturating_sub(1);
1626 if *n == 0 {
1627 map.remove(source);
1628 }
1629 }
1630 return None;
1631 };
1632 Some(AuditResponderGuard {
1633 _permit: permit,
1634 inflight: Arc::clone(inflight),
1635 peer: *source,
1636 })
1637}
1638
1639/// Handle an incoming replication protocol message.
1640///
1641/// When `rr_message_id` is `Some`, the request arrived via the `/rr/`
1642/// request-response path and the response must be sent via `send_response`
1643/// so saorsa-core can route it back to the waiting `send_request` caller.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn handle_replication_message(
    source: &PeerId,
    data: &[u8],
    p2p_node: &Arc<P2PNode>,
    storage: &Arc<LmdbStorage>,
    paid_list: &Arc<PaidList>,
    payment_verifier: &Arc<PaymentVerifier>,
    queues: &Arc<RwLock<ReplicationQueues>>,
    config: &ReplicationConfig,
    is_bootstrapping: &Arc<RwLock<bool>>,
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    sync_cycle_epoch: &Arc<RwLock<u64>>,
    repair_proofs: &Arc<RwLock<RepairProofs>>,
    last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
    ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
    sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
    my_commitment_state: &Arc<ResponderCommitmentState>,
    gossip_audit: &GossipAuditTrigger,
    audit_responder_semaphore: &Arc<Semaphore>,
    audit_responder_inflight: &Arc<RwLock<HashMap<PeerId, u32>>>,
    rr_message_id: Option<&str>,
) -> Result<()> {
    // Decode the envelope first. An undecodable payload is returned to the
    // caller as `Error::Protocol`; nothing is sent back to `source`.
    let msg = ReplicationMessage::decode(data)
        .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;

    // Dispatch on the body. Non-audit handlers are awaited inline on this
    // serial loop; the three audit responders below are spawned as detached
    // tasks, gated by `admit_audit_responder`.
    match msg.body {
        // Pushed record: validated, payment-verified and stored by the handler,
        // which always replies with a `FreshReplicationResponse`.
        ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
            handle_fresh_offer(
                source,
                offer,
                storage,
                paid_list,
                payment_verifier,
                p2p_node,
                config,
                msg.request_id,
                rr_message_id,
            )
            .await
        }
        // Paid-list metadata. Note no request id / rr id is passed, so this
        // path never replies.
        ReplicationMessageBody::PaidNotify(ref notify) => {
            handle_paid_notify(
                source,
                notify,
                paid_list,
                payment_verifier,
                p2p_node,
                config,
            )
            .await
        }
        ReplicationMessageBody::NeighborSyncRequest(ref request) => {
            // Snapshot the flag once; the sync handler receives a plain bool.
            let bootstrapping = *is_bootstrapping.read().await;
            // Phase-3 storage-bound audit: store the sender's
            // commitment for use as `expected_commitment_hash` in
            // future audits. Verify signature before storing so a peer
            // cannot inject a forged commitment for someone else.
            if let Some(target) = ingest_peer_commitment(
                source,
                request.commitment.as_ref(),
                p2p_node,
                last_commitment_by_peer,
                ever_capable_peers,
                sig_verify_attempts,
            )
            .await
            {
                maybe_trigger_gossip_audit(gossip_audit, source, target).await;
            }
            handle_neighbor_sync_request(
                source,
                request,
                p2p_node,
                storage,
                paid_list,
                queues,
                config,
                bootstrapping,
                bootstrap_state,
                sync_history,
                sync_cycle_epoch,
                repair_proofs,
                // Atomically snapshot + mark-gossiped: emitted in the sync
                // response, so we must stay answerable for it (ADR-0002).
                my_commitment_state
                    .current_for_gossip()
                    .map(|b| b.commitment().clone()),
                msg.request_id,
                rr_message_id,
            )
            .await
        }
        ReplicationMessageBody::VerificationRequest(ref request) => {
            handle_verification_request(
                source,
                request,
                storage,
                paid_list,
                p2p_node,
                msg.request_id,
                rr_message_id,
            )
            .await
        }
        ReplicationMessageBody::FetchRequest(ref request) => {
            handle_fetch_request(
                source,
                request,
                storage,
                p2p_node,
                msg.request_id,
                rr_message_id,
            )
            .await
        }
        ReplicationMessageBody::AuditChallenge(challenge) => {
            // Responsible-chunk audit (audit #2) responder: answer with per-key
            // possession digests. This same handler also answers the
            // prune-confirmation audit, which sends the same `AuditChallenge`
            // wire message.
            //
            // Answering digests the stored bytes of every challenged key, so —
            // like the subtree/byte audits below — run it on a detached task off
            // this serial message loop. Handling it inline lets one challenge
            // block all other replication traffic until its digests complete
            // (head-of-line blocking). The same flood-fair admission applies: a
            // global ceiling AND a per-peer cap, dropping the challenge if either
            // is hit. Responsible/prune audit timeouts are penalised by the
            // caller, so the caps must remain high enough for honest audit load;
            // the per-peer share still prevents one flooder from starving others.
            //
            // NOTE(review): unlike the subtree/byte arms, the "received" log for
            // this kind is emitted inside `handle_audit_challenge_msg`, so a
            // challenge dropped here only produces the "not sent" warning.
            let Some(guard) =
                admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
                    .await
            else {
                warn!(
                    "Audit challenge reply not sent: kind=responsible response=dropped \
                     source={source} (audit-responder capacity reached)"
                );
                return Ok(());
            };
            // Bootstrap state is sampled once, at admission time; the spawned
            // task does not re-read it.
            let bootstrapping = *is_bootstrapping.read().await;
            // Owned copies so the detached task is `'static`.
            let storage = Arc::clone(storage);
            let p2p_node = Arc::clone(p2p_node);
            let source = *source;
            let request_id = msg.request_id;
            let rr_message_id = rr_message_id.map(ToOwned::to_owned);
            tokio::spawn(async move {
                let _guard = guard; // global permit + per-peer slot, held until done
                if let Err(e) = handle_audit_challenge_msg(
                    &source,
                    &challenge,
                    &storage,
                    &p2p_node,
                    bootstrapping,
                    request_id,
                    rr_message_id.as_deref(),
                )
                .await
                {
                    debug!("Audit challenge from {source} error: {e}");
                }
            });
            Ok(())
        }
        ReplicationMessageBody::SubtreeAuditChallenge(challenge) => {
            // Gossip-triggered storage-bound subtree audit (ADR-0002). The
            // responder rebuilds the WHOLE nonce-selected subtree, reading every
            // leaf's bytes from disk (`get_raw` × ~sqrt(N) leaves). Run it on a
            // detached task so this serial message loop is never blocked on disk
            // I/O — otherwise one audit stalls all replication traffic (§5).
            //
            // A bounded, flood-fair admission restores backpressure (codex#1 +
            // codex-r2 A): a global ceiling AND a per-peer cap. If either is hit
            // we drop this challenge. Subtree auditors grace timeout
            // non-responses, so capacity drops throttle flooders without turning
            // into trust penalties (and one source cannot starve other peers,
            // since its share is capped per-peer).
            info!(
                "Audit challenge received: kind=subtree source={source} request_response={}",
                rr_message_id.is_some(),
            );
            let Some(guard) =
                admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
                    .await
            else {
                warn!(
                    "Audit challenge reply not sent: kind=subtree response=dropped \
                     source={source} (audit-responder capacity reached)"
                );
                return Ok(());
            };
            // Sampled once at admission time, as in the responsible-audit arm.
            let bootstrapping = *is_bootstrapping.read().await;
            // Owned copies so the detached task is `'static`.
            let storage = Arc::clone(storage);
            let p2p_node = Arc::clone(p2p_node);
            let my_commitment_state = Arc::clone(my_commitment_state);
            let source = *source;
            let request_id = msg.request_id;
            let rr_message_id = rr_message_id.map(ToOwned::to_owned);
            tokio::spawn(async move {
                let _guard = guard; // global permit + per-peer slot, held until done
                let response = storage_commitment_audit::handle_subtree_challenge(
                    &challenge,
                    &storage,
                    p2p_node.peer_id(),
                    bootstrapping,
                    Some(&my_commitment_state),
                )
                .await;
                // Computed before `response` is moved into the reply body.
                let response_kind = subtree_audit_response_kind(&response);
                let sent = send_replication_response_checked(
                    &source,
                    &p2p_node,
                    request_id,
                    ReplicationMessageBody::SubtreeAuditResponse(response),
                    rr_message_id.as_deref(),
                )
                .await;
                if sent {
                    info!(
                        "Audit challenge reply sent: kind=subtree response={response_kind} \
                         source={source} request_response={}",
                        rr_message_id.is_some(),
                    );
                } else {
                    warn!(
                        "Audit challenge reply not sent: kind=subtree response={response_kind} \
                         source={source} request_response={}",
                        rr_message_id.is_some(),
                    );
                }
            });
            Ok(())
        }
        ReplicationMessageBody::SubtreeByteChallenge(challenge) => {
            // Round 2 of the storage audit (ADR-0002): serve the original bytes
            // for the auditor's spot-check keys, or signal `Absent` for a
            // committed key we can no longer produce. Reads chunk bytes from
            // disk, so likewise spawned off the serial loop (§5) under the same
            // flood-fair admission (codex#1 + codex-r2 A).
            info!(
                "Audit challenge received: kind=byte source={source} request_response={}",
                rr_message_id.is_some(),
            );
            let Some(guard) =
                admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source)
                    .await
            else {
                warn!(
                    "Audit challenge reply not sent: kind=byte response=dropped \
                     source={source} (audit-responder capacity reached)"
                );
                return Ok(());
            };
            // Sampled once at admission time, as in the other audit arms.
            let bootstrapping = *is_bootstrapping.read().await;
            // Owned copies so the detached task is `'static`.
            let storage = Arc::clone(storage);
            let p2p_node = Arc::clone(p2p_node);
            let my_commitment_state = Arc::clone(my_commitment_state);
            let source = *source;
            let request_id = msg.request_id;
            let rr_message_id = rr_message_id.map(ToOwned::to_owned);
            tokio::spawn(async move {
                let _guard = guard; // global permit + per-peer slot, held until done
                let response = storage_commitment_audit::handle_subtree_byte_challenge(
                    &challenge,
                    &storage,
                    p2p_node.peer_id(),
                    bootstrapping,
                    Some(&my_commitment_state),
                )
                .await;
                // Computed before `response` is moved into the reply body.
                let response_kind = subtree_byte_response_kind(&response);
                let sent = send_replication_response_checked(
                    &source,
                    &p2p_node,
                    request_id,
                    ReplicationMessageBody::SubtreeByteResponse(response),
                    rr_message_id.as_deref(),
                )
                .await;
                if sent {
                    info!(
                        "Audit challenge reply sent: kind=byte response={response_kind} \
                         source={source} request_response={}",
                        rr_message_id.is_some(),
                    );
                } else {
                    warn!(
                        "Audit challenge reply not sent: kind=byte response={response_kind} \
                         source={source} request_response={}",
                        rr_message_id.is_some(),
                    );
                }
            });
            Ok(())
        }
        // Response messages are handled by their respective request initiators.
        ReplicationMessageBody::FreshReplicationResponse(_)
        | ReplicationMessageBody::NeighborSyncResponse(_)
        | ReplicationMessageBody::VerificationResponse(_)
        | ReplicationMessageBody::FetchResponse(_)
        | ReplicationMessageBody::AuditResponse(_)
        | ReplicationMessageBody::SubtreeAuditResponse(_)
        | ReplicationMessageBody::SubtreeByteResponse(_) => Ok(()),
    }
}
1950}
1951
1952// ---------------------------------------------------------------------------
1953// Per-message-type handlers
1954// ---------------------------------------------------------------------------
1955
1956#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1957async fn handle_fresh_offer(
1958 source: &PeerId,
1959 offer: &protocol::FreshReplicationOffer,
1960 storage: &Arc<LmdbStorage>,
1961 paid_list: &Arc<PaidList>,
1962 payment_verifier: &Arc<PaymentVerifier>,
1963 p2p_node: &Arc<P2PNode>,
1964 config: &ReplicationConfig,
1965 request_id: u64,
1966 rr_message_id: Option<&str>,
1967) -> Result<()> {
1968 let self_id = *p2p_node.peer_id();
1969
1970 // Rule 5: reject if PoP is missing.
1971 if offer.proof_of_payment.is_empty() {
1972 send_replication_response(
1973 source,
1974 p2p_node,
1975 request_id,
1976 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1977 key: offer.key,
1978 reason: "Missing proof of payment".to_string(),
1979 }),
1980 rr_message_id,
1981 )
1982 .await;
1983 return Ok(());
1984 }
1985
1986 // Enforce chunk size invariant: the normal PUT path rejects data larger
1987 // than MAX_CHUNK_SIZE; the replication receive path must do the same to
1988 // prevent peers from pushing oversized records through replication.
1989 if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1990 warn!(
1991 "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1992 hex::encode(offer.key),
1993 offer.data.len(),
1994 crate::ant_protocol::MAX_CHUNK_SIZE,
1995 );
1996 p2p_node
1997 .report_trust_event(
1998 source,
1999 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2000 )
2001 .await;
2002 send_replication_response(
2003 source,
2004 p2p_node,
2005 request_id,
2006 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
2007 key: offer.key,
2008 reason: format!(
2009 "Data size {} exceeds maximum chunk size {}",
2010 offer.data.len(),
2011 crate::ant_protocol::MAX_CHUNK_SIZE,
2012 ),
2013 }),
2014 rr_message_id,
2015 )
2016 .await;
2017 return Ok(());
2018 }
2019
2020 // Mirror the normal PUT path: the advertised key must be the content
2021 // address of the supplied bytes before any expensive payment verification.
2022 let computed_key = crate::client::compute_address(&offer.data);
2023 if computed_key != offer.key {
2024 warn!(
2025 "Rejecting fresh offer for key {}: content address mismatch, computed {}",
2026 hex::encode(offer.key),
2027 hex::encode(computed_key),
2028 );
2029 p2p_node
2030 .report_trust_event(
2031 source,
2032 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2033 )
2034 .await;
2035 send_replication_response(
2036 source,
2037 p2p_node,
2038 request_id,
2039 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
2040 key: offer.key,
2041 reason: format!(
2042 "Content address mismatch: expected {}, computed {}",
2043 hex::encode(offer.key),
2044 hex::encode(computed_key),
2045 ),
2046 }),
2047 rr_message_id,
2048 )
2049 .await;
2050 return Ok(());
2051 }
2052
2053 // Rule 7: check storage admission. Fresh chunk receivers accept across the
2054 // paid-close-group neighbourhood (`paid_list_close_group_size`, = K_BUCKET_SIZE,
2055 // the same width client PUTs use), not just the close group plus a small
2056 // margin (ADR-0003). During full-node shunning a healthy replica's routing
2057 // table may still list closer full nodes it hasn't evicted yet, ranking it
2058 // outside the narrow window in its own view; the wider accept window absorbs
2059 // that transient skew so the chunk still lands. Retention (pruning) stays at
2060 // `storage_admission_width`, so steady-state replication is unchanged.
2061 if !admission::is_responsible(
2062 &self_id,
2063 &offer.key,
2064 p2p_node,
2065 config.paid_list_close_group_size,
2066 )
2067 .await
2068 {
2069 send_replication_response(
2070 source,
2071 p2p_node,
2072 request_id,
2073 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
2074 key: offer.key,
2075 reason: "Not in storage-admission range for this key".to_string(),
2076 }),
2077 rr_message_id,
2078 )
2079 .await;
2080 return Ok(());
2081 }
2082
2083 // Disk-space pre-check — mirror the PUT handler (V2-411). A full node can
2084 // never store this record, so reject it before the expensive payment
2085 // verification (EVM on-chain query / merkle pool work) rather than verifying
2086 // and only then failing at `storage.put` below. Reuses the cached capacity
2087 // check (passing results only, so freed space is detected promptly), and the
2088 // store path keeps its own check as defence-in-depth.
2089 if let Err(e) = storage.check_capacity() {
2090 info!(
2091 target: "ant_node::storage::disk_precheck",
2092 key = %hex::encode(offer.key),
2093 "Rejecting fresh replication offer before payment verification: {e}"
2094 );
2095 send_replication_response(
2096 source,
2097 p2p_node,
2098 request_id,
2099 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
2100 key: offer.key,
2101 reason: e.to_string(),
2102 }),
2103 rr_message_id,
2104 )
2105 .await;
2106 return Ok(());
2107 }
2108
2109 // Gap 1: Validate PoP via PaymentVerifier. Fresh replication is still
2110 // part of the immediate write fan-out: this receiver is about to store the
2111 // record as if the client had PUT it here directly. Storage admission
2112 // was checked above before proof work. ClientPut verification applies
2113 // store-strength cache semantics, paid-quote issuer K-closeness and local
2114 // price floor checks for single-node proofs, and merkle candidate
2115 // closeness for merkle proofs.
2116 match payment_verifier
2117 .verify_payment(
2118 &offer.key,
2119 Some(&offer.proof_of_payment),
2120 fresh_offer_payment_context(),
2121 )
2122 .await
2123 {
2124 Ok(status) if status.can_store() => {
2125 debug!(
2126 "PoP validated for fresh offer key {}",
2127 hex::encode(offer.key)
2128 );
2129 }
2130 Ok(_) => {
2131 send_replication_response(
2132 source,
2133 p2p_node,
2134 request_id,
2135 ReplicationMessageBody::FreshReplicationResponse(
2136 FreshReplicationResponse::Rejected {
2137 key: offer.key,
2138 reason: "Payment verification failed: payment required".to_string(),
2139 },
2140 ),
2141 rr_message_id,
2142 )
2143 .await;
2144 return Ok(());
2145 }
2146 Err(e) => {
2147 warn!(
2148 "PoP verification error for key {}: {e}",
2149 hex::encode(offer.key)
2150 );
2151 send_replication_response(
2152 source,
2153 p2p_node,
2154 request_id,
2155 ReplicationMessageBody::FreshReplicationResponse(
2156 FreshReplicationResponse::Rejected {
2157 key: offer.key,
2158 reason: format!("Payment verification error: {e}"),
2159 },
2160 ),
2161 rr_message_id,
2162 )
2163 .await;
2164 return Ok(());
2165 }
2166 }
2167
2168 // Rule 6: add to PaidForList.
2169 if let Err(e) = paid_list.insert(&offer.key).await {
2170 warn!("Failed to add key to PaidForList: {e}");
2171 }
2172
2173 // Store the record.
2174 match storage.put(&offer.key, &offer.data).await {
2175 Ok(_) => {
2176 send_replication_response(
2177 source,
2178 p2p_node,
2179 request_id,
2180 ReplicationMessageBody::FreshReplicationResponse(
2181 FreshReplicationResponse::Accepted { key: offer.key },
2182 ),
2183 rr_message_id,
2184 )
2185 .await;
2186 }
2187 Err(e) => {
2188 send_replication_response(
2189 source,
2190 p2p_node,
2191 request_id,
2192 ReplicationMessageBody::FreshReplicationResponse(
2193 FreshReplicationResponse::Rejected {
2194 key: offer.key,
2195 reason: e.to_string(),
2196 },
2197 ),
2198 rr_message_id,
2199 )
2200 .await;
2201 }
2202 }
2203
2204 Ok(())
2205}
2206
2207async fn handle_paid_notify(
2208 _source: &PeerId,
2209 notify: &protocol::PaidNotify,
2210 paid_list: &Arc<PaidList>,
2211 payment_verifier: &Arc<PaymentVerifier>,
2212 p2p_node: &Arc<P2PNode>,
2213 config: &ReplicationConfig,
2214) -> Result<()> {
2215 let self_id = *p2p_node.peer_id();
2216
2217 // Rule 3: validate PoP presence before adding.
2218 if notify.proof_of_payment.is_empty() {
2219 return Ok(());
2220 }
2221
2222 // Check if we're in PaidCloseGroup for this key.
2223 if !admission::is_in_paid_close_group(
2224 &self_id,
2225 ¬ify.key,
2226 p2p_node,
2227 config.paid_list_close_group_size,
2228 )
2229 .await
2230 {
2231 return Ok(());
2232 }
2233
2234 // Gap 1: Validate PoP via PaymentVerifier. PaidNotify admits fresh
2235 // paid-list metadata, so local paid-list close-group membership was checked
2236 // above before proof work. The verifier then runs the same payment proof
2237 // checks as ClientPut while writing a paid-list-strength cache entry.
2238 match payment_verifier
2239 .verify_payment(
2240 ¬ify.key,
2241 Some(¬ify.proof_of_payment),
2242 paid_notify_payment_context(),
2243 )
2244 .await
2245 {
2246 Ok(status) if status.can_store() => {
2247 debug!(
2248 "PoP validated for paid notify key {}",
2249 hex::encode(notify.key)
2250 );
2251 }
2252 Ok(_) => {
2253 warn!(
2254 "Paid notify rejected: payment required for key {}",
2255 hex::encode(notify.key)
2256 );
2257 return Ok(());
2258 }
2259 Err(e) => {
2260 warn!(
2261 "PoP verification error for paid notify key {}: {e}",
2262 hex::encode(notify.key)
2263 );
2264 return Ok(());
2265 }
2266 }
2267
2268 if let Err(e) = paid_list.insert(¬ify.key).await {
2269 warn!("Failed to add paid notify key to PaidForList: {e}");
2270 }
2271
2272 Ok(())
2273}
2274
2275#[allow(clippy::too_many_arguments)]
2276async fn handle_neighbor_sync_request(
2277 source: &PeerId,
2278 request: &protocol::NeighborSyncRequest,
2279 p2p_node: &Arc<P2PNode>,
2280 storage: &Arc<LmdbStorage>,
2281 paid_list: &Arc<PaidList>,
2282 queues: &Arc<RwLock<ReplicationQueues>>,
2283 config: &ReplicationConfig,
2284 is_bootstrapping: bool,
2285 bootstrap_state: &Arc<RwLock<BootstrapState>>,
2286 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
2287 sync_cycle_epoch: &Arc<RwLock<u64>>,
2288 repair_proofs: &Arc<RwLock<RepairProofs>>,
2289 my_commitment: Option<StorageCommitment>,
2290 request_id: u64,
2291 rr_message_id: Option<&str>,
2292) -> Result<()> {
2293 let self_id = *p2p_node.peer_id();
2294
2295 // No per-request hint count limit: the wire message size limit
2296 // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Unlike audit
2297 // challenges, sync hints don't drive expensive computation — they just
2298 // enter the verification queue. A per-request limit here would break
2299 // bootstrap replication for newly-joined nodes with 0 stored chunks.
2300
2301 // Build response (outbound hints).
2302 let (response, sent_replica_hints, sender_in_rt) =
2303 neighbor_sync::handle_sync_request_with_proofs(
2304 source,
2305 request,
2306 p2p_node,
2307 storage,
2308 paid_list,
2309 config,
2310 is_bootstrapping,
2311 my_commitment.clone(),
2312 )
2313 .await;
2314
2315 // Send response.
2316 let response_sent = send_replication_response_checked(
2317 source,
2318 p2p_node,
2319 request_id,
2320 ReplicationMessageBody::NeighborSyncResponse(response),
2321 rr_message_id,
2322 )
2323 .await;
2324
2325 // Process inbound hints only if sender is in LocalRT (Rule 4-6).
2326 if !sender_in_rt {
2327 return Ok(());
2328 }
2329
2330 // Update sync history for this peer before recording repair proofs so a
2331 // same-tick audit cannot combine a fresh key proof with stale peer maturity.
2332 {
2333 let mut history = sync_history.write().await;
2334 let record = history.entry(*source).or_insert(PeerSyncRecord {
2335 last_sync: None,
2336 cycles_since_sync: 0,
2337 });
2338 record.last_sync = Some(Instant::now());
2339 record.cycles_since_sync = 0;
2340 }
2341
2342 if response_sent && !request.bootstrapping {
2343 record_sent_replica_hints(source, &sent_replica_hints, repair_proofs, sync_cycle_epoch)
2344 .await;
2345 }
2346
2347 // Admit inbound hints and queue for verification.
2348 let outcome = admit_and_queue_hints(
2349 &self_id,
2350 source,
2351 &request.replica_hints,
2352 &request.paid_hints,
2353 p2p_node,
2354 config,
2355 storage,
2356 paid_list,
2357 queues,
2358 )
2359 .await;
2360
2361 // Track discovered keys for bootstrap drain detection so that hints
2362 // admitted via inbound sync requests are not missed. Capacity-rejected
2363 // hints keep this source on the "not yet drained" list until its next
2364 // sync re-admits them; a clean cycle clears the source.
2365 if is_bootstrapping {
2366 if !outcome.discovered.is_empty() {
2367 bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
2368 }
2369 if outcome.capacity_rejected_count > 0 {
2370 bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
2371 } else {
2372 bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
2373 }
2374 }
2375
2376 Ok(())
2377}
2378
2379async fn handle_verification_request(
2380 source: &PeerId,
2381 request: &protocol::VerificationRequest,
2382 storage: &Arc<LmdbStorage>,
2383 paid_list: &Arc<PaidList>,
2384 p2p_node: &Arc<P2PNode>,
2385 request_id: u64,
2386 rr_message_id: Option<&str>,
2387) -> Result<()> {
2388 // No per-request key count limit: the wire message size limit
2389 // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Verification
2390 // does cheap storage lookups per key, not expensive computation like
2391 // audit digest generation.
2392
2393 #[allow(clippy::cast_possible_truncation)]
2394 let keys_len = request.keys.len() as u32;
2395 let paid_check_set: HashSet<u32> = request
2396 .paid_list_check_indices
2397 .iter()
2398 .copied()
2399 .filter(|&idx| {
2400 if idx >= keys_len {
2401 warn!(
2402 "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
2403 request.keys.len(),
2404 );
2405 false
2406 } else {
2407 true
2408 }
2409 })
2410 .collect();
2411
2412 let mut results = Vec::with_capacity(request.keys.len());
2413 for (i, key) in request.keys.iter().enumerate() {
2414 let present = storage.exists(key).unwrap_or(false);
2415 let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
2416 Some(paid_list.contains(key).unwrap_or(false))
2417 } else {
2418 None
2419 };
2420 results.push(protocol::KeyVerificationResult {
2421 key: *key,
2422 present,
2423 paid,
2424 });
2425 }
2426
2427 send_replication_response(
2428 source,
2429 p2p_node,
2430 request_id,
2431 ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
2432 rr_message_id,
2433 )
2434 .await;
2435
2436 Ok(())
2437}
2438
2439async fn handle_fetch_request(
2440 source: &PeerId,
2441 request: &protocol::FetchRequest,
2442 storage: &Arc<LmdbStorage>,
2443 p2p_node: &Arc<P2PNode>,
2444 request_id: u64,
2445 rr_message_id: Option<&str>,
2446) -> Result<()> {
2447 let response = match storage.get(&request.key).await {
2448 Ok(Some(data)) => protocol::FetchResponse::Success {
2449 key: request.key,
2450 data,
2451 },
2452 Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
2453 Err(e) => protocol::FetchResponse::Error {
2454 key: request.key,
2455 reason: format!("{e}"),
2456 },
2457 };
2458
2459 send_replication_response(
2460 source,
2461 p2p_node,
2462 request_id,
2463 ReplicationMessageBody::FetchResponse(response),
2464 rr_message_id,
2465 )
2466 .await;
2467
2468 Ok(())
2469}
2470
2471/// Responder for an incoming `AuditChallenge` (responsible-chunk audit #2, and
2472/// the prune-confirmation audit, which reuses the same wire message): reply with
2473/// per-key possession digests.
2474async fn handle_audit_challenge_msg(
2475 source: &PeerId,
2476 challenge: &protocol::AuditChallenge,
2477 storage: &Arc<LmdbStorage>,
2478 p2p_node: &Arc<P2PNode>,
2479 is_bootstrapping: bool,
2480 request_id: u64,
2481 rr_message_id: Option<&str>,
2482) -> Result<()> {
2483 #[allow(clippy::cast_possible_truncation)]
2484 let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
2485 info!(
2486 "Audit challenge received: kind=responsible keys={} bootstrapping={} request_response={}",
2487 challenge.keys.len(),
2488 is_bootstrapping,
2489 rr_message_id.is_some(),
2490 );
2491
2492 let response = audit::handle_audit_challenge(
2493 challenge,
2494 storage,
2495 p2p_node.peer_id(),
2496 is_bootstrapping,
2497 stored_chunks,
2498 )
2499 .await;
2500 let response_kind = audit_response_kind(&response);
2501
2502 let sent = send_replication_response_checked(
2503 source,
2504 p2p_node,
2505 request_id,
2506 ReplicationMessageBody::AuditResponse(response),
2507 rr_message_id,
2508 )
2509 .await;
2510 if sent {
2511 info!(
2512 "Audit challenge reply sent: kind=responsible response={} keys={} request_response={}",
2513 response_kind,
2514 challenge.keys.len(),
2515 rr_message_id.is_some(),
2516 );
2517 } else {
2518 warn!(
2519 "Audit challenge reply not sent: kind=responsible response={} keys={} request_response={}",
2520 response_kind,
2521 challenge.keys.len(),
2522 rr_message_id.is_some(),
2523 );
2524 }
2525
2526 Ok(())
2527}
2528
2529fn audit_response_kind(response: &protocol::AuditResponse) -> &'static str {
2530 match response {
2531 protocol::AuditResponse::Digests { .. } => "digests",
2532 protocol::AuditResponse::Bootstrapping { .. } => "bootstrapping",
2533 protocol::AuditResponse::Rejected { .. } => "rejected",
2534 }
2535}
2536
2537fn subtree_audit_response_kind(response: &protocol::SubtreeAuditResponse) -> &'static str {
2538 match response {
2539 protocol::SubtreeAuditResponse::Proof { .. } => "proof",
2540 protocol::SubtreeAuditResponse::Bootstrapping { .. } => "bootstrapping",
2541 protocol::SubtreeAuditResponse::Rejected { .. } => "rejected",
2542 }
2543}
2544
2545fn subtree_byte_response_kind(response: &protocol::SubtreeByteResponse) -> &'static str {
2546 match response {
2547 protocol::SubtreeByteResponse::Items { .. } => "items",
2548 protocol::SubtreeByteResponse::Bootstrapping { .. } => "bootstrapping",
2549 protocol::SubtreeByteResponse::Rejected { .. } => "rejected",
2550 }
2551}
2552
2553// ---------------------------------------------------------------------------
2554// Message sending helper
2555// ---------------------------------------------------------------------------
2556
2557/// Send a replication response message as a best-effort reply.
2558///
2559/// Encode and send failures are logged by the checked helper. Most response
2560/// paths do not need to branch on send success, so this wrapper keeps those
2561/// call sites explicit about their best-effort behavior.
2562async fn send_replication_response(
2563 peer: &PeerId,
2564 p2p_node: &Arc<P2PNode>,
2565 request_id: u64,
2566 body: ReplicationMessageBody,
2567 rr_message_id: Option<&str>,
2568) {
2569 let _ =
2570 send_replication_response_checked(peer, p2p_node, request_id, body, rr_message_id).await;
2571}
2572
2573/// Send a replication response message and report whether it was accepted.
2574///
2575/// Returns `true` after the message is encoded and accepted by the P2P send
2576/// path. Returns `false` after logging an encode or send failure. Repair-proof
2577/// recording uses this to avoid trusting hints that were not actually sent.
2578///
2579/// When `rr_message_id` is `Some`, the response is sent via the `/rr/`
2580/// request-response path so saorsa-core can route it back to the caller's
2581/// `send_request` future. Otherwise it is sent as a plain message.
2582async fn send_replication_response_checked(
2583 peer: &PeerId,
2584 p2p_node: &Arc<P2PNode>,
2585 request_id: u64,
2586 body: ReplicationMessageBody,
2587 rr_message_id: Option<&str>,
2588) -> bool {
2589 let msg = ReplicationMessage { request_id, body };
2590 let encoded = match msg.encode() {
2591 Ok(data) => data,
2592 Err(e) => {
2593 warn!("Failed to encode replication response: {e}");
2594 return false;
2595 }
2596 };
2597 let result = if let Some(msg_id) = rr_message_id {
2598 p2p_node
2599 .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
2600 .await
2601 } else {
2602 p2p_node
2603 .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
2604 .await
2605 };
2606 if let Err(e) = result {
2607 debug!("Failed to send replication response to {peer}: {e}");
2608 return false;
2609 }
2610 true
2611}
2612
2613async fn record_sent_replica_hints(
2614 peer: &PeerId,
2615 hints: &[neighbor_sync::SentReplicaHint],
2616 repair_proofs: &Arc<RwLock<RepairProofs>>,
2617 sync_cycle_epoch: &Arc<RwLock<u64>>,
2618) {
2619 if hints.is_empty() {
2620 return;
2621 }
2622
2623 let hinted_at_epoch = *sync_cycle_epoch.read().await;
2624 let mut proofs = repair_proofs.write().await;
2625 for hint in hints {
2626 if proofs.record_replica_hint_sent(*peer, hint.key, &hint.close_peers, hinted_at_epoch) {
2627 debug!(
2628 "Recorded repair hint proof for peer {peer} and key {}",
2629 hex::encode(hint.key)
2630 );
2631 }
2632 }
2633}
2634
2635// ---------------------------------------------------------------------------
2636// Neighbor sync round
2637// ---------------------------------------------------------------------------
2638
2639/// Run one neighbor sync round.
2640#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
2641async fn run_neighbor_sync_round(
2642 p2p_node: &Arc<P2PNode>,
2643 storage: &Arc<LmdbStorage>,
2644 paid_list: &Arc<PaidList>,
2645 queues: &Arc<RwLock<ReplicationQueues>>,
2646 config: &ReplicationConfig,
2647 sync_state: &Arc<RwLock<NeighborSyncState>>,
2648 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
2649 sync_cycle_epoch: &Arc<RwLock<u64>>,
2650 repair_proofs: &Arc<RwLock<RepairProofs>>,
2651 is_bootstrapping: &Arc<RwLock<bool>>,
2652 bootstrap_state: &Arc<RwLock<BootstrapState>>,
2653 commitment_state: &Arc<ResponderCommitmentState>,
2654 last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
2655 ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
2656 sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
2657 gossip_audit: &GossipAuditTrigger,
2658) {
2659 let self_id = *p2p_node.peer_id();
2660 let bootstrapping = *is_bootstrapping.read().await;
2661
2662 // Check if cycle is complete; start new one if needed.
2663 // We check under a read lock, then release it before the expensive
2664 // prune pass and DHT snapshot so other tasks are not starved.
2665 let cycle_complete = sync_state.read().await.is_cycle_complete();
2666 if cycle_complete {
2667 // A completed local neighbor-sync cycle advances the epoch component
2668 // of repair-proof maturity. The per-key wall-clock minimum age is
2669 // checked when audits are selected.
2670 {
2671 let mut history = sync_history.write().await;
2672 for record in history.values_mut() {
2673 record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
2674 }
2675 }
2676 let current_sync_epoch = {
2677 let mut epoch = sync_cycle_epoch.write().await;
2678 *epoch = epoch.saturating_add(1);
2679 *epoch
2680 };
2681
2682 // Post-cycle pruning (Section 11) — runs without holding sync_state.
2683 // Remote prune-confirmation audits are storage-proof audits and only
2684 // run after bootstrap has drained.
2685 let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
2686 pruning::run_prune_pass_with_context(pruning::PrunePassContext {
2687 self_id: &self_id,
2688 storage,
2689 paid_list,
2690 p2p_node,
2691 config,
2692 sync_state,
2693 repair_proofs,
2694 current_sync_epoch,
2695 #[cfg(any(test, feature = "test-utils"))]
2696 repair_proof_now: None,
2697 allow_remote_prune_audits,
2698 commitment_state: Some(commitment_state),
2699 })
2700 .await;
2701
2702 // Take fresh close-neighbor snapshot (DHT query, no lock held).
2703 let neighbors =
2704 neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
2705 .await;
2706
2707 // Now re-acquire write lock and re-check before swapping cycle.
2708 let mut state = sync_state.write().await;
2709 if state.is_cycle_complete() {
2710 // Preserve cooldown and bootstrap-claim tracking across cycles.
2711 // Claims have a 24h lifecycle vs 10-20 min cycles — dropping them
2712 // would reset the abuse detection timer every cycle.
2713 let old_sync_times = std::mem::take(&mut state.last_sync_times);
2714 let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
2715 let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
2716 let old_prune_cursor = state.prune_cursor;
2717 *state = NeighborSyncState::new_cycle(neighbors);
2718 state.last_sync_times = old_sync_times;
2719 state.bootstrap_claims = old_bootstrap_claims;
2720 state.bootstrap_claim_history = old_bootstrap_claim_history;
2721 state.prune_cursor = old_prune_cursor;
2722 }
2723 }
2724
2725 // Select batch of peers.
2726 let batch = {
2727 let mut state = sync_state.write().await;
2728 neighbor_sync::select_sync_batch(
2729 &mut state,
2730 config.neighbor_sync_peer_count,
2731 config.neighbor_sync_cooldown,
2732 )
2733 };
2734
2735 if batch.is_empty() {
2736 return;
2737 }
2738
2739 debug!("Neighbor sync: syncing with {} peers", batch.len());
2740
2741 // Snapshot our current commitment once per round so all peers in
2742 // this batch see the same thing (gossip is the responder's attestation;
2743 // same value across the batch is fine and reduces RwLock churn). Atomically
2744 // snapshot + mark-gossiped so we stay answerable for exactly what we emit
2745 // (ADR-0002 retention), with no TOCTOU vs a concurrent retire/rotate.
2746 let my_commitment = commitment_state
2747 .current_for_gossip()
2748 .map(|b| b.commitment().clone());
2749
2750 let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
2751 &batch,
2752 storage,
2753 paid_list,
2754 p2p_node,
2755 config.close_group_size,
2756 config.paid_list_close_group_size,
2757 )
2758 .await;
2759
2760 // Sync with each peer in the batch.
2761 for peer in &batch {
2762 let hints = hints_by_peer.remove(peer).unwrap_or_default();
2763 let outcome = neighbor_sync::sync_with_peer_with_hints(
2764 peer,
2765 p2p_node,
2766 config,
2767 bootstrapping,
2768 hints,
2769 my_commitment.clone(),
2770 )
2771 .await;
2772
2773 if let Some(outcome) = outcome {
2774 handle_sync_response(
2775 &self_id,
2776 peer,
2777 &outcome.response,
2778 &outcome.sent_replica_hints,
2779 p2p_node,
2780 config,
2781 bootstrapping,
2782 bootstrap_state,
2783 storage,
2784 paid_list,
2785 queues,
2786 sync_state,
2787 sync_history,
2788 sync_cycle_epoch,
2789 repair_proofs,
2790 last_commitment_by_peer,
2791 ever_capable_peers,
2792 sig_verify_attempts,
2793 gossip_audit,
2794 )
2795 .await;
2796 } else {
2797 // Sync failed -- remove peer and try to fill slot.
2798 let replacement = {
2799 let mut state = sync_state.write().await;
2800 neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
2801 };
2802
2803 // Attempt sync with the replacement peer (if one was found).
2804 if let Some(replacement_peer) = replacement {
2805 let mut replacement_hints = neighbor_sync::build_sync_hints_for_peers(
2806 std::slice::from_ref(&replacement_peer),
2807 storage,
2808 paid_list,
2809 p2p_node,
2810 config.close_group_size,
2811 config.paid_list_close_group_size,
2812 )
2813 .await;
2814 let hints = replacement_hints
2815 .remove(&replacement_peer)
2816 .unwrap_or_default();
2817 let replacement_outcome = neighbor_sync::sync_with_peer_with_hints(
2818 &replacement_peer,
2819 p2p_node,
2820 config,
2821 bootstrapping,
2822 hints,
2823 my_commitment.clone(),
2824 )
2825 .await;
2826
2827 if let Some(outcome) = replacement_outcome {
2828 handle_sync_response(
2829 &self_id,
2830 &replacement_peer,
2831 &outcome.response,
2832 &outcome.sent_replica_hints,
2833 p2p_node,
2834 config,
2835 bootstrapping,
2836 bootstrap_state,
2837 storage,
2838 paid_list,
2839 queues,
2840 sync_state,
2841 sync_history,
2842 sync_cycle_epoch,
2843 repair_proofs,
2844 last_commitment_by_peer,
2845 ever_capable_peers,
2846 sig_verify_attempts,
2847 gossip_audit,
2848 )
2849 .await;
2850 }
2851 }
2852 }
2853 }
2854}
2855
2856/// Process a successful neighbor sync response: record the sync, check for
2857/// bootstrap claim abuse, and admit inbound hints.
2858#[allow(clippy::too_many_arguments)]
2859async fn handle_sync_response(
2860 self_id: &PeerId,
2861 peer: &PeerId,
2862 resp: &NeighborSyncResponse,
2863 sent_replica_hints: &[neighbor_sync::SentReplicaHint],
2864 p2p_node: &Arc<P2PNode>,
2865 config: &ReplicationConfig,
2866 bootstrapping: bool,
2867 bootstrap_state: &Arc<RwLock<BootstrapState>>,
2868 storage: &Arc<LmdbStorage>,
2869 paid_list: &Arc<PaidList>,
2870 queues: &Arc<RwLock<ReplicationQueues>>,
2871 sync_state: &Arc<RwLock<NeighborSyncState>>,
2872 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
2873 sync_cycle_epoch: &Arc<RwLock<u64>>,
2874 repair_proofs: &Arc<RwLock<RepairProofs>>,
2875 last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
2876 ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
2877 sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
2878 gossip_audit: &GossipAuditTrigger,
2879) {
2880 // Ingest the peer's commitment if they piggybacked one on the response.
2881 // Same verification as the request path (peer-id binding + signature);
2882 // forged commitments are dropped at the edge. A *changed* commitment here
2883 // is a gossip-audit trigger just like on the request path — so a peer that
2884 // only ever answers sync (never initiates) is still audited (ADR-0002).
2885 if let Some(target) = ingest_peer_commitment(
2886 peer,
2887 resp.commitment.as_ref(),
2888 p2p_node,
2889 last_commitment_by_peer,
2890 ever_capable_peers,
2891 sig_verify_attempts,
2892 )
2893 .await
2894 {
2895 maybe_trigger_gossip_audit(gossip_audit, peer, target).await;
2896 }
2897
2898 // Record successful sync.
2899 {
2900 let mut state = sync_state.write().await;
2901 neighbor_sync::record_successful_sync(&mut state, peer);
2902 }
2903 {
2904 let mut history = sync_history.write().await;
2905 let record = history.entry(*peer).or_insert(PeerSyncRecord {
2906 last_sync: None,
2907 cycles_since_sync: 0,
2908 });
2909 record.last_sync = Some(Instant::now());
2910 record.cycles_since_sync = 0;
2911 }
2912
2913 // Process inbound hints from response (skip if peer is bootstrapping).
2914 if resp.bootstrapping {
2915 // Gap 6: BootstrapClaimAbuse grace period enforcement.
2916 // Separate state mutation from network I/O to avoid holding the
2917 // write lock across report_trust_event.
2918 let should_report = {
2919 let now = Instant::now();
2920 let mut state = sync_state.write().await;
2921 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
2922 BootstrapClaimObservation::WithinGrace { .. } => false,
2923 BootstrapClaimObservation::PastGrace { first_seen } => {
2924 warn!(
2925 "Peer {peer} has been claiming bootstrap for {:?}, \
2926 exceeding grace period of {:?} — reporting abuse",
2927 now.duration_since(first_seen),
2928 config.bootstrap_claim_grace_period,
2929 );
2930 true
2931 }
2932 BootstrapClaimObservation::Repeated { first_seen } => {
2933 warn!(
2934 "Peer {peer} repeated bootstrap claim after previously stopping; \
2935 first claim was {:?} ago — reporting abuse",
2936 now.duration_since(first_seen),
2937 );
2938 true
2939 }
2940 }
2941 };
2942 if should_report {
2943 p2p_node
2944 .report_trust_event(
2945 peer,
2946 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2947 )
2948 .await;
2949 }
2950 } else {
2951 // Peer is not claiming bootstrap; clear active claim while retaining
2952 // history so the peer cannot start a second grace window later.
2953 {
2954 let mut state = sync_state.write().await;
2955 state.clear_active_bootstrap_claim(peer);
2956 }
2957 record_sent_replica_hints(peer, sent_replica_hints, repair_proofs, sync_cycle_epoch).await;
2958 let outcome = admit_and_queue_hints(
2959 self_id,
2960 peer,
2961 &resp.replica_hints,
2962 &resp.paid_hints,
2963 p2p_node,
2964 config,
2965 storage,
2966 paid_list,
2967 queues,
2968 )
2969 .await;
2970
2971 // Track discovered keys for bootstrap drain detection so that hints
2972 // admitted via regular neighbor sync are not missed. Capacity-
2973 // rejected hints keep this source on the "not yet drained" list
2974 // until its next sync replays them; a clean cycle clears it.
2975 if bootstrapping {
2976 if !outcome.discovered.is_empty() {
2977 bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
2978 }
2979 if outcome.capacity_rejected_count > 0 {
2980 bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
2981 } else {
2982 bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
2983 }
2984 }
2985 }
2986}
2987
2988/// Admit hints and queue them for verification, returning newly-discovered keys.
2989///
2990/// Shared by neighbor-sync request handling, response handling, and bootstrap
2991/// sync so that admission + queueing logic lives in one place.
2992#[allow(clippy::too_many_arguments)]
2993/// Outcome of [`admit_and_queue_hints`].
2994///
2995/// `capacity_rejected_count` is non-zero when one or more legitimately
2996/// admissible hints were dropped because `pending_verify`'s global or
2997/// per-source bound was hit. Callers that care about completeness
2998/// (bootstrap drain accounting) MUST NOT treat their work as complete while
2999/// this is > 0 — the source will need to re-hint after capacity frees up.
3000struct AdmissionOutcome {
3001 discovered: HashSet<XorName>,
3002 capacity_rejected_count: usize,
3003}
3004
3005#[allow(clippy::too_many_arguments)]
3006async fn admit_and_queue_hints(
3007 self_id: &PeerId,
3008 source_peer: &PeerId,
3009 replica_hints: &[XorName],
3010 paid_hints: &[XorName],
3011 p2p_node: &Arc<P2PNode>,
3012 config: &ReplicationConfig,
3013 storage: &Arc<LmdbStorage>,
3014 paid_list: &Arc<PaidList>,
3015 queues: &Arc<RwLock<ReplicationQueues>>,
3016) -> AdmissionOutcome {
3017 let pending_keys: HashSet<XorName> = {
3018 let q = queues.read().await;
3019 q.pending_keys().into_iter().collect()
3020 };
3021
3022 let admitted = admission::admit_hints(
3023 self_id,
3024 replica_hints,
3025 paid_hints,
3026 p2p_node,
3027 config,
3028 storage,
3029 paid_list,
3030 &pending_keys,
3031 )
3032 .await;
3033
3034 let mut discovered = HashSet::new();
3035 let mut capacity_rejected_count: usize = 0;
3036 let mut q = queues.write().await;
3037 let now = Instant::now();
3038
3039 for key in admitted.replica_keys {
3040 if !storage.exists(&key).unwrap_or(false) {
3041 let result = q.add_pending_verify(
3042 key,
3043 VerificationEntry {
3044 state: VerificationState::PendingVerify,
3045 pipeline: HintPipeline::Replica,
3046 verified_sources: Vec::new(),
3047 tried_sources: HashSet::new(),
3048 created_at: now,
3049 hint_sender: *source_peer,
3050 },
3051 );
3052 match result {
3053 crate::replication::scheduling::AdmissionResult::Admitted => {
3054 discovered.insert(key);
3055 }
3056 crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
3057 crate::replication::scheduling::AdmissionResult::CapacityRejected => {
3058 capacity_rejected_count += 1;
3059 }
3060 }
3061 }
3062 }
3063
3064 for key in admitted.paid_only_keys {
3065 let result = q.add_pending_verify(
3066 key,
3067 VerificationEntry {
3068 state: VerificationState::PendingVerify,
3069 pipeline: HintPipeline::PaidOnly,
3070 verified_sources: Vec::new(),
3071 tried_sources: HashSet::new(),
3072 created_at: now,
3073 hint_sender: *source_peer,
3074 },
3075 );
3076 match result {
3077 crate::replication::scheduling::AdmissionResult::Admitted => {
3078 discovered.insert(key);
3079 }
3080 crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
3081 crate::replication::scheduling::AdmissionResult::CapacityRejected => {
3082 capacity_rejected_count += 1;
3083 }
3084 }
3085 }
3086
3087 if capacity_rejected_count > 0 {
3088 debug!(
3089 "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
3090 rejected at queue capacity; source will need to re-hint after pending_verify drains"
3091 );
3092 }
3093
3094 AdmissionOutcome {
3095 discovered,
3096 capacity_rejected_count,
3097 }
3098}
3099
3100// ---------------------------------------------------------------------------
3101// Verification cycle
3102// ---------------------------------------------------------------------------
3103
/// Run one verification cycle: process pending keys through quorum checks.
///
/// Outline (the queue lock is taken per step and is not held across the
/// network verification rounds):
///
/// 0. Evict entries pending longer than `config::PENDING_VERIFY_MAX_AGE`,
///    then snapshot the pending keys; return early if there are none.
/// 1. Keys already in the local `PaidForList` are marked `PaidListVerified`.
///    Replica-pipeline keys go to a presence probe. Paid-only keys go to the
///    probe only when not stored locally and this node is within the
///    storage-admission width; otherwise they are terminal.
/// 1b. Presence probe for those locally-paid keys. A key that is now stored
///    locally, or has no responding holder, is terminal. Otherwise it is
///    promoted to the fetch queue.
/// 2-3. All other keys run a network verification round, evaluated with the
///    v12 §6 holder-credit predicate.
/// 4. Keys reaching `QuorumVerified` / `PaidListVerified` are inserted into
///    the local `PaidForList`.
/// 5. Verified, fetch-eligible keys with sources are promoted to the fetch
///    queue. Every other evaluated key is removed from pending and counted
///    as terminal.
/// 6. Terminal keys feed bootstrap drain accounting, and a summary line is
///    logged: `info` when the cycle took at least
///    `VERIFICATION_CYCLE_SLOW_LOG_MS`, `debug` otherwise.
///
/// In the network path, keys with no evidence from the round, or no pending
/// entry at evaluation time, are skipped and stay as they are for a later
/// cycle.
#[allow(clippy::too_many_lines)]
async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
    let cycle_started = Instant::now();
    let VerificationCycleContext {
        p2p_node,
        paid_list,
        storage,
        queues,
        config,
        bootstrap_state,
        is_bootstrapping,
        bootstrap_complete_notify,
        last_commitment_by_peer,
        ever_capable_peers,
        recent_provers,
    } = ctx;

    // Evict stale entries that have been pending too long (e.g. unreachable
    // verification targets during a network partition).
    {
        let mut q = queues.write().await;
        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
    }

    let pending_keys = {
        let q = queues.read().await;
        q.pending_keys()
    };

    if pending_keys.is_empty() {
        return;
    }
    let initial_pending_count = pending_keys.len();

    let self_id = *p2p_node.peer_id();

    // Step 1: Check local PaidForList for fast-path authorization (Section 9,
    // step 4).
    let mut local_paid_presence_probe_keys = Vec::new();
    let mut local_paid_paid_only_keys = Vec::new();
    let mut keys_needing_network = Vec::new();
    // Keys this cycle is finished with: every push below follows a
    // `remove_pending`. They are handed to bootstrap accounting in Step 6.
    let mut terminal_keys: Vec<XorName> = Vec::new();
    {
        let mut q = queues.write().await;
        for key in &pending_keys {
            if paid_list.contains(key).unwrap_or(false) {
                if let Some(pipeline) =
                    q.set_pending_state(key, VerificationState::PaidListVerified)
                {
                    match pipeline {
                        HintPipeline::PaidOnly => {
                            // Paid-only + local paid state needs one more
                            // storage-admission check outside this lock: if we
                            // are also in the close group plus storage margin,
                            // the hint can repair a missing replica.
                            local_paid_paid_only_keys.push(*key);
                        }
                        HintPipeline::Replica => {
                            // Local paid-list membership authorizes the key.
                            // We still need a presence probe to discover fetch
                            // sources, but we must not require remote paid
                            // majority or presence quorum.
                            local_paid_presence_probe_keys.push(*key);
                        }
                    }
                }
            } else {
                keys_needing_network.push(*key);
            }
        }
    }

    // Resolve locally-paid paid-only keys without the queue lock, since
    // `is_responsible` is awaited. Already-stored or out-of-range keys are
    // terminal; the rest join the presence probe.
    if !local_paid_paid_only_keys.is_empty() {
        let mut terminal_paid_only = Vec::new();
        for key in local_paid_paid_only_keys {
            if storage.exists(&key).unwrap_or(false) {
                terminal_paid_only.push(key);
            } else if admission::is_responsible(
                &self_id,
                &key,
                p2p_node,
                storage_admission_width(config.close_group_size),
            )
            .await
            {
                local_paid_presence_probe_keys.push(key);
            } else {
                terminal_paid_only.push(key);
            }
        }

        if !terminal_paid_only.is_empty() {
            let mut q = queues.write().await;
            for key in terminal_paid_only {
                q.remove_pending(&key);
                terminal_keys.push(key);
            }
        }
    }

    // Counts for the summary log. The probe count includes paid-only keys
    // that were just moved into the probe list above.
    let local_paid_probe_count = local_paid_presence_probe_keys.len();
    let keys_needing_network_count = keys_needing_network.len();

    // Step 1b: Local paid-list hit for fetch-eligible keys. Per Section 9
    // step 4, authorization succeeds immediately; run a presence-only probe
    // to find any holder we can fetch from.
    if !local_paid_presence_probe_keys.is_empty() {
        let targets = quorum::compute_presence_targets(
            &local_paid_presence_probe_keys,
            p2p_node,
            config,
            &self_id,
        )
        .await;
        let evidence = quorum::run_verification_round(
            &local_paid_presence_probe_keys,
            &targets,
            p2p_node,
            config,
        )
        .await;

        let mut q = queues.write().await;
        for key in local_paid_presence_probe_keys {
            // The key may have been stored while the probe was in flight.
            if storage.exists(&key).unwrap_or(false) {
                q.remove_pending(&key);
                terminal_keys.push(key);
                continue;
            }
            let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
                quorum::present_sources_for_key(&key, ev, &targets)
            });
            if sources.is_empty() {
                // Terminal failure: remove pending and report. No fetch path.
                q.remove_pending(&key);
                warn!(
                    "Locally paid key {} has no responding holders (possible data loss)",
                    hex::encode(key)
                );
                terminal_keys.push(key);
            } else {
                let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                // Atomic remove+enqueue: if fetch_queue is at capacity, the
                // pending entry is preserved and retried next cycle (no
                // silent drop of verified replica-repair work).
                let _ = q.promote_pending_to_fetch(key, distance, sources);
            }
        }
    }

    // Steps 2-5: Network verification (skipped if all keys resolved locally).
    if !keys_needing_network.is_empty() {
        // Step 2: Compute targets and run network verification round.
        let targets =
            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
                .await;

        let evidence =
            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;

        // Step 3: Evaluate results — collect outcomes without holding the write
        // lock across paid-list I/O.
        //
        // v12 §6 holder-eligibility: snapshot the per-peer last-commitment
        // table and recent_provers cache up front so the synchronous
        // evaluate_key_evidence_with_holder_check predicate can consult
        // them without awaiting. The predicate downgrades a Present
        // claim to Unresolved unless the peer is credited for that key.
        // Snapshot per-peer commitment data. We need two views:
        //   - `commitment_by_peer_snapshot`: peers that currently have
        //     a verified commitment record on file (used to look up
        //     their current hash).
        //   - `capable_peer_snapshot`: the sticky "ever v12-capable"
        //     set. Sourced from a separate set rather than the
        //     commitment map so eviction (PeerRemoved cleanup, sybil
        //     cap at `MAX_LAST_COMMITMENT_BY_PEER`) does NOT downgrade
        //     a previously-v12 peer to "legacy" credit-unconditionally.
        //     Legacy / pre-v12 peers that have never sent a commitment
        //     remain absent from the set and are credited via the
        //     legacy path so mixed-version networks stay live.
        let commitment_by_peer_snapshot: HashMap<PeerId, [u8; 32]> = {
            let map = last_commitment_by_peer.read().await;
            map.iter()
                // Read the CACHED hash (§13) — no per-cycle re-serialize/re-hash
                // of every peer's ~5 KiB commitment.
                .filter_map(|(p, rec)| rec.commitment_hash().map(|h| (*p, h)))
                .collect()
        };
        let capable_peer_snapshot: HashSet<PeerId> = ever_capable_peers.read().await.clone();
        // Take a full snapshot of recent_provers under the read lock,
        // then release. The cache is bounded (16/key × keys), so the
        // clone is cheap.
        let provers_snapshot = recent_provers.read().await.clone();
        // For the replica-fetch path, we need to know whether THIS
        // node already holds the key being verified. The v12 §6
        // holder-credit gate is meant to prevent uncredited Present
        // claims from contributing to paid-list / reward quorum for
        // keys we DO hold (and could audit ourselves). For keys we
        // are trying to FETCH (i.e. not in local storage), there is
        // no possible local audit credit, and gating the presence
        // quorum on credit would deadlock replica-repair in a
        // fully v12-capable close group.
        let mut locally_held: HashSet<XorName> = HashSet::new();
        for key in &keys_needing_network {
            if storage.exists(key).unwrap_or(false) {
                locally_held.insert(*key);
            }
        }
        // Holder-credit predicate. It only borrows the snapshots above, so
        // the same closure value is passed to every per-key evaluation below.
        let holder_credit = |peer: &PeerId, key: &XorName| -> bool {
            if !locally_held.contains(key) {
                // Replica-fetch path: we don't hold this key, so we
                // cannot have collected audit credit for it. Trust
                // Present claims to drive fetch-source promotion;
                // chunk-PUT payment_verifier is the security backstop
                // when the bytes actually arrive.
                return true;
            }
            if !capable_peer_snapshot.contains(peer) {
                // Pre-v12 / legacy peer that has never gossiped a
                // commitment. The v12 §6 holder-eligibility check
                // doesn't apply: their Present evidence comes through
                // the legacy path and we credit it unconditionally
                // so a mixed-version network stays live during
                // transition.
                return true;
            }
            let Some(hash) = commitment_by_peer_snapshot.get(peer) else {
                // Peer is commitment_capable (sticky) but currently
                // has no live commitment record on file (e.g. their
                // last gossip was evicted from the LRU cache, or it
                // failed verification). Withhold credit until they
                // re-prove storage under a fresh commitment.
                return false;
            };
            provers_snapshot.is_credited_holder(key, peer, hash)
        };

        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
        {
            let q = queues.read().await;
            for key in &keys_needing_network {
                // No evidence, or no pending entry: leave the key untouched
                // so it can be retried on a later cycle.
                let Some(ev) = evidence.get(key) else {
                    continue;
                };
                let Some(entry) = q.get_pending(key) else {
                    continue;
                };
                let outcome = quorum::evaluate_key_evidence_with_holder_check(
                    key,
                    ev,
                    &targets,
                    config,
                    holder_credit,
                );
                evaluated.push((*key, outcome, entry.pipeline));
            }
        } // read lock released

        // Step 4: Insert verified keys into PaidForList (no lock held).
        let mut paid_insert_keys: Vec<XorName> = Vec::new();
        for (key, outcome, _) in &evaluated {
            if matches!(
                outcome,
                KeyVerificationOutcome::QuorumVerified { .. }
                    | KeyVerificationOutcome::PaidListVerified { .. }
            ) {
                paid_insert_keys.push(*key);
            }
        }
        for key in &paid_insert_keys {
            if let Err(e) = paid_list.insert(key).await {
                warn!("Failed to add verified key to PaidForList: {e}");
            }
        }

        // Paid-only hints normally update PaidForList only. If this node is
        // also within the storage-admission group for the key, a verified
        // paid-only hint can safely repair a missing replica using sources
        // from the same verification round.
        let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
        for (key, outcome, pipeline) in &evaluated {
            if *pipeline == HintPipeline::PaidOnly
                && matches!(
                    outcome,
                    KeyVerificationOutcome::QuorumVerified { .. }
                        | KeyVerificationOutcome::PaidListVerified { .. }
                )
                && !storage.exists(key).unwrap_or(false)
                && admission::is_responsible(
                    &self_id,
                    key,
                    p2p_node,
                    storage_admission_width(config.close_group_size),
                )
                .await
            {
                paid_only_fetch_keys.insert(*key);
            }
        }

        // Step 5: Update queues with the evaluated outcomes.
        let mut q = queues.write().await;
        for (key, outcome, pipeline) in evaluated {
            match outcome {
                KeyVerificationOutcome::QuorumVerified { sources }
                | KeyVerificationOutcome::PaidListVerified { sources } => {
                    let fetch_eligible =
                        pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
                    if fetch_eligible && !sources.is_empty() {
                        let distance =
                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                        // Atomic remove+enqueue: on fetch_queue capacity miss
                        // the pending entry is preserved so this verified key
                        // is retried on the next cycle (no silent drop).
                        let _ = q.promote_pending_to_fetch(key, distance, sources);
                        // Not terminal — either moved to fetch queue, or
                        // retained as pending until queue drains.
                    } else if fetch_eligible && sources.is_empty() {
                        warn!(
                            "Verified storage-admitted key {} has no holders (possible data loss)",
                            hex::encode(key)
                        );
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    } else {
                        // Verified but not fetch-eligible: the PaidForList
                        // update in Step 4 was the only work needed.
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    }
                }
                KeyVerificationOutcome::QuorumFailed
                | KeyVerificationOutcome::QuorumInconclusive => {
                    q.remove_pending(&key);
                    terminal_keys.push(key);
                }
            }
        }
    }

    // Step 6: Remove terminal keys from bootstrap pending set and re-check
    // the drain condition.
    update_bootstrap_after_verification(
        &terminal_keys,
        bootstrap_state,
        queues,
        is_bootstrapping,
        bootstrap_complete_notify,
    )
    .await;

    // Post-cycle summary. Slow cycles are escalated to `info` so stalls are
    // visible without enabling debug logging.
    let (pending_after, fetch_after, in_flight_after) = {
        let q = queues.read().await;
        (
            q.pending_count(),
            q.fetch_queue_count(),
            q.in_flight_count(),
        )
    };
    let terminal_key_count = terminal_keys.len();
    let elapsed_ms = cycle_started.elapsed().as_millis();

    if elapsed_ms >= VERIFICATION_CYCLE_SLOW_LOG_MS {
        info!(
            target: "ant_node::replication::verification",
            "Slow replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
        );
    } else {
        debug!(
            target: "ant_node::replication::verification",
            "Replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
        );
    }
}
3477
3478/// Post-verification bootstrap bookkeeping: remove terminal keys from the
3479/// bootstrap pending set and transition out of bootstrapping when drained.
3480async fn update_bootstrap_after_verification(
3481 terminal_keys: &[XorName],
3482 bootstrap_state: &Arc<RwLock<BootstrapState>>,
3483 queues: &Arc<RwLock<ReplicationQueues>>,
3484 is_bootstrapping: &Arc<RwLock<bool>>,
3485 bootstrap_complete_notify: &Arc<Notify>,
3486) {
3487 if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
3488 return;
3489 }
3490 {
3491 let mut bs = bootstrap_state.write().await;
3492 for key in terminal_keys {
3493 bs.remove_key(key);
3494 }
3495 }
3496 let q = queues.read().await;
3497 if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
3498 complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
3499 }
3500}
3501
3502/// Set `is_bootstrapping` to `false` and wake all waiters.
3503async fn complete_bootstrap(
3504 is_bootstrapping: &Arc<RwLock<bool>>,
3505 bootstrap_complete_notify: &Arc<Notify>,
3506) {
3507 *is_bootstrapping.write().await = false;
3508 bootstrap_complete_notify.notify_waiters();
3509 info!("Replication bootstrap complete");
3510}
3511
3512// ---------------------------------------------------------------------------
3513// Fetch types and single-fetch executor
3514// ---------------------------------------------------------------------------
3515
/// Result classification for a single fetch attempt.
///
/// A plain fieldless tag. It derives `Debug`/`PartialEq` so outcomes can be
/// logged and compared, and `Copy` so it can be passed around freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FetchResult {
    /// Data fetched, integrity-checked, and stored successfully.
    Stored,
    /// The response failed validation: its key did not match the request,
    /// the payload exceeded `MAX_CHUNK_SIZE`, or the content address did not
    /// match — do not retry.
    IntegrityFailed,
    /// The request or source failed (encode error, network error, undecodable
    /// or unexpected reply, `NotFound`/`Error` response, or a local store
    /// failure) — retryable.
    SourceFailed,
}
3525
3526/// Outcome produced by [`execute_single_fetch`] and consumed by the fetch
3527/// worker loop to update queue state.
3528struct FetchOutcome {
3529 key: XorName,
3530 result: FetchResult,
3531}
3532
3533#[allow(clippy::too_many_lines)]
3534/// Execute a single fetch request against `source` for `key`.
3535///
3536/// Handles encoding, network I/O, integrity checking, storage, and trust
3537/// event reporting. Returns a [`FetchOutcome`] so the caller can update
3538/// queue state without holding any locks during the network round-trip.
3539async fn execute_single_fetch(
3540 p2p_node: Arc<P2PNode>,
3541 storage: Arc<LmdbStorage>,
3542 config: Arc<ReplicationConfig>,
3543 key: XorName,
3544 source: PeerId,
3545) -> FetchOutcome {
3546 let request = protocol::FetchRequest { key };
3547 let msg = ReplicationMessage {
3548 request_id: rand::thread_rng().gen::<u64>(),
3549 body: ReplicationMessageBody::FetchRequest(request),
3550 };
3551
3552 let encoded = match msg.encode() {
3553 Ok(data) => data,
3554 Err(e) => {
3555 warn!("Failed to encode fetch request: {e}");
3556 return FetchOutcome {
3557 key,
3558 result: FetchResult::SourceFailed,
3559 };
3560 }
3561 };
3562
3563 let result = p2p_node
3564 .send_request(
3565 &source,
3566 REPLICATION_PROTOCOL_ID,
3567 encoded,
3568 config.fetch_request_timeout,
3569 )
3570 .await;
3571
3572 match result {
3573 Ok(response) => {
3574 let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
3575 p2p_node
3576 .report_trust_event(
3577 &source,
3578 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3579 )
3580 .await;
3581 return FetchOutcome {
3582 key,
3583 result: FetchResult::SourceFailed,
3584 };
3585 };
3586
3587 match resp_msg.body {
3588 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
3589 key: resp_key,
3590 data,
3591 }) => {
3592 // Validate the response key matches the requested key.
3593 // A malicious peer could serve valid data for a different
3594 // key, passing integrity checks while the requested key
3595 // is falsely marked as fetched.
3596 if resp_key != key {
3597 warn!(
3598 "Fetch response key mismatch: requested {}, got {}",
3599 hex::encode(key),
3600 hex::encode(resp_key)
3601 );
3602 p2p_node
3603 .report_trust_event(
3604 &source,
3605 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3606 )
3607 .await;
3608 return FetchOutcome {
3609 key,
3610 result: FetchResult::IntegrityFailed,
3611 };
3612 }
3613
3614 // Enforce chunk size invariant on fetched data.
3615 // Checked before the content-address hash to avoid
3616 // hashing up to 10 MiB of oversized junk data.
3617 if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
3618 warn!(
3619 "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
3620 hex::encode(resp_key),
3621 data.len(),
3622 crate::ant_protocol::MAX_CHUNK_SIZE,
3623 );
3624 p2p_node
3625 .report_trust_event(
3626 &source,
3627 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3628 )
3629 .await;
3630 return FetchOutcome {
3631 key,
3632 result: FetchResult::IntegrityFailed,
3633 };
3634 }
3635
3636 // Content-address integrity check.
3637 let computed = crate::client::compute_address(&data);
3638 if computed != resp_key {
3639 warn!(
3640 "Fetched record integrity check failed: expected {}, got {}",
3641 hex::encode(resp_key),
3642 hex::encode(computed)
3643 );
3644 p2p_node
3645 .report_trust_event(
3646 &source,
3647 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3648 )
3649 .await;
3650 return FetchOutcome {
3651 key,
3652 result: FetchResult::IntegrityFailed,
3653 };
3654 }
3655
3656 if let Err(e) = storage.put(&resp_key, &data).await {
3657 warn!(
3658 "Failed to store fetched record {}: {e}",
3659 hex::encode(resp_key)
3660 );
3661 return FetchOutcome {
3662 key,
3663 result: FetchResult::SourceFailed,
3664 };
3665 }
3666
3667 FetchOutcome {
3668 key,
3669 result: FetchResult::Stored,
3670 }
3671 }
3672 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
3673 ..
3674 }) => {
3675 // This peer was selected as a fetch source because it
3676 // recently answered `Present` during verification. A
3677 // subsequent NotFound is evidence of a stale/false claim
3678 // or chunk wiping, so penalize lightly and try another
3679 // verified source.
3680 warn!(
3681 "Fetch: verified source {source} returned NotFound for {}",
3682 hex::encode(key)
3683 );
3684 p2p_node
3685 .report_trust_event(
3686 &source,
3687 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3688 )
3689 .await;
3690 FetchOutcome {
3691 key,
3692 result: FetchResult::SourceFailed,
3693 }
3694 }
3695 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
3696 reason,
3697 ..
3698 }) => {
3699 warn!(
3700 "Fetch: peer {source} returned error for {}: {reason}",
3701 hex::encode(key)
3702 );
3703 p2p_node
3704 .report_trust_event(
3705 &source,
3706 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3707 )
3708 .await;
3709 FetchOutcome {
3710 key,
3711 result: FetchResult::SourceFailed,
3712 }
3713 }
3714 _ => {
3715 // Unexpected message type — treat as malformed.
3716 p2p_node
3717 .report_trust_event(
3718 &source,
3719 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3720 )
3721 .await;
3722 FetchOutcome {
3723 key,
3724 result: FetchResult::SourceFailed,
3725 }
3726 }
3727 }
3728 }
3729 Err(e) => {
3730 debug!("Fetch request to {source} failed: {e}");
3731 // No ApplicationFailure here — P2PNode::send_request() already
3732 // reports ConnectionTimeout / ConnectionFailed to the TrustEngine.
3733 FetchOutcome {
3734 key,
3735 result: FetchResult::SourceFailed,
3736 }
3737 }
3738 }
3739}
3740
3741// ---------------------------------------------------------------------------
3742// Audit result handler
3743// ---------------------------------------------------------------------------
3744
3745/// Format the first confirmed-failed key as a 16-hex-char label.
3746///
3747/// Pairs with `challenged_peer` to form a stable cross-host correlation
3748/// handle in the audit-failure log line, e.g.
3749///
3750/// ```text
3751/// Audit failure for <peer>: …, `first_failed_key=0x18878f1d2d9e0612`
3752/// ```
3753///
3754/// Falls back to `"0x"` when the list is empty so the log line never
3755/// contains a misleading default.
3756fn first_failed_key_label(confirmed_failed_keys: &[XorName]) -> String {
3757 confirmed_failed_keys.first().map_or_else(
3758 || "0x".to_string(),
3759 |k| format!("0x{}", hex::encode(&k[..8])),
3760 )
3761}
3762
3763/// Execute the side effects for a subtree storage-commitment audit failure.
3764///
3765/// Subtree timeouts are fully graced: the multi-round, multi-chunk challenge can
3766/// legitimately time out on slow or loaded honest peers, so it never touches the
3767/// responsible-chunk audit path or its timeout accounting. Confirmed subtree
3768/// failures still penalise immediately and revoke holder credit.
3769async fn handle_subtree_failed_audit(
3770 challenged_peer: &PeerId,
3771 confirmed_failed_key_count: usize,
3772 reason: &AuditFailureReason,
3773 p2p_node: &Arc<P2PNode>,
3774 sync_state: &Arc<RwLock<NeighborSyncState>>,
3775 recent_provers: &Arc<RwLock<RecentProvers>>,
3776) {
3777 if matches!(reason, AuditFailureReason::Timeout) {
3778 debug!(
3779 "Audit timeout for {challenged_peer} fully graced \
3780 (subtree audit does not evict on timeout)"
3781 );
3782 return;
3783 }
3784
3785 // The caller already logged the rich failure line with reason + per-category
3786 // summary; avoid a redundant second error log here.
3787 let _ = confirmed_failed_key_count;
3788 {
3789 let mut state = sync_state.write().await;
3790 state.clear_active_bootstrap_claim(challenged_peer);
3791 }
3792 {
3793 let mut provers_guard = recent_provers.write().await;
3794 apply_audit_failure_credit_revocation(&mut provers_guard, challenged_peer, reason);
3795 }
3796 p2p_node
3797 .report_trust_event(
3798 challenged_peer,
3799 TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
3800 )
3801 .await;
3802}
3803
3804/// Handle audit result: log findings and emit trust events.
3805async fn handle_subtree_audit_result(
3806 result: &AuditTickResult,
3807 p2p_node: &Arc<P2PNode>,
3808 sync_state: &Arc<RwLock<NeighborSyncState>>,
3809 recent_provers: &Arc<RwLock<RecentProvers>>,
3810 config: &ReplicationConfig,
3811) {
3812 match result {
3813 AuditTickResult::Passed {
3814 challenged_peer,
3815 keys_checked,
3816 } => {
3817 debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
3818 // Peer responded normally — clear the active bootstrap claim while
3819 // retaining history so a later claim is treated as repeated abuse.
3820 {
3821 let mut state = sync_state.write().await;
3822 state.clear_active_bootstrap_claim(challenged_peer);
3823 }
3824 p2p_node
3825 .report_trust_event(
3826 challenged_peer,
3827 TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
3828 )
3829 .await;
3830 }
3831 AuditTickResult::Failed { evidence } => {
3832 if let FailureEvidence::AuditFailure {
3833 challenged_peer,
3834 confirmed_failed_keys,
3835 summary,
3836 reason,
3837 ..
3838 } = evidence
3839 {
3840 // Rich diagnostics (from main's audit-failure logging) + the
3841 // first-failed-key correlation handle.
3842 let first_failed_key = first_failed_key_label(confirmed_failed_keys);
3843 error!(
3844 "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}, first_failed_key={first_failed_key}",
3845 confirmed_failed_keys.len(),
3846 summary.challenged_keys,
3847 summary.absent_keys,
3848 summary.digest_mismatch_keys,
3849 );
3850 // Route the side effects through the subtree-only failure path.
3851 // Responsible-chunk `AuditChallenge` handling intentionally uses
3852 // its own old immediate-penalty handler below.
3853 handle_subtree_failed_audit(
3854 challenged_peer,
3855 confirmed_failed_keys.len(),
3856 reason,
3857 p2p_node,
3858 sync_state,
3859 recent_provers,
3860 )
3861 .await;
3862 }
3863 }
3864 AuditTickResult::BootstrapClaim { peer } => {
3865 // Gap 6: BootstrapClaimAbuse grace period in audit path.
3866 // Separate state mutation from network I/O to avoid holding the
3867 // write lock across report_trust_event.
3868 let should_report = {
3869 let now = Instant::now();
3870 let mut state = sync_state.write().await;
3871 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
3872 {
3873 BootstrapClaimObservation::WithinGrace { .. } => {
3874 debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
3875 false
3876 }
3877 BootstrapClaimObservation::PastGrace { first_seen } => {
3878 warn!(
3879 "Audit: peer {peer} claiming bootstrap past grace period \
3880 ({:?} > {:?}), reporting abuse",
3881 now.duration_since(first_seen),
3882 config.bootstrap_claim_grace_period,
3883 );
3884 true
3885 }
3886 BootstrapClaimObservation::Repeated { first_seen } => {
3887 warn!(
3888 "Audit: peer {peer} repeated bootstrap claim after previously \
3889 stopping; first claim was {:?} ago, reporting abuse",
3890 now.duration_since(first_seen),
3891 );
3892 true
3893 }
3894 }
3895 };
3896 if should_report {
3897 p2p_node
3898 .report_trust_event(
3899 peer,
3900 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
3901 )
3902 .await;
3903 }
3904 }
3905 AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
3906 }
3907}
3908
3909/// Whether a confirmed audit failure with this reason clears the peer's active
3910/// bootstrap claim. A `Timeout` does not (the peer may still be legitimately
3911/// bootstrapping); every confirmed storage-integrity reason does.
3912///
3913/// Responsible-chunk `AuditChallenge` failures use this directly: timeouts keep
3914/// the bootstrap claim but are still reported as audit failures, matching the
3915/// pre-ADR-0002 behaviour.
3916fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
3917 !matches!(reason, AuditFailureReason::Timeout)
3918}
3919
3920/// Handle the result of a responsible-chunk audit tick (audit #2): emit trust
3921/// events and manage bootstrap-claim state.
3922///
3923/// This is intentionally separate from the subtree audit result handler. A
3924/// responsible-chunk `AuditChallenge` `Failed` result reports
3925/// `ApplicationFailure` immediately for every reason, including `Timeout`,
3926/// restoring the pre-ADR-0002 behaviour.
3927async fn handle_audit_result(
3928 result: &AuditTickResult,
3929 p2p_node: &Arc<P2PNode>,
3930 sync_state: &Arc<RwLock<NeighborSyncState>>,
3931 config: &ReplicationConfig,
3932) {
3933 match result {
3934 AuditTickResult::Passed {
3935 challenged_peer,
3936 keys_checked,
3937 } => {
3938 debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
3939 {
3940 let mut state = sync_state.write().await;
3941 state.clear_active_bootstrap_claim(challenged_peer);
3942 }
3943 p2p_node
3944 .report_trust_event(
3945 challenged_peer,
3946 TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
3947 )
3948 .await;
3949 }
3950 AuditTickResult::Failed { evidence } => {
3951 if let FailureEvidence::AuditFailure {
3952 challenged_peer,
3953 confirmed_failed_keys,
3954 summary,
3955 reason,
3956 ..
3957 } = evidence
3958 {
3959 let first_failed_key = first_failed_key_label(confirmed_failed_keys);
3960 error!(
3961 "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}, first_failed_key={first_failed_key}",
3962 confirmed_failed_keys.len(),
3963 summary.challenged_keys,
3964 summary.absent_keys,
3965 summary.digest_mismatch_keys,
3966 );
3967 if audit_failure_clears_bootstrap_claim(reason) {
3968 let mut state = sync_state.write().await;
3969 state.clear_active_bootstrap_claim(challenged_peer);
3970 } else {
3971 debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim");
3972 }
3973 p2p_node
3974 .report_trust_event(
3975 challenged_peer,
3976 TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
3977 )
3978 .await;
3979 }
3980 }
3981 AuditTickResult::BootstrapClaim { peer } => {
3982 let should_report = {
3983 let now = Instant::now();
3984 let mut state = sync_state.write().await;
3985 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
3986 {
3987 BootstrapClaimObservation::WithinGrace { .. } => {
3988 debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
3989 false
3990 }
3991 BootstrapClaimObservation::PastGrace { first_seen } => {
3992 warn!(
3993 "Audit: peer {peer} claiming bootstrap past grace period \
3994 ({:?} > {:?}), reporting abuse",
3995 now.duration_since(first_seen),
3996 config.bootstrap_claim_grace_period,
3997 );
3998 true
3999 }
4000 BootstrapClaimObservation::Repeated { first_seen } => {
4001 warn!(
4002 "Audit: peer {peer} repeated bootstrap claim after previously \
4003 stopping; first claim was {:?} ago, reporting abuse",
4004 now.duration_since(first_seen),
4005 );
4006 true
4007 }
4008 }
4009 };
4010 if should_report {
4011 p2p_node
4012 .report_trust_event(
4013 peer,
4014 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
4015 )
4016 .await;
4017 }
4018 }
4019 AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
4020 }
4021}
4022
4023/// Whether a confirmed audit failure with this reason should revoke the
4024/// peer's `recent_provers` holder credit immediately (v12 §6).
4025///
4026/// `true` for any reason where the peer actually answered (or admitted
4027/// it cannot): `DigestMismatch`, `KeyAbsent`, `Rejected` ("missing
4028/// bytes for committed key"), `MalformedResponse` — these prove the
4029/// peer no longer holds what it committed to, so it must not keep
4030/// holder credit for the proof TTL. `false` for `Timeout`: a single
4031/// dropped packet must not strip an honest peer; the 40-min TTL is the
4032/// deliberate liveness cushion there.
4033fn audit_failure_revokes_holder_credit(reason: &AuditFailureReason) -> bool {
4034 !matches!(reason, AuditFailureReason::Timeout)
4035}
4036
4037/// Apply the holder-credit revocation decision for a confirmed audit
4038/// failure. Pure over `RecentProvers` so the handler wiring is unit-
4039/// testable without a live `P2PNode`: the production `Failed` arm of
4040/// `handle_subtree_audit_result` calls exactly this.
4041fn apply_audit_failure_credit_revocation(
4042 provers: &mut RecentProvers,
4043 challenged_peer: &PeerId,
4044 reason: &AuditFailureReason,
4045) {
4046 if audit_failure_revokes_holder_credit(reason) {
4047 provers.forget_peer(challenged_peer);
4048 }
4049}
4050
4051// `admit_bootstrap_hints` was consolidated into `admit_and_queue_hints`.
4052
4053// ---------------------------------------------------------------------------
4054// Storage-bound audit (ADR-0002) — gossip trigger + auditor-side ingestion
4055// ---------------------------------------------------------------------------
4056
4057/// State the gossip-audit trigger needs to spawn an audit. Bundled so the
4058/// message handler passes one value instead of a long argument list; all
4059/// fields are cheap `Arc` clones.
4060#[derive(Clone)]
4061struct GossipAuditTrigger {
4062 p2p_node: Arc<P2PNode>,
4063 config: Arc<ReplicationConfig>,
4064 recent_provers: Arc<RwLock<RecentProvers>>,
4065 sync_state: Arc<RwLock<NeighborSyncState>>,
4066 cooldown: Arc<RwLock<HashMap<PeerId, Instant>>>,
4067}
4068
/// What gossip ingestion yields for the audit trigger: the commitment hash to
/// pin and the `key_count` needed to size the response deadline from the
/// actual `ceil(sqrt(N))` subtree (ADR-0002).
///
/// Produced in two cases:
/// - For every VALID gossiped commitment, changed or not, so a stable-keyset
///   node stays auditable and not only on its first commitment.
/// - When a commitment-capable peer downgrades to gossiping `None` within the
///   answerability window. The target is then its last cached commitment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct AuditTarget {
    /// Commitment hash the audit is pinned to.
    pin_hash: [u8; 32],
    /// Number of keys the pinned commitment covers.
    key_count: u32,
}
4079
4080/// Per-peer audit cooldown check-and-stamp (ADR-0002 "occasional surprise
4081/// exams, keeps load low"). Returns `true` if `peer` may be audited now (and
4082/// stamps `now`), `false` if it was audited within
4083/// `AUDIT_ON_GOSSIP_COOLDOWN_SECS`. Bounds the map under a flood of distinct
4084/// peers. Pure over the passed map so the flood/cooldown behaviour is testable
4085/// without a live node: a burst of gossips from one peer yields at most one
4086/// `true` per cooldown window.
4087fn cooldown_allows_audit(map: &mut HashMap<PeerId, Instant>, peer: &PeerId, now: Instant) -> bool {
4088 let cooldown = Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS);
4089 let known = match map.get(peer) {
4090 Some(&last) => {
4091 if now.saturating_duration_since(last) < cooldown {
4092 return false;
4093 }
4094 true
4095 }
4096 None => false,
4097 };
4098 // Bound the map under churn like its siblings (drop the oldest stamp) before
4099 // admitting a brand-new peer.
4100 if !known && map.len() >= MAX_LAST_COMMITMENT_BY_PEER {
4101 if let Some(victim) = map.iter().min_by_key(|(_, &ts)| ts).map(|(p, _)| *p) {
4102 map.remove(&victim);
4103 }
4104 }
4105 map.insert(*peer, now);
4106 true
4107}
4108
4109/// The gossip-audit launch decision in ONE place so the ordering is shared
4110/// between production and its test (ADR-0002 "occasional surprise exams").
4111///
4112/// Order matters and is the security-relevant property: the per-peer cooldown is
4113/// checked-and-stamped FIRST, THEN the probability lottery (`lottery_wins`) is
4114/// applied. If the lottery were sampled first, a gossip flood would re-roll it on
4115/// every message until one won, multiplying audits. Because the cooldown is
4116/// stamped before the lottery is consulted, a LOSING ticket still consumes the
4117/// window — so each peer gets at most one audit lottery per cooldown window
4118/// regardless of how often it gossips. Production calls this with
4119/// `lottery_wins = gen_bool(AUDIT_ON_GOSSIP_PROBABILITY)`; the test calls it with
4120/// a deterministic `lottery_wins`, so a reorder regression here fails the test.
4121fn audit_launch_decision(
4122 map: &mut HashMap<PeerId, Instant>,
4123 peer: &PeerId,
4124 now: Instant,
4125 lottery_wins: bool,
4126) -> bool {
4127 // Gate 1: cooldown check-and-stamp (consumes the window even on a loss).
4128 if !cooldown_allows_audit(map, peer, now) {
4129 return false;
4130 }
4131 // Gate 2: the probability lottery.
4132 lottery_wins
4133}
4134
4135/// On a peer's *changed* gossiped commitment, maybe launch a subtree audit
4136/// (ADR-0002): fire with probability `AUDIT_ON_GOSSIP_PROBABILITY`, subject to a
4137/// per-peer cooldown, pinned to the just-ingested root. Detached so gossip
4138/// handling is never blocked on a network round-trip.
4139async fn maybe_trigger_gossip_audit(
4140 trigger: &GossipAuditTrigger,
4141 peer: &PeerId,
4142 target: AuditTarget,
4143) {
4144 // The launch decision (cooldown-then-lottery ordering) lives in the pure
4145 // `audit_launch_decision` so the ordering is shared with its test. Sample
4146 // the lottery here, then let the helper apply it AFTER the cooldown stamp.
4147 let now = Instant::now();
4148 let lottery_wins = rand::thread_rng().gen_bool(config::AUDIT_ON_GOSSIP_PROBABILITY);
4149 {
4150 let mut map = trigger.cooldown.write().await;
4151 if !audit_launch_decision(&mut map, peer, now, lottery_wins) {
4152 return;
4153 }
4154 }
4155
4156 let trigger = trigger.clone();
4157 let peer = *peer;
4158 tokio::spawn(async move {
4159 let credit = storage_commitment_audit::AuditCredit {
4160 recent_provers: &trigger.recent_provers,
4161 };
4162 let result = storage_commitment_audit::run_subtree_audit(
4163 &trigger.p2p_node,
4164 &trigger.config,
4165 &peer,
4166 target.pin_hash,
4167 target.key_count,
4168 Some(&credit),
4169 )
4170 .await;
4171 handle_subtree_audit_result(
4172 &result,
4173 &trigger.p2p_node,
4174 &trigger.sync_state,
4175 &trigger.recent_provers,
4176 &trigger.config,
4177 )
4178 .await;
4179 });
4180}
4181
4182/// Atomic check-and-stamp of the per-peer commitment sig-verify rate limit.
4183///
4184/// Returns `true` if a signature verify is allowed now (and stamps the attempt
4185/// time), `false` if the peer is within [`COMMITMENT_SIG_VERIFY_MIN_INTERVAL`]
4186/// of its last attempt. Holds one write lock across the decision so two
4187/// concurrent ingests from the same peer cannot both pass. Stamps BEFORE the
4188/// caller's expensive verify so a slow/failed verify still rate-limits the next
4189/// message. Bounds the map under a flood of distinct peer ids.
4190async fn sig_verify_rate_limit_ok(
4191 sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
4192 source: &PeerId,
4193 now: Instant,
4194) -> bool {
4195 let mut attempts = sig_verify_attempts.write().await;
4196 if let Some(&last) = attempts.get(source) {
4197 if now.saturating_duration_since(last) < COMMITMENT_SIG_VERIFY_MIN_INTERVAL {
4198 return false;
4199 }
4200 }
4201 if attempts.len() >= MAX_LAST_COMMITMENT_BY_PEER && !attempts.contains_key(source) {
4202 if let Some(victim) = attempts.iter().min_by_key(|(_, &ts)| ts).map(|(p, _)| *p) {
4203 attempts.remove(&victim);
4204 }
4205 }
4206 attempts.insert(*source, now);
4207 true
4208}
4209
4210/// Verify + store an inbound commitment from a gossip peer.
4211///
4212/// Called from the inbound `NeighborSyncRequest`/`Response` handlers and
4213/// the bootstrap-sync loop. Drops the commitment unless all five gates
4214/// pass:
4215/// 1. `source` is in our DHT routing table (sybil/churn cap).
4216/// 2. `commitment.sender_peer_id == source.as_bytes()` (peer-id
4217/// binding to the authenticated transport peer).
4218/// 3. `BLAKE3(commitment.sender_public_key) == commitment.sender_peer_id`
4219/// (the embedded pubkey actually belongs to the claimed identity —
4220/// saorsa-core derives `PeerId = BLAKE3(pubkey)`).
4221/// 4. `verify_commitment_signature(commitment)` succeeds against the
4222/// embedded public key. The signed payload binds the pubkey, so an
4223/// adversary cannot swap the key while keeping the body.
4224/// 5. The cache has room or this is an update for an existing entry
4225/// (sybil cap, `MAX_LAST_COMMITMENT_BY_PEER`).
4226///
4227/// On all-pass, the commitment is stored as the auditor's per-peer
4228/// "last known commitment" for use as `expected_commitment_hash` in
4229/// future audits.
4230///
4231/// Failures (no commitment / mismatched peer id / bad signature) are
4232/// silent drops — gossip is best-effort and a malformed commitment from
4233/// one peer should not affect anything else.
4234///
4235/// Returns `Some(AuditTarget)` whenever a VALID commitment was stored (whether
4236/// or not its root changed), so the caller can run a probabilistic,
4237/// cooldown-gated subtree audit. Returning on *every* valid gossip — not only
4238/// changed ones — is deliberate (ADR-0002): a node with a stable key set keeps
4239/// being auditable, so it cannot pass one audit and then delete data while
4240/// re-gossiping the same root forever. The cooldown + probability bound the
4241/// audit frequency. Returns `None` only if the commitment was dropped (failed a
4242/// gate) or there is nothing to pin.
4243///
4244/// Handle a capable peer gossiping `None` (a commitment downgrade).
4245///
4246/// A capable peer that previously gossiped a commitment but now gossips `None`
4247/// is trying to drop off the audit path. Within the answerability window we keep
4248/// the cached commitment pinned AND return it as an audit target so this gossip
4249/// still schedules a subtree audit against the peer's last known commitment — if
4250/// it genuinely dropped the data, the audit fails (there is no periodic tick, so
4251/// the trigger MUST fire here or the downgrade is never re-challenged).
4252///
4253/// But this only holds within the SAME `GOSSIP_ANSWERABILITY_TTL` the responder
4254/// honours for its own retired commitment: once that elapses since we last
4255/// received the peer's commitment, an honest peer has legitimately retired that
4256/// root (its responder side `retire_current`s and lets it age out) and can no
4257/// longer answer a pin on it. Auditing it past the TTL would manufacture a false
4258/// failure, so we then forget the cached commitment (keeping the sticky
4259/// `commitment_capable` bit) and stop pinning it.
4260async fn handle_commitment_downgrade(
4261 source: &PeerId,
4262 last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
4263) -> Option<AuditTarget> {
4264 let now = Instant::now();
4265 let cached = {
4266 let map = last_commitment_by_peer.read().await;
4267 map.get(source).and_then(|rec| {
4268 if !rec.commitment_capable {
4269 return None;
4270 }
4271 let last = rec.last_commitment()?;
4272 let pin = rec.commitment_hash()?;
4273 let fresh = now.saturating_duration_since(rec.received_at)
4274 < crate::replication::commitment_state::GOSSIP_ANSWERABILITY_TTL;
4275 Some((pin, last.key_count, fresh))
4276 })
4277 };
4278 match cached {
4279 Some((pin, key_count, true)) => {
4280 warn!(
4281 "ingest_peer_commitment: commitment-capable peer {source} sent None \
4282 (downgrade attempt); auditing against its last cached commitment"
4283 );
4284 Some(AuditTarget {
4285 pin_hash: pin,
4286 key_count,
4287 })
4288 }
4289 Some((_, _, false)) => {
4290 // Cached commitment has aged past the answerability window — forget
4291 // it so we stop pinning a root the peer is no longer obliged to
4292 // answer. Keep `commitment_capable` (sticky). Re-check freshness
4293 // UNDER the write lock (compare-and-clear): a concurrent valid gossip
4294 // from this peer may have refreshed `received_at` in the gap between
4295 // our read and write locks; if so, leave its fresh commitment intact.
4296 if let Some(rec) = last_commitment_by_peer.write().await.get_mut(source) {
4297 let still_stale = now.saturating_duration_since(rec.received_at)
4298 >= crate::replication::commitment_state::GOSSIP_ANSWERABILITY_TTL;
4299 if still_stale {
4300 rec.clear_commitment();
4301 debug!(
4302 "ingest_peer_commitment: capable peer {source} sent None and its cached \
4303 commitment aged past the answerability TTL; forgetting it"
4304 );
4305 }
4306 }
4307 None
4308 }
4309 None => None,
4310 }
4311}
4312
4313async fn ingest_peer_commitment(
4314 source: &PeerId,
4315 commitment: Option<&StorageCommitment>,
4316 p2p_node: &Arc<P2PNode>,
4317 last_commitment_by_peer: &Arc<RwLock<HashMap<PeerId, PeerCommitmentRecord>>>,
4318 ever_capable_peers: &Arc<RwLock<HashSet<PeerId>>>,
4319 sig_verify_attempts: &Arc<RwLock<HashMap<PeerId, Instant>>>,
4320) -> Option<AuditTarget> {
4321 let Some(c) = commitment else {
4322 return handle_commitment_downgrade(source, last_commitment_by_peer).await;
4323 };
4324 // RT-membership gate: only accept commitments from peers in our
4325 // routing table. Off-RT senders (sybils, drive-by relays) cannot
4326 // populate the cache, which closes the hole where a flood of
4327 // off-RT identities could fill the cap and evict honest
4328 // peers. The neighbor-sync request handler applies the same gate
4329 // before admitting inbound replication hints (see neighbor_sync.rs
4330 // `sender_in_rt`); we mirror that policy here for the commitment
4331 // piggyback.
4332 if !p2p_node.dht_manager().is_in_routing_table(source).await {
4333 debug!("ingest_peer_commitment: source {source} not in routing table (dropped)");
4334 return None;
4335 }
4336 // Peer-id binding: the commitment's claimed sender must match the
4337 // authenticated transport peer (`source`). Defeats relay/replay
4338 // and also pins which embedded public key we are about to verify
4339 // against — the verify itself trusts the embedded key, so the
4340 // peer-id binding is the link to a real identity.
4341 if &c.sender_peer_id != source.as_bytes() {
4342 warn!(
4343 "ingest_peer_commitment: sender_peer_id mismatch from {source} \
4344 (dropped, possible relay attempt)"
4345 );
4346 return None;
4347 }
4348 // Peer-id to embedded-pubkey binding: saorsa-core derives PeerId as
4349 // BLAKE3(pubkey_bytes). Without this check, a responder could sign
4350 // with a throwaway key they own and lie about which identity it
4351 // belongs to (the embedded-key signature would verify trivially).
4352 let derived_peer_id = *blake3::hash(&c.sender_public_key).as_bytes();
4353 if derived_peer_id != c.sender_peer_id {
4354 warn!(
4355 "ingest_peer_commitment: embedded pubkey does not hash to claimed peer_id for \
4356 {source} (dropped, throwaway-key attack)"
4357 );
4358 return None;
4359 }
4360 // §2 step 3 + §11 DoS: rate-limit per-peer to at most one ML-DSA
4361 // signature verify per `COMMITMENT_SIG_VERIFY_MIN_INTERVAL`. A
4362 // sybil/RT-membership-bypassing peer that flooded valid-looking
4363 // gossip would otherwise burn CPU on every message. The rate
4364 // limit is checked AFTER cheap structural gates (RT, peer-id
4365 // binding, pubkey-binding) and BEFORE the expensive sig verify.
4366 //
4367 // Tracked in `sig_verify_attempts` (separate from
4368 // last_commitment_by_peer) so EVERY attempt — successful or not —
4369 // bumps the rate-limit clock. Reading only from PeerCommitmentRecord
4370 // would skip the cap for peers we've never successfully verified,
4371 // letting a flood of invalid-but-structurally-plausible gossips
4372 // burn CPU.
4373 let now = Instant::now();
4374 if !sig_verify_rate_limit_ok(sig_verify_attempts, source, now).await {
4375 debug!(
4376 "ingest_peer_commitment: rate-limited sig verify from {source} \
4377 (< {COMMITMENT_SIG_VERIFY_MIN_INTERVAL:?} since last attempt); dropped"
4378 );
4379 return None;
4380 }
4381 // Signature verify, using the public key embedded in the commitment
4382 // itself. The pubkey is bound by the signature payload (see
4383 // commitment_signed_payload) so an adversary cannot keep the body
4384 // and swap the key to one they hold the secret for.
4385 if !crate::replication::commitment::verify_commitment_signature(c) {
4386 warn!(
4387 "ingest_peer_commitment: signature did not verify under embedded key for {source} \
4388 (dropped, forged commitment)"
4389 );
4390 return None;
4391 }
4392 // The new commitment's hash, used to store and to pin for the audit target.
4393 let new_hash = commitment_hash(c);
4394 let mut map = last_commitment_by_peer.write().await;
4395 // Sybil/churn cap: if we're at the hard cap AND this is a new peer,
4396 // evict an arbitrary existing entry to make room. Updates for peers
4397 // already in the map are always accepted (they replace, not grow).
4398 if map.len() >= MAX_LAST_COMMITMENT_BY_PEER && !map.contains_key(source) {
4399 // Drop one arbitrary entry. HashMap iter order is random which
4400 // is fine — over time PeerRemoved cleanup keeps the working set
4401 // anchored on the real RT membership; this cap only fires under
4402 // active flooding attempts.
4403 if let Some(victim) = map.keys().next().copied() {
4404 map.remove(&victim);
4405 warn!(
4406 "ingest_peer_commitment: cache full ({MAX_LAST_COMMITMENT_BY_PEER}); \
4407 evicted {victim} to admit {source}"
4408 );
4409 }
4410 }
4411 // Preserve sticky commitment_capable across updates — once true,
4412 // always true. New entries start with capable = true (we just
4413 // verified a valid commitment from this peer).
4414 map.entry(*source)
4415 .and_modify(|r| {
4416 // set_commitment refreshes the cached hash (§13) alongside the
4417 // commitment + received_at so they never drift.
4418 r.set_commitment(c.clone(), now);
4419 r.last_sig_verify_at = now;
4420 r.commitment_capable = true; // sticky-redundant but explicit
4421 })
4422 .or_insert_with(|| PeerCommitmentRecord::from_verified(c.clone(), now));
4423 drop(map);
4424 // Record the sticky "ever v12-capable" bit in a set independent of
4425 // `last_commitment_by_peer` (whose entries can be evicted by
4426 // `PeerRemoved` and the sybil cap). This is what the §3 audit
4427 // shield and the §6 holder-eligibility closure consult to decide
4428 // whether the peer is expected to speak v12.
4429 //
4430 // Capped at `MAX_EVER_CAPABLE_PEERS` to bound memory under
4431 // identity-rotation attacks: once full, new entries are refused.
4432 // Refusal degrades over-cap peers to the behaviour before this set
4433 // existed (treated as legacy on rejoin), which is not a security
4434 // regression and preserves the historic set stable.
4435 {
4436 let mut set = ever_capable_peers.write().await;
4437 if set.contains(source) || set.len() < MAX_EVER_CAPABLE_PEERS {
4438 set.insert(*source);
4439 } else {
4440 warn!(
4441 "ingest_peer_commitment: ever_capable_peers at cap \
4442 ({MAX_EVER_CAPABLE_PEERS}); refusing to record {source} as sticky-capable"
4443 );
4444 }
4445 }
4446 // Return an audit target for EVERY valid stored commitment (changed or
4447 // not), so the caller's cooldown+probability-gated trigger keeps a
4448 // stable-keyset peer auditable over time (ADR-0002). Only a serialization
4449 // failure (new_hash == None, unreachable for a real commitment) yields None.
4450 new_hash.map(|pin_hash| AuditTarget {
4451 pin_hash,
4452 key_count: c.key_count,
4453 })
4454}
4455
4456// ---------------------------------------------------------------------------
4457// Storage-bound audit (v12) — responder commitment rotation
4458// ---------------------------------------------------------------------------
4459
4460/// Read the current LMDB key set, build + sign a fresh
4461/// `StorageCommitment`, and rotate it into `state` as the new `current`.
4462/// The prior `current` is demoted to `previous`; the prior `previous` is
4463/// dropped (per `ResponderCommitmentState::rotate`).
4464///
4465/// For content-addressed chunks (Autonomi's chunk store), `address ==
4466/// BLAKE3(content)`, so `bytes_hash := key` and we don't have to
4467/// re-read each chunk's bytes to compute the leaf hash.
4468///
4469/// Skips (returns `Ok(())`) if the key set is empty — no commitment to
4470/// rotate. The auditor side handles "no commitment for this peer" by
4471/// falling back to the legacy plain-digest audit path.
4472async fn rebuild_and_rotate_commitment(
4473 storage: &Arc<LmdbStorage>,
4474 identity: &Arc<NodeIdentity>,
4475 state: &Arc<ResponderCommitmentState>,
4476 p2p: &Arc<P2PNode>,
4477 config: &Arc<ReplicationConfig>,
4478) -> Result<()> {
4479 use saorsa_pqc::api::sig::{MlDsaSecretKey, MlDsaVariant};
4480
4481 let stored_keys = storage
4482 .all_keys()
4483 .await
4484 .map_err(|e| Error::Storage(format!("commitment build: read keys: {e}")))?;
4485
4486 // Commit only to keys we are still RESPONSIBLE for ("want-to-hold"), not
4487 // everything currently on disk ("hold"). This is the half of the retention
4488 // contract that lets out-of-range chunks age out: a key that has left our
4489 // close group is excluded from the NEXT commitment, so within at most
4490 // RETAINED_GOSSIPED_COMMITMENTS gossip rotations it falls out of the
4491 // last-2-gossiped window, `ResponderCommitmentState::is_held` goes false,
4492 // and the pruner (which until then vetoes its deletion) reclaims it. Without
4493 // this filter the pruner's reprieve would keep re-committing stale keys
4494 // forever (the rebuild reads all_keys, so a retained-on-disk key would be
4495 // re-committed and re-gossiped every rotation — a permanent pin).
4496 let storage_empty = stored_keys.is_empty();
4497 let self_id = *p2p.peer_id();
4498 let mut keys = Vec::with_capacity(stored_keys.len());
4499 for k in stored_keys {
4500 if admission::is_responsible(&self_id, &k, p2p, config.close_group_size).await {
4501 keys.push(k);
4502 }
4503 }
4504
4505 if keys.is_empty() {
4506 if storage_empty {
4507 // Storage is genuinely empty — there is nothing to answer for, so
4508 // drop the previously advertised commitment immediately. Keeping it
4509 // would leave remote auditors pinning a hash we can never satisfy
4510 // again (the bytes are gone).
4511 if state.retained_slot_count() > 0 {
4512 debug!("Commitment rotation: storage empty, clearing retained slots");
4513 state.clear_all();
4514 }
4515 return Ok(());
4516 }
4517 // Bytes are still on disk but no key is currently in range. We must NOT
4518 // clear retention here: a peer may still be pinning a root we gossiped
4519 // moments ago and could demand its bytes in a round-2 challenge, which
4520 // we can still answer (the bytes are present). But we must STOP
4521 // advertising the stale commitment: retire it so `current()` returns
4522 // `None` and the gossip-emit sites stop re-emitting and re-stamping it.
4523 // The retired slot then ages out by its gossip-answerability TTL while
4524 // remaining answerable for in-flight pins until then. Once it ages out,
4525 // `is_held` flips false and the pruner reclaims the now-uncommitted,
4526 // out-of-range chunks. (Calling `age_out` alone would leave `current()`
4527 // pointing at the stale root, which the gossip loop would keep
4528 // re-stamping — pinning its keys forever.)
4529 debug!(
4530 "Commitment rotation: no responsible keys to commit to; retiring current commitment \
4531 (stays answerable until its gossip TTL lapses, bytes still on disk)"
4532 );
4533 state.retire_current();
4534 return Ok(());
4535 }
4536
4537 // Cap to MAX_COMMITMENT_KEY_COUNT for v12 (responder must not commit
4538 // to more than the protocol limit; auditor would reject the
4539 // commitment otherwise).
4540 let cap = commitment::MAX_COMMITMENT_KEY_COUNT as usize;
4541 if keys.len() > cap {
4542 warn!(
4543 "Commitment rotation: key set ({}) exceeds MAX_COMMITMENT_KEY_COUNT ({}); \
4544 truncating — investigate as this likely means a misconfiguration",
4545 keys.len(),
4546 cap
4547 );
4548 }
4549
4550 // INVARIANT: this module is only used with CONTENT-ADDRESSED chunks,
4551 // where `key == BLAKE3(content)`, so `bytes_hash := key` and we skip a
4552 // full chunk re-read per rotation.
4553 //
4554 // Consequence to be precise about: because the leaf is `(key, key)`,
4555 // the Merkle root commits to the SET OF KEYS, not to the bytes. The
4556 // commitment therefore binds "which keys I claim to hold"; it does NOT
4557 // by itself prove byte possession. Byte possession is enforced by the
4558 // audit-verify path, which recomputes `bytes_hash == BLAKE3(local_bytes)`
4559 // and the per-key digest against the AUDITOR'S OWN local copy of the
4560 // bytes — so a responder that holds the key list but dropped the bytes
4561 // still fails (`missing bytes for committed key` / digest mismatch).
4562 // This is sound ONLY while keys are content addresses. If this module
4563 // is ever reused for non-content-addressed records (`bytes_hash != key`),
4564 // the `(k, k)` shortcut would let a byte-less node forge a valid root and
4565 // MUST be replaced with `(key, BLAKE3(bytes))` computed from real bytes.
4566 let entries: Vec<_> = keys.into_iter().take(cap).map(|k| (k, k)).collect();
4567
4568 // No-op-rotation guard: compute just the Merkle root from `entries`
4569 // and compare against the currently-advertised commitment's root.
4570 // If they match, the key set is unchanged and a new rotation would
4571 // only swap a randomized ML-DSA signature for a fresh one — same
4572 // content, different commitment_hash. That invalidates every
4573 // outstanding `recent_provers` credit on this node across the
4574 // close group with no security benefit, breaking steady-state
4575 // quorum liveness on large nodes that can't re-audit every key
4576 // every rotation interval. Skip the rotation entirely when the
4577 // tree is unchanged.
4578 // Build the tree ONCE here (moving `entries`): it serves both the no-op
4579 // root check below and, if we proceed, the signed commitment via
4580 // `build_from_tree` (§11 — previously the tree was built here and AGAIN
4581 // inside `BuiltCommitment::build`).
4582 let candidate_tree = commitment::MerkleTree::build(entries)
4583 .map_err(|e| Error::Crypto(format!("commitment tree build: {e}")))?;
4584 let candidate_root = candidate_tree.root();
4585 if let Some(current) = state.current() {
4586 if current.commitment().root == candidate_root {
4587 debug!(
4588 "Commitment rotation: key set unchanged (root={}); skipping no-op re-sign",
4589 hex::encode(candidate_root)
4590 );
4591 // Even though we skip re-signing (to avoid invalidating holder
4592 // credit), retention must still advance on the wall clock: a
4593 // previously-gossiped commitment that holds a now-out-of-range key
4594 // must be able to age out of the answerability window even when the
4595 // committed key set is frozen here for many rotations. Without this,
4596 // the no-op guard would pin a stale slot — and its key — forever.
4597 state.age_out();
4598 return Ok(());
4599 }
4600 }
4601
4602 let sk_bytes = identity.secret_key_bytes().to_vec();
4603 let sk = MlDsaSecretKey::from_bytes(MlDsaVariant::MlDsa65, &sk_bytes)
4604 .map_err(|e| Error::Crypto(format!("commitment build: load sk: {e}")))?;
4605 let pk_bytes = identity.public_key().as_bytes().to_vec();
4606 let peer_id_bytes = *p2p.peer_id().as_bytes();
4607
4608 let built = commitment_state::BuiltCommitment::build_from_tree(
4609 candidate_tree,
4610 &peer_id_bytes,
4611 &sk,
4612 &pk_bytes,
4613 )
4614 .map_err(|e| Error::Crypto(format!("commitment build: {e}")))?;
4615
4616 let hash = hex::encode(built.hash());
4617 let key_count = built.commitment().key_count;
4618 state.rotate(built);
4619 info!("Storage commitment rotated: hash={hash} key_count={key_count}");
4620 Ok(())
4621}
4622
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::{
        apply_audit_failure_credit_revocation, audit_failure_clears_bootstrap_claim,
        audit_failure_revokes_holder_credit, audit_launch_decision, config, cooldown_allows_audit,
        first_failed_key_label, fresh_offer_payment_context, paid_notify_payment_context,
    };
    use crate::payment::VerificationContext;
    use crate::replication::recent_provers::RecentProvers;
    use crate::replication::types::AuditFailureReason;
    use saorsa_core::identity::PeerId;
    use std::collections::HashMap;
    use std::time::Duration;
    use std::time::Instant;

    /// Deterministic test peer: id bytes are `b` followed by 31 zero bytes.
    fn test_peer(b: u8) -> PeerId {
        let mut bytes = [0u8; 32];
        bytes[0] = b;
        PeerId::from_bytes(bytes)
    }

    /// Deterministic test key: first byte is `b`, the rest are zero.
    fn test_key(b: u8) -> crate::ant_protocol::XorName {
        let mut k = [0u8; 32];
        k[0] = b;
        k
    }

    #[test]
    fn fresh_offer_runs_client_put_payment_checks() {
        assert_eq!(
            fresh_offer_payment_context(),
            VerificationContext::ClientPut
        );
    }

    #[test]
    fn paid_notify_uses_paid_list_admission_payment_checks() {
        assert_eq!(
            paid_notify_payment_context(),
            VerificationContext::PaidListAdmission
        );
    }

    #[test]
    fn audit_timeout_preserves_active_bootstrap_claim() {
        assert!(!audit_failure_clears_bootstrap_claim(
            &AuditFailureReason::Timeout
        ));
    }

    /// Peer used by the cooldown tests. Delegates to `test_peer` (the two were
    /// identical copies) so the helpers cannot drift apart.
    fn strike_peer(b: u8) -> PeerId {
        test_peer(b)
    }

    // ADR-0002: "occasional surprise exams, keeps load low" — the per-peer
    // cooldown must collapse a gossip flood into at most one audit per window.

    #[test]
    fn gossip_flood_yields_at_most_one_audit_per_cooldown_window() {
        let peer = strike_peer(1);
        let mut map: HashMap<PeerId, Instant> = HashMap::new();
        let t0 = Instant::now();
        // The first gossip in the window passes; a burst of further gossips at
        // the same instant is entirely suppressed.
        assert!(cooldown_allows_audit(&mut map, &peer, t0));
        let mut passed = 1;
        for _ in 0..100 {
            if cooldown_allows_audit(&mut map, &peer, t0) {
                passed += 1;
            }
        }
        assert_eq!(
            passed, 1,
            "a flood at one instant must trigger exactly one audit"
        );
    }

    // ADR-0002 ordering invariant: `maybe_trigger_gossip_audit` stamps the
    // per-peer cooldown BEFORE the probability lottery, so a LOSING ticket still
    // consumes the window.
    //
    // The cooldown-only tests in this module cannot observe this, because they
    // never sample the lottery. A regression that reordered the gates (sample
    // the lottery first, stamp the cooldown only on a win) would still pass
    // them. It would, however, break flood-resistance: a flood would re-roll the
    // lottery on EVERY message until one won, multiplying audits.
    //
    // This test drives the shipped gate function with a fixed lottery outcome
    // instead of `gen_bool(..)`:
    // - the first message LOSES the lottery;
    // - the remaining flood messages all WIN.
    //
    // With the production order, the losing first ticket burns the window, so
    // every later winner in that window is blocked and there are 0 audits. If
    // the gates were flipped, the second message's winning ticket would slip
    // through. The window reopens only after the cooldown elapses.
    //
    // FLIPS IF: the lottery is sampled before `cooldown_allows_audit`. A losing
    // ticket would then no longer consume the window, re-enabling a
    // flood-amplified audit storm.
    #[test]
    fn losing_lottery_still_consumes_cooldown_window() {
        // Calls the SHIPPED `audit_launch_decision`, the same function
        // `maybe_trigger_gossip_audit` uses. The lottery outcome is passed in as
        // a fixed bool rather than drawn from `rand::thread_rng().gen_bool(..)`.
        // Because this exercises production code rather than a local
        // re-implementation, a reorder of the two gates in production fails
        // this test.
        let peer = strike_peer(3);
        let mut map: HashMap<PeerId, Instant> = HashMap::new();
        let t0 = Instant::now();

        // The first flooded message at t0 LOSES the lottery. The cooldown is
        // stamped BEFORE the lottery is consulted, so the window is now consumed.
        assert!(
            !audit_launch_decision(&mut map, &peer, t0, false),
            "a losing ticket launches no audit"
        );

        // 99 more flooded messages at the same instant would all WIN the
        // lottery, yet every one must be blocked by the cooldown the loser
        // already stamped. If production sampled the lottery FIRST, each would
        // get a fresh roll and audits would multiply; this assertion catches
        // that reorder.
        let mut audits = 0;
        for _ in 0..99 {
            if audit_launch_decision(&mut map, &peer, t0, true) {
                audits += 1;
            }
        }
        assert_eq!(
            audits, 0,
            "a losing first ticket must consume the window so no later flooded \
             message in the same window can audit"
        );

        // The window reopens only after the cooldown elapses; the next winning
        // ticket then launches exactly one audit.
        let after = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS + 1);
        assert!(
            audit_launch_decision(&mut map, &peer, after, true),
            "after the cooldown a winning ticket audits again"
        );
    }

    #[test]
    fn cooldown_lets_audit_through_after_the_window() {
        let peer = strike_peer(2);
        let mut map: HashMap<PeerId, Instant> = HashMap::new();
        let t0 = Instant::now();
        assert!(cooldown_allows_audit(&mut map, &peer, t0));
        // Within the window: suppressed.
        let within = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS - 1);
        assert!(!cooldown_allows_audit(&mut map, &peer, within));
        // Past the window: allowed again.
        let after = t0 + Duration::from_secs(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS + 1);
        assert!(cooldown_allows_audit(&mut map, &peer, after));
    }

    #[test]
    fn cooldown_is_per_peer_independent() {
        let mut map: HashMap<PeerId, Instant> = HashMap::new();
        let t0 = Instant::now();
        // Different peers each get their own first-audit pass at the same instant.
        for i in 0..20u8 {
            assert!(
                cooldown_allows_audit(&mut map, &strike_peer(i), t0),
                "peer {i} should be auditable independently"
            );
        }
    }

    #[test]
    fn audit_on_gossip_constants_match_adr() {
        // Tripwire on the ADR-locked tunables. The spot-check count sits at the
        // top of the auditor's 3..=5 band. The auditor clamps to that band, so
        // values above 5 would silently never be requested.
        assert_eq!(config::AUDIT_SPOTCHECK_COUNT, 5);
        assert!((config::AUDIT_ON_GOSSIP_PROBABILITY - 0.2).abs() < f64::EPSILON);
        assert_eq!(config::AUDIT_ON_GOSSIP_COOLDOWN_SECS, 30 * 60);
    }

    // (d) A confirmed storage-integrity failure penalizes immediately and
    // revokes credit; it is not a timeout.
    #[test]
    fn digest_mismatch_is_not_a_timeout_and_penalizes_immediately() {
        assert!(audit_failure_clears_bootstrap_claim(
            &AuditFailureReason::DigestMismatch
        ));
        assert!(audit_failure_revokes_holder_credit(
            &AuditFailureReason::DigestMismatch
        ));
    }

    /// The exact decision the `Failed` arm of `handle_subtree_audit_result`
    /// uses: confirmed failures revoke credit, `Timeout` does not.
    #[test]
    fn confirmed_failures_revoke_credit_timeout_does_not() {
        for reason in [
            AuditFailureReason::MalformedResponse,
            AuditFailureReason::DigestMismatch,
            AuditFailureReason::KeyAbsent,
            AuditFailureReason::Rejected,
        ] {
            assert!(
                audit_failure_revokes_holder_credit(&reason),
                "confirmed failure {reason:?} must revoke holder credit"
            );
        }
        assert!(
            !audit_failure_revokes_holder_credit(&AuditFailureReason::Timeout),
            "Timeout must NOT revoke credit (single dropped packet != storage loss)"
        );
    }

    /// Wiring test for the security fix. The helper the handler calls must:
    /// - strip a credited peer on a confirmed failure (`DigestMismatch`);
    /// - RETAIN credit on `Timeout`.
    ///
    /// Genuine credit is recorded first so neither assertion is vacuous. The
    /// test fails if `forget_peer` stops being called, or if the `Timeout`
    /// exclusion is dropped (both verified by mutation).
    #[test]
    fn apply_revocation_strips_on_digest_mismatch_retains_on_timeout() {
        let peer = test_peer(0xAB);
        let key = test_key(1);
        let hash = [0xCD; 32];

        // Confirmed failure -> credit revoked.
        let mut provers = RecentProvers::new();
        provers.record_proof(key, peer, hash, Instant::now());
        assert!(
            provers.is_credited_holder(&key, &peer, &hash),
            "precondition: peer credited before failure"
        );
        apply_audit_failure_credit_revocation(
            &mut provers,
            &peer,
            &AuditFailureReason::DigestMismatch,
        );
        assert!(
            !provers.is_credited_holder(&key, &peer, &hash),
            "DigestMismatch must strip the peer's holder credit"
        );

        // Timeout -> credit retained.
        let mut provers_timeout = RecentProvers::new();
        provers_timeout.record_proof(key, peer, hash, Instant::now());
        apply_audit_failure_credit_revocation(
            &mut provers_timeout,
            &peer,
            &AuditFailureReason::Timeout,
        );
        assert!(
            provers_timeout.is_credited_holder(&key, &peer, &hash),
            "Timeout must retain holder credit (deliberate liveness cushion)"
        );
    }

    #[test]
    fn decoded_audit_failures_clear_active_bootstrap_claim() {
        for reason in [
            AuditFailureReason::MalformedResponse,
            AuditFailureReason::DigestMismatch,
            AuditFailureReason::KeyAbsent,
            AuditFailureReason::Rejected,
        ] {
            assert!(
                audit_failure_clears_bootstrap_claim(&reason),
                "decoded non-bootstrap failure {reason:?} should clear active claim"
            );
        }
    }

    #[test]
    fn first_failed_key_label_truncates_to_16_hex_chars() {
        // The high-order 8 bytes of the XorName determine the label, so an
        // operator can group audit failures on the same chunk prefix.
        let mut key = [0u8; 32];
        key[0] = 0x18;
        key[7] = 0xff;
        // Low-order bytes (positions 8..32) are deliberately set to 0xAA
        // to verify they are NOT included in the label.
        for byte in &mut key[8..] {
            *byte = 0xAA;
        }
        let label = first_failed_key_label(&[key]);
        // Only the first 8 bytes are encoded; low-order bytes are dropped.
        assert_eq!(label, "0x18000000000000ff");
        assert_eq!(label.len(), "0x".len() + 16);
    }

    #[test]
    fn first_failed_key_label_falls_back_when_empty() {
        // Should never happen in production (audit failure handling rejects
        // empty sets). The formatter must still produce a valid label so the
        // log line doesn't contain a misleading default.
        assert_eq!(first_failed_key_label(&[]), "0x");
    }

    #[test]
    fn first_failed_key_label_uses_first_key_only() {
        let first = [0x11u8; 32];
        let second = [0x22u8; 32];
        assert_eq!(
            first_failed_key_label(&[first, second]),
            format!("0x{}", hex::encode(&first[..8]))
        );
    }
}