Skip to main content

ant_node/replication/
mod.rs

1//! Replication subsystem for the Autonomi network.
2//!
3//! Implements Kademlia-style replication with:
4//! - Fresh replication with `PoP` verification
5//! - Neighbor sync with round-robin cycle management
6//! - Batched quorum verification
7//! - Storage audit protocol (anti-outsourcing)
8//! - `PaidForList` persistence and convergence
9//! - Responsibility pruning with hysteresis
10
11// The replication engine intentionally holds `RwLock` read guards across await
12// boundaries (e.g. reading sync_history while calling audit_tick). Clippy's
13// nursery lint `significant_drop_tightening` flags these, but the guards must
14// remain live for the duration of the call.
15#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod config;
21pub mod fresh;
22pub mod neighbor_sync;
23pub mod paid_list;
24pub mod protocol;
25pub mod pruning;
26pub mod quorum;
27pub mod scheduling;
28pub mod types;
29
30use std::collections::{HashMap, HashSet};
31use std::path::Path;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use std::pin::Pin;
36
37use crate::logging::{debug, error, info, warn};
38use futures::stream::FuturesUnordered;
39use futures::{Future, StreamExt};
40use rand::Rng;
41use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
42use tokio::task::JoinHandle;
43use tokio_util::sync::CancellationToken;
44
45use crate::ant_protocol::XorName;
46use crate::error::{Error, Result};
47use crate::payment::PaymentVerifier;
48use crate::replication::audit::AuditTickResult;
49use crate::replication::config::{
50    max_parallel_fetch, ReplicationConfig, MAX_CONCURRENT_REPLICATION_SENDS,
51    REPLICATION_PROTOCOL_ID,
52};
53use crate::replication::paid_list::PaidList;
54use crate::replication::protocol::{
55    FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
56    VerificationResponse,
57};
58use crate::replication::quorum::KeyVerificationOutcome;
59use crate::replication::scheduling::ReplicationQueues;
60use crate::replication::types::{
61    AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
62    NeighborSyncState, PeerSyncRecord, RepairProofs, VerificationEntry, VerificationState,
63};
64use crate::storage::LmdbStorage;
65use saorsa_core::identity::PeerId;
66use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
67
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

/// Topic prefix used by saorsa-core's request-response mechanism.
///
/// Replication traffic sent over request-response arrives on the topic
/// `RR_PREFIX` followed by `REPLICATION_PROTOCOL_ID`. Its payload is wrapped in
/// a request envelope, so the message handler checks for this prefix to decide
/// whether the envelope must be unwrapped first.
const RR_PREFIX: &str = "/rr/";

/// Boxed future type for in-flight fetch tasks.
///
/// Resolves to the fetched key plus `Some(outcome)` when the spawned fetch task
/// finished, or `None` when that task panicked. The key is returned in both
/// cases so the fetch worker can always release the key's in-flight slot.
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
77
/// Shared dependencies for one verification worker cycle.
///
/// Bundles borrowed handles to engine state into a single value. Every field
/// borrows for `'a`, so a context cannot outlive the handles it was built
/// from. (Presumably constructed by the verification worker started from
/// `start_verification_worker` — confirm at the construction site.)
struct VerificationCycleContext<'a> {
    /// P2P networking node.
    p2p_node: &'a Arc<P2PNode>,
    /// Persistent paid-for list.
    paid_list: &'a Arc<PaidList>,
    /// Local chunk storage.
    storage: &'a Arc<LmdbStorage>,
    /// Replication pipeline queues.
    queues: &'a Arc<RwLock<ReplicationQueues>>,
    /// Replication configuration.
    config: &'a ReplicationConfig,
    /// Bootstrap state tracking.
    bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
    /// Whether the node is still in the replication bootstrap phase.
    is_bootstrapping: &'a Arc<RwLock<bool>>,
    /// Notifier for the bootstrapping -> done transition.
    bootstrap_complete_notify: &'a Arc<Notify>,
}
89
/// Fetch worker polling interval in milliseconds.
///
/// When no fetches are in flight, the fetch worker sleeps this long before
/// looking at the fetch queue again.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Verification worker polling interval in milliseconds.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Bootstrap drain check interval in seconds.
///
/// The audit loop re-checks `BootstrapState::is_drained` at this period and
/// starts auditing only once it reports `true`.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;

/// Standard trust event weight for per-operation success/failure signals.
///
/// Used for individual replication fetch outcomes, integrity check failures,
/// and bootstrap claim abuse. Distinct from `AUDIT_FAILURE_TRUST_WEIGHT`, which
/// is reserved for confirmed audit failures.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
105
// ---------------------------------------------------------------------------
// ReplicationEngine
// ---------------------------------------------------------------------------

/// The replication engine manages all replication background tasks and state.
///
/// Shared state is held behind `Arc`, and behind a `tokio::sync::RwLock` where
/// it is mutated, so that every task spawned by [`ReplicationEngine::start`]
/// can own its own clone. [`ReplicationEngine::shutdown`] cancels and joins
/// those tasks.
pub struct ReplicationEngine {
    /// Replication configuration (shared across spawned tasks).
    config: Arc<ReplicationConfig>,
    /// P2P networking node.
    p2p_node: Arc<P2PNode>,
    /// Local chunk storage.
    storage: Arc<LmdbStorage>,
    /// Persistent paid-for-list.
    paid_list: Arc<PaidList>,
    /// Payment verifier for proof-of-payment (`PoP`) validation.
    payment_verifier: Arc<PaymentVerifier>,
    /// Replication pipeline queues (fetch queue and in-flight fetch tracking).
    queues: Arc<RwLock<ReplicationQueues>>,
    /// Neighbor sync cycle state.
    sync_state: Arc<RwLock<NeighborSyncState>>,
    /// Per-peer sync history (for `RepairOpportunity`).
    ///
    /// This map grows with peer churn and is intentionally unbounded: entries
    /// are lightweight (`PeerSyncRecord` is two fields) and peer IDs are
    /// naturally bounded by the routing table's k-bucket capacity.
    ///
    /// NOTE(review): the `PeerRemoved` DHT handler prunes only
    /// `repair_proofs`, not this map, so peers that leave the routing table
    /// appear to stay here. Confirm that the bound above holds under
    /// long-running churn.
    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    /// Completed local neighbor-sync cycle epoch for proof maturity.
    ///
    /// Starts at `0`; the audit loop reads it as `current_sync_epoch`.
    sync_cycle_epoch: Arc<RwLock<u64>>,
    /// Per-key repair proof tracking for audit eligibility.
    repair_proofs: Arc<RwLock<RepairProofs>>,
    /// Bootstrap state tracking.
    bootstrap_state: Arc<RwLock<BootstrapState>>,
    /// Whether this node is currently bootstrapping (starts as `true`).
    is_bootstrapping: Arc<RwLock<bool>>,
    /// Trigger for early neighbor sync (signalled on topology changes).
    sync_trigger: Arc<Notify>,
    /// Notified when `is_bootstrapping` transitions from `true` to `false`.
    bootstrap_complete_notify: Arc<Notify>,
    /// Limits concurrent outbound replication sends to prevent bandwidth
    /// saturation on home broadband connections.
    send_semaphore: Arc<Semaphore>,
    /// Receiver for fresh-write events from the chunk PUT handler.
    ///
    /// When present, `start()` takes it and spawns a drainer task that calls
    /// `replicate_fresh` for each event.
    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
    /// Shutdown token, cloned into every background task and cancelled by
    /// `shutdown()`.
    shutdown: CancellationToken,
    /// Handles of the tasks spawned by `start()`; drained by `shutdown()`.
    task_handles: Vec<JoinHandle<()>>,
}
157
158impl ReplicationEngine {
159    /// Create a new replication engine.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if the `PaidList` LMDB environment cannot be opened
164    /// or if the configuration fails validation.
165    pub async fn new(
166        config: ReplicationConfig,
167        p2p_node: Arc<P2PNode>,
168        storage: Arc<LmdbStorage>,
169        payment_verifier: Arc<PaymentVerifier>,
170        root_dir: &Path,
171        fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
172        shutdown: CancellationToken,
173    ) -> Result<Self> {
174        config.validate().map_err(Error::Config)?;
175
176        let paid_list = Arc::new(
177            PaidList::new(root_dir)
178                .await
179                .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
180        );
181
182        let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
183        let config = Arc::new(config);
184
185        Ok(Self {
186            config: Arc::clone(&config),
187            p2p_node,
188            storage,
189            paid_list,
190            payment_verifier,
191            queues: Arc::new(RwLock::new(ReplicationQueues::new())),
192            sync_state: Arc::new(RwLock::new(initial_neighbors)),
193            sync_history: Arc::new(RwLock::new(HashMap::new())),
194            sync_cycle_epoch: Arc::new(RwLock::new(0)),
195            repair_proofs: Arc::new(RwLock::new(RepairProofs::new())),
196            bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
197            is_bootstrapping: Arc::new(RwLock::new(true)),
198            sync_trigger: Arc::new(Notify::new()),
199            bootstrap_complete_notify: Arc::new(Notify::new()),
200            send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
201            fresh_write_rx: Some(fresh_write_rx),
202            shutdown,
203            task_handles: Vec::new(),
204        })
205    }
206
207    /// Get a reference to the `PaidList`.
208    #[must_use]
209    pub fn paid_list(&self) -> &Arc<PaidList> {
210        &self.paid_list
211    }
212
213    /// Start all background tasks.
214    ///
215    /// `dht_events` must be subscribed **before** `P2PNode::start()` so that
216    /// the `BootstrapComplete` event emitted during DHT bootstrap is not
217    /// missed by the bootstrap-sync gate.
218    pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
219        if !self.task_handles.is_empty() {
220            error!("ReplicationEngine::start() called while already running — ignoring");
221            return;
222        }
223        info!("Starting replication engine");
224
225        self.start_message_handler();
226        self.start_neighbor_sync_loop();
227        self.start_self_lookup_loop();
228        self.start_audit_loop();
229        self.start_fetch_worker();
230        self.start_verification_worker();
231        self.start_bootstrap_sync(dht_events);
232        self.start_fresh_write_drainer();
233
234        info!(
235            "Replication engine started with {} background tasks",
236            self.task_handles.len()
237        );
238    }
239
240    /// Returns `true` if the node is still in the replication bootstrap phase.
241    ///
242    /// During bootstrap, audit challenges return `Bootstrapping` instead of
243    /// digests, and neighbor sync responses carry `bootstrapping: true`.
244    pub async fn is_bootstrapping(&self) -> bool {
245        *self.is_bootstrapping.read().await
246    }
247
248    /// Wait until the replication bootstrap phase completes.
249    ///
250    /// Returns immediately if bootstrap has already completed. Useful for
251    /// readiness probes, health checks, and test harnesses that need the
252    /// node to be fully operational before proceeding.
253    ///
254    /// Returns `true` if bootstrap completed within the timeout, `false`
255    /// if the timeout elapsed first.
256    pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
257        // Register the notification future *before* checking the flag so that
258        // a transition between the read and the await is not missed.
259        let notified = self.bootstrap_complete_notify.notified();
260        tokio::pin!(notified);
261        notified.as_mut().enable();
262
263        if !*self.is_bootstrapping.read().await {
264            return true;
265        }
266
267        tokio::time::timeout(timeout, notified).await.is_ok()
268    }
269
270    /// Cancel all background tasks and wait for them to terminate.
271    ///
272    /// This must be awaited before dropping the engine when the caller needs
273    /// the `Arc<LmdbStorage>` references held by background tasks to be
274    /// released (e.g. before reopening the same LMDB environment).
275    pub async fn shutdown(&mut self) {
276        self.shutdown.cancel();
277        for (i, mut handle) in self.task_handles.drain(..).enumerate() {
278            match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
279                Ok(Ok(())) => {}
280                Ok(Err(e)) if e.is_cancelled() => {}
281                Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
282                Err(_) => {
283                    warn!("Replication task {i} did not stop within 10s, aborting");
284                    handle.abort();
285                }
286            }
287        }
288    }
289
290    /// Trigger an early neighbor sync round.
291    ///
292    /// Useful after topology changes (new nodes joining, network heal after
293    /// partition) when the caller wants replication to converge faster than
294    /// the regular 10-20 minute cadence.
295    pub fn trigger_neighbor_sync(&self) {
296        self.sync_trigger.notify_one();
297    }
298
299    /// Execute fresh replication for a newly stored record.
300    pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
301        fresh::replicate_fresh(
302            key,
303            data,
304            proof_of_payment,
305            &self.p2p_node,
306            &self.paid_list,
307            &self.config,
308            &self.send_semaphore,
309        )
310        .await;
311    }
312
313    // =======================================================================
314    // Background task launchers
315    // =======================================================================
316
317    /// Spawn a task that drains the fresh-write channel and triggers
318    /// replication for each newly-stored chunk.
319    fn start_fresh_write_drainer(&mut self) {
320        let Some(mut rx) = self.fresh_write_rx.take() else {
321            return;
322        };
323        let p2p = Arc::clone(&self.p2p_node);
324        let paid_list = Arc::clone(&self.paid_list);
325        let config = Arc::clone(&self.config);
326        let send_semaphore = Arc::clone(&self.send_semaphore);
327        let shutdown = self.shutdown.clone();
328
329        let handle = tokio::spawn(async move {
330            loop {
331                tokio::select! {
332                    () = shutdown.cancelled() => break,
333                    event = rx.recv() => {
334                        let Some(event) = event else { break };
335                        fresh::replicate_fresh(
336                            &event.key,
337                            &event.data,
338                            &event.payment_proof,
339                            &p2p,
340                            &paid_list,
341                            &config,
342                            &send_semaphore,
343                        )
344                        .await;
345                    }
346                }
347            }
348            debug!("Fresh-write drainer shut down");
349        });
350        self.task_handles.push(handle);
351    }
352
353    #[allow(clippy::too_many_lines)]
354    fn start_message_handler(&mut self) {
355        let mut p2p_events = self.p2p_node.subscribe_events();
356        let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
357        let p2p = Arc::clone(&self.p2p_node);
358        let storage = Arc::clone(&self.storage);
359        let paid_list = Arc::clone(&self.paid_list);
360        let payment_verifier = Arc::clone(&self.payment_verifier);
361        let queues = Arc::clone(&self.queues);
362        let config = Arc::clone(&self.config);
363        let shutdown = self.shutdown.clone();
364        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
365        let bootstrap_state = Arc::clone(&self.bootstrap_state);
366        let sync_history = Arc::clone(&self.sync_history);
367        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
368        let repair_proofs = Arc::clone(&self.repair_proofs);
369        let sync_trigger = Arc::clone(&self.sync_trigger);
370
371        let handle = tokio::spawn(async move {
372            loop {
373                tokio::select! {
374                    () = shutdown.cancelled() => break,
375                    event = p2p_events.recv() => {
376                        let Ok(event) = event else { continue };
377                        if let P2PEvent::Message {
378                            topic,
379                            source: Some(source),
380                            data,
381                            ..
382                        } = event {
383                            // Determine if this is a replication message
384                            // and whether it arrived via the /rr/ request-response
385                            // path (which wraps payloads in RequestResponseEnvelope).
386                            let rr_info = if topic == REPLICATION_PROTOCOL_ID {
387                                Some((data.clone(), None))
388                            } else if topic.starts_with(RR_PREFIX)
389                                && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
390                            {
391                                P2PNode::parse_request_envelope(&data)
392                                    .filter(|(_, is_resp, _)| !is_resp)
393                                    .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
394                            } else {
395                                None
396                            };
397                            if let Some((payload, rr_message_id)) = rr_info {
398                                match handle_replication_message(
399                                    &source,
400                                    &payload,
401                                    &p2p,
402                                    &storage,
403                                    &paid_list,
404                                    &payment_verifier,
405                                    &queues,
406                                    &config,
407                                    &is_bootstrapping,
408                                    &bootstrap_state,
409                                    &sync_history,
410                                    &sync_cycle_epoch,
411                                    &repair_proofs,
412                                    rr_message_id.as_deref(),
413                                ).await {
414                                    Ok(()) => {}
415                                    Err(e) => {
416                                        debug!(
417                                            "Replication message from {source} error: {e}"
418                                        );
419                                    }
420                                }
421                            }
422                        }
423                    }
424                    // Gap 4: Topology churn handling (Section 13).
425                    //
426                    // The DHT routing table emits KClosestPeersChanged when the
427                    // K-closest peer set actually changes, which is the precise
428                    // signal for triggering neighbor sync. This replaces the
429                    // previous approach of checking every PeerConnected /
430                    // PeerDisconnected event against the close group.
431                    dht_event = dht_events.recv() => {
432                        let Ok(dht_event) = dht_event else { continue };
433                        match dht_event {
434                            DhtNetworkEvent::KClosestPeersChanged { .. } => {
435                                debug!(
436                                    "K-closest peers changed, triggering early neighbor sync"
437                                );
438                                sync_trigger.notify_one();
439                            }
440                            DhtNetworkEvent::PeerRemoved { peer_id } => {
441                                repair_proofs.write().await.remove_peer(&peer_id);
442                            }
443                            _ => {}
444                        }
445                    }
446                }
447            }
448            debug!("Replication message handler shut down");
449        });
450        self.task_handles.push(handle);
451    }
452
453    fn start_neighbor_sync_loop(&mut self) {
454        let p2p = Arc::clone(&self.p2p_node);
455        let storage = Arc::clone(&self.storage);
456        let paid_list = Arc::clone(&self.paid_list);
457        let queues = Arc::clone(&self.queues);
458        let config = Arc::clone(&self.config);
459        let shutdown = self.shutdown.clone();
460        let sync_state = Arc::clone(&self.sync_state);
461        let sync_history = Arc::clone(&self.sync_history);
462        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
463        let repair_proofs = Arc::clone(&self.repair_proofs);
464        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
465        let bootstrap_state = Arc::clone(&self.bootstrap_state);
466        let sync_trigger = Arc::clone(&self.sync_trigger);
467
468        let handle = tokio::spawn(async move {
469            loop {
470                let interval = config.random_neighbor_sync_interval();
471                tokio::select! {
472                    () = shutdown.cancelled() => break,
473                    () = tokio::time::sleep(interval) => {}
474                    () = sync_trigger.notified() => {
475                        debug!("Neighbor sync triggered by topology change");
476                    }
477                }
478                // Wrap the sync round in a select so shutdown cancels
479                // in-progress network operations rather than waiting for
480                // the full round to complete.
481                tokio::select! {
482                    () = shutdown.cancelled() => break,
483                    () = run_neighbor_sync_round(
484                        &p2p,
485                        &storage,
486                        &paid_list,
487                        &queues,
488                        &config,
489                        &sync_state,
490                        &sync_history,
491                        &sync_cycle_epoch,
492                        &repair_proofs,
493                        &is_bootstrapping,
494                        &bootstrap_state,
495                    ) => {}
496                }
497            }
498            debug!("Neighbor sync loop shut down");
499        });
500        self.task_handles.push(handle);
501    }
502
503    fn start_self_lookup_loop(&mut self) {
504        let p2p = Arc::clone(&self.p2p_node);
505        let config = Arc::clone(&self.config);
506        let shutdown = self.shutdown.clone();
507
508        let handle = tokio::spawn(async move {
509            loop {
510                let interval = config.random_self_lookup_interval();
511                tokio::select! {
512                    () = shutdown.cancelled() => break,
513                    () = tokio::time::sleep(interval) => {
514                        if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
515                            debug!("Self-lookup failed: {e}");
516                        }
517                    }
518                }
519            }
520            debug!("Self-lookup loop shut down");
521        });
522        self.task_handles.push(handle);
523    }
524
525    fn start_audit_loop(&mut self) {
526        let p2p = Arc::clone(&self.p2p_node);
527        let storage = Arc::clone(&self.storage);
528        let config = Arc::clone(&self.config);
529        let shutdown = self.shutdown.clone();
530        let sync_history = Arc::clone(&self.sync_history);
531        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
532        let repair_proofs = Arc::clone(&self.repair_proofs);
533        let bootstrap_state = Arc::clone(&self.bootstrap_state);
534        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
535        let sync_state = Arc::clone(&self.sync_state);
536
537        let handle = tokio::spawn(async move {
538            // Invariant 19: wait for bootstrap to drain before starting audits.
539            loop {
540                tokio::select! {
541                    () = shutdown.cancelled() => return,
542                    () = tokio::time::sleep(
543                        std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
544                    ) => {
545                        if bootstrap_state.read().await.is_drained() {
546                            break;
547                        }
548                    }
549                }
550            }
551
552            // Run one audit tick immediately after bootstrap drain.
553            {
554                let bootstrapping = *is_bootstrapping.read().await;
555                let result = {
556                    let history = sync_history.read().await;
557                    let current_sync_epoch = *sync_cycle_epoch.read().await;
558                    audit::audit_tick_with_repair_proofs(
559                        &p2p,
560                        &storage,
561                        &config,
562                        &history,
563                        &repair_proofs,
564                        current_sync_epoch,
565                        bootstrapping,
566                    )
567                    .await
568                };
569                handle_audit_result(&result, &p2p, &sync_state, &config).await;
570            }
571
572            // Then run periodically.
573            loop {
574                let interval = config.random_audit_tick_interval();
575                tokio::select! {
576                    () = shutdown.cancelled() => break,
577                    () = tokio::time::sleep(interval) => {
578                        let bootstrapping = *is_bootstrapping.read().await;
579                        let result = {
580                            let history = sync_history.read().await;
581                            let current_sync_epoch = *sync_cycle_epoch.read().await;
582                            audit::audit_tick_with_repair_proofs(
583                                &p2p,
584                                &storage,
585                                &config,
586                                &history,
587                                &repair_proofs,
588                                current_sync_epoch,
589                                bootstrapping,
590                            )
591                            .await
592                        };
593                        handle_audit_result(&result, &p2p, &sync_state, &config).await;
594                    }
595                }
596            }
597            debug!("Audit loop shut down");
598        });
599        self.task_handles.push(handle);
600    }
601
602    #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
603    fn start_fetch_worker(&mut self) {
604        let p2p = Arc::clone(&self.p2p_node);
605        let storage = Arc::clone(&self.storage);
606        let queues = Arc::clone(&self.queues);
607        let config = Arc::clone(&self.config);
608        let shutdown = self.shutdown.clone();
609        let bootstrap_state = Arc::clone(&self.bootstrap_state);
610        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
611        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
612        let concurrency = max_parallel_fetch();
613
614        info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
615
616        let handle = tokio::spawn(async move {
617            // Each in-flight future yields (key, Option<FetchOutcome>) so we
618            // always recover the key — even if the inner task panics.
619            let mut in_flight = FuturesUnordered::<FetchFuture>::new();
620
621            loop {
622                // Fill up to `concurrency` slots from the queue.
623                {
624                    let mut q = queues.write().await;
625                    while in_flight.len() < concurrency {
626                        let Some(candidate) = q.dequeue_fetch() else {
627                            break;
628                        };
629                        let Some(&source) = candidate.sources.first() else {
630                            warn!(
631                                "Fetch candidate {} has no sources — dropping",
632                                hex::encode(candidate.key)
633                            );
634                            continue;
635                        };
636                        q.start_fetch(candidate.key, source, candidate.sources.clone());
637
638                        let p2p = Arc::clone(&p2p);
639                        let storage = Arc::clone(&storage);
640                        let config = Arc::clone(&config);
641                        let token = shutdown.clone();
642                        let fetch_key = candidate.key;
643                        in_flight.push(Box::pin(async move {
644                            let handle = tokio::spawn(async move {
645                                // Cancel-aware: abort when the engine shuts down.
646                                tokio::select! {
647                                    () = token.cancelled() => FetchOutcome {
648                                        key: fetch_key,
649                                        result: FetchResult::SourceFailed,
650                                    },
651                                    outcome = execute_single_fetch(
652                                        p2p, storage, config, fetch_key, source,
653                                    ) => outcome,
654                                }
655                            });
656                            match handle.await {
657                                Ok(outcome) => (outcome.key, Some(outcome)),
658                                Err(e) => {
659                                    error!(
660                                        "Fetch task for {} panicked: {e}",
661                                        hex::encode(fetch_key)
662                                    );
663                                    (fetch_key, None)
664                                }
665                            }
666                        }));
667                    }
668                } // release queues write lock
669
670                if in_flight.is_empty() {
671                    // No work — wait for new items or shutdown.
672                    tokio::select! {
673                        () = shutdown.cancelled() => break,
674                        () = tokio::time::sleep(
675                            std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
676                        ) => continue,
677                    }
678                }
679
680                // Wait for the next fetch to complete and process the result.
681                tokio::select! {
682                    () = shutdown.cancelled() => break,
683                    Some((key, maybe_outcome)) = in_flight.next() => {
684                        let mut q = queues.write().await;
685                        let terminal = if let Some(outcome) = maybe_outcome {
686                            match outcome.result {
687                                FetchResult::Stored => {
688                                    q.complete_fetch(&key);
689                                    true
690                                }
691                                FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
692                                    if let Some(next_peer) = q.retry_fetch(&key) {
693                                        // Spawn a new fetch task for the next source.
694                                        let p2p = Arc::clone(&p2p);
695                                        let storage = Arc::clone(&storage);
696                                        let config = Arc::clone(&config);
697                                        let token = shutdown.clone();
698                                        let fetch_key = key;
699                                        in_flight.push(Box::pin(async move {
700                                            let handle = tokio::spawn(async move {
701                                                tokio::select! {
702                                                    () = token.cancelled() => FetchOutcome {
703                                                        key: fetch_key,
704                                                        result: FetchResult::SourceFailed,
705                                                    },
706                                                    outcome = execute_single_fetch(
707                                                        p2p, storage, config, fetch_key, next_peer,
708                                                    ) => outcome,
709                                                }
710                                            });
711                                            match handle.await {
712                                                Ok(outcome) => (outcome.key, Some(outcome)),
713                                                Err(e) => {
714                                                    error!(
715                                                        "Fetch task for {} panicked: {e}",
716                                                        hex::encode(fetch_key)
717                                                    );
718                                                    (fetch_key, None)
719                                                }
720                                            }
721                                        }));
722                                        false
723                                    } else {
724                                        q.complete_fetch(&key);
725                                        true
726                                    }
727                                }
728                            }
729                        } else {
730                            // Task panicked — reclaim the in-flight slot.
731                            q.complete_fetch(&key);
732                            true
733                        };
734
735                        // Shrink bootstrap pending set on terminal exit.
736                        if terminal {
737                            drop(q); // release queues lock before acquiring bootstrap_state
738                            if !bootstrap_state.read().await.is_drained() {
739                                bootstrap_state.write().await.remove_key(&key);
740                                let q = queues.read().await;
741                                if bootstrap::check_bootstrap_drained(
742                                    &bootstrap_state,
743                                    &q,
744                                )
745                                .await
746                                {
747                                    complete_bootstrap(
748                                        &is_bootstrapping,
749                                        &bootstrap_complete_notify,
750                                    ).await;
751                                }
752                            }
753                        }
754                    }
755                }
756            }
757
758            // Cancel and drain remaining in-flight fetches on shutdown.
759            // The CancellationToken is already cancelled by this point, so
760            // spawned tasks will see cancellation via their select! branches.
761            while in_flight.next().await.is_some() {}
762            debug!("Fetch worker shut down");
763        });
764        self.task_handles.push(handle);
765    }
766
767    fn start_verification_worker(&mut self) {
768        let p2p = Arc::clone(&self.p2p_node);
769        let storage = Arc::clone(&self.storage);
770        let queues = Arc::clone(&self.queues);
771        let paid_list = Arc::clone(&self.paid_list);
772        let config = Arc::clone(&self.config);
773        let shutdown = self.shutdown.clone();
774        let bootstrap_state = Arc::clone(&self.bootstrap_state);
775        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
776        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
777
778        let handle = tokio::spawn(async move {
779            loop {
780                tokio::select! {
781                    () = shutdown.cancelled() => break,
782                    () = tokio::time::sleep(
783                        std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
784                    ) => {
785                        let ctx = VerificationCycleContext {
786                            p2p_node: &p2p,
787                            paid_list: &paid_list,
788                            storage: &storage,
789                            queues: &queues,
790                            config: &config,
791                            bootstrap_state: &bootstrap_state,
792                            is_bootstrapping: &is_bootstrapping,
793                            bootstrap_complete_notify: &bootstrap_complete_notify,
794                        };
795                        run_verification_cycle(ctx).await;
796                    }
797                }
798            }
799            debug!("Verification worker shut down");
800        });
801        self.task_handles.push(handle);
802    }
803
804    /// Gap 3: Run a one-shot bootstrap sync on startup.
805    ///
806    /// Waits for saorsa-core to emit `DhtNetworkEvent::BootstrapComplete`
807    /// (indicating the routing table is populated) before snapshotting
808    /// close neighbors. Falls back after a timeout so bootstrap nodes
809    /// (which have no peers and therefore never receive the event) still
810    /// proceed.
811    ///
812    /// After the gate, finds close neighbors, syncs with each in
813    /// round-robin batches, admits returned hints into the verification
814    /// pipeline, and tracks discovered keys for bootstrap drain detection.
815    #[allow(clippy::too_many_lines)]
816    fn start_bootstrap_sync(
817        &mut self,
818        dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
819    ) {
820        let p2p = Arc::clone(&self.p2p_node);
821        let storage = Arc::clone(&self.storage);
822        let paid_list = Arc::clone(&self.paid_list);
823        let queues = Arc::clone(&self.queues);
824        let config = Arc::clone(&self.config);
825        let shutdown = self.shutdown.clone();
826        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
827        let bootstrap_state = Arc::clone(&self.bootstrap_state);
828        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
829        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
830        let repair_proofs = Arc::clone(&self.repair_proofs);
831
832        let handle = tokio::spawn(async move {
833            // Wait for DHT bootstrap to complete before snapshotting
834            // neighbors. The routing table is empty until saorsa-core
835            // finishes its FIND_NODE rounds and bucket refreshes.
836            let gate = bootstrap::wait_for_bootstrap_complete(
837                dht_events,
838                config.bootstrap_complete_timeout_secs,
839                &shutdown,
840            )
841            .await;
842
843            if gate == bootstrap::BootstrapGateResult::Shutdown {
844                return;
845            }
846
847            let self_id = *p2p.peer_id();
848            let neighbors =
849                neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
850                    .await;
851
852            if neighbors.is_empty() {
853                info!("Bootstrap sync: no close neighbors found, marking drained");
854                bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
855                complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
856                return;
857            }
858
859            let neighbor_count = neighbors.len();
860            info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
861
862            // Process neighbors in batches of NEIGHBOR_SYNC_PEER_COUNT.
863            for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
864                if shutdown.is_cancelled() {
865                    break;
866                }
867
868                for peer in batch {
869                    if shutdown.is_cancelled() {
870                        break;
871                    }
872
873                    // Re-read on each iteration so peers see current state.
874                    let bootstrapping = *is_bootstrapping.read().await;
875
876                    bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
877
878                    let outcome = neighbor_sync::sync_with_peer_with_outcome(
879                        peer,
880                        &p2p,
881                        &storage,
882                        &paid_list,
883                        &config,
884                        bootstrapping,
885                    )
886                    .await;
887
888                    bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
889
890                    if let Some(outcome) = outcome {
891                        if !outcome.response.bootstrapping {
892                            record_sent_replica_hints(
893                                peer,
894                                &outcome.sent_replica_hints,
895                                &repair_proofs,
896                                &sync_cycle_epoch,
897                            )
898                            .await;
899                            // Admit hints into verification pipeline.
900                            let outcome = admit_and_queue_hints(
901                                &self_id,
902                                peer,
903                                &outcome.response.replica_hints,
904                                &outcome.response.paid_hints,
905                                &p2p,
906                                &config,
907                                &storage,
908                                &paid_list,
909                                &queues,
910                            )
911                            .await;
912
913                            // Track discovered keys for drain detection.
914                            if !outcome.discovered.is_empty() {
915                                bootstrap::track_discovered_keys(
916                                    &bootstrap_state,
917                                    &outcome.discovered,
918                                )
919                                .await;
920                            }
921
922                            // Record / retire capacity rejections so the
923                            // drain check correctly reflects whether each
924                            // source still owes us re-hinted work after
925                            // queue overflow.
926                            if outcome.capacity_rejected_count > 0 {
927                                bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
928                            } else {
929                                bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
930                            }
931                        }
932                    }
933                }
934            }
935
936            // Check drain condition.
937            {
938                let q = queues.read().await;
939                if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
940                    complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
941                }
942            }
943
944            info!("Bootstrap sync completed");
945        });
946        self.task_handles.push(handle);
947    }
948}
949
950// ===========================================================================
951// Free functions for background tasks
952// ===========================================================================
953
954/// Handle an incoming replication protocol message.
955///
956/// When `rr_message_id` is `Some`, the request arrived via the `/rr/`
957/// request-response path and the response must be sent via `send_response`
958/// so saorsa-core can route it back to the waiting `send_request` caller.
959#[allow(clippy::too_many_arguments)]
960async fn handle_replication_message(
961    source: &PeerId,
962    data: &[u8],
963    p2p_node: &Arc<P2PNode>,
964    storage: &Arc<LmdbStorage>,
965    paid_list: &Arc<PaidList>,
966    payment_verifier: &Arc<PaymentVerifier>,
967    queues: &Arc<RwLock<ReplicationQueues>>,
968    config: &ReplicationConfig,
969    is_bootstrapping: &Arc<RwLock<bool>>,
970    bootstrap_state: &Arc<RwLock<BootstrapState>>,
971    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
972    sync_cycle_epoch: &Arc<RwLock<u64>>,
973    repair_proofs: &Arc<RwLock<RepairProofs>>,
974    rr_message_id: Option<&str>,
975) -> Result<()> {
976    let msg = ReplicationMessage::decode(data)
977        .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
978
979    match msg.body {
980        ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
981            handle_fresh_offer(
982                source,
983                offer,
984                storage,
985                paid_list,
986                payment_verifier,
987                p2p_node,
988                config,
989                msg.request_id,
990                rr_message_id,
991            )
992            .await
993        }
994        ReplicationMessageBody::PaidNotify(ref notify) => {
995            handle_paid_notify(
996                source,
997                notify,
998                paid_list,
999                payment_verifier,
1000                p2p_node,
1001                config,
1002            )
1003            .await
1004        }
1005        ReplicationMessageBody::NeighborSyncRequest(ref request) => {
1006            let bootstrapping = *is_bootstrapping.read().await;
1007            handle_neighbor_sync_request(
1008                source,
1009                request,
1010                p2p_node,
1011                storage,
1012                paid_list,
1013                queues,
1014                config,
1015                bootstrapping,
1016                bootstrap_state,
1017                sync_history,
1018                sync_cycle_epoch,
1019                repair_proofs,
1020                msg.request_id,
1021                rr_message_id,
1022            )
1023            .await
1024        }
1025        ReplicationMessageBody::VerificationRequest(ref request) => {
1026            handle_verification_request(
1027                source,
1028                request,
1029                storage,
1030                paid_list,
1031                p2p_node,
1032                msg.request_id,
1033                rr_message_id,
1034            )
1035            .await
1036        }
1037        ReplicationMessageBody::FetchRequest(ref request) => {
1038            handle_fetch_request(
1039                source,
1040                request,
1041                storage,
1042                p2p_node,
1043                msg.request_id,
1044                rr_message_id,
1045            )
1046            .await
1047        }
1048        ReplicationMessageBody::AuditChallenge(ref challenge) => {
1049            let bootstrapping = *is_bootstrapping.read().await;
1050            handle_audit_challenge_msg(
1051                source,
1052                challenge,
1053                storage,
1054                p2p_node,
1055                bootstrapping,
1056                msg.request_id,
1057                rr_message_id,
1058            )
1059            .await
1060        }
1061        // Response messages are handled by their respective request initiators.
1062        ReplicationMessageBody::FreshReplicationResponse(_)
1063        | ReplicationMessageBody::NeighborSyncResponse(_)
1064        | ReplicationMessageBody::VerificationResponse(_)
1065        | ReplicationMessageBody::FetchResponse(_)
1066        | ReplicationMessageBody::AuditResponse(_) => Ok(()),
1067    }
1068}
1069
1070// ---------------------------------------------------------------------------
1071// Per-message-type handlers
1072// ---------------------------------------------------------------------------
1073
1074#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1075async fn handle_fresh_offer(
1076    source: &PeerId,
1077    offer: &protocol::FreshReplicationOffer,
1078    storage: &Arc<LmdbStorage>,
1079    paid_list: &Arc<PaidList>,
1080    payment_verifier: &Arc<PaymentVerifier>,
1081    p2p_node: &Arc<P2PNode>,
1082    config: &ReplicationConfig,
1083    request_id: u64,
1084    rr_message_id: Option<&str>,
1085) -> Result<()> {
1086    let self_id = *p2p_node.peer_id();
1087
1088    // Rule 5: reject if PoP is missing.
1089    if offer.proof_of_payment.is_empty() {
1090        send_replication_response(
1091            source,
1092            p2p_node,
1093            request_id,
1094            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1095                key: offer.key,
1096                reason: "Missing proof of payment".to_string(),
1097            }),
1098            rr_message_id,
1099        )
1100        .await;
1101        return Ok(());
1102    }
1103
1104    // Enforce chunk size invariant: the normal PUT path rejects data larger
1105    // than MAX_CHUNK_SIZE; the replication receive path must do the same to
1106    // prevent peers from pushing oversized records through replication.
1107    if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1108        warn!(
1109            "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1110            hex::encode(offer.key),
1111            offer.data.len(),
1112            crate::ant_protocol::MAX_CHUNK_SIZE,
1113        );
1114        p2p_node
1115            .report_trust_event(
1116                source,
1117                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1118            )
1119            .await;
1120        send_replication_response(
1121            source,
1122            p2p_node,
1123            request_id,
1124            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1125                key: offer.key,
1126                reason: format!(
1127                    "Data size {} exceeds maximum chunk size {}",
1128                    offer.data.len(),
1129                    crate::ant_protocol::MAX_CHUNK_SIZE,
1130                ),
1131            }),
1132            rr_message_id,
1133        )
1134        .await;
1135        return Ok(());
1136    }
1137
1138    // Rule 7: check responsibility.
1139    if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await {
1140        send_replication_response(
1141            source,
1142            p2p_node,
1143            request_id,
1144            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1145                key: offer.key,
1146                reason: "Not responsible for this key".to_string(),
1147            }),
1148            rr_message_id,
1149        )
1150        .await;
1151        return Ok(());
1152    }
1153
1154    // Gap 1: Validate PoP via PaymentVerifier.
1155    match payment_verifier
1156        .verify_payment(&offer.key, Some(&offer.proof_of_payment))
1157        .await
1158    {
1159        Ok(status) if status.can_store() => {
1160            debug!(
1161                "PoP validated for fresh offer key {}",
1162                hex::encode(offer.key)
1163            );
1164        }
1165        Ok(_) => {
1166            send_replication_response(
1167                source,
1168                p2p_node,
1169                request_id,
1170                ReplicationMessageBody::FreshReplicationResponse(
1171                    FreshReplicationResponse::Rejected {
1172                        key: offer.key,
1173                        reason: "Payment verification failed: payment required".to_string(),
1174                    },
1175                ),
1176                rr_message_id,
1177            )
1178            .await;
1179            return Ok(());
1180        }
1181        Err(e) => {
1182            warn!(
1183                "PoP verification error for key {}: {e}",
1184                hex::encode(offer.key)
1185            );
1186            send_replication_response(
1187                source,
1188                p2p_node,
1189                request_id,
1190                ReplicationMessageBody::FreshReplicationResponse(
1191                    FreshReplicationResponse::Rejected {
1192                        key: offer.key,
1193                        reason: format!("Payment verification error: {e}"),
1194                    },
1195                ),
1196                rr_message_id,
1197            )
1198            .await;
1199            return Ok(());
1200        }
1201    }
1202
1203    // Rule 6: add to PaidForList.
1204    if let Err(e) = paid_list.insert(&offer.key).await {
1205        warn!("Failed to add key to PaidForList: {e}");
1206    }
1207
1208    // Store the record.
1209    match storage.put(&offer.key, &offer.data).await {
1210        Ok(_) => {
1211            send_replication_response(
1212                source,
1213                p2p_node,
1214                request_id,
1215                ReplicationMessageBody::FreshReplicationResponse(
1216                    FreshReplicationResponse::Accepted { key: offer.key },
1217                ),
1218                rr_message_id,
1219            )
1220            .await;
1221        }
1222        Err(e) => {
1223            send_replication_response(
1224                source,
1225                p2p_node,
1226                request_id,
1227                ReplicationMessageBody::FreshReplicationResponse(
1228                    FreshReplicationResponse::Rejected {
1229                        key: offer.key,
1230                        reason: format!("Storage error: {e}"),
1231                    },
1232                ),
1233                rr_message_id,
1234            )
1235            .await;
1236        }
1237    }
1238
1239    Ok(())
1240}
1241
1242async fn handle_paid_notify(
1243    _source: &PeerId,
1244    notify: &protocol::PaidNotify,
1245    paid_list: &Arc<PaidList>,
1246    payment_verifier: &Arc<PaymentVerifier>,
1247    p2p_node: &Arc<P2PNode>,
1248    config: &ReplicationConfig,
1249) -> Result<()> {
1250    let self_id = *p2p_node.peer_id();
1251
1252    // Rule 3: validate PoP presence before adding.
1253    if notify.proof_of_payment.is_empty() {
1254        return Ok(());
1255    }
1256
1257    // Check if we're in PaidCloseGroup for this key.
1258    if !admission::is_in_paid_close_group(
1259        &self_id,
1260        &notify.key,
1261        p2p_node,
1262        config.paid_list_close_group_size,
1263    )
1264    .await
1265    {
1266        return Ok(());
1267    }
1268
1269    // Gap 1: Validate PoP via PaymentVerifier.
1270    match payment_verifier
1271        .verify_payment(&notify.key, Some(&notify.proof_of_payment))
1272        .await
1273    {
1274        Ok(status) if status.can_store() => {
1275            debug!(
1276                "PoP validated for paid notify key {}",
1277                hex::encode(notify.key)
1278            );
1279        }
1280        Ok(_) => {
1281            warn!(
1282                "Paid notify rejected: payment required for key {}",
1283                hex::encode(notify.key)
1284            );
1285            return Ok(());
1286        }
1287        Err(e) => {
1288            warn!(
1289                "PoP verification error for paid notify key {}: {e}",
1290                hex::encode(notify.key)
1291            );
1292            return Ok(());
1293        }
1294    }
1295
1296    if let Err(e) = paid_list.insert(&notify.key).await {
1297        warn!("Failed to add paid notify key to PaidForList: {e}");
1298    }
1299
1300    Ok(())
1301}
1302
1303#[allow(clippy::too_many_arguments)]
1304async fn handle_neighbor_sync_request(
1305    source: &PeerId,
1306    request: &protocol::NeighborSyncRequest,
1307    p2p_node: &Arc<P2PNode>,
1308    storage: &Arc<LmdbStorage>,
1309    paid_list: &Arc<PaidList>,
1310    queues: &Arc<RwLock<ReplicationQueues>>,
1311    config: &ReplicationConfig,
1312    is_bootstrapping: bool,
1313    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1314    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1315    sync_cycle_epoch: &Arc<RwLock<u64>>,
1316    repair_proofs: &Arc<RwLock<RepairProofs>>,
1317    request_id: u64,
1318    rr_message_id: Option<&str>,
1319) -> Result<()> {
1320    let self_id = *p2p_node.peer_id();
1321
1322    // No per-request hint count limit: the wire message size limit
1323    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Unlike audit
1324    // challenges, sync hints don't drive expensive computation — they just
1325    // enter the verification queue. A per-request limit here would break
1326    // bootstrap replication for newly-joined nodes with 0 stored chunks.
1327
1328    // Build response (outbound hints).
1329    let (response, sent_replica_hints, sender_in_rt) =
1330        neighbor_sync::handle_sync_request_with_proofs(
1331            source,
1332            request,
1333            p2p_node,
1334            storage,
1335            paid_list,
1336            config,
1337            is_bootstrapping,
1338        )
1339        .await;
1340
1341    // Send response.
1342    let response_sent = send_replication_response_checked(
1343        source,
1344        p2p_node,
1345        request_id,
1346        ReplicationMessageBody::NeighborSyncResponse(response),
1347        rr_message_id,
1348    )
1349    .await;
1350
1351    // Process inbound hints only if sender is in LocalRT (Rule 4-6).
1352    if !sender_in_rt {
1353        return Ok(());
1354    }
1355
1356    // Update sync history for this peer before recording repair proofs so a
1357    // same-tick audit cannot combine a fresh key proof with stale peer maturity.
1358    {
1359        let mut history = sync_history.write().await;
1360        let record = history.entry(*source).or_insert(PeerSyncRecord {
1361            last_sync: None,
1362            cycles_since_sync: 0,
1363        });
1364        record.last_sync = Some(Instant::now());
1365        record.cycles_since_sync = 0;
1366    }
1367
1368    if response_sent && !request.bootstrapping {
1369        record_sent_replica_hints(source, &sent_replica_hints, repair_proofs, sync_cycle_epoch)
1370            .await;
1371    }
1372
1373    // Admit inbound hints and queue for verification.
1374    let outcome = admit_and_queue_hints(
1375        &self_id,
1376        source,
1377        &request.replica_hints,
1378        &request.paid_hints,
1379        p2p_node,
1380        config,
1381        storage,
1382        paid_list,
1383        queues,
1384    )
1385    .await;
1386
1387    // Track discovered keys for bootstrap drain detection so that hints
1388    // admitted via inbound sync requests are not missed. Capacity-rejected
1389    // hints keep this source on the "not yet drained" list until its next
1390    // sync re-admits them; a clean cycle clears the source.
1391    if is_bootstrapping {
1392        if !outcome.discovered.is_empty() {
1393            bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1394        }
1395        if outcome.capacity_rejected_count > 0 {
1396            bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
1397        } else {
1398            bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
1399        }
1400    }
1401
1402    Ok(())
1403}
1404
1405async fn handle_verification_request(
1406    source: &PeerId,
1407    request: &protocol::VerificationRequest,
1408    storage: &Arc<LmdbStorage>,
1409    paid_list: &Arc<PaidList>,
1410    p2p_node: &Arc<P2PNode>,
1411    request_id: u64,
1412    rr_message_id: Option<&str>,
1413) -> Result<()> {
1414    // No per-request key count limit: the wire message size limit
1415    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Verification
1416    // does cheap storage lookups per key, not expensive computation like
1417    // audit digest generation.
1418
1419    #[allow(clippy::cast_possible_truncation)]
1420    let keys_len = request.keys.len() as u32;
1421    let paid_check_set: HashSet<u32> = request
1422        .paid_list_check_indices
1423        .iter()
1424        .copied()
1425        .filter(|&idx| {
1426            if idx >= keys_len {
1427                warn!(
1428                    "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
1429                    request.keys.len(),
1430                );
1431                false
1432            } else {
1433                true
1434            }
1435        })
1436        .collect();
1437
1438    let mut results = Vec::with_capacity(request.keys.len());
1439    for (i, key) in request.keys.iter().enumerate() {
1440        let present = storage.exists(key).unwrap_or(false);
1441        let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
1442            Some(paid_list.contains(key).unwrap_or(false))
1443        } else {
1444            None
1445        };
1446        results.push(protocol::KeyVerificationResult {
1447            key: *key,
1448            present,
1449            paid,
1450        });
1451    }
1452
1453    send_replication_response(
1454        source,
1455        p2p_node,
1456        request_id,
1457        ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
1458        rr_message_id,
1459    )
1460    .await;
1461
1462    Ok(())
1463}
1464
1465async fn handle_fetch_request(
1466    source: &PeerId,
1467    request: &protocol::FetchRequest,
1468    storage: &Arc<LmdbStorage>,
1469    p2p_node: &Arc<P2PNode>,
1470    request_id: u64,
1471    rr_message_id: Option<&str>,
1472) -> Result<()> {
1473    let response = match storage.get(&request.key).await {
1474        Ok(Some(data)) => protocol::FetchResponse::Success {
1475            key: request.key,
1476            data,
1477        },
1478        Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
1479        Err(e) => protocol::FetchResponse::Error {
1480            key: request.key,
1481            reason: format!("{e}"),
1482        },
1483    };
1484
1485    send_replication_response(
1486        source,
1487        p2p_node,
1488        request_id,
1489        ReplicationMessageBody::FetchResponse(response),
1490        rr_message_id,
1491    )
1492    .await;
1493
1494    Ok(())
1495}
1496
1497async fn handle_audit_challenge_msg(
1498    source: &PeerId,
1499    challenge: &protocol::AuditChallenge,
1500    storage: &Arc<LmdbStorage>,
1501    p2p_node: &Arc<P2PNode>,
1502    is_bootstrapping: bool,
1503    request_id: u64,
1504    rr_message_id: Option<&str>,
1505) -> Result<()> {
1506    #[allow(clippy::cast_possible_truncation)]
1507    let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
1508    let response = audit::handle_audit_challenge(
1509        challenge,
1510        storage,
1511        p2p_node.peer_id(),
1512        is_bootstrapping,
1513        stored_chunks,
1514    )
1515    .await;
1516
1517    send_replication_response(
1518        source,
1519        p2p_node,
1520        request_id,
1521        ReplicationMessageBody::AuditResponse(response),
1522        rr_message_id,
1523    )
1524    .await;
1525
1526    Ok(())
1527}
1528
1529// ---------------------------------------------------------------------------
1530// Message sending helper
1531// ---------------------------------------------------------------------------
1532
1533/// Send a replication response message as a best-effort reply.
1534///
1535/// Encode and send failures are logged by the checked helper. Most response
1536/// paths do not need to branch on send success, so this wrapper keeps those
1537/// call sites explicit about their best-effort behavior.
1538async fn send_replication_response(
1539    peer: &PeerId,
1540    p2p_node: &Arc<P2PNode>,
1541    request_id: u64,
1542    body: ReplicationMessageBody,
1543    rr_message_id: Option<&str>,
1544) {
1545    let _ =
1546        send_replication_response_checked(peer, p2p_node, request_id, body, rr_message_id).await;
1547}
1548
1549/// Send a replication response message and report whether it was accepted.
1550///
1551/// Returns `true` after the message is encoded and accepted by the P2P send
1552/// path. Returns `false` after logging an encode or send failure. Repair-proof
1553/// recording uses this to avoid trusting hints that were not actually sent.
1554///
1555/// When `rr_message_id` is `Some`, the response is sent via the `/rr/`
1556/// request-response path so saorsa-core can route it back to the caller's
1557/// `send_request` future. Otherwise it is sent as a plain message.
1558async fn send_replication_response_checked(
1559    peer: &PeerId,
1560    p2p_node: &Arc<P2PNode>,
1561    request_id: u64,
1562    body: ReplicationMessageBody,
1563    rr_message_id: Option<&str>,
1564) -> bool {
1565    let msg = ReplicationMessage { request_id, body };
1566    let encoded = match msg.encode() {
1567        Ok(data) => data,
1568        Err(e) => {
1569            warn!("Failed to encode replication response: {e}");
1570            return false;
1571        }
1572    };
1573    let result = if let Some(msg_id) = rr_message_id {
1574        p2p_node
1575            .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
1576            .await
1577    } else {
1578        p2p_node
1579            .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
1580            .await
1581    };
1582    if let Err(e) = result {
1583        debug!("Failed to send replication response to {peer}: {e}");
1584        return false;
1585    }
1586    true
1587}
1588
1589async fn record_sent_replica_hints(
1590    peer: &PeerId,
1591    hints: &[neighbor_sync::SentReplicaHint],
1592    repair_proofs: &Arc<RwLock<RepairProofs>>,
1593    sync_cycle_epoch: &Arc<RwLock<u64>>,
1594) {
1595    if hints.is_empty() {
1596        return;
1597    }
1598
1599    let hinted_at_epoch = *sync_cycle_epoch.read().await;
1600    let mut proofs = repair_proofs.write().await;
1601    for hint in hints {
1602        if proofs.record_replica_hint_sent(*peer, hint.key, &hint.close_peers, hinted_at_epoch) {
1603            debug!(
1604                "Recorded repair hint proof for peer {peer} and key {}",
1605                hex::encode(hint.key)
1606            );
1607        }
1608    }
1609}
1610
1611// ---------------------------------------------------------------------------
1612// Neighbor sync round
1613// ---------------------------------------------------------------------------
1614
1615/// Run one neighbor sync round.
1616#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1617async fn run_neighbor_sync_round(
1618    p2p_node: &Arc<P2PNode>,
1619    storage: &Arc<LmdbStorage>,
1620    paid_list: &Arc<PaidList>,
1621    queues: &Arc<RwLock<ReplicationQueues>>,
1622    config: &ReplicationConfig,
1623    sync_state: &Arc<RwLock<NeighborSyncState>>,
1624    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1625    sync_cycle_epoch: &Arc<RwLock<u64>>,
1626    repair_proofs: &Arc<RwLock<RepairProofs>>,
1627    is_bootstrapping: &Arc<RwLock<bool>>,
1628    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1629) {
1630    let self_id = *p2p_node.peer_id();
1631    let bootstrapping = *is_bootstrapping.read().await;
1632
1633    // Check if cycle is complete; start new one if needed.
1634    // We check under a read lock, then release it before the expensive
1635    // prune pass and DHT snapshot so other tasks are not starved.
1636    let cycle_complete = sync_state.read().await.is_cycle_complete();
1637    if cycle_complete {
1638        // A completed local neighbor-sync cycle matures key-specific repair
1639        // proofs recorded in earlier epochs.
1640        {
1641            let mut history = sync_history.write().await;
1642            for record in history.values_mut() {
1643                record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
1644            }
1645        }
1646        let current_sync_epoch = {
1647            let mut epoch = sync_cycle_epoch.write().await;
1648            *epoch = epoch.saturating_add(1);
1649            *epoch
1650        };
1651
1652        // Post-cycle pruning (Section 11) — runs without holding sync_state.
1653        // Remote prune-confirmation audits are storage-proof audits and only
1654        // run after bootstrap has drained.
1655        let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
1656        pruning::run_prune_pass_with_context(pruning::PrunePassContext {
1657            self_id: &self_id,
1658            storage,
1659            paid_list,
1660            p2p_node,
1661            config,
1662            sync_state,
1663            repair_proofs,
1664            current_sync_epoch,
1665            allow_remote_prune_audits,
1666        })
1667        .await;
1668
1669        // Take fresh close-neighbor snapshot (DHT query, no lock held).
1670        let neighbors =
1671            neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
1672                .await;
1673
1674        // Now re-acquire write lock and re-check before swapping cycle.
1675        let mut state = sync_state.write().await;
1676        if state.is_cycle_complete() {
1677            // Preserve cooldown and bootstrap-claim tracking across cycles.
1678            // Claims have a 24h lifecycle vs 10-20 min cycles — dropping them
1679            // would reset the abuse detection timer every cycle.
1680            let old_sync_times = std::mem::take(&mut state.last_sync_times);
1681            let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
1682            let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
1683            let old_prune_cursor = state.prune_cursor;
1684            *state = NeighborSyncState::new_cycle(neighbors);
1685            state.last_sync_times = old_sync_times;
1686            state.bootstrap_claims = old_bootstrap_claims;
1687            state.bootstrap_claim_history = old_bootstrap_claim_history;
1688            state.prune_cursor = old_prune_cursor;
1689        }
1690    }
1691
1692    // Select batch of peers.
1693    let batch = {
1694        let mut state = sync_state.write().await;
1695        neighbor_sync::select_sync_batch(
1696            &mut state,
1697            config.neighbor_sync_peer_count,
1698            config.neighbor_sync_cooldown,
1699        )
1700    };
1701
1702    if batch.is_empty() {
1703        return;
1704    }
1705
1706    debug!("Neighbor sync: syncing with {} peers", batch.len());
1707
1708    // Sync with each peer in the batch.
1709    for peer in &batch {
1710        let outcome = neighbor_sync::sync_with_peer_with_outcome(
1711            peer,
1712            p2p_node,
1713            storage,
1714            paid_list,
1715            config,
1716            bootstrapping,
1717        )
1718        .await;
1719
1720        if let Some(outcome) = outcome {
1721            handle_sync_response(
1722                &self_id,
1723                peer,
1724                &outcome.response,
1725                &outcome.sent_replica_hints,
1726                p2p_node,
1727                config,
1728                bootstrapping,
1729                bootstrap_state,
1730                storage,
1731                paid_list,
1732                queues,
1733                sync_state,
1734                sync_history,
1735                sync_cycle_epoch,
1736                repair_proofs,
1737            )
1738            .await;
1739        } else {
1740            // Sync failed -- remove peer and try to fill slot.
1741            let replacement = {
1742                let mut state = sync_state.write().await;
1743                neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
1744            };
1745
1746            // Attempt sync with the replacement peer (if one was found).
1747            if let Some(replacement_peer) = replacement {
1748                let replacement_outcome = neighbor_sync::sync_with_peer_with_outcome(
1749                    &replacement_peer,
1750                    p2p_node,
1751                    storage,
1752                    paid_list,
1753                    config,
1754                    bootstrapping,
1755                )
1756                .await;
1757
1758                if let Some(outcome) = replacement_outcome {
1759                    handle_sync_response(
1760                        &self_id,
1761                        &replacement_peer,
1762                        &outcome.response,
1763                        &outcome.sent_replica_hints,
1764                        p2p_node,
1765                        config,
1766                        bootstrapping,
1767                        bootstrap_state,
1768                        storage,
1769                        paid_list,
1770                        queues,
1771                        sync_state,
1772                        sync_history,
1773                        sync_cycle_epoch,
1774                        repair_proofs,
1775                    )
1776                    .await;
1777                }
1778            }
1779        }
1780    }
1781}
1782
1783/// Process a successful neighbor sync response: record the sync, check for
1784/// bootstrap claim abuse, and admit inbound hints.
1785#[allow(clippy::too_many_arguments)]
1786async fn handle_sync_response(
1787    self_id: &PeerId,
1788    peer: &PeerId,
1789    resp: &NeighborSyncResponse,
1790    sent_replica_hints: &[neighbor_sync::SentReplicaHint],
1791    p2p_node: &Arc<P2PNode>,
1792    config: &ReplicationConfig,
1793    bootstrapping: bool,
1794    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1795    storage: &Arc<LmdbStorage>,
1796    paid_list: &Arc<PaidList>,
1797    queues: &Arc<RwLock<ReplicationQueues>>,
1798    sync_state: &Arc<RwLock<NeighborSyncState>>,
1799    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1800    sync_cycle_epoch: &Arc<RwLock<u64>>,
1801    repair_proofs: &Arc<RwLock<RepairProofs>>,
1802) {
1803    // Record successful sync.
1804    {
1805        let mut state = sync_state.write().await;
1806        neighbor_sync::record_successful_sync(&mut state, peer);
1807    }
1808    {
1809        let mut history = sync_history.write().await;
1810        let record = history.entry(*peer).or_insert(PeerSyncRecord {
1811            last_sync: None,
1812            cycles_since_sync: 0,
1813        });
1814        record.last_sync = Some(Instant::now());
1815        record.cycles_since_sync = 0;
1816    }
1817
1818    // Process inbound hints from response (skip if peer is bootstrapping).
1819    if resp.bootstrapping {
1820        // Gap 6: BootstrapClaimAbuse grace period enforcement.
1821        // Separate state mutation from network I/O to avoid holding the
1822        // write lock across report_trust_event.
1823        let should_report = {
1824            let now = Instant::now();
1825            let mut state = sync_state.write().await;
1826            match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
1827                BootstrapClaimObservation::WithinGrace { .. } => false,
1828                BootstrapClaimObservation::PastGrace { first_seen } => {
1829                    warn!(
1830                        "Peer {peer} has been claiming bootstrap for {:?}, \
1831                         exceeding grace period of {:?} — reporting abuse",
1832                        now.duration_since(first_seen),
1833                        config.bootstrap_claim_grace_period,
1834                    );
1835                    true
1836                }
1837                BootstrapClaimObservation::Repeated { first_seen } => {
1838                    warn!(
1839                        "Peer {peer} repeated bootstrap claim after previously stopping; \
1840                         first claim was {:?} ago — reporting abuse",
1841                        now.duration_since(first_seen),
1842                    );
1843                    true
1844                }
1845            }
1846        };
1847        if should_report {
1848            p2p_node
1849                .report_trust_event(
1850                    peer,
1851                    TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1852                )
1853                .await;
1854        }
1855    } else {
1856        // Peer is not claiming bootstrap; clear active claim while retaining
1857        // history so the peer cannot start a second grace window later.
1858        {
1859            let mut state = sync_state.write().await;
1860            state.clear_active_bootstrap_claim(peer);
1861        }
1862        record_sent_replica_hints(peer, sent_replica_hints, repair_proofs, sync_cycle_epoch).await;
1863        let outcome = admit_and_queue_hints(
1864            self_id,
1865            peer,
1866            &resp.replica_hints,
1867            &resp.paid_hints,
1868            p2p_node,
1869            config,
1870            storage,
1871            paid_list,
1872            queues,
1873        )
1874        .await;
1875
1876        // Track discovered keys for bootstrap drain detection so that hints
1877        // admitted via regular neighbor sync are not missed. Capacity-
1878        // rejected hints keep this source on the "not yet drained" list
1879        // until its next sync replays them; a clean cycle clears it.
1880        if bootstrapping {
1881            if !outcome.discovered.is_empty() {
1882                bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1883            }
1884            if outcome.capacity_rejected_count > 0 {
1885                bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
1886            } else {
1887                bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
1888            }
1889        }
1890    }
1891}
1892
1893/// Admit hints and queue them for verification, returning newly-discovered keys.
1894///
1895/// Shared by neighbor-sync request handling, response handling, and bootstrap
1896/// sync so that admission + queueing logic lives in one place.
1897#[allow(clippy::too_many_arguments)]
1898/// Outcome of [`admit_and_queue_hints`].
1899///
1900/// `capacity_rejected_count` is non-zero when one or more legitimately
1901/// admissible hints were dropped because `pending_verify`'s global or
1902/// per-source bound was hit. Callers that care about completeness
1903/// (bootstrap drain accounting) MUST NOT treat their work as complete while
1904/// this is > 0 — the source will need to re-hint after capacity frees up.
1905struct AdmissionOutcome {
1906    discovered: HashSet<XorName>,
1907    capacity_rejected_count: usize,
1908}
1909
1910#[allow(clippy::too_many_arguments)]
1911async fn admit_and_queue_hints(
1912    self_id: &PeerId,
1913    source_peer: &PeerId,
1914    replica_hints: &[XorName],
1915    paid_hints: &[XorName],
1916    p2p_node: &Arc<P2PNode>,
1917    config: &ReplicationConfig,
1918    storage: &Arc<LmdbStorage>,
1919    paid_list: &Arc<PaidList>,
1920    queues: &Arc<RwLock<ReplicationQueues>>,
1921) -> AdmissionOutcome {
1922    let pending_keys: HashSet<XorName> = {
1923        let q = queues.read().await;
1924        q.pending_keys().into_iter().collect()
1925    };
1926
1927    let admitted = admission::admit_hints(
1928        self_id,
1929        replica_hints,
1930        paid_hints,
1931        p2p_node,
1932        config,
1933        storage,
1934        paid_list,
1935        &pending_keys,
1936    )
1937    .await;
1938
1939    let mut discovered = HashSet::new();
1940    let mut capacity_rejected_count: usize = 0;
1941    let mut q = queues.write().await;
1942    let now = Instant::now();
1943
1944    for key in admitted.replica_keys {
1945        if !storage.exists(&key).unwrap_or(false) {
1946            let result = q.add_pending_verify(
1947                key,
1948                VerificationEntry {
1949                    state: VerificationState::PendingVerify,
1950                    pipeline: HintPipeline::Replica,
1951                    verified_sources: Vec::new(),
1952                    tried_sources: HashSet::new(),
1953                    created_at: now,
1954                    hint_sender: *source_peer,
1955                },
1956            );
1957            match result {
1958                crate::replication::scheduling::AdmissionResult::Admitted => {
1959                    discovered.insert(key);
1960                }
1961                crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
1962                crate::replication::scheduling::AdmissionResult::CapacityRejected => {
1963                    capacity_rejected_count += 1;
1964                }
1965            }
1966        }
1967    }
1968
1969    for key in admitted.paid_only_keys {
1970        let result = q.add_pending_verify(
1971            key,
1972            VerificationEntry {
1973                state: VerificationState::PendingVerify,
1974                pipeline: HintPipeline::PaidOnly,
1975                verified_sources: Vec::new(),
1976                tried_sources: HashSet::new(),
1977                created_at: now,
1978                hint_sender: *source_peer,
1979            },
1980        );
1981        match result {
1982            crate::replication::scheduling::AdmissionResult::Admitted => {
1983                discovered.insert(key);
1984            }
1985            crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
1986            crate::replication::scheduling::AdmissionResult::CapacityRejected => {
1987                capacity_rejected_count += 1;
1988            }
1989        }
1990    }
1991
1992    if capacity_rejected_count > 0 {
1993        debug!(
1994            "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
1995             rejected at queue capacity; source will need to re-hint after pending_verify drains"
1996        );
1997    }
1998
1999    AdmissionOutcome {
2000        discovered,
2001        capacity_rejected_count,
2002    }
2003}
2004
2005// ---------------------------------------------------------------------------
2006// Verification cycle
2007// ---------------------------------------------------------------------------
2008
/// Run one verification cycle: process pending keys through quorum checks.
///
/// Phases, in order:
/// - Evict pending entries older than `config::PENDING_VERIFY_MAX_AGE`, then
///   snapshot the remaining pending keys. Return early if there are none.
/// - Step 1: keys already in the local `PaidList` (and still pending) are
///   marked `PaidListVerified`. The following go to a presence-only probe:
///   - Replica-pipeline keys.
///   - Paid-only keys that are not stored locally but for which this node is
///     responsible.
///
///   Other locally paid, paid-only keys become terminal. Keys not in the
///   local `PaidList` need network verification.
/// - Step 1b: the presence probe looks for holders of locally paid keys.
///   Keys with holders are promoted to the fetch queue. Keys already stored,
///   or with no holders, become terminal.
/// - Steps 2-5: a network verification round runs, and verified keys are
///   inserted into the `PaidList`. Fetch-eligible verified keys with sources
///   are promoted to the fetch queue. Every other evaluated key becomes
///   terminal.
/// - Step 6: terminal keys are removed from the bootstrap pending set, which
///   may complete bootstrap.
///
/// The following are not treated as terminal:
/// - keys that are promoted;
/// - keys kept pending because the fetch queue is full;
/// - keys with no evidence, or no pending entry, at evaluation time.
#[allow(clippy::too_many_lines)]
async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
    let VerificationCycleContext {
        p2p_node,
        paid_list,
        storage,
        queues,
        config,
        bootstrap_state,
        is_bootstrapping,
        bootstrap_complete_notify,
    } = ctx;

    // Evict stale entries that have been pending too long (e.g. unreachable
    // verification targets during a network partition).
    {
        let mut q = queues.write().await;
        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
    }

    // Snapshot of pending keys; the read lock is released right away.
    let pending_keys = {
        let q = queues.read().await;
        q.pending_keys()
    };

    if pending_keys.is_empty() {
        return;
    }

    let self_id = *p2p_node.peer_id();

    // Step 1: Check local PaidForList for fast-path authorization (Section 9,
    // step 4).
    let mut local_paid_presence_probe_keys = Vec::new();
    let mut local_paid_paid_only_keys = Vec::new();
    let mut keys_needing_network = Vec::new();
    // Keys that finished this cycle (removed from pending). Fed to the
    // bootstrap bookkeeping in Step 6.
    let mut terminal_keys: Vec<XorName> = Vec::new();
    {
        let mut q = queues.write().await;
        for key in &pending_keys {
            // A PaidList read error counts as "not locally paid" and falls
            // back to network verification.
            if paid_list.contains(key).unwrap_or(false) {
                // NOTE(review): a `None` here presumably means the entry left
                // the queue after the snapshot above; such keys are neither
                // probed nor marked terminal this cycle.
                if let Some(pipeline) =
                    q.set_pending_state(key, VerificationState::PaidListVerified)
                {
                    match pipeline {
                        HintPipeline::PaidOnly => {
                            // Paid-only + local paid state needs one more
                            // responsibility check outside this lock: if we
                            // are also in the storage close group, the hint
                            // can repair a missing replica.
                            local_paid_paid_only_keys.push(*key);
                        }
                        HintPipeline::Replica => {
                            // Local paid-list membership authorizes the key.
                            // We still need a presence probe to discover fetch
                            // sources, but we must not require remote paid
                            // majority or presence quorum.
                            local_paid_presence_probe_keys.push(*key);
                        }
                    }
                }
            } else {
                keys_needing_network.push(*key);
            }
        }
    }

    // Resolve locally paid, paid-only keys. If the key is already stored, or
    // we are not responsible for it, it is terminal. If we are responsible
    // and it is missing, it joins the presence probe so the replica can be
    // repaired.
    if !local_paid_paid_only_keys.is_empty() {
        let mut terminal_paid_only = Vec::new();
        for key in local_paid_paid_only_keys {
            if storage.exists(&key).unwrap_or(false) {
                terminal_paid_only.push(key);
            } else if admission::is_responsible(&self_id, &key, p2p_node, config.close_group_size)
                .await
            {
                local_paid_presence_probe_keys.push(key);
            } else {
                terminal_paid_only.push(key);
            }
        }

        if !terminal_paid_only.is_empty() {
            let mut q = queues.write().await;
            for key in terminal_paid_only {
                q.remove_pending(&key);
                terminal_keys.push(key);
            }
        }
    }

    // Step 1b: Local paid-list hit for fetch-eligible keys. Per Section 9
    // step 4, authorization succeeds immediately; run a presence-only probe
    // to find any holder we can fetch from.
    if !local_paid_presence_probe_keys.is_empty() {
        let targets = quorum::compute_presence_targets(
            &local_paid_presence_probe_keys,
            p2p_node,
            config,
            &self_id,
        )
        .await;
        let evidence = quorum::run_verification_round(
            &local_paid_presence_probe_keys,
            &targets,
            p2p_node,
            config,
        )
        .await;

        let mut q = queues.write().await;
        for key in local_paid_presence_probe_keys {
            // Stored while the probe was in flight: nothing left to fetch.
            if storage.exists(&key).unwrap_or(false) {
                q.remove_pending(&key);
                terminal_keys.push(key);
                continue;
            }
            let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
                quorum::present_sources_for_key(&key, ev, &targets)
            });
            if sources.is_empty() {
                // Terminal failure: remove pending and report. No fetch path.
                q.remove_pending(&key);
                warn!(
                    "Locally paid key {} has no responding holders (possible data loss)",
                    hex::encode(key)
                );
                terminal_keys.push(key);
            } else {
                let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                // Atomic remove+enqueue: if fetch_queue is at capacity, the
                // pending entry is preserved and retried next cycle (no
                // silent drop of verified replica-repair work).
                let _ = q.promote_pending_to_fetch(key, distance, sources);
            }
        }
    }

    // Steps 2-5: Network verification (skipped if all keys resolved locally).
    if !keys_needing_network.is_empty() {
        // Step 2: Compute targets and run network verification round.
        let targets =
            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
                .await;

        let evidence =
            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;

        // Step 3: Evaluate results — collect outcomes without holding the write
        // lock across paid-list I/O.
        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
        {
            let q = queues.read().await;
            for key in &keys_needing_network {
                // Keys without evidence or without a pending entry are not
                // evaluated and not marked terminal; any that are still
                // pending are revisited in a later cycle.
                let Some(ev) = evidence.get(key) else {
                    continue;
                };
                let Some(entry) = q.get_pending(key) else {
                    continue;
                };
                let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config);
                evaluated.push((*key, outcome, entry.pipeline));
            }
        } // read lock released

        // Step 4: Insert verified keys into PaidForList (no lock held).
        let mut paid_insert_keys: Vec<XorName> = Vec::new();
        for (key, outcome, _) in &evaluated {
            if matches!(
                outcome,
                KeyVerificationOutcome::QuorumVerified { .. }
                    | KeyVerificationOutcome::PaidListVerified { .. }
            ) {
                paid_insert_keys.push(*key);
            }
        }
        // Insert failures are logged; the key is still processed in Step 5.
        for key in &paid_insert_keys {
            if let Err(e) = paid_list.insert(key).await {
                warn!("Failed to add verified key to PaidForList: {e}");
            }
        }

        // Paid-only hints normally update PaidForList only. If this node is
        // also storage-responsible for the key, a verified paid-only hint can
        // safely repair a missing replica using sources from the same
        // verification round.
        let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
        for (key, outcome, pipeline) in &evaluated {
            if *pipeline == HintPipeline::PaidOnly
                && matches!(
                    outcome,
                    KeyVerificationOutcome::QuorumVerified { .. }
                        | KeyVerificationOutcome::PaidListVerified { .. }
                )
                && !storage.exists(key).unwrap_or(false)
                && admission::is_responsible(&self_id, key, p2p_node, config.close_group_size).await
            {
                paid_only_fetch_keys.insert(*key);
            }
        }

        // Step 5: Update queues with the evaluated outcomes.
        let mut q = queues.write().await;
        for (key, outcome, pipeline) in evaluated {
            match outcome {
                KeyVerificationOutcome::QuorumVerified { sources }
                | KeyVerificationOutcome::PaidListVerified { sources } => {
                    // Replica hints are always fetch-eligible; paid-only
                    // hints only when selected above.
                    let fetch_eligible =
                        pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
                    if fetch_eligible && !sources.is_empty() {
                        let distance =
                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                        // Atomic remove+enqueue: on fetch_queue capacity miss
                        // the pending entry is preserved so this verified key
                        // is retried on the next cycle (no silent drop).
                        let _ = q.promote_pending_to_fetch(key, distance, sources);
                        // Not terminal — either moved to fetch queue, or
                        // retained as pending until queue drains.
                    } else if fetch_eligible && sources.is_empty() {
                        warn!(
                            "Verified responsible key {} has no holders (possible data loss)",
                            hex::encode(key)
                        );
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    } else {
                        // Verified but not fetch-eligible: the PaidForList
                        // update above was the only work to do.
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    }
                }
                KeyVerificationOutcome::QuorumFailed
                | KeyVerificationOutcome::QuorumInconclusive => {
                    q.remove_pending(&key);
                    terminal_keys.push(key);
                }
            }
        }
    }

    // Step 6: Remove terminal keys from bootstrap pending set and re-check
    // the drain condition.
    update_bootstrap_after_verification(
        &terminal_keys,
        bootstrap_state,
        queues,
        is_bootstrapping,
        bootstrap_complete_notify,
    )
    .await;
}
2259
2260/// Post-verification bootstrap bookkeeping: remove terminal keys from the
2261/// bootstrap pending set and transition out of bootstrapping when drained.
2262async fn update_bootstrap_after_verification(
2263    terminal_keys: &[XorName],
2264    bootstrap_state: &Arc<RwLock<BootstrapState>>,
2265    queues: &Arc<RwLock<ReplicationQueues>>,
2266    is_bootstrapping: &Arc<RwLock<bool>>,
2267    bootstrap_complete_notify: &Arc<Notify>,
2268) {
2269    if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
2270        return;
2271    }
2272    {
2273        let mut bs = bootstrap_state.write().await;
2274        for key in terminal_keys {
2275            bs.remove_key(key);
2276        }
2277    }
2278    let q = queues.read().await;
2279    if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
2280        complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
2281    }
2282}
2283
2284/// Set `is_bootstrapping` to `false` and wake all waiters.
2285async fn complete_bootstrap(
2286    is_bootstrapping: &Arc<RwLock<bool>>,
2287    bootstrap_complete_notify: &Arc<Notify>,
2288) {
2289    *is_bootstrapping.write().await = false;
2290    bootstrap_complete_notify.notify_waiters();
2291    info!("Replication bootstrap complete");
2292}
2293
2294// ---------------------------------------------------------------------------
2295// Fetch types and single-fetch executor
2296// ---------------------------------------------------------------------------
2297
/// Result classification for a single fetch attempt.
///
/// Carried in `FetchOutcome::result` from [`execute_single_fetch`] back to
/// the fetch worker loop.
enum FetchResult {
    /// Data fetched, integrity-checked, and stored successfully.
    Stored,
    /// Content-address integrity check failed — do not retry.
    IntegrityFailed,
    /// Source failed (network error or non-success response) — retryable.
    ///
    /// Also returned when the outbound fetch request cannot be encoded.
    SourceFailed,
}
2307
2308/// Outcome produced by [`execute_single_fetch`] and consumed by the fetch
2309/// worker loop to update queue state.
2310struct FetchOutcome {
2311    key: XorName,
2312    result: FetchResult,
2313}
2314
2315#[allow(clippy::too_many_lines)]
2316/// Execute a single fetch request against `source` for `key`.
2317///
2318/// Handles encoding, network I/O, integrity checking, storage, and trust
2319/// event reporting.  Returns a [`FetchOutcome`] so the caller can update
2320/// queue state without holding any locks during the network round-trip.
2321async fn execute_single_fetch(
2322    p2p_node: Arc<P2PNode>,
2323    storage: Arc<LmdbStorage>,
2324    config: Arc<ReplicationConfig>,
2325    key: XorName,
2326    source: PeerId,
2327) -> FetchOutcome {
2328    let request = protocol::FetchRequest { key };
2329    let msg = ReplicationMessage {
2330        request_id: rand::thread_rng().gen::<u64>(),
2331        body: ReplicationMessageBody::FetchRequest(request),
2332    };
2333
2334    let encoded = match msg.encode() {
2335        Ok(data) => data,
2336        Err(e) => {
2337            warn!("Failed to encode fetch request: {e}");
2338            return FetchOutcome {
2339                key,
2340                result: FetchResult::SourceFailed,
2341            };
2342        }
2343    };
2344
2345    let result = p2p_node
2346        .send_request(
2347            &source,
2348            REPLICATION_PROTOCOL_ID,
2349            encoded,
2350            config.fetch_request_timeout,
2351        )
2352        .await;
2353
2354    match result {
2355        Ok(response) => {
2356            let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
2357                p2p_node
2358                    .report_trust_event(
2359                        &source,
2360                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2361                    )
2362                    .await;
2363                return FetchOutcome {
2364                    key,
2365                    result: FetchResult::SourceFailed,
2366                };
2367            };
2368
2369            match resp_msg.body {
2370                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
2371                    key: resp_key,
2372                    data,
2373                }) => {
2374                    // Validate the response key matches the requested key.
2375                    // A malicious peer could serve valid data for a different
2376                    // key, passing integrity checks while the requested key
2377                    // is falsely marked as fetched.
2378                    if resp_key != key {
2379                        warn!(
2380                            "Fetch response key mismatch: requested {}, got {}",
2381                            hex::encode(key),
2382                            hex::encode(resp_key)
2383                        );
2384                        p2p_node
2385                            .report_trust_event(
2386                                &source,
2387                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2388                            )
2389                            .await;
2390                        return FetchOutcome {
2391                            key,
2392                            result: FetchResult::IntegrityFailed,
2393                        };
2394                    }
2395
2396                    // Enforce chunk size invariant on fetched data.
2397                    // Checked before the content-address hash to avoid
2398                    // hashing up to 10 MiB of oversized junk data.
2399                    if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
2400                        warn!(
2401                            "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
2402                            hex::encode(resp_key),
2403                            data.len(),
2404                            crate::ant_protocol::MAX_CHUNK_SIZE,
2405                        );
2406                        p2p_node
2407                            .report_trust_event(
2408                                &source,
2409                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2410                            )
2411                            .await;
2412                        return FetchOutcome {
2413                            key,
2414                            result: FetchResult::IntegrityFailed,
2415                        };
2416                    }
2417
2418                    // Content-address integrity check.
2419                    let computed = crate::client::compute_address(&data);
2420                    if computed != resp_key {
2421                        warn!(
2422                            "Fetched record integrity check failed: expected {}, got {}",
2423                            hex::encode(resp_key),
2424                            hex::encode(computed)
2425                        );
2426                        p2p_node
2427                            .report_trust_event(
2428                                &source,
2429                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2430                            )
2431                            .await;
2432                        return FetchOutcome {
2433                            key,
2434                            result: FetchResult::IntegrityFailed,
2435                        };
2436                    }
2437
2438                    if let Err(e) = storage.put(&resp_key, &data).await {
2439                        warn!(
2440                            "Failed to store fetched record {}: {e}",
2441                            hex::encode(resp_key)
2442                        );
2443                        return FetchOutcome {
2444                            key,
2445                            result: FetchResult::SourceFailed,
2446                        };
2447                    }
2448
2449                    FetchOutcome {
2450                        key,
2451                        result: FetchResult::Stored,
2452                    }
2453                }
2454                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
2455                    ..
2456                }) => {
2457                    // This peer was selected as a fetch source because it
2458                    // recently answered `Present` during verification. A
2459                    // subsequent NotFound is evidence of a stale/false claim
2460                    // or chunk wiping, so penalize lightly and try another
2461                    // verified source.
2462                    warn!(
2463                        "Fetch: verified source {source} returned NotFound for {}",
2464                        hex::encode(key)
2465                    );
2466                    p2p_node
2467                        .report_trust_event(
2468                            &source,
2469                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2470                        )
2471                        .await;
2472                    FetchOutcome {
2473                        key,
2474                        result: FetchResult::SourceFailed,
2475                    }
2476                }
2477                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
2478                    reason,
2479                    ..
2480                }) => {
2481                    warn!(
2482                        "Fetch: peer {source} returned error for {}: {reason}",
2483                        hex::encode(key)
2484                    );
2485                    p2p_node
2486                        .report_trust_event(
2487                            &source,
2488                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2489                        )
2490                        .await;
2491                    FetchOutcome {
2492                        key,
2493                        result: FetchResult::SourceFailed,
2494                    }
2495                }
2496                _ => {
2497                    // Unexpected message type — treat as malformed.
2498                    p2p_node
2499                        .report_trust_event(
2500                            &source,
2501                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2502                        )
2503                        .await;
2504                    FetchOutcome {
2505                        key,
2506                        result: FetchResult::SourceFailed,
2507                    }
2508                }
2509            }
2510        }
2511        Err(e) => {
2512            debug!("Fetch request to {source} failed: {e}");
2513            // No ApplicationFailure here — P2PNode::send_request() already
2514            // reports ConnectionTimeout / ConnectionFailed to the TrustEngine.
2515            FetchOutcome {
2516                key,
2517                result: FetchResult::SourceFailed,
2518            }
2519        }
2520    }
2521}
2522
2523// ---------------------------------------------------------------------------
2524// Audit result handler
2525// ---------------------------------------------------------------------------
2526
2527/// Handle audit result: log findings and emit trust events.
2528async fn handle_audit_result(
2529    result: &AuditTickResult,
2530    p2p_node: &Arc<P2PNode>,
2531    sync_state: &Arc<RwLock<NeighborSyncState>>,
2532    config: &ReplicationConfig,
2533) {
2534    match result {
2535        AuditTickResult::Passed {
2536            challenged_peer,
2537            keys_checked,
2538        } => {
2539            debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
2540            // Peer responded normally — clear the active bootstrap claim while
2541            // retaining history so a later claim is treated as repeated abuse.
2542            {
2543                let mut state = sync_state.write().await;
2544                state.clear_active_bootstrap_claim(challenged_peer);
2545            }
2546            p2p_node
2547                .report_trust_event(
2548                    challenged_peer,
2549                    TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
2550                )
2551                .await;
2552        }
2553        AuditTickResult::Failed { evidence } => {
2554            if let FailureEvidence::AuditFailure {
2555                challenged_peer,
2556                confirmed_failed_keys,
2557                reason,
2558                ..
2559            } = evidence
2560            {
2561                error!(
2562                    "Audit failure for {challenged_peer}: {} confirmed failed keys",
2563                    confirmed_failed_keys.len()
2564                );
2565                if audit_failure_clears_bootstrap_claim(reason) {
2566                    // Peer returned a non-bootstrap response — clear the active
2567                    // claim while retaining claim history.
2568                    let mut state = sync_state.write().await;
2569                    state.clear_active_bootstrap_claim(challenged_peer);
2570                } else {
2571                    debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim");
2572                }
2573                p2p_node
2574                    .report_trust_event(
2575                        challenged_peer,
2576                        TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
2577                    )
2578                    .await;
2579            }
2580        }
2581        AuditTickResult::BootstrapClaim { peer } => {
2582            // Gap 6: BootstrapClaimAbuse grace period in audit path.
2583            // Separate state mutation from network I/O to avoid holding the
2584            // write lock across report_trust_event.
2585            let should_report = {
2586                let now = Instant::now();
2587                let mut state = sync_state.write().await;
2588                match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
2589                {
2590                    BootstrapClaimObservation::WithinGrace { .. } => {
2591                        debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
2592                        false
2593                    }
2594                    BootstrapClaimObservation::PastGrace { first_seen } => {
2595                        warn!(
2596                            "Audit: peer {peer} claiming bootstrap past grace period \
2597                             ({:?} > {:?}), reporting abuse",
2598                            now.duration_since(first_seen),
2599                            config.bootstrap_claim_grace_period,
2600                        );
2601                        true
2602                    }
2603                    BootstrapClaimObservation::Repeated { first_seen } => {
2604                        warn!(
2605                            "Audit: peer {peer} repeated bootstrap claim after previously \
2606                             stopping; first claim was {:?} ago, reporting abuse",
2607                            now.duration_since(first_seen),
2608                        );
2609                        true
2610                    }
2611                }
2612            };
2613            if should_report {
2614                p2p_node
2615                    .report_trust_event(
2616                        peer,
2617                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2618                    )
2619                    .await;
2620            }
2621        }
2622        AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
2623    }
2624}
2625
2626fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
2627    !matches!(reason, AuditFailureReason::Timeout)
2628}
2629
2630// `admit_bootstrap_hints` was consolidated into `admit_and_queue_hints`.
2631
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::audit_failure_clears_bootstrap_claim;
    use crate::replication::types::AuditFailureReason;

    /// A timed-out audit must leave the peer's active bootstrap claim intact.
    #[test]
    fn audit_timeout_preserves_active_bootstrap_claim() {
        let clears = audit_failure_clears_bootstrap_claim(&AuditFailureReason::Timeout);
        assert!(!clears);
    }

    /// Every decoded, non-bootstrap failure reason clears the active claim.
    #[test]
    fn decoded_audit_failures_clear_active_bootstrap_claim() {
        let decoded_reasons = [
            AuditFailureReason::MalformedResponse,
            AuditFailureReason::DigestMismatch,
            AuditFailureReason::KeyAbsent,
            AuditFailureReason::Rejected,
        ];
        for reason in &decoded_reasons {
            assert!(
                audit_failure_clears_bootstrap_claim(reason),
                "decoded non-bootstrap failure {reason:?} should clear active claim"
            );
        }
    }
}