Skip to main content

ant_node/replication/
mod.rs

1//! Replication subsystem for the Autonomi network.
2//!
3//! Implements Kademlia-style replication with:
4//! - Fresh replication with `PoP` verification
5//! - Neighbor sync with round-robin cycle management
6//! - Batched quorum verification
7//! - Storage audit protocol (anti-outsourcing)
8//! - `PaidForList` persistence and convergence
9//! - Responsibility pruning with hysteresis
10
11// The replication engine intentionally holds `RwLock` read guards across await
12// boundaries (e.g. reading sync_history while calling audit_tick). Clippy's
13// nursery lint `significant_drop_tightening` flags these, but the guards must
14// remain live for the duration of the call.
15#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod config;
21pub mod fresh;
22pub mod neighbor_sync;
23pub mod paid_list;
24pub mod protocol;
25pub mod pruning;
26pub mod quorum;
27pub mod scheduling;
28pub mod types;
29
30use std::collections::{HashMap, HashSet};
31use std::path::Path;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use std::pin::Pin;
36
37use crate::logging::{debug, error, info, warn};
38use futures::stream::FuturesUnordered;
39use futures::{Future, StreamExt};
40use rand::Rng;
41use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
42use tokio::task::JoinHandle;
43use tokio_util::sync::CancellationToken;
44
45use crate::ant_protocol::XorName;
46use crate::error::{Error, Result};
47use crate::payment::{PaymentVerifier, VerificationContext};
48use crate::replication::audit::AuditTickResult;
49use crate::replication::config::{
50    max_parallel_fetch, ReplicationConfig, MAX_CONCURRENT_REPLICATION_SENDS,
51    REPLICATION_PROTOCOL_ID,
52};
53use crate::replication::paid_list::PaidList;
54use crate::replication::protocol::{
55    FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
56    VerificationResponse,
57};
58use crate::replication::quorum::KeyVerificationOutcome;
59use crate::replication::scheduling::ReplicationQueues;
60use crate::replication::types::{
61    AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
62    NeighborSyncState, PeerSyncRecord, RepairProofs, VerificationEntry, VerificationState,
63};
64use crate::storage::LmdbStorage;
65use saorsa_core::identity::PeerId;
66use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
67
68// ---------------------------------------------------------------------------
69// Constants
70// ---------------------------------------------------------------------------
71
72/// Prefix used by saorsa-core's request-response mechanism.
73const RR_PREFIX: &str = "/rr/";
74
75/// Boxed future type for in-flight fetch tasks.
76type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
77
78/// Shared dependencies for one verification worker cycle.
79struct VerificationCycleContext<'a> {
80    p2p_node: &'a Arc<P2PNode>,
81    paid_list: &'a Arc<PaidList>,
82    storage: &'a Arc<LmdbStorage>,
83    queues: &'a Arc<RwLock<ReplicationQueues>>,
84    config: &'a ReplicationConfig,
85    bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
86    is_bootstrapping: &'a Arc<RwLock<bool>>,
87    bootstrap_complete_notify: &'a Arc<Notify>,
88}
89
90/// Fetch worker polling interval in milliseconds.
91const FETCH_WORKER_POLL_MS: u64 = 100;
92
93/// Verification worker polling interval in milliseconds.
94const VERIFICATION_WORKER_POLL_MS: u64 = 250;
95
96/// Bootstrap drain check interval in seconds.
97const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;
98
99/// Standard trust event weight for per-operation success/failure signals.
100///
101/// Used for individual replication fetch outcomes, integrity check failures,
102/// and bootstrap claim abuse. Distinct from `AUDIT_FAILURE_TRUST_WEIGHT` which
103/// is reserved for confirmed audit failures.
104const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
105
106// ---------------------------------------------------------------------------
107// ReplicationEngine
108// ---------------------------------------------------------------------------
109
110/// The replication engine manages all replication background tasks and state.
111pub struct ReplicationEngine {
112    /// Replication configuration (shared across spawned tasks).
113    config: Arc<ReplicationConfig>,
114    /// P2P networking node.
115    p2p_node: Arc<P2PNode>,
116    /// Local chunk storage.
117    storage: Arc<LmdbStorage>,
118    /// Persistent paid-for-list.
119    paid_list: Arc<PaidList>,
120    /// Payment verifier for `PoP` validation.
121    payment_verifier: Arc<PaymentVerifier>,
122    /// Replication pipeline queues.
123    queues: Arc<RwLock<ReplicationQueues>>,
124    /// Neighbor sync cycle state.
125    sync_state: Arc<RwLock<NeighborSyncState>>,
126    /// Per-peer sync history (for `RepairOpportunity`).
127    ///
128    /// This map grows with peer churn and is intentionally unbounded: entries
129    /// are lightweight (`PeerSyncRecord` is two fields) and peer IDs are
130    /// naturally bounded by the routing table's k-bucket capacity.
131    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
132    /// Completed local neighbor-sync cycle epoch for proof maturity.
133    sync_cycle_epoch: Arc<RwLock<u64>>,
134    /// Per-key repair proof tracking for audit eligibility.
135    repair_proofs: Arc<RwLock<RepairProofs>>,
136    /// Bootstrap state tracking.
137    bootstrap_state: Arc<RwLock<BootstrapState>>,
138    /// Whether this node is currently bootstrapping.
139    is_bootstrapping: Arc<RwLock<bool>>,
140    /// Trigger for early neighbor sync (signalled on topology changes).
141    sync_trigger: Arc<Notify>,
142    /// Notified when `is_bootstrapping` transitions from `true` to `false`.
143    bootstrap_complete_notify: Arc<Notify>,
144    /// Limits concurrent outbound replication sends to prevent bandwidth
145    /// saturation on home broadband connections.
146    send_semaphore: Arc<Semaphore>,
147    /// Receiver for fresh-write events from the chunk PUT handler.
148    ///
149    /// When present, `start()` spawns a drainer task that calls
150    /// `replicate_fresh` for each event.
151    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
152    /// Shutdown token.
153    shutdown: CancellationToken,
154    /// Background task handles.
155    task_handles: Vec<JoinHandle<()>>,
156}
157
158impl ReplicationEngine {
159    /// Create a new replication engine.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if the `PaidList` LMDB environment cannot be opened
164    /// or if the configuration fails validation.
165    pub async fn new(
166        config: ReplicationConfig,
167        p2p_node: Arc<P2PNode>,
168        storage: Arc<LmdbStorage>,
169        payment_verifier: Arc<PaymentVerifier>,
170        root_dir: &Path,
171        fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
172        shutdown: CancellationToken,
173    ) -> Result<Self> {
174        config.validate().map_err(Error::Config)?;
175
176        let paid_list = Arc::new(
177            PaidList::new(root_dir)
178                .await
179                .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
180        );
181
182        let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
183        let config = Arc::new(config);
184
185        Ok(Self {
186            config: Arc::clone(&config),
187            p2p_node,
188            storage,
189            paid_list,
190            payment_verifier,
191            queues: Arc::new(RwLock::new(ReplicationQueues::new())),
192            sync_state: Arc::new(RwLock::new(initial_neighbors)),
193            sync_history: Arc::new(RwLock::new(HashMap::new())),
194            sync_cycle_epoch: Arc::new(RwLock::new(0)),
195            repair_proofs: Arc::new(RwLock::new(RepairProofs::new())),
196            bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
197            is_bootstrapping: Arc::new(RwLock::new(true)),
198            sync_trigger: Arc::new(Notify::new()),
199            bootstrap_complete_notify: Arc::new(Notify::new()),
200            send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
201            fresh_write_rx: Some(fresh_write_rx),
202            shutdown,
203            task_handles: Vec::new(),
204        })
205    }
206
207    /// Get a reference to the `PaidList`.
208    #[must_use]
209    pub fn paid_list(&self) -> &Arc<PaidList> {
210        &self.paid_list
211    }
212
213    /// Start all background tasks.
214    ///
215    /// `dht_events` must be subscribed **before** `P2PNode::start()` so that
216    /// the `BootstrapComplete` event emitted during DHT bootstrap is not
217    /// missed by the bootstrap-sync gate.
218    pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
219        if !self.task_handles.is_empty() {
220            error!("ReplicationEngine::start() called while already running — ignoring");
221            return;
222        }
223        info!("Starting replication engine");
224
225        self.start_message_handler();
226        self.start_neighbor_sync_loop();
227        self.start_self_lookup_loop();
228        self.start_audit_loop();
229        self.start_fetch_worker();
230        self.start_verification_worker();
231        self.start_bootstrap_sync(dht_events);
232        self.start_fresh_write_drainer();
233
234        info!(
235            "Replication engine started with {} background tasks",
236            self.task_handles.len()
237        );
238    }
239
240    /// Returns `true` if the node is still in the replication bootstrap phase.
241    ///
242    /// During bootstrap, audit challenges return `Bootstrapping` instead of
243    /// digests, and neighbor sync responses carry `bootstrapping: true`.
244    pub async fn is_bootstrapping(&self) -> bool {
245        *self.is_bootstrapping.read().await
246    }
247
248    /// Wait until the replication bootstrap phase completes.
249    ///
250    /// Returns immediately if bootstrap has already completed. Useful for
251    /// readiness probes, health checks, and test harnesses that need the
252    /// node to be fully operational before proceeding.
253    ///
254    /// Returns `true` if bootstrap completed within the timeout, `false`
255    /// if the timeout elapsed first.
256    pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
257        // Register the notification future *before* checking the flag so that
258        // a transition between the read and the await is not missed.
259        let notified = self.bootstrap_complete_notify.notified();
260        tokio::pin!(notified);
261        notified.as_mut().enable();
262
263        if !*self.is_bootstrapping.read().await {
264            return true;
265        }
266
267        tokio::time::timeout(timeout, notified).await.is_ok()
268    }
269
270    /// Cancel all background tasks and wait for them to terminate.
271    ///
272    /// This must be awaited before dropping the engine when the caller needs
273    /// the `Arc<LmdbStorage>` references held by background tasks to be
274    /// released (e.g. before reopening the same LMDB environment).
275    pub async fn shutdown(&mut self) {
276        self.shutdown.cancel();
277        for (i, mut handle) in self.task_handles.drain(..).enumerate() {
278            match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
279                Ok(Ok(())) => {}
280                Ok(Err(e)) if e.is_cancelled() => {}
281                Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
282                Err(_) => {
283                    warn!("Replication task {i} did not stop within 10s, aborting");
284                    handle.abort();
285                }
286            }
287        }
288    }
289
290    /// Trigger an early neighbor sync round.
291    ///
292    /// Useful after topology changes (new nodes joining, network heal after
293    /// partition) when the caller wants replication to converge faster than
294    /// the regular 10-20 minute cadence.
295    pub fn trigger_neighbor_sync(&self) {
296        self.sync_trigger.notify_one();
297    }
298
299    /// Execute fresh replication for a newly stored record.
300    pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
301        fresh::replicate_fresh(
302            key,
303            data,
304            proof_of_payment,
305            &self.p2p_node,
306            &self.paid_list,
307            &self.config,
308            &self.send_semaphore,
309        )
310        .await;
311    }
312
313    // =======================================================================
314    // Background task launchers
315    // =======================================================================
316
317    /// Spawn a task that drains the fresh-write channel and triggers
318    /// replication for each newly-stored chunk.
319    fn start_fresh_write_drainer(&mut self) {
320        let Some(mut rx) = self.fresh_write_rx.take() else {
321            return;
322        };
323        let p2p = Arc::clone(&self.p2p_node);
324        let paid_list = Arc::clone(&self.paid_list);
325        let config = Arc::clone(&self.config);
326        let send_semaphore = Arc::clone(&self.send_semaphore);
327        let shutdown = self.shutdown.clone();
328
329        let handle = tokio::spawn(async move {
330            loop {
331                tokio::select! {
332                    () = shutdown.cancelled() => break,
333                    event = rx.recv() => {
334                        let Some(event) = event else { break };
335                        fresh::replicate_fresh(
336                            &event.key,
337                            &event.data,
338                            &event.payment_proof,
339                            &p2p,
340                            &paid_list,
341                            &config,
342                            &send_semaphore,
343                        )
344                        .await;
345                    }
346                }
347            }
348            debug!("Fresh-write drainer shut down");
349        });
350        self.task_handles.push(handle);
351    }
352
353    #[allow(clippy::too_many_lines)]
354    fn start_message_handler(&mut self) {
355        let mut p2p_events = self.p2p_node.subscribe_events();
356        let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
357        let p2p = Arc::clone(&self.p2p_node);
358        let storage = Arc::clone(&self.storage);
359        let paid_list = Arc::clone(&self.paid_list);
360        let payment_verifier = Arc::clone(&self.payment_verifier);
361        let queues = Arc::clone(&self.queues);
362        let config = Arc::clone(&self.config);
363        let shutdown = self.shutdown.clone();
364        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
365        let bootstrap_state = Arc::clone(&self.bootstrap_state);
366        let sync_history = Arc::clone(&self.sync_history);
367        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
368        let repair_proofs = Arc::clone(&self.repair_proofs);
369        let sync_trigger = Arc::clone(&self.sync_trigger);
370
371        let handle = tokio::spawn(async move {
372            loop {
373                tokio::select! {
374                    () = shutdown.cancelled() => break,
375                    event = p2p_events.recv() => {
376                        let Ok(event) = event else { continue };
377                        if let P2PEvent::Message {
378                            topic,
379                            source: Some(source),
380                            data,
381                            ..
382                        } = event {
383                            // Determine if this is a replication message
384                            // and whether it arrived via the /rr/ request-response
385                            // path (which wraps payloads in RequestResponseEnvelope).
386                            let rr_info = if topic == REPLICATION_PROTOCOL_ID {
387                                Some((data.clone(), None))
388                            } else if topic.starts_with(RR_PREFIX)
389                                && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
390                            {
391                                P2PNode::parse_request_envelope(&data)
392                                    .filter(|(_, is_resp, _)| !is_resp)
393                                    .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
394                            } else {
395                                None
396                            };
397                            if let Some((payload, rr_message_id)) = rr_info {
398                                match handle_replication_message(
399                                    &source,
400                                    &payload,
401                                    &p2p,
402                                    &storage,
403                                    &paid_list,
404                                    &payment_verifier,
405                                    &queues,
406                                    &config,
407                                    &is_bootstrapping,
408                                    &bootstrap_state,
409                                    &sync_history,
410                                    &sync_cycle_epoch,
411                                    &repair_proofs,
412                                    rr_message_id.as_deref(),
413                                ).await {
414                                    Ok(()) => {}
415                                    Err(e) => {
416                                        debug!(
417                                            "Replication message from {source} error: {e}"
418                                        );
419                                    }
420                                }
421                            }
422                        }
423                    }
424                    // Gap 4: Topology churn handling (Section 13).
425                    //
426                    // The DHT routing table emits KClosestPeersChanged when the
427                    // K-closest peer set actually changes, which is the precise
428                    // signal for triggering neighbor sync. This replaces the
429                    // previous approach of checking every PeerConnected /
430                    // PeerDisconnected event against the close group.
431                    dht_event = dht_events.recv() => {
432                        let Ok(dht_event) = dht_event else { continue };
433                        match dht_event {
434                            DhtNetworkEvent::KClosestPeersChanged { .. } => {
435                                debug!(
436                                    "K-closest peers changed, triggering early neighbor sync"
437                                );
438                                sync_trigger.notify_one();
439                            }
440                            DhtNetworkEvent::PeerRemoved { peer_id } => {
441                                repair_proofs.write().await.remove_peer(&peer_id);
442                            }
443                            _ => {}
444                        }
445                    }
446                }
447            }
448            debug!("Replication message handler shut down");
449        });
450        self.task_handles.push(handle);
451    }
452
453    fn start_neighbor_sync_loop(&mut self) {
454        let p2p = Arc::clone(&self.p2p_node);
455        let storage = Arc::clone(&self.storage);
456        let paid_list = Arc::clone(&self.paid_list);
457        let queues = Arc::clone(&self.queues);
458        let config = Arc::clone(&self.config);
459        let shutdown = self.shutdown.clone();
460        let sync_state = Arc::clone(&self.sync_state);
461        let sync_history = Arc::clone(&self.sync_history);
462        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
463        let repair_proofs = Arc::clone(&self.repair_proofs);
464        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
465        let bootstrap_state = Arc::clone(&self.bootstrap_state);
466        let sync_trigger = Arc::clone(&self.sync_trigger);
467
468        let handle = tokio::spawn(async move {
469            loop {
470                let interval = config.random_neighbor_sync_interval();
471                tokio::select! {
472                    () = shutdown.cancelled() => break,
473                    () = tokio::time::sleep(interval) => {}
474                    () = sync_trigger.notified() => {
475                        debug!("Neighbor sync triggered by topology change");
476                    }
477                }
478                // Wrap the sync round in a select so shutdown cancels
479                // in-progress network operations rather than waiting for
480                // the full round to complete.
481                tokio::select! {
482                    () = shutdown.cancelled() => break,
483                    () = run_neighbor_sync_round(
484                        &p2p,
485                        &storage,
486                        &paid_list,
487                        &queues,
488                        &config,
489                        &sync_state,
490                        &sync_history,
491                        &sync_cycle_epoch,
492                        &repair_proofs,
493                        &is_bootstrapping,
494                        &bootstrap_state,
495                    ) => {}
496                }
497            }
498            debug!("Neighbor sync loop shut down");
499        });
500        self.task_handles.push(handle);
501    }
502
503    fn start_self_lookup_loop(&mut self) {
504        let p2p = Arc::clone(&self.p2p_node);
505        let config = Arc::clone(&self.config);
506        let shutdown = self.shutdown.clone();
507
508        let handle = tokio::spawn(async move {
509            loop {
510                let interval = config.random_self_lookup_interval();
511                tokio::select! {
512                    () = shutdown.cancelled() => break,
513                    () = tokio::time::sleep(interval) => {
514                        if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
515                            debug!("Self-lookup failed: {e}");
516                        }
517                    }
518                }
519            }
520            debug!("Self-lookup loop shut down");
521        });
522        self.task_handles.push(handle);
523    }
524
525    fn start_audit_loop(&mut self) {
526        let p2p = Arc::clone(&self.p2p_node);
527        let storage = Arc::clone(&self.storage);
528        let config = Arc::clone(&self.config);
529        let shutdown = self.shutdown.clone();
530        let sync_history = Arc::clone(&self.sync_history);
531        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
532        let repair_proofs = Arc::clone(&self.repair_proofs);
533        let bootstrap_state = Arc::clone(&self.bootstrap_state);
534        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
535        let sync_state = Arc::clone(&self.sync_state);
536
537        let handle = tokio::spawn(async move {
538            // Invariant 19: wait for bootstrap to drain before starting audits.
539            loop {
540                tokio::select! {
541                    () = shutdown.cancelled() => return,
542                    () = tokio::time::sleep(
543                        std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
544                    ) => {
545                        if bootstrap_state.read().await.is_drained() {
546                            break;
547                        }
548                    }
549                }
550            }
551
552            // Run one audit tick immediately after bootstrap drain.
553            {
554                let bootstrapping = *is_bootstrapping.read().await;
555                let result = {
556                    let history = sync_history.read().await;
557                    let current_sync_epoch = *sync_cycle_epoch.read().await;
558                    audit::audit_tick_with_repair_proofs(
559                        &p2p,
560                        &storage,
561                        &config,
562                        &history,
563                        &repair_proofs,
564                        current_sync_epoch,
565                        bootstrapping,
566                    )
567                    .await
568                };
569                handle_audit_result(&result, &p2p, &sync_state, &config).await;
570            }
571
572            // Then run periodically.
573            loop {
574                let interval = config.random_audit_tick_interval();
575                tokio::select! {
576                    () = shutdown.cancelled() => break,
577                    () = tokio::time::sleep(interval) => {
578                        let bootstrapping = *is_bootstrapping.read().await;
579                        let result = {
580                            let history = sync_history.read().await;
581                            let current_sync_epoch = *sync_cycle_epoch.read().await;
582                            audit::audit_tick_with_repair_proofs(
583                                &p2p,
584                                &storage,
585                                &config,
586                                &history,
587                                &repair_proofs,
588                                current_sync_epoch,
589                                bootstrapping,
590                            )
591                            .await
592                        };
593                        handle_audit_result(&result, &p2p, &sync_state, &config).await;
594                    }
595                }
596            }
597            debug!("Audit loop shut down");
598        });
599        self.task_handles.push(handle);
600    }
601
602    #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
603    fn start_fetch_worker(&mut self) {
604        let p2p = Arc::clone(&self.p2p_node);
605        let storage = Arc::clone(&self.storage);
606        let queues = Arc::clone(&self.queues);
607        let config = Arc::clone(&self.config);
608        let shutdown = self.shutdown.clone();
609        let bootstrap_state = Arc::clone(&self.bootstrap_state);
610        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
611        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
612        let concurrency = max_parallel_fetch();
613
614        info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
615
616        let handle = tokio::spawn(async move {
617            // Each in-flight future yields (key, Option<FetchOutcome>) so we
618            // always recover the key — even if the inner task panics.
619            let mut in_flight = FuturesUnordered::<FetchFuture>::new();
620
621            loop {
622                // Fill up to `concurrency` slots from the queue.
623                {
624                    let mut q = queues.write().await;
625                    while in_flight.len() < concurrency {
626                        let Some(candidate) = q.dequeue_fetch() else {
627                            break;
628                        };
629                        let Some(&source) = candidate.sources.first() else {
630                            warn!(
631                                "Fetch candidate {} has no sources — dropping",
632                                hex::encode(candidate.key)
633                            );
634                            continue;
635                        };
636                        q.start_fetch(candidate.key, source, candidate.sources.clone());
637
638                        let p2p = Arc::clone(&p2p);
639                        let storage = Arc::clone(&storage);
640                        let config = Arc::clone(&config);
641                        let token = shutdown.clone();
642                        let fetch_key = candidate.key;
643                        in_flight.push(Box::pin(async move {
644                            let handle = tokio::spawn(async move {
645                                // Cancel-aware: abort when the engine shuts down.
646                                tokio::select! {
647                                    () = token.cancelled() => FetchOutcome {
648                                        key: fetch_key,
649                                        result: FetchResult::SourceFailed,
650                                    },
651                                    outcome = execute_single_fetch(
652                                        p2p, storage, config, fetch_key, source,
653                                    ) => outcome,
654                                }
655                            });
656                            match handle.await {
657                                Ok(outcome) => (outcome.key, Some(outcome)),
658                                Err(e) => {
659                                    error!(
660                                        "Fetch task for {} panicked: {e}",
661                                        hex::encode(fetch_key)
662                                    );
663                                    (fetch_key, None)
664                                }
665                            }
666                        }));
667                    }
668                } // release queues write lock
669
670                if in_flight.is_empty() {
671                    // No work — wait for new items or shutdown.
672                    tokio::select! {
673                        () = shutdown.cancelled() => break,
674                        () = tokio::time::sleep(
675                            std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
676                        ) => continue,
677                    }
678                }
679
680                // Wait for the next fetch to complete and process the result.
681                tokio::select! {
682                    () = shutdown.cancelled() => break,
683                    Some((key, maybe_outcome)) = in_flight.next() => {
684                        let mut q = queues.write().await;
685                        let terminal = if let Some(outcome) = maybe_outcome {
686                            match outcome.result {
687                                FetchResult::Stored => {
688                                    q.complete_fetch(&key);
689                                    true
690                                }
691                                FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
692                                    if let Some(next_peer) = q.retry_fetch(&key) {
693                                        // Spawn a new fetch task for the next source.
694                                        let p2p = Arc::clone(&p2p);
695                                        let storage = Arc::clone(&storage);
696                                        let config = Arc::clone(&config);
697                                        let token = shutdown.clone();
698                                        let fetch_key = key;
699                                        in_flight.push(Box::pin(async move {
700                                            let handle = tokio::spawn(async move {
701                                                tokio::select! {
702                                                    () = token.cancelled() => FetchOutcome {
703                                                        key: fetch_key,
704                                                        result: FetchResult::SourceFailed,
705                                                    },
706                                                    outcome = execute_single_fetch(
707                                                        p2p, storage, config, fetch_key, next_peer,
708                                                    ) => outcome,
709                                                }
710                                            });
711                                            match handle.await {
712                                                Ok(outcome) => (outcome.key, Some(outcome)),
713                                                Err(e) => {
714                                                    error!(
715                                                        "Fetch task for {} panicked: {e}",
716                                                        hex::encode(fetch_key)
717                                                    );
718                                                    (fetch_key, None)
719                                                }
720                                            }
721                                        }));
722                                        false
723                                    } else {
724                                        q.complete_fetch(&key);
725                                        true
726                                    }
727                                }
728                            }
729                        } else {
730                            // Task panicked — reclaim the in-flight slot.
731                            q.complete_fetch(&key);
732                            true
733                        };
734
735                        // Shrink bootstrap pending set on terminal exit.
736                        if terminal {
737                            drop(q); // release queues lock before acquiring bootstrap_state
738                            if !bootstrap_state.read().await.is_drained() {
739                                bootstrap_state.write().await.remove_key(&key);
740                                let q = queues.read().await;
741                                if bootstrap::check_bootstrap_drained(
742                                    &bootstrap_state,
743                                    &q,
744                                )
745                                .await
746                                {
747                                    complete_bootstrap(
748                                        &is_bootstrapping,
749                                        &bootstrap_complete_notify,
750                                    ).await;
751                                }
752                            }
753                        }
754                    }
755                }
756            }
757
758            // Cancel and drain remaining in-flight fetches on shutdown.
759            // The CancellationToken is already cancelled by this point, so
760            // spawned tasks will see cancellation via their select! branches.
761            while in_flight.next().await.is_some() {}
762            debug!("Fetch worker shut down");
763        });
764        self.task_handles.push(handle);
765    }
766
767    fn start_verification_worker(&mut self) {
768        let p2p = Arc::clone(&self.p2p_node);
769        let storage = Arc::clone(&self.storage);
770        let queues = Arc::clone(&self.queues);
771        let paid_list = Arc::clone(&self.paid_list);
772        let config = Arc::clone(&self.config);
773        let shutdown = self.shutdown.clone();
774        let bootstrap_state = Arc::clone(&self.bootstrap_state);
775        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
776        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
777
778        let handle = tokio::spawn(async move {
779            loop {
780                tokio::select! {
781                    () = shutdown.cancelled() => break,
782                    () = tokio::time::sleep(
783                        std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
784                    ) => {
785                        let ctx = VerificationCycleContext {
786                            p2p_node: &p2p,
787                            paid_list: &paid_list,
788                            storage: &storage,
789                            queues: &queues,
790                            config: &config,
791                            bootstrap_state: &bootstrap_state,
792                            is_bootstrapping: &is_bootstrapping,
793                            bootstrap_complete_notify: &bootstrap_complete_notify,
794                        };
795                        run_verification_cycle(ctx).await;
796                    }
797                }
798            }
799            debug!("Verification worker shut down");
800        });
801        self.task_handles.push(handle);
802    }
803
804    /// Gap 3: Run a one-shot bootstrap sync on startup.
805    ///
806    /// Waits for saorsa-core to emit `DhtNetworkEvent::BootstrapComplete`
807    /// (indicating the routing table is populated) before snapshotting
808    /// close neighbors. Falls back after a timeout so bootstrap nodes
809    /// (which have no peers and therefore never receive the event) still
810    /// proceed.
811    ///
812    /// After the gate, finds close neighbors, syncs with each in
813    /// round-robin batches, admits returned hints into the verification
814    /// pipeline, and tracks discovered keys for bootstrap drain detection.
815    #[allow(clippy::too_many_lines)]
816    fn start_bootstrap_sync(
817        &mut self,
818        dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
819    ) {
820        let p2p = Arc::clone(&self.p2p_node);
821        let storage = Arc::clone(&self.storage);
822        let paid_list = Arc::clone(&self.paid_list);
823        let queues = Arc::clone(&self.queues);
824        let config = Arc::clone(&self.config);
825        let shutdown = self.shutdown.clone();
826        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
827        let bootstrap_state = Arc::clone(&self.bootstrap_state);
828        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
829        let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
830        let repair_proofs = Arc::clone(&self.repair_proofs);
831
832        let handle = tokio::spawn(async move {
833            // Wait for DHT bootstrap to complete before snapshotting
834            // neighbors. The routing table is empty until saorsa-core
835            // finishes its FIND_NODE rounds and bucket refreshes.
836            let gate = bootstrap::wait_for_bootstrap_complete(
837                dht_events,
838                config.bootstrap_complete_timeout_secs,
839                &shutdown,
840            )
841            .await;
842
843            if gate == bootstrap::BootstrapGateResult::Shutdown {
844                return;
845            }
846
847            let self_id = *p2p.peer_id();
848            let neighbors =
849                neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
850                    .await;
851
852            if neighbors.is_empty() {
853                info!("Bootstrap sync: no close neighbors found, marking drained");
854                bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
855                complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
856                return;
857            }
858
859            let neighbor_count = neighbors.len();
860            info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
861
862            // Process neighbors in batches of NEIGHBOR_SYNC_PEER_COUNT.
863            for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
864                if shutdown.is_cancelled() {
865                    break;
866                }
867
868                for peer in batch {
869                    if shutdown.is_cancelled() {
870                        break;
871                    }
872
873                    // Re-read on each iteration so peers see current state.
874                    let bootstrapping = *is_bootstrapping.read().await;
875
876                    bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
877
878                    let outcome = neighbor_sync::sync_with_peer_with_outcome(
879                        peer,
880                        &p2p,
881                        &storage,
882                        &paid_list,
883                        &config,
884                        bootstrapping,
885                    )
886                    .await;
887
888                    bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
889
890                    if let Some(outcome) = outcome {
891                        if !outcome.response.bootstrapping {
892                            record_sent_replica_hints(
893                                peer,
894                                &outcome.sent_replica_hints,
895                                &repair_proofs,
896                                &sync_cycle_epoch,
897                            )
898                            .await;
899                            // Admit hints into verification pipeline.
900                            let outcome = admit_and_queue_hints(
901                                &self_id,
902                                peer,
903                                &outcome.response.replica_hints,
904                                &outcome.response.paid_hints,
905                                &p2p,
906                                &config,
907                                &storage,
908                                &paid_list,
909                                &queues,
910                            )
911                            .await;
912
913                            // Track discovered keys for drain detection.
914                            if !outcome.discovered.is_empty() {
915                                bootstrap::track_discovered_keys(
916                                    &bootstrap_state,
917                                    &outcome.discovered,
918                                )
919                                .await;
920                            }
921
922                            // Record / retire capacity rejections so the
923                            // drain check correctly reflects whether each
924                            // source still owes us re-hinted work after
925                            // queue overflow.
926                            if outcome.capacity_rejected_count > 0 {
927                                bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
928                            } else {
929                                bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
930                            }
931                        }
932                    }
933                }
934            }
935
936            // Check drain condition.
937            {
938                let q = queues.read().await;
939                if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
940                    complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
941                }
942            }
943
944            info!("Bootstrap sync completed");
945        });
946        self.task_handles.push(handle);
947    }
948}
949
950// ===========================================================================
951// Free functions for background tasks
952// ===========================================================================
953
954/// Handle an incoming replication protocol message.
955///
956/// When `rr_message_id` is `Some`, the request arrived via the `/rr/`
957/// request-response path and the response must be sent via `send_response`
958/// so saorsa-core can route it back to the waiting `send_request` caller.
959#[allow(clippy::too_many_arguments)]
960async fn handle_replication_message(
961    source: &PeerId,
962    data: &[u8],
963    p2p_node: &Arc<P2PNode>,
964    storage: &Arc<LmdbStorage>,
965    paid_list: &Arc<PaidList>,
966    payment_verifier: &Arc<PaymentVerifier>,
967    queues: &Arc<RwLock<ReplicationQueues>>,
968    config: &ReplicationConfig,
969    is_bootstrapping: &Arc<RwLock<bool>>,
970    bootstrap_state: &Arc<RwLock<BootstrapState>>,
971    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
972    sync_cycle_epoch: &Arc<RwLock<u64>>,
973    repair_proofs: &Arc<RwLock<RepairProofs>>,
974    rr_message_id: Option<&str>,
975) -> Result<()> {
976    let msg = ReplicationMessage::decode(data)
977        .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
978
979    match msg.body {
980        ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
981            handle_fresh_offer(
982                source,
983                offer,
984                storage,
985                paid_list,
986                payment_verifier,
987                p2p_node,
988                config,
989                msg.request_id,
990                rr_message_id,
991            )
992            .await
993        }
994        ReplicationMessageBody::PaidNotify(ref notify) => {
995            handle_paid_notify(
996                source,
997                notify,
998                paid_list,
999                payment_verifier,
1000                p2p_node,
1001                config,
1002            )
1003            .await
1004        }
1005        ReplicationMessageBody::NeighborSyncRequest(ref request) => {
1006            let bootstrapping = *is_bootstrapping.read().await;
1007            handle_neighbor_sync_request(
1008                source,
1009                request,
1010                p2p_node,
1011                storage,
1012                paid_list,
1013                queues,
1014                config,
1015                bootstrapping,
1016                bootstrap_state,
1017                sync_history,
1018                sync_cycle_epoch,
1019                repair_proofs,
1020                msg.request_id,
1021                rr_message_id,
1022            )
1023            .await
1024        }
1025        ReplicationMessageBody::VerificationRequest(ref request) => {
1026            handle_verification_request(
1027                source,
1028                request,
1029                storage,
1030                paid_list,
1031                p2p_node,
1032                msg.request_id,
1033                rr_message_id,
1034            )
1035            .await
1036        }
1037        ReplicationMessageBody::FetchRequest(ref request) => {
1038            handle_fetch_request(
1039                source,
1040                request,
1041                storage,
1042                p2p_node,
1043                msg.request_id,
1044                rr_message_id,
1045            )
1046            .await
1047        }
1048        ReplicationMessageBody::AuditChallenge(ref challenge) => {
1049            let bootstrapping = *is_bootstrapping.read().await;
1050            handle_audit_challenge_msg(
1051                source,
1052                challenge,
1053                storage,
1054                p2p_node,
1055                bootstrapping,
1056                msg.request_id,
1057                rr_message_id,
1058            )
1059            .await
1060        }
1061        // Response messages are handled by their respective request initiators.
1062        ReplicationMessageBody::FreshReplicationResponse(_)
1063        | ReplicationMessageBody::NeighborSyncResponse(_)
1064        | ReplicationMessageBody::VerificationResponse(_)
1065        | ReplicationMessageBody::FetchResponse(_)
1066        | ReplicationMessageBody::AuditResponse(_) => Ok(()),
1067    }
1068}
1069
1070// ---------------------------------------------------------------------------
1071// Per-message-type handlers
1072// ---------------------------------------------------------------------------
1073
1074#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1075async fn handle_fresh_offer(
1076    source: &PeerId,
1077    offer: &protocol::FreshReplicationOffer,
1078    storage: &Arc<LmdbStorage>,
1079    paid_list: &Arc<PaidList>,
1080    payment_verifier: &Arc<PaymentVerifier>,
1081    p2p_node: &Arc<P2PNode>,
1082    config: &ReplicationConfig,
1083    request_id: u64,
1084    rr_message_id: Option<&str>,
1085) -> Result<()> {
1086    let self_id = *p2p_node.peer_id();
1087
1088    // Rule 5: reject if PoP is missing.
1089    if offer.proof_of_payment.is_empty() {
1090        send_replication_response(
1091            source,
1092            p2p_node,
1093            request_id,
1094            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1095                key: offer.key,
1096                reason: "Missing proof of payment".to_string(),
1097            }),
1098            rr_message_id,
1099        )
1100        .await;
1101        return Ok(());
1102    }
1103
1104    // Enforce chunk size invariant: the normal PUT path rejects data larger
1105    // than MAX_CHUNK_SIZE; the replication receive path must do the same to
1106    // prevent peers from pushing oversized records through replication.
1107    if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1108        warn!(
1109            "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1110            hex::encode(offer.key),
1111            offer.data.len(),
1112            crate::ant_protocol::MAX_CHUNK_SIZE,
1113        );
1114        p2p_node
1115            .report_trust_event(
1116                source,
1117                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1118            )
1119            .await;
1120        send_replication_response(
1121            source,
1122            p2p_node,
1123            request_id,
1124            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1125                key: offer.key,
1126                reason: format!(
1127                    "Data size {} exceeds maximum chunk size {}",
1128                    offer.data.len(),
1129                    crate::ant_protocol::MAX_CHUNK_SIZE,
1130                ),
1131            }),
1132            rr_message_id,
1133        )
1134        .await;
1135        return Ok(());
1136    }
1137
1138    // Rule 7: check responsibility.
1139    if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await {
1140        send_replication_response(
1141            source,
1142            p2p_node,
1143            request_id,
1144            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1145                key: offer.key,
1146                reason: "Not responsible for this key".to_string(),
1147            }),
1148            rr_message_id,
1149        )
1150        .await;
1151        return Ok(());
1152    }
1153
1154    // Gap 1: Validate PoP via PaymentVerifier. This is an already-settled
1155    // receipt handed over by a neighbour, not a live sale — Replication
1156    // context skips the storer-being-paid-now checks (own-quote price
1157    // freshness, local recipient, merkle candidate closeness) that would
1158    // otherwise reject every honest hand-over once counts grow, the close
1159    // group churns, or the live DHT drifts from the pool's original sample.
1160    match payment_verifier
1161        .verify_payment(
1162            &offer.key,
1163            Some(&offer.proof_of_payment),
1164            VerificationContext::Replication,
1165        )
1166        .await
1167    {
1168        Ok(status) if status.can_store() => {
1169            debug!(
1170                "PoP validated for fresh offer key {}",
1171                hex::encode(offer.key)
1172            );
1173        }
1174        Ok(_) => {
1175            send_replication_response(
1176                source,
1177                p2p_node,
1178                request_id,
1179                ReplicationMessageBody::FreshReplicationResponse(
1180                    FreshReplicationResponse::Rejected {
1181                        key: offer.key,
1182                        reason: "Payment verification failed: payment required".to_string(),
1183                    },
1184                ),
1185                rr_message_id,
1186            )
1187            .await;
1188            return Ok(());
1189        }
1190        Err(e) => {
1191            warn!(
1192                "PoP verification error for key {}: {e}",
1193                hex::encode(offer.key)
1194            );
1195            send_replication_response(
1196                source,
1197                p2p_node,
1198                request_id,
1199                ReplicationMessageBody::FreshReplicationResponse(
1200                    FreshReplicationResponse::Rejected {
1201                        key: offer.key,
1202                        reason: format!("Payment verification error: {e}"),
1203                    },
1204                ),
1205                rr_message_id,
1206            )
1207            .await;
1208            return Ok(());
1209        }
1210    }
1211
1212    // Rule 6: add to PaidForList.
1213    if let Err(e) = paid_list.insert(&offer.key).await {
1214        warn!("Failed to add key to PaidForList: {e}");
1215    }
1216
1217    // Store the record.
1218    match storage.put(&offer.key, &offer.data).await {
1219        Ok(_) => {
1220            send_replication_response(
1221                source,
1222                p2p_node,
1223                request_id,
1224                ReplicationMessageBody::FreshReplicationResponse(
1225                    FreshReplicationResponse::Accepted { key: offer.key },
1226                ),
1227                rr_message_id,
1228            )
1229            .await;
1230        }
1231        Err(e) => {
1232            send_replication_response(
1233                source,
1234                p2p_node,
1235                request_id,
1236                ReplicationMessageBody::FreshReplicationResponse(
1237                    FreshReplicationResponse::Rejected {
1238                        key: offer.key,
1239                        reason: format!("Storage error: {e}"),
1240                    },
1241                ),
1242                rr_message_id,
1243            )
1244            .await;
1245        }
1246    }
1247
1248    Ok(())
1249}
1250
1251async fn handle_paid_notify(
1252    _source: &PeerId,
1253    notify: &protocol::PaidNotify,
1254    paid_list: &Arc<PaidList>,
1255    payment_verifier: &Arc<PaymentVerifier>,
1256    p2p_node: &Arc<P2PNode>,
1257    config: &ReplicationConfig,
1258) -> Result<()> {
1259    let self_id = *p2p_node.peer_id();
1260
1261    // Rule 3: validate PoP presence before adding.
1262    if notify.proof_of_payment.is_empty() {
1263        return Ok(());
1264    }
1265
1266    // Check if we're in PaidCloseGroup for this key.
1267    if !admission::is_in_paid_close_group(
1268        &self_id,
1269        &notify.key,
1270        p2p_node,
1271        config.paid_list_close_group_size,
1272    )
1273    .await
1274    {
1275        return Ok(());
1276    }
1277
1278    // Gap 1: Validate PoP via PaymentVerifier. Same as the fresh-offer path:
1279    // a settled receipt, so Replication context (see VerificationContext).
1280    match payment_verifier
1281        .verify_payment(
1282            &notify.key,
1283            Some(&notify.proof_of_payment),
1284            VerificationContext::Replication,
1285        )
1286        .await
1287    {
1288        Ok(status) if status.can_store() => {
1289            debug!(
1290                "PoP validated for paid notify key {}",
1291                hex::encode(notify.key)
1292            );
1293        }
1294        Ok(_) => {
1295            warn!(
1296                "Paid notify rejected: payment required for key {}",
1297                hex::encode(notify.key)
1298            );
1299            return Ok(());
1300        }
1301        Err(e) => {
1302            warn!(
1303                "PoP verification error for paid notify key {}: {e}",
1304                hex::encode(notify.key)
1305            );
1306            return Ok(());
1307        }
1308    }
1309
1310    if let Err(e) = paid_list.insert(&notify.key).await {
1311        warn!("Failed to add paid notify key to PaidForList: {e}");
1312    }
1313
1314    Ok(())
1315}
1316
1317#[allow(clippy::too_many_arguments)]
1318async fn handle_neighbor_sync_request(
1319    source: &PeerId,
1320    request: &protocol::NeighborSyncRequest,
1321    p2p_node: &Arc<P2PNode>,
1322    storage: &Arc<LmdbStorage>,
1323    paid_list: &Arc<PaidList>,
1324    queues: &Arc<RwLock<ReplicationQueues>>,
1325    config: &ReplicationConfig,
1326    is_bootstrapping: bool,
1327    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1328    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1329    sync_cycle_epoch: &Arc<RwLock<u64>>,
1330    repair_proofs: &Arc<RwLock<RepairProofs>>,
1331    request_id: u64,
1332    rr_message_id: Option<&str>,
1333) -> Result<()> {
1334    let self_id = *p2p_node.peer_id();
1335
1336    // No per-request hint count limit: the wire message size limit
1337    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Unlike audit
1338    // challenges, sync hints don't drive expensive computation — they just
1339    // enter the verification queue. A per-request limit here would break
1340    // bootstrap replication for newly-joined nodes with 0 stored chunks.
1341
1342    // Build response (outbound hints).
1343    let (response, sent_replica_hints, sender_in_rt) =
1344        neighbor_sync::handle_sync_request_with_proofs(
1345            source,
1346            request,
1347            p2p_node,
1348            storage,
1349            paid_list,
1350            config,
1351            is_bootstrapping,
1352        )
1353        .await;
1354
1355    // Send response.
1356    let response_sent = send_replication_response_checked(
1357        source,
1358        p2p_node,
1359        request_id,
1360        ReplicationMessageBody::NeighborSyncResponse(response),
1361        rr_message_id,
1362    )
1363    .await;
1364
1365    // Process inbound hints only if sender is in LocalRT (Rule 4-6).
1366    if !sender_in_rt {
1367        return Ok(());
1368    }
1369
1370    // Update sync history for this peer before recording repair proofs so a
1371    // same-tick audit cannot combine a fresh key proof with stale peer maturity.
1372    {
1373        let mut history = sync_history.write().await;
1374        let record = history.entry(*source).or_insert(PeerSyncRecord {
1375            last_sync: None,
1376            cycles_since_sync: 0,
1377        });
1378        record.last_sync = Some(Instant::now());
1379        record.cycles_since_sync = 0;
1380    }
1381
1382    if response_sent && !request.bootstrapping {
1383        record_sent_replica_hints(source, &sent_replica_hints, repair_proofs, sync_cycle_epoch)
1384            .await;
1385    }
1386
1387    // Admit inbound hints and queue for verification.
1388    let outcome = admit_and_queue_hints(
1389        &self_id,
1390        source,
1391        &request.replica_hints,
1392        &request.paid_hints,
1393        p2p_node,
1394        config,
1395        storage,
1396        paid_list,
1397        queues,
1398    )
1399    .await;
1400
1401    // Track discovered keys for bootstrap drain detection so that hints
1402    // admitted via inbound sync requests are not missed. Capacity-rejected
1403    // hints keep this source on the "not yet drained" list until its next
1404    // sync re-admits them; a clean cycle clears the source.
1405    if is_bootstrapping {
1406        if !outcome.discovered.is_empty() {
1407            bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1408        }
1409        if outcome.capacity_rejected_count > 0 {
1410            bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
1411        } else {
1412            bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
1413        }
1414    }
1415
1416    Ok(())
1417}
1418
1419async fn handle_verification_request(
1420    source: &PeerId,
1421    request: &protocol::VerificationRequest,
1422    storage: &Arc<LmdbStorage>,
1423    paid_list: &Arc<PaidList>,
1424    p2p_node: &Arc<P2PNode>,
1425    request_id: u64,
1426    rr_message_id: Option<&str>,
1427) -> Result<()> {
1428    // No per-request key count limit: the wire message size limit
1429    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Verification
1430    // does cheap storage lookups per key, not expensive computation like
1431    // audit digest generation.
1432
1433    #[allow(clippy::cast_possible_truncation)]
1434    let keys_len = request.keys.len() as u32;
1435    let paid_check_set: HashSet<u32> = request
1436        .paid_list_check_indices
1437        .iter()
1438        .copied()
1439        .filter(|&idx| {
1440            if idx >= keys_len {
1441                warn!(
1442                    "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
1443                    request.keys.len(),
1444                );
1445                false
1446            } else {
1447                true
1448            }
1449        })
1450        .collect();
1451
1452    let mut results = Vec::with_capacity(request.keys.len());
1453    for (i, key) in request.keys.iter().enumerate() {
1454        let present = storage.exists(key).unwrap_or(false);
1455        let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
1456            Some(paid_list.contains(key).unwrap_or(false))
1457        } else {
1458            None
1459        };
1460        results.push(protocol::KeyVerificationResult {
1461            key: *key,
1462            present,
1463            paid,
1464        });
1465    }
1466
1467    send_replication_response(
1468        source,
1469        p2p_node,
1470        request_id,
1471        ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
1472        rr_message_id,
1473    )
1474    .await;
1475
1476    Ok(())
1477}
1478
1479async fn handle_fetch_request(
1480    source: &PeerId,
1481    request: &protocol::FetchRequest,
1482    storage: &Arc<LmdbStorage>,
1483    p2p_node: &Arc<P2PNode>,
1484    request_id: u64,
1485    rr_message_id: Option<&str>,
1486) -> Result<()> {
1487    let response = match storage.get(&request.key).await {
1488        Ok(Some(data)) => protocol::FetchResponse::Success {
1489            key: request.key,
1490            data,
1491        },
1492        Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
1493        Err(e) => protocol::FetchResponse::Error {
1494            key: request.key,
1495            reason: format!("{e}"),
1496        },
1497    };
1498
1499    send_replication_response(
1500        source,
1501        p2p_node,
1502        request_id,
1503        ReplicationMessageBody::FetchResponse(response),
1504        rr_message_id,
1505    )
1506    .await;
1507
1508    Ok(())
1509}
1510
1511async fn handle_audit_challenge_msg(
1512    source: &PeerId,
1513    challenge: &protocol::AuditChallenge,
1514    storage: &Arc<LmdbStorage>,
1515    p2p_node: &Arc<P2PNode>,
1516    is_bootstrapping: bool,
1517    request_id: u64,
1518    rr_message_id: Option<&str>,
1519) -> Result<()> {
1520    #[allow(clippy::cast_possible_truncation)]
1521    let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
1522    let response = audit::handle_audit_challenge(
1523        challenge,
1524        storage,
1525        p2p_node.peer_id(),
1526        is_bootstrapping,
1527        stored_chunks,
1528    )
1529    .await;
1530
1531    send_replication_response(
1532        source,
1533        p2p_node,
1534        request_id,
1535        ReplicationMessageBody::AuditResponse(response),
1536        rr_message_id,
1537    )
1538    .await;
1539
1540    Ok(())
1541}
1542
1543// ---------------------------------------------------------------------------
1544// Message sending helper
1545// ---------------------------------------------------------------------------
1546
1547/// Send a replication response message as a best-effort reply.
1548///
1549/// Encode and send failures are logged by the checked helper. Most response
1550/// paths do not need to branch on send success, so this wrapper keeps those
1551/// call sites explicit about their best-effort behavior.
1552async fn send_replication_response(
1553    peer: &PeerId,
1554    p2p_node: &Arc<P2PNode>,
1555    request_id: u64,
1556    body: ReplicationMessageBody,
1557    rr_message_id: Option<&str>,
1558) {
1559    let _ =
1560        send_replication_response_checked(peer, p2p_node, request_id, body, rr_message_id).await;
1561}
1562
1563/// Send a replication response message and report whether it was accepted.
1564///
1565/// Returns `true` after the message is encoded and accepted by the P2P send
1566/// path. Returns `false` after logging an encode or send failure. Repair-proof
1567/// recording uses this to avoid trusting hints that were not actually sent.
1568///
1569/// When `rr_message_id` is `Some`, the response is sent via the `/rr/`
1570/// request-response path so saorsa-core can route it back to the caller's
1571/// `send_request` future. Otherwise it is sent as a plain message.
1572async fn send_replication_response_checked(
1573    peer: &PeerId,
1574    p2p_node: &Arc<P2PNode>,
1575    request_id: u64,
1576    body: ReplicationMessageBody,
1577    rr_message_id: Option<&str>,
1578) -> bool {
1579    let msg = ReplicationMessage { request_id, body };
1580    let encoded = match msg.encode() {
1581        Ok(data) => data,
1582        Err(e) => {
1583            warn!("Failed to encode replication response: {e}");
1584            return false;
1585        }
1586    };
1587    let result = if let Some(msg_id) = rr_message_id {
1588        p2p_node
1589            .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
1590            .await
1591    } else {
1592        p2p_node
1593            .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
1594            .await
1595    };
1596    if let Err(e) = result {
1597        debug!("Failed to send replication response to {peer}: {e}");
1598        return false;
1599    }
1600    true
1601}
1602
1603async fn record_sent_replica_hints(
1604    peer: &PeerId,
1605    hints: &[neighbor_sync::SentReplicaHint],
1606    repair_proofs: &Arc<RwLock<RepairProofs>>,
1607    sync_cycle_epoch: &Arc<RwLock<u64>>,
1608) {
1609    if hints.is_empty() {
1610        return;
1611    }
1612
1613    let hinted_at_epoch = *sync_cycle_epoch.read().await;
1614    let mut proofs = repair_proofs.write().await;
1615    for hint in hints {
1616        if proofs.record_replica_hint_sent(*peer, hint.key, &hint.close_peers, hinted_at_epoch) {
1617            debug!(
1618                "Recorded repair hint proof for peer {peer} and key {}",
1619                hex::encode(hint.key)
1620            );
1621        }
1622    }
1623}
1624
1625// ---------------------------------------------------------------------------
1626// Neighbor sync round
1627// ---------------------------------------------------------------------------
1628
1629/// Run one neighbor sync round.
1630#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1631async fn run_neighbor_sync_round(
1632    p2p_node: &Arc<P2PNode>,
1633    storage: &Arc<LmdbStorage>,
1634    paid_list: &Arc<PaidList>,
1635    queues: &Arc<RwLock<ReplicationQueues>>,
1636    config: &ReplicationConfig,
1637    sync_state: &Arc<RwLock<NeighborSyncState>>,
1638    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1639    sync_cycle_epoch: &Arc<RwLock<u64>>,
1640    repair_proofs: &Arc<RwLock<RepairProofs>>,
1641    is_bootstrapping: &Arc<RwLock<bool>>,
1642    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1643) {
1644    let self_id = *p2p_node.peer_id();
1645    let bootstrapping = *is_bootstrapping.read().await;
1646
1647    // Check if cycle is complete; start new one if needed.
1648    // We check under a read lock, then release it before the expensive
1649    // prune pass and DHT snapshot so other tasks are not starved.
1650    let cycle_complete = sync_state.read().await.is_cycle_complete();
1651    if cycle_complete {
1652        // A completed local neighbor-sync cycle matures key-specific repair
1653        // proofs recorded in earlier epochs.
1654        {
1655            let mut history = sync_history.write().await;
1656            for record in history.values_mut() {
1657                record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
1658            }
1659        }
1660        let current_sync_epoch = {
1661            let mut epoch = sync_cycle_epoch.write().await;
1662            *epoch = epoch.saturating_add(1);
1663            *epoch
1664        };
1665
1666        // Post-cycle pruning (Section 11) — runs without holding sync_state.
1667        // Remote prune-confirmation audits are storage-proof audits and only
1668        // run after bootstrap has drained.
1669        let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
1670        pruning::run_prune_pass_with_context(pruning::PrunePassContext {
1671            self_id: &self_id,
1672            storage,
1673            paid_list,
1674            p2p_node,
1675            config,
1676            sync_state,
1677            repair_proofs,
1678            current_sync_epoch,
1679            allow_remote_prune_audits,
1680        })
1681        .await;
1682
1683        // Take fresh close-neighbor snapshot (DHT query, no lock held).
1684        let neighbors =
1685            neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
1686                .await;
1687
1688        // Now re-acquire write lock and re-check before swapping cycle.
1689        let mut state = sync_state.write().await;
1690        if state.is_cycle_complete() {
1691            // Preserve cooldown and bootstrap-claim tracking across cycles.
1692            // Claims have a 24h lifecycle vs 10-20 min cycles — dropping them
1693            // would reset the abuse detection timer every cycle.
1694            let old_sync_times = std::mem::take(&mut state.last_sync_times);
1695            let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
1696            let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
1697            let old_prune_cursor = state.prune_cursor;
1698            *state = NeighborSyncState::new_cycle(neighbors);
1699            state.last_sync_times = old_sync_times;
1700            state.bootstrap_claims = old_bootstrap_claims;
1701            state.bootstrap_claim_history = old_bootstrap_claim_history;
1702            state.prune_cursor = old_prune_cursor;
1703        }
1704    }
1705
1706    // Select batch of peers.
1707    let batch = {
1708        let mut state = sync_state.write().await;
1709        neighbor_sync::select_sync_batch(
1710            &mut state,
1711            config.neighbor_sync_peer_count,
1712            config.neighbor_sync_cooldown,
1713        )
1714    };
1715
1716    if batch.is_empty() {
1717        return;
1718    }
1719
1720    debug!("Neighbor sync: syncing with {} peers", batch.len());
1721
1722    // Sync with each peer in the batch.
1723    for peer in &batch {
1724        let outcome = neighbor_sync::sync_with_peer_with_outcome(
1725            peer,
1726            p2p_node,
1727            storage,
1728            paid_list,
1729            config,
1730            bootstrapping,
1731        )
1732        .await;
1733
1734        if let Some(outcome) = outcome {
1735            handle_sync_response(
1736                &self_id,
1737                peer,
1738                &outcome.response,
1739                &outcome.sent_replica_hints,
1740                p2p_node,
1741                config,
1742                bootstrapping,
1743                bootstrap_state,
1744                storage,
1745                paid_list,
1746                queues,
1747                sync_state,
1748                sync_history,
1749                sync_cycle_epoch,
1750                repair_proofs,
1751            )
1752            .await;
1753        } else {
1754            // Sync failed -- remove peer and try to fill slot.
1755            let replacement = {
1756                let mut state = sync_state.write().await;
1757                neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
1758            };
1759
1760            // Attempt sync with the replacement peer (if one was found).
1761            if let Some(replacement_peer) = replacement {
1762                let replacement_outcome = neighbor_sync::sync_with_peer_with_outcome(
1763                    &replacement_peer,
1764                    p2p_node,
1765                    storage,
1766                    paid_list,
1767                    config,
1768                    bootstrapping,
1769                )
1770                .await;
1771
1772                if let Some(outcome) = replacement_outcome {
1773                    handle_sync_response(
1774                        &self_id,
1775                        &replacement_peer,
1776                        &outcome.response,
1777                        &outcome.sent_replica_hints,
1778                        p2p_node,
1779                        config,
1780                        bootstrapping,
1781                        bootstrap_state,
1782                        storage,
1783                        paid_list,
1784                        queues,
1785                        sync_state,
1786                        sync_history,
1787                        sync_cycle_epoch,
1788                        repair_proofs,
1789                    )
1790                    .await;
1791                }
1792            }
1793        }
1794    }
1795}
1796
1797/// Process a successful neighbor sync response: record the sync, check for
1798/// bootstrap claim abuse, and admit inbound hints.
1799#[allow(clippy::too_many_arguments)]
1800async fn handle_sync_response(
1801    self_id: &PeerId,
1802    peer: &PeerId,
1803    resp: &NeighborSyncResponse,
1804    sent_replica_hints: &[neighbor_sync::SentReplicaHint],
1805    p2p_node: &Arc<P2PNode>,
1806    config: &ReplicationConfig,
1807    bootstrapping: bool,
1808    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1809    storage: &Arc<LmdbStorage>,
1810    paid_list: &Arc<PaidList>,
1811    queues: &Arc<RwLock<ReplicationQueues>>,
1812    sync_state: &Arc<RwLock<NeighborSyncState>>,
1813    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1814    sync_cycle_epoch: &Arc<RwLock<u64>>,
1815    repair_proofs: &Arc<RwLock<RepairProofs>>,
1816) {
1817    // Record successful sync.
1818    {
1819        let mut state = sync_state.write().await;
1820        neighbor_sync::record_successful_sync(&mut state, peer);
1821    }
1822    {
1823        let mut history = sync_history.write().await;
1824        let record = history.entry(*peer).or_insert(PeerSyncRecord {
1825            last_sync: None,
1826            cycles_since_sync: 0,
1827        });
1828        record.last_sync = Some(Instant::now());
1829        record.cycles_since_sync = 0;
1830    }
1831
1832    // Process inbound hints from response (skip if peer is bootstrapping).
1833    if resp.bootstrapping {
1834        // Gap 6: BootstrapClaimAbuse grace period enforcement.
1835        // Separate state mutation from network I/O to avoid holding the
1836        // write lock across report_trust_event.
1837        let should_report = {
1838            let now = Instant::now();
1839            let mut state = sync_state.write().await;
1840            match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
1841                BootstrapClaimObservation::WithinGrace { .. } => false,
1842                BootstrapClaimObservation::PastGrace { first_seen } => {
1843                    warn!(
1844                        "Peer {peer} has been claiming bootstrap for {:?}, \
1845                         exceeding grace period of {:?} — reporting abuse",
1846                        now.duration_since(first_seen),
1847                        config.bootstrap_claim_grace_period,
1848                    );
1849                    true
1850                }
1851                BootstrapClaimObservation::Repeated { first_seen } => {
1852                    warn!(
1853                        "Peer {peer} repeated bootstrap claim after previously stopping; \
1854                         first claim was {:?} ago — reporting abuse",
1855                        now.duration_since(first_seen),
1856                    );
1857                    true
1858                }
1859            }
1860        };
1861        if should_report {
1862            p2p_node
1863                .report_trust_event(
1864                    peer,
1865                    TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1866                )
1867                .await;
1868        }
1869    } else {
1870        // Peer is not claiming bootstrap; clear active claim while retaining
1871        // history so the peer cannot start a second grace window later.
1872        {
1873            let mut state = sync_state.write().await;
1874            state.clear_active_bootstrap_claim(peer);
1875        }
1876        record_sent_replica_hints(peer, sent_replica_hints, repair_proofs, sync_cycle_epoch).await;
1877        let outcome = admit_and_queue_hints(
1878            self_id,
1879            peer,
1880            &resp.replica_hints,
1881            &resp.paid_hints,
1882            p2p_node,
1883            config,
1884            storage,
1885            paid_list,
1886            queues,
1887        )
1888        .await;
1889
1890        // Track discovered keys for bootstrap drain detection so that hints
1891        // admitted via regular neighbor sync are not missed. Capacity-
1892        // rejected hints keep this source on the "not yet drained" list
1893        // until its next sync replays them; a clean cycle clears it.
1894        if bootstrapping {
1895            if !outcome.discovered.is_empty() {
1896                bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1897            }
1898            if outcome.capacity_rejected_count > 0 {
1899                bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
1900            } else {
1901                bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
1902            }
1903        }
1904    }
1905}
1906
1907/// Admit hints and queue them for verification, returning newly-discovered keys.
1908///
1909/// Shared by neighbor-sync request handling, response handling, and bootstrap
1910/// sync so that admission + queueing logic lives in one place.
1911#[allow(clippy::too_many_arguments)]
1912/// Outcome of [`admit_and_queue_hints`].
1913///
1914/// `capacity_rejected_count` is non-zero when one or more legitimately
1915/// admissible hints were dropped because `pending_verify`'s global or
1916/// per-source bound was hit. Callers that care about completeness
1917/// (bootstrap drain accounting) MUST NOT treat their work as complete while
1918/// this is > 0 — the source will need to re-hint after capacity frees up.
1919struct AdmissionOutcome {
1920    discovered: HashSet<XorName>,
1921    capacity_rejected_count: usize,
1922}
1923
1924#[allow(clippy::too_many_arguments)]
1925async fn admit_and_queue_hints(
1926    self_id: &PeerId,
1927    source_peer: &PeerId,
1928    replica_hints: &[XorName],
1929    paid_hints: &[XorName],
1930    p2p_node: &Arc<P2PNode>,
1931    config: &ReplicationConfig,
1932    storage: &Arc<LmdbStorage>,
1933    paid_list: &Arc<PaidList>,
1934    queues: &Arc<RwLock<ReplicationQueues>>,
1935) -> AdmissionOutcome {
1936    let pending_keys: HashSet<XorName> = {
1937        let q = queues.read().await;
1938        q.pending_keys().into_iter().collect()
1939    };
1940
1941    let admitted = admission::admit_hints(
1942        self_id,
1943        replica_hints,
1944        paid_hints,
1945        p2p_node,
1946        config,
1947        storage,
1948        paid_list,
1949        &pending_keys,
1950    )
1951    .await;
1952
1953    let mut discovered = HashSet::new();
1954    let mut capacity_rejected_count: usize = 0;
1955    let mut q = queues.write().await;
1956    let now = Instant::now();
1957
1958    for key in admitted.replica_keys {
1959        if !storage.exists(&key).unwrap_or(false) {
1960            let result = q.add_pending_verify(
1961                key,
1962                VerificationEntry {
1963                    state: VerificationState::PendingVerify,
1964                    pipeline: HintPipeline::Replica,
1965                    verified_sources: Vec::new(),
1966                    tried_sources: HashSet::new(),
1967                    created_at: now,
1968                    hint_sender: *source_peer,
1969                },
1970            );
1971            match result {
1972                crate::replication::scheduling::AdmissionResult::Admitted => {
1973                    discovered.insert(key);
1974                }
1975                crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
1976                crate::replication::scheduling::AdmissionResult::CapacityRejected => {
1977                    capacity_rejected_count += 1;
1978                }
1979            }
1980        }
1981    }
1982
1983    for key in admitted.paid_only_keys {
1984        let result = q.add_pending_verify(
1985            key,
1986            VerificationEntry {
1987                state: VerificationState::PendingVerify,
1988                pipeline: HintPipeline::PaidOnly,
1989                verified_sources: Vec::new(),
1990                tried_sources: HashSet::new(),
1991                created_at: now,
1992                hint_sender: *source_peer,
1993            },
1994        );
1995        match result {
1996            crate::replication::scheduling::AdmissionResult::Admitted => {
1997                discovered.insert(key);
1998            }
1999            crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
2000            crate::replication::scheduling::AdmissionResult::CapacityRejected => {
2001                capacity_rejected_count += 1;
2002            }
2003        }
2004    }
2005
2006    if capacity_rejected_count > 0 {
2007        debug!(
2008            "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
2009             rejected at queue capacity; source will need to re-hint after pending_verify drains"
2010        );
2011    }
2012
2013    AdmissionOutcome {
2014        discovered,
2015        capacity_rejected_count,
2016    }
2017}
2018
2019// ---------------------------------------------------------------------------
2020// Verification cycle
2021// ---------------------------------------------------------------------------
2022
2023/// Run one verification cycle: process pending keys through quorum checks.
2024#[allow(clippy::too_many_lines)]
2025async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
2026    let VerificationCycleContext {
2027        p2p_node,
2028        paid_list,
2029        storage,
2030        queues,
2031        config,
2032        bootstrap_state,
2033        is_bootstrapping,
2034        bootstrap_complete_notify,
2035    } = ctx;
2036
2037    // Evict stale entries that have been pending too long (e.g. unreachable
2038    // verification targets during a network partition).
2039    {
2040        let mut q = queues.write().await;
2041        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
2042    }
2043
2044    let pending_keys = {
2045        let q = queues.read().await;
2046        q.pending_keys()
2047    };
2048
2049    if pending_keys.is_empty() {
2050        return;
2051    }
2052
2053    let self_id = *p2p_node.peer_id();
2054
2055    // Step 1: Check local PaidForList for fast-path authorization (Section 9,
2056    // step 4).
2057    let mut local_paid_presence_probe_keys = Vec::new();
2058    let mut local_paid_paid_only_keys = Vec::new();
2059    let mut keys_needing_network = Vec::new();
2060    let mut terminal_keys: Vec<XorName> = Vec::new();
2061    {
2062        let mut q = queues.write().await;
2063        for key in &pending_keys {
2064            if paid_list.contains(key).unwrap_or(false) {
2065                if let Some(pipeline) =
2066                    q.set_pending_state(key, VerificationState::PaidListVerified)
2067                {
2068                    match pipeline {
2069                        HintPipeline::PaidOnly => {
2070                            // Paid-only + local paid state needs one more
2071                            // responsibility check outside this lock: if we
2072                            // are also in the storage close group, the hint
2073                            // can repair a missing replica.
2074                            local_paid_paid_only_keys.push(*key);
2075                        }
2076                        HintPipeline::Replica => {
2077                            // Local paid-list membership authorizes the key.
2078                            // We still need a presence probe to discover fetch
2079                            // sources, but we must not require remote paid
2080                            // majority or presence quorum.
2081                            local_paid_presence_probe_keys.push(*key);
2082                        }
2083                    }
2084                }
2085            } else {
2086                keys_needing_network.push(*key);
2087            }
2088        }
2089    }
2090
2091    if !local_paid_paid_only_keys.is_empty() {
2092        let mut terminal_paid_only = Vec::new();
2093        for key in local_paid_paid_only_keys {
2094            if storage.exists(&key).unwrap_or(false) {
2095                terminal_paid_only.push(key);
2096            } else if admission::is_responsible(&self_id, &key, p2p_node, config.close_group_size)
2097                .await
2098            {
2099                local_paid_presence_probe_keys.push(key);
2100            } else {
2101                terminal_paid_only.push(key);
2102            }
2103        }
2104
2105        if !terminal_paid_only.is_empty() {
2106            let mut q = queues.write().await;
2107            for key in terminal_paid_only {
2108                q.remove_pending(&key);
2109                terminal_keys.push(key);
2110            }
2111        }
2112    }
2113
2114    // Step 1b: Local paid-list hit for fetch-eligible keys. Per Section 9
2115    // step 4, authorization succeeds immediately; run a presence-only probe
2116    // to find any holder we can fetch from.
2117    if !local_paid_presence_probe_keys.is_empty() {
2118        let targets = quorum::compute_presence_targets(
2119            &local_paid_presence_probe_keys,
2120            p2p_node,
2121            config,
2122            &self_id,
2123        )
2124        .await;
2125        let evidence = quorum::run_verification_round(
2126            &local_paid_presence_probe_keys,
2127            &targets,
2128            p2p_node,
2129            config,
2130        )
2131        .await;
2132
2133        let mut q = queues.write().await;
2134        for key in local_paid_presence_probe_keys {
2135            if storage.exists(&key).unwrap_or(false) {
2136                q.remove_pending(&key);
2137                terminal_keys.push(key);
2138                continue;
2139            }
2140            let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
2141                quorum::present_sources_for_key(&key, ev, &targets)
2142            });
2143            if sources.is_empty() {
2144                // Terminal failure: remove pending and report. No fetch path.
2145                q.remove_pending(&key);
2146                warn!(
2147                    "Locally paid key {} has no responding holders (possible data loss)",
2148                    hex::encode(key)
2149                );
2150                terminal_keys.push(key);
2151            } else {
2152                let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
2153                // Atomic remove+enqueue: if fetch_queue is at capacity, the
2154                // pending entry is preserved and retried next cycle (no
2155                // silent drop of verified replica-repair work).
2156                let _ = q.promote_pending_to_fetch(key, distance, sources);
2157            }
2158        }
2159    }
2160
2161    // Steps 2-5: Network verification (skipped if all keys resolved locally).
2162    if !keys_needing_network.is_empty() {
2163        // Step 2: Compute targets and run network verification round.
2164        let targets =
2165            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
2166                .await;
2167
2168        let evidence =
2169            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;
2170
2171        // Step 3: Evaluate results — collect outcomes without holding the write
2172        // lock across paid-list I/O.
2173        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
2174        {
2175            let q = queues.read().await;
2176            for key in &keys_needing_network {
2177                let Some(ev) = evidence.get(key) else {
2178                    continue;
2179                };
2180                let Some(entry) = q.get_pending(key) else {
2181                    continue;
2182                };
2183                let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config);
2184                evaluated.push((*key, outcome, entry.pipeline));
2185            }
2186        } // read lock released
2187
2188        // Step 4: Insert verified keys into PaidForList (no lock held).
2189        let mut paid_insert_keys: Vec<XorName> = Vec::new();
2190        for (key, outcome, _) in &evaluated {
2191            if matches!(
2192                outcome,
2193                KeyVerificationOutcome::QuorumVerified { .. }
2194                    | KeyVerificationOutcome::PaidListVerified { .. }
2195            ) {
2196                paid_insert_keys.push(*key);
2197            }
2198        }
2199        for key in &paid_insert_keys {
2200            if let Err(e) = paid_list.insert(key).await {
2201                warn!("Failed to add verified key to PaidForList: {e}");
2202            }
2203        }
2204
2205        // Paid-only hints normally update PaidForList only. If this node is
2206        // also storage-responsible for the key, a verified paid-only hint can
2207        // safely repair a missing replica using sources from the same
2208        // verification round.
2209        let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
2210        for (key, outcome, pipeline) in &evaluated {
2211            if *pipeline == HintPipeline::PaidOnly
2212                && matches!(
2213                    outcome,
2214                    KeyVerificationOutcome::QuorumVerified { .. }
2215                        | KeyVerificationOutcome::PaidListVerified { .. }
2216                )
2217                && !storage.exists(key).unwrap_or(false)
2218                && admission::is_responsible(&self_id, key, p2p_node, config.close_group_size).await
2219            {
2220                paid_only_fetch_keys.insert(*key);
2221            }
2222        }
2223
2224        // Step 5: Update queues with the evaluated outcomes.
2225        let mut q = queues.write().await;
2226        for (key, outcome, pipeline) in evaluated {
2227            match outcome {
2228                KeyVerificationOutcome::QuorumVerified { sources }
2229                | KeyVerificationOutcome::PaidListVerified { sources } => {
2230                    let fetch_eligible =
2231                        pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
2232                    if fetch_eligible && !sources.is_empty() {
2233                        let distance =
2234                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
2235                        // Atomic remove+enqueue: on fetch_queue capacity miss
2236                        // the pending entry is preserved so this verified key
2237                        // is retried on the next cycle (no silent drop).
2238                        let _ = q.promote_pending_to_fetch(key, distance, sources);
2239                        // Not terminal — either moved to fetch queue, or
2240                        // retained as pending until queue drains.
2241                    } else if fetch_eligible && sources.is_empty() {
2242                        warn!(
2243                            "Verified responsible key {} has no holders (possible data loss)",
2244                            hex::encode(key)
2245                        );
2246                        q.remove_pending(&key);
2247                        terminal_keys.push(key);
2248                    } else {
2249                        q.remove_pending(&key);
2250                        terminal_keys.push(key);
2251                    }
2252                }
2253                KeyVerificationOutcome::QuorumFailed
2254                | KeyVerificationOutcome::QuorumInconclusive => {
2255                    q.remove_pending(&key);
2256                    terminal_keys.push(key);
2257                }
2258            }
2259        }
2260    }
2261
2262    // Step 6: Remove terminal keys from bootstrap pending set and re-check
2263    // the drain condition.
2264    update_bootstrap_after_verification(
2265        &terminal_keys,
2266        bootstrap_state,
2267        queues,
2268        is_bootstrapping,
2269        bootstrap_complete_notify,
2270    )
2271    .await;
2272}
2273
2274/// Post-verification bootstrap bookkeeping: remove terminal keys from the
2275/// bootstrap pending set and transition out of bootstrapping when drained.
2276async fn update_bootstrap_after_verification(
2277    terminal_keys: &[XorName],
2278    bootstrap_state: &Arc<RwLock<BootstrapState>>,
2279    queues: &Arc<RwLock<ReplicationQueues>>,
2280    is_bootstrapping: &Arc<RwLock<bool>>,
2281    bootstrap_complete_notify: &Arc<Notify>,
2282) {
2283    if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
2284        return;
2285    }
2286    {
2287        let mut bs = bootstrap_state.write().await;
2288        for key in terminal_keys {
2289            bs.remove_key(key);
2290        }
2291    }
2292    let q = queues.read().await;
2293    if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
2294        complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
2295    }
2296}
2297
2298/// Set `is_bootstrapping` to `false` and wake all waiters.
2299async fn complete_bootstrap(
2300    is_bootstrapping: &Arc<RwLock<bool>>,
2301    bootstrap_complete_notify: &Arc<Notify>,
2302) {
2303    *is_bootstrapping.write().await = false;
2304    bootstrap_complete_notify.notify_waiters();
2305    info!("Replication bootstrap complete");
2306}
2307
2308// ---------------------------------------------------------------------------
2309// Fetch types and single-fetch executor
2310// ---------------------------------------------------------------------------
2311
2312/// Result classification for a single fetch attempt.
2313enum FetchResult {
2314    /// Data fetched, integrity-checked, and stored successfully.
2315    Stored,
2316    /// Content-address integrity check failed — do not retry.
2317    IntegrityFailed,
2318    /// Source failed (network error or non-success response) — retryable.
2319    SourceFailed,
2320}
2321
2322/// Outcome produced by [`execute_single_fetch`] and consumed by the fetch
2323/// worker loop to update queue state.
2324struct FetchOutcome {
2325    key: XorName,
2326    result: FetchResult,
2327}
2328
2329#[allow(clippy::too_many_lines)]
2330/// Execute a single fetch request against `source` for `key`.
2331///
2332/// Handles encoding, network I/O, integrity checking, storage, and trust
2333/// event reporting.  Returns a [`FetchOutcome`] so the caller can update
2334/// queue state without holding any locks during the network round-trip.
2335async fn execute_single_fetch(
2336    p2p_node: Arc<P2PNode>,
2337    storage: Arc<LmdbStorage>,
2338    config: Arc<ReplicationConfig>,
2339    key: XorName,
2340    source: PeerId,
2341) -> FetchOutcome {
2342    let request = protocol::FetchRequest { key };
2343    let msg = ReplicationMessage {
2344        request_id: rand::thread_rng().gen::<u64>(),
2345        body: ReplicationMessageBody::FetchRequest(request),
2346    };
2347
2348    let encoded = match msg.encode() {
2349        Ok(data) => data,
2350        Err(e) => {
2351            warn!("Failed to encode fetch request: {e}");
2352            return FetchOutcome {
2353                key,
2354                result: FetchResult::SourceFailed,
2355            };
2356        }
2357    };
2358
2359    let result = p2p_node
2360        .send_request(
2361            &source,
2362            REPLICATION_PROTOCOL_ID,
2363            encoded,
2364            config.fetch_request_timeout,
2365        )
2366        .await;
2367
2368    match result {
2369        Ok(response) => {
2370            let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
2371                p2p_node
2372                    .report_trust_event(
2373                        &source,
2374                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2375                    )
2376                    .await;
2377                return FetchOutcome {
2378                    key,
2379                    result: FetchResult::SourceFailed,
2380                };
2381            };
2382
2383            match resp_msg.body {
2384                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
2385                    key: resp_key,
2386                    data,
2387                }) => {
2388                    // Validate the response key matches the requested key.
2389                    // A malicious peer could serve valid data for a different
2390                    // key, passing integrity checks while the requested key
2391                    // is falsely marked as fetched.
2392                    if resp_key != key {
2393                        warn!(
2394                            "Fetch response key mismatch: requested {}, got {}",
2395                            hex::encode(key),
2396                            hex::encode(resp_key)
2397                        );
2398                        p2p_node
2399                            .report_trust_event(
2400                                &source,
2401                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2402                            )
2403                            .await;
2404                        return FetchOutcome {
2405                            key,
2406                            result: FetchResult::IntegrityFailed,
2407                        };
2408                    }
2409
2410                    // Enforce chunk size invariant on fetched data.
2411                    // Checked before the content-address hash to avoid
2412                    // hashing up to 10 MiB of oversized junk data.
2413                    if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
2414                        warn!(
2415                            "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
2416                            hex::encode(resp_key),
2417                            data.len(),
2418                            crate::ant_protocol::MAX_CHUNK_SIZE,
2419                        );
2420                        p2p_node
2421                            .report_trust_event(
2422                                &source,
2423                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2424                            )
2425                            .await;
2426                        return FetchOutcome {
2427                            key,
2428                            result: FetchResult::IntegrityFailed,
2429                        };
2430                    }
2431
2432                    // Content-address integrity check.
2433                    let computed = crate::client::compute_address(&data);
2434                    if computed != resp_key {
2435                        warn!(
2436                            "Fetched record integrity check failed: expected {}, got {}",
2437                            hex::encode(resp_key),
2438                            hex::encode(computed)
2439                        );
2440                        p2p_node
2441                            .report_trust_event(
2442                                &source,
2443                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2444                            )
2445                            .await;
2446                        return FetchOutcome {
2447                            key,
2448                            result: FetchResult::IntegrityFailed,
2449                        };
2450                    }
2451
2452                    if let Err(e) = storage.put(&resp_key, &data).await {
2453                        warn!(
2454                            "Failed to store fetched record {}: {e}",
2455                            hex::encode(resp_key)
2456                        );
2457                        return FetchOutcome {
2458                            key,
2459                            result: FetchResult::SourceFailed,
2460                        };
2461                    }
2462
2463                    FetchOutcome {
2464                        key,
2465                        result: FetchResult::Stored,
2466                    }
2467                }
2468                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
2469                    ..
2470                }) => {
2471                    // This peer was selected as a fetch source because it
2472                    // recently answered `Present` during verification. A
2473                    // subsequent NotFound is evidence of a stale/false claim
2474                    // or chunk wiping, so penalize lightly and try another
2475                    // verified source.
2476                    warn!(
2477                        "Fetch: verified source {source} returned NotFound for {}",
2478                        hex::encode(key)
2479                    );
2480                    p2p_node
2481                        .report_trust_event(
2482                            &source,
2483                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2484                        )
2485                        .await;
2486                    FetchOutcome {
2487                        key,
2488                        result: FetchResult::SourceFailed,
2489                    }
2490                }
2491                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
2492                    reason,
2493                    ..
2494                }) => {
2495                    warn!(
2496                        "Fetch: peer {source} returned error for {}: {reason}",
2497                        hex::encode(key)
2498                    );
2499                    p2p_node
2500                        .report_trust_event(
2501                            &source,
2502                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2503                        )
2504                        .await;
2505                    FetchOutcome {
2506                        key,
2507                        result: FetchResult::SourceFailed,
2508                    }
2509                }
2510                _ => {
2511                    // Unexpected message type — treat as malformed.
2512                    p2p_node
2513                        .report_trust_event(
2514                            &source,
2515                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2516                        )
2517                        .await;
2518                    FetchOutcome {
2519                        key,
2520                        result: FetchResult::SourceFailed,
2521                    }
2522                }
2523            }
2524        }
2525        Err(e) => {
2526            debug!("Fetch request to {source} failed: {e}");
2527            // No ApplicationFailure here — P2PNode::send_request() already
2528            // reports ConnectionTimeout / ConnectionFailed to the TrustEngine.
2529            FetchOutcome {
2530                key,
2531                result: FetchResult::SourceFailed,
2532            }
2533        }
2534    }
2535}
2536
2537// ---------------------------------------------------------------------------
2538// Audit result handler
2539// ---------------------------------------------------------------------------
2540
2541/// Handle audit result: log findings and emit trust events.
2542async fn handle_audit_result(
2543    result: &AuditTickResult,
2544    p2p_node: &Arc<P2PNode>,
2545    sync_state: &Arc<RwLock<NeighborSyncState>>,
2546    config: &ReplicationConfig,
2547) {
2548    match result {
2549        AuditTickResult::Passed {
2550            challenged_peer,
2551            keys_checked,
2552        } => {
2553            debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
2554            // Peer responded normally — clear the active bootstrap claim while
2555            // retaining history so a later claim is treated as repeated abuse.
2556            {
2557                let mut state = sync_state.write().await;
2558                state.clear_active_bootstrap_claim(challenged_peer);
2559            }
2560            p2p_node
2561                .report_trust_event(
2562                    challenged_peer,
2563                    TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
2564                )
2565                .await;
2566        }
2567        AuditTickResult::Failed { evidence } => {
2568            if let FailureEvidence::AuditFailure {
2569                challenged_peer,
2570                confirmed_failed_keys,
2571                summary,
2572                reason,
2573                ..
2574            } = evidence
2575            {
2576                error!(
2577                    "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}",
2578                    confirmed_failed_keys.len(),
2579                    summary.challenged_keys,
2580                    summary.absent_keys,
2581                    summary.digest_mismatch_keys,
2582                );
2583                if audit_failure_clears_bootstrap_claim(reason) {
2584                    // Peer returned a non-bootstrap response — clear the active
2585                    // claim while retaining claim history.
2586                    let mut state = sync_state.write().await;
2587                    state.clear_active_bootstrap_claim(challenged_peer);
2588                } else {
2589                    debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim");
2590                }
2591                p2p_node
2592                    .report_trust_event(
2593                        challenged_peer,
2594                        TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
2595                    )
2596                    .await;
2597            }
2598        }
2599        AuditTickResult::BootstrapClaim { peer } => {
2600            // Gap 6: BootstrapClaimAbuse grace period in audit path.
2601            // Separate state mutation from network I/O to avoid holding the
2602            // write lock across report_trust_event.
2603            let should_report = {
2604                let now = Instant::now();
2605                let mut state = sync_state.write().await;
2606                match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
2607                {
2608                    BootstrapClaimObservation::WithinGrace { .. } => {
2609                        debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
2610                        false
2611                    }
2612                    BootstrapClaimObservation::PastGrace { first_seen } => {
2613                        warn!(
2614                            "Audit: peer {peer} claiming bootstrap past grace period \
2615                             ({:?} > {:?}), reporting abuse",
2616                            now.duration_since(first_seen),
2617                            config.bootstrap_claim_grace_period,
2618                        );
2619                        true
2620                    }
2621                    BootstrapClaimObservation::Repeated { first_seen } => {
2622                        warn!(
2623                            "Audit: peer {peer} repeated bootstrap claim after previously \
2624                             stopping; first claim was {:?} ago, reporting abuse",
2625                            now.duration_since(first_seen),
2626                        );
2627                        true
2628                    }
2629                }
2630            };
2631            if should_report {
2632                p2p_node
2633                    .report_trust_event(
2634                        peer,
2635                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2636                    )
2637                    .await;
2638            }
2639        }
2640        AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
2641    }
2642}
2643
2644fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
2645    !matches!(reason, AuditFailureReason::Timeout)
2646}
2647
2648// `admit_bootstrap_hints` was consolidated into `admit_and_queue_hints`.
2649
2650#[cfg(test)]
2651#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
2652mod tests {
2653    use super::audit_failure_clears_bootstrap_claim;
2654    use crate::replication::types::AuditFailureReason;
2655
2656    #[test]
2657    fn audit_timeout_preserves_active_bootstrap_claim() {
2658        assert!(!audit_failure_clears_bootstrap_claim(
2659            &AuditFailureReason::Timeout
2660        ));
2661    }
2662
2663    #[test]
2664    fn decoded_audit_failures_clear_active_bootstrap_claim() {
2665        for reason in [
2666            AuditFailureReason::MalformedResponse,
2667            AuditFailureReason::DigestMismatch,
2668            AuditFailureReason::KeyAbsent,
2669            AuditFailureReason::Rejected,
2670        ] {
2671            assert!(
2672                audit_failure_clears_bootstrap_claim(&reason),
2673                "decoded non-bootstrap failure {reason:?} should clear active claim"
2674            );
2675        }
2676    }
2677}