1#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod config;
21pub mod fresh;
22pub mod neighbor_sync;
23pub mod paid_list;
24pub mod protocol;
25pub mod pruning;
26pub mod quorum;
27pub mod scheduling;
28pub mod types;
29
30use std::collections::{HashMap, HashSet};
31use std::path::Path;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use std::pin::Pin;
36
37use crate::logging::{debug, error, info, warn};
38use futures::stream::FuturesUnordered;
39use futures::{Future, StreamExt};
40use rand::Rng;
41use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
42use tokio::task::JoinHandle;
43use tokio_util::sync::CancellationToken;
44
45use crate::ant_protocol::XorName;
46use crate::error::{Error, Result};
47use crate::payment::PaymentVerifier;
48use crate::replication::audit::AuditTickResult;
49use crate::replication::config::{
50 max_parallel_fetch, ReplicationConfig, MAX_CONCURRENT_REPLICATION_SENDS,
51 REPLICATION_PROTOCOL_ID,
52};
53use crate::replication::paid_list::PaidList;
54use crate::replication::protocol::{
55 FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
56 VerificationResponse,
57};
58use crate::replication::quorum::KeyVerificationOutcome;
59use crate::replication::scheduling::ReplicationQueues;
60use crate::replication::types::{
61 BootstrapState, FailureEvidence, HintPipeline, NeighborSyncState, PeerSyncRecord,
62 VerificationEntry, VerificationState,
63};
64use crate::storage::LmdbStorage;
65use saorsa_core::identity::PeerId;
66use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
67
/// Topic prefix used by the request/response envelope transport.
const RR_PREFIX: &str = "/rr/";

/// Boxed future driven by the fetch worker; resolves to the fetched key and
/// its outcome (`None` when the underlying task panicked).
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;

/// Poll interval for the fetch worker while no fetches are in flight.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Poll interval between verification cycles.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Interval between checks that the bootstrap key set has drained.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;

/// Weight attached to trust events reported for replication misbehavior.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
/// Background replication engine: runs the message handler, neighbor-sync,
/// audit, fetch, verification, and bootstrap tasks that keep this node's
/// share of the keyspace replicated.
pub struct ReplicationEngine {
    /// Validated replication tuning parameters (intervals, group sizes, …).
    config: Arc<ReplicationConfig>,
    /// Handle to the P2P layer (messaging, DHT, trust reporting).
    p2p_node: Arc<P2PNode>,
    /// Local chunk store.
    storage: Arc<LmdbStorage>,
    /// Persistent set of keys with verified payment.
    paid_list: Arc<PaidList>,
    /// Verifier for proofs of payment attached to offers/notifies.
    payment_verifier: Arc<PaymentVerifier>,
    /// Fetch/verification work queues shared across worker tasks.
    queues: Arc<RwLock<ReplicationQueues>>,
    /// State of the current neighbor-sync cycle.
    sync_state: Arc<RwLock<NeighborSyncState>>,
    /// Per-peer record of the most recent completed sync.
    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    /// Keys discovered during bootstrap that still need draining.
    bootstrap_state: Arc<RwLock<BootstrapState>>,
    /// True until the initial bootstrap sync completes.
    is_bootstrapping: Arc<RwLock<bool>>,
    /// Wakes the neighbor-sync loop early (e.g. on topology change).
    sync_trigger: Arc<Notify>,
    /// Notified when bootstrap completes.
    bootstrap_complete_notify: Arc<Notify>,
    /// Caps concurrent outbound replication sends.
    send_semaphore: Arc<Semaphore>,
    /// Receiver for locally stored fresh writes; `take()`n by the drainer task.
    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
    /// Cancellation token shared by all background tasks.
    shutdown: CancellationToken,
    /// Handles of spawned background tasks, joined on shutdown.
    task_handles: Vec<JoinHandle<()>>,
}
141
142impl ReplicationEngine {
143 pub async fn new(
150 config: ReplicationConfig,
151 p2p_node: Arc<P2PNode>,
152 storage: Arc<LmdbStorage>,
153 payment_verifier: Arc<PaymentVerifier>,
154 root_dir: &Path,
155 fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
156 shutdown: CancellationToken,
157 ) -> Result<Self> {
158 config.validate().map_err(Error::Config)?;
159
160 let paid_list = Arc::new(
161 PaidList::new(root_dir)
162 .await
163 .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
164 );
165
166 let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
167 let config = Arc::new(config);
168
169 Ok(Self {
170 config: Arc::clone(&config),
171 p2p_node,
172 storage,
173 paid_list,
174 payment_verifier,
175 queues: Arc::new(RwLock::new(ReplicationQueues::new())),
176 sync_state: Arc::new(RwLock::new(initial_neighbors)),
177 sync_history: Arc::new(RwLock::new(HashMap::new())),
178 bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
179 is_bootstrapping: Arc::new(RwLock::new(true)),
180 sync_trigger: Arc::new(Notify::new()),
181 bootstrap_complete_notify: Arc::new(Notify::new()),
182 send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
183 fresh_write_rx: Some(fresh_write_rx),
184 shutdown,
185 task_handles: Vec::new(),
186 })
187 }
188
    /// Shared handle to the persistent paid-key list.
    #[must_use]
    pub fn paid_list(&self) -> &Arc<PaidList> {
        &self.paid_list
    }
194
    /// Spawns all background replication tasks.
    ///
    /// Idempotence guard: if tasks are already running (non-empty
    /// `task_handles`), the call is logged as an error and ignored.
    /// `dht_events` is handed to the bootstrap task so it can gate on
    /// DHT readiness.
    pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
        if !self.task_handles.is_empty() {
            error!("ReplicationEngine::start() called while already running — ignoring");
            return;
        }
        info!("Starting replication engine");

        // Each call below spawns one task and pushes its handle.
        self.start_message_handler();
        self.start_neighbor_sync_loop();
        self.start_self_lookup_loop();
        self.start_audit_loop();
        self.start_fetch_worker();
        self.start_verification_worker();
        self.start_bootstrap_sync(dht_events);
        self.start_fresh_write_drainer();

        info!(
            "Replication engine started with {} background tasks",
            self.task_handles.len()
        );
    }
221
    /// Returns `true` while the initial bootstrap sync is still in progress.
    pub async fn is_bootstrapping(&self) -> bool {
        *self.is_bootstrapping.read().await
    }
229
    /// Waits until bootstrap completes or `timeout` elapses.
    ///
    /// Returns `true` if bootstrap is already complete, or completes within
    /// the timeout; `false` on timeout.
    pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
        // Register interest in the notification *before* checking the flag,
        // so a notify that fires between the check and the await is not lost.
        let notified = self.bootstrap_complete_notify.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();

        if !*self.is_bootstrapping.read().await {
            return true;
        }

        tokio::time::timeout(timeout, notified).await.is_ok()
    }
251
252 pub async fn shutdown(&mut self) {
258 self.shutdown.cancel();
259 for (i, mut handle) in self.task_handles.drain(..).enumerate() {
260 match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
261 Ok(Ok(())) => {}
262 Ok(Err(e)) if e.is_cancelled() => {}
263 Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
264 Err(_) => {
265 warn!("Replication task {i} did not stop within 10s, aborting");
266 handle.abort();
267 }
268 }
269 }
270 }
271
    /// Wakes the neighbor-sync loop immediately instead of waiting for its
    /// next randomized interval.
    pub fn trigger_neighbor_sync(&self) {
        self.sync_trigger.notify_one();
    }
280
    /// Replicates a freshly written chunk via [`fresh::replicate_fresh`].
    ///
    /// Send concurrency is bounded by the engine-wide semaphore.
    pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
        fresh::replicate_fresh(
            key,
            data,
            proof_of_payment,
            &self.p2p_node,
            &self.paid_list,
            &self.config,
            &self.send_semaphore,
        )
        .await;
    }
294
    /// Spawns the task that drains locally observed fresh-write events and
    /// replicates each one.
    ///
    /// The receiver is `take()`n, so a second call is a no-op. The task exits
    /// on shutdown or when the sender side of the channel is dropped.
    fn start_fresh_write_drainer(&mut self) {
        let Some(mut rx) = self.fresh_write_rx.take() else {
            return;
        };
        let p2p = Arc::clone(&self.p2p_node);
        let paid_list = Arc::clone(&self.paid_list);
        let config = Arc::clone(&self.config);
        let send_semaphore = Arc::clone(&self.send_semaphore);
        let shutdown = self.shutdown.clone();

        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    event = rx.recv() => {
                        // `None` means the sender was dropped — stop draining.
                        let Some(event) = event else { break };
                        fresh::replicate_fresh(
                            &event.key,
                            &event.data,
                            &event.payment_proof,
                            &p2p,
                            &paid_list,
                            &config,
                            &send_semaphore,
                        )
                        .await;
                    }
                }
            }
            debug!("Fresh-write drainer shut down");
        });
        self.task_handles.push(handle);
    }
334
    /// Spawns the task that dispatches inbound replication messages and
    /// reacts to DHT topology events.
    ///
    /// Replication payloads are accepted either as plain messages on
    /// [`REPLICATION_PROTOCOL_ID`] or wrapped in an `/rr/` request envelope
    /// (envelope *responses* are filtered out). A `KClosestPeersChanged` DHT
    /// event wakes the neighbor-sync loop early.
    #[allow(clippy::too_many_lines)]
    fn start_message_handler(&mut self) {
        let mut p2p_events = self.p2p_node.subscribe_events();
        let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
        let p2p = Arc::clone(&self.p2p_node);
        let storage = Arc::clone(&self.storage);
        let paid_list = Arc::clone(&self.paid_list);
        let payment_verifier = Arc::clone(&self.payment_verifier);
        let queues = Arc::clone(&self.queues);
        let config = Arc::clone(&self.config);
        let shutdown = self.shutdown.clone();
        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
        let bootstrap_state = Arc::clone(&self.bootstrap_state);
        let sync_history = Arc::clone(&self.sync_history);
        let sync_trigger = Arc::clone(&self.sync_trigger);

        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    event = p2p_events.recv() => {
                        // Broadcast lag/close errors are skipped, not fatal.
                        let Ok(event) = event else { continue };
                        if let P2PEvent::Message {
                            topic,
                            source: Some(source),
                            data,
                        } = event {
                            // Extract (payload, optional rr message id) from
                            // either transport form.
                            let rr_info = if topic == REPLICATION_PROTOCOL_ID {
                                Some((data.clone(), None))
                            } else if topic.starts_with(RR_PREFIX)
                                && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
                            {
                                // Only envelope *requests* are handled here.
                                P2PNode::parse_request_envelope(&data)
                                    .filter(|(_, is_resp, _)| !is_resp)
                                    .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
                            } else {
                                None
                            };
                            if let Some((payload, rr_message_id)) = rr_info {
                                match handle_replication_message(
                                    &source,
                                    &payload,
                                    &p2p,
                                    &storage,
                                    &paid_list,
                                    &payment_verifier,
                                    &queues,
                                    &config,
                                    &is_bootstrapping,
                                    &bootstrap_state,
                                    &sync_history,
                                    rr_message_id.as_deref(),
                                ).await {
                                    Ok(()) => {}
                                    Err(e) => {
                                        // Per-message errors are logged and
                                        // never tear down the handler task.
                                        debug!(
                                            "Replication message from {source} error: {e}"
                                        );
                                    }
                                }
                            }
                        }
                    }
                    dht_event = dht_events.recv() => {
                        let Ok(dht_event) = dht_event else { continue };
                        if let DhtNetworkEvent::KClosestPeersChanged { .. } = dht_event {
                            debug!(
                                "K-closest peers changed, triggering early neighbor sync"
                            );
                            sync_trigger.notify_one();
                        }
                    }
                }
            }
            debug!("Replication message handler shut down");
        });
        self.task_handles.push(handle);
    }
423
    /// Spawns the periodic neighbor-sync loop.
    ///
    /// Each iteration waits for either the randomized sync interval or an
    /// explicit trigger (topology change), then runs one sync round. Shutdown
    /// can interrupt both the wait and the round itself.
    fn start_neighbor_sync_loop(&mut self) {
        let p2p = Arc::clone(&self.p2p_node);
        let storage = Arc::clone(&self.storage);
        let paid_list = Arc::clone(&self.paid_list);
        let queues = Arc::clone(&self.queues);
        let config = Arc::clone(&self.config);
        let shutdown = self.shutdown.clone();
        let sync_state = Arc::clone(&self.sync_state);
        let sync_history = Arc::clone(&self.sync_history);
        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
        let bootstrap_state = Arc::clone(&self.bootstrap_state);
        let sync_trigger = Arc::clone(&self.sync_trigger);

        let handle = tokio::spawn(async move {
            loop {
                // Re-randomized every iteration to avoid synchronized rounds.
                let interval = config.random_neighbor_sync_interval();
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    () = tokio::time::sleep(interval) => {}
                    () = sync_trigger.notified() => {
                        debug!("Neighbor sync triggered by topology change");
                    }
                }
                // Second select so shutdown can also abort a round in flight.
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    () = run_neighbor_sync_round(
                        &p2p,
                        &storage,
                        &paid_list,
                        &queues,
                        &config,
                        &sync_state,
                        &sync_history,
                        &is_bootstrapping,
                        &bootstrap_state,
                    ) => {}
                }
            }
            debug!("Neighbor sync loop shut down");
        });
        self.task_handles.push(handle);
    }
469
    /// Spawns the loop that periodically triggers a DHT self-lookup at a
    /// randomized interval; failures are logged at debug level and retried
    /// next interval.
    fn start_self_lookup_loop(&mut self) {
        let p2p = Arc::clone(&self.p2p_node);
        let config = Arc::clone(&self.config);
        let shutdown = self.shutdown.clone();

        let handle = tokio::spawn(async move {
            loop {
                let interval = config.random_self_lookup_interval();
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    () = tokio::time::sleep(interval) => {
                        if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
                            debug!("Self-lookup failed: {e}");
                        }
                    }
                }
            }
            debug!("Self-lookup loop shut down");
        });
        self.task_handles.push(handle);
    }
491
492 fn start_audit_loop(&mut self) {
493 let p2p = Arc::clone(&self.p2p_node);
494 let storage = Arc::clone(&self.storage);
495 let config = Arc::clone(&self.config);
496 let shutdown = self.shutdown.clone();
497 let sync_history = Arc::clone(&self.sync_history);
498 let bootstrap_state = Arc::clone(&self.bootstrap_state);
499 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
500 let sync_state = Arc::clone(&self.sync_state);
501
502 let handle = tokio::spawn(async move {
503 loop {
505 tokio::select! {
506 () = shutdown.cancelled() => return,
507 () = tokio::time::sleep(
508 std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
509 ) => {
510 if bootstrap_state.read().await.is_drained() {
511 break;
512 }
513 }
514 }
515 }
516
517 {
519 let bootstrapping = *is_bootstrapping.read().await;
520 let result = {
523 let claims = sync_state.read().await;
524 let history = sync_history.read().await;
525 audit::audit_tick(
526 &p2p,
527 &storage,
528 &config,
529 &history,
530 &claims.bootstrap_claims,
531 bootstrapping,
532 )
533 .await
534 };
535 handle_audit_result(&result, &p2p, &sync_state, &config).await;
536 }
537
538 loop {
540 let interval = config.random_audit_tick_interval();
541 tokio::select! {
542 () = shutdown.cancelled() => break,
543 () = tokio::time::sleep(interval) => {
544 let bootstrapping = *is_bootstrapping.read().await;
545 let result = {
547 let claims = sync_state.read().await;
548 let history = sync_history.read().await;
549 audit::audit_tick(
550 &p2p, &storage, &config, &history,
551 &claims.bootstrap_claims,
552 bootstrapping,
553 )
554 .await
555 };
556 handle_audit_result(&result, &p2p, &sync_state, &config).await;
557 }
558 }
559 }
560 debug!("Audit loop shut down");
561 });
562 self.task_handles.push(handle);
563 }
564
565 #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
566 fn start_fetch_worker(&mut self) {
567 let p2p = Arc::clone(&self.p2p_node);
568 let storage = Arc::clone(&self.storage);
569 let queues = Arc::clone(&self.queues);
570 let config = Arc::clone(&self.config);
571 let shutdown = self.shutdown.clone();
572 let bootstrap_state = Arc::clone(&self.bootstrap_state);
573 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
574 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
575 let concurrency = max_parallel_fetch();
576
577 info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
578
579 let handle = tokio::spawn(async move {
580 let mut in_flight = FuturesUnordered::<FetchFuture>::new();
583
584 loop {
585 {
587 let mut q = queues.write().await;
588 while in_flight.len() < concurrency {
589 let Some(candidate) = q.dequeue_fetch() else {
590 break;
591 };
592 let Some(&source) = candidate.sources.first() else {
593 warn!(
594 "Fetch candidate {} has no sources — dropping",
595 hex::encode(candidate.key)
596 );
597 continue;
598 };
599 q.start_fetch(candidate.key, source, candidate.sources.clone());
600
601 let p2p = Arc::clone(&p2p);
602 let storage = Arc::clone(&storage);
603 let config = Arc::clone(&config);
604 let token = shutdown.clone();
605 let fetch_key = candidate.key;
606 in_flight.push(Box::pin(async move {
607 let handle = tokio::spawn(async move {
608 tokio::select! {
610 () = token.cancelled() => FetchOutcome {
611 key: fetch_key,
612 result: FetchResult::SourceFailed,
613 },
614 outcome = execute_single_fetch(
615 p2p, storage, config, fetch_key, source,
616 ) => outcome,
617 }
618 });
619 match handle.await {
620 Ok(outcome) => (outcome.key, Some(outcome)),
621 Err(e) => {
622 error!(
623 "Fetch task for {} panicked: {e}",
624 hex::encode(fetch_key)
625 );
626 (fetch_key, None)
627 }
628 }
629 }));
630 }
631 } if in_flight.is_empty() {
634 tokio::select! {
636 () = shutdown.cancelled() => break,
637 () = tokio::time::sleep(
638 std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
639 ) => continue,
640 }
641 }
642
643 tokio::select! {
645 () = shutdown.cancelled() => break,
646 Some((key, maybe_outcome)) = in_flight.next() => {
647 let mut q = queues.write().await;
648 let terminal = if let Some(outcome) = maybe_outcome {
649 match outcome.result {
650 FetchResult::Stored => {
651 q.complete_fetch(&key);
652 true
653 }
654 FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
655 if let Some(next_peer) = q.retry_fetch(&key) {
656 let p2p = Arc::clone(&p2p);
658 let storage = Arc::clone(&storage);
659 let config = Arc::clone(&config);
660 let token = shutdown.clone();
661 let fetch_key = key;
662 in_flight.push(Box::pin(async move {
663 let handle = tokio::spawn(async move {
664 tokio::select! {
665 () = token.cancelled() => FetchOutcome {
666 key: fetch_key,
667 result: FetchResult::SourceFailed,
668 },
669 outcome = execute_single_fetch(
670 p2p, storage, config, fetch_key, next_peer,
671 ) => outcome,
672 }
673 });
674 match handle.await {
675 Ok(outcome) => (outcome.key, Some(outcome)),
676 Err(e) => {
677 error!(
678 "Fetch task for {} panicked: {e}",
679 hex::encode(fetch_key)
680 );
681 (fetch_key, None)
682 }
683 }
684 }));
685 false
686 } else {
687 q.complete_fetch(&key);
688 true
689 }
690 }
691 }
692 } else {
693 q.complete_fetch(&key);
695 true
696 };
697
698 if terminal {
700 drop(q); if !bootstrap_state.read().await.is_drained() {
702 bootstrap_state.write().await.remove_key(&key);
703 let q = queues.read().await;
704 if bootstrap::check_bootstrap_drained(
705 &bootstrap_state,
706 &q,
707 )
708 .await
709 {
710 complete_bootstrap(
711 &is_bootstrapping,
712 &bootstrap_complete_notify,
713 ).await;
714 }
715 }
716 }
717 }
718 }
719 }
720
721 while in_flight.next().await.is_some() {}
725 debug!("Fetch worker shut down");
726 });
727 self.task_handles.push(handle);
728 }
729
    /// Spawns the verification worker: runs one verification cycle every
    /// [`VERIFICATION_WORKER_POLL_MS`] until shutdown.
    fn start_verification_worker(&mut self) {
        let p2p = Arc::clone(&self.p2p_node);
        let queues = Arc::clone(&self.queues);
        let paid_list = Arc::clone(&self.paid_list);
        let config = Arc::clone(&self.config);
        let shutdown = self.shutdown.clone();
        let bootstrap_state = Arc::clone(&self.bootstrap_state);
        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);

        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    () = tokio::time::sleep(
                        std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
                    ) => {
                        run_verification_cycle(
                            &p2p, &paid_list, &queues, &config,
                            &bootstrap_state, &is_bootstrapping,
                            &bootstrap_complete_notify,
                        ).await;
                    }
                }
            }
            debug!("Verification worker shut down");
        });
        self.task_handles.push(handle);
    }
759
    /// Spawns the one-shot bootstrap sync task.
    ///
    /// Waits for the DHT bootstrap gate, then syncs with each close neighbor
    /// in batches, admitting their hints into the fetch queues and tracking
    /// the admitted keys for drain accounting. Marks bootstrap complete when
    /// there are no neighbors, or when the drain check passes at the end.
    fn start_bootstrap_sync(
        &mut self,
        dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
    ) {
        let p2p = Arc::clone(&self.p2p_node);
        let storage = Arc::clone(&self.storage);
        let paid_list = Arc::clone(&self.paid_list);
        let queues = Arc::clone(&self.queues);
        let config = Arc::clone(&self.config);
        let shutdown = self.shutdown.clone();
        let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
        let bootstrap_state = Arc::clone(&self.bootstrap_state);
        let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);

        let handle = tokio::spawn(async move {
            // Gate on DHT bootstrap (bounded by the configured timeout).
            let gate = bootstrap::wait_for_bootstrap_complete(
                dht_events,
                config.bootstrap_complete_timeout_secs,
                &shutdown,
            )
            .await;

            if gate == bootstrap::BootstrapGateResult::Shutdown {
                return;
            }

            let self_id = *p2p.peer_id();
            let neighbors =
                neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
                    .await;

            if neighbors.is_empty() {
                // Nothing to sync from — bootstrap is trivially done.
                info!("Bootstrap sync: no close neighbors found, marking drained");
                bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
                complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
                return;
            }

            let neighbor_count = neighbors.len();
            info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");

            for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
                if shutdown.is_cancelled() {
                    break;
                }

                for peer in batch {
                    if shutdown.is_cancelled() {
                        break;
                    }

                    let bootstrapping = *is_bootstrapping.read().await;

                    // Pending-request counter keeps the drain check honest
                    // while a sync with this peer is outstanding.
                    bootstrap::increment_pending_requests(&bootstrap_state, 1).await;

                    let response = neighbor_sync::sync_with_peer(
                        peer,
                        &p2p,
                        &storage,
                        &paid_list,
                        &config,
                        bootstrapping,
                    )
                    .await;

                    bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;

                    if let Some(resp) = response {
                        // Hints from a peer that is itself bootstrapping are
                        // skipped — its view may be incomplete.
                        if !resp.bootstrapping {
                            let admitted_keys = admit_and_queue_hints(
                                &self_id,
                                peer,
                                &resp.replica_hints,
                                &resp.paid_hints,
                                &p2p,
                                &config,
                                &storage,
                                &paid_list,
                                &queues,
                            )
                            .await;

                            if !admitted_keys.is_empty() {
                                bootstrap::track_discovered_keys(&bootstrap_state, &admitted_keys)
                                    .await;
                            }
                        }
                    }
                }
            }

            // Final drain check; the fetch/verification workers also perform
            // this check as keys complete.
            {
                let q = queues.read().await;
                if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
                    complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
                }
            }

            info!("Bootstrap sync completed");
        });
        self.task_handles.push(handle);
    }
881}
882
/// Decodes one inbound replication-protocol message and dispatches it to the
/// matching request handler; response-type bodies are ignored here.
///
/// # Errors
/// Returns [`Error::Protocol`] if `data` does not decode as a
/// [`ReplicationMessage`]; handler errors are propagated unchanged.
#[allow(clippy::too_many_arguments)]
async fn handle_replication_message(
    source: &PeerId,
    data: &[u8],
    p2p_node: &Arc<P2PNode>,
    storage: &Arc<LmdbStorage>,
    paid_list: &Arc<PaidList>,
    payment_verifier: &Arc<PaymentVerifier>,
    queues: &Arc<RwLock<ReplicationQueues>>,
    config: &ReplicationConfig,
    is_bootstrapping: &Arc<RwLock<bool>>,
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    rr_message_id: Option<&str>,
) -> Result<()> {
    let msg = ReplicationMessage::decode(data)
        .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;

    match msg.body {
        ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
            handle_fresh_offer(
                source,
                offer,
                storage,
                paid_list,
                payment_verifier,
                p2p_node,
                config,
                msg.request_id,
                rr_message_id,
            )
            .await
        }
        ReplicationMessageBody::PaidNotify(ref notify) => {
            handle_paid_notify(
                source,
                notify,
                paid_list,
                payment_verifier,
                p2p_node,
                config,
            )
            .await
        }
        ReplicationMessageBody::NeighborSyncRequest(ref request) => {
            // Snapshot the flag once; the handler reports it to the peer.
            let bootstrapping = *is_bootstrapping.read().await;
            handle_neighbor_sync_request(
                source,
                request,
                p2p_node,
                storage,
                paid_list,
                queues,
                config,
                bootstrapping,
                bootstrap_state,
                sync_history,
                msg.request_id,
                rr_message_id,
            )
            .await
        }
        ReplicationMessageBody::VerificationRequest(ref request) => {
            handle_verification_request(
                source,
                request,
                storage,
                paid_list,
                p2p_node,
                msg.request_id,
                rr_message_id,
            )
            .await
        }
        ReplicationMessageBody::FetchRequest(ref request) => {
            handle_fetch_request(
                source,
                request,
                storage,
                p2p_node,
                msg.request_id,
                rr_message_id,
            )
            .await
        }
        ReplicationMessageBody::AuditChallenge(ref challenge) => {
            let bootstrapping = *is_bootstrapping.read().await;
            handle_audit_challenge_msg(
                source,
                challenge,
                storage,
                p2p_node,
                bootstrapping,
                msg.request_id,
                rr_message_id,
            )
            .await
        }
        // Response bodies carry no work for this dispatcher.
        ReplicationMessageBody::FreshReplicationResponse(_)
        | ReplicationMessageBody::NeighborSyncResponse(_)
        | ReplicationMessageBody::VerificationResponse(_)
        | ReplicationMessageBody::FetchResponse(_)
        | ReplicationMessageBody::AuditResponse(_) => Ok(()),
    }
}
998
999#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1004async fn handle_fresh_offer(
1005 source: &PeerId,
1006 offer: &protocol::FreshReplicationOffer,
1007 storage: &Arc<LmdbStorage>,
1008 paid_list: &Arc<PaidList>,
1009 payment_verifier: &Arc<PaymentVerifier>,
1010 p2p_node: &Arc<P2PNode>,
1011 config: &ReplicationConfig,
1012 request_id: u64,
1013 rr_message_id: Option<&str>,
1014) -> Result<()> {
1015 let self_id = *p2p_node.peer_id();
1016
1017 if offer.proof_of_payment.is_empty() {
1019 send_replication_response(
1020 source,
1021 p2p_node,
1022 request_id,
1023 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1024 key: offer.key,
1025 reason: "Missing proof of payment".to_string(),
1026 }),
1027 rr_message_id,
1028 )
1029 .await;
1030 return Ok(());
1031 }
1032
1033 if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1037 warn!(
1038 "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1039 hex::encode(offer.key),
1040 offer.data.len(),
1041 crate::ant_protocol::MAX_CHUNK_SIZE,
1042 );
1043 p2p_node
1044 .report_trust_event(
1045 source,
1046 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1047 )
1048 .await;
1049 send_replication_response(
1050 source,
1051 p2p_node,
1052 request_id,
1053 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1054 key: offer.key,
1055 reason: format!(
1056 "Data size {} exceeds maximum chunk size {}",
1057 offer.data.len(),
1058 crate::ant_protocol::MAX_CHUNK_SIZE,
1059 ),
1060 }),
1061 rr_message_id,
1062 )
1063 .await;
1064 return Ok(());
1065 }
1066
1067 if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await {
1069 send_replication_response(
1070 source,
1071 p2p_node,
1072 request_id,
1073 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1074 key: offer.key,
1075 reason: "Not responsible for this key".to_string(),
1076 }),
1077 rr_message_id,
1078 )
1079 .await;
1080 return Ok(());
1081 }
1082
1083 match payment_verifier
1085 .verify_payment(&offer.key, Some(&offer.proof_of_payment))
1086 .await
1087 {
1088 Ok(status) if status.can_store() => {
1089 debug!(
1090 "PoP validated for fresh offer key {}",
1091 hex::encode(offer.key)
1092 );
1093 }
1094 Ok(_) => {
1095 send_replication_response(
1096 source,
1097 p2p_node,
1098 request_id,
1099 ReplicationMessageBody::FreshReplicationResponse(
1100 FreshReplicationResponse::Rejected {
1101 key: offer.key,
1102 reason: "Payment verification failed: payment required".to_string(),
1103 },
1104 ),
1105 rr_message_id,
1106 )
1107 .await;
1108 return Ok(());
1109 }
1110 Err(e) => {
1111 warn!(
1112 "PoP verification error for key {}: {e}",
1113 hex::encode(offer.key)
1114 );
1115 send_replication_response(
1116 source,
1117 p2p_node,
1118 request_id,
1119 ReplicationMessageBody::FreshReplicationResponse(
1120 FreshReplicationResponse::Rejected {
1121 key: offer.key,
1122 reason: format!("Payment verification error: {e}"),
1123 },
1124 ),
1125 rr_message_id,
1126 )
1127 .await;
1128 return Ok(());
1129 }
1130 }
1131
1132 if let Err(e) = paid_list.insert(&offer.key).await {
1134 warn!("Failed to add key to PaidForList: {e}");
1135 }
1136
1137 match storage.put(&offer.key, &offer.data).await {
1139 Ok(_) => {
1140 send_replication_response(
1141 source,
1142 p2p_node,
1143 request_id,
1144 ReplicationMessageBody::FreshReplicationResponse(
1145 FreshReplicationResponse::Accepted { key: offer.key },
1146 ),
1147 rr_message_id,
1148 )
1149 .await;
1150 }
1151 Err(e) => {
1152 send_replication_response(
1153 source,
1154 p2p_node,
1155 request_id,
1156 ReplicationMessageBody::FreshReplicationResponse(
1157 FreshReplicationResponse::Rejected {
1158 key: offer.key,
1159 reason: format!("Storage error: {e}"),
1160 },
1161 ),
1162 rr_message_id,
1163 )
1164 .await;
1165 }
1166 }
1167
1168 Ok(())
1169}
1170
/// Handles a `PaidNotify`: records `notify.key` in the local paid list if we
/// are in the key's paid close group and the proof of payment verifies.
///
/// All rejection paths return `Ok(())` silently — a bad or irrelevant notify
/// is ignored, not an error for the caller.
async fn handle_paid_notify(
    _source: &PeerId,
    notify: &protocol::PaidNotify,
    paid_list: &Arc<PaidList>,
    payment_verifier: &Arc<PaymentVerifier>,
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
) -> Result<()> {
    let self_id = *p2p_node.peer_id();

    // No proof attached — nothing to verify, ignore.
    if notify.proof_of_payment.is_empty() {
        return Ok(());
    }

    // Only members of the key's paid close group track the payment.
    if !admission::is_in_paid_close_group(
        &self_id,
        &notify.key,
        p2p_node,
        config.paid_list_close_group_size,
    )
    .await
    {
        return Ok(());
    }

    match payment_verifier
        .verify_payment(&notify.key, Some(&notify.proof_of_payment))
        .await
    {
        Ok(status) if status.can_store() => {
            debug!(
                "PoP validated for paid notify key {}",
                hex::encode(notify.key)
            );
        }
        Ok(_) => {
            warn!(
                "Paid notify rejected: payment required for key {}",
                hex::encode(notify.key)
            );
            return Ok(());
        }
        Err(e) => {
            warn!(
                "PoP verification error for paid notify key {}: {e}",
                hex::encode(notify.key)
            );
            return Ok(());
        }
    }

    // Best-effort persistence; a write failure is logged, not returned.
    if let Err(e) = paid_list.insert(&notify.key).await {
        warn!("Failed to add paid notify key to PaidForList: {e}");
    }

    Ok(())
}
1231
1232#[allow(clippy::too_many_arguments)]
1233async fn handle_neighbor_sync_request(
1234 source: &PeerId,
1235 request: &protocol::NeighborSyncRequest,
1236 p2p_node: &Arc<P2PNode>,
1237 storage: &Arc<LmdbStorage>,
1238 paid_list: &Arc<PaidList>,
1239 queues: &Arc<RwLock<ReplicationQueues>>,
1240 config: &ReplicationConfig,
1241 is_bootstrapping: bool,
1242 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1243 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1244 request_id: u64,
1245 rr_message_id: Option<&str>,
1246) -> Result<()> {
1247 let self_id = *p2p_node.peer_id();
1248
1249 let (response, sender_in_rt) = neighbor_sync::handle_sync_request(
1257 source,
1258 request,
1259 p2p_node,
1260 storage,
1261 paid_list,
1262 config,
1263 is_bootstrapping,
1264 )
1265 .await;
1266
1267 send_replication_response(
1269 source,
1270 p2p_node,
1271 request_id,
1272 ReplicationMessageBody::NeighborSyncResponse(response),
1273 rr_message_id,
1274 )
1275 .await;
1276
1277 if !sender_in_rt {
1279 return Ok(());
1280 }
1281
1282 {
1284 let mut history = sync_history.write().await;
1285 let record = history.entry(*source).or_insert(PeerSyncRecord {
1286 last_sync: None,
1287 cycles_since_sync: 0,
1288 });
1289 record.last_sync = Some(Instant::now());
1290 record.cycles_since_sync = 0;
1291 }
1292
1293 let admitted_keys = admit_and_queue_hints(
1295 &self_id,
1296 source,
1297 &request.replica_hints,
1298 &request.paid_hints,
1299 p2p_node,
1300 config,
1301 storage,
1302 paid_list,
1303 queues,
1304 )
1305 .await;
1306
1307 if is_bootstrapping && !admitted_keys.is_empty() {
1310 bootstrap::track_discovered_keys(bootstrap_state, &admitted_keys).await;
1311 }
1312
1313 Ok(())
1314}
1315
1316async fn handle_verification_request(
1317 source: &PeerId,
1318 request: &protocol::VerificationRequest,
1319 storage: &Arc<LmdbStorage>,
1320 paid_list: &Arc<PaidList>,
1321 p2p_node: &Arc<P2PNode>,
1322 request_id: u64,
1323 rr_message_id: Option<&str>,
1324) -> Result<()> {
1325 #[allow(clippy::cast_possible_truncation)]
1331 let keys_len = request.keys.len() as u32;
1332 let paid_check_set: HashSet<u32> = request
1333 .paid_list_check_indices
1334 .iter()
1335 .copied()
1336 .filter(|&idx| {
1337 if idx >= keys_len {
1338 warn!(
1339 "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
1340 request.keys.len(),
1341 );
1342 false
1343 } else {
1344 true
1345 }
1346 })
1347 .collect();
1348
1349 let mut results = Vec::with_capacity(request.keys.len());
1350 for (i, key) in request.keys.iter().enumerate() {
1351 let present = storage.exists(key).unwrap_or(false);
1352 let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
1353 Some(paid_list.contains(key).unwrap_or(false))
1354 } else {
1355 None
1356 };
1357 results.push(protocol::KeyVerificationResult {
1358 key: *key,
1359 present,
1360 paid,
1361 });
1362 }
1363
1364 send_replication_response(
1365 source,
1366 p2p_node,
1367 request_id,
1368 ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
1369 rr_message_id,
1370 )
1371 .await;
1372
1373 Ok(())
1374}
1375
1376async fn handle_fetch_request(
1377 source: &PeerId,
1378 request: &protocol::FetchRequest,
1379 storage: &Arc<LmdbStorage>,
1380 p2p_node: &Arc<P2PNode>,
1381 request_id: u64,
1382 rr_message_id: Option<&str>,
1383) -> Result<()> {
1384 let response = match storage.get(&request.key).await {
1385 Ok(Some(data)) => protocol::FetchResponse::Success {
1386 key: request.key,
1387 data,
1388 },
1389 Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
1390 Err(e) => protocol::FetchResponse::Error {
1391 key: request.key,
1392 reason: format!("{e}"),
1393 },
1394 };
1395
1396 send_replication_response(
1397 source,
1398 p2p_node,
1399 request_id,
1400 ReplicationMessageBody::FetchResponse(response),
1401 rr_message_id,
1402 )
1403 .await;
1404
1405 Ok(())
1406}
1407
1408async fn handle_audit_challenge_msg(
1409 source: &PeerId,
1410 challenge: &protocol::AuditChallenge,
1411 storage: &Arc<LmdbStorage>,
1412 p2p_node: &Arc<P2PNode>,
1413 is_bootstrapping: bool,
1414 request_id: u64,
1415 rr_message_id: Option<&str>,
1416) -> Result<()> {
1417 #[allow(clippy::cast_possible_truncation)]
1418 let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
1419 let response = audit::handle_audit_challenge(
1420 challenge,
1421 storage,
1422 p2p_node.peer_id(),
1423 is_bootstrapping,
1424 stored_chunks,
1425 )
1426 .await;
1427
1428 send_replication_response(
1429 source,
1430 p2p_node,
1431 request_id,
1432 ReplicationMessageBody::AuditResponse(response),
1433 rr_message_id,
1434 )
1435 .await;
1436
1437 Ok(())
1438}
1439
1440async fn send_replication_response(
1451 peer: &PeerId,
1452 p2p_node: &Arc<P2PNode>,
1453 request_id: u64,
1454 body: ReplicationMessageBody,
1455 rr_message_id: Option<&str>,
1456) {
1457 let msg = ReplicationMessage { request_id, body };
1458 let encoded = match msg.encode() {
1459 Ok(data) => data,
1460 Err(e) => {
1461 warn!("Failed to encode replication response: {e}");
1462 return;
1463 }
1464 };
1465 let result = if let Some(msg_id) = rr_message_id {
1466 p2p_node
1467 .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
1468 .await
1469 } else {
1470 p2p_node
1471 .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
1472 .await
1473 };
1474 if let Err(e) = result {
1475 debug!("Failed to send replication response to {peer}: {e}");
1476 }
1477}
1478
/// Drive one round of neighbor synchronization.
///
/// When the previous sync cycle has completed, this first runs a prune pass,
/// ages the per-peer sync history, and starts a new cycle against a fresh
/// snapshot of close neighbors (carrying over last-sync times and outstanding
/// bootstrap claims). It then syncs with a batch of peers, attempting one
/// replacement peer for each failed sync.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn run_neighbor_sync_round(
    p2p_node: &Arc<P2PNode>,
    storage: &Arc<LmdbStorage>,
    paid_list: &Arc<PaidList>,
    queues: &Arc<RwLock<ReplicationQueues>>,
    config: &ReplicationConfig,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    is_bootstrapping: &Arc<RwLock<bool>>,
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
) {
    let self_id = *p2p_node.peer_id();
    let bootstrapping = *is_bootstrapping.read().await;

    // A completed cycle is the natural point for housekeeping.
    let cycle_complete = sync_state.read().await.is_cycle_complete();
    if cycle_complete {
        pruning::run_prune_pass(&self_id, storage, paid_list, p2p_node, config).await;

        {
            // Age every peer's history record; a successful sync resets it to 0.
            let mut history = sync_history.write().await;
            for record in history.values_mut() {
                record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
            }
        }

        let neighbors =
            neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
                .await;

        let mut state = sync_state.write().await;
        // Re-check under the write lock: the cycle state may have changed
        // between the read above and acquiring this lock.
        if state.is_cycle_complete() {
            // Preserve cooldown timestamps and bootstrap-claim bookkeeping
            // across the cycle reset.
            let old_sync_times = std::mem::take(&mut state.last_sync_times);
            let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
            *state = NeighborSyncState::new_cycle(neighbors);
            state.last_sync_times = old_sync_times;
            state.bootstrap_claims = old_bootstrap_claims;
        }
    }

    // Pick the next batch of peers to sync with, respecting cooldowns.
    let batch = {
        let mut state = sync_state.write().await;
        neighbor_sync::select_sync_batch(
            &mut state,
            config.neighbor_sync_peer_count,
            config.neighbor_sync_cooldown,
        )
    };

    if batch.is_empty() {
        return;
    }

    debug!("Neighbor sync: syncing with {} peers", batch.len());

    for peer in &batch {
        let response = neighbor_sync::sync_with_peer(
            peer,
            p2p_node,
            storage,
            paid_list,
            config,
            bootstrapping,
        )
        .await;

        if let Some(resp) = response {
            handle_sync_response(
                &self_id,
                peer,
                &resp,
                p2p_node,
                config,
                bootstrapping,
                bootstrap_state,
                storage,
                paid_list,
                queues,
                sync_state,
                sync_history,
            )
            .await;
        } else {
            // Failed sync: record the failure and possibly obtain a
            // replacement peer so the cycle still makes progress.
            let replacement = {
                let mut state = sync_state.write().await;
                neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
            };

            if let Some(replacement_peer) = replacement {
                // Exactly one replacement attempt; a failed replacement sync
                // is not retried.
                let replacement_resp = neighbor_sync::sync_with_peer(
                    &replacement_peer,
                    p2p_node,
                    storage,
                    paid_list,
                    config,
                    bootstrapping,
                )
                .await;

                if let Some(resp) = replacement_resp {
                    handle_sync_response(
                        &self_id,
                        &replacement_peer,
                        &resp,
                        p2p_node,
                        config,
                        bootstrapping,
                        bootstrap_state,
                        storage,
                        paid_list,
                        queues,
                        sync_state,
                        sync_history,
                    )
                    .await;
                }
            }
        }
    }
}
1618
/// Process a successful neighbor-sync response from `peer`.
///
/// Records the sync in both the cycle state and the long-term history.
/// If the peer claims to be bootstrapping, the claim is tolerated for a
/// configured grace period (timed from when we first saw it) and reported as
/// a trust failure once exceeded. Otherwise any outstanding claim is cleared
/// and the peer's hints are admitted into the verification queues; newly
/// admitted keys are tracked for bootstrap drain while we are bootstrapping.
#[allow(clippy::too_many_arguments)]
async fn handle_sync_response(
    self_id: &PeerId,
    peer: &PeerId,
    resp: &NeighborSyncResponse,
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
    bootstrapping: bool,
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    storage: &Arc<LmdbStorage>,
    paid_list: &Arc<PaidList>,
    queues: &Arc<RwLock<ReplicationQueues>>,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
) {
    // Mark the peer as synced in the current cycle...
    {
        let mut state = sync_state.write().await;
        neighbor_sync::record_successful_sync(&mut state, peer);
    }
    // ...and reset its long-term history record.
    {
        let mut history = sync_history.write().await;
        let record = history.entry(*peer).or_insert(PeerSyncRecord {
            last_sync: None,
            cycles_since_sync: 0,
        });
        record.last_sync = Some(Instant::now());
        record.cycles_since_sync = 0;
    }

    if resp.bootstrapping {
        // Track how long the peer has been claiming bootstrap; report abuse
        // only once the claim outlives the grace period.
        let should_report = {
            let now = Instant::now();
            let mut state = sync_state.write().await;
            let first_seen = state.bootstrap_claims.entry(*peer).or_insert(now);
            let claim_age = now.duration_since(*first_seen);
            if claim_age > config.bootstrap_claim_grace_period {
                warn!(
                    "Peer {peer} has been claiming bootstrap for {:?}, \
                    exceeding grace period of {:?} — reporting abuse",
                    claim_age, config.bootstrap_claim_grace_period,
                );
                true
            } else {
                false
            }
        };
        if should_report {
            p2p_node
                .report_trust_event(
                    peer,
                    TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
                )
                .await;
        }
    } else {
        {
            // Peer is no longer bootstrapping; clear any outstanding claim.
            let mut state = sync_state.write().await;
            state.bootstrap_claims.remove(peer);
        }
        let admitted_keys = admit_and_queue_hints(
            self_id,
            peer,
            &resp.replica_hints,
            &resp.paid_hints,
            p2p_node,
            config,
            storage,
            paid_list,
            queues,
        )
        .await;

        // While bootstrapping, every newly discovered key must reach a
        // terminal state before bootstrap can complete.
        if bootstrapping && !admitted_keys.is_empty() {
            bootstrap::track_discovered_keys(bootstrap_state, &admitted_keys).await;
        }
    }
}
1706
1707#[allow(clippy::too_many_arguments)]
1712async fn admit_and_queue_hints(
1713 self_id: &PeerId,
1714 source_peer: &PeerId,
1715 replica_hints: &[XorName],
1716 paid_hints: &[XorName],
1717 p2p_node: &Arc<P2PNode>,
1718 config: &ReplicationConfig,
1719 storage: &Arc<LmdbStorage>,
1720 paid_list: &Arc<PaidList>,
1721 queues: &Arc<RwLock<ReplicationQueues>>,
1722) -> HashSet<XorName> {
1723 let pending_keys: HashSet<XorName> = {
1724 let q = queues.read().await;
1725 q.pending_keys().into_iter().collect()
1726 };
1727
1728 let admitted = admission::admit_hints(
1729 self_id,
1730 replica_hints,
1731 paid_hints,
1732 p2p_node,
1733 config,
1734 storage,
1735 paid_list,
1736 &pending_keys,
1737 )
1738 .await;
1739
1740 let mut discovered = HashSet::new();
1741 let mut q = queues.write().await;
1742 let now = Instant::now();
1743
1744 for key in admitted.replica_keys {
1745 if !storage.exists(&key).unwrap_or(false) {
1746 let added = q.add_pending_verify(
1747 key,
1748 VerificationEntry {
1749 state: VerificationState::PendingVerify,
1750 pipeline: HintPipeline::Replica,
1751 verified_sources: Vec::new(),
1752 tried_sources: HashSet::new(),
1753 created_at: now,
1754 hint_sender: *source_peer,
1755 },
1756 );
1757 if added {
1758 discovered.insert(key);
1759 }
1760 }
1761 }
1762
1763 for key in admitted.paid_only_keys {
1764 let added = q.add_pending_verify(
1765 key,
1766 VerificationEntry {
1767 state: VerificationState::PendingVerify,
1768 pipeline: HintPipeline::PaidOnly,
1769 verified_sources: Vec::new(),
1770 tried_sources: HashSet::new(),
1771 created_at: now,
1772 hint_sender: *source_peer,
1773 },
1774 );
1775 if added {
1776 discovered.insert(key);
1777 }
1778 }
1779
1780 discovered
1781}
1782
/// Run one verification pass over all pending-verify keys.
///
/// Stale entries are evicted first. Keys already on the local paid list are
/// fast-pathed: paid-only entries finish immediately, while replica entries
/// still go through the network round to locate data holders. The remaining
/// keys are verified against quorum targets; verified replica keys with
/// holders move to the fetch queue, everything else reaches a terminal
/// state. Finally, bootstrap progress is updated with the terminal keys.
#[allow(clippy::too_many_lines)]
async fn run_verification_cycle(
    p2p_node: &Arc<P2PNode>,
    paid_list: &Arc<PaidList>,
    queues: &Arc<RwLock<ReplicationQueues>>,
    config: &ReplicationConfig,
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    is_bootstrapping: &Arc<RwLock<bool>>,
    bootstrap_complete_notify: &Arc<Notify>,
) {
    {
        // Drop entries that have waited too long for verification.
        let mut q = queues.write().await;
        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
    }

    let pending_keys = {
        let q = queues.read().await;
        q.pending_keys()
    };

    if pending_keys.is_empty() {
        return;
    }

    let self_id = *p2p_node.peer_id();

    // Partition: keys settled via the local paid list vs. keys that still
    // need a network round.
    let mut keys_needing_network = Vec::new();
    let mut terminal_keys: Vec<XorName> = Vec::new();
    {
        let mut q = queues.write().await;
        for key in &pending_keys {
            if paid_list.contains(key).unwrap_or(false) {
                if let Some(entry) = q.get_pending_mut(key) {
                    entry.state = VerificationState::PaidListVerified;
                    if entry.pipeline == HintPipeline::PaidOnly {
                        // Paid-only keys need no data fetch — terminal now.
                        q.remove_pending(key);
                        terminal_keys.push(*key);
                        continue;
                    }
                }
            }
            // Replica-pipeline keys (even paid-verified ones) still need the
            // network round to discover which peers hold the data.
            keys_needing_network.push(*key);
        }
    }

    if !keys_needing_network.is_empty() {
        let targets =
            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
                .await;

        let evidence =
            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;

        // Evaluate the evidence under a read lock only; outcomes are applied
        // later under the write lock.
        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
        {
            let q = queues.read().await;
            for key in &keys_needing_network {
                let Some(ev) = evidence.get(key) else {
                    continue;
                };
                let Some(entry) = q.get_pending(key) else {
                    continue;
                };
                let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config);
                evaluated.push((*key, outcome, entry.pipeline));
            }
        }

        // Record verified keys on the paid list before mutating the queues
        // (the insert is async, so keep it outside the write lock below).
        let mut paid_insert_keys: Vec<XorName> = Vec::new();
        for (key, outcome, _) in &evaluated {
            if matches!(
                outcome,
                KeyVerificationOutcome::QuorumVerified { .. }
                | KeyVerificationOutcome::PaidListVerified { .. }
            ) {
                paid_insert_keys.push(*key);
            }
        }
        for key in &paid_insert_keys {
            if let Err(e) = paid_list.insert(key).await {
                warn!("Failed to add verified key to PaidForList: {e}");
            }
        }

        // Apply outcomes to the queues under a single write lock.
        let mut q = queues.write().await;
        for (key, outcome, pipeline) in evaluated {
            match outcome {
                KeyVerificationOutcome::QuorumVerified { sources }
                | KeyVerificationOutcome::PaidListVerified { sources } => {
                    if pipeline == HintPipeline::Replica && !sources.is_empty() {
                        // Verified with known holders: hand off to the fetch
                        // queue, prioritized by XOR distance from us.
                        let distance =
                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                        q.remove_pending(&key);
                        q.enqueue_fetch(key, distance, sources);
                    } else if pipeline == HintPipeline::Replica && sources.is_empty() {
                        // Payment verified but nobody reports holding the data.
                        warn!(
                            "Verified key {} has no holders (possible data loss)",
                            hex::encode(key)
                        );
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    } else {
                        // Paid-only pipeline: verification alone completes it.
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    }
                }
                KeyVerificationOutcome::QuorumFailed
                | KeyVerificationOutcome::QuorumInconclusive => {
                    q.remove_pending(&key);
                    terminal_keys.push(key);
                }
            }
        }
    }

    // Terminal keys (however they got there) count toward bootstrap drain.
    update_bootstrap_after_verification(
        &terminal_keys,
        bootstrap_state,
        queues,
        is_bootstrapping,
        bootstrap_complete_notify,
    )
    .await;
}
1927
1928async fn update_bootstrap_after_verification(
1931 terminal_keys: &[XorName],
1932 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1933 queues: &Arc<RwLock<ReplicationQueues>>,
1934 is_bootstrapping: &Arc<RwLock<bool>>,
1935 bootstrap_complete_notify: &Arc<Notify>,
1936) {
1937 if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
1938 return;
1939 }
1940 {
1941 let mut bs = bootstrap_state.write().await;
1942 for key in terminal_keys {
1943 bs.remove_key(key);
1944 }
1945 }
1946 let q = queues.read().await;
1947 if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
1948 complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
1949 }
1950}
1951
1952async fn complete_bootstrap(
1954 is_bootstrapping: &Arc<RwLock<bool>>,
1955 bootstrap_complete_notify: &Arc<Notify>,
1956) {
1957 *is_bootstrapping.write().await = false;
1958 bootstrap_complete_notify.notify_waiters();
1959 info!("Replication bootstrap complete");
1960}
1961
/// Terminal result of a single fetch attempt against one source peer.
enum FetchResult {
    /// The record was fetched, passed all integrity checks, and was persisted.
    Stored,
    /// The peer returned data that failed an integrity check
    /// (key mismatch, oversize payload, or content-address mismatch).
    IntegrityFailed,
    /// The source could not supply the record: transport error, not found,
    /// peer-side error, or a local storage failure while persisting.
    SourceFailed,
}
1975
/// Pairs a requested key with how its fetch attempt ended.
struct FetchOutcome {
    /// The key that was requested from the source peer.
    key: XorName,
    /// How the attempt terminated.
    result: FetchResult,
}
1982
1983#[allow(clippy::too_many_lines)]
1984async fn execute_single_fetch(
1990 p2p_node: Arc<P2PNode>,
1991 storage: Arc<LmdbStorage>,
1992 config: Arc<ReplicationConfig>,
1993 key: XorName,
1994 source: PeerId,
1995) -> FetchOutcome {
1996 let request = protocol::FetchRequest { key };
1997 let msg = ReplicationMessage {
1998 request_id: rand::thread_rng().gen::<u64>(),
1999 body: ReplicationMessageBody::FetchRequest(request),
2000 };
2001
2002 let encoded = match msg.encode() {
2003 Ok(data) => data,
2004 Err(e) => {
2005 warn!("Failed to encode fetch request: {e}");
2006 return FetchOutcome {
2007 key,
2008 result: FetchResult::SourceFailed,
2009 };
2010 }
2011 };
2012
2013 let result = p2p_node
2014 .send_request(
2015 &source,
2016 REPLICATION_PROTOCOL_ID,
2017 encoded,
2018 config.fetch_request_timeout,
2019 )
2020 .await;
2021
2022 match result {
2023 Ok(response) => {
2024 let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
2025 p2p_node
2026 .report_trust_event(
2027 &source,
2028 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2029 )
2030 .await;
2031 return FetchOutcome {
2032 key,
2033 result: FetchResult::SourceFailed,
2034 };
2035 };
2036
2037 match resp_msg.body {
2038 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
2039 key: resp_key,
2040 data,
2041 }) => {
2042 if resp_key != key {
2047 warn!(
2048 "Fetch response key mismatch: requested {}, got {}",
2049 hex::encode(key),
2050 hex::encode(resp_key)
2051 );
2052 p2p_node
2053 .report_trust_event(
2054 &source,
2055 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2056 )
2057 .await;
2058 return FetchOutcome {
2059 key,
2060 result: FetchResult::IntegrityFailed,
2061 };
2062 }
2063
2064 if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
2068 warn!(
2069 "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
2070 hex::encode(resp_key),
2071 data.len(),
2072 crate::ant_protocol::MAX_CHUNK_SIZE,
2073 );
2074 p2p_node
2075 .report_trust_event(
2076 &source,
2077 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2078 )
2079 .await;
2080 return FetchOutcome {
2081 key,
2082 result: FetchResult::IntegrityFailed,
2083 };
2084 }
2085
2086 let computed = crate::client::compute_address(&data);
2088 if computed != resp_key {
2089 warn!(
2090 "Fetched record integrity check failed: expected {}, got {}",
2091 hex::encode(resp_key),
2092 hex::encode(computed)
2093 );
2094 p2p_node
2095 .report_trust_event(
2096 &source,
2097 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2098 )
2099 .await;
2100 return FetchOutcome {
2101 key,
2102 result: FetchResult::IntegrityFailed,
2103 };
2104 }
2105
2106 if let Err(e) = storage.put(&resp_key, &data).await {
2107 warn!(
2108 "Failed to store fetched record {}: {e}",
2109 hex::encode(resp_key)
2110 );
2111 return FetchOutcome {
2112 key,
2113 result: FetchResult::SourceFailed,
2114 };
2115 }
2116
2117 FetchOutcome {
2118 key,
2119 result: FetchResult::Stored,
2120 }
2121 }
2122 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
2123 ..
2124 }) => {
2125 debug!("Fetch: peer {source} does not have {}", hex::encode(key));
2128 FetchOutcome {
2129 key,
2130 result: FetchResult::SourceFailed,
2131 }
2132 }
2133 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
2134 reason,
2135 ..
2136 }) => {
2137 warn!(
2138 "Fetch: peer {source} returned error for {}: {reason}",
2139 hex::encode(key)
2140 );
2141 p2p_node
2142 .report_trust_event(
2143 &source,
2144 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2145 )
2146 .await;
2147 FetchOutcome {
2148 key,
2149 result: FetchResult::SourceFailed,
2150 }
2151 }
2152 _ => {
2153 p2p_node
2155 .report_trust_event(
2156 &source,
2157 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2158 )
2159 .await;
2160 FetchOutcome {
2161 key,
2162 result: FetchResult::SourceFailed,
2163 }
2164 }
2165 }
2166 }
2167 Err(e) => {
2168 debug!("Fetch request to {source} failed: {e}");
2169 FetchOutcome {
2172 key,
2173 result: FetchResult::SourceFailed,
2174 }
2175 }
2176 }
2177}
2178
/// Apply the outcome of one audit tick to trust scores and sync state.
///
/// A passing audit clears any bootstrap claim for the peer and reports a
/// trust success; a confirmed failure clears the claim and reports a trust
/// failure with the dedicated audit weight. A bootstrap claim is tolerated
/// within the configured grace period and reported as abuse afterwards.
/// Idle/insufficient-keys ticks are no-ops.
async fn handle_audit_result(
    result: &AuditTickResult,
    p2p_node: &Arc<P2PNode>,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
    config: &ReplicationConfig,
) {
    match result {
        AuditTickResult::Passed {
            challenged_peer,
            keys_checked,
        } => {
            debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
            {
                // A passing audit means the peer is serving data, so any
                // lingering bootstrap claim is stale.
                let mut state = sync_state.write().await;
                state.bootstrap_claims.remove(challenged_peer);
            }
            p2p_node
                .report_trust_event(
                    challenged_peer,
                    TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
                )
                .await;
        }
        AuditTickResult::Failed { evidence } => {
            if let FailureEvidence::AuditFailure {
                challenged_peer,
                confirmed_failed_keys,
                ..
            } = evidence
            {
                error!(
                    "Audit failure for {challenged_peer}: {} confirmed failed keys",
                    confirmed_failed_keys.len()
                );
                {
                    // The peer answered the audit (just wrongly), so its
                    // bootstrap claim — if any — is no longer relevant.
                    let mut state = sync_state.write().await;
                    state.bootstrap_claims.remove(challenged_peer);
                }
                // Audit failures use the dedicated audit trust weight rather
                // than the generic replication weight.
                p2p_node
                    .report_trust_event(
                        challenged_peer,
                        TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
                    )
                    .await;
            }
        }
        AuditTickResult::BootstrapClaim { peer } => {
            // Same grace-period policing as neighbor sync: tolerate the
            // claim briefly (timed from first sighting), then report abuse.
            let should_report = {
                let now = Instant::now();
                let mut state = sync_state.write().await;
                let first_seen = state.bootstrap_claims.entry(*peer).or_insert(now);
                let claim_age = now.duration_since(*first_seen);
                if claim_age > config.bootstrap_claim_grace_period {
                    warn!(
                        "Audit: peer {peer} claiming bootstrap past grace period \
                        ({:?} > {:?}), reporting abuse",
                        claim_age, config.bootstrap_claim_grace_period,
                    );
                    true
                } else {
                    debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
                    false
                }
            };
            if should_report {
                p2p_node
                    .report_trust_event(
                        peer,
                        TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
                    )
                    .await;
            }
        }
        AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
    }
}
2267
2268