1#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod config;
21pub mod fresh;
22pub mod neighbor_sync;
23pub mod paid_list;
24pub mod protocol;
25pub mod pruning;
26pub mod quorum;
27pub mod scheduling;
28pub mod types;
29
30use std::collections::{HashMap, HashSet};
31use std::path::Path;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use std::pin::Pin;
36
37use crate::logging::{debug, error, info, warn};
38use futures::stream::FuturesUnordered;
39use futures::{Future, StreamExt};
40use rand::Rng;
41use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
42use tokio::task::JoinHandle;
43use tokio_util::sync::CancellationToken;
44
45use crate::ant_protocol::XorName;
46use crate::error::{Error, Result};
47use crate::payment::PaymentVerifier;
48use crate::replication::audit::AuditTickResult;
49use crate::replication::config::{
50 max_parallel_fetch, ReplicationConfig, MAX_CONCURRENT_REPLICATION_SENDS,
51 REPLICATION_PROTOCOL_ID,
52};
53use crate::replication::paid_list::PaidList;
54use crate::replication::protocol::{
55 FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
56 VerificationResponse,
57};
58use crate::replication::quorum::KeyVerificationOutcome;
59use crate::replication::scheduling::ReplicationQueues;
60use crate::replication::types::{
61 BootstrapState, FailureEvidence, HintPipeline, NeighborSyncState, PeerSyncRecord,
62 VerificationEntry, VerificationState,
63};
64use crate::storage::LmdbStorage;
65use saorsa_core::identity::PeerId;
66use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
67
/// Topic prefix of request/response envelopes. The message handler accepts a
/// topic made of this prefix followed by `REPLICATION_PROTOCOL_ID`.
const RR_PREFIX: &str = "/rr/";

/// Boxed, sendable future for a single fetch attempt. It yields the key that
/// was fetched together with the attempt's outcome, or `None` when the spawned
/// fetch task could not be joined.
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;

/// Milliseconds the fetch worker sleeps before re-checking the queue when no
/// fetch is in flight.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Milliseconds the verification worker sleeps between verification cycles.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Seconds between the audit loop's checks of whether bootstrap has drained.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;

/// Weight attached to the `TrustEvent::ApplicationFailure` this module reports
/// (for example when a peer sends an oversized fresh offer).
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
93
/// Owns the shared state and the background tasks that drive replication.
///
/// Every shared field is wrapped in an `Arc` so each spawned task can hold its
/// own handle to it.
pub struct ReplicationEngine {
    /// Validated replication configuration.
    config: Arc<ReplicationConfig>,
    /// Network node used to send messages and to subscribe to events.
    p2p_node: Arc<P2PNode>,
    /// Local chunk storage.
    storage: Arc<LmdbStorage>,
    /// Persistent list of keys known to be paid for.
    paid_list: Arc<PaidList>,
    /// Verifies the proof of payment carried by fresh offers and paid notifications.
    payment_verifier: Arc<PaymentVerifier>,
    /// Replication work queues shared by the workers and message handlers.
    queues: Arc<RwLock<ReplicationQueues>>,
    /// State of the current neighbor-sync cycle; starts as an empty cycle.
    sync_state: Arc<RwLock<NeighborSyncState>>,
    /// Per-peer sync record, refreshed when an inbound sync request is handled.
    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    /// Bookkeeping for bootstrap (pending requests and discovered keys).
    bootstrap_state: Arc<RwLock<BootstrapState>>,
    /// `true` until bootstrap completes; initialised to `true`.
    is_bootstrapping: Arc<RwLock<bool>>,
    /// Wakes the neighbor-sync loop ahead of its timer.
    sync_trigger: Arc<Notify>,
    /// Signalled when bootstrap completes.
    bootstrap_complete_notify: Arc<Notify>,
    /// Created with `MAX_CONCURRENT_REPLICATION_SENDS` permits and handed to
    /// `fresh::replicate_fresh`.
    send_semaphore: Arc<Semaphore>,
    /// Receiver of fresh-write events; taken by the drainer task when it starts.
    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
    /// Cancelled to ask every background task to stop.
    shutdown: CancellationToken,
    /// Join handles of the spawned background tasks.
    task_handles: Vec<JoinHandle<()>>,
}
141
142impl ReplicationEngine {
143 pub async fn new(
150 config: ReplicationConfig,
151 p2p_node: Arc<P2PNode>,
152 storage: Arc<LmdbStorage>,
153 payment_verifier: Arc<PaymentVerifier>,
154 root_dir: &Path,
155 fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
156 shutdown: CancellationToken,
157 ) -> Result<Self> {
158 config.validate().map_err(Error::Config)?;
159
160 let paid_list = Arc::new(
161 PaidList::new(root_dir)
162 .await
163 .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
164 );
165
166 let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
167 let config = Arc::new(config);
168
169 Ok(Self {
170 config: Arc::clone(&config),
171 p2p_node,
172 storage,
173 paid_list,
174 payment_verifier,
175 queues: Arc::new(RwLock::new(ReplicationQueues::new())),
176 sync_state: Arc::new(RwLock::new(initial_neighbors)),
177 sync_history: Arc::new(RwLock::new(HashMap::new())),
178 bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
179 is_bootstrapping: Arc::new(RwLock::new(true)),
180 sync_trigger: Arc::new(Notify::new()),
181 bootstrap_complete_notify: Arc::new(Notify::new()),
182 send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
183 fresh_write_rx: Some(fresh_write_rx),
184 shutdown,
185 task_handles: Vec::new(),
186 })
187 }
188
189 #[must_use]
191 pub fn paid_list(&self) -> &Arc<PaidList> {
192 &self.paid_list
193 }
194
195 pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
201 if !self.task_handles.is_empty() {
202 error!("ReplicationEngine::start() called while already running — ignoring");
203 return;
204 }
205 info!("Starting replication engine");
206
207 self.start_message_handler();
208 self.start_neighbor_sync_loop();
209 self.start_self_lookup_loop();
210 self.start_audit_loop();
211 self.start_fetch_worker();
212 self.start_verification_worker();
213 self.start_bootstrap_sync(dht_events);
214 self.start_fresh_write_drainer();
215
216 info!(
217 "Replication engine started with {} background tasks",
218 self.task_handles.len()
219 );
220 }
221
222 pub async fn is_bootstrapping(&self) -> bool {
227 *self.is_bootstrapping.read().await
228 }
229
230 pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
239 let notified = self.bootstrap_complete_notify.notified();
242 tokio::pin!(notified);
243 notified.as_mut().enable();
244
245 if !*self.is_bootstrapping.read().await {
246 return true;
247 }
248
249 tokio::time::timeout(timeout, notified).await.is_ok()
250 }
251
252 pub async fn shutdown(&mut self) {
258 self.shutdown.cancel();
259 for (i, mut handle) in self.task_handles.drain(..).enumerate() {
260 match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
261 Ok(Ok(())) => {}
262 Ok(Err(e)) if e.is_cancelled() => {}
263 Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
264 Err(_) => {
265 warn!("Replication task {i} did not stop within 10s, aborting");
266 handle.abort();
267 }
268 }
269 }
270 }
271
272 pub fn trigger_neighbor_sync(&self) {
278 self.sync_trigger.notify_one();
279 }
280
281 pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
283 fresh::replicate_fresh(
284 key,
285 data,
286 proof_of_payment,
287 &self.p2p_node,
288 &self.paid_list,
289 &self.config,
290 &self.send_semaphore,
291 )
292 .await;
293 }
294
295 fn start_fresh_write_drainer(&mut self) {
302 let Some(mut rx) = self.fresh_write_rx.take() else {
303 return;
304 };
305 let p2p = Arc::clone(&self.p2p_node);
306 let paid_list = Arc::clone(&self.paid_list);
307 let config = Arc::clone(&self.config);
308 let send_semaphore = Arc::clone(&self.send_semaphore);
309 let shutdown = self.shutdown.clone();
310
311 let handle = tokio::spawn(async move {
312 loop {
313 tokio::select! {
314 () = shutdown.cancelled() => break,
315 event = rx.recv() => {
316 let Some(event) = event else { break };
317 fresh::replicate_fresh(
318 &event.key,
319 &event.data,
320 &event.payment_proof,
321 &p2p,
322 &paid_list,
323 &config,
324 &send_semaphore,
325 )
326 .await;
327 }
328 }
329 }
330 debug!("Fresh-write drainer shut down");
331 });
332 self.task_handles.push(handle);
333 }
334
335 #[allow(clippy::too_many_lines)]
336 fn start_message_handler(&mut self) {
337 let mut p2p_events = self.p2p_node.subscribe_events();
338 let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
339 let p2p = Arc::clone(&self.p2p_node);
340 let storage = Arc::clone(&self.storage);
341 let paid_list = Arc::clone(&self.paid_list);
342 let payment_verifier = Arc::clone(&self.payment_verifier);
343 let queues = Arc::clone(&self.queues);
344 let config = Arc::clone(&self.config);
345 let shutdown = self.shutdown.clone();
346 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
347 let bootstrap_state = Arc::clone(&self.bootstrap_state);
348 let sync_history = Arc::clone(&self.sync_history);
349 let sync_trigger = Arc::clone(&self.sync_trigger);
350
351 let handle = tokio::spawn(async move {
352 loop {
353 tokio::select! {
354 () = shutdown.cancelled() => break,
355 event = p2p_events.recv() => {
356 let Ok(event) = event else { continue };
357 if let P2PEvent::Message {
358 topic,
359 source: Some(source),
360 data,
361 ..
362 } = event {
363 let rr_info = if topic == REPLICATION_PROTOCOL_ID {
367 Some((data.clone(), None))
368 } else if topic.starts_with(RR_PREFIX)
369 && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
370 {
371 P2PNode::parse_request_envelope(&data)
372 .filter(|(_, is_resp, _)| !is_resp)
373 .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
374 } else {
375 None
376 };
377 if let Some((payload, rr_message_id)) = rr_info {
378 match handle_replication_message(
379 &source,
380 &payload,
381 &p2p,
382 &storage,
383 &paid_list,
384 &payment_verifier,
385 &queues,
386 &config,
387 &is_bootstrapping,
388 &bootstrap_state,
389 &sync_history,
390 rr_message_id.as_deref(),
391 ).await {
392 Ok(()) => {}
393 Err(e) => {
394 debug!(
395 "Replication message from {source} error: {e}"
396 );
397 }
398 }
399 }
400 }
401 }
402 dht_event = dht_events.recv() => {
410 let Ok(dht_event) = dht_event else { continue };
411 if let DhtNetworkEvent::KClosestPeersChanged { .. } = dht_event {
412 debug!(
413 "K-closest peers changed, triggering early neighbor sync"
414 );
415 sync_trigger.notify_one();
416 }
417 }
418 }
419 }
420 debug!("Replication message handler shut down");
421 });
422 self.task_handles.push(handle);
423 }
424
425 fn start_neighbor_sync_loop(&mut self) {
426 let p2p = Arc::clone(&self.p2p_node);
427 let storage = Arc::clone(&self.storage);
428 let paid_list = Arc::clone(&self.paid_list);
429 let queues = Arc::clone(&self.queues);
430 let config = Arc::clone(&self.config);
431 let shutdown = self.shutdown.clone();
432 let sync_state = Arc::clone(&self.sync_state);
433 let sync_history = Arc::clone(&self.sync_history);
434 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
435 let bootstrap_state = Arc::clone(&self.bootstrap_state);
436 let sync_trigger = Arc::clone(&self.sync_trigger);
437
438 let handle = tokio::spawn(async move {
439 loop {
440 let interval = config.random_neighbor_sync_interval();
441 tokio::select! {
442 () = shutdown.cancelled() => break,
443 () = tokio::time::sleep(interval) => {}
444 () = sync_trigger.notified() => {
445 debug!("Neighbor sync triggered by topology change");
446 }
447 }
448 tokio::select! {
452 () = shutdown.cancelled() => break,
453 () = run_neighbor_sync_round(
454 &p2p,
455 &storage,
456 &paid_list,
457 &queues,
458 &config,
459 &sync_state,
460 &sync_history,
461 &is_bootstrapping,
462 &bootstrap_state,
463 ) => {}
464 }
465 }
466 debug!("Neighbor sync loop shut down");
467 });
468 self.task_handles.push(handle);
469 }
470
471 fn start_self_lookup_loop(&mut self) {
472 let p2p = Arc::clone(&self.p2p_node);
473 let config = Arc::clone(&self.config);
474 let shutdown = self.shutdown.clone();
475
476 let handle = tokio::spawn(async move {
477 loop {
478 let interval = config.random_self_lookup_interval();
479 tokio::select! {
480 () = shutdown.cancelled() => break,
481 () = tokio::time::sleep(interval) => {
482 if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
483 debug!("Self-lookup failed: {e}");
484 }
485 }
486 }
487 }
488 debug!("Self-lookup loop shut down");
489 });
490 self.task_handles.push(handle);
491 }
492
493 fn start_audit_loop(&mut self) {
494 let p2p = Arc::clone(&self.p2p_node);
495 let storage = Arc::clone(&self.storage);
496 let config = Arc::clone(&self.config);
497 let shutdown = self.shutdown.clone();
498 let sync_history = Arc::clone(&self.sync_history);
499 let bootstrap_state = Arc::clone(&self.bootstrap_state);
500 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
501 let sync_state = Arc::clone(&self.sync_state);
502
503 let handle = tokio::spawn(async move {
504 loop {
506 tokio::select! {
507 () = shutdown.cancelled() => return,
508 () = tokio::time::sleep(
509 std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
510 ) => {
511 if bootstrap_state.read().await.is_drained() {
512 break;
513 }
514 }
515 }
516 }
517
518 {
520 let bootstrapping = *is_bootstrapping.read().await;
521 let result = {
524 let claims = sync_state.read().await;
525 let history = sync_history.read().await;
526 audit::audit_tick(
527 &p2p,
528 &storage,
529 &config,
530 &history,
531 &claims.bootstrap_claims,
532 bootstrapping,
533 )
534 .await
535 };
536 handle_audit_result(&result, &p2p, &sync_state, &config).await;
537 }
538
539 loop {
541 let interval = config.random_audit_tick_interval();
542 tokio::select! {
543 () = shutdown.cancelled() => break,
544 () = tokio::time::sleep(interval) => {
545 let bootstrapping = *is_bootstrapping.read().await;
546 let result = {
548 let claims = sync_state.read().await;
549 let history = sync_history.read().await;
550 audit::audit_tick(
551 &p2p, &storage, &config, &history,
552 &claims.bootstrap_claims,
553 bootstrapping,
554 )
555 .await
556 };
557 handle_audit_result(&result, &p2p, &sync_state, &config).await;
558 }
559 }
560 }
561 debug!("Audit loop shut down");
562 });
563 self.task_handles.push(handle);
564 }
565
566 #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
567 fn start_fetch_worker(&mut self) {
568 let p2p = Arc::clone(&self.p2p_node);
569 let storage = Arc::clone(&self.storage);
570 let queues = Arc::clone(&self.queues);
571 let config = Arc::clone(&self.config);
572 let shutdown = self.shutdown.clone();
573 let bootstrap_state = Arc::clone(&self.bootstrap_state);
574 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
575 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
576 let concurrency = max_parallel_fetch();
577
578 info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
579
580 let handle = tokio::spawn(async move {
581 let mut in_flight = FuturesUnordered::<FetchFuture>::new();
584
585 loop {
586 {
588 let mut q = queues.write().await;
589 while in_flight.len() < concurrency {
590 let Some(candidate) = q.dequeue_fetch() else {
591 break;
592 };
593 let Some(&source) = candidate.sources.first() else {
594 warn!(
595 "Fetch candidate {} has no sources — dropping",
596 hex::encode(candidate.key)
597 );
598 continue;
599 };
600 q.start_fetch(candidate.key, source, candidate.sources.clone());
601
602 let p2p = Arc::clone(&p2p);
603 let storage = Arc::clone(&storage);
604 let config = Arc::clone(&config);
605 let token = shutdown.clone();
606 let fetch_key = candidate.key;
607 in_flight.push(Box::pin(async move {
608 let handle = tokio::spawn(async move {
609 tokio::select! {
611 () = token.cancelled() => FetchOutcome {
612 key: fetch_key,
613 result: FetchResult::SourceFailed,
614 },
615 outcome = execute_single_fetch(
616 p2p, storage, config, fetch_key, source,
617 ) => outcome,
618 }
619 });
620 match handle.await {
621 Ok(outcome) => (outcome.key, Some(outcome)),
622 Err(e) => {
623 error!(
624 "Fetch task for {} panicked: {e}",
625 hex::encode(fetch_key)
626 );
627 (fetch_key, None)
628 }
629 }
630 }));
631 }
632 } if in_flight.is_empty() {
635 tokio::select! {
637 () = shutdown.cancelled() => break,
638 () = tokio::time::sleep(
639 std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
640 ) => continue,
641 }
642 }
643
644 tokio::select! {
646 () = shutdown.cancelled() => break,
647 Some((key, maybe_outcome)) = in_flight.next() => {
648 let mut q = queues.write().await;
649 let terminal = if let Some(outcome) = maybe_outcome {
650 match outcome.result {
651 FetchResult::Stored => {
652 q.complete_fetch(&key);
653 true
654 }
655 FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
656 if let Some(next_peer) = q.retry_fetch(&key) {
657 let p2p = Arc::clone(&p2p);
659 let storage = Arc::clone(&storage);
660 let config = Arc::clone(&config);
661 let token = shutdown.clone();
662 let fetch_key = key;
663 in_flight.push(Box::pin(async move {
664 let handle = tokio::spawn(async move {
665 tokio::select! {
666 () = token.cancelled() => FetchOutcome {
667 key: fetch_key,
668 result: FetchResult::SourceFailed,
669 },
670 outcome = execute_single_fetch(
671 p2p, storage, config, fetch_key, next_peer,
672 ) => outcome,
673 }
674 });
675 match handle.await {
676 Ok(outcome) => (outcome.key, Some(outcome)),
677 Err(e) => {
678 error!(
679 "Fetch task for {} panicked: {e}",
680 hex::encode(fetch_key)
681 );
682 (fetch_key, None)
683 }
684 }
685 }));
686 false
687 } else {
688 q.complete_fetch(&key);
689 true
690 }
691 }
692 }
693 } else {
694 q.complete_fetch(&key);
696 true
697 };
698
699 if terminal {
701 drop(q); if !bootstrap_state.read().await.is_drained() {
703 bootstrap_state.write().await.remove_key(&key);
704 let q = queues.read().await;
705 if bootstrap::check_bootstrap_drained(
706 &bootstrap_state,
707 &q,
708 )
709 .await
710 {
711 complete_bootstrap(
712 &is_bootstrapping,
713 &bootstrap_complete_notify,
714 ).await;
715 }
716 }
717 }
718 }
719 }
720 }
721
722 while in_flight.next().await.is_some() {}
726 debug!("Fetch worker shut down");
727 });
728 self.task_handles.push(handle);
729 }
730
731 fn start_verification_worker(&mut self) {
732 let p2p = Arc::clone(&self.p2p_node);
733 let queues = Arc::clone(&self.queues);
734 let paid_list = Arc::clone(&self.paid_list);
735 let config = Arc::clone(&self.config);
736 let shutdown = self.shutdown.clone();
737 let bootstrap_state = Arc::clone(&self.bootstrap_state);
738 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
739 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
740
741 let handle = tokio::spawn(async move {
742 loop {
743 tokio::select! {
744 () = shutdown.cancelled() => break,
745 () = tokio::time::sleep(
746 std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
747 ) => {
748 run_verification_cycle(
749 &p2p, &paid_list, &queues, &config,
750 &bootstrap_state, &is_bootstrapping,
751 &bootstrap_complete_notify,
752 ).await;
753 }
754 }
755 }
756 debug!("Verification worker shut down");
757 });
758 self.task_handles.push(handle);
759 }
760
761 fn start_bootstrap_sync(
773 &mut self,
774 dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
775 ) {
776 let p2p = Arc::clone(&self.p2p_node);
777 let storage = Arc::clone(&self.storage);
778 let paid_list = Arc::clone(&self.paid_list);
779 let queues = Arc::clone(&self.queues);
780 let config = Arc::clone(&self.config);
781 let shutdown = self.shutdown.clone();
782 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
783 let bootstrap_state = Arc::clone(&self.bootstrap_state);
784 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
785
786 let handle = tokio::spawn(async move {
787 let gate = bootstrap::wait_for_bootstrap_complete(
791 dht_events,
792 config.bootstrap_complete_timeout_secs,
793 &shutdown,
794 )
795 .await;
796
797 if gate == bootstrap::BootstrapGateResult::Shutdown {
798 return;
799 }
800
801 let self_id = *p2p.peer_id();
802 let neighbors =
803 neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
804 .await;
805
806 if neighbors.is_empty() {
807 info!("Bootstrap sync: no close neighbors found, marking drained");
808 bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
809 complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
810 return;
811 }
812
813 let neighbor_count = neighbors.len();
814 info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
815
816 for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
818 if shutdown.is_cancelled() {
819 break;
820 }
821
822 for peer in batch {
823 if shutdown.is_cancelled() {
824 break;
825 }
826
827 let bootstrapping = *is_bootstrapping.read().await;
829
830 bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
831
832 let response = neighbor_sync::sync_with_peer(
833 peer,
834 &p2p,
835 &storage,
836 &paid_list,
837 &config,
838 bootstrapping,
839 )
840 .await;
841
842 bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
843
844 if let Some(resp) = response {
845 if !resp.bootstrapping {
846 let admitted_keys = admit_and_queue_hints(
848 &self_id,
849 peer,
850 &resp.replica_hints,
851 &resp.paid_hints,
852 &p2p,
853 &config,
854 &storage,
855 &paid_list,
856 &queues,
857 )
858 .await;
859
860 if !admitted_keys.is_empty() {
862 bootstrap::track_discovered_keys(&bootstrap_state, &admitted_keys)
863 .await;
864 }
865 }
866 }
867 }
868 }
869
870 {
872 let q = queues.read().await;
873 if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
874 complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
875 }
876 }
877
878 info!("Bootstrap sync completed");
879 });
880 self.task_handles.push(handle);
881 }
882}
883
884#[allow(clippy::too_many_arguments)]
894async fn handle_replication_message(
895 source: &PeerId,
896 data: &[u8],
897 p2p_node: &Arc<P2PNode>,
898 storage: &Arc<LmdbStorage>,
899 paid_list: &Arc<PaidList>,
900 payment_verifier: &Arc<PaymentVerifier>,
901 queues: &Arc<RwLock<ReplicationQueues>>,
902 config: &ReplicationConfig,
903 is_bootstrapping: &Arc<RwLock<bool>>,
904 bootstrap_state: &Arc<RwLock<BootstrapState>>,
905 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
906 rr_message_id: Option<&str>,
907) -> Result<()> {
908 let msg = ReplicationMessage::decode(data)
909 .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
910
911 match msg.body {
912 ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
913 handle_fresh_offer(
914 source,
915 offer,
916 storage,
917 paid_list,
918 payment_verifier,
919 p2p_node,
920 config,
921 msg.request_id,
922 rr_message_id,
923 )
924 .await
925 }
926 ReplicationMessageBody::PaidNotify(ref notify) => {
927 handle_paid_notify(
928 source,
929 notify,
930 paid_list,
931 payment_verifier,
932 p2p_node,
933 config,
934 )
935 .await
936 }
937 ReplicationMessageBody::NeighborSyncRequest(ref request) => {
938 let bootstrapping = *is_bootstrapping.read().await;
939 handle_neighbor_sync_request(
940 source,
941 request,
942 p2p_node,
943 storage,
944 paid_list,
945 queues,
946 config,
947 bootstrapping,
948 bootstrap_state,
949 sync_history,
950 msg.request_id,
951 rr_message_id,
952 )
953 .await
954 }
955 ReplicationMessageBody::VerificationRequest(ref request) => {
956 handle_verification_request(
957 source,
958 request,
959 storage,
960 paid_list,
961 p2p_node,
962 msg.request_id,
963 rr_message_id,
964 )
965 .await
966 }
967 ReplicationMessageBody::FetchRequest(ref request) => {
968 handle_fetch_request(
969 source,
970 request,
971 storage,
972 p2p_node,
973 msg.request_id,
974 rr_message_id,
975 )
976 .await
977 }
978 ReplicationMessageBody::AuditChallenge(ref challenge) => {
979 let bootstrapping = *is_bootstrapping.read().await;
980 handle_audit_challenge_msg(
981 source,
982 challenge,
983 storage,
984 p2p_node,
985 bootstrapping,
986 msg.request_id,
987 rr_message_id,
988 )
989 .await
990 }
991 ReplicationMessageBody::FreshReplicationResponse(_)
993 | ReplicationMessageBody::NeighborSyncResponse(_)
994 | ReplicationMessageBody::VerificationResponse(_)
995 | ReplicationMessageBody::FetchResponse(_)
996 | ReplicationMessageBody::AuditResponse(_) => Ok(()),
997 }
998}
999
1000#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1005async fn handle_fresh_offer(
1006 source: &PeerId,
1007 offer: &protocol::FreshReplicationOffer,
1008 storage: &Arc<LmdbStorage>,
1009 paid_list: &Arc<PaidList>,
1010 payment_verifier: &Arc<PaymentVerifier>,
1011 p2p_node: &Arc<P2PNode>,
1012 config: &ReplicationConfig,
1013 request_id: u64,
1014 rr_message_id: Option<&str>,
1015) -> Result<()> {
1016 let self_id = *p2p_node.peer_id();
1017
1018 if offer.proof_of_payment.is_empty() {
1020 send_replication_response(
1021 source,
1022 p2p_node,
1023 request_id,
1024 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1025 key: offer.key,
1026 reason: "Missing proof of payment".to_string(),
1027 }),
1028 rr_message_id,
1029 )
1030 .await;
1031 return Ok(());
1032 }
1033
1034 if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1038 warn!(
1039 "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1040 hex::encode(offer.key),
1041 offer.data.len(),
1042 crate::ant_protocol::MAX_CHUNK_SIZE,
1043 );
1044 p2p_node
1045 .report_trust_event(
1046 source,
1047 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1048 )
1049 .await;
1050 send_replication_response(
1051 source,
1052 p2p_node,
1053 request_id,
1054 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1055 key: offer.key,
1056 reason: format!(
1057 "Data size {} exceeds maximum chunk size {}",
1058 offer.data.len(),
1059 crate::ant_protocol::MAX_CHUNK_SIZE,
1060 ),
1061 }),
1062 rr_message_id,
1063 )
1064 .await;
1065 return Ok(());
1066 }
1067
1068 if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await {
1070 send_replication_response(
1071 source,
1072 p2p_node,
1073 request_id,
1074 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1075 key: offer.key,
1076 reason: "Not responsible for this key".to_string(),
1077 }),
1078 rr_message_id,
1079 )
1080 .await;
1081 return Ok(());
1082 }
1083
1084 match payment_verifier
1086 .verify_payment(&offer.key, Some(&offer.proof_of_payment))
1087 .await
1088 {
1089 Ok(status) if status.can_store() => {
1090 debug!(
1091 "PoP validated for fresh offer key {}",
1092 hex::encode(offer.key)
1093 );
1094 }
1095 Ok(_) => {
1096 send_replication_response(
1097 source,
1098 p2p_node,
1099 request_id,
1100 ReplicationMessageBody::FreshReplicationResponse(
1101 FreshReplicationResponse::Rejected {
1102 key: offer.key,
1103 reason: "Payment verification failed: payment required".to_string(),
1104 },
1105 ),
1106 rr_message_id,
1107 )
1108 .await;
1109 return Ok(());
1110 }
1111 Err(e) => {
1112 warn!(
1113 "PoP verification error for key {}: {e}",
1114 hex::encode(offer.key)
1115 );
1116 send_replication_response(
1117 source,
1118 p2p_node,
1119 request_id,
1120 ReplicationMessageBody::FreshReplicationResponse(
1121 FreshReplicationResponse::Rejected {
1122 key: offer.key,
1123 reason: format!("Payment verification error: {e}"),
1124 },
1125 ),
1126 rr_message_id,
1127 )
1128 .await;
1129 return Ok(());
1130 }
1131 }
1132
1133 if let Err(e) = paid_list.insert(&offer.key).await {
1135 warn!("Failed to add key to PaidForList: {e}");
1136 }
1137
1138 match storage.put(&offer.key, &offer.data).await {
1140 Ok(_) => {
1141 send_replication_response(
1142 source,
1143 p2p_node,
1144 request_id,
1145 ReplicationMessageBody::FreshReplicationResponse(
1146 FreshReplicationResponse::Accepted { key: offer.key },
1147 ),
1148 rr_message_id,
1149 )
1150 .await;
1151 }
1152 Err(e) => {
1153 send_replication_response(
1154 source,
1155 p2p_node,
1156 request_id,
1157 ReplicationMessageBody::FreshReplicationResponse(
1158 FreshReplicationResponse::Rejected {
1159 key: offer.key,
1160 reason: format!("Storage error: {e}"),
1161 },
1162 ),
1163 rr_message_id,
1164 )
1165 .await;
1166 }
1167 }
1168
1169 Ok(())
1170}
1171
1172async fn handle_paid_notify(
1173 _source: &PeerId,
1174 notify: &protocol::PaidNotify,
1175 paid_list: &Arc<PaidList>,
1176 payment_verifier: &Arc<PaymentVerifier>,
1177 p2p_node: &Arc<P2PNode>,
1178 config: &ReplicationConfig,
1179) -> Result<()> {
1180 let self_id = *p2p_node.peer_id();
1181
1182 if notify.proof_of_payment.is_empty() {
1184 return Ok(());
1185 }
1186
1187 if !admission::is_in_paid_close_group(
1189 &self_id,
1190 ¬ify.key,
1191 p2p_node,
1192 config.paid_list_close_group_size,
1193 )
1194 .await
1195 {
1196 return Ok(());
1197 }
1198
1199 match payment_verifier
1201 .verify_payment(¬ify.key, Some(¬ify.proof_of_payment))
1202 .await
1203 {
1204 Ok(status) if status.can_store() => {
1205 debug!(
1206 "PoP validated for paid notify key {}",
1207 hex::encode(notify.key)
1208 );
1209 }
1210 Ok(_) => {
1211 warn!(
1212 "Paid notify rejected: payment required for key {}",
1213 hex::encode(notify.key)
1214 );
1215 return Ok(());
1216 }
1217 Err(e) => {
1218 warn!(
1219 "PoP verification error for paid notify key {}: {e}",
1220 hex::encode(notify.key)
1221 );
1222 return Ok(());
1223 }
1224 }
1225
1226 if let Err(e) = paid_list.insert(¬ify.key).await {
1227 warn!("Failed to add paid notify key to PaidForList: {e}");
1228 }
1229
1230 Ok(())
1231}
1232
1233#[allow(clippy::too_many_arguments)]
1234async fn handle_neighbor_sync_request(
1235 source: &PeerId,
1236 request: &protocol::NeighborSyncRequest,
1237 p2p_node: &Arc<P2PNode>,
1238 storage: &Arc<LmdbStorage>,
1239 paid_list: &Arc<PaidList>,
1240 queues: &Arc<RwLock<ReplicationQueues>>,
1241 config: &ReplicationConfig,
1242 is_bootstrapping: bool,
1243 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1244 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1245 request_id: u64,
1246 rr_message_id: Option<&str>,
1247) -> Result<()> {
1248 let self_id = *p2p_node.peer_id();
1249
1250 let (response, sender_in_rt) = neighbor_sync::handle_sync_request(
1258 source,
1259 request,
1260 p2p_node,
1261 storage,
1262 paid_list,
1263 config,
1264 is_bootstrapping,
1265 )
1266 .await;
1267
1268 send_replication_response(
1270 source,
1271 p2p_node,
1272 request_id,
1273 ReplicationMessageBody::NeighborSyncResponse(response),
1274 rr_message_id,
1275 )
1276 .await;
1277
1278 if !sender_in_rt {
1280 return Ok(());
1281 }
1282
1283 {
1285 let mut history = sync_history.write().await;
1286 let record = history.entry(*source).or_insert(PeerSyncRecord {
1287 last_sync: None,
1288 cycles_since_sync: 0,
1289 });
1290 record.last_sync = Some(Instant::now());
1291 record.cycles_since_sync = 0;
1292 }
1293
1294 let admitted_keys = admit_and_queue_hints(
1296 &self_id,
1297 source,
1298 &request.replica_hints,
1299 &request.paid_hints,
1300 p2p_node,
1301 config,
1302 storage,
1303 paid_list,
1304 queues,
1305 )
1306 .await;
1307
1308 if is_bootstrapping && !admitted_keys.is_empty() {
1311 bootstrap::track_discovered_keys(bootstrap_state, &admitted_keys).await;
1312 }
1313
1314 Ok(())
1315}
1316
1317async fn handle_verification_request(
1318 source: &PeerId,
1319 request: &protocol::VerificationRequest,
1320 storage: &Arc<LmdbStorage>,
1321 paid_list: &Arc<PaidList>,
1322 p2p_node: &Arc<P2PNode>,
1323 request_id: u64,
1324 rr_message_id: Option<&str>,
1325) -> Result<()> {
1326 #[allow(clippy::cast_possible_truncation)]
1332 let keys_len = request.keys.len() as u32;
1333 let paid_check_set: HashSet<u32> = request
1334 .paid_list_check_indices
1335 .iter()
1336 .copied()
1337 .filter(|&idx| {
1338 if idx >= keys_len {
1339 warn!(
1340 "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
1341 request.keys.len(),
1342 );
1343 false
1344 } else {
1345 true
1346 }
1347 })
1348 .collect();
1349
1350 let mut results = Vec::with_capacity(request.keys.len());
1351 for (i, key) in request.keys.iter().enumerate() {
1352 let present = storage.exists(key).unwrap_or(false);
1353 let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
1354 Some(paid_list.contains(key).unwrap_or(false))
1355 } else {
1356 None
1357 };
1358 results.push(protocol::KeyVerificationResult {
1359 key: *key,
1360 present,
1361 paid,
1362 });
1363 }
1364
1365 send_replication_response(
1366 source,
1367 p2p_node,
1368 request_id,
1369 ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
1370 rr_message_id,
1371 )
1372 .await;
1373
1374 Ok(())
1375}
1376
1377async fn handle_fetch_request(
1378 source: &PeerId,
1379 request: &protocol::FetchRequest,
1380 storage: &Arc<LmdbStorage>,
1381 p2p_node: &Arc<P2PNode>,
1382 request_id: u64,
1383 rr_message_id: Option<&str>,
1384) -> Result<()> {
1385 let response = match storage.get(&request.key).await {
1386 Ok(Some(data)) => protocol::FetchResponse::Success {
1387 key: request.key,
1388 data,
1389 },
1390 Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
1391 Err(e) => protocol::FetchResponse::Error {
1392 key: request.key,
1393 reason: format!("{e}"),
1394 },
1395 };
1396
1397 send_replication_response(
1398 source,
1399 p2p_node,
1400 request_id,
1401 ReplicationMessageBody::FetchResponse(response),
1402 rr_message_id,
1403 )
1404 .await;
1405
1406 Ok(())
1407}
1408
1409async fn handle_audit_challenge_msg(
1410 source: &PeerId,
1411 challenge: &protocol::AuditChallenge,
1412 storage: &Arc<LmdbStorage>,
1413 p2p_node: &Arc<P2PNode>,
1414 is_bootstrapping: bool,
1415 request_id: u64,
1416 rr_message_id: Option<&str>,
1417) -> Result<()> {
1418 #[allow(clippy::cast_possible_truncation)]
1419 let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
1420 let response = audit::handle_audit_challenge(
1421 challenge,
1422 storage,
1423 p2p_node.peer_id(),
1424 is_bootstrapping,
1425 stored_chunks,
1426 )
1427 .await;
1428
1429 send_replication_response(
1430 source,
1431 p2p_node,
1432 request_id,
1433 ReplicationMessageBody::AuditResponse(response),
1434 rr_message_id,
1435 )
1436 .await;
1437
1438 Ok(())
1439}
1440
1441async fn send_replication_response(
1452 peer: &PeerId,
1453 p2p_node: &Arc<P2PNode>,
1454 request_id: u64,
1455 body: ReplicationMessageBody,
1456 rr_message_id: Option<&str>,
1457) {
1458 let msg = ReplicationMessage { request_id, body };
1459 let encoded = match msg.encode() {
1460 Ok(data) => data,
1461 Err(e) => {
1462 warn!("Failed to encode replication response: {e}");
1463 return;
1464 }
1465 };
1466 let result = if let Some(msg_id) = rr_message_id {
1467 p2p_node
1468 .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
1469 .await
1470 } else {
1471 p2p_node
1472 .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
1473 .await
1474 };
1475 if let Err(e) = result {
1476 debug!("Failed to send replication response to {peer}: {e}");
1477 }
1478}
1479
1480#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1486async fn run_neighbor_sync_round(
1487 p2p_node: &Arc<P2PNode>,
1488 storage: &Arc<LmdbStorage>,
1489 paid_list: &Arc<PaidList>,
1490 queues: &Arc<RwLock<ReplicationQueues>>,
1491 config: &ReplicationConfig,
1492 sync_state: &Arc<RwLock<NeighborSyncState>>,
1493 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1494 is_bootstrapping: &Arc<RwLock<bool>>,
1495 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1496) {
1497 let self_id = *p2p_node.peer_id();
1498 let bootstrapping = *is_bootstrapping.read().await;
1499
1500 let cycle_complete = sync_state.read().await.is_cycle_complete();
1504 if cycle_complete {
1505 pruning::run_prune_pass(&self_id, storage, paid_list, p2p_node, config).await;
1507
1508 {
1510 let mut history = sync_history.write().await;
1511 for record in history.values_mut() {
1512 record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
1513 }
1514 }
1515
1516 let neighbors =
1518 neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
1519 .await;
1520
1521 let mut state = sync_state.write().await;
1523 if state.is_cycle_complete() {
1524 let old_sync_times = std::mem::take(&mut state.last_sync_times);
1528 let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
1529 *state = NeighborSyncState::new_cycle(neighbors);
1530 state.last_sync_times = old_sync_times;
1531 state.bootstrap_claims = old_bootstrap_claims;
1532 }
1533 }
1534
1535 let batch = {
1537 let mut state = sync_state.write().await;
1538 neighbor_sync::select_sync_batch(
1539 &mut state,
1540 config.neighbor_sync_peer_count,
1541 config.neighbor_sync_cooldown,
1542 )
1543 };
1544
1545 if batch.is_empty() {
1546 return;
1547 }
1548
1549 debug!("Neighbor sync: syncing with {} peers", batch.len());
1550
1551 for peer in &batch {
1553 let response = neighbor_sync::sync_with_peer(
1554 peer,
1555 p2p_node,
1556 storage,
1557 paid_list,
1558 config,
1559 bootstrapping,
1560 )
1561 .await;
1562
1563 if let Some(resp) = response {
1564 handle_sync_response(
1565 &self_id,
1566 peer,
1567 &resp,
1568 p2p_node,
1569 config,
1570 bootstrapping,
1571 bootstrap_state,
1572 storage,
1573 paid_list,
1574 queues,
1575 sync_state,
1576 sync_history,
1577 )
1578 .await;
1579 } else {
1580 let replacement = {
1582 let mut state = sync_state.write().await;
1583 neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
1584 };
1585
1586 if let Some(replacement_peer) = replacement {
1588 let replacement_resp = neighbor_sync::sync_with_peer(
1589 &replacement_peer,
1590 p2p_node,
1591 storage,
1592 paid_list,
1593 config,
1594 bootstrapping,
1595 )
1596 .await;
1597
1598 if let Some(resp) = replacement_resp {
1599 handle_sync_response(
1600 &self_id,
1601 &replacement_peer,
1602 &resp,
1603 p2p_node,
1604 config,
1605 bootstrapping,
1606 bootstrap_state,
1607 storage,
1608 paid_list,
1609 queues,
1610 sync_state,
1611 sync_history,
1612 )
1613 .await;
1614 }
1615 }
1616 }
1617 }
1618}
1619
1620#[allow(clippy::too_many_arguments)]
1623async fn handle_sync_response(
1624 self_id: &PeerId,
1625 peer: &PeerId,
1626 resp: &NeighborSyncResponse,
1627 p2p_node: &Arc<P2PNode>,
1628 config: &ReplicationConfig,
1629 bootstrapping: bool,
1630 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1631 storage: &Arc<LmdbStorage>,
1632 paid_list: &Arc<PaidList>,
1633 queues: &Arc<RwLock<ReplicationQueues>>,
1634 sync_state: &Arc<RwLock<NeighborSyncState>>,
1635 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1636) {
1637 {
1639 let mut state = sync_state.write().await;
1640 neighbor_sync::record_successful_sync(&mut state, peer);
1641 }
1642 {
1643 let mut history = sync_history.write().await;
1644 let record = history.entry(*peer).or_insert(PeerSyncRecord {
1645 last_sync: None,
1646 cycles_since_sync: 0,
1647 });
1648 record.last_sync = Some(Instant::now());
1649 record.cycles_since_sync = 0;
1650 }
1651
1652 if resp.bootstrapping {
1654 let should_report = {
1658 let now = Instant::now();
1659 let mut state = sync_state.write().await;
1660 let first_seen = state.bootstrap_claims.entry(*peer).or_insert(now);
1661 let claim_age = now.duration_since(*first_seen);
1662 if claim_age > config.bootstrap_claim_grace_period {
1663 warn!(
1664 "Peer {peer} has been claiming bootstrap for {:?}, \
1665 exceeding grace period of {:?} — reporting abuse",
1666 claim_age, config.bootstrap_claim_grace_period,
1667 );
1668 true
1669 } else {
1670 false
1671 }
1672 };
1673 if should_report {
1674 p2p_node
1675 .report_trust_event(
1676 peer,
1677 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1678 )
1679 .await;
1680 }
1681 } else {
1682 {
1684 let mut state = sync_state.write().await;
1685 state.bootstrap_claims.remove(peer);
1686 }
1687 let admitted_keys = admit_and_queue_hints(
1688 self_id,
1689 peer,
1690 &resp.replica_hints,
1691 &resp.paid_hints,
1692 p2p_node,
1693 config,
1694 storage,
1695 paid_list,
1696 queues,
1697 )
1698 .await;
1699
1700 if bootstrapping && !admitted_keys.is_empty() {
1703 bootstrap::track_discovered_keys(bootstrap_state, &admitted_keys).await;
1704 }
1705 }
1706}
1707
1708#[allow(clippy::too_many_arguments)]
1713async fn admit_and_queue_hints(
1714 self_id: &PeerId,
1715 source_peer: &PeerId,
1716 replica_hints: &[XorName],
1717 paid_hints: &[XorName],
1718 p2p_node: &Arc<P2PNode>,
1719 config: &ReplicationConfig,
1720 storage: &Arc<LmdbStorage>,
1721 paid_list: &Arc<PaidList>,
1722 queues: &Arc<RwLock<ReplicationQueues>>,
1723) -> HashSet<XorName> {
1724 let pending_keys: HashSet<XorName> = {
1725 let q = queues.read().await;
1726 q.pending_keys().into_iter().collect()
1727 };
1728
1729 let admitted = admission::admit_hints(
1730 self_id,
1731 replica_hints,
1732 paid_hints,
1733 p2p_node,
1734 config,
1735 storage,
1736 paid_list,
1737 &pending_keys,
1738 )
1739 .await;
1740
1741 let mut discovered = HashSet::new();
1742 let mut q = queues.write().await;
1743 let now = Instant::now();
1744
1745 for key in admitted.replica_keys {
1746 if !storage.exists(&key).unwrap_or(false) {
1747 let added = q.add_pending_verify(
1748 key,
1749 VerificationEntry {
1750 state: VerificationState::PendingVerify,
1751 pipeline: HintPipeline::Replica,
1752 verified_sources: Vec::new(),
1753 tried_sources: HashSet::new(),
1754 created_at: now,
1755 hint_sender: *source_peer,
1756 },
1757 );
1758 if added {
1759 discovered.insert(key);
1760 }
1761 }
1762 }
1763
1764 for key in admitted.paid_only_keys {
1765 let added = q.add_pending_verify(
1766 key,
1767 VerificationEntry {
1768 state: VerificationState::PendingVerify,
1769 pipeline: HintPipeline::PaidOnly,
1770 verified_sources: Vec::new(),
1771 tried_sources: HashSet::new(),
1772 created_at: now,
1773 hint_sender: *source_peer,
1774 },
1775 );
1776 if added {
1777 discovered.insert(key);
1778 }
1779 }
1780
1781 discovered
1782}
1783
1784#[allow(clippy::too_many_lines)]
1790async fn run_verification_cycle(
1791 p2p_node: &Arc<P2PNode>,
1792 paid_list: &Arc<PaidList>,
1793 queues: &Arc<RwLock<ReplicationQueues>>,
1794 config: &ReplicationConfig,
1795 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1796 is_bootstrapping: &Arc<RwLock<bool>>,
1797 bootstrap_complete_notify: &Arc<Notify>,
1798) {
1799 {
1802 let mut q = queues.write().await;
1803 q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
1804 }
1805
1806 let pending_keys = {
1807 let q = queues.read().await;
1808 q.pending_keys()
1809 };
1810
1811 if pending_keys.is_empty() {
1812 return;
1813 }
1814
1815 let self_id = *p2p_node.peer_id();
1816
1817 let mut keys_needing_network = Vec::new();
1820 let mut terminal_keys: Vec<XorName> = Vec::new();
1821 {
1822 let mut q = queues.write().await;
1823 for key in &pending_keys {
1824 if paid_list.contains(key).unwrap_or(false) {
1825 if let Some(entry) = q.get_pending_mut(key) {
1826 entry.state = VerificationState::PaidListVerified;
1827 if entry.pipeline == HintPipeline::PaidOnly {
1828 q.remove_pending(key);
1830 terminal_keys.push(*key);
1831 continue;
1832 }
1833 }
1834 }
1835 keys_needing_network.push(*key);
1837 }
1838 }
1839
1840 if !keys_needing_network.is_empty() {
1842 let targets =
1844 quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
1845 .await;
1846
1847 let evidence =
1848 quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;
1849
1850 let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
1853 {
1854 let q = queues.read().await;
1855 for key in &keys_needing_network {
1856 let Some(ev) = evidence.get(key) else {
1857 continue;
1858 };
1859 let Some(entry) = q.get_pending(key) else {
1860 continue;
1861 };
1862 let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config);
1863 evaluated.push((*key, outcome, entry.pipeline));
1864 }
1865 } let mut paid_insert_keys: Vec<XorName> = Vec::new();
1869 for (key, outcome, _) in &evaluated {
1870 if matches!(
1871 outcome,
1872 KeyVerificationOutcome::QuorumVerified { .. }
1873 | KeyVerificationOutcome::PaidListVerified { .. }
1874 ) {
1875 paid_insert_keys.push(*key);
1876 }
1877 }
1878 for key in &paid_insert_keys {
1879 if let Err(e) = paid_list.insert(key).await {
1880 warn!("Failed to add verified key to PaidForList: {e}");
1881 }
1882 }
1883
1884 let mut q = queues.write().await;
1886 for (key, outcome, pipeline) in evaluated {
1887 match outcome {
1888 KeyVerificationOutcome::QuorumVerified { sources }
1889 | KeyVerificationOutcome::PaidListVerified { sources } => {
1890 if pipeline == HintPipeline::Replica && !sources.is_empty() {
1891 let distance =
1892 crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
1893 q.remove_pending(&key);
1894 q.enqueue_fetch(key, distance, sources);
1895 } else if pipeline == HintPipeline::Replica && sources.is_empty() {
1897 warn!(
1898 "Verified key {} has no holders (possible data loss)",
1899 hex::encode(key)
1900 );
1901 q.remove_pending(&key);
1902 terminal_keys.push(key);
1903 } else {
1904 q.remove_pending(&key);
1905 terminal_keys.push(key);
1906 }
1907 }
1908 KeyVerificationOutcome::QuorumFailed
1909 | KeyVerificationOutcome::QuorumInconclusive => {
1910 q.remove_pending(&key);
1911 terminal_keys.push(key);
1912 }
1913 }
1914 }
1915 }
1916
1917 update_bootstrap_after_verification(
1920 &terminal_keys,
1921 bootstrap_state,
1922 queues,
1923 is_bootstrapping,
1924 bootstrap_complete_notify,
1925 )
1926 .await;
1927}
1928
1929async fn update_bootstrap_after_verification(
1932 terminal_keys: &[XorName],
1933 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1934 queues: &Arc<RwLock<ReplicationQueues>>,
1935 is_bootstrapping: &Arc<RwLock<bool>>,
1936 bootstrap_complete_notify: &Arc<Notify>,
1937) {
1938 if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
1939 return;
1940 }
1941 {
1942 let mut bs = bootstrap_state.write().await;
1943 for key in terminal_keys {
1944 bs.remove_key(key);
1945 }
1946 }
1947 let q = queues.read().await;
1948 if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
1949 complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
1950 }
1951}
1952
1953async fn complete_bootstrap(
1955 is_bootstrapping: &Arc<RwLock<bool>>,
1956 bootstrap_complete_notify: &Arc<Notify>,
1957) {
1958 *is_bootstrapping.write().await = false;
1959 bootstrap_complete_notify.notify_waiters();
1960 info!("Replication bootstrap complete");
1961}
1962
/// How a single fetch attempt made by `execute_single_fetch` ended.
enum FetchResult {
    /// The record passed every check and was written to local storage.
    Stored,
    /// The peer returned a success response whose content was rejected: the response key
    /// differed from the requested key, the payload exceeded `MAX_CHUNK_SIZE`, or the
    /// address computed from the payload did not match the key.
    IntegrityFailed,
    /// The attempt failed in any other way: the request could not be encoded or sent, the
    /// reply could not be decoded or had an unexpected body, the peer answered "not found"
    /// or "error", or the local store rejected the record.
    SourceFailed,
}
1976
/// Result of one fetch attempt, tagged with the key it was made for.
struct FetchOutcome {
    /// The key that was requested. `execute_single_fetch` always reports the requested
    /// key here, even when the peer's response named a different one.
    key: XorName,
    /// How the attempt ended.
    result: FetchResult,
}
1983
1984#[allow(clippy::too_many_lines)]
1985async fn execute_single_fetch(
1991 p2p_node: Arc<P2PNode>,
1992 storage: Arc<LmdbStorage>,
1993 config: Arc<ReplicationConfig>,
1994 key: XorName,
1995 source: PeerId,
1996) -> FetchOutcome {
1997 let request = protocol::FetchRequest { key };
1998 let msg = ReplicationMessage {
1999 request_id: rand::thread_rng().gen::<u64>(),
2000 body: ReplicationMessageBody::FetchRequest(request),
2001 };
2002
2003 let encoded = match msg.encode() {
2004 Ok(data) => data,
2005 Err(e) => {
2006 warn!("Failed to encode fetch request: {e}");
2007 return FetchOutcome {
2008 key,
2009 result: FetchResult::SourceFailed,
2010 };
2011 }
2012 };
2013
2014 let result = p2p_node
2015 .send_request(
2016 &source,
2017 REPLICATION_PROTOCOL_ID,
2018 encoded,
2019 config.fetch_request_timeout,
2020 )
2021 .await;
2022
2023 match result {
2024 Ok(response) => {
2025 let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
2026 p2p_node
2027 .report_trust_event(
2028 &source,
2029 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2030 )
2031 .await;
2032 return FetchOutcome {
2033 key,
2034 result: FetchResult::SourceFailed,
2035 };
2036 };
2037
2038 match resp_msg.body {
2039 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
2040 key: resp_key,
2041 data,
2042 }) => {
2043 if resp_key != key {
2048 warn!(
2049 "Fetch response key mismatch: requested {}, got {}",
2050 hex::encode(key),
2051 hex::encode(resp_key)
2052 );
2053 p2p_node
2054 .report_trust_event(
2055 &source,
2056 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2057 )
2058 .await;
2059 return FetchOutcome {
2060 key,
2061 result: FetchResult::IntegrityFailed,
2062 };
2063 }
2064
2065 if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
2069 warn!(
2070 "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
2071 hex::encode(resp_key),
2072 data.len(),
2073 crate::ant_protocol::MAX_CHUNK_SIZE,
2074 );
2075 p2p_node
2076 .report_trust_event(
2077 &source,
2078 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2079 )
2080 .await;
2081 return FetchOutcome {
2082 key,
2083 result: FetchResult::IntegrityFailed,
2084 };
2085 }
2086
2087 let computed = crate::client::compute_address(&data);
2089 if computed != resp_key {
2090 warn!(
2091 "Fetched record integrity check failed: expected {}, got {}",
2092 hex::encode(resp_key),
2093 hex::encode(computed)
2094 );
2095 p2p_node
2096 .report_trust_event(
2097 &source,
2098 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2099 )
2100 .await;
2101 return FetchOutcome {
2102 key,
2103 result: FetchResult::IntegrityFailed,
2104 };
2105 }
2106
2107 if let Err(e) = storage.put(&resp_key, &data).await {
2108 warn!(
2109 "Failed to store fetched record {}: {e}",
2110 hex::encode(resp_key)
2111 );
2112 return FetchOutcome {
2113 key,
2114 result: FetchResult::SourceFailed,
2115 };
2116 }
2117
2118 FetchOutcome {
2119 key,
2120 result: FetchResult::Stored,
2121 }
2122 }
2123 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
2124 ..
2125 }) => {
2126 debug!("Fetch: peer {source} does not have {}", hex::encode(key));
2129 FetchOutcome {
2130 key,
2131 result: FetchResult::SourceFailed,
2132 }
2133 }
2134 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
2135 reason,
2136 ..
2137 }) => {
2138 warn!(
2139 "Fetch: peer {source} returned error for {}: {reason}",
2140 hex::encode(key)
2141 );
2142 p2p_node
2143 .report_trust_event(
2144 &source,
2145 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2146 )
2147 .await;
2148 FetchOutcome {
2149 key,
2150 result: FetchResult::SourceFailed,
2151 }
2152 }
2153 _ => {
2154 p2p_node
2156 .report_trust_event(
2157 &source,
2158 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2159 )
2160 .await;
2161 FetchOutcome {
2162 key,
2163 result: FetchResult::SourceFailed,
2164 }
2165 }
2166 }
2167 }
2168 Err(e) => {
2169 debug!("Fetch request to {source} failed: {e}");
2170 FetchOutcome {
2173 key,
2174 result: FetchResult::SourceFailed,
2175 }
2176 }
2177 }
2178}
2179
2180async fn handle_audit_result(
2186 result: &AuditTickResult,
2187 p2p_node: &Arc<P2PNode>,
2188 sync_state: &Arc<RwLock<NeighborSyncState>>,
2189 config: &ReplicationConfig,
2190) {
2191 match result {
2192 AuditTickResult::Passed {
2193 challenged_peer,
2194 keys_checked,
2195 } => {
2196 debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
2197 {
2200 let mut state = sync_state.write().await;
2201 state.bootstrap_claims.remove(challenged_peer);
2202 }
2203 p2p_node
2204 .report_trust_event(
2205 challenged_peer,
2206 TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
2207 )
2208 .await;
2209 }
2210 AuditTickResult::Failed { evidence } => {
2211 if let FailureEvidence::AuditFailure {
2212 challenged_peer,
2213 confirmed_failed_keys,
2214 ..
2215 } = evidence
2216 {
2217 error!(
2218 "Audit failure for {challenged_peer}: {} confirmed failed keys",
2219 confirmed_failed_keys.len()
2220 );
2221 {
2224 let mut state = sync_state.write().await;
2225 state.bootstrap_claims.remove(challenged_peer);
2226 }
2227 p2p_node
2228 .report_trust_event(
2229 challenged_peer,
2230 TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
2231 )
2232 .await;
2233 }
2234 }
2235 AuditTickResult::BootstrapClaim { peer } => {
2236 let should_report = {
2240 let now = Instant::now();
2241 let mut state = sync_state.write().await;
2242 let first_seen = state.bootstrap_claims.entry(*peer).or_insert(now);
2243 let claim_age = now.duration_since(*first_seen);
2244 if claim_age > config.bootstrap_claim_grace_period {
2245 warn!(
2246 "Audit: peer {peer} claiming bootstrap past grace period \
2247 ({:?} > {:?}), reporting abuse",
2248 claim_age, config.bootstrap_claim_grace_period,
2249 );
2250 true
2251 } else {
2252 debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
2253 false
2254 }
2255 };
2256 if should_report {
2257 p2p_node
2258 .report_trust_event(
2259 peer,
2260 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2261 )
2262 .await;
2263 }
2264 }
2265 AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
2266 }
2267}
2268
2269