Skip to main content

ant_node/replication/
mod.rs

1//! Replication subsystem for the Autonomi network.
2//!
3//! Implements Kademlia-style replication with:
4//! - Fresh replication with `PoP` verification
5//! - Neighbor sync with round-robin cycle management
6//! - Batched quorum verification
7//! - Storage audit protocol (anti-outsourcing)
8//! - `PaidForList` persistence and convergence
9//! - Responsibility pruning with hysteresis
10
11// The replication engine intentionally holds `RwLock` read guards across await
12// boundaries (e.g. reading sync_history while calling audit_tick). Clippy's
13// nursery lint `significant_drop_tightening` flags these, but the guards must
14// remain live for the duration of the call.
15#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod config;
21pub mod fresh;
22pub mod neighbor_sync;
23pub mod paid_list;
24pub mod protocol;
25pub mod pruning;
26pub mod quorum;
27pub mod scheduling;
28pub mod types;
29
30use std::collections::{HashMap, HashSet};
31use std::path::Path;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use std::pin::Pin;
36
37use crate::logging::{debug, error, info, warn};
38use futures::stream::FuturesUnordered;
39use futures::{Future, StreamExt};
40use rand::Rng;
41use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
42use tokio::task::JoinHandle;
43use tokio_util::sync::CancellationToken;
44
45use crate::ant_protocol::XorName;
46use crate::error::{Error, Result};
47use crate::payment::PaymentVerifier;
48use crate::replication::audit::AuditTickResult;
49use crate::replication::config::{
50    max_parallel_fetch, ReplicationConfig, MAX_CONCURRENT_REPLICATION_SENDS,
51    REPLICATION_PROTOCOL_ID,
52};
53use crate::replication::paid_list::PaidList;
54use crate::replication::protocol::{
55    FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
56    VerificationResponse,
57};
58use crate::replication::quorum::KeyVerificationOutcome;
59use crate::replication::scheduling::ReplicationQueues;
60use crate::replication::types::{
61    AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
62    NeighborSyncState, PeerSyncRecord, VerificationEntry, VerificationState,
63};
64use crate::storage::LmdbStorage;
65use saorsa_core::identity::PeerId;
66use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
67
68// ---------------------------------------------------------------------------
69// Constants
70// ---------------------------------------------------------------------------
71
/// Topic prefix used by saorsa-core's request-response mechanism.
///
/// The message handler treats a topic equal to this prefix followed by
/// `REPLICATION_PROTOCOL_ID` as a replication message that arrived wrapped
/// in a request-response envelope.
const RR_PREFIX: &str = "/rr/";

/// Boxed future type for in-flight fetch tasks.
///
/// Resolves to the key being fetched paired with the fetch outcome. The
/// outcome is `None` when the spawned fetch task could not be joined (the
/// fetch worker logs this as a panic), so the key is always recoverable.
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
77
/// Shared dependencies for one verification worker cycle.
///
/// Every field is a borrowed handle; the struct owns nothing. Field names
/// and types mirror the corresponding fields of `ReplicationEngine`.
/// NOTE(review): presumably these are borrowed from the clones held by the
/// verification worker task — the construction site is not in this part of
/// the file, confirm there.
struct VerificationCycleContext<'a> {
    /// P2P networking node.
    p2p_node: &'a Arc<P2PNode>,
    /// Paid-for-list handle.
    paid_list: &'a Arc<PaidList>,
    /// Chunk storage handle.
    storage: &'a Arc<LmdbStorage>,
    /// Replication pipeline queues (behind an async `RwLock`).
    queues: &'a Arc<RwLock<ReplicationQueues>>,
    /// Replication configuration.
    config: &'a ReplicationConfig,
    /// Bootstrap state tracking (behind an async `RwLock`).
    bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
    /// Flag recording whether the node is still bootstrapping.
    is_bootstrapping: &'a Arc<RwLock<bool>>,
    /// Notifier associated with the end of the bootstrap phase.
    bootstrap_complete_notify: &'a Arc<Notify>,
}
89
/// Fetch worker polling interval in milliseconds.
///
/// This is how long the fetch worker sleeps before re-checking the queue
/// when it has no fetches in flight.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Verification worker polling interval in milliseconds.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Bootstrap drain check interval in seconds.
///
/// The audit loop polls the bootstrap state at this interval and does not
/// run its first audit tick until the state reports itself drained.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;

/// Standard trust event weight for per-operation success/failure signals.
///
/// Used for individual replication fetch outcomes, integrity check failures,
/// and bootstrap claim abuse. Distinct from `AUDIT_FAILURE_TRUST_WEIGHT` which
/// is reserved for confirmed audit failures.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
105
106// ---------------------------------------------------------------------------
107// ReplicationEngine
108// ---------------------------------------------------------------------------
109
/// The replication engine manages all replication background tasks and state.
///
/// State that background tasks need is held behind `Arc` so each spawned
/// task can own its own clone of the handle.
pub struct ReplicationEngine {
    /// Replication configuration (shared across spawned tasks).
    config: Arc<ReplicationConfig>,
    /// P2P networking node.
    p2p_node: Arc<P2PNode>,
    /// Local chunk storage.
    storage: Arc<LmdbStorage>,
    /// Persistent paid-for-list.
    paid_list: Arc<PaidList>,
    /// Payment verifier for `PoP` validation.
    payment_verifier: Arc<PaymentVerifier>,
    /// Replication pipeline queues.
    queues: Arc<RwLock<ReplicationQueues>>,
    /// Neighbor sync cycle state.
    sync_state: Arc<RwLock<NeighborSyncState>>,
    /// Per-peer sync history (for `RepairOpportunity`).
    ///
    /// No explicit size cap is applied to this map. The stated rationale is
    /// that entries are lightweight (`PeerSyncRecord` is two fields) and
    /// that the set of peer IDs is naturally limited by the routing table's
    /// k-bucket capacity.
    ///
    /// NOTE(review): the previous wording said the map both "grows with peer
    /// churn" and is "naturally bounded" — confirm whether entries for
    /// departed peers are ever removed.
    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    /// Bootstrap state tracking.
    bootstrap_state: Arc<RwLock<BootstrapState>>,
    /// Whether this node is currently bootstrapping (initialised to `true`
    /// by `new`).
    is_bootstrapping: Arc<RwLock<bool>>,
    /// Trigger for early neighbor sync (signalled on topology changes).
    sync_trigger: Arc<Notify>,
    /// Notified when `is_bootstrapping` transitions from `true` to `false`.
    bootstrap_complete_notify: Arc<Notify>,
    /// Limits concurrent outbound replication sends to prevent bandwidth
    /// saturation on home broadband connections.
    send_semaphore: Arc<Semaphore>,
    /// Receiver for fresh-write events from the chunk PUT handler.
    ///
    /// When present, `start()` spawns a drainer task that calls
    /// `replicate_fresh` for each event. The receiver is moved into that
    /// task, leaving `None` here afterwards.
    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
    /// Shutdown token.
    shutdown: CancellationToken,
    /// Background task handles.
    ///
    /// A non-empty vector is how `start()` detects that the engine is
    /// already running; `shutdown()` drains it.
    task_handles: Vec<JoinHandle<()>>,
}
153
154impl ReplicationEngine {
155    /// Create a new replication engine.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the `PaidList` LMDB environment cannot be opened
160    /// or if the configuration fails validation.
161    pub async fn new(
162        config: ReplicationConfig,
163        p2p_node: Arc<P2PNode>,
164        storage: Arc<LmdbStorage>,
165        payment_verifier: Arc<PaymentVerifier>,
166        root_dir: &Path,
167        fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
168        shutdown: CancellationToken,
169    ) -> Result<Self> {
170        config.validate().map_err(Error::Config)?;
171
172        let paid_list = Arc::new(
173            PaidList::new(root_dir)
174                .await
175                .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
176        );
177
178        let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
179        let config = Arc::new(config);
180
181        Ok(Self {
182            config: Arc::clone(&config),
183            p2p_node,
184            storage,
185            paid_list,
186            payment_verifier,
187            queues: Arc::new(RwLock::new(ReplicationQueues::new())),
188            sync_state: Arc::new(RwLock::new(initial_neighbors)),
189            sync_history: Arc::new(RwLock::new(HashMap::new())),
190            bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
191            is_bootstrapping: Arc::new(RwLock::new(true)),
192            sync_trigger: Arc::new(Notify::new()),
193            bootstrap_complete_notify: Arc::new(Notify::new()),
194            send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
195            fresh_write_rx: Some(fresh_write_rx),
196            shutdown,
197            task_handles: Vec::new(),
198        })
199    }
200
201    /// Get a reference to the `PaidList`.
202    #[must_use]
203    pub fn paid_list(&self) -> &Arc<PaidList> {
204        &self.paid_list
205    }
206
207    /// Start all background tasks.
208    ///
209    /// `dht_events` must be subscribed **before** `P2PNode::start()` so that
210    /// the `BootstrapComplete` event emitted during DHT bootstrap is not
211    /// missed by the bootstrap-sync gate.
212    pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
213        if !self.task_handles.is_empty() {
214            error!("ReplicationEngine::start() called while already running — ignoring");
215            return;
216        }
217        info!("Starting replication engine");
218
219        self.start_message_handler();
220        self.start_neighbor_sync_loop();
221        self.start_self_lookup_loop();
222        self.start_audit_loop();
223        self.start_fetch_worker();
224        self.start_verification_worker();
225        self.start_bootstrap_sync(dht_events);
226        self.start_fresh_write_drainer();
227
228        info!(
229            "Replication engine started with {} background tasks",
230            self.task_handles.len()
231        );
232    }
233
234    /// Returns `true` if the node is still in the replication bootstrap phase.
235    ///
236    /// During bootstrap, audit challenges return `Bootstrapping` instead of
237    /// digests, and neighbor sync responses carry `bootstrapping: true`.
238    pub async fn is_bootstrapping(&self) -> bool {
239        *self.is_bootstrapping.read().await
240    }
241
242    /// Wait until the replication bootstrap phase completes.
243    ///
244    /// Returns immediately if bootstrap has already completed. Useful for
245    /// readiness probes, health checks, and test harnesses that need the
246    /// node to be fully operational before proceeding.
247    ///
248    /// Returns `true` if bootstrap completed within the timeout, `false`
249    /// if the timeout elapsed first.
250    pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
251        // Register the notification future *before* checking the flag so that
252        // a transition between the read and the await is not missed.
253        let notified = self.bootstrap_complete_notify.notified();
254        tokio::pin!(notified);
255        notified.as_mut().enable();
256
257        if !*self.is_bootstrapping.read().await {
258            return true;
259        }
260
261        tokio::time::timeout(timeout, notified).await.is_ok()
262    }
263
264    /// Cancel all background tasks and wait for them to terminate.
265    ///
266    /// This must be awaited before dropping the engine when the caller needs
267    /// the `Arc<LmdbStorage>` references held by background tasks to be
268    /// released (e.g. before reopening the same LMDB environment).
269    pub async fn shutdown(&mut self) {
270        self.shutdown.cancel();
271        for (i, mut handle) in self.task_handles.drain(..).enumerate() {
272            match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
273                Ok(Ok(())) => {}
274                Ok(Err(e)) if e.is_cancelled() => {}
275                Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
276                Err(_) => {
277                    warn!("Replication task {i} did not stop within 10s, aborting");
278                    handle.abort();
279                }
280            }
281        }
282    }
283
284    /// Trigger an early neighbor sync round.
285    ///
286    /// Useful after topology changes (new nodes joining, network heal after
287    /// partition) when the caller wants replication to converge faster than
288    /// the regular 10-20 minute cadence.
289    pub fn trigger_neighbor_sync(&self) {
290        self.sync_trigger.notify_one();
291    }
292
293    /// Execute fresh replication for a newly stored record.
294    pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
295        fresh::replicate_fresh(
296            key,
297            data,
298            proof_of_payment,
299            &self.p2p_node,
300            &self.paid_list,
301            &self.config,
302            &self.send_semaphore,
303        )
304        .await;
305    }
306
307    // =======================================================================
308    // Background task launchers
309    // =======================================================================
310
311    /// Spawn a task that drains the fresh-write channel and triggers
312    /// replication for each newly-stored chunk.
313    fn start_fresh_write_drainer(&mut self) {
314        let Some(mut rx) = self.fresh_write_rx.take() else {
315            return;
316        };
317        let p2p = Arc::clone(&self.p2p_node);
318        let paid_list = Arc::clone(&self.paid_list);
319        let config = Arc::clone(&self.config);
320        let send_semaphore = Arc::clone(&self.send_semaphore);
321        let shutdown = self.shutdown.clone();
322
323        let handle = tokio::spawn(async move {
324            loop {
325                tokio::select! {
326                    () = shutdown.cancelled() => break,
327                    event = rx.recv() => {
328                        let Some(event) = event else { break };
329                        fresh::replicate_fresh(
330                            &event.key,
331                            &event.data,
332                            &event.payment_proof,
333                            &p2p,
334                            &paid_list,
335                            &config,
336                            &send_semaphore,
337                        )
338                        .await;
339                    }
340                }
341            }
342            debug!("Fresh-write drainer shut down");
343        });
344        self.task_handles.push(handle);
345    }
346
347    #[allow(clippy::too_many_lines)]
348    fn start_message_handler(&mut self) {
349        let mut p2p_events = self.p2p_node.subscribe_events();
350        let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
351        let p2p = Arc::clone(&self.p2p_node);
352        let storage = Arc::clone(&self.storage);
353        let paid_list = Arc::clone(&self.paid_list);
354        let payment_verifier = Arc::clone(&self.payment_verifier);
355        let queues = Arc::clone(&self.queues);
356        let config = Arc::clone(&self.config);
357        let shutdown = self.shutdown.clone();
358        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
359        let bootstrap_state = Arc::clone(&self.bootstrap_state);
360        let sync_history = Arc::clone(&self.sync_history);
361        let sync_trigger = Arc::clone(&self.sync_trigger);
362
363        let handle = tokio::spawn(async move {
364            loop {
365                tokio::select! {
366                    () = shutdown.cancelled() => break,
367                    event = p2p_events.recv() => {
368                        let Ok(event) = event else { continue };
369                        if let P2PEvent::Message {
370                            topic,
371                            source: Some(source),
372                            data,
373                            ..
374                        } = event {
375                            // Determine if this is a replication message
376                            // and whether it arrived via the /rr/ request-response
377                            // path (which wraps payloads in RequestResponseEnvelope).
378                            let rr_info = if topic == REPLICATION_PROTOCOL_ID {
379                                Some((data.clone(), None))
380                            } else if topic.starts_with(RR_PREFIX)
381                                && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
382                            {
383                                P2PNode::parse_request_envelope(&data)
384                                    .filter(|(_, is_resp, _)| !is_resp)
385                                    .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
386                            } else {
387                                None
388                            };
389                            if let Some((payload, rr_message_id)) = rr_info {
390                                match handle_replication_message(
391                                    &source,
392                                    &payload,
393                                    &p2p,
394                                    &storage,
395                                    &paid_list,
396                                    &payment_verifier,
397                                    &queues,
398                                    &config,
399                                    &is_bootstrapping,
400                                    &bootstrap_state,
401                                    &sync_history,
402                                    rr_message_id.as_deref(),
403                                ).await {
404                                    Ok(()) => {}
405                                    Err(e) => {
406                                        debug!(
407                                            "Replication message from {source} error: {e}"
408                                        );
409                                    }
410                                }
411                            }
412                        }
413                    }
414                    // Gap 4: Topology churn handling (Section 13).
415                    //
416                    // The DHT routing table emits KClosestPeersChanged when the
417                    // K-closest peer set actually changes, which is the precise
418                    // signal for triggering neighbor sync. This replaces the
419                    // previous approach of checking every PeerConnected /
420                    // PeerDisconnected event against the close group.
421                    dht_event = dht_events.recv() => {
422                        let Ok(dht_event) = dht_event else { continue };
423                        if let DhtNetworkEvent::KClosestPeersChanged { .. } = dht_event {
424                            debug!(
425                                "K-closest peers changed, triggering early neighbor sync"
426                            );
427                            sync_trigger.notify_one();
428                        }
429                    }
430                }
431            }
432            debug!("Replication message handler shut down");
433        });
434        self.task_handles.push(handle);
435    }
436
437    fn start_neighbor_sync_loop(&mut self) {
438        let p2p = Arc::clone(&self.p2p_node);
439        let storage = Arc::clone(&self.storage);
440        let paid_list = Arc::clone(&self.paid_list);
441        let queues = Arc::clone(&self.queues);
442        let config = Arc::clone(&self.config);
443        let shutdown = self.shutdown.clone();
444        let sync_state = Arc::clone(&self.sync_state);
445        let sync_history = Arc::clone(&self.sync_history);
446        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
447        let bootstrap_state = Arc::clone(&self.bootstrap_state);
448        let sync_trigger = Arc::clone(&self.sync_trigger);
449
450        let handle = tokio::spawn(async move {
451            loop {
452                let interval = config.random_neighbor_sync_interval();
453                tokio::select! {
454                    () = shutdown.cancelled() => break,
455                    () = tokio::time::sleep(interval) => {}
456                    () = sync_trigger.notified() => {
457                        debug!("Neighbor sync triggered by topology change");
458                    }
459                }
460                // Wrap the sync round in a select so shutdown cancels
461                // in-progress network operations rather than waiting for
462                // the full round to complete.
463                tokio::select! {
464                    () = shutdown.cancelled() => break,
465                    () = run_neighbor_sync_round(
466                        &p2p,
467                        &storage,
468                        &paid_list,
469                        &queues,
470                        &config,
471                        &sync_state,
472                        &sync_history,
473                        &is_bootstrapping,
474                        &bootstrap_state,
475                    ) => {}
476                }
477            }
478            debug!("Neighbor sync loop shut down");
479        });
480        self.task_handles.push(handle);
481    }
482
483    fn start_self_lookup_loop(&mut self) {
484        let p2p = Arc::clone(&self.p2p_node);
485        let config = Arc::clone(&self.config);
486        let shutdown = self.shutdown.clone();
487
488        let handle = tokio::spawn(async move {
489            loop {
490                let interval = config.random_self_lookup_interval();
491                tokio::select! {
492                    () = shutdown.cancelled() => break,
493                    () = tokio::time::sleep(interval) => {
494                        if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
495                            debug!("Self-lookup failed: {e}");
496                        }
497                    }
498                }
499            }
500            debug!("Self-lookup loop shut down");
501        });
502        self.task_handles.push(handle);
503    }
504
505    fn start_audit_loop(&mut self) {
506        let p2p = Arc::clone(&self.p2p_node);
507        let storage = Arc::clone(&self.storage);
508        let config = Arc::clone(&self.config);
509        let shutdown = self.shutdown.clone();
510        let sync_history = Arc::clone(&self.sync_history);
511        let bootstrap_state = Arc::clone(&self.bootstrap_state);
512        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
513        let sync_state = Arc::clone(&self.sync_state);
514
515        let handle = tokio::spawn(async move {
516            // Invariant 19: wait for bootstrap to drain before starting audits.
517            loop {
518                tokio::select! {
519                    () = shutdown.cancelled() => return,
520                    () = tokio::time::sleep(
521                        std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
522                    ) => {
523                        if bootstrap_state.read().await.is_drained() {
524                            break;
525                        }
526                    }
527                }
528            }
529
530            // Run one audit tick immediately after bootstrap drain.
531            {
532                let bootstrapping = *is_bootstrapping.read().await;
533                let result = {
534                    let history = sync_history.read().await;
535                    audit::audit_tick(&p2p, &storage, &config, &history, bootstrapping).await
536                };
537                handle_audit_result(&result, &p2p, &sync_state, &config).await;
538            }
539
540            // Then run periodically.
541            loop {
542                let interval = config.random_audit_tick_interval();
543                tokio::select! {
544                    () = shutdown.cancelled() => break,
545                    () = tokio::time::sleep(interval) => {
546                        let bootstrapping = *is_bootstrapping.read().await;
547                        let result = {
548                            let history = sync_history.read().await;
549                            audit::audit_tick(
550                                &p2p,
551                                &storage,
552                                &config,
553                                &history,
554                                bootstrapping,
555                            )
556                            .await
557                        };
558                        handle_audit_result(&result, &p2p, &sync_state, &config).await;
559                    }
560                }
561            }
562            debug!("Audit loop shut down");
563        });
564        self.task_handles.push(handle);
565    }
566
567    #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
568    fn start_fetch_worker(&mut self) {
569        let p2p = Arc::clone(&self.p2p_node);
570        let storage = Arc::clone(&self.storage);
571        let queues = Arc::clone(&self.queues);
572        let config = Arc::clone(&self.config);
573        let shutdown = self.shutdown.clone();
574        let bootstrap_state = Arc::clone(&self.bootstrap_state);
575        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
576        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
577        let concurrency = max_parallel_fetch();
578
579        info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
580
581        let handle = tokio::spawn(async move {
582            // Each in-flight future yields (key, Option<FetchOutcome>) so we
583            // always recover the key — even if the inner task panics.
584            let mut in_flight = FuturesUnordered::<FetchFuture>::new();
585
586            loop {
587                // Fill up to `concurrency` slots from the queue.
588                {
589                    let mut q = queues.write().await;
590                    while in_flight.len() < concurrency {
591                        let Some(candidate) = q.dequeue_fetch() else {
592                            break;
593                        };
594                        let Some(&source) = candidate.sources.first() else {
595                            warn!(
596                                "Fetch candidate {} has no sources — dropping",
597                                hex::encode(candidate.key)
598                            );
599                            continue;
600                        };
601                        q.start_fetch(candidate.key, source, candidate.sources.clone());
602
603                        let p2p = Arc::clone(&p2p);
604                        let storage = Arc::clone(&storage);
605                        let config = Arc::clone(&config);
606                        let token = shutdown.clone();
607                        let fetch_key = candidate.key;
608                        in_flight.push(Box::pin(async move {
609                            let handle = tokio::spawn(async move {
610                                // Cancel-aware: abort when the engine shuts down.
611                                tokio::select! {
612                                    () = token.cancelled() => FetchOutcome {
613                                        key: fetch_key,
614                                        result: FetchResult::SourceFailed,
615                                    },
616                                    outcome = execute_single_fetch(
617                                        p2p, storage, config, fetch_key, source,
618                                    ) => outcome,
619                                }
620                            });
621                            match handle.await {
622                                Ok(outcome) => (outcome.key, Some(outcome)),
623                                Err(e) => {
624                                    error!(
625                                        "Fetch task for {} panicked: {e}",
626                                        hex::encode(fetch_key)
627                                    );
628                                    (fetch_key, None)
629                                }
630                            }
631                        }));
632                    }
633                } // release queues write lock
634
635                if in_flight.is_empty() {
636                    // No work — wait for new items or shutdown.
637                    tokio::select! {
638                        () = shutdown.cancelled() => break,
639                        () = tokio::time::sleep(
640                            std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
641                        ) => continue,
642                    }
643                }
644
645                // Wait for the next fetch to complete and process the result.
646                tokio::select! {
647                    () = shutdown.cancelled() => break,
648                    Some((key, maybe_outcome)) = in_flight.next() => {
649                        let mut q = queues.write().await;
650                        let terminal = if let Some(outcome) = maybe_outcome {
651                            match outcome.result {
652                                FetchResult::Stored => {
653                                    q.complete_fetch(&key);
654                                    true
655                                }
656                                FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
657                                    if let Some(next_peer) = q.retry_fetch(&key) {
658                                        // Spawn a new fetch task for the next source.
659                                        let p2p = Arc::clone(&p2p);
660                                        let storage = Arc::clone(&storage);
661                                        let config = Arc::clone(&config);
662                                        let token = shutdown.clone();
663                                        let fetch_key = key;
664                                        in_flight.push(Box::pin(async move {
665                                            let handle = tokio::spawn(async move {
666                                                tokio::select! {
667                                                    () = token.cancelled() => FetchOutcome {
668                                                        key: fetch_key,
669                                                        result: FetchResult::SourceFailed,
670                                                    },
671                                                    outcome = execute_single_fetch(
672                                                        p2p, storage, config, fetch_key, next_peer,
673                                                    ) => outcome,
674                                                }
675                                            });
676                                            match handle.await {
677                                                Ok(outcome) => (outcome.key, Some(outcome)),
678                                                Err(e) => {
679                                                    error!(
680                                                        "Fetch task for {} panicked: {e}",
681                                                        hex::encode(fetch_key)
682                                                    );
683                                                    (fetch_key, None)
684                                                }
685                                            }
686                                        }));
687                                        false
688                                    } else {
689                                        q.complete_fetch(&key);
690                                        true
691                                    }
692                                }
693                            }
694                        } else {
695                            // Task panicked — reclaim the in-flight slot.
696                            q.complete_fetch(&key);
697                            true
698                        };
699
700                        // Shrink bootstrap pending set on terminal exit.
701                        if terminal {
702                            drop(q); // release queues lock before acquiring bootstrap_state
703                            if !bootstrap_state.read().await.is_drained() {
704                                bootstrap_state.write().await.remove_key(&key);
705                                let q = queues.read().await;
706                                if bootstrap::check_bootstrap_drained(
707                                    &bootstrap_state,
708                                    &q,
709                                )
710                                .await
711                                {
712                                    complete_bootstrap(
713                                        &is_bootstrapping,
714                                        &bootstrap_complete_notify,
715                                    ).await;
716                                }
717                            }
718                        }
719                    }
720                }
721            }
722
723            // Cancel and drain remaining in-flight fetches on shutdown.
724            // The CancellationToken is already cancelled by this point, so
725            // spawned tasks will see cancellation via their select! branches.
726            while in_flight.next().await.is_some() {}
727            debug!("Fetch worker shut down");
728        });
729        self.task_handles.push(handle);
730    }
731
732    fn start_verification_worker(&mut self) {
733        let p2p = Arc::clone(&self.p2p_node);
734        let storage = Arc::clone(&self.storage);
735        let queues = Arc::clone(&self.queues);
736        let paid_list = Arc::clone(&self.paid_list);
737        let config = Arc::clone(&self.config);
738        let shutdown = self.shutdown.clone();
739        let bootstrap_state = Arc::clone(&self.bootstrap_state);
740        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
741        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
742
743        let handle = tokio::spawn(async move {
744            loop {
745                tokio::select! {
746                    () = shutdown.cancelled() => break,
747                    () = tokio::time::sleep(
748                        std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
749                    ) => {
750                        let ctx = VerificationCycleContext {
751                            p2p_node: &p2p,
752                            paid_list: &paid_list,
753                            storage: &storage,
754                            queues: &queues,
755                            config: &config,
756                            bootstrap_state: &bootstrap_state,
757                            is_bootstrapping: &is_bootstrapping,
758                            bootstrap_complete_notify: &bootstrap_complete_notify,
759                        };
760                        run_verification_cycle(ctx).await;
761                    }
762                }
763            }
764            debug!("Verification worker shut down");
765        });
766        self.task_handles.push(handle);
767    }
768
769    /// Gap 3: Run a one-shot bootstrap sync on startup.
770    ///
771    /// Waits for saorsa-core to emit `DhtNetworkEvent::BootstrapComplete`
772    /// (indicating the routing table is populated) before snapshotting
773    /// close neighbors. Falls back after a timeout so bootstrap nodes
774    /// (which have no peers and therefore never receive the event) still
775    /// proceed.
776    ///
777    /// After the gate, finds close neighbors, syncs with each in
778    /// round-robin batches, admits returned hints into the verification
779    /// pipeline, and tracks discovered keys for bootstrap drain detection.
780    fn start_bootstrap_sync(
781        &mut self,
782        dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
783    ) {
784        let p2p = Arc::clone(&self.p2p_node);
785        let storage = Arc::clone(&self.storage);
786        let paid_list = Arc::clone(&self.paid_list);
787        let queues = Arc::clone(&self.queues);
788        let config = Arc::clone(&self.config);
789        let shutdown = self.shutdown.clone();
790        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
791        let bootstrap_state = Arc::clone(&self.bootstrap_state);
792        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
793
794        let handle = tokio::spawn(async move {
795            // Wait for DHT bootstrap to complete before snapshotting
796            // neighbors. The routing table is empty until saorsa-core
797            // finishes its FIND_NODE rounds and bucket refreshes.
798            let gate = bootstrap::wait_for_bootstrap_complete(
799                dht_events,
800                config.bootstrap_complete_timeout_secs,
801                &shutdown,
802            )
803            .await;
804
805            if gate == bootstrap::BootstrapGateResult::Shutdown {
806                return;
807            }
808
809            let self_id = *p2p.peer_id();
810            let neighbors =
811                neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
812                    .await;
813
814            if neighbors.is_empty() {
815                info!("Bootstrap sync: no close neighbors found, marking drained");
816                bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
817                complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
818                return;
819            }
820
821            let neighbor_count = neighbors.len();
822            info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
823
824            // Process neighbors in batches of NEIGHBOR_SYNC_PEER_COUNT.
825            for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
826                if shutdown.is_cancelled() {
827                    break;
828                }
829
830                for peer in batch {
831                    if shutdown.is_cancelled() {
832                        break;
833                    }
834
835                    // Re-read on each iteration so peers see current state.
836                    let bootstrapping = *is_bootstrapping.read().await;
837
838                    bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
839
840                    let response = neighbor_sync::sync_with_peer(
841                        peer,
842                        &p2p,
843                        &storage,
844                        &paid_list,
845                        &config,
846                        bootstrapping,
847                    )
848                    .await;
849
850                    bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
851
852                    if let Some(resp) = response {
853                        if !resp.bootstrapping {
854                            // Admit hints into verification pipeline.
855                            let outcome = admit_and_queue_hints(
856                                &self_id,
857                                peer,
858                                &resp.replica_hints,
859                                &resp.paid_hints,
860                                &p2p,
861                                &config,
862                                &storage,
863                                &paid_list,
864                                &queues,
865                            )
866                            .await;
867
868                            // Track discovered keys for drain detection.
869                            if !outcome.discovered.is_empty() {
870                                bootstrap::track_discovered_keys(
871                                    &bootstrap_state,
872                                    &outcome.discovered,
873                                )
874                                .await;
875                            }
876
877                            // Record / retire capacity rejections so the
878                            // drain check correctly reflects whether each
879                            // source still owes us re-hinted work after
880                            // queue overflow.
881                            if outcome.capacity_rejected_count > 0 {
882                                bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
883                            } else {
884                                bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
885                            }
886                        }
887                    }
888                }
889            }
890
891            // Check drain condition.
892            {
893                let q = queues.read().await;
894                if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
895                    complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
896                }
897            }
898
899            info!("Bootstrap sync completed");
900        });
901        self.task_handles.push(handle);
902    }
903}
904
905// ===========================================================================
906// Free functions for background tasks
907// ===========================================================================
908
909/// Handle an incoming replication protocol message.
910///
911/// When `rr_message_id` is `Some`, the request arrived via the `/rr/`
912/// request-response path and the response must be sent via `send_response`
913/// so saorsa-core can route it back to the waiting `send_request` caller.
914#[allow(clippy::too_many_arguments)]
915async fn handle_replication_message(
916    source: &PeerId,
917    data: &[u8],
918    p2p_node: &Arc<P2PNode>,
919    storage: &Arc<LmdbStorage>,
920    paid_list: &Arc<PaidList>,
921    payment_verifier: &Arc<PaymentVerifier>,
922    queues: &Arc<RwLock<ReplicationQueues>>,
923    config: &ReplicationConfig,
924    is_bootstrapping: &Arc<RwLock<bool>>,
925    bootstrap_state: &Arc<RwLock<BootstrapState>>,
926    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
927    rr_message_id: Option<&str>,
928) -> Result<()> {
929    let msg = ReplicationMessage::decode(data)
930        .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
931
932    match msg.body {
933        ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
934            handle_fresh_offer(
935                source,
936                offer,
937                storage,
938                paid_list,
939                payment_verifier,
940                p2p_node,
941                config,
942                msg.request_id,
943                rr_message_id,
944            )
945            .await
946        }
947        ReplicationMessageBody::PaidNotify(ref notify) => {
948            handle_paid_notify(
949                source,
950                notify,
951                paid_list,
952                payment_verifier,
953                p2p_node,
954                config,
955            )
956            .await
957        }
958        ReplicationMessageBody::NeighborSyncRequest(ref request) => {
959            let bootstrapping = *is_bootstrapping.read().await;
960            handle_neighbor_sync_request(
961                source,
962                request,
963                p2p_node,
964                storage,
965                paid_list,
966                queues,
967                config,
968                bootstrapping,
969                bootstrap_state,
970                sync_history,
971                msg.request_id,
972                rr_message_id,
973            )
974            .await
975        }
976        ReplicationMessageBody::VerificationRequest(ref request) => {
977            handle_verification_request(
978                source,
979                request,
980                storage,
981                paid_list,
982                p2p_node,
983                msg.request_id,
984                rr_message_id,
985            )
986            .await
987        }
988        ReplicationMessageBody::FetchRequest(ref request) => {
989            handle_fetch_request(
990                source,
991                request,
992                storage,
993                p2p_node,
994                msg.request_id,
995                rr_message_id,
996            )
997            .await
998        }
999        ReplicationMessageBody::AuditChallenge(ref challenge) => {
1000            let bootstrapping = *is_bootstrapping.read().await;
1001            handle_audit_challenge_msg(
1002                source,
1003                challenge,
1004                storage,
1005                p2p_node,
1006                bootstrapping,
1007                msg.request_id,
1008                rr_message_id,
1009            )
1010            .await
1011        }
1012        // Response messages are handled by their respective request initiators.
1013        ReplicationMessageBody::FreshReplicationResponse(_)
1014        | ReplicationMessageBody::NeighborSyncResponse(_)
1015        | ReplicationMessageBody::VerificationResponse(_)
1016        | ReplicationMessageBody::FetchResponse(_)
1017        | ReplicationMessageBody::AuditResponse(_) => Ok(()),
1018    }
1019}
1020
1021// ---------------------------------------------------------------------------
1022// Per-message-type handlers
1023// ---------------------------------------------------------------------------
1024
1025#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1026async fn handle_fresh_offer(
1027    source: &PeerId,
1028    offer: &protocol::FreshReplicationOffer,
1029    storage: &Arc<LmdbStorage>,
1030    paid_list: &Arc<PaidList>,
1031    payment_verifier: &Arc<PaymentVerifier>,
1032    p2p_node: &Arc<P2PNode>,
1033    config: &ReplicationConfig,
1034    request_id: u64,
1035    rr_message_id: Option<&str>,
1036) -> Result<()> {
1037    let self_id = *p2p_node.peer_id();
1038
1039    // Rule 5: reject if PoP is missing.
1040    if offer.proof_of_payment.is_empty() {
1041        send_replication_response(
1042            source,
1043            p2p_node,
1044            request_id,
1045            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1046                key: offer.key,
1047                reason: "Missing proof of payment".to_string(),
1048            }),
1049            rr_message_id,
1050        )
1051        .await;
1052        return Ok(());
1053    }
1054
1055    // Enforce chunk size invariant: the normal PUT path rejects data larger
1056    // than MAX_CHUNK_SIZE; the replication receive path must do the same to
1057    // prevent peers from pushing oversized records through replication.
1058    if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1059        warn!(
1060            "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1061            hex::encode(offer.key),
1062            offer.data.len(),
1063            crate::ant_protocol::MAX_CHUNK_SIZE,
1064        );
1065        p2p_node
1066            .report_trust_event(
1067                source,
1068                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1069            )
1070            .await;
1071        send_replication_response(
1072            source,
1073            p2p_node,
1074            request_id,
1075            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1076                key: offer.key,
1077                reason: format!(
1078                    "Data size {} exceeds maximum chunk size {}",
1079                    offer.data.len(),
1080                    crate::ant_protocol::MAX_CHUNK_SIZE,
1081                ),
1082            }),
1083            rr_message_id,
1084        )
1085        .await;
1086        return Ok(());
1087    }
1088
1089    // Rule 7: check responsibility.
1090    if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await {
1091        send_replication_response(
1092            source,
1093            p2p_node,
1094            request_id,
1095            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1096                key: offer.key,
1097                reason: "Not responsible for this key".to_string(),
1098            }),
1099            rr_message_id,
1100        )
1101        .await;
1102        return Ok(());
1103    }
1104
1105    // Gap 1: Validate PoP via PaymentVerifier.
1106    match payment_verifier
1107        .verify_payment(&offer.key, Some(&offer.proof_of_payment))
1108        .await
1109    {
1110        Ok(status) if status.can_store() => {
1111            debug!(
1112                "PoP validated for fresh offer key {}",
1113                hex::encode(offer.key)
1114            );
1115        }
1116        Ok(_) => {
1117            send_replication_response(
1118                source,
1119                p2p_node,
1120                request_id,
1121                ReplicationMessageBody::FreshReplicationResponse(
1122                    FreshReplicationResponse::Rejected {
1123                        key: offer.key,
1124                        reason: "Payment verification failed: payment required".to_string(),
1125                    },
1126                ),
1127                rr_message_id,
1128            )
1129            .await;
1130            return Ok(());
1131        }
1132        Err(e) => {
1133            warn!(
1134                "PoP verification error for key {}: {e}",
1135                hex::encode(offer.key)
1136            );
1137            send_replication_response(
1138                source,
1139                p2p_node,
1140                request_id,
1141                ReplicationMessageBody::FreshReplicationResponse(
1142                    FreshReplicationResponse::Rejected {
1143                        key: offer.key,
1144                        reason: format!("Payment verification error: {e}"),
1145                    },
1146                ),
1147                rr_message_id,
1148            )
1149            .await;
1150            return Ok(());
1151        }
1152    }
1153
1154    // Rule 6: add to PaidForList.
1155    if let Err(e) = paid_list.insert(&offer.key).await {
1156        warn!("Failed to add key to PaidForList: {e}");
1157    }
1158
1159    // Store the record.
1160    match storage.put(&offer.key, &offer.data).await {
1161        Ok(_) => {
1162            send_replication_response(
1163                source,
1164                p2p_node,
1165                request_id,
1166                ReplicationMessageBody::FreshReplicationResponse(
1167                    FreshReplicationResponse::Accepted { key: offer.key },
1168                ),
1169                rr_message_id,
1170            )
1171            .await;
1172        }
1173        Err(e) => {
1174            send_replication_response(
1175                source,
1176                p2p_node,
1177                request_id,
1178                ReplicationMessageBody::FreshReplicationResponse(
1179                    FreshReplicationResponse::Rejected {
1180                        key: offer.key,
1181                        reason: format!("Storage error: {e}"),
1182                    },
1183                ),
1184                rr_message_id,
1185            )
1186            .await;
1187        }
1188    }
1189
1190    Ok(())
1191}
1192
1193async fn handle_paid_notify(
1194    _source: &PeerId,
1195    notify: &protocol::PaidNotify,
1196    paid_list: &Arc<PaidList>,
1197    payment_verifier: &Arc<PaymentVerifier>,
1198    p2p_node: &Arc<P2PNode>,
1199    config: &ReplicationConfig,
1200) -> Result<()> {
1201    let self_id = *p2p_node.peer_id();
1202
1203    // Rule 3: validate PoP presence before adding.
1204    if notify.proof_of_payment.is_empty() {
1205        return Ok(());
1206    }
1207
1208    // Check if we're in PaidCloseGroup for this key.
1209    if !admission::is_in_paid_close_group(
1210        &self_id,
1211        &notify.key,
1212        p2p_node,
1213        config.paid_list_close_group_size,
1214    )
1215    .await
1216    {
1217        return Ok(());
1218    }
1219
1220    // Gap 1: Validate PoP via PaymentVerifier.
1221    match payment_verifier
1222        .verify_payment(&notify.key, Some(&notify.proof_of_payment))
1223        .await
1224    {
1225        Ok(status) if status.can_store() => {
1226            debug!(
1227                "PoP validated for paid notify key {}",
1228                hex::encode(notify.key)
1229            );
1230        }
1231        Ok(_) => {
1232            warn!(
1233                "Paid notify rejected: payment required for key {}",
1234                hex::encode(notify.key)
1235            );
1236            return Ok(());
1237        }
1238        Err(e) => {
1239            warn!(
1240                "PoP verification error for paid notify key {}: {e}",
1241                hex::encode(notify.key)
1242            );
1243            return Ok(());
1244        }
1245    }
1246
1247    if let Err(e) = paid_list.insert(&notify.key).await {
1248        warn!("Failed to add paid notify key to PaidForList: {e}");
1249    }
1250
1251    Ok(())
1252}
1253
1254#[allow(clippy::too_many_arguments)]
1255async fn handle_neighbor_sync_request(
1256    source: &PeerId,
1257    request: &protocol::NeighborSyncRequest,
1258    p2p_node: &Arc<P2PNode>,
1259    storage: &Arc<LmdbStorage>,
1260    paid_list: &Arc<PaidList>,
1261    queues: &Arc<RwLock<ReplicationQueues>>,
1262    config: &ReplicationConfig,
1263    is_bootstrapping: bool,
1264    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1265    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1266    request_id: u64,
1267    rr_message_id: Option<&str>,
1268) -> Result<()> {
1269    let self_id = *p2p_node.peer_id();
1270
1271    // No per-request hint count limit: the wire message size limit
1272    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Unlike audit
1273    // challenges, sync hints don't drive expensive computation — they just
1274    // enter the verification queue. A per-request limit here would break
1275    // bootstrap replication for newly-joined nodes with 0 stored chunks.
1276
1277    // Build response (outbound hints).
1278    let (response, sender_in_rt) = neighbor_sync::handle_sync_request(
1279        source,
1280        request,
1281        p2p_node,
1282        storage,
1283        paid_list,
1284        config,
1285        is_bootstrapping,
1286    )
1287    .await;
1288
1289    // Send response.
1290    send_replication_response(
1291        source,
1292        p2p_node,
1293        request_id,
1294        ReplicationMessageBody::NeighborSyncResponse(response),
1295        rr_message_id,
1296    )
1297    .await;
1298
1299    // Process inbound hints only if sender is in LocalRT (Rule 4-6).
1300    if !sender_in_rt {
1301        return Ok(());
1302    }
1303
1304    // Update sync history for this peer.
1305    {
1306        let mut history = sync_history.write().await;
1307        let record = history.entry(*source).or_insert(PeerSyncRecord {
1308            last_sync: None,
1309            cycles_since_sync: 0,
1310        });
1311        record.last_sync = Some(Instant::now());
1312        record.cycles_since_sync = 0;
1313    }
1314
1315    // Admit inbound hints and queue for verification.
1316    let outcome = admit_and_queue_hints(
1317        &self_id,
1318        source,
1319        &request.replica_hints,
1320        &request.paid_hints,
1321        p2p_node,
1322        config,
1323        storage,
1324        paid_list,
1325        queues,
1326    )
1327    .await;
1328
1329    // Track discovered keys for bootstrap drain detection so that hints
1330    // admitted via inbound sync requests are not missed. Capacity-rejected
1331    // hints keep this source on the "not yet drained" list until its next
1332    // sync re-admits them; a clean cycle clears the source.
1333    if is_bootstrapping {
1334        if !outcome.discovered.is_empty() {
1335            bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1336        }
1337        if outcome.capacity_rejected_count > 0 {
1338            bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
1339        } else {
1340            bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
1341        }
1342    }
1343
1344    Ok(())
1345}
1346
1347async fn handle_verification_request(
1348    source: &PeerId,
1349    request: &protocol::VerificationRequest,
1350    storage: &Arc<LmdbStorage>,
1351    paid_list: &Arc<PaidList>,
1352    p2p_node: &Arc<P2PNode>,
1353    request_id: u64,
1354    rr_message_id: Option<&str>,
1355) -> Result<()> {
1356    // No per-request key count limit: the wire message size limit
1357    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Verification
1358    // does cheap storage lookups per key, not expensive computation like
1359    // audit digest generation.
1360
1361    #[allow(clippy::cast_possible_truncation)]
1362    let keys_len = request.keys.len() as u32;
1363    let paid_check_set: HashSet<u32> = request
1364        .paid_list_check_indices
1365        .iter()
1366        .copied()
1367        .filter(|&idx| {
1368            if idx >= keys_len {
1369                warn!(
1370                    "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
1371                    request.keys.len(),
1372                );
1373                false
1374            } else {
1375                true
1376            }
1377        })
1378        .collect();
1379
1380    let mut results = Vec::with_capacity(request.keys.len());
1381    for (i, key) in request.keys.iter().enumerate() {
1382        let present = storage.exists(key).unwrap_or(false);
1383        let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
1384            Some(paid_list.contains(key).unwrap_or(false))
1385        } else {
1386            None
1387        };
1388        results.push(protocol::KeyVerificationResult {
1389            key: *key,
1390            present,
1391            paid,
1392        });
1393    }
1394
1395    send_replication_response(
1396        source,
1397        p2p_node,
1398        request_id,
1399        ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
1400        rr_message_id,
1401    )
1402    .await;
1403
1404    Ok(())
1405}
1406
1407async fn handle_fetch_request(
1408    source: &PeerId,
1409    request: &protocol::FetchRequest,
1410    storage: &Arc<LmdbStorage>,
1411    p2p_node: &Arc<P2PNode>,
1412    request_id: u64,
1413    rr_message_id: Option<&str>,
1414) -> Result<()> {
1415    let response = match storage.get(&request.key).await {
1416        Ok(Some(data)) => protocol::FetchResponse::Success {
1417            key: request.key,
1418            data,
1419        },
1420        Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
1421        Err(e) => protocol::FetchResponse::Error {
1422            key: request.key,
1423            reason: format!("{e}"),
1424        },
1425    };
1426
1427    send_replication_response(
1428        source,
1429        p2p_node,
1430        request_id,
1431        ReplicationMessageBody::FetchResponse(response),
1432        rr_message_id,
1433    )
1434    .await;
1435
1436    Ok(())
1437}
1438
1439async fn handle_audit_challenge_msg(
1440    source: &PeerId,
1441    challenge: &protocol::AuditChallenge,
1442    storage: &Arc<LmdbStorage>,
1443    p2p_node: &Arc<P2PNode>,
1444    is_bootstrapping: bool,
1445    request_id: u64,
1446    rr_message_id: Option<&str>,
1447) -> Result<()> {
1448    #[allow(clippy::cast_possible_truncation)]
1449    let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
1450    let response = audit::handle_audit_challenge(
1451        challenge,
1452        storage,
1453        p2p_node.peer_id(),
1454        is_bootstrapping,
1455        stored_chunks,
1456    )
1457    .await;
1458
1459    send_replication_response(
1460        source,
1461        p2p_node,
1462        request_id,
1463        ReplicationMessageBody::AuditResponse(response),
1464        rr_message_id,
1465    )
1466    .await;
1467
1468    Ok(())
1469}
1470
1471// ---------------------------------------------------------------------------
1472// Message sending helper
1473// ---------------------------------------------------------------------------
1474
1475/// Send a replication response message. Fire-and-forget: logs errors but
1476/// does not propagate them.
1477///
1478/// When `rr_message_id` is `Some`, the response is sent via the `/rr/`
1479/// request-response path so saorsa-core can route it back to the caller's
1480/// `send_request` future. Otherwise it is sent as a plain message.
1481async fn send_replication_response(
1482    peer: &PeerId,
1483    p2p_node: &Arc<P2PNode>,
1484    request_id: u64,
1485    body: ReplicationMessageBody,
1486    rr_message_id: Option<&str>,
1487) {
1488    let msg = ReplicationMessage { request_id, body };
1489    let encoded = match msg.encode() {
1490        Ok(data) => data,
1491        Err(e) => {
1492            warn!("Failed to encode replication response: {e}");
1493            return;
1494        }
1495    };
1496    let result = if let Some(msg_id) = rr_message_id {
1497        p2p_node
1498            .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
1499            .await
1500    } else {
1501        p2p_node
1502            .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
1503            .await
1504    };
1505    if let Err(e) = result {
1506        debug!("Failed to send replication response to {peer}: {e}");
1507    }
1508}
1509
1510// ---------------------------------------------------------------------------
1511// Neighbor sync round
1512// ---------------------------------------------------------------------------
1513
1514/// Run one neighbor sync round.
1515#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1516async fn run_neighbor_sync_round(
1517    p2p_node: &Arc<P2PNode>,
1518    storage: &Arc<LmdbStorage>,
1519    paid_list: &Arc<PaidList>,
1520    queues: &Arc<RwLock<ReplicationQueues>>,
1521    config: &ReplicationConfig,
1522    sync_state: &Arc<RwLock<NeighborSyncState>>,
1523    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1524    is_bootstrapping: &Arc<RwLock<bool>>,
1525    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1526) {
1527    let self_id = *p2p_node.peer_id();
1528    let bootstrapping = *is_bootstrapping.read().await;
1529
1530    // Check if cycle is complete; start new one if needed.
1531    // We check under a read lock, then release it before the expensive
1532    // prune pass and DHT snapshot so other tasks are not starved.
1533    let cycle_complete = sync_state.read().await.is_cycle_complete();
1534    if cycle_complete {
1535        // Post-cycle pruning (Section 11) — runs without holding sync_state.
1536        // Remote prune-confirmation audits are storage-proof audits and only
1537        // run after bootstrap has drained.
1538        let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
1539        pruning::run_prune_pass(
1540            &self_id,
1541            storage,
1542            paid_list,
1543            p2p_node,
1544            config,
1545            sync_state,
1546            allow_remote_prune_audits,
1547        )
1548        .await;
1549
1550        // Increment `cycles_since_sync` for all peers.
1551        {
1552            let mut history = sync_history.write().await;
1553            for record in history.values_mut() {
1554                record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
1555            }
1556        }
1557
1558        // Take fresh close-neighbor snapshot (DHT query, no lock held).
1559        let neighbors =
1560            neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
1561                .await;
1562
1563        // Now re-acquire write lock and re-check before swapping cycle.
1564        let mut state = sync_state.write().await;
1565        if state.is_cycle_complete() {
1566            // Preserve cooldown and bootstrap-claim tracking across cycles.
1567            // Claims have a 24h lifecycle vs 10-20 min cycles — dropping them
1568            // would reset the abuse detection timer every cycle.
1569            let old_sync_times = std::mem::take(&mut state.last_sync_times);
1570            let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
1571            let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
1572            let old_prune_cursor = state.prune_cursor;
1573            *state = NeighborSyncState::new_cycle(neighbors);
1574            state.last_sync_times = old_sync_times;
1575            state.bootstrap_claims = old_bootstrap_claims;
1576            state.bootstrap_claim_history = old_bootstrap_claim_history;
1577            state.prune_cursor = old_prune_cursor;
1578        }
1579    }
1580
1581    // Select batch of peers.
1582    let batch = {
1583        let mut state = sync_state.write().await;
1584        neighbor_sync::select_sync_batch(
1585            &mut state,
1586            config.neighbor_sync_peer_count,
1587            config.neighbor_sync_cooldown,
1588        )
1589    };
1590
1591    if batch.is_empty() {
1592        return;
1593    }
1594
1595    debug!("Neighbor sync: syncing with {} peers", batch.len());
1596
1597    // Sync with each peer in the batch.
1598    for peer in &batch {
1599        let response = neighbor_sync::sync_with_peer(
1600            peer,
1601            p2p_node,
1602            storage,
1603            paid_list,
1604            config,
1605            bootstrapping,
1606        )
1607        .await;
1608
1609        if let Some(resp) = response {
1610            handle_sync_response(
1611                &self_id,
1612                peer,
1613                &resp,
1614                p2p_node,
1615                config,
1616                bootstrapping,
1617                bootstrap_state,
1618                storage,
1619                paid_list,
1620                queues,
1621                sync_state,
1622                sync_history,
1623            )
1624            .await;
1625        } else {
1626            // Sync failed -- remove peer and try to fill slot.
1627            let replacement = {
1628                let mut state = sync_state.write().await;
1629                neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
1630            };
1631
1632            // Attempt sync with the replacement peer (if one was found).
1633            if let Some(replacement_peer) = replacement {
1634                let replacement_resp = neighbor_sync::sync_with_peer(
1635                    &replacement_peer,
1636                    p2p_node,
1637                    storage,
1638                    paid_list,
1639                    config,
1640                    bootstrapping,
1641                )
1642                .await;
1643
1644                if let Some(resp) = replacement_resp {
1645                    handle_sync_response(
1646                        &self_id,
1647                        &replacement_peer,
1648                        &resp,
1649                        p2p_node,
1650                        config,
1651                        bootstrapping,
1652                        bootstrap_state,
1653                        storage,
1654                        paid_list,
1655                        queues,
1656                        sync_state,
1657                        sync_history,
1658                    )
1659                    .await;
1660                }
1661            }
1662        }
1663    }
1664}
1665
1666/// Process a successful neighbor sync response: record the sync, check for
1667/// bootstrap claim abuse, and admit inbound hints.
1668#[allow(clippy::too_many_arguments)]
1669async fn handle_sync_response(
1670    self_id: &PeerId,
1671    peer: &PeerId,
1672    resp: &NeighborSyncResponse,
1673    p2p_node: &Arc<P2PNode>,
1674    config: &ReplicationConfig,
1675    bootstrapping: bool,
1676    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1677    storage: &Arc<LmdbStorage>,
1678    paid_list: &Arc<PaidList>,
1679    queues: &Arc<RwLock<ReplicationQueues>>,
1680    sync_state: &Arc<RwLock<NeighborSyncState>>,
1681    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1682) {
1683    // Record successful sync.
1684    {
1685        let mut state = sync_state.write().await;
1686        neighbor_sync::record_successful_sync(&mut state, peer);
1687    }
1688    {
1689        let mut history = sync_history.write().await;
1690        let record = history.entry(*peer).or_insert(PeerSyncRecord {
1691            last_sync: None,
1692            cycles_since_sync: 0,
1693        });
1694        record.last_sync = Some(Instant::now());
1695        record.cycles_since_sync = 0;
1696    }
1697
1698    // Process inbound hints from response (skip if peer is bootstrapping).
1699    if resp.bootstrapping {
1700        // Gap 6: BootstrapClaimAbuse grace period enforcement.
1701        // Separate state mutation from network I/O to avoid holding the
1702        // write lock across report_trust_event.
1703        let should_report = {
1704            let now = Instant::now();
1705            let mut state = sync_state.write().await;
1706            match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
1707                BootstrapClaimObservation::WithinGrace { .. } => false,
1708                BootstrapClaimObservation::PastGrace { first_seen } => {
1709                    warn!(
1710                        "Peer {peer} has been claiming bootstrap for {:?}, \
1711                         exceeding grace period of {:?} — reporting abuse",
1712                        now.duration_since(first_seen),
1713                        config.bootstrap_claim_grace_period,
1714                    );
1715                    true
1716                }
1717                BootstrapClaimObservation::Repeated { first_seen } => {
1718                    warn!(
1719                        "Peer {peer} repeated bootstrap claim after previously stopping; \
1720                         first claim was {:?} ago — reporting abuse",
1721                        now.duration_since(first_seen),
1722                    );
1723                    true
1724                }
1725            }
1726        };
1727        if should_report {
1728            p2p_node
1729                .report_trust_event(
1730                    peer,
1731                    TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1732                )
1733                .await;
1734        }
1735    } else {
1736        // Peer is not claiming bootstrap; clear active claim while retaining
1737        // history so the peer cannot start a second grace window later.
1738        {
1739            let mut state = sync_state.write().await;
1740            state.clear_active_bootstrap_claim(peer);
1741        }
1742        let outcome = admit_and_queue_hints(
1743            self_id,
1744            peer,
1745            &resp.replica_hints,
1746            &resp.paid_hints,
1747            p2p_node,
1748            config,
1749            storage,
1750            paid_list,
1751            queues,
1752        )
1753        .await;
1754
1755        // Track discovered keys for bootstrap drain detection so that hints
1756        // admitted via regular neighbor sync are not missed. Capacity-
1757        // rejected hints keep this source on the "not yet drained" list
1758        // until its next sync replays them; a clean cycle clears it.
1759        if bootstrapping {
1760            if !outcome.discovered.is_empty() {
1761                bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1762            }
1763            if outcome.capacity_rejected_count > 0 {
1764                bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
1765            } else {
1766                bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
1767            }
1768        }
1769    }
1770}
1771
1772/// Admit hints and queue them for verification, returning newly-discovered keys.
1773///
1774/// Shared by neighbor-sync request handling, response handling, and bootstrap
1775/// sync so that admission + queueing logic lives in one place.
1776#[allow(clippy::too_many_arguments)]
1777/// Outcome of [`admit_and_queue_hints`].
1778///
1779/// `capacity_rejected_count` is non-zero when one or more legitimately
1780/// admissible hints were dropped because `pending_verify`'s global or
1781/// per-source bound was hit. Callers that care about completeness
1782/// (bootstrap drain accounting) MUST NOT treat their work as complete while
1783/// this is > 0 — the source will need to re-hint after capacity frees up.
1784struct AdmissionOutcome {
1785    discovered: HashSet<XorName>,
1786    capacity_rejected_count: usize,
1787}
1788
1789#[allow(clippy::too_many_arguments)]
1790async fn admit_and_queue_hints(
1791    self_id: &PeerId,
1792    source_peer: &PeerId,
1793    replica_hints: &[XorName],
1794    paid_hints: &[XorName],
1795    p2p_node: &Arc<P2PNode>,
1796    config: &ReplicationConfig,
1797    storage: &Arc<LmdbStorage>,
1798    paid_list: &Arc<PaidList>,
1799    queues: &Arc<RwLock<ReplicationQueues>>,
1800) -> AdmissionOutcome {
1801    let pending_keys: HashSet<XorName> = {
1802        let q = queues.read().await;
1803        q.pending_keys().into_iter().collect()
1804    };
1805
1806    let admitted = admission::admit_hints(
1807        self_id,
1808        replica_hints,
1809        paid_hints,
1810        p2p_node,
1811        config,
1812        storage,
1813        paid_list,
1814        &pending_keys,
1815    )
1816    .await;
1817
1818    let mut discovered = HashSet::new();
1819    let mut capacity_rejected_count: usize = 0;
1820    let mut q = queues.write().await;
1821    let now = Instant::now();
1822
1823    for key in admitted.replica_keys {
1824        if !storage.exists(&key).unwrap_or(false) {
1825            let result = q.add_pending_verify(
1826                key,
1827                VerificationEntry {
1828                    state: VerificationState::PendingVerify,
1829                    pipeline: HintPipeline::Replica,
1830                    verified_sources: Vec::new(),
1831                    tried_sources: HashSet::new(),
1832                    created_at: now,
1833                    hint_sender: *source_peer,
1834                },
1835            );
1836            match result {
1837                crate::replication::scheduling::AdmissionResult::Admitted => {
1838                    discovered.insert(key);
1839                }
1840                crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
1841                crate::replication::scheduling::AdmissionResult::CapacityRejected => {
1842                    capacity_rejected_count += 1;
1843                }
1844            }
1845        }
1846    }
1847
1848    for key in admitted.paid_only_keys {
1849        let result = q.add_pending_verify(
1850            key,
1851            VerificationEntry {
1852                state: VerificationState::PendingVerify,
1853                pipeline: HintPipeline::PaidOnly,
1854                verified_sources: Vec::new(),
1855                tried_sources: HashSet::new(),
1856                created_at: now,
1857                hint_sender: *source_peer,
1858            },
1859        );
1860        match result {
1861            crate::replication::scheduling::AdmissionResult::Admitted => {
1862                discovered.insert(key);
1863            }
1864            crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
1865            crate::replication::scheduling::AdmissionResult::CapacityRejected => {
1866                capacity_rejected_count += 1;
1867            }
1868        }
1869    }
1870
1871    if capacity_rejected_count > 0 {
1872        debug!(
1873            "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
1874             rejected at queue capacity; source will need to re-hint after pending_verify drains"
1875        );
1876    }
1877
1878    AdmissionOutcome {
1879        discovered,
1880        capacity_rejected_count,
1881    }
1882}
1883
1884// ---------------------------------------------------------------------------
1885// Verification cycle
1886// ---------------------------------------------------------------------------
1887
1888/// Run one verification cycle: process pending keys through quorum checks.
1889#[allow(clippy::too_many_lines)]
1890async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
1891    let VerificationCycleContext {
1892        p2p_node,
1893        paid_list,
1894        storage,
1895        queues,
1896        config,
1897        bootstrap_state,
1898        is_bootstrapping,
1899        bootstrap_complete_notify,
1900    } = ctx;
1901
1902    // Evict stale entries that have been pending too long (e.g. unreachable
1903    // verification targets during a network partition).
1904    {
1905        let mut q = queues.write().await;
1906        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
1907    }
1908
1909    let pending_keys = {
1910        let q = queues.read().await;
1911        q.pending_keys()
1912    };
1913
1914    if pending_keys.is_empty() {
1915        return;
1916    }
1917
1918    let self_id = *p2p_node.peer_id();
1919
1920    // Step 1: Check local PaidForList for fast-path authorization (Section 9,
1921    // step 4).
1922    let mut local_paid_presence_probe_keys = Vec::new();
1923    let mut local_paid_paid_only_keys = Vec::new();
1924    let mut keys_needing_network = Vec::new();
1925    let mut terminal_keys: Vec<XorName> = Vec::new();
1926    {
1927        let mut q = queues.write().await;
1928        for key in &pending_keys {
1929            if paid_list.contains(key).unwrap_or(false) {
1930                if let Some(pipeline) =
1931                    q.set_pending_state(key, VerificationState::PaidListVerified)
1932                {
1933                    match pipeline {
1934                        HintPipeline::PaidOnly => {
1935                            // Paid-only + local paid state needs one more
1936                            // responsibility check outside this lock: if we
1937                            // are also in the storage close group, the hint
1938                            // can repair a missing replica.
1939                            local_paid_paid_only_keys.push(*key);
1940                        }
1941                        HintPipeline::Replica => {
1942                            // Local paid-list membership authorizes the key.
1943                            // We still need a presence probe to discover fetch
1944                            // sources, but we must not require remote paid
1945                            // majority or presence quorum.
1946                            local_paid_presence_probe_keys.push(*key);
1947                        }
1948                    }
1949                }
1950            } else {
1951                keys_needing_network.push(*key);
1952            }
1953        }
1954    }
1955
1956    if !local_paid_paid_only_keys.is_empty() {
1957        let mut terminal_paid_only = Vec::new();
1958        for key in local_paid_paid_only_keys {
1959            if storage.exists(&key).unwrap_or(false) {
1960                terminal_paid_only.push(key);
1961            } else if admission::is_responsible(&self_id, &key, p2p_node, config.close_group_size)
1962                .await
1963            {
1964                local_paid_presence_probe_keys.push(key);
1965            } else {
1966                terminal_paid_only.push(key);
1967            }
1968        }
1969
1970        if !terminal_paid_only.is_empty() {
1971            let mut q = queues.write().await;
1972            for key in terminal_paid_only {
1973                q.remove_pending(&key);
1974                terminal_keys.push(key);
1975            }
1976        }
1977    }
1978
1979    // Step 1b: Local paid-list hit for fetch-eligible keys. Per Section 9
1980    // step 4, authorization succeeds immediately; run a presence-only probe
1981    // to find any holder we can fetch from.
1982    if !local_paid_presence_probe_keys.is_empty() {
1983        let targets = quorum::compute_presence_targets(
1984            &local_paid_presence_probe_keys,
1985            p2p_node,
1986            config,
1987            &self_id,
1988        )
1989        .await;
1990        let evidence = quorum::run_verification_round(
1991            &local_paid_presence_probe_keys,
1992            &targets,
1993            p2p_node,
1994            config,
1995        )
1996        .await;
1997
1998        let mut q = queues.write().await;
1999        for key in local_paid_presence_probe_keys {
2000            if storage.exists(&key).unwrap_or(false) {
2001                q.remove_pending(&key);
2002                terminal_keys.push(key);
2003                continue;
2004            }
2005            let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
2006                quorum::present_sources_for_key(&key, ev, &targets)
2007            });
2008            if sources.is_empty() {
2009                // Terminal failure: remove pending and report. No fetch path.
2010                q.remove_pending(&key);
2011                warn!(
2012                    "Locally paid key {} has no responding holders (possible data loss)",
2013                    hex::encode(key)
2014                );
2015                terminal_keys.push(key);
2016            } else {
2017                let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
2018                // Atomic remove+enqueue: if fetch_queue is at capacity, the
2019                // pending entry is preserved and retried next cycle (no
2020                // silent drop of verified replica-repair work).
2021                let _ = q.promote_pending_to_fetch(key, distance, sources);
2022            }
2023        }
2024    }
2025
2026    // Steps 2-5: Network verification (skipped if all keys resolved locally).
2027    if !keys_needing_network.is_empty() {
2028        // Step 2: Compute targets and run network verification round.
2029        let targets =
2030            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
2031                .await;
2032
2033        let evidence =
2034            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;
2035
2036        // Step 3: Evaluate results — collect outcomes without holding the write
2037        // lock across paid-list I/O.
2038        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
2039        {
2040            let q = queues.read().await;
2041            for key in &keys_needing_network {
2042                let Some(ev) = evidence.get(key) else {
2043                    continue;
2044                };
2045                let Some(entry) = q.get_pending(key) else {
2046                    continue;
2047                };
2048                let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config);
2049                evaluated.push((*key, outcome, entry.pipeline));
2050            }
2051        } // read lock released
2052
2053        // Step 4: Insert verified keys into PaidForList (no lock held).
2054        let mut paid_insert_keys: Vec<XorName> = Vec::new();
2055        for (key, outcome, _) in &evaluated {
2056            if matches!(
2057                outcome,
2058                KeyVerificationOutcome::QuorumVerified { .. }
2059                    | KeyVerificationOutcome::PaidListVerified { .. }
2060            ) {
2061                paid_insert_keys.push(*key);
2062            }
2063        }
2064        for key in &paid_insert_keys {
2065            if let Err(e) = paid_list.insert(key).await {
2066                warn!("Failed to add verified key to PaidForList: {e}");
2067            }
2068        }
2069
2070        // Paid-only hints normally update PaidForList only. If this node is
2071        // also storage-responsible for the key, a verified paid-only hint can
2072        // safely repair a missing replica using sources from the same
2073        // verification round.
2074        let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
2075        for (key, outcome, pipeline) in &evaluated {
2076            if *pipeline == HintPipeline::PaidOnly
2077                && matches!(
2078                    outcome,
2079                    KeyVerificationOutcome::QuorumVerified { .. }
2080                        | KeyVerificationOutcome::PaidListVerified { .. }
2081                )
2082                && !storage.exists(key).unwrap_or(false)
2083                && admission::is_responsible(&self_id, key, p2p_node, config.close_group_size).await
2084            {
2085                paid_only_fetch_keys.insert(*key);
2086            }
2087        }
2088
2089        // Step 5: Update queues with the evaluated outcomes.
2090        let mut q = queues.write().await;
2091        for (key, outcome, pipeline) in evaluated {
2092            match outcome {
2093                KeyVerificationOutcome::QuorumVerified { sources }
2094                | KeyVerificationOutcome::PaidListVerified { sources } => {
2095                    let fetch_eligible =
2096                        pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
2097                    if fetch_eligible && !sources.is_empty() {
2098                        let distance =
2099                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
2100                        // Atomic remove+enqueue: on fetch_queue capacity miss
2101                        // the pending entry is preserved so this verified key
2102                        // is retried on the next cycle (no silent drop).
2103                        let _ = q.promote_pending_to_fetch(key, distance, sources);
2104                        // Not terminal — either moved to fetch queue, or
2105                        // retained as pending until queue drains.
2106                    } else if fetch_eligible && sources.is_empty() {
2107                        warn!(
2108                            "Verified responsible key {} has no holders (possible data loss)",
2109                            hex::encode(key)
2110                        );
2111                        q.remove_pending(&key);
2112                        terminal_keys.push(key);
2113                    } else {
2114                        q.remove_pending(&key);
2115                        terminal_keys.push(key);
2116                    }
2117                }
2118                KeyVerificationOutcome::QuorumFailed
2119                | KeyVerificationOutcome::QuorumInconclusive => {
2120                    q.remove_pending(&key);
2121                    terminal_keys.push(key);
2122                }
2123            }
2124        }
2125    }
2126
2127    // Step 6: Remove terminal keys from bootstrap pending set and re-check
2128    // the drain condition.
2129    update_bootstrap_after_verification(
2130        &terminal_keys,
2131        bootstrap_state,
2132        queues,
2133        is_bootstrapping,
2134        bootstrap_complete_notify,
2135    )
2136    .await;
2137}
2138
2139/// Post-verification bootstrap bookkeeping: remove terminal keys from the
2140/// bootstrap pending set and transition out of bootstrapping when drained.
2141async fn update_bootstrap_after_verification(
2142    terminal_keys: &[XorName],
2143    bootstrap_state: &Arc<RwLock<BootstrapState>>,
2144    queues: &Arc<RwLock<ReplicationQueues>>,
2145    is_bootstrapping: &Arc<RwLock<bool>>,
2146    bootstrap_complete_notify: &Arc<Notify>,
2147) {
2148    if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
2149        return;
2150    }
2151    {
2152        let mut bs = bootstrap_state.write().await;
2153        for key in terminal_keys {
2154            bs.remove_key(key);
2155        }
2156    }
2157    let q = queues.read().await;
2158    if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
2159        complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
2160    }
2161}
2162
2163/// Set `is_bootstrapping` to `false` and wake all waiters.
2164async fn complete_bootstrap(
2165    is_bootstrapping: &Arc<RwLock<bool>>,
2166    bootstrap_complete_notify: &Arc<Notify>,
2167) {
2168    *is_bootstrapping.write().await = false;
2169    bootstrap_complete_notify.notify_waiters();
2170    info!("Replication bootstrap complete");
2171}
2172
2173// ---------------------------------------------------------------------------
2174// Fetch types and single-fetch executor
2175// ---------------------------------------------------------------------------
2176
/// Result classification for a single fetch attempt.
///
/// Produced by `execute_single_fetch` and carried in [`FetchOutcome`].
enum FetchResult {
    /// Data fetched, integrity-checked, and stored successfully.
    Stored,
    /// Content-address integrity check failed — do not retry.
    ///
    /// Also used when the response key differs from the requested key, or
    /// when the payload exceeds `MAX_CHUNK_SIZE`.
    IntegrityFailed,
    /// Source failed (network error or non-success response) — retryable.
    ///
    /// Also used when the request cannot be encoded, the response cannot be
    /// decoded, or writing the fetched record to local storage fails.
    SourceFailed,
}
2186
/// Outcome produced by [`execute_single_fetch`] and consumed by the fetch
/// worker loop to update queue state.
struct FetchOutcome {
    /// The key that was requested from the source.
    key: XorName,
    /// How the fetch attempt ended.
    result: FetchResult,
}
2193
2194#[allow(clippy::too_many_lines)]
2195/// Execute a single fetch request against `source` for `key`.
2196///
2197/// Handles encoding, network I/O, integrity checking, storage, and trust
2198/// event reporting.  Returns a [`FetchOutcome`] so the caller can update
2199/// queue state without holding any locks during the network round-trip.
2200async fn execute_single_fetch(
2201    p2p_node: Arc<P2PNode>,
2202    storage: Arc<LmdbStorage>,
2203    config: Arc<ReplicationConfig>,
2204    key: XorName,
2205    source: PeerId,
2206) -> FetchOutcome {
2207    let request = protocol::FetchRequest { key };
2208    let msg = ReplicationMessage {
2209        request_id: rand::thread_rng().gen::<u64>(),
2210        body: ReplicationMessageBody::FetchRequest(request),
2211    };
2212
2213    let encoded = match msg.encode() {
2214        Ok(data) => data,
2215        Err(e) => {
2216            warn!("Failed to encode fetch request: {e}");
2217            return FetchOutcome {
2218                key,
2219                result: FetchResult::SourceFailed,
2220            };
2221        }
2222    };
2223
2224    let result = p2p_node
2225        .send_request(
2226            &source,
2227            REPLICATION_PROTOCOL_ID,
2228            encoded,
2229            config.fetch_request_timeout,
2230        )
2231        .await;
2232
2233    match result {
2234        Ok(response) => {
2235            let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
2236                p2p_node
2237                    .report_trust_event(
2238                        &source,
2239                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2240                    )
2241                    .await;
2242                return FetchOutcome {
2243                    key,
2244                    result: FetchResult::SourceFailed,
2245                };
2246            };
2247
2248            match resp_msg.body {
2249                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
2250                    key: resp_key,
2251                    data,
2252                }) => {
2253                    // Validate the response key matches the requested key.
2254                    // A malicious peer could serve valid data for a different
2255                    // key, passing integrity checks while the requested key
2256                    // is falsely marked as fetched.
2257                    if resp_key != key {
2258                        warn!(
2259                            "Fetch response key mismatch: requested {}, got {}",
2260                            hex::encode(key),
2261                            hex::encode(resp_key)
2262                        );
2263                        p2p_node
2264                            .report_trust_event(
2265                                &source,
2266                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2267                            )
2268                            .await;
2269                        return FetchOutcome {
2270                            key,
2271                            result: FetchResult::IntegrityFailed,
2272                        };
2273                    }
2274
2275                    // Enforce chunk size invariant on fetched data.
2276                    // Checked before the content-address hash to avoid
2277                    // hashing up to 10 MiB of oversized junk data.
2278                    if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
2279                        warn!(
2280                            "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
2281                            hex::encode(resp_key),
2282                            data.len(),
2283                            crate::ant_protocol::MAX_CHUNK_SIZE,
2284                        );
2285                        p2p_node
2286                            .report_trust_event(
2287                                &source,
2288                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2289                            )
2290                            .await;
2291                        return FetchOutcome {
2292                            key,
2293                            result: FetchResult::IntegrityFailed,
2294                        };
2295                    }
2296
2297                    // Content-address integrity check.
2298                    let computed = crate::client::compute_address(&data);
2299                    if computed != resp_key {
2300                        warn!(
2301                            "Fetched record integrity check failed: expected {}, got {}",
2302                            hex::encode(resp_key),
2303                            hex::encode(computed)
2304                        );
2305                        p2p_node
2306                            .report_trust_event(
2307                                &source,
2308                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2309                            )
2310                            .await;
2311                        return FetchOutcome {
2312                            key,
2313                            result: FetchResult::IntegrityFailed,
2314                        };
2315                    }
2316
2317                    if let Err(e) = storage.put(&resp_key, &data).await {
2318                        warn!(
2319                            "Failed to store fetched record {}: {e}",
2320                            hex::encode(resp_key)
2321                        );
2322                        return FetchOutcome {
2323                            key,
2324                            result: FetchResult::SourceFailed,
2325                        };
2326                    }
2327
2328                    FetchOutcome {
2329                        key,
2330                        result: FetchResult::Stored,
2331                    }
2332                }
2333                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
2334                    ..
2335                }) => {
2336                    // This peer was selected as a fetch source because it
2337                    // recently answered `Present` during verification. A
2338                    // subsequent NotFound is evidence of a stale/false claim
2339                    // or chunk wiping, so penalize lightly and try another
2340                    // verified source.
2341                    warn!(
2342                        "Fetch: verified source {source} returned NotFound for {}",
2343                        hex::encode(key)
2344                    );
2345                    p2p_node
2346                        .report_trust_event(
2347                            &source,
2348                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2349                        )
2350                        .await;
2351                    FetchOutcome {
2352                        key,
2353                        result: FetchResult::SourceFailed,
2354                    }
2355                }
2356                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
2357                    reason,
2358                    ..
2359                }) => {
2360                    warn!(
2361                        "Fetch: peer {source} returned error for {}: {reason}",
2362                        hex::encode(key)
2363                    );
2364                    p2p_node
2365                        .report_trust_event(
2366                            &source,
2367                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2368                        )
2369                        .await;
2370                    FetchOutcome {
2371                        key,
2372                        result: FetchResult::SourceFailed,
2373                    }
2374                }
2375                _ => {
2376                    // Unexpected message type — treat as malformed.
2377                    p2p_node
2378                        .report_trust_event(
2379                            &source,
2380                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2381                        )
2382                        .await;
2383                    FetchOutcome {
2384                        key,
2385                        result: FetchResult::SourceFailed,
2386                    }
2387                }
2388            }
2389        }
2390        Err(e) => {
2391            debug!("Fetch request to {source} failed: {e}");
2392            // No ApplicationFailure here — P2PNode::send_request() already
2393            // reports ConnectionTimeout / ConnectionFailed to the TrustEngine.
2394            FetchOutcome {
2395                key,
2396                result: FetchResult::SourceFailed,
2397            }
2398        }
2399    }
2400}
2401
2402// ---------------------------------------------------------------------------
2403// Audit result handler
2404// ---------------------------------------------------------------------------
2405
2406/// Handle audit result: log findings and emit trust events.
2407async fn handle_audit_result(
2408    result: &AuditTickResult,
2409    p2p_node: &Arc<P2PNode>,
2410    sync_state: &Arc<RwLock<NeighborSyncState>>,
2411    config: &ReplicationConfig,
2412) {
2413    match result {
2414        AuditTickResult::Passed {
2415            challenged_peer,
2416            keys_checked,
2417        } => {
2418            debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
2419            // Peer responded normally — clear the active bootstrap claim while
2420            // retaining history so a later claim is treated as repeated abuse.
2421            {
2422                let mut state = sync_state.write().await;
2423                state.clear_active_bootstrap_claim(challenged_peer);
2424            }
2425            p2p_node
2426                .report_trust_event(
2427                    challenged_peer,
2428                    TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
2429                )
2430                .await;
2431        }
2432        AuditTickResult::Failed { evidence } => {
2433            if let FailureEvidence::AuditFailure {
2434                challenged_peer,
2435                confirmed_failed_keys,
2436                reason,
2437                ..
2438            } = evidence
2439            {
2440                error!(
2441                    "Audit failure for {challenged_peer}: {} confirmed failed keys",
2442                    confirmed_failed_keys.len()
2443                );
2444                if audit_failure_clears_bootstrap_claim(reason) {
2445                    // Peer returned a non-bootstrap response — clear the active
2446                    // claim while retaining claim history.
2447                    let mut state = sync_state.write().await;
2448                    state.clear_active_bootstrap_claim(challenged_peer);
2449                } else {
2450                    debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim");
2451                }
2452                p2p_node
2453                    .report_trust_event(
2454                        challenged_peer,
2455                        TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
2456                    )
2457                    .await;
2458            }
2459        }
2460        AuditTickResult::BootstrapClaim { peer } => {
2461            // Gap 6: BootstrapClaimAbuse grace period in audit path.
2462            // Separate state mutation from network I/O to avoid holding the
2463            // write lock across report_trust_event.
2464            let should_report = {
2465                let now = Instant::now();
2466                let mut state = sync_state.write().await;
2467                match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
2468                {
2469                    BootstrapClaimObservation::WithinGrace { .. } => {
2470                        debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
2471                        false
2472                    }
2473                    BootstrapClaimObservation::PastGrace { first_seen } => {
2474                        warn!(
2475                            "Audit: peer {peer} claiming bootstrap past grace period \
2476                             ({:?} > {:?}), reporting abuse",
2477                            now.duration_since(first_seen),
2478                            config.bootstrap_claim_grace_period,
2479                        );
2480                        true
2481                    }
2482                    BootstrapClaimObservation::Repeated { first_seen } => {
2483                        warn!(
2484                            "Audit: peer {peer} repeated bootstrap claim after previously \
2485                             stopping; first claim was {:?} ago, reporting abuse",
2486                            now.duration_since(first_seen),
2487                        );
2488                        true
2489                    }
2490                }
2491            };
2492            if should_report {
2493                p2p_node
2494                    .report_trust_event(
2495                        peer,
2496                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2497                    )
2498                    .await;
2499            }
2500        }
2501        AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
2502    }
2503}
2504
2505fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
2506    !matches!(reason, AuditFailureReason::Timeout)
2507}
2508
2509// `admit_bootstrap_hints` was consolidated into `admit_and_queue_hints`.
2510
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::audit_failure_clears_bootstrap_claim;
    use crate::replication::types::AuditFailureReason;

    /// A timed-out audit must leave the active bootstrap claim in place.
    #[test]
    fn audit_timeout_preserves_active_bootstrap_claim() {
        let clears_claim = audit_failure_clears_bootstrap_claim(&AuditFailureReason::Timeout);
        assert!(!clears_claim);
    }

    /// Every failure reason other than a timeout must clear the active claim.
    #[test]
    fn decoded_audit_failures_clear_active_bootstrap_claim() {
        let decoded_failures = [
            AuditFailureReason::MalformedResponse,
            AuditFailureReason::DigestMismatch,
            AuditFailureReason::KeyAbsent,
            AuditFailureReason::Rejected,
        ];
        for reason in &decoded_failures {
            assert!(
                audit_failure_clears_bootstrap_claim(reason),
                "decoded non-bootstrap failure {reason:?} should clear active claim"
            );
        }
    }
}