1#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod config;
21pub mod fresh;
22pub mod neighbor_sync;
23pub mod paid_list;
24pub mod protocol;
25pub mod pruning;
26pub mod quorum;
27pub mod scheduling;
28pub mod types;
29
30use std::collections::{HashMap, HashSet};
31use std::path::Path;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use std::pin::Pin;
36
37use crate::logging::{debug, error, info, warn};
38use futures::stream::FuturesUnordered;
39use futures::{Future, StreamExt};
40use rand::Rng;
41use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
42use tokio::task::JoinHandle;
43use tokio_util::sync::CancellationToken;
44
45use crate::ant_protocol::XorName;
46use crate::error::{Error, Result};
47use crate::payment::{PaymentVerifier, VerificationContext};
48use crate::replication::audit::AuditTickResult;
49use crate::replication::config::{
50 max_parallel_fetch, storage_admission_width, ReplicationConfig,
51 MAX_CONCURRENT_REPLICATION_SENDS, REPLICATION_PROTOCOL_ID,
52};
53use crate::replication::paid_list::PaidList;
54use crate::replication::protocol::{
55 FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
56 VerificationResponse,
57};
58use crate::replication::quorum::KeyVerificationOutcome;
59use crate::replication::scheduling::ReplicationQueues;
60use crate::replication::types::{
61 AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
62 NeighborSyncState, PeerSyncRecord, RepairProofs, VerificationEntry, VerificationState,
63};
64use crate::storage::LmdbStorage;
65use saorsa_core::identity::PeerId;
66use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
67
/// Topic prefix that marks a request/response-enveloped message.
///
/// The message handler accepts a topic made of `RR_PREFIX` followed by
/// `REPLICATION_PROTOCOL_ID` as an enveloped replication request.
const RR_PREFIX: &str = "/rr/";
74
/// Verification context passed to the payment verifier when checking the
/// proof of payment attached to a fresh replication offer.
fn fresh_offer_payment_context() -> VerificationContext {
    VerificationContext::ClientPut
}
78
/// Verification context passed to the payment verifier when checking the
/// proof of payment carried by a paid-notify message.
fn paid_notify_payment_context() -> VerificationContext {
    VerificationContext::PaidListAdmission
}
82
/// Boxed, sendable future for one in-flight fetch attempt.
///
/// Resolves to the fetched key paired with `Some(outcome)`, or with `None`
/// when the spawned fetch task could not be joined.
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
85
/// Borrowed handles handed to `run_verification_cycle` for a single cycle.
///
/// Built afresh by the verification worker on every poll tick.
struct VerificationCycleContext<'a> {
    /// Local P2P node.
    p2p_node: &'a Arc<P2PNode>,
    /// Persistent paid list.
    paid_list: &'a Arc<PaidList>,
    /// Local chunk storage.
    storage: &'a Arc<LmdbStorage>,
    /// Shared replication queues.
    queues: &'a Arc<RwLock<ReplicationQueues>>,
    /// Replication configuration.
    config: &'a ReplicationConfig,
    /// Bootstrap progress tracking.
    bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
    /// Flag that is `true` while the node is still bootstrapping.
    is_bootstrapping: &'a Arc<RwLock<bool>>,
    /// Notifier used to signal bootstrap completion.
    bootstrap_complete_notify: &'a Arc<Notify>,
}
97
/// Sleep, in milliseconds, between fetch-queue polls while no fetch is in flight.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Delay, in milliseconds, before each verification cycle.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Not referenced in this part of the file; presumably the elapsed-time
/// threshold (ms) above which a verification cycle is logged as slow — TODO confirm.
const VERIFICATION_CYCLE_SLOW_LOG_MS: u128 = 500;

/// Interval, in seconds, at which the audit loop re-checks whether the
/// bootstrap state has drained before running its first audit tick.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;

/// Weight attached to `TrustEvent::ApplicationFailure` reports raised by the
/// replication message handlers.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
116
/// Owns the shared replication state and the background tasks that act on it.
///
/// Construct with [`ReplicationEngine::new`], launch the tasks with
/// [`ReplicationEngine::start`], and stop them with
/// [`ReplicationEngine::shutdown`].
pub struct ReplicationEngine {
    /// Validated replication configuration, shared with every task.
    config: Arc<ReplicationConfig>,
    /// Local P2P node used for messaging and DHT access.
    p2p_node: Arc<P2PNode>,
    /// Local chunk storage.
    storage: Arc<LmdbStorage>,
    /// Persistent paid list opened under the engine's root directory.
    paid_list: Arc<PaidList>,
    /// Verifier for proofs of payment on incoming offers and notifications.
    payment_verifier: Arc<PaymentVerifier>,
    /// Replication queues shared by the fetch and verification workers.
    queues: Arc<RwLock<ReplicationQueues>>,
    /// Neighbour-sync cycle state; starts as an empty cycle.
    sync_state: Arc<RwLock<NeighborSyncState>>,
    /// Per-peer sync records, read by the audit loop.
    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    /// Neighbour-sync cycle epoch counter; starts at zero.
    sync_cycle_epoch: Arc<RwLock<u64>>,
    /// Repair proofs; entries for a peer are dropped when the peer is removed.
    repair_proofs: Arc<RwLock<RepairProofs>>,
    /// Bootstrap progress tracking.
    bootstrap_state: Arc<RwLock<BootstrapState>>,
    /// `true` from construction until bootstrap is marked complete.
    is_bootstrapping: Arc<RwLock<bool>>,
    /// Wakes the neighbour-sync loop ahead of its timer.
    sync_trigger: Arc<Notify>,
    /// Awaited by `wait_for_bootstrap_complete`.
    bootstrap_complete_notify: Arc<Notify>,
    /// Semaphore created with `MAX_CONCURRENT_REPLICATION_SENDS` permits and
    /// passed to fresh replication.
    send_semaphore: Arc<Semaphore>,
    /// Receiver of fresh-write events; taken by the drainer task when it starts.
    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
    /// Cancellation token observed by every background task.
    shutdown: CancellationToken,
    /// Join handles of the spawned background tasks.
    task_handles: Vec<JoinHandle<()>>,
}
168
169impl ReplicationEngine {
170 pub async fn new(
177 config: ReplicationConfig,
178 p2p_node: Arc<P2PNode>,
179 storage: Arc<LmdbStorage>,
180 payment_verifier: Arc<PaymentVerifier>,
181 root_dir: &Path,
182 fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
183 shutdown: CancellationToken,
184 ) -> Result<Self> {
185 config.validate().map_err(Error::Config)?;
186
187 let paid_list = Arc::new(
188 PaidList::new(root_dir)
189 .await
190 .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
191 );
192
193 let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
194 let config = Arc::new(config);
195
196 Ok(Self {
197 config: Arc::clone(&config),
198 p2p_node,
199 storage,
200 paid_list,
201 payment_verifier,
202 queues: Arc::new(RwLock::new(ReplicationQueues::new())),
203 sync_state: Arc::new(RwLock::new(initial_neighbors)),
204 sync_history: Arc::new(RwLock::new(HashMap::new())),
205 sync_cycle_epoch: Arc::new(RwLock::new(0)),
206 repair_proofs: Arc::new(RwLock::new(RepairProofs::new())),
207 bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
208 is_bootstrapping: Arc::new(RwLock::new(true)),
209 sync_trigger: Arc::new(Notify::new()),
210 bootstrap_complete_notify: Arc::new(Notify::new()),
211 send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
212 fresh_write_rx: Some(fresh_write_rx),
213 shutdown,
214 task_handles: Vec::new(),
215 })
216 }
217
/// Returns the shared handle to the engine's paid list.
#[must_use]
pub fn paid_list(&self) -> &Arc<PaidList> {
    &self.paid_list
}
223
224 pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
230 if !self.task_handles.is_empty() {
231 error!("ReplicationEngine::start() called while already running — ignoring");
232 return;
233 }
234 info!("Starting replication engine");
235
236 self.start_message_handler();
237 self.start_neighbor_sync_loop();
238 self.start_self_lookup_loop();
239 self.start_audit_loop();
240 self.start_fetch_worker();
241 self.start_verification_worker();
242 self.start_bootstrap_sync(dht_events);
243 self.start_fresh_write_drainer();
244
245 info!(
246 "Replication engine started with {} background tasks",
247 self.task_handles.len()
248 );
249 }
250
/// Returns the current value of the bootstrapping flag.
pub async fn is_bootstrapping(&self) -> bool {
    *self.is_bootstrapping.read().await
}
258
/// Waits until bootstrap has completed or `timeout` elapses.
///
/// Returns `true` when the bootstrapping flag is already cleared or the
/// completion notification arrives in time, and `false` on timeout.
pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
    // Register for the notification *before* reading the flag, so a
    // completion signalled between the check and the await below is not
    // missed (see tokio's `Notified::enable`).
    let notified = self.bootstrap_complete_notify.notified();
    tokio::pin!(notified);
    notified.as_mut().enable();

    // Fast path: bootstrap already finished.
    if !*self.is_bootstrapping.read().await {
        return true;
    }

    tokio::time::timeout(timeout, notified).await.is_ok()
}
280
281 pub async fn shutdown(&mut self) {
287 self.shutdown.cancel();
288 for (i, mut handle) in self.task_handles.drain(..).enumerate() {
289 match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
290 Ok(Ok(())) => {}
291 Ok(Err(e)) if e.is_cancelled() => {}
292 Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
293 Err(_) => {
294 warn!("Replication task {i} did not stop within 10s, aborting");
295 handle.abort();
296 }
297 }
298 }
299 }
300
/// Wakes the neighbour-sync loop so it runs a round without waiting for its
/// timer to expire.
pub fn trigger_neighbor_sync(&self) {
    self.sync_trigger.notify_one();
}
309
/// Replicates a freshly written record by delegating to
/// `fresh::replicate_fresh` with the engine's node, paid list, configuration
/// and send semaphore.
pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
    fresh::replicate_fresh(
        key,
        data,
        proof_of_payment,
        &self.p2p_node,
        &self.paid_list,
        &self.config,
        &self.send_semaphore,
    )
    .await;
}
323
324 fn start_fresh_write_drainer(&mut self) {
331 let Some(mut rx) = self.fresh_write_rx.take() else {
332 return;
333 };
334 let p2p = Arc::clone(&self.p2p_node);
335 let paid_list = Arc::clone(&self.paid_list);
336 let config = Arc::clone(&self.config);
337 let send_semaphore = Arc::clone(&self.send_semaphore);
338 let shutdown = self.shutdown.clone();
339
340 let handle = tokio::spawn(async move {
341 loop {
342 tokio::select! {
343 () = shutdown.cancelled() => break,
344 event = rx.recv() => {
345 let Some(event) = event else { break };
346 fresh::replicate_fresh(
347 &event.key,
348 &event.data,
349 &event.payment_proof,
350 &p2p,
351 &paid_list,
352 &config,
353 &send_semaphore,
354 )
355 .await;
356 }
357 }
358 }
359 debug!("Fresh-write drainer shut down");
360 });
361 self.task_handles.push(handle);
362 }
363
364 #[allow(clippy::too_many_lines)]
365 fn start_message_handler(&mut self) {
366 let mut p2p_events = self.p2p_node.subscribe_events();
367 let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
368 let p2p = Arc::clone(&self.p2p_node);
369 let storage = Arc::clone(&self.storage);
370 let paid_list = Arc::clone(&self.paid_list);
371 let payment_verifier = Arc::clone(&self.payment_verifier);
372 let queues = Arc::clone(&self.queues);
373 let config = Arc::clone(&self.config);
374 let shutdown = self.shutdown.clone();
375 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
376 let bootstrap_state = Arc::clone(&self.bootstrap_state);
377 let sync_history = Arc::clone(&self.sync_history);
378 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
379 let repair_proofs = Arc::clone(&self.repair_proofs);
380 let sync_trigger = Arc::clone(&self.sync_trigger);
381 let sync_state = Arc::clone(&self.sync_state);
382
383 let handle = tokio::spawn(async move {
384 loop {
385 tokio::select! {
386 () = shutdown.cancelled() => break,
387 event = p2p_events.recv() => {
388 let Ok(event) = event else { continue };
389 if let P2PEvent::Message {
390 topic,
391 source: Some(source),
392 data,
393 ..
394 } = event {
395 let rr_info = if topic == REPLICATION_PROTOCOL_ID {
399 Some((data.clone(), None))
400 } else if topic.starts_with(RR_PREFIX)
401 && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
402 {
403 P2PNode::parse_request_envelope(&data)
404 .filter(|(_, is_resp, _)| !is_resp)
405 .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
406 } else {
407 None
408 };
409 if let Some((payload, rr_message_id)) = rr_info {
410 match handle_replication_message(
411 &source,
412 &payload,
413 &p2p,
414 &storage,
415 &paid_list,
416 &payment_verifier,
417 &queues,
418 &config,
419 &is_bootstrapping,
420 &bootstrap_state,
421 &sync_history,
422 &sync_cycle_epoch,
423 &repair_proofs,
424 rr_message_id.as_deref(),
425 ).await {
426 Ok(()) => {}
427 Err(e) => {
428 debug!(
429 "Replication message from {source} error: {e}"
430 );
431 }
432 }
433 }
434 }
435 }
436 dht_event = dht_events.recv() => {
444 let Ok(dht_event) = dht_event else { continue };
445 match dht_event {
446 DhtNetworkEvent::KClosestPeersChanged { old, new } => {
447 let old_peers = old
448 .iter()
449 .take(config.neighbor_sync_scope)
450 .copied()
451 .collect::<HashSet<_>>();
452 let new_scoped = new
453 .iter()
454 .take(config.neighbor_sync_scope)
455 .copied()
456 .collect::<Vec<_>>();
457 let new_peers =
458 new_scoped.iter().copied().collect::<HashSet<_>>();
459 let entrants = new_scoped
460 .iter()
461 .copied()
462 .filter(|peer| !old_peers.contains(peer))
463 .collect::<Vec<_>>();
464 let entrant_count = entrants.len();
465 let (priority_insertions, sync_removals) = {
466 let mut state = sync_state.write().await;
467 let sync_removals = state.retain_sync_peers(&new_peers);
468 let priority_insertions = state.queue_priority_peers(entrants);
469 (priority_insertions, sync_removals)
470 };
471 if priority_insertions > 0 {
472 debug!(
473 "K-closest peers changed, queued {priority_insertions}/{entrant_count} new close peers for priority neighbor sync and pruned {sync_removals} departed pending sync entries"
474 );
475 } else {
476 debug!(
477 "K-closest peers changed, no additional close peers queued, pruned {sync_removals} departed pending sync entries, triggering early neighbor sync"
478 );
479 }
480 sync_trigger.notify_one();
481 }
482 DhtNetworkEvent::PeerRemoved { peer_id } => {
483 sync_state.write().await.remove_peer(&peer_id);
484 repair_proofs.write().await.remove_peer(&peer_id);
485 }
486 _ => {}
487 }
488 }
489 }
490 }
491 debug!("Replication message handler shut down");
492 });
493 self.task_handles.push(handle);
494 }
495
496 fn start_neighbor_sync_loop(&mut self) {
497 let p2p = Arc::clone(&self.p2p_node);
498 let storage = Arc::clone(&self.storage);
499 let paid_list = Arc::clone(&self.paid_list);
500 let queues = Arc::clone(&self.queues);
501 let config = Arc::clone(&self.config);
502 let shutdown = self.shutdown.clone();
503 let sync_state = Arc::clone(&self.sync_state);
504 let sync_history = Arc::clone(&self.sync_history);
505 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
506 let repair_proofs = Arc::clone(&self.repair_proofs);
507 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
508 let bootstrap_state = Arc::clone(&self.bootstrap_state);
509 let sync_trigger = Arc::clone(&self.sync_trigger);
510
511 let handle = tokio::spawn(async move {
512 loop {
513 let interval = config.random_neighbor_sync_interval();
514 tokio::select! {
515 () = shutdown.cancelled() => break,
516 () = tokio::time::sleep(interval) => {}
517 () = sync_trigger.notified() => {
518 debug!("Neighbor sync triggered by topology change");
519 }
520 }
521 tokio::select! {
525 () = shutdown.cancelled() => break,
526 () = run_neighbor_sync_round(
527 &p2p,
528 &storage,
529 &paid_list,
530 &queues,
531 &config,
532 &sync_state,
533 &sync_history,
534 &sync_cycle_epoch,
535 &repair_proofs,
536 &is_bootstrapping,
537 &bootstrap_state,
538 ) => {}
539 }
540 }
541 debug!("Neighbor sync loop shut down");
542 });
543 self.task_handles.push(handle);
544 }
545
546 fn start_self_lookup_loop(&mut self) {
547 let p2p = Arc::clone(&self.p2p_node);
548 let config = Arc::clone(&self.config);
549 let shutdown = self.shutdown.clone();
550
551 let handle = tokio::spawn(async move {
552 loop {
553 let interval = config.random_self_lookup_interval();
554 tokio::select! {
555 () = shutdown.cancelled() => break,
556 () = tokio::time::sleep(interval) => {
557 if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
558 debug!("Self-lookup failed: {e}");
559 }
560 }
561 }
562 }
563 debug!("Self-lookup loop shut down");
564 });
565 self.task_handles.push(handle);
566 }
567
568 fn start_audit_loop(&mut self) {
569 let p2p = Arc::clone(&self.p2p_node);
570 let storage = Arc::clone(&self.storage);
571 let config = Arc::clone(&self.config);
572 let shutdown = self.shutdown.clone();
573 let sync_history = Arc::clone(&self.sync_history);
574 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
575 let repair_proofs = Arc::clone(&self.repair_proofs);
576 let bootstrap_state = Arc::clone(&self.bootstrap_state);
577 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
578 let sync_state = Arc::clone(&self.sync_state);
579
580 let handle = tokio::spawn(async move {
581 loop {
583 tokio::select! {
584 () = shutdown.cancelled() => return,
585 () = tokio::time::sleep(
586 std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
587 ) => {
588 if bootstrap_state.read().await.is_drained() {
589 break;
590 }
591 }
592 }
593 }
594
595 {
597 let bootstrapping = *is_bootstrapping.read().await;
598 let result = {
599 let history = sync_history.read().await;
600 let current_sync_epoch = *sync_cycle_epoch.read().await;
601 audit::audit_tick_with_repair_proofs(
602 &p2p,
603 &storage,
604 &config,
605 &history,
606 &repair_proofs,
607 current_sync_epoch,
608 bootstrapping,
609 )
610 .await
611 };
612 handle_audit_result(&result, &p2p, &sync_state, &config).await;
613 }
614
615 loop {
617 let interval = config.random_audit_tick_interval();
618 tokio::select! {
619 () = shutdown.cancelled() => break,
620 () = tokio::time::sleep(interval) => {
621 let bootstrapping = *is_bootstrapping.read().await;
622 let result = {
623 let history = sync_history.read().await;
624 let current_sync_epoch = *sync_cycle_epoch.read().await;
625 audit::audit_tick_with_repair_proofs(
626 &p2p,
627 &storage,
628 &config,
629 &history,
630 &repair_proofs,
631 current_sync_epoch,
632 bootstrapping,
633 )
634 .await
635 };
636 handle_audit_result(&result, &p2p, &sync_state, &config).await;
637 }
638 }
639 }
640 debug!("Audit loop shut down");
641 });
642 self.task_handles.push(handle);
643 }
644
645 #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
646 fn start_fetch_worker(&mut self) {
647 let p2p = Arc::clone(&self.p2p_node);
648 let storage = Arc::clone(&self.storage);
649 let queues = Arc::clone(&self.queues);
650 let config = Arc::clone(&self.config);
651 let shutdown = self.shutdown.clone();
652 let bootstrap_state = Arc::clone(&self.bootstrap_state);
653 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
654 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
655 let concurrency = max_parallel_fetch();
656
657 info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
658
659 let handle = tokio::spawn(async move {
660 let mut in_flight = FuturesUnordered::<FetchFuture>::new();
663
664 loop {
665 {
667 let mut q = queues.write().await;
668 while in_flight.len() < concurrency {
669 let Some(candidate) = q.dequeue_fetch() else {
670 break;
671 };
672 let Some(&source) = candidate.sources.first() else {
673 warn!(
674 "Fetch candidate {} has no sources — dropping",
675 hex::encode(candidate.key)
676 );
677 continue;
678 };
679 q.start_fetch(candidate.key, source, candidate.sources.clone());
680
681 let p2p = Arc::clone(&p2p);
682 let storage = Arc::clone(&storage);
683 let config = Arc::clone(&config);
684 let token = shutdown.clone();
685 let fetch_key = candidate.key;
686 in_flight.push(Box::pin(async move {
687 let handle = tokio::spawn(async move {
688 tokio::select! {
690 () = token.cancelled() => FetchOutcome {
691 key: fetch_key,
692 result: FetchResult::SourceFailed,
693 },
694 outcome = execute_single_fetch(
695 p2p, storage, config, fetch_key, source,
696 ) => outcome,
697 }
698 });
699 match handle.await {
700 Ok(outcome) => (outcome.key, Some(outcome)),
701 Err(e) => {
702 error!(
703 "Fetch task for {} panicked: {e}",
704 hex::encode(fetch_key)
705 );
706 (fetch_key, None)
707 }
708 }
709 }));
710 }
711 } if in_flight.is_empty() {
714 tokio::select! {
716 () = shutdown.cancelled() => break,
717 () = tokio::time::sleep(
718 std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
719 ) => continue,
720 }
721 }
722
723 tokio::select! {
725 () = shutdown.cancelled() => break,
726 Some((key, maybe_outcome)) = in_flight.next() => {
727 let mut q = queues.write().await;
728 let terminal = if let Some(outcome) = maybe_outcome {
729 match outcome.result {
730 FetchResult::Stored => {
731 q.complete_fetch(&key);
732 true
733 }
734 FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
735 if let Some(next_peer) = q.retry_fetch(&key) {
736 let p2p = Arc::clone(&p2p);
738 let storage = Arc::clone(&storage);
739 let config = Arc::clone(&config);
740 let token = shutdown.clone();
741 let fetch_key = key;
742 in_flight.push(Box::pin(async move {
743 let handle = tokio::spawn(async move {
744 tokio::select! {
745 () = token.cancelled() => FetchOutcome {
746 key: fetch_key,
747 result: FetchResult::SourceFailed,
748 },
749 outcome = execute_single_fetch(
750 p2p, storage, config, fetch_key, next_peer,
751 ) => outcome,
752 }
753 });
754 match handle.await {
755 Ok(outcome) => (outcome.key, Some(outcome)),
756 Err(e) => {
757 error!(
758 "Fetch task for {} panicked: {e}",
759 hex::encode(fetch_key)
760 );
761 (fetch_key, None)
762 }
763 }
764 }));
765 false
766 } else {
767 q.complete_fetch(&key);
768 true
769 }
770 }
771 }
772 } else {
773 q.complete_fetch(&key);
775 true
776 };
777
778 if terminal {
780 drop(q); if !bootstrap_state.read().await.is_drained() {
782 bootstrap_state.write().await.remove_key(&key);
783 let q = queues.read().await;
784 if bootstrap::check_bootstrap_drained(
785 &bootstrap_state,
786 &q,
787 )
788 .await
789 {
790 complete_bootstrap(
791 &is_bootstrapping,
792 &bootstrap_complete_notify,
793 ).await;
794 }
795 }
796 }
797 }
798 }
799 }
800
801 while in_flight.next().await.is_some() {}
805 debug!("Fetch worker shut down");
806 });
807 self.task_handles.push(handle);
808 }
809
810 fn start_verification_worker(&mut self) {
811 let p2p = Arc::clone(&self.p2p_node);
812 let storage = Arc::clone(&self.storage);
813 let queues = Arc::clone(&self.queues);
814 let paid_list = Arc::clone(&self.paid_list);
815 let config = Arc::clone(&self.config);
816 let shutdown = self.shutdown.clone();
817 let bootstrap_state = Arc::clone(&self.bootstrap_state);
818 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
819 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
820
821 let handle = tokio::spawn(async move {
822 loop {
823 tokio::select! {
824 () = shutdown.cancelled() => break,
825 () = tokio::time::sleep(
826 std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
827 ) => {
828 let ctx = VerificationCycleContext {
829 p2p_node: &p2p,
830 paid_list: &paid_list,
831 storage: &storage,
832 queues: &queues,
833 config: &config,
834 bootstrap_state: &bootstrap_state,
835 is_bootstrapping: &is_bootstrapping,
836 bootstrap_complete_notify: &bootstrap_complete_notify,
837 };
838 run_verification_cycle(ctx).await;
839 }
840 }
841 }
842 debug!("Verification worker shut down");
843 });
844 self.task_handles.push(handle);
845 }
846
/// Spawns the one-shot bootstrap sync task.
///
/// The task waits on the bootstrap gate (driven by `dht_events`), snapshots
/// the close neighbours, and syncs with them in batches of
/// `config.neighbor_sync_peer_count`. Hints returned by non-bootstrapping
/// peers are admitted and queued, and discovered keys are tracked in the
/// bootstrap state. Finally the drain condition is checked and, if met,
/// bootstrap is marked complete.
#[allow(clippy::too_many_lines)]
fn start_bootstrap_sync(
    &mut self,
    dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
) {
    let p2p = Arc::clone(&self.p2p_node);
    let storage = Arc::clone(&self.storage);
    let paid_list = Arc::clone(&self.paid_list);
    let queues = Arc::clone(&self.queues);
    let config = Arc::clone(&self.config);
    let shutdown = self.shutdown.clone();
    let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
    let bootstrap_state = Arc::clone(&self.bootstrap_state);
    let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
    let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
    let repair_proofs = Arc::clone(&self.repair_proofs);

    let handle = tokio::spawn(async move {
        // Block until the gate opens, times out, or shutdown is requested.
        let gate = bootstrap::wait_for_bootstrap_complete(
            dht_events,
            config.bootstrap_complete_timeout_secs,
            &shutdown,
        )
        .await;

        if gate == bootstrap::BootstrapGateResult::Shutdown {
            return;
        }

        let self_id = *p2p.peer_id();
        let neighbors =
            neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
                .await;

        // With nobody to sync from, bootstrap is trivially complete.
        if neighbors.is_empty() {
            info!("Bootstrap sync: no close neighbors found, marking drained");
            bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
            complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
            return;
        }

        let neighbor_count = neighbors.len();
        info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");

        // NOTE(review): `chunks` panics on a zero chunk size; presumably
        // `config.validate()` rejects a zero `neighbor_sync_peer_count` — confirm.
        for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
            if shutdown.is_cancelled() {
                break;
            }

            // Hints are built once per batch and consumed per peer below.
            let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
                batch,
                &storage,
                &paid_list,
                &p2p,
                config.close_group_size,
                config.paid_list_close_group_size,
            )
            .await;

            for peer in batch {
                if shutdown.is_cancelled() {
                    break;
                }

                let bootstrapping = *is_bootstrapping.read().await;

                // Count the request as pending for the duration of the sync call.
                bootstrap::increment_pending_requests(&bootstrap_state, 1).await;

                let hints = hints_by_peer.remove(peer).unwrap_or_default();
                let outcome = neighbor_sync::sync_with_peer_with_hints(
                    peer,
                    &p2p,
                    &config,
                    bootstrapping,
                    hints,
                )
                .await;

                bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;

                if let Some(outcome) = outcome {
                    // Responses from peers that report themselves as
                    // bootstrapping are not processed.
                    if !outcome.response.bootstrapping {
                        record_sent_replica_hints(
                            peer,
                            &outcome.sent_replica_hints,
                            &repair_proofs,
                            &sync_cycle_epoch,
                        )
                        .await;
                        // From here on `outcome` is the admission result, which
                        // shadows the sync outcome.
                        let outcome = admit_and_queue_hints(
                            &self_id,
                            peer,
                            &outcome.response.replica_hints,
                            &outcome.response.paid_hints,
                            &p2p,
                            &config,
                            &storage,
                            &paid_list,
                            &queues,
                        )
                        .await;

                        if !outcome.discovered.is_empty() {
                            bootstrap::track_discovered_keys(
                                &bootstrap_state,
                                &outcome.discovered,
                            )
                            .await;
                        }

                        // Record or clear the capacity-rejection marker for this
                        // peer according to the latest admission result.
                        if outcome.capacity_rejected_count > 0 {
                            bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
                        } else {
                            bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
                        }
                    }
                }
            }
        }

        // Final drain check, run even when the loops ended early on shutdown.
        {
            let q = queues.read().await;
            if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
                complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
            }
        }

        info!("Bootstrap sync completed");
    });
    self.task_handles.push(handle);
}
1001}
1002
1003#[allow(clippy::too_many_arguments)]
1013async fn handle_replication_message(
1014 source: &PeerId,
1015 data: &[u8],
1016 p2p_node: &Arc<P2PNode>,
1017 storage: &Arc<LmdbStorage>,
1018 paid_list: &Arc<PaidList>,
1019 payment_verifier: &Arc<PaymentVerifier>,
1020 queues: &Arc<RwLock<ReplicationQueues>>,
1021 config: &ReplicationConfig,
1022 is_bootstrapping: &Arc<RwLock<bool>>,
1023 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1024 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1025 sync_cycle_epoch: &Arc<RwLock<u64>>,
1026 repair_proofs: &Arc<RwLock<RepairProofs>>,
1027 rr_message_id: Option<&str>,
1028) -> Result<()> {
1029 let msg = ReplicationMessage::decode(data)
1030 .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
1031
1032 match msg.body {
1033 ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
1034 handle_fresh_offer(
1035 source,
1036 offer,
1037 storage,
1038 paid_list,
1039 payment_verifier,
1040 p2p_node,
1041 config,
1042 msg.request_id,
1043 rr_message_id,
1044 )
1045 .await
1046 }
1047 ReplicationMessageBody::PaidNotify(ref notify) => {
1048 handle_paid_notify(
1049 source,
1050 notify,
1051 paid_list,
1052 payment_verifier,
1053 p2p_node,
1054 config,
1055 )
1056 .await
1057 }
1058 ReplicationMessageBody::NeighborSyncRequest(ref request) => {
1059 let bootstrapping = *is_bootstrapping.read().await;
1060 handle_neighbor_sync_request(
1061 source,
1062 request,
1063 p2p_node,
1064 storage,
1065 paid_list,
1066 queues,
1067 config,
1068 bootstrapping,
1069 bootstrap_state,
1070 sync_history,
1071 sync_cycle_epoch,
1072 repair_proofs,
1073 msg.request_id,
1074 rr_message_id,
1075 )
1076 .await
1077 }
1078 ReplicationMessageBody::VerificationRequest(ref request) => {
1079 handle_verification_request(
1080 source,
1081 request,
1082 storage,
1083 paid_list,
1084 p2p_node,
1085 msg.request_id,
1086 rr_message_id,
1087 )
1088 .await
1089 }
1090 ReplicationMessageBody::FetchRequest(ref request) => {
1091 handle_fetch_request(
1092 source,
1093 request,
1094 storage,
1095 p2p_node,
1096 msg.request_id,
1097 rr_message_id,
1098 )
1099 .await
1100 }
1101 ReplicationMessageBody::AuditChallenge(ref challenge) => {
1102 let bootstrapping = *is_bootstrapping.read().await;
1103 handle_audit_challenge_msg(
1104 source,
1105 challenge,
1106 storage,
1107 p2p_node,
1108 bootstrapping,
1109 msg.request_id,
1110 rr_message_id,
1111 )
1112 .await
1113 }
1114 ReplicationMessageBody::FreshReplicationResponse(_)
1116 | ReplicationMessageBody::NeighborSyncResponse(_)
1117 | ReplicationMessageBody::VerificationResponse(_)
1118 | ReplicationMessageBody::FetchResponse(_)
1119 | ReplicationMessageBody::AuditResponse(_) => Ok(()),
1120 }
1121}
1122
1123#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1128async fn handle_fresh_offer(
1129 source: &PeerId,
1130 offer: &protocol::FreshReplicationOffer,
1131 storage: &Arc<LmdbStorage>,
1132 paid_list: &Arc<PaidList>,
1133 payment_verifier: &Arc<PaymentVerifier>,
1134 p2p_node: &Arc<P2PNode>,
1135 config: &ReplicationConfig,
1136 request_id: u64,
1137 rr_message_id: Option<&str>,
1138) -> Result<()> {
1139 let self_id = *p2p_node.peer_id();
1140
1141 if offer.proof_of_payment.is_empty() {
1143 send_replication_response(
1144 source,
1145 p2p_node,
1146 request_id,
1147 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1148 key: offer.key,
1149 reason: "Missing proof of payment".to_string(),
1150 }),
1151 rr_message_id,
1152 )
1153 .await;
1154 return Ok(());
1155 }
1156
1157 if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1161 warn!(
1162 "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1163 hex::encode(offer.key),
1164 offer.data.len(),
1165 crate::ant_protocol::MAX_CHUNK_SIZE,
1166 );
1167 p2p_node
1168 .report_trust_event(
1169 source,
1170 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1171 )
1172 .await;
1173 send_replication_response(
1174 source,
1175 p2p_node,
1176 request_id,
1177 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1178 key: offer.key,
1179 reason: format!(
1180 "Data size {} exceeds maximum chunk size {}",
1181 offer.data.len(),
1182 crate::ant_protocol::MAX_CHUNK_SIZE,
1183 ),
1184 }),
1185 rr_message_id,
1186 )
1187 .await;
1188 return Ok(());
1189 }
1190
1191 let computed_key = crate::client::compute_address(&offer.data);
1194 if computed_key != offer.key {
1195 warn!(
1196 "Rejecting fresh offer for key {}: content address mismatch, computed {}",
1197 hex::encode(offer.key),
1198 hex::encode(computed_key),
1199 );
1200 p2p_node
1201 .report_trust_event(
1202 source,
1203 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1204 )
1205 .await;
1206 send_replication_response(
1207 source,
1208 p2p_node,
1209 request_id,
1210 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1211 key: offer.key,
1212 reason: format!(
1213 "Content address mismatch: expected {}, computed {}",
1214 hex::encode(offer.key),
1215 hex::encode(computed_key),
1216 ),
1217 }),
1218 rr_message_id,
1219 )
1220 .await;
1221 return Ok(());
1222 }
1223
1224 if !admission::is_responsible(
1227 &self_id,
1228 &offer.key,
1229 p2p_node,
1230 storage_admission_width(config.close_group_size),
1231 )
1232 .await
1233 {
1234 send_replication_response(
1235 source,
1236 p2p_node,
1237 request_id,
1238 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1239 key: offer.key,
1240 reason: "Not in storage-admission range for this key".to_string(),
1241 }),
1242 rr_message_id,
1243 )
1244 .await;
1245 return Ok(());
1246 }
1247
1248 if let Err(e) = storage.check_capacity() {
1255 info!(
1256 target: "ant_node::storage::disk_precheck",
1257 key = %hex::encode(offer.key),
1258 "Rejecting fresh replication offer before payment verification: {e}"
1259 );
1260 send_replication_response(
1261 source,
1262 p2p_node,
1263 request_id,
1264 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1265 key: offer.key,
1266 reason: e.to_string(),
1267 }),
1268 rr_message_id,
1269 )
1270 .await;
1271 return Ok(());
1272 }
1273
1274 match payment_verifier
1282 .verify_payment(
1283 &offer.key,
1284 Some(&offer.proof_of_payment),
1285 fresh_offer_payment_context(),
1286 )
1287 .await
1288 {
1289 Ok(status) if status.can_store() => {
1290 debug!(
1291 "PoP validated for fresh offer key {}",
1292 hex::encode(offer.key)
1293 );
1294 }
1295 Ok(_) => {
1296 send_replication_response(
1297 source,
1298 p2p_node,
1299 request_id,
1300 ReplicationMessageBody::FreshReplicationResponse(
1301 FreshReplicationResponse::Rejected {
1302 key: offer.key,
1303 reason: "Payment verification failed: payment required".to_string(),
1304 },
1305 ),
1306 rr_message_id,
1307 )
1308 .await;
1309 return Ok(());
1310 }
1311 Err(e) => {
1312 warn!(
1313 "PoP verification error for key {}: {e}",
1314 hex::encode(offer.key)
1315 );
1316 send_replication_response(
1317 source,
1318 p2p_node,
1319 request_id,
1320 ReplicationMessageBody::FreshReplicationResponse(
1321 FreshReplicationResponse::Rejected {
1322 key: offer.key,
1323 reason: format!("Payment verification error: {e}"),
1324 },
1325 ),
1326 rr_message_id,
1327 )
1328 .await;
1329 return Ok(());
1330 }
1331 }
1332
1333 if let Err(e) = paid_list.insert(&offer.key).await {
1335 warn!("Failed to add key to PaidForList: {e}");
1336 }
1337
1338 match storage.put(&offer.key, &offer.data).await {
1340 Ok(_) => {
1341 send_replication_response(
1342 source,
1343 p2p_node,
1344 request_id,
1345 ReplicationMessageBody::FreshReplicationResponse(
1346 FreshReplicationResponse::Accepted { key: offer.key },
1347 ),
1348 rr_message_id,
1349 )
1350 .await;
1351 }
1352 Err(e) => {
1353 send_replication_response(
1354 source,
1355 p2p_node,
1356 request_id,
1357 ReplicationMessageBody::FreshReplicationResponse(
1358 FreshReplicationResponse::Rejected {
1359 key: offer.key,
1360 reason: e.to_string(),
1361 },
1362 ),
1363 rr_message_id,
1364 )
1365 .await;
1366 }
1367 }
1368
1369 Ok(())
1370}
1371
1372async fn handle_paid_notify(
1373 _source: &PeerId,
1374 notify: &protocol::PaidNotify,
1375 paid_list: &Arc<PaidList>,
1376 payment_verifier: &Arc<PaymentVerifier>,
1377 p2p_node: &Arc<P2PNode>,
1378 config: &ReplicationConfig,
1379) -> Result<()> {
1380 let self_id = *p2p_node.peer_id();
1381
1382 if notify.proof_of_payment.is_empty() {
1384 return Ok(());
1385 }
1386
1387 if !admission::is_in_paid_close_group(
1389 &self_id,
1390 ¬ify.key,
1391 p2p_node,
1392 config.paid_list_close_group_size,
1393 )
1394 .await
1395 {
1396 return Ok(());
1397 }
1398
1399 match payment_verifier
1404 .verify_payment(
1405 ¬ify.key,
1406 Some(¬ify.proof_of_payment),
1407 paid_notify_payment_context(),
1408 )
1409 .await
1410 {
1411 Ok(status) if status.can_store() => {
1412 debug!(
1413 "PoP validated for paid notify key {}",
1414 hex::encode(notify.key)
1415 );
1416 }
1417 Ok(_) => {
1418 warn!(
1419 "Paid notify rejected: payment required for key {}",
1420 hex::encode(notify.key)
1421 );
1422 return Ok(());
1423 }
1424 Err(e) => {
1425 warn!(
1426 "PoP verification error for paid notify key {}: {e}",
1427 hex::encode(notify.key)
1428 );
1429 return Ok(());
1430 }
1431 }
1432
1433 if let Err(e) = paid_list.insert(¬ify.key).await {
1434 warn!("Failed to add paid notify key to PaidForList: {e}");
1435 }
1436
1437 Ok(())
1438}
1439
1440#[allow(clippy::too_many_arguments)]
1441async fn handle_neighbor_sync_request(
1442 source: &PeerId,
1443 request: &protocol::NeighborSyncRequest,
1444 p2p_node: &Arc<P2PNode>,
1445 storage: &Arc<LmdbStorage>,
1446 paid_list: &Arc<PaidList>,
1447 queues: &Arc<RwLock<ReplicationQueues>>,
1448 config: &ReplicationConfig,
1449 is_bootstrapping: bool,
1450 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1451 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1452 sync_cycle_epoch: &Arc<RwLock<u64>>,
1453 repair_proofs: &Arc<RwLock<RepairProofs>>,
1454 request_id: u64,
1455 rr_message_id: Option<&str>,
1456) -> Result<()> {
1457 let self_id = *p2p_node.peer_id();
1458
1459 let (response, sent_replica_hints, sender_in_rt) =
1467 neighbor_sync::handle_sync_request_with_proofs(
1468 source,
1469 request,
1470 p2p_node,
1471 storage,
1472 paid_list,
1473 config,
1474 is_bootstrapping,
1475 )
1476 .await;
1477
1478 let response_sent = send_replication_response_checked(
1480 source,
1481 p2p_node,
1482 request_id,
1483 ReplicationMessageBody::NeighborSyncResponse(response),
1484 rr_message_id,
1485 )
1486 .await;
1487
1488 if !sender_in_rt {
1490 return Ok(());
1491 }
1492
1493 {
1496 let mut history = sync_history.write().await;
1497 let record = history.entry(*source).or_insert(PeerSyncRecord {
1498 last_sync: None,
1499 cycles_since_sync: 0,
1500 });
1501 record.last_sync = Some(Instant::now());
1502 record.cycles_since_sync = 0;
1503 }
1504
1505 if response_sent && !request.bootstrapping {
1506 record_sent_replica_hints(source, &sent_replica_hints, repair_proofs, sync_cycle_epoch)
1507 .await;
1508 }
1509
1510 let outcome = admit_and_queue_hints(
1512 &self_id,
1513 source,
1514 &request.replica_hints,
1515 &request.paid_hints,
1516 p2p_node,
1517 config,
1518 storage,
1519 paid_list,
1520 queues,
1521 )
1522 .await;
1523
1524 if is_bootstrapping {
1529 if !outcome.discovered.is_empty() {
1530 bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1531 }
1532 if outcome.capacity_rejected_count > 0 {
1533 bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
1534 } else {
1535 bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
1536 }
1537 }
1538
1539 Ok(())
1540}
1541
1542async fn handle_verification_request(
1543 source: &PeerId,
1544 request: &protocol::VerificationRequest,
1545 storage: &Arc<LmdbStorage>,
1546 paid_list: &Arc<PaidList>,
1547 p2p_node: &Arc<P2PNode>,
1548 request_id: u64,
1549 rr_message_id: Option<&str>,
1550) -> Result<()> {
1551 #[allow(clippy::cast_possible_truncation)]
1557 let keys_len = request.keys.len() as u32;
1558 let paid_check_set: HashSet<u32> = request
1559 .paid_list_check_indices
1560 .iter()
1561 .copied()
1562 .filter(|&idx| {
1563 if idx >= keys_len {
1564 warn!(
1565 "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
1566 request.keys.len(),
1567 );
1568 false
1569 } else {
1570 true
1571 }
1572 })
1573 .collect();
1574
1575 let mut results = Vec::with_capacity(request.keys.len());
1576 for (i, key) in request.keys.iter().enumerate() {
1577 let present = storage.exists(key).unwrap_or(false);
1578 let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
1579 Some(paid_list.contains(key).unwrap_or(false))
1580 } else {
1581 None
1582 };
1583 results.push(protocol::KeyVerificationResult {
1584 key: *key,
1585 present,
1586 paid,
1587 });
1588 }
1589
1590 send_replication_response(
1591 source,
1592 p2p_node,
1593 request_id,
1594 ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
1595 rr_message_id,
1596 )
1597 .await;
1598
1599 Ok(())
1600}
1601
1602async fn handle_fetch_request(
1603 source: &PeerId,
1604 request: &protocol::FetchRequest,
1605 storage: &Arc<LmdbStorage>,
1606 p2p_node: &Arc<P2PNode>,
1607 request_id: u64,
1608 rr_message_id: Option<&str>,
1609) -> Result<()> {
1610 let response = match storage.get(&request.key).await {
1611 Ok(Some(data)) => protocol::FetchResponse::Success {
1612 key: request.key,
1613 data,
1614 },
1615 Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
1616 Err(e) => protocol::FetchResponse::Error {
1617 key: request.key,
1618 reason: format!("{e}"),
1619 },
1620 };
1621
1622 send_replication_response(
1623 source,
1624 p2p_node,
1625 request_id,
1626 ReplicationMessageBody::FetchResponse(response),
1627 rr_message_id,
1628 )
1629 .await;
1630
1631 Ok(())
1632}
1633
1634async fn handle_audit_challenge_msg(
1635 source: &PeerId,
1636 challenge: &protocol::AuditChallenge,
1637 storage: &Arc<LmdbStorage>,
1638 p2p_node: &Arc<P2PNode>,
1639 is_bootstrapping: bool,
1640 request_id: u64,
1641 rr_message_id: Option<&str>,
1642) -> Result<()> {
1643 #[allow(clippy::cast_possible_truncation)]
1644 let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
1645 let response = audit::handle_audit_challenge(
1646 challenge,
1647 storage,
1648 p2p_node.peer_id(),
1649 is_bootstrapping,
1650 stored_chunks,
1651 )
1652 .await;
1653
1654 send_replication_response(
1655 source,
1656 p2p_node,
1657 request_id,
1658 ReplicationMessageBody::AuditResponse(response),
1659 rr_message_id,
1660 )
1661 .await;
1662
1663 Ok(())
1664}
1665
/// Sends a replication response to `peer`, ignoring whether delivery succeeded.
///
/// This is a fire-and-forget wrapper around
/// `send_replication_response_checked`: the arguments are forwarded unchanged
/// and the returned success flag is discarded.
async fn send_replication_response(
    peer: &PeerId,
    p2p_node: &Arc<P2PNode>,
    request_id: u64,
    body: ReplicationMessageBody,
    rr_message_id: Option<&str>,
) {
    // The checked variant already logs failures; callers here don't need the flag.
    let _ =
        send_replication_response_checked(peer, p2p_node, request_id, body, rr_message_id).await;
}
1685
1686async fn send_replication_response_checked(
1696 peer: &PeerId,
1697 p2p_node: &Arc<P2PNode>,
1698 request_id: u64,
1699 body: ReplicationMessageBody,
1700 rr_message_id: Option<&str>,
1701) -> bool {
1702 let msg = ReplicationMessage { request_id, body };
1703 let encoded = match msg.encode() {
1704 Ok(data) => data,
1705 Err(e) => {
1706 warn!("Failed to encode replication response: {e}");
1707 return false;
1708 }
1709 };
1710 let result = if let Some(msg_id) = rr_message_id {
1711 p2p_node
1712 .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
1713 .await
1714 } else {
1715 p2p_node
1716 .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
1717 .await
1718 };
1719 if let Err(e) = result {
1720 debug!("Failed to send replication response to {peer}: {e}");
1721 return false;
1722 }
1723 true
1724}
1725
1726async fn record_sent_replica_hints(
1727 peer: &PeerId,
1728 hints: &[neighbor_sync::SentReplicaHint],
1729 repair_proofs: &Arc<RwLock<RepairProofs>>,
1730 sync_cycle_epoch: &Arc<RwLock<u64>>,
1731) {
1732 if hints.is_empty() {
1733 return;
1734 }
1735
1736 let hinted_at_epoch = *sync_cycle_epoch.read().await;
1737 let mut proofs = repair_proofs.write().await;
1738 for hint in hints {
1739 if proofs.record_replica_hint_sent(*peer, hint.key, &hint.close_peers, hinted_at_epoch) {
1740 debug!(
1741 "Recorded repair hint proof for peer {peer} and key {}",
1742 hex::encode(hint.key)
1743 );
1744 }
1745 }
1746}
1747
/// Runs one round of outbound neighbor sync.
///
/// When the current sync cycle is complete, this first performs the
/// end-of-cycle work: ages every sync-history record, advances the sync-cycle
/// epoch, runs a prune pass, and starts a new cycle over a fresh snapshot of
/// close neighbors. It then selects a batch of peers, builds hints for them
/// and syncs with each in turn. A peer whose sync yields no outcome is handed
/// to `neighbor_sync::handle_sync_failure`, and if that returns a replacement
/// peer, one sync attempt is made against the replacement.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn run_neighbor_sync_round(
    p2p_node: &Arc<P2PNode>,
    storage: &Arc<LmdbStorage>,
    paid_list: &Arc<PaidList>,
    queues: &Arc<RwLock<ReplicationQueues>>,
    config: &ReplicationConfig,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    sync_cycle_epoch: &Arc<RwLock<u64>>,
    repair_proofs: &Arc<RwLock<RepairProofs>>,
    is_bootstrapping: &Arc<RwLock<bool>>,
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
) {
    let self_id = *p2p_node.peer_id();
    // Snapshot the flag once; the same value is used for the whole round.
    let bootstrapping = *is_bootstrapping.read().await;

    // Checked under a read lock first; re-checked under the write lock below
    // before the cycle is actually replaced.
    let cycle_complete = sync_state.read().await.is_cycle_complete();
    if cycle_complete {
        {
            // Every known peer has gone one more cycle without a sync;
            // a successful sync resets the counter to zero.
            let mut history = sync_history.write().await;
            for record in history.values_mut() {
                record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
            }
        }
        // Advance the epoch and keep the new value for the prune pass.
        let current_sync_epoch = {
            let mut epoch = sync_cycle_epoch.write().await;
            *epoch = epoch.saturating_add(1);
            *epoch
        };

        // Remote prune audits are only allowed once this node is no longer
        // bootstrapping and the bootstrap state reports itself drained.
        let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
        pruning::run_prune_pass_with_context(pruning::PrunePassContext {
            self_id: &self_id,
            storage,
            paid_list,
            p2p_node,
            config,
            sync_state,
            repair_proofs,
            current_sync_epoch,
            // Only present in test builds; `None` here leaves it unset.
            #[cfg(any(test, feature = "test-utils"))]
            repair_proof_now: None,
            allow_remote_prune_audits,
        })
        .await;

        // Snapshot neighbors before taking the state write lock.
        let neighbors =
            neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
                .await;

        let mut state = sync_state.write().await;
        // Re-check: the state lock was released since the first check.
        if state.is_cycle_complete() {
            // Carry these fields across the cycle boundary; everything else
            // comes from `NeighborSyncState::new_cycle`.
            let old_sync_times = std::mem::take(&mut state.last_sync_times);
            let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
            let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
            let old_prune_cursor = state.prune_cursor;
            *state = NeighborSyncState::new_cycle(neighbors);
            state.last_sync_times = old_sync_times;
            state.bootstrap_claims = old_bootstrap_claims;
            state.bootstrap_claim_history = old_bootstrap_claim_history;
            state.prune_cursor = old_prune_cursor;
        }
    }

    // Pick the peers for this round; the write lock is scoped to the selection.
    let batch = {
        let mut state = sync_state.write().await;
        neighbor_sync::select_sync_batch(
            &mut state,
            config.neighbor_sync_peer_count,
            config.neighbor_sync_cooldown,
        )
    };

    if batch.is_empty() {
        return;
    }

    debug!("Neighbor sync: syncing with {} peers", batch.len());

    // Hints for the whole batch are built in one call, then consumed per peer.
    let mut hints_by_peer = neighbor_sync::build_sync_hints_for_peers(
        &batch,
        storage,
        paid_list,
        p2p_node,
        config.close_group_size,
        config.paid_list_close_group_size,
    )
    .await;

    // Peers are synced one at a time, in batch order.
    for peer in &batch {
        // A peer with no prepared hints still gets a sync, with default hints.
        let hints = hints_by_peer.remove(peer).unwrap_or_default();
        let outcome =
            neighbor_sync::sync_with_peer_with_hints(peer, p2p_node, config, bootstrapping, hints)
                .await;

        if let Some(outcome) = outcome {
            handle_sync_response(
                &self_id,
                peer,
                &outcome.response,
                &outcome.sent_replica_hints,
                p2p_node,
                config,
                bootstrapping,
                bootstrap_state,
                storage,
                paid_list,
                queues,
                sync_state,
                sync_history,
                sync_cycle_epoch,
                repair_proofs,
            )
            .await;
        } else {
            // No outcome: record the failure and see whether the sync state
            // offers a replacement peer.
            let replacement = {
                let mut state = sync_state.write().await;
                neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
            };

            if let Some(replacement_peer) = replacement {
                // The replacement was not part of the batch, so its hints are
                // built separately.
                let mut replacement_hints = neighbor_sync::build_sync_hints_for_peers(
                    std::slice::from_ref(&replacement_peer),
                    storage,
                    paid_list,
                    p2p_node,
                    config.close_group_size,
                    config.paid_list_close_group_size,
                )
                .await;
                let hints = replacement_hints
                    .remove(&replacement_peer)
                    .unwrap_or_default();
                let replacement_outcome = neighbor_sync::sync_with_peer_with_hints(
                    &replacement_peer,
                    p2p_node,
                    config,
                    bootstrapping,
                    hints,
                )
                .await;

                // A replacement with no outcome is not retried or reported
                // to `handle_sync_failure` here.
                if let Some(outcome) = replacement_outcome {
                    handle_sync_response(
                        &self_id,
                        &replacement_peer,
                        &outcome.response,
                        &outcome.sent_replica_hints,
                        p2p_node,
                        config,
                        bootstrapping,
                        bootstrap_state,
                        storage,
                        paid_list,
                        queues,
                        sync_state,
                        sync_history,
                        sync_cycle_epoch,
                        repair_proofs,
                    )
                    .await;
                }
            }
        }
    }
}
1938
1939#[allow(clippy::too_many_arguments)]
1942async fn handle_sync_response(
1943 self_id: &PeerId,
1944 peer: &PeerId,
1945 resp: &NeighborSyncResponse,
1946 sent_replica_hints: &[neighbor_sync::SentReplicaHint],
1947 p2p_node: &Arc<P2PNode>,
1948 config: &ReplicationConfig,
1949 bootstrapping: bool,
1950 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1951 storage: &Arc<LmdbStorage>,
1952 paid_list: &Arc<PaidList>,
1953 queues: &Arc<RwLock<ReplicationQueues>>,
1954 sync_state: &Arc<RwLock<NeighborSyncState>>,
1955 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1956 sync_cycle_epoch: &Arc<RwLock<u64>>,
1957 repair_proofs: &Arc<RwLock<RepairProofs>>,
1958) {
1959 {
1961 let mut state = sync_state.write().await;
1962 neighbor_sync::record_successful_sync(&mut state, peer);
1963 }
1964 {
1965 let mut history = sync_history.write().await;
1966 let record = history.entry(*peer).or_insert(PeerSyncRecord {
1967 last_sync: None,
1968 cycles_since_sync: 0,
1969 });
1970 record.last_sync = Some(Instant::now());
1971 record.cycles_since_sync = 0;
1972 }
1973
1974 if resp.bootstrapping {
1976 let should_report = {
1980 let now = Instant::now();
1981 let mut state = sync_state.write().await;
1982 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
1983 BootstrapClaimObservation::WithinGrace { .. } => false,
1984 BootstrapClaimObservation::PastGrace { first_seen } => {
1985 warn!(
1986 "Peer {peer} has been claiming bootstrap for {:?}, \
1987 exceeding grace period of {:?} — reporting abuse",
1988 now.duration_since(first_seen),
1989 config.bootstrap_claim_grace_period,
1990 );
1991 true
1992 }
1993 BootstrapClaimObservation::Repeated { first_seen } => {
1994 warn!(
1995 "Peer {peer} repeated bootstrap claim after previously stopping; \
1996 first claim was {:?} ago — reporting abuse",
1997 now.duration_since(first_seen),
1998 );
1999 true
2000 }
2001 }
2002 };
2003 if should_report {
2004 p2p_node
2005 .report_trust_event(
2006 peer,
2007 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2008 )
2009 .await;
2010 }
2011 } else {
2012 {
2015 let mut state = sync_state.write().await;
2016 state.clear_active_bootstrap_claim(peer);
2017 }
2018 record_sent_replica_hints(peer, sent_replica_hints, repair_proofs, sync_cycle_epoch).await;
2019 let outcome = admit_and_queue_hints(
2020 self_id,
2021 peer,
2022 &resp.replica_hints,
2023 &resp.paid_hints,
2024 p2p_node,
2025 config,
2026 storage,
2027 paid_list,
2028 queues,
2029 )
2030 .await;
2031
2032 if bootstrapping {
2037 if !outcome.discovered.is_empty() {
2038 bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
2039 }
2040 if outcome.capacity_rejected_count > 0 {
2041 bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
2042 } else {
2043 bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
2044 }
2045 }
2046 }
2047}
2048
2049#[allow(clippy::too_many_arguments)]
2054struct AdmissionOutcome {
2062 discovered: HashSet<XorName>,
2063 capacity_rejected_count: usize,
2064}
2065
2066#[allow(clippy::too_many_arguments)]
2067async fn admit_and_queue_hints(
2068 self_id: &PeerId,
2069 source_peer: &PeerId,
2070 replica_hints: &[XorName],
2071 paid_hints: &[XorName],
2072 p2p_node: &Arc<P2PNode>,
2073 config: &ReplicationConfig,
2074 storage: &Arc<LmdbStorage>,
2075 paid_list: &Arc<PaidList>,
2076 queues: &Arc<RwLock<ReplicationQueues>>,
2077) -> AdmissionOutcome {
2078 let pending_keys: HashSet<XorName> = {
2079 let q = queues.read().await;
2080 q.pending_keys().into_iter().collect()
2081 };
2082
2083 let admitted = admission::admit_hints(
2084 self_id,
2085 replica_hints,
2086 paid_hints,
2087 p2p_node,
2088 config,
2089 storage,
2090 paid_list,
2091 &pending_keys,
2092 )
2093 .await;
2094
2095 let mut discovered = HashSet::new();
2096 let mut capacity_rejected_count: usize = 0;
2097 let mut q = queues.write().await;
2098 let now = Instant::now();
2099
2100 for key in admitted.replica_keys {
2101 if !storage.exists(&key).unwrap_or(false) {
2102 let result = q.add_pending_verify(
2103 key,
2104 VerificationEntry {
2105 state: VerificationState::PendingVerify,
2106 pipeline: HintPipeline::Replica,
2107 verified_sources: Vec::new(),
2108 tried_sources: HashSet::new(),
2109 created_at: now,
2110 hint_sender: *source_peer,
2111 },
2112 );
2113 match result {
2114 crate::replication::scheduling::AdmissionResult::Admitted => {
2115 discovered.insert(key);
2116 }
2117 crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
2118 crate::replication::scheduling::AdmissionResult::CapacityRejected => {
2119 capacity_rejected_count += 1;
2120 }
2121 }
2122 }
2123 }
2124
2125 for key in admitted.paid_only_keys {
2126 let result = q.add_pending_verify(
2127 key,
2128 VerificationEntry {
2129 state: VerificationState::PendingVerify,
2130 pipeline: HintPipeline::PaidOnly,
2131 verified_sources: Vec::new(),
2132 tried_sources: HashSet::new(),
2133 created_at: now,
2134 hint_sender: *source_peer,
2135 },
2136 );
2137 match result {
2138 crate::replication::scheduling::AdmissionResult::Admitted => {
2139 discovered.insert(key);
2140 }
2141 crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
2142 crate::replication::scheduling::AdmissionResult::CapacityRejected => {
2143 capacity_rejected_count += 1;
2144 }
2145 }
2146 }
2147
2148 if capacity_rejected_count > 0 {
2149 debug!(
2150 "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
2151 rejected at queue capacity; source will need to re-hint after pending_verify drains"
2152 );
2153 }
2154
2155 AdmissionOutcome {
2156 discovered,
2157 capacity_rejected_count,
2158 }
2159}
2160
/// Runs one verification cycle over the pending-verify queue.
///
/// The cycle, in order:
/// 1. evicts pending entries older than `config::PENDING_VERIFY_MAX_AGE`;
/// 2. splits the pending keys into those already in the local paid list
///    (marked `PaidListVerified`) and those that need network verification;
/// 3. for locally paid keys, probes for holders and either promotes the key to
///    the fetch queue or removes it from pending;
/// 4. for the remaining keys, runs a network verification round, records
///    verified keys in the paid list, and promotes or removes each key
///    according to its outcome;
/// 5. feeds every key that left the pipeline without being promoted into the
///    bootstrap bookkeeping and logs a cycle summary.
///
/// Returns early, without logging, when nothing is pending after eviction.
#[allow(clippy::too_many_lines)]
async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
    let cycle_started = Instant::now();
    let VerificationCycleContext {
        p2p_node,
        paid_list,
        storage,
        queues,
        config,
        bootstrap_state,
        is_bootstrapping,
        bootstrap_complete_notify,
    } = ctx;

    {
        // Drop entries that have been pending for too long before doing any work.
        let mut q = queues.write().await;
        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
    }

    // Snapshot the pending keys; the queue lock is not held during network work.
    let pending_keys = {
        let q = queues.read().await;
        q.pending_keys()
    };

    if pending_keys.is_empty() {
        return;
    }
    let initial_pending_count = pending_keys.len();

    let self_id = *p2p_node.peer_id();

    // Locally paid keys that still need a holder probe before fetching.
    let mut local_paid_presence_probe_keys = Vec::new();
    // Locally paid keys that arrived via the paid-only pipeline.
    let mut local_paid_paid_only_keys = Vec::new();
    // Keys not in the local paid list; these need network verification.
    let mut keys_needing_network = Vec::new();
    // Keys removed from pending without being promoted to the fetch queue.
    let mut terminal_keys: Vec<XorName> = Vec::new();
    {
        let mut q = queues.write().await;
        for key in &pending_keys {
            // A paid-list lookup error is treated as "not paid".
            if paid_list.contains(key).unwrap_or(false) {
                // `None` means the key is no longer pending; it is then skipped.
                if let Some(pipeline) =
                    q.set_pending_state(key, VerificationState::PaidListVerified)
                {
                    match pipeline {
                        HintPipeline::PaidOnly => {
                            local_paid_paid_only_keys.push(*key);
                        }
                        HintPipeline::Replica => {
                            local_paid_presence_probe_keys.push(*key);
                        }
                    }
                }
            } else {
                keys_needing_network.push(*key);
            }
        }
    }

    if !local_paid_paid_only_keys.is_empty() {
        // A paid-only key is finished unless this node is responsible for
        // storing it and does not hold it yet; in that case it joins the
        // presence-probe set so it can be fetched.
        let mut terminal_paid_only = Vec::new();
        for key in local_paid_paid_only_keys {
            if storage.exists(&key).unwrap_or(false) {
                terminal_paid_only.push(key);
            } else if admission::is_responsible(
                &self_id,
                &key,
                p2p_node,
                storage_admission_width(config.close_group_size),
            )
            .await
            {
                local_paid_presence_probe_keys.push(key);
            } else {
                terminal_paid_only.push(key);
            }
        }

        if !terminal_paid_only.is_empty() {
            let mut q = queues.write().await;
            for key in terminal_paid_only {
                q.remove_pending(&key);
                terminal_keys.push(key);
            }
        }
    }

    // Captured for the summary log before the vectors are consumed below.
    let local_paid_probe_count = local_paid_presence_probe_keys.len();
    let keys_needing_network_count = keys_needing_network.len();

    if !local_paid_presence_probe_keys.is_empty() {
        // Payment is already established locally; this round only looks for
        // peers that hold the data.
        let targets = quorum::compute_presence_targets(
            &local_paid_presence_probe_keys,
            p2p_node,
            config,
            &self_id,
        )
        .await;
        let evidence = quorum::run_verification_round(
            &local_paid_presence_probe_keys,
            &targets,
            p2p_node,
            config,
        )
        .await;

        let mut q = queues.write().await;
        for key in local_paid_presence_probe_keys {
            // The key may have been stored while the probe was in flight.
            if storage.exists(&key).unwrap_or(false) {
                q.remove_pending(&key);
                terminal_keys.push(key);
                continue;
            }
            // No evidence for a key is treated the same as no sources.
            let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
                quorum::present_sources_for_key(&key, ev, &targets)
            });
            if sources.is_empty() {
                q.remove_pending(&key);
                warn!(
                    "Locally paid key {} has no responding holders (possible data loss)",
                    hex::encode(key)
                );
                terminal_keys.push(key);
            } else {
                let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                // The promotion result is deliberately ignored here.
                let _ = q.promote_pending_to_fetch(key, distance, sources);
            }
        }
    }

    if !keys_needing_network.is_empty() {
        let targets =
            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
                .await;

        let evidence =
            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;

        // Evaluate under a read lock only; keys with no evidence, or that are
        // no longer pending, are skipped and stay untouched this cycle.
        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
        {
            let q = queues.read().await;
            for key in &keys_needing_network {
                let Some(ev) = evidence.get(key) else {
                    continue;
                };
                let Some(entry) = q.get_pending(key) else {
                    continue;
                };
                let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config);
                evaluated.push((*key, outcome, entry.pipeline));
            }
        }
        // Every verified key is recorded in the paid list, whichever pipeline
        // it came from. This happens with no queue lock held.
        let mut paid_insert_keys: Vec<XorName> = Vec::new();
        for (key, outcome, _) in &evaluated {
            if matches!(
                outcome,
                KeyVerificationOutcome::QuorumVerified { .. }
                    | KeyVerificationOutcome::PaidListVerified { .. }
            ) {
                paid_insert_keys.push(*key);
            }
        }
        for key in &paid_insert_keys {
            if let Err(e) = paid_list.insert(key).await {
                warn!("Failed to add verified key to PaidForList: {e}");
            }
        }

        // Verified paid-only keys become fetchable only when this node is
        // responsible for storing them and does not hold them yet. The
        // responsibility check awaits, so it is done before the write lock.
        let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
        for (key, outcome, pipeline) in &evaluated {
            if *pipeline == HintPipeline::PaidOnly
                && matches!(
                    outcome,
                    KeyVerificationOutcome::QuorumVerified { .. }
                        | KeyVerificationOutcome::PaidListVerified { .. }
                )
                && !storage.exists(key).unwrap_or(false)
                && admission::is_responsible(
                    &self_id,
                    key,
                    p2p_node,
                    storage_admission_width(config.close_group_size),
                )
                .await
            {
                paid_only_fetch_keys.insert(*key);
            }
        }

        let mut q = queues.write().await;
        for (key, outcome, pipeline) in evaluated {
            match outcome {
                KeyVerificationOutcome::QuorumVerified { sources }
                | KeyVerificationOutcome::PaidListVerified { sources } => {
                    // Replica hints are always fetch candidates; paid-only
                    // hints only when selected above.
                    let fetch_eligible =
                        pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
                    if fetch_eligible && !sources.is_empty() {
                        let distance =
                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                        // The promotion result is deliberately ignored here.
                        let _ = q.promote_pending_to_fetch(key, distance, sources);
                    } else if fetch_eligible && sources.is_empty() {
                        // Verified, wanted, but nobody to fetch from.
                        warn!(
                            "Verified storage-admitted key {} has no holders (possible data loss)",
                            hex::encode(key)
                        );
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    } else {
                        // Verified but not something this node should fetch.
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    }
                }
                // Unverified keys are dropped from pending.
                KeyVerificationOutcome::QuorumFailed
                | KeyVerificationOutcome::QuorumInconclusive => {
                    q.remove_pending(&key);
                    terminal_keys.push(key);
                }
            }
        }
    }

    // Let the bootstrap tracker know which keys are finished; this may
    // complete bootstrap.
    update_bootstrap_after_verification(
        &terminal_keys,
        bootstrap_state,
        queues,
        is_bootstrapping,
        bootstrap_complete_notify,
    )
    .await;

    // Queue sizes after the cycle, for the summary log only.
    let (pending_after, fetch_after, in_flight_after) = {
        let q = queues.read().await;
        (
            q.pending_count(),
            q.fetch_queue_count(),
            q.in_flight_count(),
        )
    };
    let terminal_key_count = terminal_keys.len();
    let elapsed_ms = cycle_started.elapsed().as_millis();

    // Same summary either way; slow cycles are raised from debug to info.
    if elapsed_ms >= VERIFICATION_CYCLE_SLOW_LOG_MS {
        info!(
            target: "ant_node::replication::verification",
            "Slow replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
        );
    } else {
        debug!(
            target: "ant_node::replication::verification",
            "Replication verification cycle: pending_start={initial_pending_count}, local_paid_probe={local_paid_probe_count}, network_verify={keys_needing_network_count}, terminal={terminal_key_count}, pending_after={pending_after}, fetch_after={fetch_after}, in_flight_after={in_flight_after}, elapsed_ms={elapsed_ms}",
        );
    }
}
2454
2455async fn update_bootstrap_after_verification(
2458 terminal_keys: &[XorName],
2459 bootstrap_state: &Arc<RwLock<BootstrapState>>,
2460 queues: &Arc<RwLock<ReplicationQueues>>,
2461 is_bootstrapping: &Arc<RwLock<bool>>,
2462 bootstrap_complete_notify: &Arc<Notify>,
2463) {
2464 if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
2465 return;
2466 }
2467 {
2468 let mut bs = bootstrap_state.write().await;
2469 for key in terminal_keys {
2470 bs.remove_key(key);
2471 }
2472 }
2473 let q = queues.read().await;
2474 if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
2475 complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
2476 }
2477}
2478
2479async fn complete_bootstrap(
2481 is_bootstrapping: &Arc<RwLock<bool>>,
2482 bootstrap_complete_notify: &Arc<Notify>,
2483) {
2484 *is_bootstrapping.write().await = false;
2485 bootstrap_complete_notify.notify_waiters();
2486 info!("Replication bootstrap complete");
2487}
2488
/// How a single fetch attempt in `execute_single_fetch` ended.
enum FetchResult {
    /// The record passed every check and was written to local storage.
    Stored,
    /// The response carried data that failed validation: the key did not match
    /// the request, the payload exceeded the maximum chunk size, or the
    /// computed address did not match the key.
    IntegrityFailed,
    /// The attempt produced no usable record for any other reason, including
    /// request/encode/decode failures, `NotFound` or `Error` responses,
    /// unexpected response bodies, and a failed local store.
    SourceFailed,
}
2502
/// The result of one fetch attempt, tagged with the key that was requested.
struct FetchOutcome {
    /// The key that was requested (not the key echoed in the response).
    key: XorName,
    /// How the attempt ended.
    result: FetchResult,
}
2509
/// Fetches one record from `source`, validates it, and stores it locally.
///
/// A `FetchRequest` for `key` is sent with `config.fetch_request_timeout`.
/// A `Success` response is accepted only if its key matches the request, its
/// payload does not exceed `MAX_CHUNK_SIZE`, and the address computed from the
/// payload equals the key; the record is then written to `storage`.
///
/// A trust failure is reported against `source` for undecodable responses,
/// failed validation, `NotFound`/`Error` responses, and unexpected response
/// bodies. Request-level failures (encode error, send error) and local storage
/// failures are not reported.
///
/// The returned `FetchOutcome` always carries the requested `key`.
#[allow(clippy::too_many_lines)]
async fn execute_single_fetch(
    p2p_node: Arc<P2PNode>,
    storage: Arc<LmdbStorage>,
    config: Arc<ReplicationConfig>,
    key: XorName,
    source: PeerId,
) -> FetchOutcome {
    let request = protocol::FetchRequest { key };
    let msg = ReplicationMessage {
        // Random id; nothing in this function reads it back.
        request_id: rand::thread_rng().gen::<u64>(),
        body: ReplicationMessageBody::FetchRequest(request),
    };

    let encoded = match msg.encode() {
        Ok(data) => data,
        Err(e) => {
            // Local failure: the source never saw the request, so no trust report.
            warn!("Failed to encode fetch request: {e}");
            return FetchOutcome {
                key,
                result: FetchResult::SourceFailed,
            };
        }
    };

    let result = p2p_node
        .send_request(
            &source,
            REPLICATION_PROTOCOL_ID,
            encoded,
            config.fetch_request_timeout,
        )
        .await;

    match result {
        Ok(response) => {
            // A response that cannot be decoded counts against the source.
            let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
                p2p_node
                    .report_trust_event(
                        &source,
                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
                    )
                    .await;
                return FetchOutcome {
                    key,
                    result: FetchResult::SourceFailed,
                };
            };

            match resp_msg.body {
                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
                    key: resp_key,
                    data,
                }) => {
                    // Check 1: the response must be for the key we asked for.
                    if resp_key != key {
                        warn!(
                            "Fetch response key mismatch: requested {}, got {}",
                            hex::encode(key),
                            hex::encode(resp_key)
                        );
                        p2p_node
                            .report_trust_event(
                                &source,
                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
                            )
                            .await;
                        return FetchOutcome {
                            key,
                            result: FetchResult::IntegrityFailed,
                        };
                    }

                    // Check 2: reject oversized payloads before hashing them.
                    if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
                        warn!(
                            "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
                            hex::encode(resp_key),
                            data.len(),
                            crate::ant_protocol::MAX_CHUNK_SIZE,
                        );
                        p2p_node
                            .report_trust_event(
                                &source,
                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
                            )
                            .await;
                        return FetchOutcome {
                            key,
                            result: FetchResult::IntegrityFailed,
                        };
                    }

                    // Check 3: the address computed from the payload must
                    // equal the key.
                    let computed = crate::client::compute_address(&data);
                    if computed != resp_key {
                        warn!(
                            "Fetched record integrity check failed: expected {}, got {}",
                            hex::encode(resp_key),
                            hex::encode(computed)
                        );
                        p2p_node
                            .report_trust_event(
                                &source,
                                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
                            )
                            .await;
                        return FetchOutcome {
                            key,
                            result: FetchResult::IntegrityFailed,
                        };
                    }

                    // The data is valid; a store failure is a local problem,
                    // so the source is not penalized for it.
                    if let Err(e) = storage.put(&resp_key, &data).await {
                        warn!(
                            "Failed to store fetched record {}: {e}",
                            hex::encode(resp_key)
                        );
                        return FetchOutcome {
                            key,
                            result: FetchResult::SourceFailed,
                        };
                    }

                    FetchOutcome {
                        key,
                        result: FetchResult::Stored,
                    }
                }
                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
                    ..
                }) => {
                    // The source denies holding the key it was selected for;
                    // this is reported as a trust failure.
                    warn!(
                        "Fetch: verified source {source} returned NotFound for {}",
                        hex::encode(key)
                    );
                    p2p_node
                        .report_trust_event(
                            &source,
                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
                        )
                        .await;
                    FetchOutcome {
                        key,
                        result: FetchResult::SourceFailed,
                    }
                }
                ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
                    reason,
                    ..
                }) => {
                    warn!(
                        "Fetch: peer {source} returned error for {}: {reason}",
                        hex::encode(key)
                    );
                    p2p_node
                        .report_trust_event(
                            &source,
                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
                        )
                        .await;
                    FetchOutcome {
                        key,
                        result: FetchResult::SourceFailed,
                    }
                }
                // Any other message body is not a valid reply to a fetch request.
                _ => {
                    p2p_node
                        .report_trust_event(
                            &source,
                            TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
                        )
                        .await;
                    FetchOutcome {
                        key,
                        result: FetchResult::SourceFailed,
                    }
                }
            }
        }
        Err(e) => {
            // Request-level failure: logged at debug, no trust report from here.
            debug!("Fetch request to {source} failed: {e}");
            FetchOutcome {
                key,
                result: FetchResult::SourceFailed,
            }
        }
    }
}
2717
2718fn first_failed_key_label(confirmed_failed_keys: &[XorName]) -> String {
2734 confirmed_failed_keys.first().map_or_else(
2735 || "0x".to_string(),
2736 |k| format!("0x{}", hex::encode(&k[..8])),
2737 )
2738}
2739
2740async fn handle_audit_result(
2742 result: &AuditTickResult,
2743 p2p_node: &Arc<P2PNode>,
2744 sync_state: &Arc<RwLock<NeighborSyncState>>,
2745 config: &ReplicationConfig,
2746) {
2747 match result {
2748 AuditTickResult::Passed {
2749 challenged_peer,
2750 keys_checked,
2751 } => {
2752 debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
2753 {
2756 let mut state = sync_state.write().await;
2757 state.clear_active_bootstrap_claim(challenged_peer);
2758 }
2759 p2p_node
2760 .report_trust_event(
2761 challenged_peer,
2762 TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
2763 )
2764 .await;
2765 }
2766 AuditTickResult::Failed { evidence } => {
2767 if let FailureEvidence::AuditFailure {
2768 challenged_peer,
2769 confirmed_failed_keys,
2770 summary,
2771 reason,
2772 ..
2773 } = evidence
2774 {
2775 let first_failed_key = first_failed_key_label(confirmed_failed_keys);
2776 error!(
2777 "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}, first_failed_key={first_failed_key}",
2778 confirmed_failed_keys.len(),
2779 summary.challenged_keys,
2780 summary.absent_keys,
2781 summary.digest_mismatch_keys,
2782 );
2783 if audit_failure_clears_bootstrap_claim(reason) {
2784 let mut state = sync_state.write().await;
2787 state.clear_active_bootstrap_claim(challenged_peer);
2788 } else {
2789 debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim");
2790 }
2791 p2p_node
2792 .report_trust_event(
2793 challenged_peer,
2794 TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
2795 )
2796 .await;
2797 }
2798 }
2799 AuditTickResult::BootstrapClaim { peer } => {
2800 let should_report = {
2804 let now = Instant::now();
2805 let mut state = sync_state.write().await;
2806 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
2807 {
2808 BootstrapClaimObservation::WithinGrace { .. } => {
2809 debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
2810 false
2811 }
2812 BootstrapClaimObservation::PastGrace { first_seen } => {
2813 warn!(
2814 "Audit: peer {peer} claiming bootstrap past grace period \
2815 ({:?} > {:?}), reporting abuse",
2816 now.duration_since(first_seen),
2817 config.bootstrap_claim_grace_period,
2818 );
2819 true
2820 }
2821 BootstrapClaimObservation::Repeated { first_seen } => {
2822 warn!(
2823 "Audit: peer {peer} repeated bootstrap claim after previously \
2824 stopping; first claim was {:?} ago, reporting abuse",
2825 now.duration_since(first_seen),
2826 );
2827 true
2828 }
2829 }
2830 };
2831 if should_report {
2832 p2p_node
2833 .report_trust_event(
2834 peer,
2835 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2836 )
2837 .await;
2838 }
2839 }
2840 AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
2841 }
2842}
2843
2844fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
2845 !matches!(reason, AuditFailureReason::Timeout)
2846}
2847
#[cfg(test)]
// Test code is allowed to use unwrap/expect/panic.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::{
        audit_failure_clears_bootstrap_claim, first_failed_key_label, fresh_offer_payment_context,
        paid_notify_payment_context,
    };
    use crate::payment::VerificationContext;
    use crate::replication::types::AuditFailureReason;

    /// Fresh offers are verified with the client-put context.
    #[test]
    fn fresh_offer_runs_client_put_payment_checks() {
        let context = fresh_offer_payment_context();
        assert_eq!(context, VerificationContext::ClientPut);
    }

    /// Paid notifications are verified with the paid-list admission context.
    #[test]
    fn paid_notify_uses_paid_list_admission_payment_checks() {
        let context = paid_notify_payment_context();
        assert_eq!(context, VerificationContext::PaidListAdmission);
    }

    /// A timeout must not clear the active bootstrap claim.
    #[test]
    fn audit_timeout_preserves_active_bootstrap_claim() {
        let clears = audit_failure_clears_bootstrap_claim(&AuditFailureReason::Timeout);
        assert!(!clears);
    }

    /// Each of the listed non-timeout reasons clears the active claim.
    #[test]
    fn decoded_audit_failures_clear_active_bootstrap_claim() {
        let clearing_reasons = [
            AuditFailureReason::MalformedResponse,
            AuditFailureReason::DigestMismatch,
            AuditFailureReason::KeyAbsent,
            AuditFailureReason::Rejected,
        ];
        for reason in &clearing_reasons {
            assert!(
                audit_failure_clears_bootstrap_claim(reason),
                "decoded non-bootstrap failure {reason:?} should clear active claim"
            );
        }
    }

    /// Only the first 8 bytes (16 hex characters) of the key appear in the label.
    #[test]
    fn first_failed_key_label_truncates_to_16_hex_chars() {
        let mut key = [0u8; 32];
        // Distinctive bytes at both ends of the logged prefix.
        key[0] = 0x18;
        key[7] = 0xff;
        // Bytes past the prefix are non-zero so any leak into the label shows up.
        key[8..].fill(0xAA);

        let label = first_failed_key_label(&[key]);
        assert_eq!(label, "0x18000000000000ff");
        assert_eq!(label.len(), "0x".len() + 16);
    }

    /// An empty key list yields the bare `0x` prefix.
    #[test]
    fn first_failed_key_label_falls_back_when_empty() {
        let label = first_failed_key_label(&[]);
        assert_eq!(label, "0x");
    }

    /// Later keys in the slice do not affect the label.
    #[test]
    fn first_failed_key_label_uses_first_key_only() {
        let first = [0x11u8; 32];
        let second = [0x22u8; 32];
        let expected = format!("0x{}", hex::encode(&first[..8]));
        assert_eq!(first_failed_key_label(&[first, second]), expected);
    }
}