Skip to main content

ant_node/replication/
mod.rs

1//! Replication subsystem for the Autonomi network.
2//!
3//! Implements Kademlia-style replication with:
4//! - Fresh replication with `PoP` verification
5//! - Neighbor sync with round-robin cycle management
6//! - Batched quorum verification
7//! - Storage audit protocol (anti-outsourcing)
8//! - `PaidForList` persistence and convergence
9//! - Responsibility pruning with hysteresis
10
11// The replication engine intentionally holds `RwLock` read guards across await
12// boundaries (e.g. reading sync_history while calling audit_tick). Clippy's
13// nursery lint `significant_drop_tightening` flags these, but the guards must
14// remain live for the duration of the call.
15#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod config;
21pub mod fresh;
22pub mod neighbor_sync;
23pub mod paid_list;
24pub mod protocol;
25pub mod pruning;
26pub mod quorum;
27pub mod scheduling;
28pub mod types;
29
30use std::collections::{HashMap, HashSet};
31use std::path::Path;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use std::pin::Pin;
36
37use crate::logging::{debug, error, info, warn};
38use futures::stream::FuturesUnordered;
39use futures::{Future, StreamExt};
40use rand::Rng;
41use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
42use tokio::task::JoinHandle;
43use tokio_util::sync::CancellationToken;
44
45use crate::ant_protocol::XorName;
46use crate::error::{Error, Result};
47use crate::payment::PaymentVerifier;
48use crate::replication::audit::AuditTickResult;
49use crate::replication::config::{
50    max_parallel_fetch, ReplicationConfig, MAX_CONCURRENT_REPLICATION_SENDS,
51    REPLICATION_PROTOCOL_ID,
52};
53use crate::replication::paid_list::PaidList;
54use crate::replication::protocol::{
55    FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
56    VerificationResponse,
57};
58use crate::replication::quorum::KeyVerificationOutcome;
59use crate::replication::scheduling::ReplicationQueues;
60use crate::replication::types::{
61    BootstrapState, FailureEvidence, HintPipeline, NeighborSyncState, PeerSyncRecord,
62    VerificationEntry, VerificationState,
63};
64use crate::storage::LmdbStorage;
65use saorsa_core::identity::PeerId;
66use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
67
68// ---------------------------------------------------------------------------
69// Constants
70// ---------------------------------------------------------------------------
71
/// Prefix used by saorsa-core's request-response mechanism.
///
/// The message handler checks incoming topics for this prefix to recognise
/// replication requests that arrived via the request-response path.
const RR_PREFIX: &str = "/rr/";

/// Boxed future type for in-flight fetch tasks.
///
/// Resolves to the key being fetched plus the fetch outcome; the outcome is
/// `None` when the underlying spawned task could not be joined.
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;

/// Fetch worker polling interval in milliseconds.
///
/// How long the fetch worker sleeps before re-checking the queue when it has
/// no fetches in flight.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Verification worker polling interval in milliseconds.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Bootstrap drain check interval in seconds.
///
/// The audit loop polls the bootstrap state at this cadence until it reports
/// drained, and only then starts issuing audits.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;

/// Standard trust event weight for per-operation success/failure signals.
///
/// Used for individual replication fetch outcomes, integrity check failures,
/// and bootstrap claim abuse. Distinct from `AUDIT_FAILURE_TRUST_WEIGHT` which
/// is reserved for confirmed audit failures.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
93
94// ---------------------------------------------------------------------------
95// ReplicationEngine
96// ---------------------------------------------------------------------------
97
/// The replication engine manages all replication background tasks and state.
///
/// Every piece of shared state is wrapped in an `Arc` so that each background
/// task spawned by `start()` can hold its own handle. Tasks are stopped
/// cooperatively through the `shutdown` token and joined by `shutdown()`.
pub struct ReplicationEngine {
    /// Replication configuration (shared across spawned tasks).
    config: Arc<ReplicationConfig>,
    /// P2P networking node.
    p2p_node: Arc<P2PNode>,
    /// Local chunk storage.
    storage: Arc<LmdbStorage>,
    /// Persistent paid-for-list.
    paid_list: Arc<PaidList>,
    /// Payment verifier for `PoP` validation.
    payment_verifier: Arc<PaymentVerifier>,
    /// Replication pipeline queues.
    ///
    /// Shared by the message handler, the neighbor sync loop and the fetch
    /// worker.
    queues: Arc<RwLock<ReplicationQueues>>,
    /// Neighbor sync cycle state.
    ///
    /// Lock ordering: when both are needed, acquire `sync_state` before
    /// `sync_history`.
    sync_state: Arc<RwLock<NeighborSyncState>>,
    /// Per-peer sync history (for `RepairOpportunity`).
    ///
    /// This map grows with peer churn and is intentionally unbounded: entries
    /// are lightweight (`PeerSyncRecord` is two fields) and peer IDs are
    /// naturally bounded by the routing table's k-bucket capacity.
    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    /// Bootstrap state tracking.
    bootstrap_state: Arc<RwLock<BootstrapState>>,
    /// Whether this node is currently bootstrapping.
    ///
    /// Initialised to `true` by `new()`.
    is_bootstrapping: Arc<RwLock<bool>>,
    /// Trigger for early neighbor sync (signalled on topology changes).
    sync_trigger: Arc<Notify>,
    /// Notified when `is_bootstrapping` transitions from `true` to `false`.
    bootstrap_complete_notify: Arc<Notify>,
    /// Limits concurrent outbound replication sends to prevent bandwidth
    /// saturation on home broadband connections.
    ///
    /// Sized with `MAX_CONCURRENT_REPLICATION_SENDS` permits.
    send_semaphore: Arc<Semaphore>,
    /// Receiver for fresh-write events from the chunk PUT handler.
    ///
    /// When present, `start()` spawns a drainer task that calls
    /// `replicate_fresh` for each event. The drainer takes the receiver out
    /// of this field, leaving `None` behind.
    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
    /// Shutdown token.
    ///
    /// Cloned into every background task; cancelled by `shutdown()`.
    shutdown: CancellationToken,
    /// Background task handles.
    ///
    /// Populated by the task launchers and drained by `shutdown()`. A
    /// non-empty vector is how `start()` detects that it already ran.
    task_handles: Vec<JoinHandle<()>>,
}
141
142impl ReplicationEngine {
143    /// Create a new replication engine.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the `PaidList` LMDB environment cannot be opened
148    /// or if the configuration fails validation.
149    pub async fn new(
150        config: ReplicationConfig,
151        p2p_node: Arc<P2PNode>,
152        storage: Arc<LmdbStorage>,
153        payment_verifier: Arc<PaymentVerifier>,
154        root_dir: &Path,
155        fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
156        shutdown: CancellationToken,
157    ) -> Result<Self> {
158        config.validate().map_err(Error::Config)?;
159
160        let paid_list = Arc::new(
161            PaidList::new(root_dir)
162                .await
163                .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
164        );
165
166        let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
167        let config = Arc::new(config);
168
169        Ok(Self {
170            config: Arc::clone(&config),
171            p2p_node,
172            storage,
173            paid_list,
174            payment_verifier,
175            queues: Arc::new(RwLock::new(ReplicationQueues::new())),
176            sync_state: Arc::new(RwLock::new(initial_neighbors)),
177            sync_history: Arc::new(RwLock::new(HashMap::new())),
178            bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
179            is_bootstrapping: Arc::new(RwLock::new(true)),
180            sync_trigger: Arc::new(Notify::new()),
181            bootstrap_complete_notify: Arc::new(Notify::new()),
182            send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
183            fresh_write_rx: Some(fresh_write_rx),
184            shutdown,
185            task_handles: Vec::new(),
186        })
187    }
188
189    /// Get a reference to the `PaidList`.
190    #[must_use]
191    pub fn paid_list(&self) -> &Arc<PaidList> {
192        &self.paid_list
193    }
194
195    /// Start all background tasks.
196    ///
197    /// `dht_events` must be subscribed **before** `P2PNode::start()` so that
198    /// the `BootstrapComplete` event emitted during DHT bootstrap is not
199    /// missed by the bootstrap-sync gate.
200    pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
201        if !self.task_handles.is_empty() {
202            error!("ReplicationEngine::start() called while already running — ignoring");
203            return;
204        }
205        info!("Starting replication engine");
206
207        self.start_message_handler();
208        self.start_neighbor_sync_loop();
209        self.start_self_lookup_loop();
210        self.start_audit_loop();
211        self.start_fetch_worker();
212        self.start_verification_worker();
213        self.start_bootstrap_sync(dht_events);
214        self.start_fresh_write_drainer();
215
216        info!(
217            "Replication engine started with {} background tasks",
218            self.task_handles.len()
219        );
220    }
221
222    /// Returns `true` if the node is still in the replication bootstrap phase.
223    ///
224    /// During bootstrap, audit challenges return `Bootstrapping` instead of
225    /// digests, and neighbor sync responses carry `bootstrapping: true`.
226    pub async fn is_bootstrapping(&self) -> bool {
227        *self.is_bootstrapping.read().await
228    }
229
230    /// Wait until the replication bootstrap phase completes.
231    ///
232    /// Returns immediately if bootstrap has already completed. Useful for
233    /// readiness probes, health checks, and test harnesses that need the
234    /// node to be fully operational before proceeding.
235    ///
236    /// Returns `true` if bootstrap completed within the timeout, `false`
237    /// if the timeout elapsed first.
238    pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
239        // Register the notification future *before* checking the flag so that
240        // a transition between the read and the await is not missed.
241        let notified = self.bootstrap_complete_notify.notified();
242        tokio::pin!(notified);
243        notified.as_mut().enable();
244
245        if !*self.is_bootstrapping.read().await {
246            return true;
247        }
248
249        tokio::time::timeout(timeout, notified).await.is_ok()
250    }
251
252    /// Cancel all background tasks and wait for them to terminate.
253    ///
254    /// This must be awaited before dropping the engine when the caller needs
255    /// the `Arc<LmdbStorage>` references held by background tasks to be
256    /// released (e.g. before reopening the same LMDB environment).
257    pub async fn shutdown(&mut self) {
258        self.shutdown.cancel();
259        for (i, mut handle) in self.task_handles.drain(..).enumerate() {
260            match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
261                Ok(Ok(())) => {}
262                Ok(Err(e)) if e.is_cancelled() => {}
263                Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
264                Err(_) => {
265                    warn!("Replication task {i} did not stop within 10s, aborting");
266                    handle.abort();
267                }
268            }
269        }
270    }
271
272    /// Trigger an early neighbor sync round.
273    ///
274    /// Useful after topology changes (new nodes joining, network heal after
275    /// partition) when the caller wants replication to converge faster than
276    /// the regular 10-20 minute cadence.
277    pub fn trigger_neighbor_sync(&self) {
278        self.sync_trigger.notify_one();
279    }
280
281    /// Execute fresh replication for a newly stored record.
282    pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
283        fresh::replicate_fresh(
284            key,
285            data,
286            proof_of_payment,
287            &self.p2p_node,
288            &self.paid_list,
289            &self.config,
290            &self.send_semaphore,
291        )
292        .await;
293    }
294
295    // =======================================================================
296    // Background task launchers
297    // =======================================================================
298
299    /// Spawn a task that drains the fresh-write channel and triggers
300    /// replication for each newly-stored chunk.
301    fn start_fresh_write_drainer(&mut self) {
302        let Some(mut rx) = self.fresh_write_rx.take() else {
303            return;
304        };
305        let p2p = Arc::clone(&self.p2p_node);
306        let paid_list = Arc::clone(&self.paid_list);
307        let config = Arc::clone(&self.config);
308        let send_semaphore = Arc::clone(&self.send_semaphore);
309        let shutdown = self.shutdown.clone();
310
311        let handle = tokio::spawn(async move {
312            loop {
313                tokio::select! {
314                    () = shutdown.cancelled() => break,
315                    event = rx.recv() => {
316                        let Some(event) = event else { break };
317                        fresh::replicate_fresh(
318                            &event.key,
319                            &event.data,
320                            &event.payment_proof,
321                            &p2p,
322                            &paid_list,
323                            &config,
324                            &send_semaphore,
325                        )
326                        .await;
327                    }
328                }
329            }
330            debug!("Fresh-write drainer shut down");
331        });
332        self.task_handles.push(handle);
333    }
334
335    #[allow(clippy::too_many_lines)]
336    fn start_message_handler(&mut self) {
337        let mut p2p_events = self.p2p_node.subscribe_events();
338        let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
339        let p2p = Arc::clone(&self.p2p_node);
340        let storage = Arc::clone(&self.storage);
341        let paid_list = Arc::clone(&self.paid_list);
342        let payment_verifier = Arc::clone(&self.payment_verifier);
343        let queues = Arc::clone(&self.queues);
344        let config = Arc::clone(&self.config);
345        let shutdown = self.shutdown.clone();
346        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
347        let bootstrap_state = Arc::clone(&self.bootstrap_state);
348        let sync_history = Arc::clone(&self.sync_history);
349        let sync_trigger = Arc::clone(&self.sync_trigger);
350
351        let handle = tokio::spawn(async move {
352            loop {
353                tokio::select! {
354                    () = shutdown.cancelled() => break,
355                    event = p2p_events.recv() => {
356                        let Ok(event) = event else { continue };
357                        if let P2PEvent::Message {
358                            topic,
359                            source: Some(source),
360                            data,
361                            ..
362                        } = event {
363                            // Determine if this is a replication message
364                            // and whether it arrived via the /rr/ request-response
365                            // path (which wraps payloads in RequestResponseEnvelope).
366                            let rr_info = if topic == REPLICATION_PROTOCOL_ID {
367                                Some((data.clone(), None))
368                            } else if topic.starts_with(RR_PREFIX)
369                                && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
370                            {
371                                P2PNode::parse_request_envelope(&data)
372                                    .filter(|(_, is_resp, _)| !is_resp)
373                                    .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
374                            } else {
375                                None
376                            };
377                            if let Some((payload, rr_message_id)) = rr_info {
378                                match handle_replication_message(
379                                    &source,
380                                    &payload,
381                                    &p2p,
382                                    &storage,
383                                    &paid_list,
384                                    &payment_verifier,
385                                    &queues,
386                                    &config,
387                                    &is_bootstrapping,
388                                    &bootstrap_state,
389                                    &sync_history,
390                                    rr_message_id.as_deref(),
391                                ).await {
392                                    Ok(()) => {}
393                                    Err(e) => {
394                                        debug!(
395                                            "Replication message from {source} error: {e}"
396                                        );
397                                    }
398                                }
399                            }
400                        }
401                    }
402                    // Gap 4: Topology churn handling (Section 13).
403                    //
404                    // The DHT routing table emits KClosestPeersChanged when the
405                    // K-closest peer set actually changes, which is the precise
406                    // signal for triggering neighbor sync. This replaces the
407                    // previous approach of checking every PeerConnected /
408                    // PeerDisconnected event against the close group.
409                    dht_event = dht_events.recv() => {
410                        let Ok(dht_event) = dht_event else { continue };
411                        if let DhtNetworkEvent::KClosestPeersChanged { .. } = dht_event {
412                            debug!(
413                                "K-closest peers changed, triggering early neighbor sync"
414                            );
415                            sync_trigger.notify_one();
416                        }
417                    }
418                }
419            }
420            debug!("Replication message handler shut down");
421        });
422        self.task_handles.push(handle);
423    }
424
425    fn start_neighbor_sync_loop(&mut self) {
426        let p2p = Arc::clone(&self.p2p_node);
427        let storage = Arc::clone(&self.storage);
428        let paid_list = Arc::clone(&self.paid_list);
429        let queues = Arc::clone(&self.queues);
430        let config = Arc::clone(&self.config);
431        let shutdown = self.shutdown.clone();
432        let sync_state = Arc::clone(&self.sync_state);
433        let sync_history = Arc::clone(&self.sync_history);
434        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
435        let bootstrap_state = Arc::clone(&self.bootstrap_state);
436        let sync_trigger = Arc::clone(&self.sync_trigger);
437
438        let handle = tokio::spawn(async move {
439            loop {
440                let interval = config.random_neighbor_sync_interval();
441                tokio::select! {
442                    () = shutdown.cancelled() => break,
443                    () = tokio::time::sleep(interval) => {}
444                    () = sync_trigger.notified() => {
445                        debug!("Neighbor sync triggered by topology change");
446                    }
447                }
448                // Wrap the sync round in a select so shutdown cancels
449                // in-progress network operations rather than waiting for
450                // the full round to complete.
451                tokio::select! {
452                    () = shutdown.cancelled() => break,
453                    () = run_neighbor_sync_round(
454                        &p2p,
455                        &storage,
456                        &paid_list,
457                        &queues,
458                        &config,
459                        &sync_state,
460                        &sync_history,
461                        &is_bootstrapping,
462                        &bootstrap_state,
463                    ) => {}
464                }
465            }
466            debug!("Neighbor sync loop shut down");
467        });
468        self.task_handles.push(handle);
469    }
470
471    fn start_self_lookup_loop(&mut self) {
472        let p2p = Arc::clone(&self.p2p_node);
473        let config = Arc::clone(&self.config);
474        let shutdown = self.shutdown.clone();
475
476        let handle = tokio::spawn(async move {
477            loop {
478                let interval = config.random_self_lookup_interval();
479                tokio::select! {
480                    () = shutdown.cancelled() => break,
481                    () = tokio::time::sleep(interval) => {
482                        if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
483                            debug!("Self-lookup failed: {e}");
484                        }
485                    }
486                }
487            }
488            debug!("Self-lookup loop shut down");
489        });
490        self.task_handles.push(handle);
491    }
492
493    fn start_audit_loop(&mut self) {
494        let p2p = Arc::clone(&self.p2p_node);
495        let storage = Arc::clone(&self.storage);
496        let config = Arc::clone(&self.config);
497        let shutdown = self.shutdown.clone();
498        let sync_history = Arc::clone(&self.sync_history);
499        let bootstrap_state = Arc::clone(&self.bootstrap_state);
500        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
501        let sync_state = Arc::clone(&self.sync_state);
502
503        let handle = tokio::spawn(async move {
504            // Invariant 19: wait for bootstrap to drain before starting audits.
505            loop {
506                tokio::select! {
507                    () = shutdown.cancelled() => return,
508                    () = tokio::time::sleep(
509                        std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
510                    ) => {
511                        if bootstrap_state.read().await.is_drained() {
512                            break;
513                        }
514                    }
515                }
516            }
517
518            // Run one audit tick immediately after bootstrap drain.
519            {
520                let bootstrapping = *is_bootstrapping.read().await;
521                // Lock ordering: sync_state before sync_history (consistent
522                // with run_neighbor_sync_round and handle_sync_response).
523                let result = {
524                    let claims = sync_state.read().await;
525                    let history = sync_history.read().await;
526                    audit::audit_tick(
527                        &p2p,
528                        &storage,
529                        &config,
530                        &history,
531                        &claims.bootstrap_claims,
532                        bootstrapping,
533                    )
534                    .await
535                };
536                handle_audit_result(&result, &p2p, &sync_state, &config).await;
537            }
538
539            // Then run periodically.
540            loop {
541                let interval = config.random_audit_tick_interval();
542                tokio::select! {
543                    () = shutdown.cancelled() => break,
544                    () = tokio::time::sleep(interval) => {
545                        let bootstrapping = *is_bootstrapping.read().await;
546                        // Lock ordering: sync_state before sync_history.
547                        let result = {
548                            let claims = sync_state.read().await;
549                            let history = sync_history.read().await;
550                            audit::audit_tick(
551                                &p2p, &storage, &config, &history,
552                                &claims.bootstrap_claims,
553                                bootstrapping,
554                            )
555                            .await
556                        };
557                        handle_audit_result(&result, &p2p, &sync_state, &config).await;
558                    }
559                }
560            }
561            debug!("Audit loop shut down");
562        });
563        self.task_handles.push(handle);
564    }
565
566    #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
567    fn start_fetch_worker(&mut self) {
568        let p2p = Arc::clone(&self.p2p_node);
569        let storage = Arc::clone(&self.storage);
570        let queues = Arc::clone(&self.queues);
571        let config = Arc::clone(&self.config);
572        let shutdown = self.shutdown.clone();
573        let bootstrap_state = Arc::clone(&self.bootstrap_state);
574        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
575        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
576        let concurrency = max_parallel_fetch();
577
578        info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
579
580        let handle = tokio::spawn(async move {
581            // Each in-flight future yields (key, Option<FetchOutcome>) so we
582            // always recover the key — even if the inner task panics.
583            let mut in_flight = FuturesUnordered::<FetchFuture>::new();
584
585            loop {
586                // Fill up to `concurrency` slots from the queue.
587                {
588                    let mut q = queues.write().await;
589                    while in_flight.len() < concurrency {
590                        let Some(candidate) = q.dequeue_fetch() else {
591                            break;
592                        };
593                        let Some(&source) = candidate.sources.first() else {
594                            warn!(
595                                "Fetch candidate {} has no sources — dropping",
596                                hex::encode(candidate.key)
597                            );
598                            continue;
599                        };
600                        q.start_fetch(candidate.key, source, candidate.sources.clone());
601
602                        let p2p = Arc::clone(&p2p);
603                        let storage = Arc::clone(&storage);
604                        let config = Arc::clone(&config);
605                        let token = shutdown.clone();
606                        let fetch_key = candidate.key;
607                        in_flight.push(Box::pin(async move {
608                            let handle = tokio::spawn(async move {
609                                // Cancel-aware: abort when the engine shuts down.
610                                tokio::select! {
611                                    () = token.cancelled() => FetchOutcome {
612                                        key: fetch_key,
613                                        result: FetchResult::SourceFailed,
614                                    },
615                                    outcome = execute_single_fetch(
616                                        p2p, storage, config, fetch_key, source,
617                                    ) => outcome,
618                                }
619                            });
620                            match handle.await {
621                                Ok(outcome) => (outcome.key, Some(outcome)),
622                                Err(e) => {
623                                    error!(
624                                        "Fetch task for {} panicked: {e}",
625                                        hex::encode(fetch_key)
626                                    );
627                                    (fetch_key, None)
628                                }
629                            }
630                        }));
631                    }
632                } // release queues write lock
633
634                if in_flight.is_empty() {
635                    // No work — wait for new items or shutdown.
636                    tokio::select! {
637                        () = shutdown.cancelled() => break,
638                        () = tokio::time::sleep(
639                            std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
640                        ) => continue,
641                    }
642                }
643
644                // Wait for the next fetch to complete and process the result.
645                tokio::select! {
646                    () = shutdown.cancelled() => break,
647                    Some((key, maybe_outcome)) = in_flight.next() => {
648                        let mut q = queues.write().await;
649                        let terminal = if let Some(outcome) = maybe_outcome {
650                            match outcome.result {
651                                FetchResult::Stored => {
652                                    q.complete_fetch(&key);
653                                    true
654                                }
655                                FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
656                                    if let Some(next_peer) = q.retry_fetch(&key) {
657                                        // Spawn a new fetch task for the next source.
658                                        let p2p = Arc::clone(&p2p);
659                                        let storage = Arc::clone(&storage);
660                                        let config = Arc::clone(&config);
661                                        let token = shutdown.clone();
662                                        let fetch_key = key;
663                                        in_flight.push(Box::pin(async move {
664                                            let handle = tokio::spawn(async move {
665                                                tokio::select! {
666                                                    () = token.cancelled() => FetchOutcome {
667                                                        key: fetch_key,
668                                                        result: FetchResult::SourceFailed,
669                                                    },
670                                                    outcome = execute_single_fetch(
671                                                        p2p, storage, config, fetch_key, next_peer,
672                                                    ) => outcome,
673                                                }
674                                            });
675                                            match handle.await {
676                                                Ok(outcome) => (outcome.key, Some(outcome)),
677                                                Err(e) => {
678                                                    error!(
679                                                        "Fetch task for {} panicked: {e}",
680                                                        hex::encode(fetch_key)
681                                                    );
682                                                    (fetch_key, None)
683                                                }
684                                            }
685                                        }));
686                                        false
687                                    } else {
688                                        q.complete_fetch(&key);
689                                        true
690                                    }
691                                }
692                            }
693                        } else {
694                            // Task panicked — reclaim the in-flight slot.
695                            q.complete_fetch(&key);
696                            true
697                        };
698
699                        // Shrink bootstrap pending set on terminal exit.
700                        if terminal {
701                            drop(q); // release queues lock before acquiring bootstrap_state
702                            if !bootstrap_state.read().await.is_drained() {
703                                bootstrap_state.write().await.remove_key(&key);
704                                let q = queues.read().await;
705                                if bootstrap::check_bootstrap_drained(
706                                    &bootstrap_state,
707                                    &q,
708                                )
709                                .await
710                                {
711                                    complete_bootstrap(
712                                        &is_bootstrapping,
713                                        &bootstrap_complete_notify,
714                                    ).await;
715                                }
716                            }
717                        }
718                    }
719                }
720            }
721
722            // Cancel and drain remaining in-flight fetches on shutdown.
723            // The CancellationToken is already cancelled by this point, so
724            // spawned tasks will see cancellation via their select! branches.
725            while in_flight.next().await.is_some() {}
726            debug!("Fetch worker shut down");
727        });
728        self.task_handles.push(handle);
729    }
730
731    fn start_verification_worker(&mut self) {
732        let p2p = Arc::clone(&self.p2p_node);
733        let queues = Arc::clone(&self.queues);
734        let paid_list = Arc::clone(&self.paid_list);
735        let config = Arc::clone(&self.config);
736        let shutdown = self.shutdown.clone();
737        let bootstrap_state = Arc::clone(&self.bootstrap_state);
738        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
739        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
740
741        let handle = tokio::spawn(async move {
742            loop {
743                tokio::select! {
744                    () = shutdown.cancelled() => break,
745                    () = tokio::time::sleep(
746                        std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
747                    ) => {
748                        run_verification_cycle(
749                            &p2p, &paid_list, &queues, &config,
750                            &bootstrap_state, &is_bootstrapping,
751                            &bootstrap_complete_notify,
752                        ).await;
753                    }
754                }
755            }
756            debug!("Verification worker shut down");
757        });
758        self.task_handles.push(handle);
759    }
760
761    /// Gap 3: Run a one-shot bootstrap sync on startup.
762    ///
763    /// Waits for saorsa-core to emit `DhtNetworkEvent::BootstrapComplete`
764    /// (indicating the routing table is populated) before snapshotting
765    /// close neighbors. Falls back after a timeout so bootstrap nodes
766    /// (which have no peers and therefore never receive the event) still
767    /// proceed.
768    ///
769    /// After the gate, finds close neighbors, syncs with each in
770    /// round-robin batches, admits returned hints into the verification
771    /// pipeline, and tracks discovered keys for bootstrap drain detection.
772    fn start_bootstrap_sync(
773        &mut self,
774        dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
775    ) {
776        let p2p = Arc::clone(&self.p2p_node);
777        let storage = Arc::clone(&self.storage);
778        let paid_list = Arc::clone(&self.paid_list);
779        let queues = Arc::clone(&self.queues);
780        let config = Arc::clone(&self.config);
781        let shutdown = self.shutdown.clone();
782        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
783        let bootstrap_state = Arc::clone(&self.bootstrap_state);
784        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
785
786        let handle = tokio::spawn(async move {
787            // Wait for DHT bootstrap to complete before snapshotting
788            // neighbors. The routing table is empty until saorsa-core
789            // finishes its FIND_NODE rounds and bucket refreshes.
790            let gate = bootstrap::wait_for_bootstrap_complete(
791                dht_events,
792                config.bootstrap_complete_timeout_secs,
793                &shutdown,
794            )
795            .await;
796
797            if gate == bootstrap::BootstrapGateResult::Shutdown {
798                return;
799            }
800
801            let self_id = *p2p.peer_id();
802            let neighbors =
803                neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
804                    .await;
805
806            if neighbors.is_empty() {
807                info!("Bootstrap sync: no close neighbors found, marking drained");
808                bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
809                complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
810                return;
811            }
812
813            let neighbor_count = neighbors.len();
814            info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
815
816            // Process neighbors in batches of NEIGHBOR_SYNC_PEER_COUNT.
817            for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
818                if shutdown.is_cancelled() {
819                    break;
820                }
821
822                for peer in batch {
823                    if shutdown.is_cancelled() {
824                        break;
825                    }
826
827                    // Re-read on each iteration so peers see current state.
828                    let bootstrapping = *is_bootstrapping.read().await;
829
830                    bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
831
832                    let response = neighbor_sync::sync_with_peer(
833                        peer,
834                        &p2p,
835                        &storage,
836                        &paid_list,
837                        &config,
838                        bootstrapping,
839                    )
840                    .await;
841
842                    bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
843
844                    if let Some(resp) = response {
845                        if !resp.bootstrapping {
846                            // Admit hints into verification pipeline.
847                            let admitted_keys = admit_and_queue_hints(
848                                &self_id,
849                                peer,
850                                &resp.replica_hints,
851                                &resp.paid_hints,
852                                &p2p,
853                                &config,
854                                &storage,
855                                &paid_list,
856                                &queues,
857                            )
858                            .await;
859
860                            // Track discovered keys for drain detection.
861                            if !admitted_keys.is_empty() {
862                                bootstrap::track_discovered_keys(&bootstrap_state, &admitted_keys)
863                                    .await;
864                            }
865                        }
866                    }
867                }
868            }
869
870            // Check drain condition.
871            {
872                let q = queues.read().await;
873                if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
874                    complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
875                }
876            }
877
878            info!("Bootstrap sync completed");
879        });
880        self.task_handles.push(handle);
881    }
882}
883
884// ===========================================================================
885// Free functions for background tasks
886// ===========================================================================
887
888/// Handle an incoming replication protocol message.
889///
890/// When `rr_message_id` is `Some`, the request arrived via the `/rr/`
891/// request-response path and the response must be sent via `send_response`
892/// so saorsa-core can route it back to the waiting `send_request` caller.
893#[allow(clippy::too_many_arguments)]
894async fn handle_replication_message(
895    source: &PeerId,
896    data: &[u8],
897    p2p_node: &Arc<P2PNode>,
898    storage: &Arc<LmdbStorage>,
899    paid_list: &Arc<PaidList>,
900    payment_verifier: &Arc<PaymentVerifier>,
901    queues: &Arc<RwLock<ReplicationQueues>>,
902    config: &ReplicationConfig,
903    is_bootstrapping: &Arc<RwLock<bool>>,
904    bootstrap_state: &Arc<RwLock<BootstrapState>>,
905    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
906    rr_message_id: Option<&str>,
907) -> Result<()> {
908    let msg = ReplicationMessage::decode(data)
909        .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
910
911    match msg.body {
912        ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
913            handle_fresh_offer(
914                source,
915                offer,
916                storage,
917                paid_list,
918                payment_verifier,
919                p2p_node,
920                config,
921                msg.request_id,
922                rr_message_id,
923            )
924            .await
925        }
926        ReplicationMessageBody::PaidNotify(ref notify) => {
927            handle_paid_notify(
928                source,
929                notify,
930                paid_list,
931                payment_verifier,
932                p2p_node,
933                config,
934            )
935            .await
936        }
937        ReplicationMessageBody::NeighborSyncRequest(ref request) => {
938            let bootstrapping = *is_bootstrapping.read().await;
939            handle_neighbor_sync_request(
940                source,
941                request,
942                p2p_node,
943                storage,
944                paid_list,
945                queues,
946                config,
947                bootstrapping,
948                bootstrap_state,
949                sync_history,
950                msg.request_id,
951                rr_message_id,
952            )
953            .await
954        }
955        ReplicationMessageBody::VerificationRequest(ref request) => {
956            handle_verification_request(
957                source,
958                request,
959                storage,
960                paid_list,
961                p2p_node,
962                msg.request_id,
963                rr_message_id,
964            )
965            .await
966        }
967        ReplicationMessageBody::FetchRequest(ref request) => {
968            handle_fetch_request(
969                source,
970                request,
971                storage,
972                p2p_node,
973                msg.request_id,
974                rr_message_id,
975            )
976            .await
977        }
978        ReplicationMessageBody::AuditChallenge(ref challenge) => {
979            let bootstrapping = *is_bootstrapping.read().await;
980            handle_audit_challenge_msg(
981                source,
982                challenge,
983                storage,
984                p2p_node,
985                bootstrapping,
986                msg.request_id,
987                rr_message_id,
988            )
989            .await
990        }
991        // Response messages are handled by their respective request initiators.
992        ReplicationMessageBody::FreshReplicationResponse(_)
993        | ReplicationMessageBody::NeighborSyncResponse(_)
994        | ReplicationMessageBody::VerificationResponse(_)
995        | ReplicationMessageBody::FetchResponse(_)
996        | ReplicationMessageBody::AuditResponse(_) => Ok(()),
997    }
998}
999
1000// ---------------------------------------------------------------------------
1001// Per-message-type handlers
1002// ---------------------------------------------------------------------------
1003
1004#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1005async fn handle_fresh_offer(
1006    source: &PeerId,
1007    offer: &protocol::FreshReplicationOffer,
1008    storage: &Arc<LmdbStorage>,
1009    paid_list: &Arc<PaidList>,
1010    payment_verifier: &Arc<PaymentVerifier>,
1011    p2p_node: &Arc<P2PNode>,
1012    config: &ReplicationConfig,
1013    request_id: u64,
1014    rr_message_id: Option<&str>,
1015) -> Result<()> {
1016    let self_id = *p2p_node.peer_id();
1017
1018    // Rule 5: reject if PoP is missing.
1019    if offer.proof_of_payment.is_empty() {
1020        send_replication_response(
1021            source,
1022            p2p_node,
1023            request_id,
1024            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1025                key: offer.key,
1026                reason: "Missing proof of payment".to_string(),
1027            }),
1028            rr_message_id,
1029        )
1030        .await;
1031        return Ok(());
1032    }
1033
1034    // Enforce chunk size invariant: the normal PUT path rejects data larger
1035    // than MAX_CHUNK_SIZE; the replication receive path must do the same to
1036    // prevent peers from pushing oversized records through replication.
1037    if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1038        warn!(
1039            "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1040            hex::encode(offer.key),
1041            offer.data.len(),
1042            crate::ant_protocol::MAX_CHUNK_SIZE,
1043        );
1044        p2p_node
1045            .report_trust_event(
1046                source,
1047                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1048            )
1049            .await;
1050        send_replication_response(
1051            source,
1052            p2p_node,
1053            request_id,
1054            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1055                key: offer.key,
1056                reason: format!(
1057                    "Data size {} exceeds maximum chunk size {}",
1058                    offer.data.len(),
1059                    crate::ant_protocol::MAX_CHUNK_SIZE,
1060                ),
1061            }),
1062            rr_message_id,
1063        )
1064        .await;
1065        return Ok(());
1066    }
1067
1068    // Rule 7: check responsibility.
1069    if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await {
1070        send_replication_response(
1071            source,
1072            p2p_node,
1073            request_id,
1074            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1075                key: offer.key,
1076                reason: "Not responsible for this key".to_string(),
1077            }),
1078            rr_message_id,
1079        )
1080        .await;
1081        return Ok(());
1082    }
1083
1084    // Gap 1: Validate PoP via PaymentVerifier.
1085    match payment_verifier
1086        .verify_payment(&offer.key, Some(&offer.proof_of_payment))
1087        .await
1088    {
1089        Ok(status) if status.can_store() => {
1090            debug!(
1091                "PoP validated for fresh offer key {}",
1092                hex::encode(offer.key)
1093            );
1094        }
1095        Ok(_) => {
1096            send_replication_response(
1097                source,
1098                p2p_node,
1099                request_id,
1100                ReplicationMessageBody::FreshReplicationResponse(
1101                    FreshReplicationResponse::Rejected {
1102                        key: offer.key,
1103                        reason: "Payment verification failed: payment required".to_string(),
1104                    },
1105                ),
1106                rr_message_id,
1107            )
1108            .await;
1109            return Ok(());
1110        }
1111        Err(e) => {
1112            warn!(
1113                "PoP verification error for key {}: {e}",
1114                hex::encode(offer.key)
1115            );
1116            send_replication_response(
1117                source,
1118                p2p_node,
1119                request_id,
1120                ReplicationMessageBody::FreshReplicationResponse(
1121                    FreshReplicationResponse::Rejected {
1122                        key: offer.key,
1123                        reason: format!("Payment verification error: {e}"),
1124                    },
1125                ),
1126                rr_message_id,
1127            )
1128            .await;
1129            return Ok(());
1130        }
1131    }
1132
1133    // Rule 6: add to PaidForList.
1134    if let Err(e) = paid_list.insert(&offer.key).await {
1135        warn!("Failed to add key to PaidForList: {e}");
1136    }
1137
1138    // Store the record.
1139    match storage.put(&offer.key, &offer.data).await {
1140        Ok(_) => {
1141            send_replication_response(
1142                source,
1143                p2p_node,
1144                request_id,
1145                ReplicationMessageBody::FreshReplicationResponse(
1146                    FreshReplicationResponse::Accepted { key: offer.key },
1147                ),
1148                rr_message_id,
1149            )
1150            .await;
1151        }
1152        Err(e) => {
1153            send_replication_response(
1154                source,
1155                p2p_node,
1156                request_id,
1157                ReplicationMessageBody::FreshReplicationResponse(
1158                    FreshReplicationResponse::Rejected {
1159                        key: offer.key,
1160                        reason: format!("Storage error: {e}"),
1161                    },
1162                ),
1163                rr_message_id,
1164            )
1165            .await;
1166        }
1167    }
1168
1169    Ok(())
1170}
1171
1172async fn handle_paid_notify(
1173    _source: &PeerId,
1174    notify: &protocol::PaidNotify,
1175    paid_list: &Arc<PaidList>,
1176    payment_verifier: &Arc<PaymentVerifier>,
1177    p2p_node: &Arc<P2PNode>,
1178    config: &ReplicationConfig,
1179) -> Result<()> {
1180    let self_id = *p2p_node.peer_id();
1181
1182    // Rule 3: validate PoP presence before adding.
1183    if notify.proof_of_payment.is_empty() {
1184        return Ok(());
1185    }
1186
1187    // Check if we're in PaidCloseGroup for this key.
1188    if !admission::is_in_paid_close_group(
1189        &self_id,
1190        &notify.key,
1191        p2p_node,
1192        config.paid_list_close_group_size,
1193    )
1194    .await
1195    {
1196        return Ok(());
1197    }
1198
1199    // Gap 1: Validate PoP via PaymentVerifier.
1200    match payment_verifier
1201        .verify_payment(&notify.key, Some(&notify.proof_of_payment))
1202        .await
1203    {
1204        Ok(status) if status.can_store() => {
1205            debug!(
1206                "PoP validated for paid notify key {}",
1207                hex::encode(notify.key)
1208            );
1209        }
1210        Ok(_) => {
1211            warn!(
1212                "Paid notify rejected: payment required for key {}",
1213                hex::encode(notify.key)
1214            );
1215            return Ok(());
1216        }
1217        Err(e) => {
1218            warn!(
1219                "PoP verification error for paid notify key {}: {e}",
1220                hex::encode(notify.key)
1221            );
1222            return Ok(());
1223        }
1224    }
1225
1226    if let Err(e) = paid_list.insert(&notify.key).await {
1227        warn!("Failed to add paid notify key to PaidForList: {e}");
1228    }
1229
1230    Ok(())
1231}
1232
1233#[allow(clippy::too_many_arguments)]
1234async fn handle_neighbor_sync_request(
1235    source: &PeerId,
1236    request: &protocol::NeighborSyncRequest,
1237    p2p_node: &Arc<P2PNode>,
1238    storage: &Arc<LmdbStorage>,
1239    paid_list: &Arc<PaidList>,
1240    queues: &Arc<RwLock<ReplicationQueues>>,
1241    config: &ReplicationConfig,
1242    is_bootstrapping: bool,
1243    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1244    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1245    request_id: u64,
1246    rr_message_id: Option<&str>,
1247) -> Result<()> {
1248    let self_id = *p2p_node.peer_id();
1249
1250    // No per-request hint count limit: the wire message size limit
1251    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Unlike audit
1252    // challenges, sync hints don't drive expensive computation — they just
1253    // enter the verification queue. A per-request limit here would break
1254    // bootstrap replication for newly-joined nodes with 0 stored chunks.
1255
1256    // Build response (outbound hints).
1257    let (response, sender_in_rt) = neighbor_sync::handle_sync_request(
1258        source,
1259        request,
1260        p2p_node,
1261        storage,
1262        paid_list,
1263        config,
1264        is_bootstrapping,
1265    )
1266    .await;
1267
1268    // Send response.
1269    send_replication_response(
1270        source,
1271        p2p_node,
1272        request_id,
1273        ReplicationMessageBody::NeighborSyncResponse(response),
1274        rr_message_id,
1275    )
1276    .await;
1277
1278    // Process inbound hints only if sender is in LocalRT (Rule 4-6).
1279    if !sender_in_rt {
1280        return Ok(());
1281    }
1282
1283    // Update sync history for this peer.
1284    {
1285        let mut history = sync_history.write().await;
1286        let record = history.entry(*source).or_insert(PeerSyncRecord {
1287            last_sync: None,
1288            cycles_since_sync: 0,
1289        });
1290        record.last_sync = Some(Instant::now());
1291        record.cycles_since_sync = 0;
1292    }
1293
1294    // Admit inbound hints and queue for verification.
1295    let admitted_keys = admit_and_queue_hints(
1296        &self_id,
1297        source,
1298        &request.replica_hints,
1299        &request.paid_hints,
1300        p2p_node,
1301        config,
1302        storage,
1303        paid_list,
1304        queues,
1305    )
1306    .await;
1307
1308    // Track discovered keys for bootstrap drain detection so that
1309    // hints admitted via inbound sync requests are not missed.
1310    if is_bootstrapping && !admitted_keys.is_empty() {
1311        bootstrap::track_discovered_keys(bootstrap_state, &admitted_keys).await;
1312    }
1313
1314    Ok(())
1315}
1316
1317async fn handle_verification_request(
1318    source: &PeerId,
1319    request: &protocol::VerificationRequest,
1320    storage: &Arc<LmdbStorage>,
1321    paid_list: &Arc<PaidList>,
1322    p2p_node: &Arc<P2PNode>,
1323    request_id: u64,
1324    rr_message_id: Option<&str>,
1325) -> Result<()> {
1326    // No per-request key count limit: the wire message size limit
1327    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Verification
1328    // does cheap storage lookups per key, not expensive computation like
1329    // audit digest generation.
1330
1331    #[allow(clippy::cast_possible_truncation)]
1332    let keys_len = request.keys.len() as u32;
1333    let paid_check_set: HashSet<u32> = request
1334        .paid_list_check_indices
1335        .iter()
1336        .copied()
1337        .filter(|&idx| {
1338            if idx >= keys_len {
1339                warn!(
1340                    "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
1341                    request.keys.len(),
1342                );
1343                false
1344            } else {
1345                true
1346            }
1347        })
1348        .collect();
1349
1350    let mut results = Vec::with_capacity(request.keys.len());
1351    for (i, key) in request.keys.iter().enumerate() {
1352        let present = storage.exists(key).unwrap_or(false);
1353        let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
1354            Some(paid_list.contains(key).unwrap_or(false))
1355        } else {
1356            None
1357        };
1358        results.push(protocol::KeyVerificationResult {
1359            key: *key,
1360            present,
1361            paid,
1362        });
1363    }
1364
1365    send_replication_response(
1366        source,
1367        p2p_node,
1368        request_id,
1369        ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
1370        rr_message_id,
1371    )
1372    .await;
1373
1374    Ok(())
1375}
1376
1377async fn handle_fetch_request(
1378    source: &PeerId,
1379    request: &protocol::FetchRequest,
1380    storage: &Arc<LmdbStorage>,
1381    p2p_node: &Arc<P2PNode>,
1382    request_id: u64,
1383    rr_message_id: Option<&str>,
1384) -> Result<()> {
1385    let response = match storage.get(&request.key).await {
1386        Ok(Some(data)) => protocol::FetchResponse::Success {
1387            key: request.key,
1388            data,
1389        },
1390        Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
1391        Err(e) => protocol::FetchResponse::Error {
1392            key: request.key,
1393            reason: format!("{e}"),
1394        },
1395    };
1396
1397    send_replication_response(
1398        source,
1399        p2p_node,
1400        request_id,
1401        ReplicationMessageBody::FetchResponse(response),
1402        rr_message_id,
1403    )
1404    .await;
1405
1406    Ok(())
1407}
1408
1409async fn handle_audit_challenge_msg(
1410    source: &PeerId,
1411    challenge: &protocol::AuditChallenge,
1412    storage: &Arc<LmdbStorage>,
1413    p2p_node: &Arc<P2PNode>,
1414    is_bootstrapping: bool,
1415    request_id: u64,
1416    rr_message_id: Option<&str>,
1417) -> Result<()> {
1418    #[allow(clippy::cast_possible_truncation)]
1419    let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
1420    let response = audit::handle_audit_challenge(
1421        challenge,
1422        storage,
1423        p2p_node.peer_id(),
1424        is_bootstrapping,
1425        stored_chunks,
1426    )
1427    .await;
1428
1429    send_replication_response(
1430        source,
1431        p2p_node,
1432        request_id,
1433        ReplicationMessageBody::AuditResponse(response),
1434        rr_message_id,
1435    )
1436    .await;
1437
1438    Ok(())
1439}
1440
1441// ---------------------------------------------------------------------------
1442// Message sending helper
1443// ---------------------------------------------------------------------------
1444
1445/// Send a replication response message. Fire-and-forget: logs errors but
1446/// does not propagate them.
1447///
1448/// When `rr_message_id` is `Some`, the response is sent via the `/rr/`
1449/// request-response path so saorsa-core can route it back to the caller's
1450/// `send_request` future. Otherwise it is sent as a plain message.
1451async fn send_replication_response(
1452    peer: &PeerId,
1453    p2p_node: &Arc<P2PNode>,
1454    request_id: u64,
1455    body: ReplicationMessageBody,
1456    rr_message_id: Option<&str>,
1457) {
1458    let msg = ReplicationMessage { request_id, body };
1459    let encoded = match msg.encode() {
1460        Ok(data) => data,
1461        Err(e) => {
1462            warn!("Failed to encode replication response: {e}");
1463            return;
1464        }
1465    };
1466    let result = if let Some(msg_id) = rr_message_id {
1467        p2p_node
1468            .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
1469            .await
1470    } else {
1471        p2p_node
1472            .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
1473            .await
1474    };
1475    if let Err(e) = result {
1476        debug!("Failed to send replication response to {peer}: {e}");
1477    }
1478}
1479
1480// ---------------------------------------------------------------------------
1481// Neighbor sync round
1482// ---------------------------------------------------------------------------
1483
1484/// Run one neighbor sync round.
1485#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1486async fn run_neighbor_sync_round(
1487    p2p_node: &Arc<P2PNode>,
1488    storage: &Arc<LmdbStorage>,
1489    paid_list: &Arc<PaidList>,
1490    queues: &Arc<RwLock<ReplicationQueues>>,
1491    config: &ReplicationConfig,
1492    sync_state: &Arc<RwLock<NeighborSyncState>>,
1493    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1494    is_bootstrapping: &Arc<RwLock<bool>>,
1495    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1496) {
1497    let self_id = *p2p_node.peer_id();
1498    let bootstrapping = *is_bootstrapping.read().await;
1499
1500    // Check if cycle is complete; start new one if needed.
1501    // We check under a read lock, then release it before the expensive
1502    // prune pass and DHT snapshot so other tasks are not starved.
1503    let cycle_complete = sync_state.read().await.is_cycle_complete();
1504    if cycle_complete {
1505        // Post-cycle pruning (Section 11) — runs without holding sync_state.
1506        pruning::run_prune_pass(&self_id, storage, paid_list, p2p_node, config).await;
1507
1508        // Increment `cycles_since_sync` for all peers.
1509        {
1510            let mut history = sync_history.write().await;
1511            for record in history.values_mut() {
1512                record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
1513            }
1514        }
1515
1516        // Take fresh close-neighbor snapshot (DHT query, no lock held).
1517        let neighbors =
1518            neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
1519                .await;
1520
1521        // Now re-acquire write lock and re-check before swapping cycle.
1522        let mut state = sync_state.write().await;
1523        if state.is_cycle_complete() {
1524            // Preserve last_sync_times and bootstrap_claims across cycles.
1525            // Claims have a 24h lifecycle vs 10-20 min cycles — dropping them
1526            // would reset the abuse detection timer every cycle.
1527            let old_sync_times = std::mem::take(&mut state.last_sync_times);
1528            let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
1529            *state = NeighborSyncState::new_cycle(neighbors);
1530            state.last_sync_times = old_sync_times;
1531            state.bootstrap_claims = old_bootstrap_claims;
1532        }
1533    }
1534
1535    // Select batch of peers.
1536    let batch = {
1537        let mut state = sync_state.write().await;
1538        neighbor_sync::select_sync_batch(
1539            &mut state,
1540            config.neighbor_sync_peer_count,
1541            config.neighbor_sync_cooldown,
1542        )
1543    };
1544
1545    if batch.is_empty() {
1546        return;
1547    }
1548
1549    debug!("Neighbor sync: syncing with {} peers", batch.len());
1550
1551    // Sync with each peer in the batch.
1552    for peer in &batch {
1553        let response = neighbor_sync::sync_with_peer(
1554            peer,
1555            p2p_node,
1556            storage,
1557            paid_list,
1558            config,
1559            bootstrapping,
1560        )
1561        .await;
1562
1563        if let Some(resp) = response {
1564            handle_sync_response(
1565                &self_id,
1566                peer,
1567                &resp,
1568                p2p_node,
1569                config,
1570                bootstrapping,
1571                bootstrap_state,
1572                storage,
1573                paid_list,
1574                queues,
1575                sync_state,
1576                sync_history,
1577            )
1578            .await;
1579        } else {
1580            // Sync failed -- remove peer and try to fill slot.
1581            let replacement = {
1582                let mut state = sync_state.write().await;
1583                neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
1584            };
1585
1586            // Attempt sync with the replacement peer (if one was found).
1587            if let Some(replacement_peer) = replacement {
1588                let replacement_resp = neighbor_sync::sync_with_peer(
1589                    &replacement_peer,
1590                    p2p_node,
1591                    storage,
1592                    paid_list,
1593                    config,
1594                    bootstrapping,
1595                )
1596                .await;
1597
1598                if let Some(resp) = replacement_resp {
1599                    handle_sync_response(
1600                        &self_id,
1601                        &replacement_peer,
1602                        &resp,
1603                        p2p_node,
1604                        config,
1605                        bootstrapping,
1606                        bootstrap_state,
1607                        storage,
1608                        paid_list,
1609                        queues,
1610                        sync_state,
1611                        sync_history,
1612                    )
1613                    .await;
1614                }
1615            }
1616        }
1617    }
1618}
1619
1620/// Process a successful neighbor sync response: record the sync, check for
1621/// bootstrap claim abuse, and admit inbound hints.
1622#[allow(clippy::too_many_arguments)]
1623async fn handle_sync_response(
1624    self_id: &PeerId,
1625    peer: &PeerId,
1626    resp: &NeighborSyncResponse,
1627    p2p_node: &Arc<P2PNode>,
1628    config: &ReplicationConfig,
1629    bootstrapping: bool,
1630    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1631    storage: &Arc<LmdbStorage>,
1632    paid_list: &Arc<PaidList>,
1633    queues: &Arc<RwLock<ReplicationQueues>>,
1634    sync_state: &Arc<RwLock<NeighborSyncState>>,
1635    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1636) {
1637    // Record successful sync.
1638    {
1639        let mut state = sync_state.write().await;
1640        neighbor_sync::record_successful_sync(&mut state, peer);
1641    }
1642    {
1643        let mut history = sync_history.write().await;
1644        let record = history.entry(*peer).or_insert(PeerSyncRecord {
1645            last_sync: None,
1646            cycles_since_sync: 0,
1647        });
1648        record.last_sync = Some(Instant::now());
1649        record.cycles_since_sync = 0;
1650    }
1651
1652    // Process inbound hints from response (skip if peer is bootstrapping).
1653    if resp.bootstrapping {
1654        // Gap 6: BootstrapClaimAbuse grace period enforcement.
1655        // Separate state mutation from network I/O to avoid holding the
1656        // write lock across report_trust_event.
1657        let should_report = {
1658            let now = Instant::now();
1659            let mut state = sync_state.write().await;
1660            let first_seen = state.bootstrap_claims.entry(*peer).or_insert(now);
1661            let claim_age = now.duration_since(*first_seen);
1662            if claim_age > config.bootstrap_claim_grace_period {
1663                warn!(
1664                    "Peer {peer} has been claiming bootstrap for {:?}, \
1665                     exceeding grace period of {:?} — reporting abuse",
1666                    claim_age, config.bootstrap_claim_grace_period,
1667                );
1668                true
1669            } else {
1670                false
1671            }
1672        };
1673        if should_report {
1674            p2p_node
1675                .report_trust_event(
1676                    peer,
1677                    TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1678                )
1679                .await;
1680        }
1681    } else {
1682        // Peer is not claiming bootstrap; clear any prior claim.
1683        {
1684            let mut state = sync_state.write().await;
1685            state.bootstrap_claims.remove(peer);
1686        }
1687        let admitted_keys = admit_and_queue_hints(
1688            self_id,
1689            peer,
1690            &resp.replica_hints,
1691            &resp.paid_hints,
1692            p2p_node,
1693            config,
1694            storage,
1695            paid_list,
1696            queues,
1697        )
1698        .await;
1699
1700        // Track discovered keys for bootstrap drain detection so that
1701        // hints admitted via regular neighbor sync are not missed.
1702        if bootstrapping && !admitted_keys.is_empty() {
1703            bootstrap::track_discovered_keys(bootstrap_state, &admitted_keys).await;
1704        }
1705    }
1706}
1707
1708/// Admit hints and queue them for verification, returning newly-discovered keys.
1709///
1710/// Shared by neighbor-sync request handling, response handling, and bootstrap
1711/// sync so that admission + queueing logic lives in one place.
1712#[allow(clippy::too_many_arguments)]
1713async fn admit_and_queue_hints(
1714    self_id: &PeerId,
1715    source_peer: &PeerId,
1716    replica_hints: &[XorName],
1717    paid_hints: &[XorName],
1718    p2p_node: &Arc<P2PNode>,
1719    config: &ReplicationConfig,
1720    storage: &Arc<LmdbStorage>,
1721    paid_list: &Arc<PaidList>,
1722    queues: &Arc<RwLock<ReplicationQueues>>,
1723) -> HashSet<XorName> {
1724    let pending_keys: HashSet<XorName> = {
1725        let q = queues.read().await;
1726        q.pending_keys().into_iter().collect()
1727    };
1728
1729    let admitted = admission::admit_hints(
1730        self_id,
1731        replica_hints,
1732        paid_hints,
1733        p2p_node,
1734        config,
1735        storage,
1736        paid_list,
1737        &pending_keys,
1738    )
1739    .await;
1740
1741    let mut discovered = HashSet::new();
1742    let mut q = queues.write().await;
1743    let now = Instant::now();
1744
1745    for key in admitted.replica_keys {
1746        if !storage.exists(&key).unwrap_or(false) {
1747            let added = q.add_pending_verify(
1748                key,
1749                VerificationEntry {
1750                    state: VerificationState::PendingVerify,
1751                    pipeline: HintPipeline::Replica,
1752                    verified_sources: Vec::new(),
1753                    tried_sources: HashSet::new(),
1754                    created_at: now,
1755                    hint_sender: *source_peer,
1756                },
1757            );
1758            if added {
1759                discovered.insert(key);
1760            }
1761        }
1762    }
1763
1764    for key in admitted.paid_only_keys {
1765        let added = q.add_pending_verify(
1766            key,
1767            VerificationEntry {
1768                state: VerificationState::PendingVerify,
1769                pipeline: HintPipeline::PaidOnly,
1770                verified_sources: Vec::new(),
1771                tried_sources: HashSet::new(),
1772                created_at: now,
1773                hint_sender: *source_peer,
1774            },
1775        );
1776        if added {
1777            discovered.insert(key);
1778        }
1779    }
1780
1781    discovered
1782}
1783
1784// ---------------------------------------------------------------------------
1785// Verification cycle
1786// ---------------------------------------------------------------------------
1787
1788/// Run one verification cycle: process pending keys through quorum checks.
1789#[allow(clippy::too_many_lines)]
1790async fn run_verification_cycle(
1791    p2p_node: &Arc<P2PNode>,
1792    paid_list: &Arc<PaidList>,
1793    queues: &Arc<RwLock<ReplicationQueues>>,
1794    config: &ReplicationConfig,
1795    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1796    is_bootstrapping: &Arc<RwLock<bool>>,
1797    bootstrap_complete_notify: &Arc<Notify>,
1798) {
1799    // Evict stale entries that have been pending too long (e.g. unreachable
1800    // verification targets during a network partition).
1801    {
1802        let mut q = queues.write().await;
1803        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
1804    }
1805
1806    let pending_keys = {
1807        let q = queues.read().await;
1808        q.pending_keys()
1809    };
1810
1811    if pending_keys.is_empty() {
1812        return;
1813    }
1814
1815    let self_id = *p2p_node.peer_id();
1816
1817    // Step 1: Check local PaidForList for fast-path authorization (Section 9,
1818    // step 4).
1819    let mut keys_needing_network = Vec::new();
1820    let mut terminal_keys: Vec<XorName> = Vec::new();
1821    {
1822        let mut q = queues.write().await;
1823        for key in &pending_keys {
1824            if paid_list.contains(key).unwrap_or(false) {
1825                if let Some(entry) = q.get_pending_mut(key) {
1826                    entry.state = VerificationState::PaidListVerified;
1827                    if entry.pipeline == HintPipeline::PaidOnly {
1828                        // Paid-only pipeline: PaidForList already updated, done.
1829                        q.remove_pending(key);
1830                        terminal_keys.push(*key);
1831                        continue;
1832                    }
1833                }
1834            }
1835            // Both branches (paid locally or not) need network verification.
1836            keys_needing_network.push(*key);
1837        }
1838    }
1839
1840    // Steps 2-5: Network verification (skipped if all keys resolved locally).
1841    if !keys_needing_network.is_empty() {
1842        // Step 2: Compute targets and run network verification round.
1843        let targets =
1844            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
1845                .await;
1846
1847        let evidence =
1848            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;
1849
1850        // Step 3: Evaluate results — collect outcomes without holding the write
1851        // lock across paid-list I/O.
1852        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
1853        {
1854            let q = queues.read().await;
1855            for key in &keys_needing_network {
1856                let Some(ev) = evidence.get(key) else {
1857                    continue;
1858                };
1859                let Some(entry) = q.get_pending(key) else {
1860                    continue;
1861                };
1862                let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config);
1863                evaluated.push((*key, outcome, entry.pipeline));
1864            }
1865        } // read lock released
1866
1867        // Step 4: Insert verified keys into PaidForList (no lock held).
1868        let mut paid_insert_keys: Vec<XorName> = Vec::new();
1869        for (key, outcome, _) in &evaluated {
1870            if matches!(
1871                outcome,
1872                KeyVerificationOutcome::QuorumVerified { .. }
1873                    | KeyVerificationOutcome::PaidListVerified { .. }
1874            ) {
1875                paid_insert_keys.push(*key);
1876            }
1877        }
1878        for key in &paid_insert_keys {
1879            if let Err(e) = paid_list.insert(key).await {
1880                warn!("Failed to add verified key to PaidForList: {e}");
1881            }
1882        }
1883
1884        // Step 5: Update queues with the evaluated outcomes.
1885        let mut q = queues.write().await;
1886        for (key, outcome, pipeline) in evaluated {
1887            match outcome {
1888                KeyVerificationOutcome::QuorumVerified { sources }
1889                | KeyVerificationOutcome::PaidListVerified { sources } => {
1890                    if pipeline == HintPipeline::Replica && !sources.is_empty() {
1891                        let distance =
1892                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
1893                        q.remove_pending(&key);
1894                        q.enqueue_fetch(key, distance, sources);
1895                        // Not terminal — key moved to fetch queue.
1896                    } else if pipeline == HintPipeline::Replica && sources.is_empty() {
1897                        warn!(
1898                            "Verified key {} has no holders (possible data loss)",
1899                            hex::encode(key)
1900                        );
1901                        q.remove_pending(&key);
1902                        terminal_keys.push(key);
1903                    } else {
1904                        q.remove_pending(&key);
1905                        terminal_keys.push(key);
1906                    }
1907                }
1908                KeyVerificationOutcome::QuorumFailed
1909                | KeyVerificationOutcome::QuorumInconclusive => {
1910                    q.remove_pending(&key);
1911                    terminal_keys.push(key);
1912                }
1913            }
1914        }
1915    }
1916
1917    // Step 6: Remove terminal keys from bootstrap pending set and re-check
1918    // the drain condition.
1919    update_bootstrap_after_verification(
1920        &terminal_keys,
1921        bootstrap_state,
1922        queues,
1923        is_bootstrapping,
1924        bootstrap_complete_notify,
1925    )
1926    .await;
1927}
1928
1929/// Post-verification bootstrap bookkeeping: remove terminal keys from the
1930/// bootstrap pending set and transition out of bootstrapping when drained.
1931async fn update_bootstrap_after_verification(
1932    terminal_keys: &[XorName],
1933    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1934    queues: &Arc<RwLock<ReplicationQueues>>,
1935    is_bootstrapping: &Arc<RwLock<bool>>,
1936    bootstrap_complete_notify: &Arc<Notify>,
1937) {
1938    if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
1939        return;
1940    }
1941    {
1942        let mut bs = bootstrap_state.write().await;
1943        for key in terminal_keys {
1944            bs.remove_key(key);
1945        }
1946    }
1947    let q = queues.read().await;
1948    if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
1949        complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
1950    }
1951}
1952
1953/// Set `is_bootstrapping` to `false` and wake all waiters.
1954async fn complete_bootstrap(
1955    is_bootstrapping: &Arc<RwLock<bool>>,
1956    bootstrap_complete_notify: &Arc<Notify>,
1957) {
1958    *is_bootstrapping.write().await = false;
1959    bootstrap_complete_notify.notify_waiters();
1960    info!("Replication bootstrap complete");
1961}
1962
1963// ---------------------------------------------------------------------------
1964// Fetch types and single-fetch executor
1965// ---------------------------------------------------------------------------
1966
/// Result classification for a single fetch attempt.
///
/// The enum is fieldless, so it derives the cheap standard traits: `Debug`
/// for logging, `Clone`/`Copy` so a classification can be passed around
/// freely, and `PartialEq`/`Eq` so results can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FetchResult {
    /// Data fetched, integrity-checked, and stored successfully.
    Stored,
    /// Content-address integrity check failed — do not retry.
    IntegrityFailed,
    /// Source failed (network error or non-success response) — retryable.
    SourceFailed,
}
1976
1977/// Outcome produced by [`execute_single_fetch`] and consumed by the fetch
1978/// worker loop to update queue state.
1979struct FetchOutcome {
1980    key: XorName,
1981    result: FetchResult,
1982}
1983
1984#[allow(clippy::too_many_lines)]
1985/// Execute a single fetch request against `source` for `key`.
1986///
1987/// Handles encoding, network I/O, integrity checking, storage, and trust
1988/// event reporting.  Returns a [`FetchOutcome`] so the caller can update
1989/// queue state without holding any locks during the network round-trip.
1990async fn execute_single_fetch(
1991    p2p_node: Arc<P2PNode>,
1992    storage: Arc<LmdbStorage>,
1993    config: Arc<ReplicationConfig>,
1994    key: XorName,
1995    source: PeerId,
1996) -> FetchOutcome {
1997    let request = protocol::FetchRequest { key };
1998    let msg = ReplicationMessage {
1999        request_id: rand::thread_rng().gen::<u64>(),
2000        body: ReplicationMessageBody::FetchRequest(request),
2001    };
2002
2003    let encoded = match msg.encode() {
2004        Ok(data) => data,
2005        Err(e) => {
2006            warn!("Failed to encode fetch request: {e}");
2007            return FetchOutcome {
2008                key,
2009                result: FetchResult::SourceFailed,
2010            };
2011        }
2012    };
2013
2014    let result = p2p_node
2015        .send_request(
2016            &source,
2017            REPLICATION_PROTOCOL_ID,
2018            encoded,
2019            config.fetch_request_timeout,
2020        )
2021        .await;
2022
2023    match result {
2024        Ok(response) => {
2025            let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
2026                p2p_node
2027                    .report_trust_event(
2028                        &source,
2029                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2030                    )
2031                    .await;
2032                return FetchOutcome {
2033                    key,
2034                    result: FetchResult::SourceFailed,
2035                };
2036            };
2037
2038            match resp_msg.body {
2039                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
2040                    key: resp_key,
2041                    data,
2042                }) => {
2043                    // Validate the response key matches the requested key.
2044                    // A malicious peer could serve valid data for a different
2045                    // key, passing integrity checks while the requested key
2046                    // is falsely marked as fetched.
2047                    if resp_key != key {
2048                        warn!(
2049                            "Fetch response key mismatch: requested {}, got {}",
2050                            hex::encode(key),
2051                            hex::encode(resp_key)
2052                        );
2053                        p2p_node
2054                            .report_trust_event(
2055                                &source,
2056                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2057                            )
2058                            .await;
2059                        return FetchOutcome {
2060                            key,
2061                            result: FetchResult::IntegrityFailed,
2062                        };
2063                    }
2064
2065                    // Enforce chunk size invariant on fetched data.
2066                    // Checked before the content-address hash to avoid
2067                    // hashing up to 10 MiB of oversized junk data.
2068                    if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
2069                        warn!(
2070                            "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
2071                            hex::encode(resp_key),
2072                            data.len(),
2073                            crate::ant_protocol::MAX_CHUNK_SIZE,
2074                        );
2075                        p2p_node
2076                            .report_trust_event(
2077                                &source,
2078                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2079                            )
2080                            .await;
2081                        return FetchOutcome {
2082                            key,
2083                            result: FetchResult::IntegrityFailed,
2084                        };
2085                    }
2086
2087                    // Content-address integrity check.
2088                    let computed = crate::client::compute_address(&data);
2089                    if computed != resp_key {
2090                        warn!(
2091                            "Fetched record integrity check failed: expected {}, got {}",
2092                            hex::encode(resp_key),
2093                            hex::encode(computed)
2094                        );
2095                        p2p_node
2096                            .report_trust_event(
2097                                &source,
2098                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2099                            )
2100                            .await;
2101                        return FetchOutcome {
2102                            key,
2103                            result: FetchResult::IntegrityFailed,
2104                        };
2105                    }
2106
2107                    if let Err(e) = storage.put(&resp_key, &data).await {
2108                        warn!(
2109                            "Failed to store fetched record {}: {e}",
2110                            hex::encode(resp_key)
2111                        );
2112                        return FetchOutcome {
2113                            key,
2114                            result: FetchResult::SourceFailed,
2115                        };
2116                    }
2117
2118                    FetchOutcome {
2119                        key,
2120                        result: FetchResult::Stored,
2121                    }
2122                }
2123                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
2124                    ..
2125                }) => {
2126                    // NotFound is a legitimate response (peer may have pruned
2127                    // the data). No trust penalty — just retry another source.
2128                    debug!("Fetch: peer {source} does not have {}", hex::encode(key));
2129                    FetchOutcome {
2130                        key,
2131                        result: FetchResult::SourceFailed,
2132                    }
2133                }
2134                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
2135                    reason,
2136                    ..
2137                }) => {
2138                    warn!(
2139                        "Fetch: peer {source} returned error for {}: {reason}",
2140                        hex::encode(key)
2141                    );
2142                    p2p_node
2143                        .report_trust_event(
2144                            &source,
2145                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2146                        )
2147                        .await;
2148                    FetchOutcome {
2149                        key,
2150                        result: FetchResult::SourceFailed,
2151                    }
2152                }
2153                _ => {
2154                    // Unexpected message type — treat as malformed.
2155                    p2p_node
2156                        .report_trust_event(
2157                            &source,
2158                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2159                        )
2160                        .await;
2161                    FetchOutcome {
2162                        key,
2163                        result: FetchResult::SourceFailed,
2164                    }
2165                }
2166            }
2167        }
2168        Err(e) => {
2169            debug!("Fetch request to {source} failed: {e}");
2170            // No ApplicationFailure here — P2PNode::send_request() already
2171            // reports ConnectionTimeout / ConnectionFailed to the TrustEngine.
2172            FetchOutcome {
2173                key,
2174                result: FetchResult::SourceFailed,
2175            }
2176        }
2177    }
2178}
2179
2180// ---------------------------------------------------------------------------
2181// Audit result handler
2182// ---------------------------------------------------------------------------
2183
2184/// Handle audit result: log findings and emit trust events.
2185async fn handle_audit_result(
2186    result: &AuditTickResult,
2187    p2p_node: &Arc<P2PNode>,
2188    sync_state: &Arc<RwLock<NeighborSyncState>>,
2189    config: &ReplicationConfig,
2190) {
2191    match result {
2192        AuditTickResult::Passed {
2193            challenged_peer,
2194            keys_checked,
2195        } => {
2196            debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
2197            // Peer responded normally — clear any stale bootstrap claim so
2198            // a future legitimate bootstrap claim starts with a fresh timer.
2199            {
2200                let mut state = sync_state.write().await;
2201                state.bootstrap_claims.remove(challenged_peer);
2202            }
2203            p2p_node
2204                .report_trust_event(
2205                    challenged_peer,
2206                    TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
2207                )
2208                .await;
2209        }
2210        AuditTickResult::Failed { evidence } => {
2211            if let FailureEvidence::AuditFailure {
2212                challenged_peer,
2213                confirmed_failed_keys,
2214                ..
2215            } = evidence
2216            {
2217                error!(
2218                    "Audit failure for {challenged_peer}: {} confirmed failed keys",
2219                    confirmed_failed_keys.len()
2220                );
2221                // Peer responded with digests (not a bootstrap claim) — clear
2222                // any stale bootstrap claim timestamp.
2223                {
2224                    let mut state = sync_state.write().await;
2225                    state.bootstrap_claims.remove(challenged_peer);
2226                }
2227                p2p_node
2228                    .report_trust_event(
2229                        challenged_peer,
2230                        TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
2231                    )
2232                    .await;
2233            }
2234        }
2235        AuditTickResult::BootstrapClaim { peer } => {
2236            // Gap 6: BootstrapClaimAbuse grace period in audit path.
2237            // Separate state mutation from network I/O to avoid holding the
2238            // write lock across report_trust_event.
2239            let should_report = {
2240                let now = Instant::now();
2241                let mut state = sync_state.write().await;
2242                let first_seen = state.bootstrap_claims.entry(*peer).or_insert(now);
2243                let claim_age = now.duration_since(*first_seen);
2244                if claim_age > config.bootstrap_claim_grace_period {
2245                    warn!(
2246                        "Audit: peer {peer} claiming bootstrap past grace period \
2247                         ({:?} > {:?}), reporting abuse",
2248                        claim_age, config.bootstrap_claim_grace_period,
2249                    );
2250                    true
2251                } else {
2252                    debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
2253                    false
2254                }
2255            };
2256            if should_report {
2257                p2p_node
2258                    .report_trust_event(
2259                        peer,
2260                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2261                    )
2262                    .await;
2263            }
2264        }
2265        AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
2266    }
2267}
2268
2269// `admit_bootstrap_hints` was consolidated into `admit_and_queue_hints`.