Skip to main content

ant_node/replication/
mod.rs

1//! Replication subsystem for the Autonomi network.
2//!
3//! Implements Kademlia-style replication with:
4//! - Fresh replication with `PoP` verification
5//! - Neighbor sync with round-robin cycle management
6//! - Batched quorum verification
7//! - Storage audit protocol (anti-outsourcing)
8//! - `PaidForList` persistence and convergence
9//! - Responsibility pruning with hysteresis
10
11// The replication engine intentionally holds `RwLock` read guards across await
12// boundaries (e.g. reading sync_history while calling audit_tick). Clippy's
13// nursery lint `significant_drop_tightening` flags these, but the guards must
14// remain live for the duration of the call.
15#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod config;
21pub mod fresh;
22pub mod neighbor_sync;
23pub mod paid_list;
24pub mod protocol;
25pub mod pruning;
26pub mod quorum;
27pub mod scheduling;
28pub mod types;
29
30use std::collections::{HashMap, HashSet};
31use std::path::Path;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use std::pin::Pin;
36
37use crate::logging::{debug, error, info, warn};
38use futures::stream::FuturesUnordered;
39use futures::{Future, StreamExt};
40use rand::Rng;
41use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
42use tokio::task::JoinHandle;
43use tokio_util::sync::CancellationToken;
44
45use crate::ant_protocol::XorName;
46use crate::error::{Error, Result};
47use crate::payment::{PaymentVerifier, VerificationContext};
48use crate::replication::audit::AuditTickResult;
49use crate::replication::config::{
50    max_parallel_fetch, storage_admission_width, ReplicationConfig,
51    MAX_CONCURRENT_REPLICATION_SENDS, REPLICATION_PROTOCOL_ID,
52};
53use crate::replication::paid_list::PaidList;
54use crate::replication::protocol::{
55    FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
56    VerificationResponse,
57};
58use crate::replication::quorum::KeyVerificationOutcome;
59use crate::replication::scheduling::ReplicationQueues;
60use crate::replication::types::{
61    AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
62    NeighborSyncState, PeerSyncRecord, RepairProofs, VerificationEntry, VerificationState,
63};
64use crate::storage::LmdbStorage;
65use saorsa_core::identity::PeerId;
66use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
67
68// ---------------------------------------------------------------------------
69// Constants
70// ---------------------------------------------------------------------------
71
/// Prefix used by saorsa-core's request-response mechanism.
///
/// The message handler treats a topic made of this prefix followed by
/// `REPLICATION_PROTOCOL_ID` as a replication request wrapped in a
/// request-response envelope.
const RR_PREFIX: &str = "/rr/";
74
/// Returns the payment verification context `VerificationContext::ClientPut`.
// NOTE(review): judging by the name this is the context used when verifying
// payment proofs on fresh replication offers; the call sites are outside this
// excerpt, so confirm before relying on that.
fn fresh_offer_payment_context() -> VerificationContext {
    VerificationContext::ClientPut
}
78
/// Returns the payment verification context
/// `VerificationContext::PaidListAdmission`.
// NOTE(review): judging by the name this is the context used when verifying
// payment proofs carried by paid-list notifications; the call sites are outside
// this excerpt, so confirm before relying on that.
fn paid_notify_payment_context() -> VerificationContext {
    VerificationContext::PaidListAdmission
}
82
/// Boxed future type for in-flight fetch tasks.
///
/// The output pairs the fetched key with the task's outcome. The fetch worker
/// yields `None` for the outcome when joining the spawned fetch task fails,
/// so the key is still recoverable in that case.
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
85
/// Shared dependencies for one verification worker cycle.
///
/// Every field is a borrow (lifetime `'a`) of a handle that also exists as a
/// field on `ReplicationEngine`; the struct owns nothing itself.
struct VerificationCycleContext<'a> {
    // P2P networking node.
    p2p_node: &'a Arc<P2PNode>,
    // Persistent paid-for-list.
    paid_list: &'a Arc<PaidList>,
    // Local chunk storage.
    storage: &'a Arc<LmdbStorage>,
    // Replication pipeline queues.
    queues: &'a Arc<RwLock<ReplicationQueues>>,
    // Replication configuration.
    config: &'a ReplicationConfig,
    // Bootstrap state tracking.
    bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
    // Flag that is `true` while the node is bootstrapping.
    is_bootstrapping: &'a Arc<RwLock<bool>>,
    // Notifier associated with the end of the bootstrap phase.
    bootstrap_complete_notify: &'a Arc<Notify>,
}
97
/// Fetch worker polling interval in milliseconds.
///
/// When the fetch worker has no in-flight fetches it sleeps for this long
/// before looking at the queue again.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Verification worker polling interval in milliseconds.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Verification cycle duration that is worth surfacing at info level.
///
/// Expressed in milliseconds.
const VERIFICATION_CYCLE_SLOW_LOG_MS: u128 = 500;

/// Bootstrap drain check interval in seconds.
///
/// The audit loop re-checks `BootstrapState::is_drained` at this interval and
/// runs its first audit tick only once the check succeeds.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;

/// Standard trust event weight for per-operation success/failure signals.
///
/// Used for individual replication fetch outcomes, integrity check failures,
/// and bootstrap claim abuse. Distinct from `AUDIT_FAILURE_TRUST_WEIGHT` which
/// is reserved for confirmed audit failures.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
116
117// ---------------------------------------------------------------------------
118// ReplicationEngine
119// ---------------------------------------------------------------------------
120
/// The replication engine manages all replication background tasks and state.
///
/// Shared state is held behind `Arc` so that each task launcher can hand a
/// clone of the handles it needs to the task it spawns. Spawned tasks are
/// tracked in `task_handles` and stopped through the `shutdown` token.
pub struct ReplicationEngine {
    /// Replication configuration (shared across spawned tasks).
    config: Arc<ReplicationConfig>,
    /// P2P networking node.
    p2p_node: Arc<P2PNode>,
    /// Local chunk storage.
    storage: Arc<LmdbStorage>,
    /// Persistent paid-for-list.
    paid_list: Arc<PaidList>,
    /// Payment verifier for `PoP` validation.
    payment_verifier: Arc<PaymentVerifier>,
    /// Replication pipeline queues.
    queues: Arc<RwLock<ReplicationQueues>>,
    /// Neighbor sync cycle state.
    sync_state: Arc<RwLock<NeighborSyncState>>,
    /// Per-peer sync history (for `RepairOpportunity`).
    ///
    /// This map grows with peer churn and is intentionally unbounded: entries
    /// are lightweight (`PeerSyncRecord` is two fields) and peer IDs are
    /// naturally bounded by the routing table's k-bucket capacity.
    // NOTE(review): the `PeerRemoved` handler in `start_message_handler`
    // clears the peer from `sync_state` and `repair_proofs` but not from this
    // map — confirm that leaving the entry behind is intended.
    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    /// Completed local neighbor-sync cycle epoch for proof maturity.
    sync_cycle_epoch: Arc<RwLock<u64>>,
    /// Per-key repair proof tracking for audit eligibility.
    repair_proofs: Arc<RwLock<RepairProofs>>,
    /// Bootstrap state tracking.
    bootstrap_state: Arc<RwLock<BootstrapState>>,
    /// Whether this node is currently bootstrapping.
    ///
    /// Initialised to `true` by `new()`.
    is_bootstrapping: Arc<RwLock<bool>>,
    /// Trigger for early neighbor sync (signalled on topology changes).
    sync_trigger: Arc<Notify>,
    /// Notified when `is_bootstrapping` transitions from `true` to `false`.
    bootstrap_complete_notify: Arc<Notify>,
    /// Limits concurrent outbound replication sends to prevent bandwidth
    /// saturation on home broadband connections.
    ///
    /// Created with `MAX_CONCURRENT_REPLICATION_SENDS` permits.
    send_semaphore: Arc<Semaphore>,
    /// Receiver for fresh-write events from the chunk PUT handler.
    ///
    /// When present, `start()` spawns a drainer task that calls
    /// `replicate_fresh` for each event. The drainer takes the receiver out
    /// of this field, leaving `None` behind.
    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
    /// Shutdown token.
    shutdown: CancellationToken,
    /// Background task handles.
    ///
    /// Empty until `start()` runs; drained again by `shutdown()`.
    task_handles: Vec<JoinHandle<()>>,
}
168
169impl ReplicationEngine {
170    /// Create a new replication engine.
171    ///
172    /// # Errors
173    ///
174    /// Returns an error if the `PaidList` LMDB environment cannot be opened
175    /// or if the configuration fails validation.
176    pub async fn new(
177        config: ReplicationConfig,
178        p2p_node: Arc<P2PNode>,
179        storage: Arc<LmdbStorage>,
180        payment_verifier: Arc<PaymentVerifier>,
181        root_dir: &Path,
182        fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
183        shutdown: CancellationToken,
184    ) -> Result<Self> {
185        config.validate().map_err(Error::Config)?;
186
187        let paid_list = Arc::new(
188            PaidList::new(root_dir)
189                .await
190                .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
191        );
192
193        let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
194        let config = Arc::new(config);
195
196        Ok(Self {
197            config: Arc::clone(&config),
198            p2p_node,
199            storage,
200            paid_list,
201            payment_verifier,
202            queues: Arc::new(RwLock::new(ReplicationQueues::new())),
203            sync_state: Arc::new(RwLock::new(initial_neighbors)),
204            sync_history: Arc::new(RwLock::new(HashMap::new())),
205            sync_cycle_epoch: Arc::new(RwLock::new(0)),
206            repair_proofs: Arc::new(RwLock::new(RepairProofs::new())),
207            bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
208            is_bootstrapping: Arc::new(RwLock::new(true)),
209            sync_trigger: Arc::new(Notify::new()),
210            bootstrap_complete_notify: Arc::new(Notify::new()),
211            send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
212            fresh_write_rx: Some(fresh_write_rx),
213            shutdown,
214            task_handles: Vec::new(),
215        })
216    }
217
    /// Get a reference to the `PaidList`.
    ///
    /// Returns a borrow of the engine's own `Arc<PaidList>`; callers that need
    /// an owned handle can clone it.
    #[must_use]
    pub fn paid_list(&self) -> &Arc<PaidList> {
        &self.paid_list
    }
223
224    /// Start all background tasks.
225    ///
226    /// `dht_events` must be subscribed **before** `P2PNode::start()` so that
227    /// the `BootstrapComplete` event emitted during DHT bootstrap is not
228    /// missed by the bootstrap-sync gate.
229    pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
230        if !self.task_handles.is_empty() {
231            error!("ReplicationEngine::start() called while already running — ignoring");
232            return;
233        }
234        info!("Starting replication engine");
235
236        self.start_message_handler();
237        self.start_neighbor_sync_loop();
238        self.start_self_lookup_loop();
239        self.start_audit_loop();
240        self.start_fetch_worker();
241        self.start_verification_worker();
242        self.start_bootstrap_sync(dht_events);
243        self.start_fresh_write_drainer();
244
245        info!(
246            "Replication engine started with {} background tasks",
247            self.task_handles.len()
248        );
249    }
250
251    /// Returns `true` if the node is still in the replication bootstrap phase.
252    ///
253    /// During bootstrap, audit challenges return `Bootstrapping` instead of
254    /// digests, and neighbor sync responses carry `bootstrapping: true`.
255    pub async fn is_bootstrapping(&self) -> bool {
256        *self.is_bootstrapping.read().await
257    }
258
259    /// Wait until the replication bootstrap phase completes.
260    ///
261    /// Returns immediately if bootstrap has already completed. Useful for
262    /// readiness probes, health checks, and test harnesses that need the
263    /// node to be fully operational before proceeding.
264    ///
265    /// Returns `true` if bootstrap completed within the timeout, `false`
266    /// if the timeout elapsed first.
267    pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
268        // Register the notification future *before* checking the flag so that
269        // a transition between the read and the await is not missed.
270        let notified = self.bootstrap_complete_notify.notified();
271        tokio::pin!(notified);
272        notified.as_mut().enable();
273
274        if !*self.is_bootstrapping.read().await {
275            return true;
276        }
277
278        tokio::time::timeout(timeout, notified).await.is_ok()
279    }
280
281    /// Cancel all background tasks and wait for them to terminate.
282    ///
283    /// This must be awaited before dropping the engine when the caller needs
284    /// the `Arc<LmdbStorage>` references held by background tasks to be
285    /// released (e.g. before reopening the same LMDB environment).
286    pub async fn shutdown(&mut self) {
287        self.shutdown.cancel();
288        for (i, mut handle) in self.task_handles.drain(..).enumerate() {
289            match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
290                Ok(Ok(())) => {}
291                Ok(Err(e)) if e.is_cancelled() => {}
292                Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
293                Err(_) => {
294                    warn!("Replication task {i} did not stop within 10s, aborting");
295                    handle.abort();
296                }
297            }
298        }
299    }
300
    /// Trigger an early neighbor sync round.
    ///
    /// Useful after topology changes (new nodes joining, network heal after
    /// partition) when the caller wants replication to converge faster than
    /// the regular 10-20 minute cadence.
    ///
    /// Signals `sync_trigger`, which the neighbor sync loop waits on alongside
    /// its interval sleep.
    pub fn trigger_neighbor_sync(&self) {
        self.sync_trigger.notify_one();
    }
309
310    /// Execute fresh replication for a newly stored record.
311    pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
312        fresh::replicate_fresh(
313            key,
314            data,
315            proof_of_payment,
316            &self.p2p_node,
317            &self.paid_list,
318            &self.config,
319            &self.send_semaphore,
320        )
321        .await;
322    }
323
324    // =======================================================================
325    // Background task launchers
326    // =======================================================================
327
328    /// Spawn a task that drains the fresh-write channel and triggers
329    /// replication for each newly-stored chunk.
330    fn start_fresh_write_drainer(&mut self) {
331        let Some(mut rx) = self.fresh_write_rx.take() else {
332            return;
333        };
334        let p2p = Arc::clone(&self.p2p_node);
335        let paid_list = Arc::clone(&self.paid_list);
336        let config = Arc::clone(&self.config);
337        let send_semaphore = Arc::clone(&self.send_semaphore);
338        let shutdown = self.shutdown.clone();
339
340        let handle = tokio::spawn(async move {
341            loop {
342                tokio::select! {
343                    () = shutdown.cancelled() => break,
344                    event = rx.recv() => {
345                        let Some(event) = event else { break };
346                        fresh::replicate_fresh(
347                            &event.key,
348                            &event.data,
349                            &event.payment_proof,
350                            &p2p,
351                            &paid_list,
352                            &config,
353                            &send_semaphore,
354                        )
355                        .await;
356                    }
357                }
358            }
359            debug!("Fresh-write drainer shut down");
360        });
361        self.task_handles.push(handle);
362    }
363
364    #[allow(clippy::too_many_lines)]
365    fn start_message_handler(&mut self) {
366        let mut p2p_events = self.p2p_node.subscribe_events();
367        let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
368        let p2p = Arc::clone(&self.p2p_node);
369        let storage = Arc::clone(&self.storage);
370        let paid_list = Arc::clone(&self.paid_list);
371        let payment_verifier = Arc::clone(&self.payment_verifier);
372        let queues = Arc::clone(&self.queues);
373        let config = Arc::clone(&self.config);
374        let shutdown = self.shutdown.clone();
375        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
376        let bootstrap_state = Arc::clone(&self.bootstrap_state);
377        let sync_history = Arc::clone(&self.sync_history);
378        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
379        let repair_proofs = Arc::clone(&self.repair_proofs);
380        let sync_trigger = Arc::clone(&self.sync_trigger);
381        let sync_state = Arc::clone(&self.sync_state);
382
383        let handle = tokio::spawn(async move {
384            loop {
385                tokio::select! {
386                    () = shutdown.cancelled() => break,
387                    event = p2p_events.recv() => {
388                        let Ok(event) = event else { continue };
389                        if let P2PEvent::Message {
390                            topic,
391                            source: Some(source),
392                            data,
393                            ..
394                        } = event {
395                            // Determine if this is a replication message
396                            // and whether it arrived via the /rr/ request-response
397                            // path (which wraps payloads in RequestResponseEnvelope).
398                            let rr_info = if topic == REPLICATION_PROTOCOL_ID {
399                                Some((data.clone(), None))
400                            } else if topic.starts_with(RR_PREFIX)
401                                && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
402                            {
403                                P2PNode::parse_request_envelope(&data)
404                                    .filter(|(_, is_resp, _)| !is_resp)
405                                    .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
406                            } else {
407                                None
408                            };
409                            if let Some((payload, rr_message_id)) = rr_info {
410                                match handle_replication_message(
411                                    &source,
412                                    &payload,
413                                    &p2p,
414                                    &storage,
415                                    &paid_list,
416                                    &payment_verifier,
417                                    &queues,
418                                    &config,
419                                    &is_bootstrapping,
420                                    &bootstrap_state,
421                                    &sync_history,
422                                    &sync_cycle_epoch,
423                                    &repair_proofs,
424                                    rr_message_id.as_deref(),
425                                ).await {
426                                    Ok(()) => {}
427                                    Err(e) => {
428                                        debug!(
429                                            "Replication message from {source} error: {e}"
430                                        );
431                                    }
432                                }
433                            }
434                        }
435                    }
436                    // Gap 4: Topology churn handling (Section 13).
437                    //
438                    // The DHT routing table emits KClosestPeersChanged when the
439                    // K-closest peer set actually changes, which is the precise
440                    // signal for triggering neighbor sync. This replaces the
441                    // previous approach of checking every PeerConnected /
442                    // PeerDisconnected event against the close group.
443                    dht_event = dht_events.recv() => {
444                        let Ok(dht_event) = dht_event else { continue };
445                        match dht_event {
446                            DhtNetworkEvent::KClosestPeersChanged { old, new } => {
447                                let old_peers = old
448                                    .iter()
449                                    .take(config.neighbor_sync_scope)
450                                    .copied()
451                                    .collect::<HashSet<_>>();
452                                let new_scoped = new
453                                    .iter()
454                                    .take(config.neighbor_sync_scope)
455                                    .copied()
456                                    .collect::<Vec<_>>();
457                                let new_peers =
458                                    new_scoped.iter().copied().collect::<HashSet<_>>();
459                                let entrants = new_scoped
460                                    .iter()
461                                    .copied()
462                                    .filter(|peer| !old_peers.contains(peer))
463                                    .collect::<Vec<_>>();
464                                let entrant_count = entrants.len();
465                                let (priority_insertions, sync_removals) = {
466                                    let mut state = sync_state.write().await;
467                                    let sync_removals = state.retain_sync_peers(&new_peers);
468                                    let priority_insertions = state.queue_priority_peers(entrants);
469                                    (priority_insertions, sync_removals)
470                                };
471                                if priority_insertions > 0 {
472                                    debug!(
473                                        "K-closest peers changed, queued {priority_insertions}/{entrant_count} new close peers for priority neighbor sync and pruned {sync_removals} departed pending sync entries"
474                                    );
475                                } else {
476                                    debug!(
477                                        "K-closest peers changed, no additional close peers queued, pruned {sync_removals} departed pending sync entries, triggering early neighbor sync"
478                                    );
479                                }
480                                sync_trigger.notify_one();
481                            }
482                            DhtNetworkEvent::PeerRemoved { peer_id } => {
483                                sync_state.write().await.remove_peer(&peer_id);
484                                repair_proofs.write().await.remove_peer(&peer_id);
485                            }
486                            _ => {}
487                        }
488                    }
489                }
490            }
491            debug!("Replication message handler shut down");
492        });
493        self.task_handles.push(handle);
494    }
495
496    fn start_neighbor_sync_loop(&mut self) {
497        let p2p = Arc::clone(&self.p2p_node);
498        let storage = Arc::clone(&self.storage);
499        let paid_list = Arc::clone(&self.paid_list);
500        let queues = Arc::clone(&self.queues);
501        let config = Arc::clone(&self.config);
502        let shutdown = self.shutdown.clone();
503        let sync_state = Arc::clone(&self.sync_state);
504        let sync_history = Arc::clone(&self.sync_history);
505        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
506        let repair_proofs = Arc::clone(&self.repair_proofs);
507        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
508        let bootstrap_state = Arc::clone(&self.bootstrap_state);
509        let sync_trigger = Arc::clone(&self.sync_trigger);
510
511        let handle = tokio::spawn(async move {
512            loop {
513                let interval = config.random_neighbor_sync_interval();
514                tokio::select! {
515                    () = shutdown.cancelled() => break,
516                    () = tokio::time::sleep(interval) => {}
517                    () = sync_trigger.notified() => {
518                        debug!("Neighbor sync triggered by topology change");
519                    }
520                }
521                // Wrap the sync round in a select so shutdown cancels
522                // in-progress network operations rather than waiting for
523                // the full round to complete.
524                tokio::select! {
525                    () = shutdown.cancelled() => break,
526                    () = run_neighbor_sync_round(
527                        &p2p,
528                        &storage,
529                        &paid_list,
530                        &queues,
531                        &config,
532                        &sync_state,
533                        &sync_history,
534                        &sync_cycle_epoch,
535                        &repair_proofs,
536                        &is_bootstrapping,
537                        &bootstrap_state,
538                    ) => {}
539                }
540            }
541            debug!("Neighbor sync loop shut down");
542        });
543        self.task_handles.push(handle);
544    }
545
546    fn start_self_lookup_loop(&mut self) {
547        let p2p = Arc::clone(&self.p2p_node);
548        let config = Arc::clone(&self.config);
549        let shutdown = self.shutdown.clone();
550
551        let handle = tokio::spawn(async move {
552            loop {
553                let interval = config.random_self_lookup_interval();
554                tokio::select! {
555                    () = shutdown.cancelled() => break,
556                    () = tokio::time::sleep(interval) => {
557                        if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
558                            debug!("Self-lookup failed: {e}");
559                        }
560                    }
561                }
562            }
563            debug!("Self-lookup loop shut down");
564        });
565        self.task_handles.push(handle);
566    }
567
568    fn start_audit_loop(&mut self) {
569        let p2p = Arc::clone(&self.p2p_node);
570        let storage = Arc::clone(&self.storage);
571        let config = Arc::clone(&self.config);
572        let shutdown = self.shutdown.clone();
573        let sync_history = Arc::clone(&self.sync_history);
574        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
575        let repair_proofs = Arc::clone(&self.repair_proofs);
576        let bootstrap_state = Arc::clone(&self.bootstrap_state);
577        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
578        let sync_state = Arc::clone(&self.sync_state);
579
580        let handle = tokio::spawn(async move {
581            // Invariant 19: wait for bootstrap to drain before starting audits.
582            loop {
583                tokio::select! {
584                    () = shutdown.cancelled() => return,
585                    () = tokio::time::sleep(
586                        std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
587                    ) => {
588                        if bootstrap_state.read().await.is_drained() {
589                            break;
590                        }
591                    }
592                }
593            }
594
595            // Run one audit tick immediately after bootstrap drain.
596            {
597                let bootstrapping = *is_bootstrapping.read().await;
598                let result = {
599                    let history = sync_history.read().await;
600                    let current_sync_epoch = *sync_cycle_epoch.read().await;
601                    audit::audit_tick_with_repair_proofs(
602                        &p2p,
603                        &storage,
604                        &config,
605                        &history,
606                        &repair_proofs,
607                        current_sync_epoch,
608                        bootstrapping,
609                    )
610                    .await
611                };
612                handle_audit_result(&result, &p2p, &sync_state, &config).await;
613            }
614
615            // Then run periodically.
616            loop {
617                let interval = config.random_audit_tick_interval();
618                tokio::select! {
619                    () = shutdown.cancelled() => break,
620                    () = tokio::time::sleep(interval) => {
621                        let bootstrapping = *is_bootstrapping.read().await;
622                        let result = {
623                            let history = sync_history.read().await;
624                            let current_sync_epoch = *sync_cycle_epoch.read().await;
625                            audit::audit_tick_with_repair_proofs(
626                                &p2p,
627                                &storage,
628                                &config,
629                                &history,
630                                &repair_proofs,
631                                current_sync_epoch,
632                                bootstrapping,
633                            )
634                            .await
635                        };
636                        handle_audit_result(&result, &p2p, &sync_state, &config).await;
637                    }
638                }
639            }
640            debug!("Audit loop shut down");
641        });
642        self.task_handles.push(handle);
643    }
644
645    #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
646    fn start_fetch_worker(&mut self) {
647        let p2p = Arc::clone(&self.p2p_node);
648        let storage = Arc::clone(&self.storage);
649        let queues = Arc::clone(&self.queues);
650        let config = Arc::clone(&self.config);
651        let shutdown = self.shutdown.clone();
652        let bootstrap_state = Arc::clone(&self.bootstrap_state);
653        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
654        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
655        let concurrency = max_parallel_fetch();
656
657        info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
658
659        let handle = tokio::spawn(async move {
660            // Each in-flight future yields (key, Option<FetchOutcome>) so we
661            // always recover the key — even if the inner task panics.
662            let mut in_flight = FuturesUnordered::<FetchFuture>::new();
663
664            loop {
665                // Fill up to `concurrency` slots from the queue.
666                {
667                    let mut q = queues.write().await;
668                    while in_flight.len() < concurrency {
669                        let Some(candidate) = q.dequeue_fetch() else {
670                            break;
671                        };
672                        let Some(&source) = candidate.sources.first() else {
673                            warn!(
674                                "Fetch candidate {} has no sources — dropping",
675                                hex::encode(candidate.key)
676                            );
677                            continue;
678                        };
679                        q.start_fetch(candidate.key, source, candidate.sources.clone());
680
681                        let p2p = Arc::clone(&p2p);
682                        let storage = Arc::clone(&storage);
683                        let config = Arc::clone(&config);
684                        let token = shutdown.clone();
685                        let fetch_key = candidate.key;
686                        in_flight.push(Box::pin(async move {
687                            let handle = tokio::spawn(async move {
688                                // Cancel-aware: abort when the engine shuts down.
689                                tokio::select! {
690                                    () = token.cancelled() => FetchOutcome {
691                                        key: fetch_key,
692                                        result: FetchResult::SourceFailed,
693                                    },
694                                    outcome = execute_single_fetch(
695                                        p2p, storage, config, fetch_key, source,
696                                    ) => outcome,
697                                }
698                            });
699                            match handle.await {
700                                Ok(outcome) => (outcome.key, Some(outcome)),
701                                Err(e) => {
702                                    error!(
703                                        "Fetch task for {} panicked: {e}",
704                                        hex::encode(fetch_key)
705                                    );
706                                    (fetch_key, None)
707                                }
708                            }
709                        }));
710                    }
711                } // release queues write lock
712
713                if in_flight.is_empty() {
714                    // No work — wait for new items or shutdown.
715                    tokio::select! {
716                        () = shutdown.cancelled() => break,
717                        () = tokio::time::sleep(
718                            std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
719                        ) => continue,
720                    }
721                }
722
723                // Wait for the next fetch to complete and process the result.
724                tokio::select! {
725                    () = shutdown.cancelled() => break,
726                    Some((key, maybe_outcome)) = in_flight.next() => {
727                        let mut q = queues.write().await;
728                        let terminal = if let Some(outcome) = maybe_outcome {
729                            match outcome.result {
730                                FetchResult::Stored => {
731                                    q.complete_fetch(&key);
732                                    true
733                                }
734                                FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
735                                    if let Some(next_peer) = q.retry_fetch(&key) {
736                                        // Spawn a new fetch task for the next source.
737                                        let p2p = Arc::clone(&p2p);
738                                        let storage = Arc::clone(&storage);
739                                        let config = Arc::clone(&config);
740                                        let token = shutdown.clone();
741                                        let fetch_key = key;
742                                        in_flight.push(Box::pin(async move {
743                                            let handle = tokio::spawn(async move {
744                                                tokio::select! {
745                                                    () = token.cancelled() => FetchOutcome {
746                                                        key: fetch_key,
747                                                        result: FetchResult::SourceFailed,
748                                                    },
749                                                    outcome = execute_single_fetch(
750                                                        p2p, storage, config, fetch_key, next_peer,
751                                                    ) => outcome,
752                                                }
753                                            });
754                                            match handle.await {
755                                                Ok(outcome) => (outcome.key, Some(outcome)),
756                                                Err(e) => {
757                                                    error!(
758                                                        "Fetch task for {} panicked: {e}",
759                                                        hex::encode(fetch_key)
760                                                    );
761                                                    (fetch_key, None)
762                                                }
763                                            }
764                                        }));
765                                        false
766                                    } else {
767                                        q.complete_fetch(&key);
768                                        true
769                                    }
770                                }
771                            }
772                        } else {
773                            // Task panicked — reclaim the in-flight slot.
774                            q.complete_fetch(&key);
775                            true
776                        };
777
778                        // Shrink bootstrap pending set on terminal exit.
779                        if terminal {
780                            drop(q); // release queues lock before acquiring bootstrap_state
781                            if !bootstrap_state.read().await.is_drained() {
782                                bootstrap_state.write().await.remove_key(&key);
783                                let q = queues.read().await;
784                                if bootstrap::check_bootstrap_drained(
785                                    &bootstrap_state,
786                                    &q,
787                                )
788                                .await
789                                {
790                                    complete_bootstrap(
791                                        &is_bootstrapping,
792                                        &bootstrap_complete_notify,
793                                    ).await;
794                                }
795                            }
796                        }
797                    }
798                }
799            }
800
801            // Cancel and drain remaining in-flight fetches on shutdown.
802            // The CancellationToken is already cancelled by this point, so
803            // spawned tasks will see cancellation via their select! branches.
804            while in_flight.next().await.is_some() {}
805            debug!("Fetch worker shut down");
806        });
807        self.task_handles.push(handle);
808    }
809
810    fn start_verification_worker(&mut self) {
811        let p2p = Arc::clone(&self.p2p_node);
812        let storage = Arc::clone(&self.storage);
813        let queues = Arc::clone(&self.queues);
814        let paid_list = Arc::clone(&self.paid_list);
815        let config = Arc::clone(&self.config);
816        let shutdown = self.shutdown.clone();
817        let bootstrap_state = Arc::clone(&self.bootstrap_state);
818        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
819        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
820
821        let handle = tokio::spawn(async move {
822            loop {
823                tokio::select! {
824                    () = shutdown.cancelled() => break,
825                    () = tokio::time::sleep(
826                        std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
827                    ) => {
828                        let ctx = VerificationCycleContext {
829                            p2p_node: &p2p,
830                            paid_list: &paid_list,
831                            storage: &storage,
832                            queues: &queues,
833                            config: &config,
834                            bootstrap_state: &bootstrap_state,
835                            is_bootstrapping: &is_bootstrapping,
836                            bootstrap_complete_notify: &bootstrap_complete_notify,
837                        };
838                        run_verification_cycle(ctx).await;
839                    }
840                }
841            }
842            debug!("Verification worker shut down");
843        });
844        self.task_handles.push(handle);
845    }
846
847    /// Gap 3: Run a one-shot bootstrap sync on startup.
848    ///
849    /// Waits for saorsa-core to emit `DhtNetworkEvent::BootstrapComplete`
850    /// (indicating the routing table is populated) before snapshotting
851    /// close neighbors. Falls back after a timeout so bootstrap nodes
852    /// (which have no peers and therefore never receive the event) still
853    /// proceed.
854    ///
855    /// After the gate, finds close neighbors, syncs with each in
856    /// round-robin batches, admits returned hints into the verification
857    /// pipeline, and tracks discovered keys for bootstrap drain detection.
858    #[allow(clippy::too_many_lines)]
859    fn start_bootstrap_sync(
860        &mut self,
861        dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
862    ) {
863        let p2p = Arc::clone(&self.p2p_node);
864        let storage = Arc::clone(&self.storage);
865        let paid_list = Arc::clone(&self.paid_list);
866        let queues = Arc::clone(&self.queues);
867        let config = Arc::clone(&self.config);
868        let shutdown = self.shutdown.clone();
869        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
870        let bootstrap_state = Arc::clone(&self.bootstrap_state);
871        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
872        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
873        let repair_proofs = Arc::clone(&self.repair_proofs);
874
875        let handle = tokio::spawn(async move {
876            // Wait for DHT bootstrap to complete before snapshotting
877            // neighbors. The routing table is empty until saorsa-core
878            // finishes its FIND_NODE rounds and bucket refreshes.
879            let gate = bootstrap::wait_for_bootstrap_complete(
880                dht_events,
881                config.bootstrap_complete_timeout_secs,
882                &shutdown,
883            )
884            .await;
885
886            if gate == bootstrap::BootstrapGateResult::Shutdown {
887                return;
888            }
889
890            let self_id = *p2p.peer_id();
891            let neighbors =
892                neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
893                    .await;
894
895            if neighbors.is_empty() {
896                info!("Bootstrap sync: no close neighbors found, marking drained");
897                bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
898                complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
899                return;
900            }
901
902            let neighbor_count = neighbors.len();
903            info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
904
905            // Process neighbors in batches of NEIGHBOR_SYNC_PEER_COUNT.
906            for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
907                if shutdown.is_cancelled() {
908                    break;
909                }
910
911                let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
912                    batch,
913                    &storage,
914                    &paid_list,
915                    &p2p,
916                    config.close_group_size,
917                    config.paid_list_close_group_size,
918                )
919                .await;
920
921                for peer in batch {
922                    if shutdown.is_cancelled() {
923                        break;
924                    }
925
926                    // Re-read on each iteration so peers see current state.
927                    let bootstrapping = *is_bootstrapping.read().await;
928
929                    bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
930
931                    let hints = hints_by_peer.remove(peer).unwrap_or_default();
932                    let outcome = neighbor_sync::sync_with_peer_with_hints(
933                        peer,
934                        &p2p,
935                        &config,
936                        bootstrapping,
937                        hints,
938                    )
939                    .await;
940
941                    bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
942
943                    if let Some(outcome) = outcome {
944                        if !outcome.response.bootstrapping {
945                            record_sent_replica_hints(
946                                peer,
947                                &outcome.sent_replica_hints,
948                                &repair_proofs,
949                                &sync_cycle_epoch,
950                            )
951                            .await;
952                            // Admit hints into verification pipeline.
953                            let outcome = admit_and_queue_hints(
954                                &self_id,
955                                peer,
956                                &outcome.response.replica_hints,
957                                &outcome.response.paid_hints,
958                                &p2p,
959                                &config,
960                                &storage,
961                                &paid_list,
962                                &queues,
963                            )
964                            .await;
965
966                            // Track discovered keys for drain detection.
967                            if !outcome.discovered.is_empty() {
968                                bootstrap::track_discovered_keys(
969                                    &bootstrap_state,
970                                    &outcome.discovered,
971                                )
972                                .await;
973                            }
974
975                            // Record / retire capacity rejections so the
976                            // drain check correctly reflects whether each
977                            // source still owes us re-hinted work after
978                            // queue overflow.
979                            if outcome.capacity_rejected_count > 0 {
980                                bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
981                            } else {
982                                bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
983                            }
984                        }
985                    }
986                }
987            }
988
989            // Check drain condition.
990            {
991                let q = queues.read().await;
992                if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
993                    complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
994                }
995            }
996
997            info!("Bootstrap sync completed");
998        });
999        self.task_handles.push(handle);
1000    }
1001}
1002
1003// ===========================================================================
1004// Free functions for background tasks
1005// ===========================================================================
1006
1007/// Handle an incoming replication protocol message.
1008///
1009/// When `rr_message_id` is `Some`, the request arrived via the `/rr/`
1010/// request-response path and the response must be sent via `send_response`
1011/// so saorsa-core can route it back to the waiting `send_request` caller.
1012#[allow(clippy::too_many_arguments)]
1013async fn handle_replication_message(
1014    source: &PeerId,
1015    data: &[u8],
1016    p2p_node: &Arc<P2PNode>,
1017    storage: &Arc<LmdbStorage>,
1018    paid_list: &Arc<PaidList>,
1019    payment_verifier: &Arc<PaymentVerifier>,
1020    queues: &Arc<RwLock<ReplicationQueues>>,
1021    config: &ReplicationConfig,
1022    is_bootstrapping: &Arc<RwLock<bool>>,
1023    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1024    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1025    sync_cycle_epoch: &Arc<RwLock<u64>>,
1026    repair_proofs: &Arc<RwLock<RepairProofs>>,
1027    rr_message_id: Option<&str>,
1028) -> Result<()> {
1029    let msg = ReplicationMessage::decode(data)
1030        .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
1031
1032    match msg.body {
1033        ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
1034            handle_fresh_offer(
1035                source,
1036                offer,
1037                storage,
1038                paid_list,
1039                payment_verifier,
1040                p2p_node,
1041                config,
1042                msg.request_id,
1043                rr_message_id,
1044            )
1045            .await
1046        }
1047        ReplicationMessageBody::PaidNotify(ref notify) => {
1048            handle_paid_notify(
1049                source,
1050                notify,
1051                paid_list,
1052                payment_verifier,
1053                p2p_node,
1054                config,
1055            )
1056            .await
1057        }
1058        ReplicationMessageBody::NeighborSyncRequest(ref request) => {
1059            let bootstrapping = *is_bootstrapping.read().await;
1060            handle_neighbor_sync_request(
1061                source,
1062                request,
1063                p2p_node,
1064                storage,
1065                paid_list,
1066                queues,
1067                config,
1068                bootstrapping,
1069                bootstrap_state,
1070                sync_history,
1071                sync_cycle_epoch,
1072                repair_proofs,
1073                msg.request_id,
1074                rr_message_id,
1075            )
1076            .await
1077        }
1078        ReplicationMessageBody::VerificationRequest(ref request) => {
1079            handle_verification_request(
1080                source,
1081                request,
1082                storage,
1083                paid_list,
1084                p2p_node,
1085                msg.request_id,
1086                rr_message_id,
1087            )
1088            .await
1089        }
1090        ReplicationMessageBody::FetchRequest(ref request) => {
1091            handle_fetch_request(
1092                source,
1093                request,
1094                storage,
1095                p2p_node,
1096                msg.request_id,
1097                rr_message_id,
1098            )
1099            .await
1100        }
1101        ReplicationMessageBody::AuditChallenge(ref challenge) => {
1102            let bootstrapping = *is_bootstrapping.read().await;
1103            handle_audit_challenge_msg(
1104                source,
1105                challenge,
1106                storage,
1107                p2p_node,
1108                bootstrapping,
1109                msg.request_id,
1110                rr_message_id,
1111            )
1112            .await
1113        }
1114        // Response messages are handled by their respective request initiators.
1115        ReplicationMessageBody::FreshReplicationResponse(_)
1116        | ReplicationMessageBody::NeighborSyncResponse(_)
1117        | ReplicationMessageBody::VerificationResponse(_)
1118        | ReplicationMessageBody::FetchResponse(_)
1119        | ReplicationMessageBody::AuditResponse(_) => Ok(()),
1120    }
1121}
1122
1123// ---------------------------------------------------------------------------
1124// Per-message-type handlers
1125// ---------------------------------------------------------------------------
1126
1127#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1128async fn handle_fresh_offer(
1129    source: &PeerId,
1130    offer: &protocol::FreshReplicationOffer,
1131    storage: &Arc<LmdbStorage>,
1132    paid_list: &Arc<PaidList>,
1133    payment_verifier: &Arc<PaymentVerifier>,
1134    p2p_node: &Arc<P2PNode>,
1135    config: &ReplicationConfig,
1136    request_id: u64,
1137    rr_message_id: Option<&str>,
1138) -> Result<()> {
1139    let self_id = *p2p_node.peer_id();
1140
1141    // Rule 5: reject if PoP is missing.
1142    if offer.proof_of_payment.is_empty() {
1143        send_replication_response(
1144            source,
1145            p2p_node,
1146            request_id,
1147            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1148                key: offer.key,
1149                reason: "Missing proof of payment".to_string(),
1150            }),
1151            rr_message_id,
1152        )
1153        .await;
1154        return Ok(());
1155    }
1156
1157    // Enforce chunk size invariant: the normal PUT path rejects data larger
1158    // than MAX_CHUNK_SIZE; the replication receive path must do the same to
1159    // prevent peers from pushing oversized records through replication.
1160    if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1161        warn!(
1162            "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1163            hex::encode(offer.key),
1164            offer.data.len(),
1165            crate::ant_protocol::MAX_CHUNK_SIZE,
1166        );
1167        p2p_node
1168            .report_trust_event(
1169                source,
1170                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1171            )
1172            .await;
1173        send_replication_response(
1174            source,
1175            p2p_node,
1176            request_id,
1177            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1178                key: offer.key,
1179                reason: format!(
1180                    "Data size {} exceeds maximum chunk size {}",
1181                    offer.data.len(),
1182                    crate::ant_protocol::MAX_CHUNK_SIZE,
1183                ),
1184            }),
1185            rr_message_id,
1186        )
1187        .await;
1188        return Ok(());
1189    }
1190
1191    // Mirror the normal PUT path: the advertised key must be the content
1192    // address of the supplied bytes before any expensive payment verification.
1193    let computed_key = crate::client::compute_address(&offer.data);
1194    if computed_key != offer.key {
1195        warn!(
1196            "Rejecting fresh offer for key {}: content address mismatch, computed {}",
1197            hex::encode(offer.key),
1198            hex::encode(computed_key),
1199        );
1200        p2p_node
1201            .report_trust_event(
1202                source,
1203                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1204            )
1205            .await;
1206        send_replication_response(
1207            source,
1208            p2p_node,
1209            request_id,
1210            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1211                key: offer.key,
1212                reason: format!(
1213                    "Content address mismatch: expected {}, computed {}",
1214                    hex::encode(offer.key),
1215                    hex::encode(computed_key),
1216                ),
1217            }),
1218            rr_message_id,
1219        )
1220        .await;
1221        return Ok(());
1222    }
1223
1224    // Rule 7: check storage admission. Fresh chunk receivers accept the close
1225    // group plus a small margin to absorb local routing-table disagreement.
1226    if !admission::is_responsible(
1227        &self_id,
1228        &offer.key,
1229        p2p_node,
1230        storage_admission_width(config.close_group_size),
1231    )
1232    .await
1233    {
1234        send_replication_response(
1235            source,
1236            p2p_node,
1237            request_id,
1238            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1239                key: offer.key,
1240                reason: "Not in storage-admission range for this key".to_string(),
1241            }),
1242            rr_message_id,
1243        )
1244        .await;
1245        return Ok(());
1246    }
1247
1248    // Disk-space pre-check — mirror the PUT handler (V2-411). A full node can
1249    // never store this record, so reject it before the expensive payment
1250    // verification (EVM on-chain query / merkle pool work) rather than verifying
1251    // and only then failing at `storage.put` below. Reuses the cached capacity
1252    // check (passing results only, so freed space is detected promptly), and the
1253    // store path keeps its own check as defence-in-depth.
1254    if let Err(e) = storage.check_capacity() {
1255        info!(
1256            target: "ant_node::storage::disk_precheck",
1257            key = %hex::encode(offer.key),
1258            "Rejecting fresh replication offer before payment verification: {e}"
1259        );
1260        send_replication_response(
1261            source,
1262            p2p_node,
1263            request_id,
1264            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1265                key: offer.key,
1266                reason: e.to_string(),
1267            }),
1268            rr_message_id,
1269        )
1270        .await;
1271        return Ok(());
1272    }
1273
1274    // Gap 1: Validate PoP via PaymentVerifier. Fresh replication is still
1275    // part of the immediate write fan-out: this receiver is about to store the
1276    // record as if the client had PUT it here directly. Storage admission
1277    // was checked above before proof work. ClientPut verification applies
1278    // store-strength cache semantics, paid-quote issuer K-closeness and local
1279    // price floor checks for single-node proofs, and merkle candidate
1280    // closeness for merkle proofs.
1281    match payment_verifier
1282        .verify_payment(
1283            &offer.key,
1284            Some(&offer.proof_of_payment),
1285            fresh_offer_payment_context(),
1286        )
1287        .await
1288    {
1289        Ok(status) if status.can_store() => {
1290            debug!(
1291                "PoP validated for fresh offer key {}",
1292                hex::encode(offer.key)
1293            );
1294        }
1295        Ok(_) => {
1296            send_replication_response(
1297                source,
1298                p2p_node,
1299                request_id,
1300                ReplicationMessageBody::FreshReplicationResponse(
1301                    FreshReplicationResponse::Rejected {
1302                        key: offer.key,
1303                        reason: "Payment verification failed: payment required".to_string(),
1304                    },
1305                ),
1306                rr_message_id,
1307            )
1308            .await;
1309            return Ok(());
1310        }
1311        Err(e) => {
1312            warn!(
1313                "PoP verification error for key {}: {e}",
1314                hex::encode(offer.key)
1315            );
1316            send_replication_response(
1317                source,
1318                p2p_node,
1319                request_id,
1320                ReplicationMessageBody::FreshReplicationResponse(
1321                    FreshReplicationResponse::Rejected {
1322                        key: offer.key,
1323                        reason: format!("Payment verification error: {e}"),
1324                    },
1325                ),
1326                rr_message_id,
1327            )
1328            .await;
1329            return Ok(());
1330        }
1331    }
1332
1333    // Rule 6: add to PaidForList.
1334    if let Err(e) = paid_list.insert(&offer.key).await {
1335        warn!("Failed to add key to PaidForList: {e}");
1336    }
1337
1338    // Store the record.
1339    match storage.put(&offer.key, &offer.data).await {
1340        Ok(_) => {
1341            send_replication_response(
1342                source,
1343                p2p_node,
1344                request_id,
1345                ReplicationMessageBody::FreshReplicationResponse(
1346                    FreshReplicationResponse::Accepted { key: offer.key },
1347                ),
1348                rr_message_id,
1349            )
1350            .await;
1351        }
1352        Err(e) => {
1353            send_replication_response(
1354                source,
1355                p2p_node,
1356                request_id,
1357                ReplicationMessageBody::FreshReplicationResponse(
1358                    FreshReplicationResponse::Rejected {
1359                        key: offer.key,
1360                        reason: e.to_string(),
1361                    },
1362                ),
1363                rr_message_id,
1364            )
1365            .await;
1366        }
1367    }
1368
1369    Ok(())
1370}
1371
1372async fn handle_paid_notify(
1373    _source: &PeerId,
1374    notify: &protocol::PaidNotify,
1375    paid_list: &Arc<PaidList>,
1376    payment_verifier: &Arc<PaymentVerifier>,
1377    p2p_node: &Arc<P2PNode>,
1378    config: &ReplicationConfig,
1379) -> Result<()> {
1380    let self_id = *p2p_node.peer_id();
1381
1382    // Rule 3: validate PoP presence before adding.
1383    if notify.proof_of_payment.is_empty() {
1384        return Ok(());
1385    }
1386
1387    // Check if we're in PaidCloseGroup for this key.
1388    if !admission::is_in_paid_close_group(
1389        &self_id,
1390        &notify.key,
1391        p2p_node,
1392        config.paid_list_close_group_size,
1393    )
1394    .await
1395    {
1396        return Ok(());
1397    }
1398
1399    // Gap 1: Validate PoP via PaymentVerifier. PaidNotify admits fresh
1400    // paid-list metadata, so local paid-list close-group membership was checked
1401    // above before proof work. The verifier then runs the same payment proof
1402    // checks as ClientPut while writing a paid-list-strength cache entry.
1403    match payment_verifier
1404        .verify_payment(
1405            &notify.key,
1406            Some(&notify.proof_of_payment),
1407            paid_notify_payment_context(),
1408        )
1409        .await
1410    {
1411        Ok(status) if status.can_store() => {
1412            debug!(
1413                "PoP validated for paid notify key {}",
1414                hex::encode(notify.key)
1415            );
1416        }
1417        Ok(_) => {
1418            warn!(
1419                "Paid notify rejected: payment required for key {}",
1420                hex::encode(notify.key)
1421            );
1422            return Ok(());
1423        }
1424        Err(e) => {
1425            warn!(
1426                "PoP verification error for paid notify key {}: {e}",
1427                hex::encode(notify.key)
1428            );
1429            return Ok(());
1430        }
1431    }
1432
1433    if let Err(e) = paid_list.insert(&notify.key).await {
1434        warn!("Failed to add paid notify key to PaidForList: {e}");
1435    }
1436
1437    Ok(())
1438}
1439
1440#[allow(clippy::too_many_arguments)]
1441async fn handle_neighbor_sync_request(
1442    source: &PeerId,
1443    request: &protocol::NeighborSyncRequest,
1444    p2p_node: &Arc<P2PNode>,
1445    storage: &Arc<LmdbStorage>,
1446    paid_list: &Arc<PaidList>,
1447    queues: &Arc<RwLock<ReplicationQueues>>,
1448    config: &ReplicationConfig,
1449    is_bootstrapping: bool,
1450    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1451    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1452    sync_cycle_epoch: &Arc<RwLock<u64>>,
1453    repair_proofs: &Arc<RwLock<RepairProofs>>,
1454    request_id: u64,
1455    rr_message_id: Option<&str>,
1456) -> Result<()> {
1457    let self_id = *p2p_node.peer_id();
1458
1459    // No per-request hint count limit: the wire message size limit
1460    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Unlike audit
1461    // challenges, sync hints don't drive expensive computation — they just
1462    // enter the verification queue. A per-request limit here would break
1463    // bootstrap replication for newly-joined nodes with 0 stored chunks.
1464
1465    // Build response (outbound hints).
1466    let (response, sent_replica_hints, sender_in_rt) =
1467        neighbor_sync::handle_sync_request_with_proofs(
1468            source,
1469            request,
1470            p2p_node,
1471            storage,
1472            paid_list,
1473            config,
1474            is_bootstrapping,
1475        )
1476        .await;
1477
1478    // Send response.
1479    let response_sent = send_replication_response_checked(
1480        source,
1481        p2p_node,
1482        request_id,
1483        ReplicationMessageBody::NeighborSyncResponse(response),
1484        rr_message_id,
1485    )
1486    .await;
1487
1488    // Process inbound hints only if sender is in LocalRT (Rule 4-6).
1489    if !sender_in_rt {
1490        return Ok(());
1491    }
1492
1493    // Update sync history for this peer before recording repair proofs so a
1494    // same-tick audit cannot combine a fresh key proof with stale peer maturity.
1495    {
1496        let mut history = sync_history.write().await;
1497        let record = history.entry(*source).or_insert(PeerSyncRecord {
1498            last_sync: None,
1499            cycles_since_sync: 0,
1500        });
1501        record.last_sync = Some(Instant::now());
1502        record.cycles_since_sync = 0;
1503    }
1504
1505    if response_sent && !request.bootstrapping {
1506        record_sent_replica_hints(source, &sent_replica_hints, repair_proofs, sync_cycle_epoch)
1507            .await;
1508    }
1509
1510    // Admit inbound hints and queue for verification.
1511    let outcome = admit_and_queue_hints(
1512        &self_id,
1513        source,
1514        &request.replica_hints,
1515        &request.paid_hints,
1516        p2p_node,
1517        config,
1518        storage,
1519        paid_list,
1520        queues,
1521    )
1522    .await;
1523
1524    // Track discovered keys for bootstrap drain detection so that hints
1525    // admitted via inbound sync requests are not missed. Capacity-rejected
1526    // hints keep this source on the "not yet drained" list until its next
1527    // sync re-admits them; a clean cycle clears the source.
1528    if is_bootstrapping {
1529        if !outcome.discovered.is_empty() {
1530            bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1531        }
1532        if outcome.capacity_rejected_count > 0 {
1533            bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
1534        } else {
1535            bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
1536        }
1537    }
1538
1539    Ok(())
1540}
1541
1542async fn handle_verification_request(
1543    source: &PeerId,
1544    request: &protocol::VerificationRequest,
1545    storage: &Arc<LmdbStorage>,
1546    paid_list: &Arc<PaidList>,
1547    p2p_node: &Arc<P2PNode>,
1548    request_id: u64,
1549    rr_message_id: Option<&str>,
1550) -> Result<()> {
1551    // No per-request key count limit: the wire message size limit
1552    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Verification
1553    // does cheap storage lookups per key, not expensive computation like
1554    // audit digest generation.
1555
1556    #[allow(clippy::cast_possible_truncation)]
1557    let keys_len = request.keys.len() as u32;
1558    let paid_check_set: HashSet<u32> = request
1559        .paid_list_check_indices
1560        .iter()
1561        .copied()
1562        .filter(|&idx| {
1563            if idx >= keys_len {
1564                warn!(
1565                    "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
1566                    request.keys.len(),
1567                );
1568                false
1569            } else {
1570                true
1571            }
1572        })
1573        .collect();
1574
1575    let mut results = Vec::with_capacity(request.keys.len());
1576    for (i, key) in request.keys.iter().enumerate() {
1577        let present = storage.exists(key).unwrap_or(false);
1578        let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
1579            Some(paid_list.contains(key).unwrap_or(false))
1580        } else {
1581            None
1582        };
1583        results.push(protocol::KeyVerificationResult {
1584            key: *key,
1585            present,
1586            paid,
1587        });
1588    }
1589
1590    send_replication_response(
1591        source,
1592        p2p_node,
1593        request_id,
1594        ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
1595        rr_message_id,
1596    )
1597    .await;
1598
1599    Ok(())
1600}
1601
1602async fn handle_fetch_request(
1603    source: &PeerId,
1604    request: &protocol::FetchRequest,
1605    storage: &Arc<LmdbStorage>,
1606    p2p_node: &Arc<P2PNode>,
1607    request_id: u64,
1608    rr_message_id: Option<&str>,
1609) -> Result<()> {
1610    let response = match storage.get(&request.key).await {
1611        Ok(Some(data)) => protocol::FetchResponse::Success {
1612            key: request.key,
1613            data,
1614        },
1615        Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
1616        Err(e) => protocol::FetchResponse::Error {
1617            key: request.key,
1618            reason: format!("{e}"),
1619        },
1620    };
1621
1622    send_replication_response(
1623        source,
1624        p2p_node,
1625        request_id,
1626        ReplicationMessageBody::FetchResponse(response),
1627        rr_message_id,
1628    )
1629    .await;
1630
1631    Ok(())
1632}
1633
1634async fn handle_audit_challenge_msg(
1635    source: &PeerId,
1636    challenge: &protocol::AuditChallenge,
1637    storage: &Arc<LmdbStorage>,
1638    p2p_node: &Arc<P2PNode>,
1639    is_bootstrapping: bool,
1640    request_id: u64,
1641    rr_message_id: Option<&str>,
1642) -> Result<()> {
1643    #[allow(clippy::cast_possible_truncation)]
1644    let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
1645    let response = audit::handle_audit_challenge(
1646        challenge,
1647        storage,
1648        p2p_node.peer_id(),
1649        is_bootstrapping,
1650        stored_chunks,
1651    )
1652    .await;
1653
1654    send_replication_response(
1655        source,
1656        p2p_node,
1657        request_id,
1658        ReplicationMessageBody::AuditResponse(response),
1659        rr_message_id,
1660    )
1661    .await;
1662
1663    Ok(())
1664}
1665
1666// ---------------------------------------------------------------------------
1667// Message sending helper
1668// ---------------------------------------------------------------------------
1669
1670/// Send a replication response message as a best-effort reply.
1671///
1672/// Encode and send failures are logged by the checked helper. Most response
1673/// paths do not need to branch on send success, so this wrapper keeps those
1674/// call sites explicit about their best-effort behavior.
1675async fn send_replication_response(
1676    peer: &PeerId,
1677    p2p_node: &Arc<P2PNode>,
1678    request_id: u64,
1679    body: ReplicationMessageBody,
1680    rr_message_id: Option<&str>,
1681) {
1682    let _ =
1683        send_replication_response_checked(peer, p2p_node, request_id, body, rr_message_id).await;
1684}
1685
1686/// Send a replication response message and report whether it was accepted.
1687///
1688/// Returns `true` after the message is encoded and accepted by the P2P send
1689/// path. Returns `false` after logging an encode or send failure. Repair-proof
1690/// recording uses this to avoid trusting hints that were not actually sent.
1691///
1692/// When `rr_message_id` is `Some`, the response is sent via the `/rr/`
1693/// request-response path so saorsa-core can route it back to the caller's
1694/// `send_request` future. Otherwise it is sent as a plain message.
1695async fn send_replication_response_checked(
1696    peer: &PeerId,
1697    p2p_node: &Arc<P2PNode>,
1698    request_id: u64,
1699    body: ReplicationMessageBody,
1700    rr_message_id: Option<&str>,
1701) -> bool {
1702    let msg = ReplicationMessage { request_id, body };
1703    let encoded = match msg.encode() {
1704        Ok(data) => data,
1705        Err(e) => {
1706            warn!("Failed to encode replication response: {e}");
1707            return false;
1708        }
1709    };
1710    let result = if let Some(msg_id) = rr_message_id {
1711        p2p_node
1712            .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
1713            .await
1714    } else {
1715        p2p_node
1716            .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
1717            .await
1718    };
1719    if let Err(e) = result {
1720        debug!("Failed to send replication response to {peer}: {e}");
1721        return false;
1722    }
1723    true
1724}
1725
1726async fn record_sent_replica_hints(
1727    peer: &PeerId,
1728    hints: &[neighbor_sync::SentReplicaHint],
1729    repair_proofs: &Arc<RwLock<RepairProofs>>,
1730    sync_cycle_epoch: &Arc<RwLock<u64>>,
1731) {
1732    if hints.is_empty() {
1733        return;
1734    }
1735
1736    let hinted_at_epoch = *sync_cycle_epoch.read().await;
1737    let mut proofs = repair_proofs.write().await;
1738    for hint in hints {
1739        if proofs.record_replica_hint_sent(*peer, hint.key, &hint.close_peers, hinted_at_epoch) {
1740            debug!(
1741                "Recorded repair hint proof for peer {peer} and key {}",
1742                hex::encode(hint.key)
1743            );
1744        }
1745    }
1746}
1747
1748// ---------------------------------------------------------------------------
1749// Neighbor sync round
1750// ---------------------------------------------------------------------------
1751
1752/// Run one neighbor sync round.
1753#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1754async fn run_neighbor_sync_round(
1755    p2p_node: &Arc<P2PNode>,
1756    storage: &Arc<LmdbStorage>,
1757    paid_list: &Arc<PaidList>,
1758    queues: &Arc<RwLock<ReplicationQueues>>,
1759    config: &ReplicationConfig,
1760    sync_state: &Arc<RwLock<NeighborSyncState>>,
1761    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1762    sync_cycle_epoch: &Arc<RwLock<u64>>,
1763    repair_proofs: &Arc<RwLock<RepairProofs>>,
1764    is_bootstrapping: &Arc<RwLock<bool>>,
1765    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1766) {
1767    let self_id = *p2p_node.peer_id();
1768    let bootstrapping = *is_bootstrapping.read().await;
1769
1770    // Check if cycle is complete; start new one if needed.
1771    // We check under a read lock, then release it before the expensive
1772    // prune pass and DHT snapshot so other tasks are not starved.
1773    let cycle_complete = sync_state.read().await.is_cycle_complete();
1774    if cycle_complete {
1775        // A completed local neighbor-sync cycle advances the epoch component
1776        // of repair-proof maturity. The per-key wall-clock minimum age is
1777        // checked when audits are selected.
1778        {
1779            let mut history = sync_history.write().await;
1780            for record in history.values_mut() {
1781                record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
1782            }
1783        }
1784        let current_sync_epoch = {
1785            let mut epoch = sync_cycle_epoch.write().await;
1786            *epoch = epoch.saturating_add(1);
1787            *epoch
1788        };
1789
1790        // Post-cycle pruning (Section 11) — runs without holding sync_state.
1791        // Remote prune-confirmation audits are storage-proof audits and only
1792        // run after bootstrap has drained.
1793        let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
1794        pruning::run_prune_pass_with_context(pruning::PrunePassContext {
1795            self_id: &self_id,
1796            storage,
1797            paid_list,
1798            p2p_node,
1799            config,
1800            sync_state,
1801            repair_proofs,
1802            current_sync_epoch,
1803            #[cfg(any(test, feature = "test-utils"))]
1804            repair_proof_now: None,
1805            allow_remote_prune_audits,
1806        })
1807        .await;
1808
1809        // Take fresh close-neighbor snapshot (DHT query, no lock held).
1810        let neighbors =
1811            neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
1812                .await;
1813
1814        // Now re-acquire write lock and re-check before swapping cycle.
1815        let mut state = sync_state.write().await;
1816        if state.is_cycle_complete() {
1817            // Preserve cooldown and bootstrap-claim tracking across cycles.
1818            // Claims have a 24h lifecycle vs 10-20 min cycles — dropping them
1819            // would reset the abuse detection timer every cycle.
1820            let old_sync_times = std::mem::take(&mut state.last_sync_times);
1821            let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
1822            let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
1823            let old_prune_cursor = state.prune_cursor;
1824            *state = NeighborSyncState::new_cycle(neighbors);
1825            state.last_sync_times = old_sync_times;
1826            state.bootstrap_claims = old_bootstrap_claims;
1827            state.bootstrap_claim_history = old_bootstrap_claim_history;
1828            state.prune_cursor = old_prune_cursor;
1829        }
1830    }
1831
1832    // Select batch of peers.
1833    let batch = {
1834        let mut state = sync_state.write().await;
1835        neighbor_sync::select_sync_batch(
1836            &mut state,
1837            config.neighbor_sync_peer_count,
1838            config.neighbor_sync_cooldown,
1839        )
1840    };
1841
1842    if batch.is_empty() {
1843        return;
1844    }
1845
1846    debug!("Neighbor sync: syncing with {} peers", batch.len());
1847
1848    let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
1849        &batch,
1850        storage,
1851        paid_list,
1852        p2p_node,
1853        config.close_group_size,
1854        config.paid_list_close_group_size,
1855    )
1856    .await;
1857
1858    // Sync with each peer in the batch.
1859    for peer in &batch {
1860        let hints = hints_by_peer.remove(peer).unwrap_or_default();
1861        let outcome =
1862            neighbor_sync::sync_with_peer_with_hints(peer, p2p_node, config, bootstrapping, hints)
1863                .await;
1864
1865        if let Some(outcome) = outcome {
1866            handle_sync_response(
1867                &self_id,
1868                peer,
1869                &outcome.response,
1870                &outcome.sent_replica_hints,
1871                p2p_node,
1872                config,
1873                bootstrapping,
1874                bootstrap_state,
1875                storage,
1876                paid_list,
1877                queues,
1878                sync_state,
1879                sync_history,
1880                sync_cycle_epoch,
1881                repair_proofs,
1882            )
1883            .await;
1884        } else {
1885            // Sync failed -- remove peer and try to fill slot.
1886            let replacement = {
1887                let mut state = sync_state.write().await;
1888                neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
1889            };
1890
1891            // Attempt sync with the replacement peer (if one was found).
1892            if let Some(replacement_peer) = replacement {
1893                let mut replacement_hints = neighbor_sync::build_sync_hints_for_peers(
1894                    std::slice::from_ref(&replacement_peer),
1895                    storage,
1896                    paid_list,
1897                    p2p_node,
1898                    config.close_group_size,
1899                    config.paid_list_close_group_size,
1900                )
1901                .await;
1902                let hints = replacement_hints
1903                    .remove(&replacement_peer)
1904                    .unwrap_or_default();
1905                let replacement_outcome = neighbor_sync::sync_with_peer_with_hints(
1906                    &replacement_peer,
1907                    p2p_node,
1908                    config,
1909                    bootstrapping,
1910                    hints,
1911                )
1912                .await;
1913
1914                if let Some(outcome) = replacement_outcome {
1915                    handle_sync_response(
1916                        &self_id,
1917                        &replacement_peer,
1918                        &outcome.response,
1919                        &outcome.sent_replica_hints,
1920                        p2p_node,
1921                        config,
1922                        bootstrapping,
1923                        bootstrap_state,
1924                        storage,
1925                        paid_list,
1926                        queues,
1927                        sync_state,
1928                        sync_history,
1929                        sync_cycle_epoch,
1930                        repair_proofs,
1931                    )
1932                    .await;
1933                }
1934            }
1935        }
1936    }
1937}
1938
1939/// Process a successful neighbor sync response: record the sync, check for
1940/// bootstrap claim abuse, and admit inbound hints.
1941#[allow(clippy::too_many_arguments)]
1942async fn handle_sync_response(
1943    self_id: &PeerId,
1944    peer: &PeerId,
1945    resp: &NeighborSyncResponse,
1946    sent_replica_hints: &[neighbor_sync::SentReplicaHint],
1947    p2p_node: &Arc<P2PNode>,
1948    config: &ReplicationConfig,
1949    bootstrapping: bool,
1950    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1951    storage: &Arc<LmdbStorage>,
1952    paid_list: &Arc<PaidList>,
1953    queues: &Arc<RwLock<ReplicationQueues>>,
1954    sync_state: &Arc<RwLock<NeighborSyncState>>,
1955    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1956    sync_cycle_epoch: &Arc<RwLock<u64>>,
1957    repair_proofs: &Arc<RwLock<RepairProofs>>,
1958) {
1959    // Record successful sync.
1960    {
1961        let mut state = sync_state.write().await;
1962        neighbor_sync::record_successful_sync(&mut state, peer);
1963    }
1964    {
1965        let mut history = sync_history.write().await;
1966        let record = history.entry(*peer).or_insert(PeerSyncRecord {
1967            last_sync: None,
1968            cycles_since_sync: 0,
1969        });
1970        record.last_sync = Some(Instant::now());
1971        record.cycles_since_sync = 0;
1972    }
1973
1974    // Process inbound hints from response (skip if peer is bootstrapping).
1975    if resp.bootstrapping {
1976        // Gap 6: BootstrapClaimAbuse grace period enforcement.
1977        // Separate state mutation from network I/O to avoid holding the
1978        // write lock across report_trust_event.
1979        let should_report = {
1980            let now = Instant::now();
1981            let mut state = sync_state.write().await;
1982            match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
1983                BootstrapClaimObservation::WithinGrace { .. } => false,
1984                BootstrapClaimObservation::PastGrace { first_seen } => {
1985                    warn!(
1986                        "Peer {peer} has been claiming bootstrap for {:?}, \
1987                         exceeding grace period of {:?} — reporting abuse",
1988                        now.duration_since(first_seen),
1989                        config.bootstrap_claim_grace_period,
1990                    );
1991                    true
1992                }
1993                BootstrapClaimObservation::Repeated { first_seen } => {
1994                    warn!(
1995                        "Peer {peer} repeated bootstrap claim after previously stopping; \
1996                         first claim was {:?} ago — reporting abuse",
1997                        now.duration_since(first_seen),
1998                    );
1999                    true
2000                }
2001            }
2002        };
2003        if should_report {
2004            p2p_node
2005                .report_trust_event(
2006                    peer,
2007                    TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2008                )
2009                .await;
2010        }
2011    } else {
2012        // Peer is not claiming bootstrap; clear active claim while retaining
2013        // history so the peer cannot start a second grace window later.
2014        {
2015            let mut state = sync_state.write().await;
2016            state.clear_active_bootstrap_claim(peer);
2017        }
2018        record_sent_replica_hints(peer, sent_replica_hints, repair_proofs, sync_cycle_epoch).await;
2019        let outcome = admit_and_queue_hints(
2020            self_id,
2021            peer,
2022            &resp.replica_hints,
2023            &resp.paid_hints,
2024            p2p_node,
2025            config,
2026            storage,
2027            paid_list,
2028            queues,
2029        )
2030        .await;
2031
2032        // Track discovered keys for bootstrap drain detection so that hints
2033        // admitted via regular neighbor sync are not missed. Capacity-
2034        // rejected hints keep this source on the "not yet drained" list
2035        // until its next sync replays them; a clean cycle clears it.
2036        if bootstrapping {
2037            if !outcome.discovered.is_empty() {
2038                bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
2039            }
2040            if outcome.capacity_rejected_count > 0 {
2041                bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
2042            } else {
2043                bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
2044            }
2045        }
2046    }
2047}
2048
2049/// Admit hints and queue them for verification, returning newly-discovered keys.
2050///
2051/// Shared by neighbor-sync request handling, response handling, and bootstrap
2052/// sync so that admission + queueing logic lives in one place.
2053#[allow(clippy::too_many_arguments)]
2054/// Outcome of [`admit_and_queue_hints`].
2055///
2056/// `capacity_rejected_count` is non-zero when one or more legitimately
2057/// admissible hints were dropped because `pending_verify`'s global or
2058/// per-source bound was hit. Callers that care about completeness
2059/// (bootstrap drain accounting) MUST NOT treat their work as complete while
2060/// this is > 0 — the source will need to re-hint after capacity frees up.
2061struct AdmissionOutcome {
2062    discovered: HashSet<XorName>,
2063    capacity_rejected_count: usize,
2064}
2065
2066#[allow(clippy::too_many_arguments)]
2067async fn admit_and_queue_hints(
2068    self_id: &PeerId,
2069    source_peer: &PeerId,
2070    replica_hints: &[XorName],
2071    paid_hints: &[XorName],
2072    p2p_node: &Arc<P2PNode>,
2073    config: &ReplicationConfig,
2074    storage: &Arc<LmdbStorage>,
2075    paid_list: &Arc<PaidList>,
2076    queues: &Arc<RwLock<ReplicationQueues>>,
2077) -> AdmissionOutcome {
2078    let pending_keys: HashSet<XorName> = {
2079        let q = queues.read().await;
2080        q.pending_keys().into_iter().collect()
2081    };
2082
2083    let admitted = admission::admit_hints(
2084        self_id,
2085        replica_hints,
2086        paid_hints,
2087        p2p_node,
2088        config,
2089        storage,
2090        paid_list,
2091        &pending_keys,
2092    )
2093    .await;
2094
2095    let mut discovered = HashSet::new();
2096    let mut capacity_rejected_count: usize = 0;
2097    let mut q = queues.write().await;
2098    let now = Instant::now();
2099
2100    for key in admitted.replica_keys {
2101        if !storage.exists(&key).unwrap_or(false) {
2102            let result = q.add_pending_verify(
2103                key,
2104                VerificationEntry {
2105                    state: VerificationState::PendingVerify,
2106                    pipeline: HintPipeline::Replica,
2107                    verified_sources: Vec::new(),
2108                    tried_sources: HashSet::new(),
2109                    created_at: now,
2110                    hint_sender: *source_peer,
2111                },
2112            );
2113            match result {
2114                crate::replication::scheduling::AdmissionResult::Admitted => {
2115                    discovered.insert(key);
2116                }
2117                crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
2118                crate::replication::scheduling::AdmissionResult::CapacityRejected => {
2119                    capacity_rejected_count += 1;
2120                }
2121            }
2122        }
2123    }
2124
2125    for key in admitted.paid_only_keys {
2126        let result = q.add_pending_verify(
2127            key,
2128            VerificationEntry {
2129                state: VerificationState::PendingVerify,
2130                pipeline: HintPipeline::PaidOnly,
2131                verified_sources: Vec::new(),
2132                tried_sources: HashSet::new(),
2133                created_at: now,
2134                hint_sender: *source_peer,
2135            },
2136        );
2137        match result {
2138            crate::replication::scheduling::AdmissionResult::Admitted => {
2139                discovered.insert(key);
2140            }
2141            crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
2142            crate::replication::scheduling::AdmissionResult::CapacityRejected => {
2143                capacity_rejected_count += 1;
2144            }
2145        }
2146    }
2147
2148    if capacity_rejected_count > 0 {
2149        debug!(
2150            "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
2151             rejected at queue capacity; source will need to re-hint after pending_verify drains"
2152        );
2153    }
2154
2155    AdmissionOutcome {
2156        discovered,
2157        capacity_rejected_count,
2158    }
2159}
2160
2161// ---------------------------------------------------------------------------
2162// Verification cycle
2163// ---------------------------------------------------------------------------
2164
2165/// Run one verification cycle: process pending keys through quorum checks.
2166#[allow(clippy::too_many_lines)]
2167async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
2168    let cycle_started = Instant::now();
2169    let VerificationCycleContext {
2170        p2p_node,
2171        paid_list,
2172        storage,
2173        queues,
2174        config,
2175        bootstrap_state,
2176        is_bootstrapping,
2177        bootstrap_complete_notify,
2178    } = ctx;
2179
2180    // Evict stale entries that have been pending too long (e.g. unreachable
2181    // verification targets during a network partition).
2182    {
2183        let mut q = queues.write().await;
2184        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
2185    }
2186
2187    let pending_keys = {
2188        let q = queues.read().await;
2189        q.pending_keys()
2190    };
2191
2192    if pending_keys.is_empty() {
2193        return;
2194    }
2195    let initial_pending_count = pending_keys.len();
2196
2197    let self_id = *p2p_node.peer_id();
2198
2199    // Step 1: Check local PaidForList for fast-path authorization (Section 9,
2200    // step 4).
2201    let mut local_paid_presence_probe_keys = Vec::new();
2202    let mut local_paid_paid_only_keys = Vec::new();
2203    let mut keys_needing_network = Vec::new();
2204    let mut terminal_keys: Vec<XorName> = Vec::new();
2205    {
2206        let mut q = queues.write().await;
2207        for key in &pending_keys {
2208            if paid_list.contains(key).unwrap_or(false) {
2209                if let Some(pipeline) =
2210                    q.set_pending_state(key, VerificationState::PaidListVerified)
2211                {
2212                    match pipeline {
2213                        HintPipeline::PaidOnly => {
2214                            // Paid-only + local paid state needs one more
2215                            // storage-admission check outside this lock: if we
2216                            // are also in the close group plus storage margin,
2217                            // the hint can repair a missing replica.
2218                            local_paid_paid_only_keys.push(*key);
2219                        }
2220                        HintPipeline::Replica => {
2221                            // Local paid-list membership authorizes the key.
2222                            // We still need a presence probe to discover fetch
2223                            // sources, but we must not require remote paid
2224                            // majority or presence quorum.
2225                            local_paid_presence_probe_keys.push(*key);
2226                        }
2227                    }
2228                }
2229            } else {
2230                keys_needing_network.push(*key);
2231            }
2232        }
2233    }
2234
2235    if !local_paid_paid_only_keys.is_empty() {
2236        let mut terminal_paid_only = Vec::new();
2237        for key in local_paid_paid_only_keys {
2238            if storage.exists(&key).unwrap_or(false) {
2239                terminal_paid_only.push(key);
2240            } else if admission::is_responsible(
2241                &self_id,
2242                &key,
2243                p2p_node,
2244                storage_admission_width(config.close_group_size),
2245            )
2246            .await
2247            {
2248                local_paid_presence_probe_keys.push(key);
2249            } else {
2250                terminal_paid_only.push(key);
2251            }
2252        }
2253
2254        if !terminal_paid_only.is_empty() {
2255            let mut q = queues.write().await;
2256            for key in terminal_paid_only {
2257                q.remove_pending(&key);
2258                terminal_keys.push(key);
2259            }
2260        }
2261    }
2262
2263    let local_paid_probe_count = local_paid_presence_probe_keys.len();
2264    let keys_needing_network_count = keys_needing_network.len();
2265
2266    // Step 1b: Local paid-list hit for fetch-eligible keys. Per Section 9
2267    // step 4, authorization succeeds immediately; run a presence-only probe
2268    // to find any holder we can fetch from.
2269    if !local_paid_presence_probe_keys.is_empty() {
2270        let targets = quorum::compute_presence_targets(
2271            &local_paid_presence_probe_keys,
2272            p2p_node,
2273            config,
2274            &self_id,
2275        )
2276        .await;
2277        let evidence = quorum::run_verification_round(
2278            &local_paid_presence_probe_keys,
2279            &targets,
2280            p2p_node,
2281            config,
2282        )
2283        .await;
2284
2285        let mut q = queues.write().await;
2286        for key in local_paid_presence_probe_keys {
2287            if storage.exists(&key).unwrap_or(false) {
2288                q.remove_pending(&key);
2289                terminal_keys.push(key);
2290                continue;
2291            }
2292            let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
2293                quorum::present_sources_for_key(&key, ev, &targets)
2294            });
2295            if sources.is_empty() {
2296                // Terminal failure: remove pending and report. No fetch path.
2297                q.remove_pending(&key);
2298                warn!(
2299                    "Locally paid key {} has no responding holders (possible data loss)",
2300                    hex::encode(key)
2301                );
2302                terminal_keys.push(key);
2303            } else {
2304                let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
2305                // Atomic remove+enqueue: if fetch_queue is at capacity, the
2306                // pending entry is preserved and retried next cycle (no
2307                // silent drop of verified replica-repair work).
2308                let _ = q.promote_pending_to_fetch(key, distance, sources);
2309            }
2310        }
2311    }
2312
2313    // Steps 2-5: Network verification (skipped if all keys resolved locally).
2314    if !keys_needing_network.is_empty() {
2315        // Step 2: Compute targets and run network verification round.
2316        let targets =
2317            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
2318                .await;
2319
2320        let evidence =
2321            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;
2322
2323        // Step 3: Evaluate results — collect outcomes without holding the write
2324        // lock across paid-list I/O.
2325        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
2326        {
2327            let q = queues.read().await;
2328            for key in &keys_needing_network {
2329                let Some(ev) = evidence.get(key) else {
2330                    continue;
2331                };
2332                let Some(entry) = q.get_pending(key) else {
2333                    continue;
2334                };
2335                let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config);
2336                evaluated.push((*key, outcome, entry.pipeline));
2337            }
2338        } // read lock released
2339
2340        // Step 4: Insert verified keys into PaidForList (no lock held).
2341        let mut paid_insert_keys: Vec<XorName> = Vec::new();
2342        for (key, outcome, _) in &evaluated {
2343            if matches!(
2344                outcome,
2345                KeyVerificationOutcome::QuorumVerified { .. }
2346                    | KeyVerificationOutcome::PaidListVerified { .. }
2347            ) {
2348                paid_insert_keys.push(*key);
2349            }
2350        }
2351        for key in &paid_insert_keys {
2352            if let Err(e) = paid_list.insert(key).await {
2353                warn!("Failed to add verified key to PaidForList: {e}");
2354            }
2355        }
2356
2357        // Paid-only hints normally update PaidForList only. If this node is
2358        // also within the storage-admission group for the key, a verified
2359        // paid-only hint can safely repair a missing replica using sources
2360        // from the same verification round.
2361        let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
2362        for (key, outcome, pipeline) in &evaluated {
2363            if *pipeline == HintPipeline::PaidOnly
2364                && matches!(
2365                    outcome,
2366                    KeyVerificationOutcome::QuorumVerified { .. }
2367                        | KeyVerificationOutcome::PaidListVerified { .. }
2368                )
2369                && !storage.exists(key).unwrap_or(false)
2370                && admission::is_responsible(
2371                    &self_id,
2372                    key,
2373                    p2p_node,
2374                    storage_admission_width(config.close_group_size),
2375                )
2376                .await
2377            {
2378                paid_only_fetch_keys.insert(*key);
2379            }
2380        }
2381
2382        // Step 5: Update queues with the evaluated outcomes.
2383        let mut q = queues.write().await;
2384        for (key, outcome, pipeline) in evaluated {
2385            match outcome {
2386                KeyVerificationOutcome::QuorumVerified { sources }
2387                | KeyVerificationOutcome::PaidListVerified { sources } => {
2388                    let fetch_eligible =
2389                        pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
2390                    if fetch_eligible && !sources.is_empty() {
2391                        let distance =
2392                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
2393                        // Atomic remove+enqueue: on fetch_queue capacity miss
2394                        // the pending entry is preserved so this verified key
2395                        // is retried on the next cycle (no silent drop).
2396                        let _ = q.promote_pending_to_fetch(key, distance, sources);
2397                        // Not terminal — either moved to fetch queue, or
2398                        // retained as pending until queue drains.
2399                    } else if fetch_eligible && sources.is_empty() {
2400                        warn!(
2401                            "Verified storage-admitted key {} has no holders (possible data loss)",
2402                            hex::encode(key)
2403                        );
2404                        q.remove_pending(&key);
2405                        terminal_keys.push(key);
2406                    } else {
2407                        q.remove_pending(&key);
2408                        terminal_keys.push(key);
2409                    }
2410                }
2411                KeyVerificationOutcome::QuorumFailed
2412                | KeyVerificationOutcome::QuorumInconclusive => {
2413                    q.remove_pending(&key);
2414                    terminal_keys.push(key);
2415                }
2416            }
2417        }
2418    }
2419
2420    // Step 6: Remove terminal keys from bootstrap pending set and re-check
2421    // the drain condition.
2422    update_bootstrap_after_verification(
2423        &terminal_keys,
2424        bootstrap_state,
2425        queues,
2426        is_bootstrapping,
2427        bootstrap_complete_notify,
2428    )
2429    .await;
2430
2431    let (pending_after, fetch_after, in_flight_after) = {
2432        let q = queues.read().await;
2433        (
2434            q.pending_count(),
2435            q.fetch_queue_count(),
2436            q.in_flight_count(),
2437        )
2438    };
2439    let terminal_key_count = terminal_keys.len();
2440    let elapsed_ms = cycle_started.elapsed().as_millis();
2441
2442    if elapsed_ms >= VERIFICATION_CYCLE_SLOW_LOG_MS {
2443        info!(
2444            target: "ant_node::replication::verification",
2445            "Slow replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
2446        );
2447    } else {
2448        debug!(
2449            target: "ant_node::replication::verification",
2450            "Replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
2451        );
2452    }
2453}
2454
2455/// Post-verification bootstrap bookkeeping: remove terminal keys from the
2456/// bootstrap pending set and transition out of bootstrapping when drained.
2457async fn update_bootstrap_after_verification(
2458    terminal_keys: &[XorName],
2459    bootstrap_state: &Arc<RwLock<BootstrapState>>,
2460    queues: &Arc<RwLock<ReplicationQueues>>,
2461    is_bootstrapping: &Arc<RwLock<bool>>,
2462    bootstrap_complete_notify: &Arc<Notify>,
2463) {
2464    if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
2465        return;
2466    }
2467    {
2468        let mut bs = bootstrap_state.write().await;
2469        for key in terminal_keys {
2470            bs.remove_key(key);
2471        }
2472    }
2473    let q = queues.read().await;
2474    if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
2475        complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
2476    }
2477}
2478
2479/// Set `is_bootstrapping` to `false` and wake all waiters.
2480async fn complete_bootstrap(
2481    is_bootstrapping: &Arc<RwLock<bool>>,
2482    bootstrap_complete_notify: &Arc<Notify>,
2483) {
2484    *is_bootstrapping.write().await = false;
2485    bootstrap_complete_notify.notify_waiters();
2486    info!("Replication bootstrap complete");
2487}
2488
2489// ---------------------------------------------------------------------------
2490// Fetch types and single-fetch executor
2491// ---------------------------------------------------------------------------
2492
/// Result classification for a single fetch attempt.
///
/// Produced by [`execute_single_fetch`]; the variant docs below list the
/// conditions under which that function reports each one.
enum FetchResult {
    /// Data fetched, integrity-checked, and stored successfully.
    Stored,
    /// Content-address integrity check failed — do not retry.
    ///
    /// Also reported when the response key differs from the requested key or
    /// when the payload is larger than `MAX_CHUNK_SIZE`.
    IntegrityFailed,
    /// Source failed (network error or non-success response) — retryable.
    ///
    /// Also reported when the request cannot be encoded, the response cannot
    /// be decoded or carries an unexpected body, or the local store write
    /// fails.
    SourceFailed,
}
2502
/// Outcome produced by [`execute_single_fetch`] and consumed by the fetch
/// worker loop to update queue state.
struct FetchOutcome {
    /// The key that was requested. `execute_single_fetch` always fills this
    /// with the requested key, even when the peer echoed a different one.
    key: XorName,
    /// How the fetch attempt ended.
    result: FetchResult,
}
2509
2510#[allow(clippy::too_many_lines)]
2511/// Execute a single fetch request against `source` for `key`.
2512///
2513/// Handles encoding, network I/O, integrity checking, storage, and trust
2514/// event reporting.  Returns a [`FetchOutcome`] so the caller can update
2515/// queue state without holding any locks during the network round-trip.
2516async fn execute_single_fetch(
2517    p2p_node: Arc<P2PNode>,
2518    storage: Arc<LmdbStorage>,
2519    config: Arc<ReplicationConfig>,
2520    key: XorName,
2521    source: PeerId,
2522) -> FetchOutcome {
2523    let request = protocol::FetchRequest { key };
2524    let msg = ReplicationMessage {
2525        request_id: rand::thread_rng().gen::<u64>(),
2526        body: ReplicationMessageBody::FetchRequest(request),
2527    };
2528
2529    let encoded = match msg.encode() {
2530        Ok(data) => data,
2531        Err(e) => {
2532            warn!("Failed to encode fetch request: {e}");
2533            return FetchOutcome {
2534                key,
2535                result: FetchResult::SourceFailed,
2536            };
2537        }
2538    };
2539
2540    let result = p2p_node
2541        .send_request(
2542            &source,
2543            REPLICATION_PROTOCOL_ID,
2544            encoded,
2545            config.fetch_request_timeout,
2546        )
2547        .await;
2548
2549    match result {
2550        Ok(response) => {
2551            let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
2552                p2p_node
2553                    .report_trust_event(
2554                        &source,
2555                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2556                    )
2557                    .await;
2558                return FetchOutcome {
2559                    key,
2560                    result: FetchResult::SourceFailed,
2561                };
2562            };
2563
2564            match resp_msg.body {
2565                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
2566                    key: resp_key,
2567                    data,
2568                }) => {
2569                    // Validate the response key matches the requested key.
2570                    // A malicious peer could serve valid data for a different
2571                    // key, passing integrity checks while the requested key
2572                    // is falsely marked as fetched.
2573                    if resp_key != key {
2574                        warn!(
2575                            "Fetch response key mismatch: requested {}, got {}",
2576                            hex::encode(key),
2577                            hex::encode(resp_key)
2578                        );
2579                        p2p_node
2580                            .report_trust_event(
2581                                &source,
2582                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2583                            )
2584                            .await;
2585                        return FetchOutcome {
2586                            key,
2587                            result: FetchResult::IntegrityFailed,
2588                        };
2589                    }
2590
2591                    // Enforce chunk size invariant on fetched data.
2592                    // Checked before the content-address hash to avoid
2593                    // hashing up to 10 MiB of oversized junk data.
2594                    if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
2595                        warn!(
2596                            "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
2597                            hex::encode(resp_key),
2598                            data.len(),
2599                            crate::ant_protocol::MAX_CHUNK_SIZE,
2600                        );
2601                        p2p_node
2602                            .report_trust_event(
2603                                &source,
2604                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2605                            )
2606                            .await;
2607                        return FetchOutcome {
2608                            key,
2609                            result: FetchResult::IntegrityFailed,
2610                        };
2611                    }
2612
2613                    // Content-address integrity check.
2614                    let computed = crate::client::compute_address(&data);
2615                    if computed != resp_key {
2616                        warn!(
2617                            "Fetched record integrity check failed: expected {}, got {}",
2618                            hex::encode(resp_key),
2619                            hex::encode(computed)
2620                        );
2621                        p2p_node
2622                            .report_trust_event(
2623                                &source,
2624                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2625                            )
2626                            .await;
2627                        return FetchOutcome {
2628                            key,
2629                            result: FetchResult::IntegrityFailed,
2630                        };
2631                    }
2632
2633                    if let Err(e) = storage.put(&resp_key, &data).await {
2634                        warn!(
2635                            "Failed to store fetched record {}: {e}",
2636                            hex::encode(resp_key)
2637                        );
2638                        return FetchOutcome {
2639                            key,
2640                            result: FetchResult::SourceFailed,
2641                        };
2642                    }
2643
2644                    FetchOutcome {
2645                        key,
2646                        result: FetchResult::Stored,
2647                    }
2648                }
2649                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
2650                    ..
2651                }) => {
2652                    // This peer was selected as a fetch source because it
2653                    // recently answered `Present` during verification. A
2654                    // subsequent NotFound is evidence of a stale/false claim
2655                    // or chunk wiping, so penalize lightly and try another
2656                    // verified source.
2657                    warn!(
2658                        "Fetch: verified source {source} returned NotFound for {}",
2659                        hex::encode(key)
2660                    );
2661                    p2p_node
2662                        .report_trust_event(
2663                            &source,
2664                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2665                        )
2666                        .await;
2667                    FetchOutcome {
2668                        key,
2669                        result: FetchResult::SourceFailed,
2670                    }
2671                }
2672                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
2673                    reason,
2674                    ..
2675                }) => {
2676                    warn!(
2677                        "Fetch: peer {source} returned error for {}: {reason}",
2678                        hex::encode(key)
2679                    );
2680                    p2p_node
2681                        .report_trust_event(
2682                            &source,
2683                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2684                        )
2685                        .await;
2686                    FetchOutcome {
2687                        key,
2688                        result: FetchResult::SourceFailed,
2689                    }
2690                }
2691                _ => {
2692                    // Unexpected message type — treat as malformed.
2693                    p2p_node
2694                        .report_trust_event(
2695                            &source,
2696                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2697                        )
2698                        .await;
2699                    FetchOutcome {
2700                        key,
2701                        result: FetchResult::SourceFailed,
2702                    }
2703                }
2704            }
2705        }
2706        Err(e) => {
2707            debug!("Fetch request to {source} failed: {e}");
2708            // No ApplicationFailure here — P2PNode::send_request() already
2709            // reports ConnectionTimeout / ConnectionFailed to the TrustEngine.
2710            FetchOutcome {
2711                key,
2712                result: FetchResult::SourceFailed,
2713            }
2714        }
2715    }
2716}
2717
2718// ---------------------------------------------------------------------------
2719// Audit result handler
2720// ---------------------------------------------------------------------------
2721
2722/// Format the first confirmed-failed key as a 16-hex-char label.
2723///
2724/// Pairs with `challenged_peer` to form a stable cross-host correlation
2725/// handle in the audit-failure log line, e.g.
2726///
2727/// ```text
2728/// Audit failure for <peer>: …, `first_failed_key=0x18878f1d2d9e0612`
2729/// ```
2730///
2731/// Falls back to `"0x"` when the list is empty so the log line never
2732/// contains a misleading default.
2733fn first_failed_key_label(confirmed_failed_keys: &[XorName]) -> String {
2734    confirmed_failed_keys.first().map_or_else(
2735        || "0x".to_string(),
2736        |k| format!("0x{}", hex::encode(&k[..8])),
2737    )
2738}
2739
2740/// Handle audit result: log findings and emit trust events.
2741async fn handle_audit_result(
2742    result: &AuditTickResult,
2743    p2p_node: &Arc<P2PNode>,
2744    sync_state: &Arc<RwLock<NeighborSyncState>>,
2745    config: &ReplicationConfig,
2746) {
2747    match result {
2748        AuditTickResult::Passed {
2749            challenged_peer,
2750            keys_checked,
2751        } => {
2752            debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
2753            // Peer responded normally — clear the active bootstrap claim while
2754            // retaining history so a later claim is treated as repeated abuse.
2755            {
2756                let mut state = sync_state.write().await;
2757                state.clear_active_bootstrap_claim(challenged_peer);
2758            }
2759            p2p_node
2760                .report_trust_event(
2761                    challenged_peer,
2762                    TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
2763                )
2764                .await;
2765        }
2766        AuditTickResult::Failed { evidence } => {
2767            if let FailureEvidence::AuditFailure {
2768                challenged_peer,
2769                confirmed_failed_keys,
2770                summary,
2771                reason,
2772                ..
2773            } = evidence
2774            {
2775                let first_failed_key = first_failed_key_label(confirmed_failed_keys);
2776                error!(
2777                    "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}, first_failed_key={first_failed_key}",
2778                    confirmed_failed_keys.len(),
2779                    summary.challenged_keys,
2780                    summary.absent_keys,
2781                    summary.digest_mismatch_keys,
2782                );
2783                if audit_failure_clears_bootstrap_claim(reason) {
2784                    // Peer returned a non-bootstrap response — clear the active
2785                    // claim while retaining claim history.
2786                    let mut state = sync_state.write().await;
2787                    state.clear_active_bootstrap_claim(challenged_peer);
2788                } else {
2789                    debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim");
2790                }
2791                p2p_node
2792                    .report_trust_event(
2793                        challenged_peer,
2794                        TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
2795                    )
2796                    .await;
2797            }
2798        }
2799        AuditTickResult::BootstrapClaim { peer } => {
2800            // Gap 6: BootstrapClaimAbuse grace period in audit path.
2801            // Separate state mutation from network I/O to avoid holding the
2802            // write lock across report_trust_event.
2803            let should_report = {
2804                let now = Instant::now();
2805                let mut state = sync_state.write().await;
2806                match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
2807                {
2808                    BootstrapClaimObservation::WithinGrace { .. } => {
2809                        debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
2810                        false
2811                    }
2812                    BootstrapClaimObservation::PastGrace { first_seen } => {
2813                        warn!(
2814                            "Audit: peer {peer} claiming bootstrap past grace period \
2815                             ({:?} > {:?}), reporting abuse",
2816                            now.duration_since(first_seen),
2817                            config.bootstrap_claim_grace_period,
2818                        );
2819                        true
2820                    }
2821                    BootstrapClaimObservation::Repeated { first_seen } => {
2822                        warn!(
2823                            "Audit: peer {peer} repeated bootstrap claim after previously \
2824                             stopping; first claim was {:?} ago, reporting abuse",
2825                            now.duration_since(first_seen),
2826                        );
2827                        true
2828                    }
2829                }
2830            };
2831            if should_report {
2832                p2p_node
2833                    .report_trust_event(
2834                        peer,
2835                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2836                    )
2837                    .await;
2838            }
2839        }
2840        AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
2841    }
2842}
2843
2844fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
2845    !matches!(reason, AuditFailureReason::Timeout)
2846}
2847
2848// `admit_bootstrap_hints` was consolidated into `admit_and_queue_hints`.
2849
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::{
        audit_failure_clears_bootstrap_claim, first_failed_key_label, fresh_offer_payment_context,
        paid_notify_payment_context,
    };
    use crate::payment::VerificationContext;
    use crate::replication::types::AuditFailureReason;

    /// Fresh offers are verified under the client-put payment context.
    #[test]
    fn fresh_offer_runs_client_put_payment_checks() {
        let context = fresh_offer_payment_context();
        assert_eq!(context, VerificationContext::ClientPut);
    }

    /// Paid notifications are verified under the paid-list admission context.
    #[test]
    fn paid_notify_uses_paid_list_admission_payment_checks() {
        let context = paid_notify_payment_context();
        assert_eq!(context, VerificationContext::PaidListAdmission);
    }

    /// A timeout must leave the active bootstrap claim in place.
    #[test]
    fn audit_timeout_preserves_active_bootstrap_claim() {
        let clears = audit_failure_clears_bootstrap_claim(&AuditFailureReason::Timeout);
        assert!(!clears);
    }

    /// Every non-timeout failure reason listed here clears the active claim.
    #[test]
    fn decoded_audit_failures_clear_active_bootstrap_claim() {
        let decoded_reasons = [
            AuditFailureReason::MalformedResponse,
            AuditFailureReason::DigestMismatch,
            AuditFailureReason::KeyAbsent,
            AuditFailureReason::Rejected,
        ];
        for reason in decoded_reasons {
            let clears = audit_failure_clears_bootstrap_claim(&reason);
            assert!(
                clears,
                "decoded non-bootstrap failure {reason:?} should clear active claim"
            );
        }
    }

    /// Only the first 8 bytes of the key appear in the label.
    #[test]
    fn first_failed_key_label_truncates_to_16_hex_chars() {
        // Start with every byte at 0xAA so that any byte from positions
        // 8..32 leaking into the label would be visible, then set the
        // 8-byte prefix the label is expected to encode.
        let mut key = [0xAAu8; 32];
        key[..8].fill(0);
        key[0] = 0x18;
        key[7] = 0xff;

        let label = first_failed_key_label(&[key]);

        // Prefix bytes only; the 0xAA tail is dropped.
        assert_eq!(label, "0x18000000000000ff");
        assert_eq!(label.len(), "0x".len() + 16);
    }

    /// An empty slice must still yield a well-formed label.
    #[test]
    fn first_failed_key_label_falls_back_when_empty() {
        let label = first_failed_key_label(&[]);
        assert_eq!(label, "0x");
    }

    /// Keys after the first do not influence the label.
    #[test]
    fn first_failed_key_label_uses_first_key_only() {
        let keys = [[0x11u8; 32], [0x22u8; 32]];
        let expected = format!("0x{}", hex::encode(&keys[0][..8]));
        assert_eq!(first_failed_key_label(&keys), expected);
    }
}