// ant_node/replication/mod.rs

1//! Replication subsystem for the Autonomi network.
2//!
3//! Implements Kademlia-style replication with:
4//! - Fresh replication with `PoP` verification
5//! - Neighbor sync with round-robin cycle management
6//! - Batched quorum verification
7//! - Storage audit protocol (anti-outsourcing)
8//! - `PaidForList` persistence and convergence
9//! - Responsibility pruning with hysteresis
10
11// The replication engine intentionally holds `RwLock` read guards across await
12// boundaries (e.g. reading sync_history while calling audit_tick). Clippy's
13// nursery lint `significant_drop_tightening` flags these, but the guards must
14// remain live for the duration of the call.
15#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod config;
21pub mod fresh;
22pub mod neighbor_sync;
23pub mod paid_list;
24pub mod protocol;
25pub mod pruning;
26pub mod quorum;
27pub mod scheduling;
28pub mod types;
29
30use std::collections::{HashMap, HashSet};
31use std::path::Path;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use std::pin::Pin;
36
37use crate::logging::{debug, error, info, warn};
38use futures::stream::FuturesUnordered;
39use futures::{Future, StreamExt};
40use rand::Rng;
41use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
42use tokio::task::JoinHandle;
43use tokio_util::sync::CancellationToken;
44
45use crate::ant_protocol::XorName;
46use crate::error::{Error, Result};
47use crate::payment::PaymentVerifier;
48use crate::replication::audit::AuditTickResult;
49use crate::replication::config::{
50    max_parallel_fetch, ReplicationConfig, MAX_CONCURRENT_REPLICATION_SENDS,
51    REPLICATION_PROTOCOL_ID,
52};
53use crate::replication::paid_list::PaidList;
54use crate::replication::protocol::{
55    FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
56    VerificationResponse,
57};
58use crate::replication::quorum::KeyVerificationOutcome;
59use crate::replication::scheduling::ReplicationQueues;
60use crate::replication::types::{
61    BootstrapState, FailureEvidence, HintPipeline, NeighborSyncState, PeerSyncRecord,
62    VerificationEntry, VerificationState,
63};
64use crate::storage::LmdbStorage;
65use saorsa_core::identity::PeerId;
66use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
67
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

/// Prefix used by saorsa-core's request-response mechanism.
///
/// Incoming topics of the form `/rr/<protocol>` carry request-response
/// envelopes rather than raw replication payloads; the message handler
/// strips this prefix to recover the protocol id.
const RR_PREFIX: &str = "/rr/";

/// Boxed future type for in-flight fetch tasks.
///
/// Resolves to `(key, Some(outcome))` on completion, or `(key, None)` when
/// the underlying task panicked, so the fetch worker can always recover the
/// key and reclaim the in-flight slot.
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;

/// Fetch worker polling interval in milliseconds (idle wait when the fetch
/// queue is empty).
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Verification worker polling interval in milliseconds.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Bootstrap drain check interval in seconds (audit loop gate).
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;

/// Standard trust event weight for per-operation success/failure signals.
///
/// Used for individual replication fetch outcomes, integrity check failures,
/// and bootstrap claim abuse. Distinct from `AUDIT_FAILURE_TRUST_WEIGHT` which
/// is reserved for confirmed audit failures.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
94// ---------------------------------------------------------------------------
95// ReplicationEngine
96// ---------------------------------------------------------------------------
97
/// The replication engine manages all replication background tasks and state.
///
/// Construct with [`ReplicationEngine::new`], spawn the background tasks with
/// [`ReplicationEngine::start`], and stop them with
/// [`ReplicationEngine::shutdown`].
pub struct ReplicationEngine {
    /// Replication configuration (shared across spawned tasks).
    config: Arc<ReplicationConfig>,
    /// P2P networking node.
    p2p_node: Arc<P2PNode>,
    /// Local chunk storage.
    storage: Arc<LmdbStorage>,
    /// Persistent paid-for-list.
    paid_list: Arc<PaidList>,
    /// Payment verifier for `PoP` validation.
    payment_verifier: Arc<PaymentVerifier>,
    /// Replication pipeline queues.
    queues: Arc<RwLock<ReplicationQueues>>,
    /// Neighbor sync cycle state.
    sync_state: Arc<RwLock<NeighborSyncState>>,
    /// Per-peer sync history (for `RepairOpportunity`).
    ///
    /// This map grows with peer churn and is intentionally unbounded: entries
    /// are lightweight (`PeerSyncRecord` is two fields) and peer IDs are
    /// naturally bounded by the routing table's k-bucket capacity.
    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    /// Bootstrap state tracking.
    bootstrap_state: Arc<RwLock<BootstrapState>>,
    /// Whether this node is currently bootstrapping (starts `true`).
    is_bootstrapping: Arc<RwLock<bool>>,
    /// Trigger for early neighbor sync (signalled on topology changes).
    sync_trigger: Arc<Notify>,
    /// Notified when `is_bootstrapping` transitions from `true` to `false`.
    bootstrap_complete_notify: Arc<Notify>,
    /// Limits concurrent outbound replication sends to prevent bandwidth
    /// saturation on home broadband connections.
    send_semaphore: Arc<Semaphore>,
    /// Receiver for fresh-write events from the chunk PUT handler.
    ///
    /// When present, `start()` spawns a drainer task that calls
    /// `replicate_fresh` for each event. Taken (set to `None`) by the
    /// drainer launcher, so `start()` is effectively once-only here.
    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
    /// Shutdown token (cancelled by `shutdown()`, observed by every task).
    shutdown: CancellationToken,
    /// Background task handles (joined, or aborted on timeout, in `shutdown()`).
    task_handles: Vec<JoinHandle<()>>,
}
141
142impl ReplicationEngine {
143    /// Create a new replication engine.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the `PaidList` LMDB environment cannot be opened
148    /// or if the configuration fails validation.
149    pub async fn new(
150        config: ReplicationConfig,
151        p2p_node: Arc<P2PNode>,
152        storage: Arc<LmdbStorage>,
153        payment_verifier: Arc<PaymentVerifier>,
154        root_dir: &Path,
155        fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
156        shutdown: CancellationToken,
157    ) -> Result<Self> {
158        config.validate().map_err(Error::Config)?;
159
160        let paid_list = Arc::new(
161            PaidList::new(root_dir)
162                .await
163                .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
164        );
165
166        let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
167        let config = Arc::new(config);
168
169        Ok(Self {
170            config: Arc::clone(&config),
171            p2p_node,
172            storage,
173            paid_list,
174            payment_verifier,
175            queues: Arc::new(RwLock::new(ReplicationQueues::new())),
176            sync_state: Arc::new(RwLock::new(initial_neighbors)),
177            sync_history: Arc::new(RwLock::new(HashMap::new())),
178            bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
179            is_bootstrapping: Arc::new(RwLock::new(true)),
180            sync_trigger: Arc::new(Notify::new()),
181            bootstrap_complete_notify: Arc::new(Notify::new()),
182            send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
183            fresh_write_rx: Some(fresh_write_rx),
184            shutdown,
185            task_handles: Vec::new(),
186        })
187    }
188
189    /// Get a reference to the `PaidList`.
190    #[must_use]
191    pub fn paid_list(&self) -> &Arc<PaidList> {
192        &self.paid_list
193    }
194
195    /// Start all background tasks.
196    ///
197    /// `dht_events` must be subscribed **before** `P2PNode::start()` so that
198    /// the `BootstrapComplete` event emitted during DHT bootstrap is not
199    /// missed by the bootstrap-sync gate.
200    pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
201        if !self.task_handles.is_empty() {
202            error!("ReplicationEngine::start() called while already running — ignoring");
203            return;
204        }
205        info!("Starting replication engine");
206
207        self.start_message_handler();
208        self.start_neighbor_sync_loop();
209        self.start_self_lookup_loop();
210        self.start_audit_loop();
211        self.start_fetch_worker();
212        self.start_verification_worker();
213        self.start_bootstrap_sync(dht_events);
214        self.start_fresh_write_drainer();
215
216        info!(
217            "Replication engine started with {} background tasks",
218            self.task_handles.len()
219        );
220    }
221
222    /// Returns `true` if the node is still in the replication bootstrap phase.
223    ///
224    /// During bootstrap, audit challenges return `Bootstrapping` instead of
225    /// digests, and neighbor sync responses carry `bootstrapping: true`.
226    pub async fn is_bootstrapping(&self) -> bool {
227        *self.is_bootstrapping.read().await
228    }
229
    /// Wait until the replication bootstrap phase completes.
    ///
    /// Returns immediately if bootstrap has already completed. Useful for
    /// readiness probes, health checks, and test harnesses that need the
    /// node to be fully operational before proceeding.
    ///
    /// Returns `true` if bootstrap completed within the timeout, `false`
    /// if the timeout elapsed first.
    pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
        // Register the notification future *before* checking the flag so that
        // a transition between the read and the await is not missed.
        let notified = self.bootstrap_complete_notify.notified();
        tokio::pin!(notified);
        // `enable()` arms the future so a `notify_waiters`-style wakeup issued
        // from this point on is captured even before the first poll.
        notified.as_mut().enable();

        // Fast path: bootstrap already done — no need to wait at all.
        if !*self.is_bootstrapping.read().await {
            return true;
        }

        // Slow path: wait for the completion notification, bounded by the
        // caller's timeout. `is_ok()` distinguishes "notified" from "timed out".
        tokio::time::timeout(timeout, notified).await.is_ok()
    }
251
252    /// Cancel all background tasks and wait for them to terminate.
253    ///
254    /// This must be awaited before dropping the engine when the caller needs
255    /// the `Arc<LmdbStorage>` references held by background tasks to be
256    /// released (e.g. before reopening the same LMDB environment).
257    pub async fn shutdown(&mut self) {
258        self.shutdown.cancel();
259        for (i, mut handle) in self.task_handles.drain(..).enumerate() {
260            match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
261                Ok(Ok(())) => {}
262                Ok(Err(e)) if e.is_cancelled() => {}
263                Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
264                Err(_) => {
265                    warn!("Replication task {i} did not stop within 10s, aborting");
266                    handle.abort();
267                }
268            }
269        }
270    }
271
272    /// Trigger an early neighbor sync round.
273    ///
274    /// Useful after topology changes (new nodes joining, network heal after
275    /// partition) when the caller wants replication to converge faster than
276    /// the regular 10-20 minute cadence.
277    pub fn trigger_neighbor_sync(&self) {
278        self.sync_trigger.notify_one();
279    }
280
281    /// Execute fresh replication for a newly stored record.
282    pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
283        fresh::replicate_fresh(
284            key,
285            data,
286            proof_of_payment,
287            &self.p2p_node,
288            &self.paid_list,
289            &self.config,
290            &self.send_semaphore,
291        )
292        .await;
293    }
294
295    // =======================================================================
296    // Background task launchers
297    // =======================================================================
298
299    /// Spawn a task that drains the fresh-write channel and triggers
300    /// replication for each newly-stored chunk.
301    fn start_fresh_write_drainer(&mut self) {
302        let Some(mut rx) = self.fresh_write_rx.take() else {
303            return;
304        };
305        let p2p = Arc::clone(&self.p2p_node);
306        let paid_list = Arc::clone(&self.paid_list);
307        let config = Arc::clone(&self.config);
308        let send_semaphore = Arc::clone(&self.send_semaphore);
309        let shutdown = self.shutdown.clone();
310
311        let handle = tokio::spawn(async move {
312            loop {
313                tokio::select! {
314                    () = shutdown.cancelled() => break,
315                    event = rx.recv() => {
316                        let Some(event) = event else { break };
317                        fresh::replicate_fresh(
318                            &event.key,
319                            &event.data,
320                            &event.payment_proof,
321                            &p2p,
322                            &paid_list,
323                            &config,
324                            &send_semaphore,
325                        )
326                        .await;
327                    }
328                }
329            }
330            debug!("Fresh-write drainer shut down");
331        });
332        self.task_handles.push(handle);
333    }
334
    /// Spawn the inbound event dispatch task.
    ///
    /// Watches two streams concurrently: P2P messages (replication payloads,
    /// either raw on the replication topic or wrapped in `/rr/`
    /// request-response envelopes) and DHT routing-table events (used to
    /// trigger an early neighbor sync round on topology change).
    #[allow(clippy::too_many_lines)]
    fn start_message_handler(&mut self) {
        let mut p2p_events = self.p2p_node.subscribe_events();
        let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
        // Clone the shared-state handles the spawned task needs.
        let p2p = Arc::clone(&self.p2p_node);
        let storage = Arc::clone(&self.storage);
        let paid_list = Arc::clone(&self.paid_list);
        let payment_verifier = Arc::clone(&self.payment_verifier);
        let queues = Arc::clone(&self.queues);
        let config = Arc::clone(&self.config);
        let shutdown = self.shutdown.clone();
        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
        let bootstrap_state = Arc::clone(&self.bootstrap_state);
        let sync_history = Arc::clone(&self.sync_history);
        let sync_trigger = Arc::clone(&self.sync_trigger);

        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    event = p2p_events.recv() => {
                        // Recv errors are skipped rather than fatal.
                        // NOTE(review): a permanently closed channel would
                        // also land here and spin until shutdown — assumes
                        // the event channel outlives the engine; confirm.
                        let Ok(event) = event else { continue };
                        if let P2PEvent::Message {
                            topic,
                            source: Some(source),
                            data,
                        } = event {
                            // Determine if this is a replication message
                            // and whether it arrived via the /rr/ request-response
                            // path (which wraps payloads in RequestResponseEnvelope).
                            let rr_info = if topic == REPLICATION_PROTOCOL_ID {
                                Some((data.clone(), None))
                            } else if topic.starts_with(RR_PREFIX)
                                && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
                            {
                                // Only requests are handled here; responses
                                // (is_resp == true) are filtered out and left
                                // to the request-response layer.
                                P2PNode::parse_request_envelope(&data)
                                    .filter(|(_, is_resp, _)| !is_resp)
                                    .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
                            } else {
                                None
                            };
                            if let Some((payload, rr_message_id)) = rr_info {
                                match handle_replication_message(
                                    &source,
                                    &payload,
                                    &p2p,
                                    &storage,
                                    &paid_list,
                                    &payment_verifier,
                                    &queues,
                                    &config,
                                    &is_bootstrapping,
                                    &bootstrap_state,
                                    &sync_history,
                                    rr_message_id.as_deref(),
                                ).await {
                                    Ok(()) => {}
                                    Err(e) => {
                                        // Malformed or failed messages are
                                        // logged at debug and dropped; the
                                        // handler task never aborts on them.
                                        debug!(
                                            "Replication message from {source} error: {e}"
                                        );
                                    }
                                }
                            }
                        }
                    }
                    // Gap 4: Topology churn handling (Section 13).
                    //
                    // The DHT routing table emits KClosestPeersChanged when the
                    // K-closest peer set actually changes, which is the precise
                    // signal for triggering neighbor sync. This replaces the
                    // previous approach of checking every PeerConnected /
                    // PeerDisconnected event against the close group.
                    dht_event = dht_events.recv() => {
                        let Ok(dht_event) = dht_event else { continue };
                        if let DhtNetworkEvent::KClosestPeersChanged { .. } = dht_event {
                            debug!(
                                "K-closest peers changed, triggering early neighbor sync"
                            );
                            sync_trigger.notify_one();
                        }
                    }
                }
            }
            debug!("Replication message handler shut down");
        });
        self.task_handles.push(handle);
    }
423
424    fn start_neighbor_sync_loop(&mut self) {
425        let p2p = Arc::clone(&self.p2p_node);
426        let storage = Arc::clone(&self.storage);
427        let paid_list = Arc::clone(&self.paid_list);
428        let queues = Arc::clone(&self.queues);
429        let config = Arc::clone(&self.config);
430        let shutdown = self.shutdown.clone();
431        let sync_state = Arc::clone(&self.sync_state);
432        let sync_history = Arc::clone(&self.sync_history);
433        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
434        let bootstrap_state = Arc::clone(&self.bootstrap_state);
435        let sync_trigger = Arc::clone(&self.sync_trigger);
436
437        let handle = tokio::spawn(async move {
438            loop {
439                let interval = config.random_neighbor_sync_interval();
440                tokio::select! {
441                    () = shutdown.cancelled() => break,
442                    () = tokio::time::sleep(interval) => {}
443                    () = sync_trigger.notified() => {
444                        debug!("Neighbor sync triggered by topology change");
445                    }
446                }
447                // Wrap the sync round in a select so shutdown cancels
448                // in-progress network operations rather than waiting for
449                // the full round to complete.
450                tokio::select! {
451                    () = shutdown.cancelled() => break,
452                    () = run_neighbor_sync_round(
453                        &p2p,
454                        &storage,
455                        &paid_list,
456                        &queues,
457                        &config,
458                        &sync_state,
459                        &sync_history,
460                        &is_bootstrapping,
461                        &bootstrap_state,
462                    ) => {}
463                }
464            }
465            debug!("Neighbor sync loop shut down");
466        });
467        self.task_handles.push(handle);
468    }
469
470    fn start_self_lookup_loop(&mut self) {
471        let p2p = Arc::clone(&self.p2p_node);
472        let config = Arc::clone(&self.config);
473        let shutdown = self.shutdown.clone();
474
475        let handle = tokio::spawn(async move {
476            loop {
477                let interval = config.random_self_lookup_interval();
478                tokio::select! {
479                    () = shutdown.cancelled() => break,
480                    () = tokio::time::sleep(interval) => {
481                        if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
482                            debug!("Self-lookup failed: {e}");
483                        }
484                    }
485                }
486            }
487            debug!("Self-lookup loop shut down");
488        });
489        self.task_handles.push(handle);
490    }
491
492    fn start_audit_loop(&mut self) {
493        let p2p = Arc::clone(&self.p2p_node);
494        let storage = Arc::clone(&self.storage);
495        let config = Arc::clone(&self.config);
496        let shutdown = self.shutdown.clone();
497        let sync_history = Arc::clone(&self.sync_history);
498        let bootstrap_state = Arc::clone(&self.bootstrap_state);
499        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
500        let sync_state = Arc::clone(&self.sync_state);
501
502        let handle = tokio::spawn(async move {
503            // Invariant 19: wait for bootstrap to drain before starting audits.
504            loop {
505                tokio::select! {
506                    () = shutdown.cancelled() => return,
507                    () = tokio::time::sleep(
508                        std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
509                    ) => {
510                        if bootstrap_state.read().await.is_drained() {
511                            break;
512                        }
513                    }
514                }
515            }
516
517            // Run one audit tick immediately after bootstrap drain.
518            {
519                let bootstrapping = *is_bootstrapping.read().await;
520                // Lock ordering: sync_state before sync_history (consistent
521                // with run_neighbor_sync_round and handle_sync_response).
522                let result = {
523                    let claims = sync_state.read().await;
524                    let history = sync_history.read().await;
525                    audit::audit_tick(
526                        &p2p,
527                        &storage,
528                        &config,
529                        &history,
530                        &claims.bootstrap_claims,
531                        bootstrapping,
532                    )
533                    .await
534                };
535                handle_audit_result(&result, &p2p, &sync_state, &config).await;
536            }
537
538            // Then run periodically.
539            loop {
540                let interval = config.random_audit_tick_interval();
541                tokio::select! {
542                    () = shutdown.cancelled() => break,
543                    () = tokio::time::sleep(interval) => {
544                        let bootstrapping = *is_bootstrapping.read().await;
545                        // Lock ordering: sync_state before sync_history.
546                        let result = {
547                            let claims = sync_state.read().await;
548                            let history = sync_history.read().await;
549                            audit::audit_tick(
550                                &p2p, &storage, &config, &history,
551                                &claims.bootstrap_claims,
552                                bootstrapping,
553                            )
554                            .await
555                        };
556                        handle_audit_result(&result, &p2p, &sync_state, &config).await;
557                    }
558                }
559            }
560            debug!("Audit loop shut down");
561        });
562        self.task_handles.push(handle);
563    }
564
565    #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
566    fn start_fetch_worker(&mut self) {
567        let p2p = Arc::clone(&self.p2p_node);
568        let storage = Arc::clone(&self.storage);
569        let queues = Arc::clone(&self.queues);
570        let config = Arc::clone(&self.config);
571        let shutdown = self.shutdown.clone();
572        let bootstrap_state = Arc::clone(&self.bootstrap_state);
573        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
574        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
575        let concurrency = max_parallel_fetch();
576
577        info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
578
579        let handle = tokio::spawn(async move {
580            // Each in-flight future yields (key, Option<FetchOutcome>) so we
581            // always recover the key — even if the inner task panics.
582            let mut in_flight = FuturesUnordered::<FetchFuture>::new();
583
584            loop {
585                // Fill up to `concurrency` slots from the queue.
586                {
587                    let mut q = queues.write().await;
588                    while in_flight.len() < concurrency {
589                        let Some(candidate) = q.dequeue_fetch() else {
590                            break;
591                        };
592                        let Some(&source) = candidate.sources.first() else {
593                            warn!(
594                                "Fetch candidate {} has no sources — dropping",
595                                hex::encode(candidate.key)
596                            );
597                            continue;
598                        };
599                        q.start_fetch(candidate.key, source, candidate.sources.clone());
600
601                        let p2p = Arc::clone(&p2p);
602                        let storage = Arc::clone(&storage);
603                        let config = Arc::clone(&config);
604                        let token = shutdown.clone();
605                        let fetch_key = candidate.key;
606                        in_flight.push(Box::pin(async move {
607                            let handle = tokio::spawn(async move {
608                                // Cancel-aware: abort when the engine shuts down.
609                                tokio::select! {
610                                    () = token.cancelled() => FetchOutcome {
611                                        key: fetch_key,
612                                        result: FetchResult::SourceFailed,
613                                    },
614                                    outcome = execute_single_fetch(
615                                        p2p, storage, config, fetch_key, source,
616                                    ) => outcome,
617                                }
618                            });
619                            match handle.await {
620                                Ok(outcome) => (outcome.key, Some(outcome)),
621                                Err(e) => {
622                                    error!(
623                                        "Fetch task for {} panicked: {e}",
624                                        hex::encode(fetch_key)
625                                    );
626                                    (fetch_key, None)
627                                }
628                            }
629                        }));
630                    }
631                } // release queues write lock
632
633                if in_flight.is_empty() {
634                    // No work — wait for new items or shutdown.
635                    tokio::select! {
636                        () = shutdown.cancelled() => break,
637                        () = tokio::time::sleep(
638                            std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
639                        ) => continue,
640                    }
641                }
642
643                // Wait for the next fetch to complete and process the result.
644                tokio::select! {
645                    () = shutdown.cancelled() => break,
646                    Some((key, maybe_outcome)) = in_flight.next() => {
647                        let mut q = queues.write().await;
648                        let terminal = if let Some(outcome) = maybe_outcome {
649                            match outcome.result {
650                                FetchResult::Stored => {
651                                    q.complete_fetch(&key);
652                                    true
653                                }
654                                FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
655                                    if let Some(next_peer) = q.retry_fetch(&key) {
656                                        // Spawn a new fetch task for the next source.
657                                        let p2p = Arc::clone(&p2p);
658                                        let storage = Arc::clone(&storage);
659                                        let config = Arc::clone(&config);
660                                        let token = shutdown.clone();
661                                        let fetch_key = key;
662                                        in_flight.push(Box::pin(async move {
663                                            let handle = tokio::spawn(async move {
664                                                tokio::select! {
665                                                    () = token.cancelled() => FetchOutcome {
666                                                        key: fetch_key,
667                                                        result: FetchResult::SourceFailed,
668                                                    },
669                                                    outcome = execute_single_fetch(
670                                                        p2p, storage, config, fetch_key, next_peer,
671                                                    ) => outcome,
672                                                }
673                                            });
674                                            match handle.await {
675                                                Ok(outcome) => (outcome.key, Some(outcome)),
676                                                Err(e) => {
677                                                    error!(
678                                                        "Fetch task for {} panicked: {e}",
679                                                        hex::encode(fetch_key)
680                                                    );
681                                                    (fetch_key, None)
682                                                }
683                                            }
684                                        }));
685                                        false
686                                    } else {
687                                        q.complete_fetch(&key);
688                                        true
689                                    }
690                                }
691                            }
692                        } else {
693                            // Task panicked — reclaim the in-flight slot.
694                            q.complete_fetch(&key);
695                            true
696                        };
697
698                        // Shrink bootstrap pending set on terminal exit.
699                        if terminal {
700                            drop(q); // release queues lock before acquiring bootstrap_state
701                            if !bootstrap_state.read().await.is_drained() {
702                                bootstrap_state.write().await.remove_key(&key);
703                                let q = queues.read().await;
704                                if bootstrap::check_bootstrap_drained(
705                                    &bootstrap_state,
706                                    &q,
707                                )
708                                .await
709                                {
710                                    complete_bootstrap(
711                                        &is_bootstrapping,
712                                        &bootstrap_complete_notify,
713                                    ).await;
714                                }
715                            }
716                        }
717                    }
718                }
719            }
720
721            // Cancel and drain remaining in-flight fetches on shutdown.
722            // The CancellationToken is already cancelled by this point, so
723            // spawned tasks will see cancellation via their select! branches.
724            while in_flight.next().await.is_some() {}
725            debug!("Fetch worker shut down");
726        });
727        self.task_handles.push(handle);
728    }
729
730    fn start_verification_worker(&mut self) {
731        let p2p = Arc::clone(&self.p2p_node);
732        let queues = Arc::clone(&self.queues);
733        let paid_list = Arc::clone(&self.paid_list);
734        let config = Arc::clone(&self.config);
735        let shutdown = self.shutdown.clone();
736        let bootstrap_state = Arc::clone(&self.bootstrap_state);
737        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
738        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
739
740        let handle = tokio::spawn(async move {
741            loop {
742                tokio::select! {
743                    () = shutdown.cancelled() => break,
744                    () = tokio::time::sleep(
745                        std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
746                    ) => {
747                        run_verification_cycle(
748                            &p2p, &paid_list, &queues, &config,
749                            &bootstrap_state, &is_bootstrapping,
750                            &bootstrap_complete_notify,
751                        ).await;
752                    }
753                }
754            }
755            debug!("Verification worker shut down");
756        });
757        self.task_handles.push(handle);
758    }
759
760    /// Gap 3: Run a one-shot bootstrap sync on startup.
761    ///
762    /// Waits for saorsa-core to emit `DhtNetworkEvent::BootstrapComplete`
763    /// (indicating the routing table is populated) before snapshotting
764    /// close neighbors. Falls back after a timeout so bootstrap nodes
765    /// (which have no peers and therefore never receive the event) still
766    /// proceed.
767    ///
768    /// After the gate, finds close neighbors, syncs with each in
769    /// round-robin batches, admits returned hints into the verification
770    /// pipeline, and tracks discovered keys for bootstrap drain detection.
771    fn start_bootstrap_sync(
772        &mut self,
773        dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
774    ) {
775        let p2p = Arc::clone(&self.p2p_node);
776        let storage = Arc::clone(&self.storage);
777        let paid_list = Arc::clone(&self.paid_list);
778        let queues = Arc::clone(&self.queues);
779        let config = Arc::clone(&self.config);
780        let shutdown = self.shutdown.clone();
781        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
782        let bootstrap_state = Arc::clone(&self.bootstrap_state);
783        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
784
785        let handle = tokio::spawn(async move {
786            // Wait for DHT bootstrap to complete before snapshotting
787            // neighbors. The routing table is empty until saorsa-core
788            // finishes its FIND_NODE rounds and bucket refreshes.
789            let gate = bootstrap::wait_for_bootstrap_complete(
790                dht_events,
791                config.bootstrap_complete_timeout_secs,
792                &shutdown,
793            )
794            .await;
795
796            if gate == bootstrap::BootstrapGateResult::Shutdown {
797                return;
798            }
799
800            let self_id = *p2p.peer_id();
801            let neighbors =
802                neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
803                    .await;
804
805            if neighbors.is_empty() {
806                info!("Bootstrap sync: no close neighbors found, marking drained");
807                bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
808                complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
809                return;
810            }
811
812            let neighbor_count = neighbors.len();
813            info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
814
815            // Process neighbors in batches of NEIGHBOR_SYNC_PEER_COUNT.
816            for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
817                if shutdown.is_cancelled() {
818                    break;
819                }
820
821                for peer in batch {
822                    if shutdown.is_cancelled() {
823                        break;
824                    }
825
826                    // Re-read on each iteration so peers see current state.
827                    let bootstrapping = *is_bootstrapping.read().await;
828
829                    bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
830
831                    let response = neighbor_sync::sync_with_peer(
832                        peer,
833                        &p2p,
834                        &storage,
835                        &paid_list,
836                        &config,
837                        bootstrapping,
838                    )
839                    .await;
840
841                    bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
842
843                    if let Some(resp) = response {
844                        if !resp.bootstrapping {
845                            // Admit hints into verification pipeline.
846                            let admitted_keys = admit_and_queue_hints(
847                                &self_id,
848                                peer,
849                                &resp.replica_hints,
850                                &resp.paid_hints,
851                                &p2p,
852                                &config,
853                                &storage,
854                                &paid_list,
855                                &queues,
856                            )
857                            .await;
858
859                            // Track discovered keys for drain detection.
860                            if !admitted_keys.is_empty() {
861                                bootstrap::track_discovered_keys(&bootstrap_state, &admitted_keys)
862                                    .await;
863                            }
864                        }
865                    }
866                }
867            }
868
869            // Check drain condition.
870            {
871                let q = queues.read().await;
872                if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
873                    complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
874                }
875            }
876
877            info!("Bootstrap sync completed");
878        });
879        self.task_handles.push(handle);
880    }
881}
882
883// ===========================================================================
884// Free functions for background tasks
885// ===========================================================================
886
887/// Handle an incoming replication protocol message.
888///
889/// When `rr_message_id` is `Some`, the request arrived via the `/rr/`
890/// request-response path and the response must be sent via `send_response`
891/// so saorsa-core can route it back to the waiting `send_request` caller.
892#[allow(clippy::too_many_arguments)]
893async fn handle_replication_message(
894    source: &PeerId,
895    data: &[u8],
896    p2p_node: &Arc<P2PNode>,
897    storage: &Arc<LmdbStorage>,
898    paid_list: &Arc<PaidList>,
899    payment_verifier: &Arc<PaymentVerifier>,
900    queues: &Arc<RwLock<ReplicationQueues>>,
901    config: &ReplicationConfig,
902    is_bootstrapping: &Arc<RwLock<bool>>,
903    bootstrap_state: &Arc<RwLock<BootstrapState>>,
904    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
905    rr_message_id: Option<&str>,
906) -> Result<()> {
907    let msg = ReplicationMessage::decode(data)
908        .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
909
910    match msg.body {
911        ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
912            handle_fresh_offer(
913                source,
914                offer,
915                storage,
916                paid_list,
917                payment_verifier,
918                p2p_node,
919                config,
920                msg.request_id,
921                rr_message_id,
922            )
923            .await
924        }
925        ReplicationMessageBody::PaidNotify(ref notify) => {
926            handle_paid_notify(
927                source,
928                notify,
929                paid_list,
930                payment_verifier,
931                p2p_node,
932                config,
933            )
934            .await
935        }
936        ReplicationMessageBody::NeighborSyncRequest(ref request) => {
937            let bootstrapping = *is_bootstrapping.read().await;
938            handle_neighbor_sync_request(
939                source,
940                request,
941                p2p_node,
942                storage,
943                paid_list,
944                queues,
945                config,
946                bootstrapping,
947                bootstrap_state,
948                sync_history,
949                msg.request_id,
950                rr_message_id,
951            )
952            .await
953        }
954        ReplicationMessageBody::VerificationRequest(ref request) => {
955            handle_verification_request(
956                source,
957                request,
958                storage,
959                paid_list,
960                p2p_node,
961                msg.request_id,
962                rr_message_id,
963            )
964            .await
965        }
966        ReplicationMessageBody::FetchRequest(ref request) => {
967            handle_fetch_request(
968                source,
969                request,
970                storage,
971                p2p_node,
972                msg.request_id,
973                rr_message_id,
974            )
975            .await
976        }
977        ReplicationMessageBody::AuditChallenge(ref challenge) => {
978            let bootstrapping = *is_bootstrapping.read().await;
979            handle_audit_challenge_msg(
980                source,
981                challenge,
982                storage,
983                p2p_node,
984                bootstrapping,
985                msg.request_id,
986                rr_message_id,
987            )
988            .await
989        }
990        // Response messages are handled by their respective request initiators.
991        ReplicationMessageBody::FreshReplicationResponse(_)
992        | ReplicationMessageBody::NeighborSyncResponse(_)
993        | ReplicationMessageBody::VerificationResponse(_)
994        | ReplicationMessageBody::FetchResponse(_)
995        | ReplicationMessageBody::AuditResponse(_) => Ok(()),
996    }
997}
998
999// ---------------------------------------------------------------------------
1000// Per-message-type handlers
1001// ---------------------------------------------------------------------------
1002
1003#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1004async fn handle_fresh_offer(
1005    source: &PeerId,
1006    offer: &protocol::FreshReplicationOffer,
1007    storage: &Arc<LmdbStorage>,
1008    paid_list: &Arc<PaidList>,
1009    payment_verifier: &Arc<PaymentVerifier>,
1010    p2p_node: &Arc<P2PNode>,
1011    config: &ReplicationConfig,
1012    request_id: u64,
1013    rr_message_id: Option<&str>,
1014) -> Result<()> {
1015    let self_id = *p2p_node.peer_id();
1016
1017    // Rule 5: reject if PoP is missing.
1018    if offer.proof_of_payment.is_empty() {
1019        send_replication_response(
1020            source,
1021            p2p_node,
1022            request_id,
1023            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1024                key: offer.key,
1025                reason: "Missing proof of payment".to_string(),
1026            }),
1027            rr_message_id,
1028        )
1029        .await;
1030        return Ok(());
1031    }
1032
1033    // Enforce chunk size invariant: the normal PUT path rejects data larger
1034    // than MAX_CHUNK_SIZE; the replication receive path must do the same to
1035    // prevent peers from pushing oversized records through replication.
1036    if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1037        warn!(
1038            "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1039            hex::encode(offer.key),
1040            offer.data.len(),
1041            crate::ant_protocol::MAX_CHUNK_SIZE,
1042        );
1043        p2p_node
1044            .report_trust_event(
1045                source,
1046                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1047            )
1048            .await;
1049        send_replication_response(
1050            source,
1051            p2p_node,
1052            request_id,
1053            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1054                key: offer.key,
1055                reason: format!(
1056                    "Data size {} exceeds maximum chunk size {}",
1057                    offer.data.len(),
1058                    crate::ant_protocol::MAX_CHUNK_SIZE,
1059                ),
1060            }),
1061            rr_message_id,
1062        )
1063        .await;
1064        return Ok(());
1065    }
1066
1067    // Rule 7: check responsibility.
1068    if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await {
1069        send_replication_response(
1070            source,
1071            p2p_node,
1072            request_id,
1073            ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1074                key: offer.key,
1075                reason: "Not responsible for this key".to_string(),
1076            }),
1077            rr_message_id,
1078        )
1079        .await;
1080        return Ok(());
1081    }
1082
1083    // Gap 1: Validate PoP via PaymentVerifier.
1084    match payment_verifier
1085        .verify_payment(&offer.key, Some(&offer.proof_of_payment))
1086        .await
1087    {
1088        Ok(status) if status.can_store() => {
1089            debug!(
1090                "PoP validated for fresh offer key {}",
1091                hex::encode(offer.key)
1092            );
1093        }
1094        Ok(_) => {
1095            send_replication_response(
1096                source,
1097                p2p_node,
1098                request_id,
1099                ReplicationMessageBody::FreshReplicationResponse(
1100                    FreshReplicationResponse::Rejected {
1101                        key: offer.key,
1102                        reason: "Payment verification failed: payment required".to_string(),
1103                    },
1104                ),
1105                rr_message_id,
1106            )
1107            .await;
1108            return Ok(());
1109        }
1110        Err(e) => {
1111            warn!(
1112                "PoP verification error for key {}: {e}",
1113                hex::encode(offer.key)
1114            );
1115            send_replication_response(
1116                source,
1117                p2p_node,
1118                request_id,
1119                ReplicationMessageBody::FreshReplicationResponse(
1120                    FreshReplicationResponse::Rejected {
1121                        key: offer.key,
1122                        reason: format!("Payment verification error: {e}"),
1123                    },
1124                ),
1125                rr_message_id,
1126            )
1127            .await;
1128            return Ok(());
1129        }
1130    }
1131
1132    // Rule 6: add to PaidForList.
1133    if let Err(e) = paid_list.insert(&offer.key).await {
1134        warn!("Failed to add key to PaidForList: {e}");
1135    }
1136
1137    // Store the record.
1138    match storage.put(&offer.key, &offer.data).await {
1139        Ok(_) => {
1140            send_replication_response(
1141                source,
1142                p2p_node,
1143                request_id,
1144                ReplicationMessageBody::FreshReplicationResponse(
1145                    FreshReplicationResponse::Accepted { key: offer.key },
1146                ),
1147                rr_message_id,
1148            )
1149            .await;
1150        }
1151        Err(e) => {
1152            send_replication_response(
1153                source,
1154                p2p_node,
1155                request_id,
1156                ReplicationMessageBody::FreshReplicationResponse(
1157                    FreshReplicationResponse::Rejected {
1158                        key: offer.key,
1159                        reason: format!("Storage error: {e}"),
1160                    },
1161                ),
1162                rr_message_id,
1163            )
1164            .await;
1165        }
1166    }
1167
1168    Ok(())
1169}
1170
1171async fn handle_paid_notify(
1172    _source: &PeerId,
1173    notify: &protocol::PaidNotify,
1174    paid_list: &Arc<PaidList>,
1175    payment_verifier: &Arc<PaymentVerifier>,
1176    p2p_node: &Arc<P2PNode>,
1177    config: &ReplicationConfig,
1178) -> Result<()> {
1179    let self_id = *p2p_node.peer_id();
1180
1181    // Rule 3: validate PoP presence before adding.
1182    if notify.proof_of_payment.is_empty() {
1183        return Ok(());
1184    }
1185
1186    // Check if we're in PaidCloseGroup for this key.
1187    if !admission::is_in_paid_close_group(
1188        &self_id,
1189        &notify.key,
1190        p2p_node,
1191        config.paid_list_close_group_size,
1192    )
1193    .await
1194    {
1195        return Ok(());
1196    }
1197
1198    // Gap 1: Validate PoP via PaymentVerifier.
1199    match payment_verifier
1200        .verify_payment(&notify.key, Some(&notify.proof_of_payment))
1201        .await
1202    {
1203        Ok(status) if status.can_store() => {
1204            debug!(
1205                "PoP validated for paid notify key {}",
1206                hex::encode(notify.key)
1207            );
1208        }
1209        Ok(_) => {
1210            warn!(
1211                "Paid notify rejected: payment required for key {}",
1212                hex::encode(notify.key)
1213            );
1214            return Ok(());
1215        }
1216        Err(e) => {
1217            warn!(
1218                "PoP verification error for paid notify key {}: {e}",
1219                hex::encode(notify.key)
1220            );
1221            return Ok(());
1222        }
1223    }
1224
1225    if let Err(e) = paid_list.insert(&notify.key).await {
1226        warn!("Failed to add paid notify key to PaidForList: {e}");
1227    }
1228
1229    Ok(())
1230}
1231
1232#[allow(clippy::too_many_arguments)]
1233async fn handle_neighbor_sync_request(
1234    source: &PeerId,
1235    request: &protocol::NeighborSyncRequest,
1236    p2p_node: &Arc<P2PNode>,
1237    storage: &Arc<LmdbStorage>,
1238    paid_list: &Arc<PaidList>,
1239    queues: &Arc<RwLock<ReplicationQueues>>,
1240    config: &ReplicationConfig,
1241    is_bootstrapping: bool,
1242    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1243    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1244    request_id: u64,
1245    rr_message_id: Option<&str>,
1246) -> Result<()> {
1247    let self_id = *p2p_node.peer_id();
1248
1249    // No per-request hint count limit: the wire message size limit
1250    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Unlike audit
1251    // challenges, sync hints don't drive expensive computation — they just
1252    // enter the verification queue. A per-request limit here would break
1253    // bootstrap replication for newly-joined nodes with 0 stored chunks.
1254
1255    // Build response (outbound hints).
1256    let (response, sender_in_rt) = neighbor_sync::handle_sync_request(
1257        source,
1258        request,
1259        p2p_node,
1260        storage,
1261        paid_list,
1262        config,
1263        is_bootstrapping,
1264    )
1265    .await;
1266
1267    // Send response.
1268    send_replication_response(
1269        source,
1270        p2p_node,
1271        request_id,
1272        ReplicationMessageBody::NeighborSyncResponse(response),
1273        rr_message_id,
1274    )
1275    .await;
1276
1277    // Process inbound hints only if sender is in LocalRT (Rule 4-6).
1278    if !sender_in_rt {
1279        return Ok(());
1280    }
1281
1282    // Update sync history for this peer.
1283    {
1284        let mut history = sync_history.write().await;
1285        let record = history.entry(*source).or_insert(PeerSyncRecord {
1286            last_sync: None,
1287            cycles_since_sync: 0,
1288        });
1289        record.last_sync = Some(Instant::now());
1290        record.cycles_since_sync = 0;
1291    }
1292
1293    // Admit inbound hints and queue for verification.
1294    let admitted_keys = admit_and_queue_hints(
1295        &self_id,
1296        source,
1297        &request.replica_hints,
1298        &request.paid_hints,
1299        p2p_node,
1300        config,
1301        storage,
1302        paid_list,
1303        queues,
1304    )
1305    .await;
1306
1307    // Track discovered keys for bootstrap drain detection so that
1308    // hints admitted via inbound sync requests are not missed.
1309    if is_bootstrapping && !admitted_keys.is_empty() {
1310        bootstrap::track_discovered_keys(bootstrap_state, &admitted_keys).await;
1311    }
1312
1313    Ok(())
1314}
1315
1316async fn handle_verification_request(
1317    source: &PeerId,
1318    request: &protocol::VerificationRequest,
1319    storage: &Arc<LmdbStorage>,
1320    paid_list: &Arc<PaidList>,
1321    p2p_node: &Arc<P2PNode>,
1322    request_id: u64,
1323    rr_message_id: Option<&str>,
1324) -> Result<()> {
1325    // No per-request key count limit: the wire message size limit
1326    // (MAX_REPLICATION_MESSAGE_SIZE) already caps the payload. Verification
1327    // does cheap storage lookups per key, not expensive computation like
1328    // audit digest generation.
1329
1330    #[allow(clippy::cast_possible_truncation)]
1331    let keys_len = request.keys.len() as u32;
1332    let paid_check_set: HashSet<u32> = request
1333        .paid_list_check_indices
1334        .iter()
1335        .copied()
1336        .filter(|&idx| {
1337            if idx >= keys_len {
1338                warn!(
1339                    "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
1340                    request.keys.len(),
1341                );
1342                false
1343            } else {
1344                true
1345            }
1346        })
1347        .collect();
1348
1349    let mut results = Vec::with_capacity(request.keys.len());
1350    for (i, key) in request.keys.iter().enumerate() {
1351        let present = storage.exists(key).unwrap_or(false);
1352        let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
1353            Some(paid_list.contains(key).unwrap_or(false))
1354        } else {
1355            None
1356        };
1357        results.push(protocol::KeyVerificationResult {
1358            key: *key,
1359            present,
1360            paid,
1361        });
1362    }
1363
1364    send_replication_response(
1365        source,
1366        p2p_node,
1367        request_id,
1368        ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
1369        rr_message_id,
1370    )
1371    .await;
1372
1373    Ok(())
1374}
1375
1376async fn handle_fetch_request(
1377    source: &PeerId,
1378    request: &protocol::FetchRequest,
1379    storage: &Arc<LmdbStorage>,
1380    p2p_node: &Arc<P2PNode>,
1381    request_id: u64,
1382    rr_message_id: Option<&str>,
1383) -> Result<()> {
1384    let response = match storage.get(&request.key).await {
1385        Ok(Some(data)) => protocol::FetchResponse::Success {
1386            key: request.key,
1387            data,
1388        },
1389        Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
1390        Err(e) => protocol::FetchResponse::Error {
1391            key: request.key,
1392            reason: format!("{e}"),
1393        },
1394    };
1395
1396    send_replication_response(
1397        source,
1398        p2p_node,
1399        request_id,
1400        ReplicationMessageBody::FetchResponse(response),
1401        rr_message_id,
1402    )
1403    .await;
1404
1405    Ok(())
1406}
1407
1408async fn handle_audit_challenge_msg(
1409    source: &PeerId,
1410    challenge: &protocol::AuditChallenge,
1411    storage: &Arc<LmdbStorage>,
1412    p2p_node: &Arc<P2PNode>,
1413    is_bootstrapping: bool,
1414    request_id: u64,
1415    rr_message_id: Option<&str>,
1416) -> Result<()> {
1417    #[allow(clippy::cast_possible_truncation)]
1418    let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
1419    let response = audit::handle_audit_challenge(
1420        challenge,
1421        storage,
1422        p2p_node.peer_id(),
1423        is_bootstrapping,
1424        stored_chunks,
1425    )
1426    .await;
1427
1428    send_replication_response(
1429        source,
1430        p2p_node,
1431        request_id,
1432        ReplicationMessageBody::AuditResponse(response),
1433        rr_message_id,
1434    )
1435    .await;
1436
1437    Ok(())
1438}
1439
1440// ---------------------------------------------------------------------------
1441// Message sending helper
1442// ---------------------------------------------------------------------------
1443
1444/// Send a replication response message. Fire-and-forget: logs errors but
1445/// does not propagate them.
1446///
1447/// When `rr_message_id` is `Some`, the response is sent via the `/rr/`
1448/// request-response path so saorsa-core can route it back to the caller's
1449/// `send_request` future. Otherwise it is sent as a plain message.
1450async fn send_replication_response(
1451    peer: &PeerId,
1452    p2p_node: &Arc<P2PNode>,
1453    request_id: u64,
1454    body: ReplicationMessageBody,
1455    rr_message_id: Option<&str>,
1456) {
1457    let msg = ReplicationMessage { request_id, body };
1458    let encoded = match msg.encode() {
1459        Ok(data) => data,
1460        Err(e) => {
1461            warn!("Failed to encode replication response: {e}");
1462            return;
1463        }
1464    };
1465    let result = if let Some(msg_id) = rr_message_id {
1466        p2p_node
1467            .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
1468            .await
1469    } else {
1470        p2p_node
1471            .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
1472            .await
1473    };
1474    if let Err(e) = result {
1475        debug!("Failed to send replication response to {peer}: {e}");
1476    }
1477}
1478
1479// ---------------------------------------------------------------------------
1480// Neighbor sync round
1481// ---------------------------------------------------------------------------
1482
1483/// Run one neighbor sync round.
1484#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1485async fn run_neighbor_sync_round(
1486    p2p_node: &Arc<P2PNode>,
1487    storage: &Arc<LmdbStorage>,
1488    paid_list: &Arc<PaidList>,
1489    queues: &Arc<RwLock<ReplicationQueues>>,
1490    config: &ReplicationConfig,
1491    sync_state: &Arc<RwLock<NeighborSyncState>>,
1492    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1493    is_bootstrapping: &Arc<RwLock<bool>>,
1494    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1495) {
1496    let self_id = *p2p_node.peer_id();
1497    let bootstrapping = *is_bootstrapping.read().await;
1498
1499    // Check if cycle is complete; start new one if needed.
1500    // We check under a read lock, then release it before the expensive
1501    // prune pass and DHT snapshot so other tasks are not starved.
1502    let cycle_complete = sync_state.read().await.is_cycle_complete();
1503    if cycle_complete {
1504        // Post-cycle pruning (Section 11) — runs without holding sync_state.
1505        pruning::run_prune_pass(&self_id, storage, paid_list, p2p_node, config).await;
1506
1507        // Increment `cycles_since_sync` for all peers.
1508        {
1509            let mut history = sync_history.write().await;
1510            for record in history.values_mut() {
1511                record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
1512            }
1513        }
1514
1515        // Take fresh close-neighbor snapshot (DHT query, no lock held).
1516        let neighbors =
1517            neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
1518                .await;
1519
1520        // Now re-acquire write lock and re-check before swapping cycle.
1521        let mut state = sync_state.write().await;
1522        if state.is_cycle_complete() {
1523            // Preserve last_sync_times and bootstrap_claims across cycles.
1524            // Claims have a 24h lifecycle vs 10-20 min cycles — dropping them
1525            // would reset the abuse detection timer every cycle.
1526            let old_sync_times = std::mem::take(&mut state.last_sync_times);
1527            let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
1528            *state = NeighborSyncState::new_cycle(neighbors);
1529            state.last_sync_times = old_sync_times;
1530            state.bootstrap_claims = old_bootstrap_claims;
1531        }
1532    }
1533
1534    // Select batch of peers.
1535    let batch = {
1536        let mut state = sync_state.write().await;
1537        neighbor_sync::select_sync_batch(
1538            &mut state,
1539            config.neighbor_sync_peer_count,
1540            config.neighbor_sync_cooldown,
1541        )
1542    };
1543
1544    if batch.is_empty() {
1545        return;
1546    }
1547
1548    debug!("Neighbor sync: syncing with {} peers", batch.len());
1549
1550    // Sync with each peer in the batch.
1551    for peer in &batch {
1552        let response = neighbor_sync::sync_with_peer(
1553            peer,
1554            p2p_node,
1555            storage,
1556            paid_list,
1557            config,
1558            bootstrapping,
1559        )
1560        .await;
1561
1562        if let Some(resp) = response {
1563            handle_sync_response(
1564                &self_id,
1565                peer,
1566                &resp,
1567                p2p_node,
1568                config,
1569                bootstrapping,
1570                bootstrap_state,
1571                storage,
1572                paid_list,
1573                queues,
1574                sync_state,
1575                sync_history,
1576            )
1577            .await;
1578        } else {
1579            // Sync failed -- remove peer and try to fill slot.
1580            let replacement = {
1581                let mut state = sync_state.write().await;
1582                neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
1583            };
1584
1585            // Attempt sync with the replacement peer (if one was found).
1586            if let Some(replacement_peer) = replacement {
1587                let replacement_resp = neighbor_sync::sync_with_peer(
1588                    &replacement_peer,
1589                    p2p_node,
1590                    storage,
1591                    paid_list,
1592                    config,
1593                    bootstrapping,
1594                )
1595                .await;
1596
1597                if let Some(resp) = replacement_resp {
1598                    handle_sync_response(
1599                        &self_id,
1600                        &replacement_peer,
1601                        &resp,
1602                        p2p_node,
1603                        config,
1604                        bootstrapping,
1605                        bootstrap_state,
1606                        storage,
1607                        paid_list,
1608                        queues,
1609                        sync_state,
1610                        sync_history,
1611                    )
1612                    .await;
1613                }
1614            }
1615        }
1616    }
1617}
1618
1619/// Process a successful neighbor sync response: record the sync, check for
1620/// bootstrap claim abuse, and admit inbound hints.
1621#[allow(clippy::too_many_arguments)]
1622async fn handle_sync_response(
1623    self_id: &PeerId,
1624    peer: &PeerId,
1625    resp: &NeighborSyncResponse,
1626    p2p_node: &Arc<P2PNode>,
1627    config: &ReplicationConfig,
1628    bootstrapping: bool,
1629    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1630    storage: &Arc<LmdbStorage>,
1631    paid_list: &Arc<PaidList>,
1632    queues: &Arc<RwLock<ReplicationQueues>>,
1633    sync_state: &Arc<RwLock<NeighborSyncState>>,
1634    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1635) {
1636    // Record successful sync.
1637    {
1638        let mut state = sync_state.write().await;
1639        neighbor_sync::record_successful_sync(&mut state, peer);
1640    }
1641    {
1642        let mut history = sync_history.write().await;
1643        let record = history.entry(*peer).or_insert(PeerSyncRecord {
1644            last_sync: None,
1645            cycles_since_sync: 0,
1646        });
1647        record.last_sync = Some(Instant::now());
1648        record.cycles_since_sync = 0;
1649    }
1650
1651    // Process inbound hints from response (skip if peer is bootstrapping).
1652    if resp.bootstrapping {
1653        // Gap 6: BootstrapClaimAbuse grace period enforcement.
1654        // Separate state mutation from network I/O to avoid holding the
1655        // write lock across report_trust_event.
1656        let should_report = {
1657            let now = Instant::now();
1658            let mut state = sync_state.write().await;
1659            let first_seen = state.bootstrap_claims.entry(*peer).or_insert(now);
1660            let claim_age = now.duration_since(*first_seen);
1661            if claim_age > config.bootstrap_claim_grace_period {
1662                warn!(
1663                    "Peer {peer} has been claiming bootstrap for {:?}, \
1664                     exceeding grace period of {:?} — reporting abuse",
1665                    claim_age, config.bootstrap_claim_grace_period,
1666                );
1667                true
1668            } else {
1669                false
1670            }
1671        };
1672        if should_report {
1673            p2p_node
1674                .report_trust_event(
1675                    peer,
1676                    TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1677                )
1678                .await;
1679        }
1680    } else {
1681        // Peer is not claiming bootstrap; clear any prior claim.
1682        {
1683            let mut state = sync_state.write().await;
1684            state.bootstrap_claims.remove(peer);
1685        }
1686        let admitted_keys = admit_and_queue_hints(
1687            self_id,
1688            peer,
1689            &resp.replica_hints,
1690            &resp.paid_hints,
1691            p2p_node,
1692            config,
1693            storage,
1694            paid_list,
1695            queues,
1696        )
1697        .await;
1698
1699        // Track discovered keys for bootstrap drain detection so that
1700        // hints admitted via regular neighbor sync are not missed.
1701        if bootstrapping && !admitted_keys.is_empty() {
1702            bootstrap::track_discovered_keys(bootstrap_state, &admitted_keys).await;
1703        }
1704    }
1705}
1706
1707/// Admit hints and queue them for verification, returning newly-discovered keys.
1708///
1709/// Shared by neighbor-sync request handling, response handling, and bootstrap
1710/// sync so that admission + queueing logic lives in one place.
1711#[allow(clippy::too_many_arguments)]
1712async fn admit_and_queue_hints(
1713    self_id: &PeerId,
1714    source_peer: &PeerId,
1715    replica_hints: &[XorName],
1716    paid_hints: &[XorName],
1717    p2p_node: &Arc<P2PNode>,
1718    config: &ReplicationConfig,
1719    storage: &Arc<LmdbStorage>,
1720    paid_list: &Arc<PaidList>,
1721    queues: &Arc<RwLock<ReplicationQueues>>,
1722) -> HashSet<XorName> {
1723    let pending_keys: HashSet<XorName> = {
1724        let q = queues.read().await;
1725        q.pending_keys().into_iter().collect()
1726    };
1727
1728    let admitted = admission::admit_hints(
1729        self_id,
1730        replica_hints,
1731        paid_hints,
1732        p2p_node,
1733        config,
1734        storage,
1735        paid_list,
1736        &pending_keys,
1737    )
1738    .await;
1739
1740    let mut discovered = HashSet::new();
1741    let mut q = queues.write().await;
1742    let now = Instant::now();
1743
1744    for key in admitted.replica_keys {
1745        if !storage.exists(&key).unwrap_or(false) {
1746            let added = q.add_pending_verify(
1747                key,
1748                VerificationEntry {
1749                    state: VerificationState::PendingVerify,
1750                    pipeline: HintPipeline::Replica,
1751                    verified_sources: Vec::new(),
1752                    tried_sources: HashSet::new(),
1753                    created_at: now,
1754                    hint_sender: *source_peer,
1755                },
1756            );
1757            if added {
1758                discovered.insert(key);
1759            }
1760        }
1761    }
1762
1763    for key in admitted.paid_only_keys {
1764        let added = q.add_pending_verify(
1765            key,
1766            VerificationEntry {
1767                state: VerificationState::PendingVerify,
1768                pipeline: HintPipeline::PaidOnly,
1769                verified_sources: Vec::new(),
1770                tried_sources: HashSet::new(),
1771                created_at: now,
1772                hint_sender: *source_peer,
1773            },
1774        );
1775        if added {
1776            discovered.insert(key);
1777        }
1778    }
1779
1780    discovered
1781}
1782
1783// ---------------------------------------------------------------------------
1784// Verification cycle
1785// ---------------------------------------------------------------------------
1786
/// Run one verification cycle: process pending keys through quorum checks.
///
/// Per cycle:
/// 1. Evict entries pending longer than `PENDING_VERIFY_MAX_AGE`, then
///    snapshot the pending key set.
/// 2. Fast-path keys already present in the local `PaidForList`; a
///    paid-only key is terminal at that point, a replica key still needs
///    network verification to discover holders.
/// 3. For remaining keys, compute verification targets and gather evidence
///    over the network, then evaluate a per-key quorum outcome.
/// 4. Persist verified keys to the `PaidForList` (no queue lock held).
/// 5. Update the queues: verified replica keys with holders move to the
///    fetch queue; everything else is removed as terminal.
/// 6. Feed terminal keys into bootstrap-drain bookkeeping.
///
/// Lock discipline: queue read/write guards are taken in short scopes and
/// never held across paid-list or network I/O.
#[allow(clippy::too_many_lines)]
async fn run_verification_cycle(
    p2p_node: &Arc<P2PNode>,
    paid_list: &Arc<PaidList>,
    queues: &Arc<RwLock<ReplicationQueues>>,
    config: &ReplicationConfig,
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    is_bootstrapping: &Arc<RwLock<bool>>,
    bootstrap_complete_notify: &Arc<Notify>,
) {
    // Evict stale entries that have been pending too long (e.g. unreachable
    // verification targets during a network partition).
    {
        let mut q = queues.write().await;
        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
    }

    let pending_keys = {
        let q = queues.read().await;
        q.pending_keys()
    };

    if pending_keys.is_empty() {
        return;
    }

    let self_id = *p2p_node.peer_id();

    // Step 1: Check local PaidForList for fast-path authorization (Section 9,
    // step 4).
    let mut keys_needing_network = Vec::new();
    let mut terminal_keys: Vec<XorName> = Vec::new();
    {
        let mut q = queues.write().await;
        for key in &pending_keys {
            if paid_list.contains(key).unwrap_or(false) {
                if let Some(entry) = q.get_pending_mut(key) {
                    entry.state = VerificationState::PaidListVerified;
                    if entry.pipeline == HintPipeline::PaidOnly {
                        // Paid-only pipeline: PaidForList already updated, done.
                        q.remove_pending(key);
                        terminal_keys.push(*key);
                        continue;
                    }
                }
            }
            // Both branches (paid locally or not) need network verification.
            keys_needing_network.push(*key);
        }
    }

    // Steps 2-5: Network verification (skipped if all keys resolved locally).
    if !keys_needing_network.is_empty() {
        // Step 2: Compute targets and run network verification round.
        let targets =
            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
                .await;

        let evidence =
            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;

        // Step 3: Evaluate results — collect outcomes without holding the write
        // lock across paid-list I/O.
        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
        {
            let q = queues.read().await;
            for key in &keys_needing_network {
                // Keys with no evidence (all targets unresponsive) or that were
                // evicted/removed concurrently are simply skipped this cycle.
                let Some(ev) = evidence.get(key) else {
                    continue;
                };
                let Some(entry) = q.get_pending(key) else {
                    continue;
                };
                let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config);
                evaluated.push((*key, outcome, entry.pipeline));
            }
        } // read lock released

        // Step 4: Insert verified keys into PaidForList (no lock held).
        let mut paid_insert_keys: Vec<XorName> = Vec::new();
        for (key, outcome, _) in &evaluated {
            if matches!(
                outcome,
                KeyVerificationOutcome::QuorumVerified { .. }
                    | KeyVerificationOutcome::PaidListVerified { .. }
            ) {
                paid_insert_keys.push(*key);
            }
        }
        for key in &paid_insert_keys {
            // Insert failure is logged but non-fatal: the key is still
            // processed below and may be re-verified in a later cycle.
            if let Err(e) = paid_list.insert(key).await {
                warn!("Failed to add verified key to PaidForList: {e}");
            }
        }

        // Step 5: Update queues with the evaluated outcomes.
        let mut q = queues.write().await;
        for (key, outcome, pipeline) in evaluated {
            match outcome {
                KeyVerificationOutcome::QuorumVerified { sources }
                | KeyVerificationOutcome::PaidListVerified { sources } => {
                    if pipeline == HintPipeline::Replica && !sources.is_empty() {
                        let distance =
                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                        q.remove_pending(&key);
                        q.enqueue_fetch(key, distance, sources);
                        // Not terminal — key moved to fetch queue.
                    } else if pipeline == HintPipeline::Replica && sources.is_empty() {
                        warn!(
                            "Verified key {} has no holders (possible data loss)",
                            hex::encode(key)
                        );
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    } else {
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    }
                }
                KeyVerificationOutcome::QuorumFailed
                | KeyVerificationOutcome::QuorumInconclusive => {
                    q.remove_pending(&key);
                    terminal_keys.push(key);
                }
            }
        }
    }

    // Step 6: Remove terminal keys from bootstrap pending set and re-check
    // the drain condition.
    update_bootstrap_after_verification(
        &terminal_keys,
        bootstrap_state,
        queues,
        is_bootstrapping,
        bootstrap_complete_notify,
    )
    .await;
}
1927
1928/// Post-verification bootstrap bookkeeping: remove terminal keys from the
1929/// bootstrap pending set and transition out of bootstrapping when drained.
1930async fn update_bootstrap_after_verification(
1931    terminal_keys: &[XorName],
1932    bootstrap_state: &Arc<RwLock<BootstrapState>>,
1933    queues: &Arc<RwLock<ReplicationQueues>>,
1934    is_bootstrapping: &Arc<RwLock<bool>>,
1935    bootstrap_complete_notify: &Arc<Notify>,
1936) {
1937    if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
1938        return;
1939    }
1940    {
1941        let mut bs = bootstrap_state.write().await;
1942        for key in terminal_keys {
1943            bs.remove_key(key);
1944        }
1945    }
1946    let q = queues.read().await;
1947    if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
1948        complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
1949    }
1950}
1951
1952/// Set `is_bootstrapping` to `false` and wake all waiters.
1953async fn complete_bootstrap(
1954    is_bootstrapping: &Arc<RwLock<bool>>,
1955    bootstrap_complete_notify: &Arc<Notify>,
1956) {
1957    *is_bootstrapping.write().await = false;
1958    bootstrap_complete_notify.notify_waiters();
1959    info!("Replication bootstrap complete");
1960}
1961
1962// ---------------------------------------------------------------------------
1963// Fetch types and single-fetch executor
1964// ---------------------------------------------------------------------------
1965
/// Result classification for a single fetch attempt.
///
/// Fieldless, so the common derives are zero-cost; `PartialEq`/`Eq` let the
/// worker loop and tests compare outcomes directly, and `Debug` lets them be
/// logged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FetchResult {
    /// Data fetched, integrity-checked, and stored successfully.
    Stored,
    /// Content-address integrity check failed — do not retry.
    IntegrityFailed,
    /// Source failed (network error or non-success response) — retryable.
    SourceFailed,
}
1975
/// Outcome produced by [`execute_single_fetch`] and consumed by the fetch
/// worker loop to update queue state.
struct FetchOutcome {
    /// The key the fetch attempt targeted, echoed back so the caller can
    /// update the matching queue entry without tracking requests separately.
    key: XorName,
    /// How the attempt ended (see [`FetchResult`]).
    result: FetchResult,
}
1982
1983#[allow(clippy::too_many_lines)]
1984/// Execute a single fetch request against `source` for `key`.
1985///
1986/// Handles encoding, network I/O, integrity checking, storage, and trust
1987/// event reporting.  Returns a [`FetchOutcome`] so the caller can update
1988/// queue state without holding any locks during the network round-trip.
1989async fn execute_single_fetch(
1990    p2p_node: Arc<P2PNode>,
1991    storage: Arc<LmdbStorage>,
1992    config: Arc<ReplicationConfig>,
1993    key: XorName,
1994    source: PeerId,
1995) -> FetchOutcome {
1996    let request = protocol::FetchRequest { key };
1997    let msg = ReplicationMessage {
1998        request_id: rand::thread_rng().gen::<u64>(),
1999        body: ReplicationMessageBody::FetchRequest(request),
2000    };
2001
2002    let encoded = match msg.encode() {
2003        Ok(data) => data,
2004        Err(e) => {
2005            warn!("Failed to encode fetch request: {e}");
2006            return FetchOutcome {
2007                key,
2008                result: FetchResult::SourceFailed,
2009            };
2010        }
2011    };
2012
2013    let result = p2p_node
2014        .send_request(
2015            &source,
2016            REPLICATION_PROTOCOL_ID,
2017            encoded,
2018            config.fetch_request_timeout,
2019        )
2020        .await;
2021
2022    match result {
2023        Ok(response) => {
2024            let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
2025                p2p_node
2026                    .report_trust_event(
2027                        &source,
2028                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2029                    )
2030                    .await;
2031                return FetchOutcome {
2032                    key,
2033                    result: FetchResult::SourceFailed,
2034                };
2035            };
2036
2037            match resp_msg.body {
2038                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
2039                    key: resp_key,
2040                    data,
2041                }) => {
2042                    // Validate the response key matches the requested key.
2043                    // A malicious peer could serve valid data for a different
2044                    // key, passing integrity checks while the requested key
2045                    // is falsely marked as fetched.
2046                    if resp_key != key {
2047                        warn!(
2048                            "Fetch response key mismatch: requested {}, got {}",
2049                            hex::encode(key),
2050                            hex::encode(resp_key)
2051                        );
2052                        p2p_node
2053                            .report_trust_event(
2054                                &source,
2055                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2056                            )
2057                            .await;
2058                        return FetchOutcome {
2059                            key,
2060                            result: FetchResult::IntegrityFailed,
2061                        };
2062                    }
2063
2064                    // Enforce chunk size invariant on fetched data.
2065                    // Checked before the content-address hash to avoid
2066                    // hashing up to 10 MiB of oversized junk data.
2067                    if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
2068                        warn!(
2069                            "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
2070                            hex::encode(resp_key),
2071                            data.len(),
2072                            crate::ant_protocol::MAX_CHUNK_SIZE,
2073                        );
2074                        p2p_node
2075                            .report_trust_event(
2076                                &source,
2077                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2078                            )
2079                            .await;
2080                        return FetchOutcome {
2081                            key,
2082                            result: FetchResult::IntegrityFailed,
2083                        };
2084                    }
2085
2086                    // Content-address integrity check.
2087                    let computed = crate::client::compute_address(&data);
2088                    if computed != resp_key {
2089                        warn!(
2090                            "Fetched record integrity check failed: expected {}, got {}",
2091                            hex::encode(resp_key),
2092                            hex::encode(computed)
2093                        );
2094                        p2p_node
2095                            .report_trust_event(
2096                                &source,
2097                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2098                            )
2099                            .await;
2100                        return FetchOutcome {
2101                            key,
2102                            result: FetchResult::IntegrityFailed,
2103                        };
2104                    }
2105
2106                    if let Err(e) = storage.put(&resp_key, &data).await {
2107                        warn!(
2108                            "Failed to store fetched record {}: {e}",
2109                            hex::encode(resp_key)
2110                        );
2111                        return FetchOutcome {
2112                            key,
2113                            result: FetchResult::SourceFailed,
2114                        };
2115                    }
2116
2117                    FetchOutcome {
2118                        key,
2119                        result: FetchResult::Stored,
2120                    }
2121                }
2122                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
2123                    ..
2124                }) => {
2125                    // NotFound is a legitimate response (peer may have pruned
2126                    // the data). No trust penalty — just retry another source.
2127                    debug!("Fetch: peer {source} does not have {}", hex::encode(key));
2128                    FetchOutcome {
2129                        key,
2130                        result: FetchResult::SourceFailed,
2131                    }
2132                }
2133                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
2134                    reason,
2135                    ..
2136                }) => {
2137                    warn!(
2138                        "Fetch: peer {source} returned error for {}: {reason}",
2139                        hex::encode(key)
2140                    );
2141                    p2p_node
2142                        .report_trust_event(
2143                            &source,
2144                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2145                        )
2146                        .await;
2147                    FetchOutcome {
2148                        key,
2149                        result: FetchResult::SourceFailed,
2150                    }
2151                }
2152                _ => {
2153                    // Unexpected message type — treat as malformed.
2154                    p2p_node
2155                        .report_trust_event(
2156                            &source,
2157                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2158                        )
2159                        .await;
2160                    FetchOutcome {
2161                        key,
2162                        result: FetchResult::SourceFailed,
2163                    }
2164                }
2165            }
2166        }
2167        Err(e) => {
2168            debug!("Fetch request to {source} failed: {e}");
2169            // No ApplicationFailure here — P2PNode::send_request() already
2170            // reports ConnectionTimeout / ConnectionFailed to the TrustEngine.
2171            FetchOutcome {
2172                key,
2173                result: FetchResult::SourceFailed,
2174            }
2175        }
2176    }
2177}
2178
2179// ---------------------------------------------------------------------------
2180// Audit result handler
2181// ---------------------------------------------------------------------------
2182
2183/// Handle audit result: log findings and emit trust events.
2184async fn handle_audit_result(
2185    result: &AuditTickResult,
2186    p2p_node: &Arc<P2PNode>,
2187    sync_state: &Arc<RwLock<NeighborSyncState>>,
2188    config: &ReplicationConfig,
2189) {
2190    match result {
2191        AuditTickResult::Passed {
2192            challenged_peer,
2193            keys_checked,
2194        } => {
2195            debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
2196            // Peer responded normally — clear any stale bootstrap claim so
2197            // a future legitimate bootstrap claim starts with a fresh timer.
2198            {
2199                let mut state = sync_state.write().await;
2200                state.bootstrap_claims.remove(challenged_peer);
2201            }
2202            p2p_node
2203                .report_trust_event(
2204                    challenged_peer,
2205                    TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
2206                )
2207                .await;
2208        }
2209        AuditTickResult::Failed { evidence } => {
2210            if let FailureEvidence::AuditFailure {
2211                challenged_peer,
2212                confirmed_failed_keys,
2213                ..
2214            } = evidence
2215            {
2216                error!(
2217                    "Audit failure for {challenged_peer}: {} confirmed failed keys",
2218                    confirmed_failed_keys.len()
2219                );
2220                // Peer responded with digests (not a bootstrap claim) — clear
2221                // any stale bootstrap claim timestamp.
2222                {
2223                    let mut state = sync_state.write().await;
2224                    state.bootstrap_claims.remove(challenged_peer);
2225                }
2226                p2p_node
2227                    .report_trust_event(
2228                        challenged_peer,
2229                        TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
2230                    )
2231                    .await;
2232            }
2233        }
2234        AuditTickResult::BootstrapClaim { peer } => {
2235            // Gap 6: BootstrapClaimAbuse grace period in audit path.
2236            // Separate state mutation from network I/O to avoid holding the
2237            // write lock across report_trust_event.
2238            let should_report = {
2239                let now = Instant::now();
2240                let mut state = sync_state.write().await;
2241                let first_seen = state.bootstrap_claims.entry(*peer).or_insert(now);
2242                let claim_age = now.duration_since(*first_seen);
2243                if claim_age > config.bootstrap_claim_grace_period {
2244                    warn!(
2245                        "Audit: peer {peer} claiming bootstrap past grace period \
2246                         ({:?} > {:?}), reporting abuse",
2247                        claim_age, config.bootstrap_claim_grace_period,
2248                    );
2249                    true
2250                } else {
2251                    debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
2252                    false
2253                }
2254            };
2255            if should_report {
2256                p2p_node
2257                    .report_trust_event(
2258                        peer,
2259                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2260                    )
2261                    .await;
2262            }
2263        }
2264        AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
2265    }
2266}
2267
2268// `admit_bootstrap_hints` was consolidated into `admit_and_queue_hints`.