// NOTE(review): module-wide opt-out of clippy's `significant_drop_tightening`
// lint; presumably because lock guards in this module are deliberately kept
// alive for whole scopes — confirm before removing.
#![allow(clippy::significant_drop_tightening)]

// Submodules that make up the replication subsystem.
pub mod admission;
pub mod audit;
pub mod bootstrap;
pub mod config;
pub mod fresh;
pub mod neighbor_sync;
pub mod paid_list;
pub mod protocol;
pub mod pruning;
pub mod quorum;
pub mod scheduling;
pub mod types;
29
30use std::collections::{HashMap, HashSet};
31use std::path::Path;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use std::pin::Pin;
36
37use crate::logging::{debug, error, info, warn};
38use futures::stream::FuturesUnordered;
39use futures::{Future, StreamExt};
40use rand::Rng;
41use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
42use tokio::task::JoinHandle;
43use tokio_util::sync::CancellationToken;
44
45use crate::ant_protocol::XorName;
46use crate::error::{Error, Result};
47use crate::payment::PaymentVerifier;
48use crate::replication::audit::AuditTickResult;
49use crate::replication::config::{
50 max_parallel_fetch, ReplicationConfig, MAX_CONCURRENT_REPLICATION_SENDS,
51 REPLICATION_PROTOCOL_ID,
52};
53use crate::replication::paid_list::PaidList;
54use crate::replication::protocol::{
55 FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
56 VerificationResponse,
57};
58use crate::replication::quorum::KeyVerificationOutcome;
59use crate::replication::scheduling::ReplicationQueues;
60use crate::replication::types::{
61 AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
62 NeighborSyncState, PeerSyncRecord, VerificationEntry, VerificationState,
63};
64use crate::storage::LmdbStorage;
65use saorsa_core::identity::PeerId;
66use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
67
/// Topic prefix of request/response-wrapped messages. The message handler treats
/// a topic equal to this prefix followed by `REPLICATION_PROTOCOL_ID` as an
/// enveloped replication request.
const RR_PREFIX: &str = "/rr/";

/// Boxed future tracked by the fetch worker for each in-flight fetch. It yields
/// the key that was fetched and the fetch outcome, or `None` for the outcome when
/// the spawned fetch task could not be joined.
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
77
/// Borrowed handles that the verification worker bundles together and passes to
/// `run_verification_cycle` on every tick.
struct VerificationCycleContext<'a> {
    /// Network node handle.
    p2p_node: &'a Arc<P2PNode>,
    /// Paid-for key list.
    paid_list: &'a Arc<PaidList>,
    /// Local chunk storage.
    storage: &'a Arc<LmdbStorage>,
    /// Shared replication queues.
    queues: &'a Arc<RwLock<ReplicationQueues>>,
    /// Replication configuration.
    config: &'a ReplicationConfig,
    /// Shared bootstrap tracking state.
    bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
    /// Flag that is `true` while the node is still bootstrapping.
    is_bootstrapping: &'a Arc<RwLock<bool>>,
    /// Notifier used to signal bootstrap completion.
    bootstrap_complete_notify: &'a Arc<Notify>,
}
89
/// How long (in milliseconds) the fetch worker sleeps before re-checking the
/// queue when it has no fetches in flight.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Delay (in milliseconds) between two verification cycles of the verification worker.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Interval (in seconds) at which the audit loop polls the bootstrap state
/// while waiting for it to report drained.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;

/// Weight attached to `TrustEvent::ApplicationFailure` reports raised by this module.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
105
/// Owner of the replication subsystem's shared state and background tasks.
///
/// Created with [`ReplicationEngine::new`], activated with
/// [`ReplicationEngine::start`] and stopped with [`ReplicationEngine::shutdown`].
pub struct ReplicationEngine {
    /// Validated replication configuration, shared with every background task.
    config: Arc<ReplicationConfig>,
    /// Network node used for messaging, DHT access and trust reports.
    p2p_node: Arc<P2PNode>,
    /// Local chunk storage.
    storage: Arc<LmdbStorage>,
    /// Paid-for key list, opened under the `root_dir` given to `new`.
    paid_list: Arc<PaidList>,
    /// Verifier for proofs of payment carried by fresh offers and paid notifications.
    payment_verifier: Arc<PaymentVerifier>,
    /// Replication queues shared by the fetch and verification workers.
    queues: Arc<RwLock<ReplicationQueues>>,
    /// Neighbor-sync cycle state; starts as a cycle over an empty neighbor list.
    sync_state: Arc<RwLock<NeighborSyncState>>,
    /// Per-peer record of the most recent neighbor sync.
    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    /// Tracking state for outstanding bootstrap work.
    bootstrap_state: Arc<RwLock<BootstrapState>>,
    /// `true` from construction until bootstrap is completed.
    is_bootstrapping: Arc<RwLock<bool>>,
    /// Wakes the neighbor-sync loop ahead of its timer.
    sync_trigger: Arc<Notify>,
    /// Awaited by `wait_for_bootstrap_complete`; handed to `complete_bootstrap`.
    bootstrap_complete_notify: Arc<Notify>,
    /// Semaphore sized by `MAX_CONCURRENT_REPLICATION_SENDS`, passed to `fresh::replicate_fresh`.
    send_semaphore: Arc<Semaphore>,
    /// Receiver of fresh-write events; taken by the drainer task when it starts.
    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
    /// Cancellation token observed by every background task.
    shutdown: CancellationToken,
    /// Join handles of the spawned background tasks; empty until `start` runs.
    task_handles: Vec<JoinHandle<()>>,
}
153
154impl ReplicationEngine {
155 pub async fn new(
162 config: ReplicationConfig,
163 p2p_node: Arc<P2PNode>,
164 storage: Arc<LmdbStorage>,
165 payment_verifier: Arc<PaymentVerifier>,
166 root_dir: &Path,
167 fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
168 shutdown: CancellationToken,
169 ) -> Result<Self> {
170 config.validate().map_err(Error::Config)?;
171
172 let paid_list = Arc::new(
173 PaidList::new(root_dir)
174 .await
175 .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
176 );
177
178 let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
179 let config = Arc::new(config);
180
181 Ok(Self {
182 config: Arc::clone(&config),
183 p2p_node,
184 storage,
185 paid_list,
186 payment_verifier,
187 queues: Arc::new(RwLock::new(ReplicationQueues::new())),
188 sync_state: Arc::new(RwLock::new(initial_neighbors)),
189 sync_history: Arc::new(RwLock::new(HashMap::new())),
190 bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
191 is_bootstrapping: Arc::new(RwLock::new(true)),
192 sync_trigger: Arc::new(Notify::new()),
193 bootstrap_complete_notify: Arc::new(Notify::new()),
194 send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
195 fresh_write_rx: Some(fresh_write_rx),
196 shutdown,
197 task_handles: Vec::new(),
198 })
199 }
200
201 #[must_use]
203 pub fn paid_list(&self) -> &Arc<PaidList> {
204 &self.paid_list
205 }
206
207 pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
213 if !self.task_handles.is_empty() {
214 error!("ReplicationEngine::start() called while already running — ignoring");
215 return;
216 }
217 info!("Starting replication engine");
218
219 self.start_message_handler();
220 self.start_neighbor_sync_loop();
221 self.start_self_lookup_loop();
222 self.start_audit_loop();
223 self.start_fetch_worker();
224 self.start_verification_worker();
225 self.start_bootstrap_sync(dht_events);
226 self.start_fresh_write_drainer();
227
228 info!(
229 "Replication engine started with {} background tasks",
230 self.task_handles.len()
231 );
232 }
233
234 pub async fn is_bootstrapping(&self) -> bool {
239 *self.is_bootstrapping.read().await
240 }
241
242 pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
251 let notified = self.bootstrap_complete_notify.notified();
254 tokio::pin!(notified);
255 notified.as_mut().enable();
256
257 if !*self.is_bootstrapping.read().await {
258 return true;
259 }
260
261 tokio::time::timeout(timeout, notified).await.is_ok()
262 }
263
264 pub async fn shutdown(&mut self) {
270 self.shutdown.cancel();
271 for (i, mut handle) in self.task_handles.drain(..).enumerate() {
272 match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
273 Ok(Ok(())) => {}
274 Ok(Err(e)) if e.is_cancelled() => {}
275 Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
276 Err(_) => {
277 warn!("Replication task {i} did not stop within 10s, aborting");
278 handle.abort();
279 }
280 }
281 }
282 }
283
284 pub fn trigger_neighbor_sync(&self) {
290 self.sync_trigger.notify_one();
291 }
292
293 pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
295 fresh::replicate_fresh(
296 key,
297 data,
298 proof_of_payment,
299 &self.p2p_node,
300 &self.paid_list,
301 &self.config,
302 &self.send_semaphore,
303 )
304 .await;
305 }
306
307 fn start_fresh_write_drainer(&mut self) {
314 let Some(mut rx) = self.fresh_write_rx.take() else {
315 return;
316 };
317 let p2p = Arc::clone(&self.p2p_node);
318 let paid_list = Arc::clone(&self.paid_list);
319 let config = Arc::clone(&self.config);
320 let send_semaphore = Arc::clone(&self.send_semaphore);
321 let shutdown = self.shutdown.clone();
322
323 let handle = tokio::spawn(async move {
324 loop {
325 tokio::select! {
326 () = shutdown.cancelled() => break,
327 event = rx.recv() => {
328 let Some(event) = event else { break };
329 fresh::replicate_fresh(
330 &event.key,
331 &event.data,
332 &event.payment_proof,
333 &p2p,
334 &paid_list,
335 &config,
336 &send_semaphore,
337 )
338 .await;
339 }
340 }
341 }
342 debug!("Fresh-write drainer shut down");
343 });
344 self.task_handles.push(handle);
345 }
346
347 #[allow(clippy::too_many_lines)]
348 fn start_message_handler(&mut self) {
349 let mut p2p_events = self.p2p_node.subscribe_events();
350 let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
351 let p2p = Arc::clone(&self.p2p_node);
352 let storage = Arc::clone(&self.storage);
353 let paid_list = Arc::clone(&self.paid_list);
354 let payment_verifier = Arc::clone(&self.payment_verifier);
355 let queues = Arc::clone(&self.queues);
356 let config = Arc::clone(&self.config);
357 let shutdown = self.shutdown.clone();
358 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
359 let bootstrap_state = Arc::clone(&self.bootstrap_state);
360 let sync_history = Arc::clone(&self.sync_history);
361 let sync_trigger = Arc::clone(&self.sync_trigger);
362
363 let handle = tokio::spawn(async move {
364 loop {
365 tokio::select! {
366 () = shutdown.cancelled() => break,
367 event = p2p_events.recv() => {
368 let Ok(event) = event else { continue };
369 if let P2PEvent::Message {
370 topic,
371 source: Some(source),
372 data,
373 ..
374 } = event {
375 let rr_info = if topic == REPLICATION_PROTOCOL_ID {
379 Some((data.clone(), None))
380 } else if topic.starts_with(RR_PREFIX)
381 && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
382 {
383 P2PNode::parse_request_envelope(&data)
384 .filter(|(_, is_resp, _)| !is_resp)
385 .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
386 } else {
387 None
388 };
389 if let Some((payload, rr_message_id)) = rr_info {
390 match handle_replication_message(
391 &source,
392 &payload,
393 &p2p,
394 &storage,
395 &paid_list,
396 &payment_verifier,
397 &queues,
398 &config,
399 &is_bootstrapping,
400 &bootstrap_state,
401 &sync_history,
402 rr_message_id.as_deref(),
403 ).await {
404 Ok(()) => {}
405 Err(e) => {
406 debug!(
407 "Replication message from {source} error: {e}"
408 );
409 }
410 }
411 }
412 }
413 }
414 dht_event = dht_events.recv() => {
422 let Ok(dht_event) = dht_event else { continue };
423 if let DhtNetworkEvent::KClosestPeersChanged { .. } = dht_event {
424 debug!(
425 "K-closest peers changed, triggering early neighbor sync"
426 );
427 sync_trigger.notify_one();
428 }
429 }
430 }
431 }
432 debug!("Replication message handler shut down");
433 });
434 self.task_handles.push(handle);
435 }
436
437 fn start_neighbor_sync_loop(&mut self) {
438 let p2p = Arc::clone(&self.p2p_node);
439 let storage = Arc::clone(&self.storage);
440 let paid_list = Arc::clone(&self.paid_list);
441 let queues = Arc::clone(&self.queues);
442 let config = Arc::clone(&self.config);
443 let shutdown = self.shutdown.clone();
444 let sync_state = Arc::clone(&self.sync_state);
445 let sync_history = Arc::clone(&self.sync_history);
446 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
447 let bootstrap_state = Arc::clone(&self.bootstrap_state);
448 let sync_trigger = Arc::clone(&self.sync_trigger);
449
450 let handle = tokio::spawn(async move {
451 loop {
452 let interval = config.random_neighbor_sync_interval();
453 tokio::select! {
454 () = shutdown.cancelled() => break,
455 () = tokio::time::sleep(interval) => {}
456 () = sync_trigger.notified() => {
457 debug!("Neighbor sync triggered by topology change");
458 }
459 }
460 tokio::select! {
464 () = shutdown.cancelled() => break,
465 () = run_neighbor_sync_round(
466 &p2p,
467 &storage,
468 &paid_list,
469 &queues,
470 &config,
471 &sync_state,
472 &sync_history,
473 &is_bootstrapping,
474 &bootstrap_state,
475 ) => {}
476 }
477 }
478 debug!("Neighbor sync loop shut down");
479 });
480 self.task_handles.push(handle);
481 }
482
483 fn start_self_lookup_loop(&mut self) {
484 let p2p = Arc::clone(&self.p2p_node);
485 let config = Arc::clone(&self.config);
486 let shutdown = self.shutdown.clone();
487
488 let handle = tokio::spawn(async move {
489 loop {
490 let interval = config.random_self_lookup_interval();
491 tokio::select! {
492 () = shutdown.cancelled() => break,
493 () = tokio::time::sleep(interval) => {
494 if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
495 debug!("Self-lookup failed: {e}");
496 }
497 }
498 }
499 }
500 debug!("Self-lookup loop shut down");
501 });
502 self.task_handles.push(handle);
503 }
504
505 fn start_audit_loop(&mut self) {
506 let p2p = Arc::clone(&self.p2p_node);
507 let storage = Arc::clone(&self.storage);
508 let config = Arc::clone(&self.config);
509 let shutdown = self.shutdown.clone();
510 let sync_history = Arc::clone(&self.sync_history);
511 let bootstrap_state = Arc::clone(&self.bootstrap_state);
512 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
513 let sync_state = Arc::clone(&self.sync_state);
514
515 let handle = tokio::spawn(async move {
516 loop {
518 tokio::select! {
519 () = shutdown.cancelled() => return,
520 () = tokio::time::sleep(
521 std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
522 ) => {
523 if bootstrap_state.read().await.is_drained() {
524 break;
525 }
526 }
527 }
528 }
529
530 {
532 let bootstrapping = *is_bootstrapping.read().await;
533 let result = {
534 let history = sync_history.read().await;
535 audit::audit_tick(&p2p, &storage, &config, &history, bootstrapping).await
536 };
537 handle_audit_result(&result, &p2p, &sync_state, &config).await;
538 }
539
540 loop {
542 let interval = config.random_audit_tick_interval();
543 tokio::select! {
544 () = shutdown.cancelled() => break,
545 () = tokio::time::sleep(interval) => {
546 let bootstrapping = *is_bootstrapping.read().await;
547 let result = {
548 let history = sync_history.read().await;
549 audit::audit_tick(
550 &p2p,
551 &storage,
552 &config,
553 &history,
554 bootstrapping,
555 )
556 .await
557 };
558 handle_audit_result(&result, &p2p, &sync_state, &config).await;
559 }
560 }
561 }
562 debug!("Audit loop shut down");
563 });
564 self.task_handles.push(handle);
565 }
566
567 #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
568 fn start_fetch_worker(&mut self) {
569 let p2p = Arc::clone(&self.p2p_node);
570 let storage = Arc::clone(&self.storage);
571 let queues = Arc::clone(&self.queues);
572 let config = Arc::clone(&self.config);
573 let shutdown = self.shutdown.clone();
574 let bootstrap_state = Arc::clone(&self.bootstrap_state);
575 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
576 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
577 let concurrency = max_parallel_fetch();
578
579 info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
580
581 let handle = tokio::spawn(async move {
582 let mut in_flight = FuturesUnordered::<FetchFuture>::new();
585
586 loop {
587 {
589 let mut q = queues.write().await;
590 while in_flight.len() < concurrency {
591 let Some(candidate) = q.dequeue_fetch() else {
592 break;
593 };
594 let Some(&source) = candidate.sources.first() else {
595 warn!(
596 "Fetch candidate {} has no sources — dropping",
597 hex::encode(candidate.key)
598 );
599 continue;
600 };
601 q.start_fetch(candidate.key, source, candidate.sources.clone());
602
603 let p2p = Arc::clone(&p2p);
604 let storage = Arc::clone(&storage);
605 let config = Arc::clone(&config);
606 let token = shutdown.clone();
607 let fetch_key = candidate.key;
608 in_flight.push(Box::pin(async move {
609 let handle = tokio::spawn(async move {
610 tokio::select! {
612 () = token.cancelled() => FetchOutcome {
613 key: fetch_key,
614 result: FetchResult::SourceFailed,
615 },
616 outcome = execute_single_fetch(
617 p2p, storage, config, fetch_key, source,
618 ) => outcome,
619 }
620 });
621 match handle.await {
622 Ok(outcome) => (outcome.key, Some(outcome)),
623 Err(e) => {
624 error!(
625 "Fetch task for {} panicked: {e}",
626 hex::encode(fetch_key)
627 );
628 (fetch_key, None)
629 }
630 }
631 }));
632 }
633 } if in_flight.is_empty() {
636 tokio::select! {
638 () = shutdown.cancelled() => break,
639 () = tokio::time::sleep(
640 std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
641 ) => continue,
642 }
643 }
644
645 tokio::select! {
647 () = shutdown.cancelled() => break,
648 Some((key, maybe_outcome)) = in_flight.next() => {
649 let mut q = queues.write().await;
650 let terminal = if let Some(outcome) = maybe_outcome {
651 match outcome.result {
652 FetchResult::Stored => {
653 q.complete_fetch(&key);
654 true
655 }
656 FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
657 if let Some(next_peer) = q.retry_fetch(&key) {
658 let p2p = Arc::clone(&p2p);
660 let storage = Arc::clone(&storage);
661 let config = Arc::clone(&config);
662 let token = shutdown.clone();
663 let fetch_key = key;
664 in_flight.push(Box::pin(async move {
665 let handle = tokio::spawn(async move {
666 tokio::select! {
667 () = token.cancelled() => FetchOutcome {
668 key: fetch_key,
669 result: FetchResult::SourceFailed,
670 },
671 outcome = execute_single_fetch(
672 p2p, storage, config, fetch_key, next_peer,
673 ) => outcome,
674 }
675 });
676 match handle.await {
677 Ok(outcome) => (outcome.key, Some(outcome)),
678 Err(e) => {
679 error!(
680 "Fetch task for {} panicked: {e}",
681 hex::encode(fetch_key)
682 );
683 (fetch_key, None)
684 }
685 }
686 }));
687 false
688 } else {
689 q.complete_fetch(&key);
690 true
691 }
692 }
693 }
694 } else {
695 q.complete_fetch(&key);
697 true
698 };
699
700 if terminal {
702 drop(q); if !bootstrap_state.read().await.is_drained() {
704 bootstrap_state.write().await.remove_key(&key);
705 let q = queues.read().await;
706 if bootstrap::check_bootstrap_drained(
707 &bootstrap_state,
708 &q,
709 )
710 .await
711 {
712 complete_bootstrap(
713 &is_bootstrapping,
714 &bootstrap_complete_notify,
715 ).await;
716 }
717 }
718 }
719 }
720 }
721 }
722
723 while in_flight.next().await.is_some() {}
727 debug!("Fetch worker shut down");
728 });
729 self.task_handles.push(handle);
730 }
731
732 fn start_verification_worker(&mut self) {
733 let p2p = Arc::clone(&self.p2p_node);
734 let storage = Arc::clone(&self.storage);
735 let queues = Arc::clone(&self.queues);
736 let paid_list = Arc::clone(&self.paid_list);
737 let config = Arc::clone(&self.config);
738 let shutdown = self.shutdown.clone();
739 let bootstrap_state = Arc::clone(&self.bootstrap_state);
740 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
741 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
742
743 let handle = tokio::spawn(async move {
744 loop {
745 tokio::select! {
746 () = shutdown.cancelled() => break,
747 () = tokio::time::sleep(
748 std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
749 ) => {
750 let ctx = VerificationCycleContext {
751 p2p_node: &p2p,
752 paid_list: &paid_list,
753 storage: &storage,
754 queues: &queues,
755 config: &config,
756 bootstrap_state: &bootstrap_state,
757 is_bootstrapping: &is_bootstrapping,
758 bootstrap_complete_notify: &bootstrap_complete_notify,
759 };
760 run_verification_cycle(ctx).await;
761 }
762 }
763 }
764 debug!("Verification worker shut down");
765 });
766 self.task_handles.push(handle);
767 }
768
769 fn start_bootstrap_sync(
781 &mut self,
782 dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
783 ) {
784 let p2p = Arc::clone(&self.p2p_node);
785 let storage = Arc::clone(&self.storage);
786 let paid_list = Arc::clone(&self.paid_list);
787 let queues = Arc::clone(&self.queues);
788 let config = Arc::clone(&self.config);
789 let shutdown = self.shutdown.clone();
790 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
791 let bootstrap_state = Arc::clone(&self.bootstrap_state);
792 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
793
794 let handle = tokio::spawn(async move {
795 let gate = bootstrap::wait_for_bootstrap_complete(
799 dht_events,
800 config.bootstrap_complete_timeout_secs,
801 &shutdown,
802 )
803 .await;
804
805 if gate == bootstrap::BootstrapGateResult::Shutdown {
806 return;
807 }
808
809 let self_id = *p2p.peer_id();
810 let neighbors =
811 neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
812 .await;
813
814 if neighbors.is_empty() {
815 info!("Bootstrap sync: no close neighbors found, marking drained");
816 bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
817 complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
818 return;
819 }
820
821 let neighbor_count = neighbors.len();
822 info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
823
824 for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
826 if shutdown.is_cancelled() {
827 break;
828 }
829
830 for peer in batch {
831 if shutdown.is_cancelled() {
832 break;
833 }
834
835 let bootstrapping = *is_bootstrapping.read().await;
837
838 bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
839
840 let response = neighbor_sync::sync_with_peer(
841 peer,
842 &p2p,
843 &storage,
844 &paid_list,
845 &config,
846 bootstrapping,
847 )
848 .await;
849
850 bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
851
852 if let Some(resp) = response {
853 if !resp.bootstrapping {
854 let outcome = admit_and_queue_hints(
856 &self_id,
857 peer,
858 &resp.replica_hints,
859 &resp.paid_hints,
860 &p2p,
861 &config,
862 &storage,
863 &paid_list,
864 &queues,
865 )
866 .await;
867
868 if !outcome.discovered.is_empty() {
870 bootstrap::track_discovered_keys(
871 &bootstrap_state,
872 &outcome.discovered,
873 )
874 .await;
875 }
876
877 if outcome.capacity_rejected_count > 0 {
882 bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
883 } else {
884 bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
885 }
886 }
887 }
888 }
889 }
890
891 {
893 let q = queues.read().await;
894 if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
895 complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
896 }
897 }
898
899 info!("Bootstrap sync completed");
900 });
901 self.task_handles.push(handle);
902 }
903}
904
905#[allow(clippy::too_many_arguments)]
915async fn handle_replication_message(
916 source: &PeerId,
917 data: &[u8],
918 p2p_node: &Arc<P2PNode>,
919 storage: &Arc<LmdbStorage>,
920 paid_list: &Arc<PaidList>,
921 payment_verifier: &Arc<PaymentVerifier>,
922 queues: &Arc<RwLock<ReplicationQueues>>,
923 config: &ReplicationConfig,
924 is_bootstrapping: &Arc<RwLock<bool>>,
925 bootstrap_state: &Arc<RwLock<BootstrapState>>,
926 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
927 rr_message_id: Option<&str>,
928) -> Result<()> {
929 let msg = ReplicationMessage::decode(data)
930 .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
931
932 match msg.body {
933 ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
934 handle_fresh_offer(
935 source,
936 offer,
937 storage,
938 paid_list,
939 payment_verifier,
940 p2p_node,
941 config,
942 msg.request_id,
943 rr_message_id,
944 )
945 .await
946 }
947 ReplicationMessageBody::PaidNotify(ref notify) => {
948 handle_paid_notify(
949 source,
950 notify,
951 paid_list,
952 payment_verifier,
953 p2p_node,
954 config,
955 )
956 .await
957 }
958 ReplicationMessageBody::NeighborSyncRequest(ref request) => {
959 let bootstrapping = *is_bootstrapping.read().await;
960 handle_neighbor_sync_request(
961 source,
962 request,
963 p2p_node,
964 storage,
965 paid_list,
966 queues,
967 config,
968 bootstrapping,
969 bootstrap_state,
970 sync_history,
971 msg.request_id,
972 rr_message_id,
973 )
974 .await
975 }
976 ReplicationMessageBody::VerificationRequest(ref request) => {
977 handle_verification_request(
978 source,
979 request,
980 storage,
981 paid_list,
982 p2p_node,
983 msg.request_id,
984 rr_message_id,
985 )
986 .await
987 }
988 ReplicationMessageBody::FetchRequest(ref request) => {
989 handle_fetch_request(
990 source,
991 request,
992 storage,
993 p2p_node,
994 msg.request_id,
995 rr_message_id,
996 )
997 .await
998 }
999 ReplicationMessageBody::AuditChallenge(ref challenge) => {
1000 let bootstrapping = *is_bootstrapping.read().await;
1001 handle_audit_challenge_msg(
1002 source,
1003 challenge,
1004 storage,
1005 p2p_node,
1006 bootstrapping,
1007 msg.request_id,
1008 rr_message_id,
1009 )
1010 .await
1011 }
1012 ReplicationMessageBody::FreshReplicationResponse(_)
1014 | ReplicationMessageBody::NeighborSyncResponse(_)
1015 | ReplicationMessageBody::VerificationResponse(_)
1016 | ReplicationMessageBody::FetchResponse(_)
1017 | ReplicationMessageBody::AuditResponse(_) => Ok(()),
1018 }
1019}
1020
1021#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1026async fn handle_fresh_offer(
1027 source: &PeerId,
1028 offer: &protocol::FreshReplicationOffer,
1029 storage: &Arc<LmdbStorage>,
1030 paid_list: &Arc<PaidList>,
1031 payment_verifier: &Arc<PaymentVerifier>,
1032 p2p_node: &Arc<P2PNode>,
1033 config: &ReplicationConfig,
1034 request_id: u64,
1035 rr_message_id: Option<&str>,
1036) -> Result<()> {
1037 let self_id = *p2p_node.peer_id();
1038
1039 if offer.proof_of_payment.is_empty() {
1041 send_replication_response(
1042 source,
1043 p2p_node,
1044 request_id,
1045 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1046 key: offer.key,
1047 reason: "Missing proof of payment".to_string(),
1048 }),
1049 rr_message_id,
1050 )
1051 .await;
1052 return Ok(());
1053 }
1054
1055 if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1059 warn!(
1060 "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1061 hex::encode(offer.key),
1062 offer.data.len(),
1063 crate::ant_protocol::MAX_CHUNK_SIZE,
1064 );
1065 p2p_node
1066 .report_trust_event(
1067 source,
1068 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1069 )
1070 .await;
1071 send_replication_response(
1072 source,
1073 p2p_node,
1074 request_id,
1075 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1076 key: offer.key,
1077 reason: format!(
1078 "Data size {} exceeds maximum chunk size {}",
1079 offer.data.len(),
1080 crate::ant_protocol::MAX_CHUNK_SIZE,
1081 ),
1082 }),
1083 rr_message_id,
1084 )
1085 .await;
1086 return Ok(());
1087 }
1088
1089 if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await {
1091 send_replication_response(
1092 source,
1093 p2p_node,
1094 request_id,
1095 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1096 key: offer.key,
1097 reason: "Not responsible for this key".to_string(),
1098 }),
1099 rr_message_id,
1100 )
1101 .await;
1102 return Ok(());
1103 }
1104
1105 match payment_verifier
1107 .verify_payment(&offer.key, Some(&offer.proof_of_payment))
1108 .await
1109 {
1110 Ok(status) if status.can_store() => {
1111 debug!(
1112 "PoP validated for fresh offer key {}",
1113 hex::encode(offer.key)
1114 );
1115 }
1116 Ok(_) => {
1117 send_replication_response(
1118 source,
1119 p2p_node,
1120 request_id,
1121 ReplicationMessageBody::FreshReplicationResponse(
1122 FreshReplicationResponse::Rejected {
1123 key: offer.key,
1124 reason: "Payment verification failed: payment required".to_string(),
1125 },
1126 ),
1127 rr_message_id,
1128 )
1129 .await;
1130 return Ok(());
1131 }
1132 Err(e) => {
1133 warn!(
1134 "PoP verification error for key {}: {e}",
1135 hex::encode(offer.key)
1136 );
1137 send_replication_response(
1138 source,
1139 p2p_node,
1140 request_id,
1141 ReplicationMessageBody::FreshReplicationResponse(
1142 FreshReplicationResponse::Rejected {
1143 key: offer.key,
1144 reason: format!("Payment verification error: {e}"),
1145 },
1146 ),
1147 rr_message_id,
1148 )
1149 .await;
1150 return Ok(());
1151 }
1152 }
1153
1154 if let Err(e) = paid_list.insert(&offer.key).await {
1156 warn!("Failed to add key to PaidForList: {e}");
1157 }
1158
1159 match storage.put(&offer.key, &offer.data).await {
1161 Ok(_) => {
1162 send_replication_response(
1163 source,
1164 p2p_node,
1165 request_id,
1166 ReplicationMessageBody::FreshReplicationResponse(
1167 FreshReplicationResponse::Accepted { key: offer.key },
1168 ),
1169 rr_message_id,
1170 )
1171 .await;
1172 }
1173 Err(e) => {
1174 send_replication_response(
1175 source,
1176 p2p_node,
1177 request_id,
1178 ReplicationMessageBody::FreshReplicationResponse(
1179 FreshReplicationResponse::Rejected {
1180 key: offer.key,
1181 reason: format!("Storage error: {e}"),
1182 },
1183 ),
1184 rr_message_id,
1185 )
1186 .await;
1187 }
1188 }
1189
1190 Ok(())
1191}
1192
1193async fn handle_paid_notify(
1194 _source: &PeerId,
1195 notify: &protocol::PaidNotify,
1196 paid_list: &Arc<PaidList>,
1197 payment_verifier: &Arc<PaymentVerifier>,
1198 p2p_node: &Arc<P2PNode>,
1199 config: &ReplicationConfig,
1200) -> Result<()> {
1201 let self_id = *p2p_node.peer_id();
1202
1203 if notify.proof_of_payment.is_empty() {
1205 return Ok(());
1206 }
1207
1208 if !admission::is_in_paid_close_group(
1210 &self_id,
1211 ¬ify.key,
1212 p2p_node,
1213 config.paid_list_close_group_size,
1214 )
1215 .await
1216 {
1217 return Ok(());
1218 }
1219
1220 match payment_verifier
1222 .verify_payment(¬ify.key, Some(¬ify.proof_of_payment))
1223 .await
1224 {
1225 Ok(status) if status.can_store() => {
1226 debug!(
1227 "PoP validated for paid notify key {}",
1228 hex::encode(notify.key)
1229 );
1230 }
1231 Ok(_) => {
1232 warn!(
1233 "Paid notify rejected: payment required for key {}",
1234 hex::encode(notify.key)
1235 );
1236 return Ok(());
1237 }
1238 Err(e) => {
1239 warn!(
1240 "PoP verification error for paid notify key {}: {e}",
1241 hex::encode(notify.key)
1242 );
1243 return Ok(());
1244 }
1245 }
1246
1247 if let Err(e) = paid_list.insert(¬ify.key).await {
1248 warn!("Failed to add paid notify key to PaidForList: {e}");
1249 }
1250
1251 Ok(())
1252}
1253
1254#[allow(clippy::too_many_arguments)]
1255async fn handle_neighbor_sync_request(
1256 source: &PeerId,
1257 request: &protocol::NeighborSyncRequest,
1258 p2p_node: &Arc<P2PNode>,
1259 storage: &Arc<LmdbStorage>,
1260 paid_list: &Arc<PaidList>,
1261 queues: &Arc<RwLock<ReplicationQueues>>,
1262 config: &ReplicationConfig,
1263 is_bootstrapping: bool,
1264 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1265 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1266 request_id: u64,
1267 rr_message_id: Option<&str>,
1268) -> Result<()> {
1269 let self_id = *p2p_node.peer_id();
1270
1271 let (response, sender_in_rt) = neighbor_sync::handle_sync_request(
1279 source,
1280 request,
1281 p2p_node,
1282 storage,
1283 paid_list,
1284 config,
1285 is_bootstrapping,
1286 )
1287 .await;
1288
1289 send_replication_response(
1291 source,
1292 p2p_node,
1293 request_id,
1294 ReplicationMessageBody::NeighborSyncResponse(response),
1295 rr_message_id,
1296 )
1297 .await;
1298
1299 if !sender_in_rt {
1301 return Ok(());
1302 }
1303
1304 {
1306 let mut history = sync_history.write().await;
1307 let record = history.entry(*source).or_insert(PeerSyncRecord {
1308 last_sync: None,
1309 cycles_since_sync: 0,
1310 });
1311 record.last_sync = Some(Instant::now());
1312 record.cycles_since_sync = 0;
1313 }
1314
1315 let outcome = admit_and_queue_hints(
1317 &self_id,
1318 source,
1319 &request.replica_hints,
1320 &request.paid_hints,
1321 p2p_node,
1322 config,
1323 storage,
1324 paid_list,
1325 queues,
1326 )
1327 .await;
1328
1329 if is_bootstrapping {
1334 if !outcome.discovered.is_empty() {
1335 bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1336 }
1337 if outcome.capacity_rejected_count > 0 {
1338 bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
1339 } else {
1340 bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
1341 }
1342 }
1343
1344 Ok(())
1345}
1346
1347async fn handle_verification_request(
1348 source: &PeerId,
1349 request: &protocol::VerificationRequest,
1350 storage: &Arc<LmdbStorage>,
1351 paid_list: &Arc<PaidList>,
1352 p2p_node: &Arc<P2PNode>,
1353 request_id: u64,
1354 rr_message_id: Option<&str>,
1355) -> Result<()> {
1356 #[allow(clippy::cast_possible_truncation)]
1362 let keys_len = request.keys.len() as u32;
1363 let paid_check_set: HashSet<u32> = request
1364 .paid_list_check_indices
1365 .iter()
1366 .copied()
1367 .filter(|&idx| {
1368 if idx >= keys_len {
1369 warn!(
1370 "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
1371 request.keys.len(),
1372 );
1373 false
1374 } else {
1375 true
1376 }
1377 })
1378 .collect();
1379
1380 let mut results = Vec::with_capacity(request.keys.len());
1381 for (i, key) in request.keys.iter().enumerate() {
1382 let present = storage.exists(key).unwrap_or(false);
1383 let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
1384 Some(paid_list.contains(key).unwrap_or(false))
1385 } else {
1386 None
1387 };
1388 results.push(protocol::KeyVerificationResult {
1389 key: *key,
1390 present,
1391 paid,
1392 });
1393 }
1394
1395 send_replication_response(
1396 source,
1397 p2p_node,
1398 request_id,
1399 ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
1400 rr_message_id,
1401 )
1402 .await;
1403
1404 Ok(())
1405}
1406
1407async fn handle_fetch_request(
1408 source: &PeerId,
1409 request: &protocol::FetchRequest,
1410 storage: &Arc<LmdbStorage>,
1411 p2p_node: &Arc<P2PNode>,
1412 request_id: u64,
1413 rr_message_id: Option<&str>,
1414) -> Result<()> {
1415 let response = match storage.get(&request.key).await {
1416 Ok(Some(data)) => protocol::FetchResponse::Success {
1417 key: request.key,
1418 data,
1419 },
1420 Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
1421 Err(e) => protocol::FetchResponse::Error {
1422 key: request.key,
1423 reason: format!("{e}"),
1424 },
1425 };
1426
1427 send_replication_response(
1428 source,
1429 p2p_node,
1430 request_id,
1431 ReplicationMessageBody::FetchResponse(response),
1432 rr_message_id,
1433 )
1434 .await;
1435
1436 Ok(())
1437}
1438
1439async fn handle_audit_challenge_msg(
1440 source: &PeerId,
1441 challenge: &protocol::AuditChallenge,
1442 storage: &Arc<LmdbStorage>,
1443 p2p_node: &Arc<P2PNode>,
1444 is_bootstrapping: bool,
1445 request_id: u64,
1446 rr_message_id: Option<&str>,
1447) -> Result<()> {
1448 #[allow(clippy::cast_possible_truncation)]
1449 let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
1450 let response = audit::handle_audit_challenge(
1451 challenge,
1452 storage,
1453 p2p_node.peer_id(),
1454 is_bootstrapping,
1455 stored_chunks,
1456 )
1457 .await;
1458
1459 send_replication_response(
1460 source,
1461 p2p_node,
1462 request_id,
1463 ReplicationMessageBody::AuditResponse(response),
1464 rr_message_id,
1465 )
1466 .await;
1467
1468 Ok(())
1469}
1470
1471async fn send_replication_response(
1482 peer: &PeerId,
1483 p2p_node: &Arc<P2PNode>,
1484 request_id: u64,
1485 body: ReplicationMessageBody,
1486 rr_message_id: Option<&str>,
1487) {
1488 let msg = ReplicationMessage { request_id, body };
1489 let encoded = match msg.encode() {
1490 Ok(data) => data,
1491 Err(e) => {
1492 warn!("Failed to encode replication response: {e}");
1493 return;
1494 }
1495 };
1496 let result = if let Some(msg_id) = rr_message_id {
1497 p2p_node
1498 .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
1499 .await
1500 } else {
1501 p2p_node
1502 .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
1503 .await
1504 };
1505 if let Err(e) = result {
1506 debug!("Failed to send replication response to {peer}: {e}");
1507 }
1508}
1509
1510#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1516async fn run_neighbor_sync_round(
1517 p2p_node: &Arc<P2PNode>,
1518 storage: &Arc<LmdbStorage>,
1519 paid_list: &Arc<PaidList>,
1520 queues: &Arc<RwLock<ReplicationQueues>>,
1521 config: &ReplicationConfig,
1522 sync_state: &Arc<RwLock<NeighborSyncState>>,
1523 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1524 is_bootstrapping: &Arc<RwLock<bool>>,
1525 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1526) {
1527 let self_id = *p2p_node.peer_id();
1528 let bootstrapping = *is_bootstrapping.read().await;
1529
1530 let cycle_complete = sync_state.read().await.is_cycle_complete();
1534 if cycle_complete {
1535 let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
1539 pruning::run_prune_pass(
1540 &self_id,
1541 storage,
1542 paid_list,
1543 p2p_node,
1544 config,
1545 sync_state,
1546 allow_remote_prune_audits,
1547 )
1548 .await;
1549
1550 {
1552 let mut history = sync_history.write().await;
1553 for record in history.values_mut() {
1554 record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
1555 }
1556 }
1557
1558 let neighbors =
1560 neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
1561 .await;
1562
1563 let mut state = sync_state.write().await;
1565 if state.is_cycle_complete() {
1566 let old_sync_times = std::mem::take(&mut state.last_sync_times);
1570 let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
1571 let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
1572 let old_prune_cursor = state.prune_cursor;
1573 *state = NeighborSyncState::new_cycle(neighbors);
1574 state.last_sync_times = old_sync_times;
1575 state.bootstrap_claims = old_bootstrap_claims;
1576 state.bootstrap_claim_history = old_bootstrap_claim_history;
1577 state.prune_cursor = old_prune_cursor;
1578 }
1579 }
1580
1581 let batch = {
1583 let mut state = sync_state.write().await;
1584 neighbor_sync::select_sync_batch(
1585 &mut state,
1586 config.neighbor_sync_peer_count,
1587 config.neighbor_sync_cooldown,
1588 )
1589 };
1590
1591 if batch.is_empty() {
1592 return;
1593 }
1594
1595 debug!("Neighbor sync: syncing with {} peers", batch.len());
1596
1597 for peer in &batch {
1599 let response = neighbor_sync::sync_with_peer(
1600 peer,
1601 p2p_node,
1602 storage,
1603 paid_list,
1604 config,
1605 bootstrapping,
1606 )
1607 .await;
1608
1609 if let Some(resp) = response {
1610 handle_sync_response(
1611 &self_id,
1612 peer,
1613 &resp,
1614 p2p_node,
1615 config,
1616 bootstrapping,
1617 bootstrap_state,
1618 storage,
1619 paid_list,
1620 queues,
1621 sync_state,
1622 sync_history,
1623 )
1624 .await;
1625 } else {
1626 let replacement = {
1628 let mut state = sync_state.write().await;
1629 neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
1630 };
1631
1632 if let Some(replacement_peer) = replacement {
1634 let replacement_resp = neighbor_sync::sync_with_peer(
1635 &replacement_peer,
1636 p2p_node,
1637 storage,
1638 paid_list,
1639 config,
1640 bootstrapping,
1641 )
1642 .await;
1643
1644 if let Some(resp) = replacement_resp {
1645 handle_sync_response(
1646 &self_id,
1647 &replacement_peer,
1648 &resp,
1649 p2p_node,
1650 config,
1651 bootstrapping,
1652 bootstrap_state,
1653 storage,
1654 paid_list,
1655 queues,
1656 sync_state,
1657 sync_history,
1658 )
1659 .await;
1660 }
1661 }
1662 }
1663 }
1664}
1665
1666#[allow(clippy::too_many_arguments)]
1669async fn handle_sync_response(
1670 self_id: &PeerId,
1671 peer: &PeerId,
1672 resp: &NeighborSyncResponse,
1673 p2p_node: &Arc<P2PNode>,
1674 config: &ReplicationConfig,
1675 bootstrapping: bool,
1676 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1677 storage: &Arc<LmdbStorage>,
1678 paid_list: &Arc<PaidList>,
1679 queues: &Arc<RwLock<ReplicationQueues>>,
1680 sync_state: &Arc<RwLock<NeighborSyncState>>,
1681 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1682) {
1683 {
1685 let mut state = sync_state.write().await;
1686 neighbor_sync::record_successful_sync(&mut state, peer);
1687 }
1688 {
1689 let mut history = sync_history.write().await;
1690 let record = history.entry(*peer).or_insert(PeerSyncRecord {
1691 last_sync: None,
1692 cycles_since_sync: 0,
1693 });
1694 record.last_sync = Some(Instant::now());
1695 record.cycles_since_sync = 0;
1696 }
1697
1698 if resp.bootstrapping {
1700 let should_report = {
1704 let now = Instant::now();
1705 let mut state = sync_state.write().await;
1706 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
1707 BootstrapClaimObservation::WithinGrace { .. } => false,
1708 BootstrapClaimObservation::PastGrace { first_seen } => {
1709 warn!(
1710 "Peer {peer} has been claiming bootstrap for {:?}, \
1711 exceeding grace period of {:?} — reporting abuse",
1712 now.duration_since(first_seen),
1713 config.bootstrap_claim_grace_period,
1714 );
1715 true
1716 }
1717 BootstrapClaimObservation::Repeated { first_seen } => {
1718 warn!(
1719 "Peer {peer} repeated bootstrap claim after previously stopping; \
1720 first claim was {:?} ago — reporting abuse",
1721 now.duration_since(first_seen),
1722 );
1723 true
1724 }
1725 }
1726 };
1727 if should_report {
1728 p2p_node
1729 .report_trust_event(
1730 peer,
1731 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1732 )
1733 .await;
1734 }
1735 } else {
1736 {
1739 let mut state = sync_state.write().await;
1740 state.clear_active_bootstrap_claim(peer);
1741 }
1742 let outcome = admit_and_queue_hints(
1743 self_id,
1744 peer,
1745 &resp.replica_hints,
1746 &resp.paid_hints,
1747 p2p_node,
1748 config,
1749 storage,
1750 paid_list,
1751 queues,
1752 )
1753 .await;
1754
1755 if bootstrapping {
1760 if !outcome.discovered.is_empty() {
1761 bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1762 }
1763 if outcome.capacity_rejected_count > 0 {
1764 bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
1765 } else {
1766 bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
1767 }
1768 }
1769 }
1770}
1771
1772#[allow(clippy::too_many_arguments)]
1777struct AdmissionOutcome {
1785 discovered: HashSet<XorName>,
1786 capacity_rejected_count: usize,
1787}
1788
1789#[allow(clippy::too_many_arguments)]
1790async fn admit_and_queue_hints(
1791 self_id: &PeerId,
1792 source_peer: &PeerId,
1793 replica_hints: &[XorName],
1794 paid_hints: &[XorName],
1795 p2p_node: &Arc<P2PNode>,
1796 config: &ReplicationConfig,
1797 storage: &Arc<LmdbStorage>,
1798 paid_list: &Arc<PaidList>,
1799 queues: &Arc<RwLock<ReplicationQueues>>,
1800) -> AdmissionOutcome {
1801 let pending_keys: HashSet<XorName> = {
1802 let q = queues.read().await;
1803 q.pending_keys().into_iter().collect()
1804 };
1805
1806 let admitted = admission::admit_hints(
1807 self_id,
1808 replica_hints,
1809 paid_hints,
1810 p2p_node,
1811 config,
1812 storage,
1813 paid_list,
1814 &pending_keys,
1815 )
1816 .await;
1817
1818 let mut discovered = HashSet::new();
1819 let mut capacity_rejected_count: usize = 0;
1820 let mut q = queues.write().await;
1821 let now = Instant::now();
1822
1823 for key in admitted.replica_keys {
1824 if !storage.exists(&key).unwrap_or(false) {
1825 let result = q.add_pending_verify(
1826 key,
1827 VerificationEntry {
1828 state: VerificationState::PendingVerify,
1829 pipeline: HintPipeline::Replica,
1830 verified_sources: Vec::new(),
1831 tried_sources: HashSet::new(),
1832 created_at: now,
1833 hint_sender: *source_peer,
1834 },
1835 );
1836 match result {
1837 crate::replication::scheduling::AdmissionResult::Admitted => {
1838 discovered.insert(key);
1839 }
1840 crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
1841 crate::replication::scheduling::AdmissionResult::CapacityRejected => {
1842 capacity_rejected_count += 1;
1843 }
1844 }
1845 }
1846 }
1847
1848 for key in admitted.paid_only_keys {
1849 let result = q.add_pending_verify(
1850 key,
1851 VerificationEntry {
1852 state: VerificationState::PendingVerify,
1853 pipeline: HintPipeline::PaidOnly,
1854 verified_sources: Vec::new(),
1855 tried_sources: HashSet::new(),
1856 created_at: now,
1857 hint_sender: *source_peer,
1858 },
1859 );
1860 match result {
1861 crate::replication::scheduling::AdmissionResult::Admitted => {
1862 discovered.insert(key);
1863 }
1864 crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
1865 crate::replication::scheduling::AdmissionResult::CapacityRejected => {
1866 capacity_rejected_count += 1;
1867 }
1868 }
1869 }
1870
1871 if capacity_rejected_count > 0 {
1872 debug!(
1873 "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
1874 rejected at queue capacity; source will need to re-hint after pending_verify drains"
1875 );
1876 }
1877
1878 AdmissionOutcome {
1879 discovered,
1880 capacity_rejected_count,
1881 }
1882}
1883
#[allow(clippy::too_many_lines)]
/// Runs one verification pass over every key currently pending verification.
///
/// Keys already present in the local paid list skip the network quorum check; the rest
/// go through a network verification round. Each key ends the pass in one of three ways:
/// promoted to the fetch queue, removed from the pending queue as terminal, or left
/// pending (for example when no evidence was gathered for it). The terminal keys are
/// finally passed to [`update_bootstrap_after_verification`].
///
/// Queue locks are taken in short scopes and are not held across network calls.
async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
    let VerificationCycleContext {
        p2p_node,
        paid_list,
        storage,
        queues,
        config,
        bootstrap_state,
        is_bootstrapping,
        bootstrap_complete_notify,
    } = ctx;

    // Evict over-age pending entries before doing any work on them.
    {
        let mut q = queues.write().await;
        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
    }

    let pending_keys = {
        let q = queues.read().await;
        q.pending_keys()
    };

    if pending_keys.is_empty() {
        return;
    }

    let self_id = *p2p_node.peer_id();

    // Buckets filled by the partitioning pass below.
    let mut local_paid_presence_probe_keys = Vec::new();
    let mut local_paid_paid_only_keys = Vec::new();
    let mut keys_needing_network = Vec::new();
    // Keys removed from the pending queue during this pass.
    let mut terminal_keys: Vec<XorName> = Vec::new();
    {
        let mut q = queues.write().await;
        for key in &pending_keys {
            // A paid-list lookup error is treated as "not in the paid list".
            if paid_list.contains(key).unwrap_or(false) {
                // `set_pending_state` yields the entry's pipeline; a key for which it
                // returns `None` is left out of every bucket.
                if let Some(pipeline) =
                    q.set_pending_state(key, VerificationState::PaidListVerified)
                {
                    match pipeline {
                        HintPipeline::PaidOnly => {
                            local_paid_paid_only_keys.push(*key);
                        }
                        HintPipeline::Replica => {
                            local_paid_presence_probe_keys.push(*key);
                        }
                    }
                }
            } else {
                keys_needing_network.push(*key);
            }
        }
    }

    // Locally paid, paid-only keys: terminal when the record is already stored or this
    // node is not responsible for it; otherwise they join the presence-probe bucket.
    if !local_paid_paid_only_keys.is_empty() {
        let mut terminal_paid_only = Vec::new();
        for key in local_paid_paid_only_keys {
            if storage.exists(&key).unwrap_or(false) {
                terminal_paid_only.push(key);
            } else if admission::is_responsible(&self_id, &key, p2p_node, config.close_group_size)
                .await
            {
                local_paid_presence_probe_keys.push(key);
            } else {
                terminal_paid_only.push(key);
            }
        }

        if !terminal_paid_only.is_empty() {
            let mut q = queues.write().await;
            for key in terminal_paid_only {
                q.remove_pending(&key);
                terminal_keys.push(key);
            }
        }
    }

    // Locally paid keys that still need the record: probe peers for holders.
    if !local_paid_presence_probe_keys.is_empty() {
        let targets = quorum::compute_presence_targets(
            &local_paid_presence_probe_keys,
            p2p_node,
            config,
            &self_id,
        )
        .await;
        let evidence = quorum::run_verification_round(
            &local_paid_presence_probe_keys,
            &targets,
            p2p_node,
            config,
        )
        .await;

        let mut q = queues.write().await;
        for key in local_paid_presence_probe_keys {
            // Checked after the network round, so a record stored in the meantime
            // ends the key here.
            if storage.exists(&key).unwrap_or(false) {
                q.remove_pending(&key);
                terminal_keys.push(key);
                continue;
            }
            // No evidence for the key means no sources.
            let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
                quorum::present_sources_for_key(&key, ev, &targets)
            });
            if sources.is_empty() {
                q.remove_pending(&key);
                warn!(
                    "Locally paid key {} has no responding holders (possible data loss)",
                    hex::encode(key)
                );
                terminal_keys.push(key);
            } else {
                let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                // The promotion result is deliberately ignored.
                let _ = q.promote_pending_to_fetch(key, distance, sources);
            }
        }
    }

    // Keys not in the local paid list: run a network verification round.
    if !keys_needing_network.is_empty() {
        let targets =
            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
                .await;

        let evidence =
            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;

        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
        // Evaluate under a read lock. Keys without evidence, or no longer pending, are
        // skipped and therefore stay untouched by this pass.
        {
            let q = queues.read().await;
            for key in &keys_needing_network {
                let Some(ev) = evidence.get(key) else {
                    continue;
                };
                let Some(entry) = q.get_pending(key) else {
                    continue;
                };
                let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config);
                evaluated.push((*key, outcome, entry.pipeline));
            }
        }
        // Record verified keys in the paid list; no queue lock is held across these awaits.
        let mut paid_insert_keys: Vec<XorName> = Vec::new();
        for (key, outcome, _) in &evaluated {
            if matches!(
                outcome,
                KeyVerificationOutcome::QuorumVerified { .. }
                    | KeyVerificationOutcome::PaidListVerified { .. }
            ) {
                paid_insert_keys.push(*key);
            }
        }
        for key in &paid_insert_keys {
            if let Err(e) = paid_list.insert(key).await {
                warn!("Failed to add verified key to PaidForList: {e}");
            }
        }

        // Verified paid-only keys are fetched only when the record is missing locally and
        // this node is responsible for it. Computed before the write lock is taken below
        // because the responsibility check awaits.
        let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
        for (key, outcome, pipeline) in &evaluated {
            if *pipeline == HintPipeline::PaidOnly
                && matches!(
                    outcome,
                    KeyVerificationOutcome::QuorumVerified { .. }
                        | KeyVerificationOutcome::PaidListVerified { .. }
                )
                && !storage.exists(key).unwrap_or(false)
                && admission::is_responsible(&self_id, key, p2p_node, config.close_group_size).await
            {
                paid_only_fetch_keys.insert(*key);
            }
        }

        let mut q = queues.write().await;
        for (key, outcome, pipeline) in evaluated {
            match outcome {
                KeyVerificationOutcome::QuorumVerified { sources }
                | KeyVerificationOutcome::PaidListVerified { sources } => {
                    let fetch_eligible =
                        pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
                    if fetch_eligible && !sources.is_empty() {
                        let distance =
                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                        // The promotion result is deliberately ignored.
                        let _ = q.promote_pending_to_fetch(key, distance, sources);
                    } else if fetch_eligible && sources.is_empty() {
                        warn!(
                            "Verified responsible key {} has no holders (possible data loss)",
                            hex::encode(key)
                        );
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    } else {
                        // Verified, but nothing to fetch for this key.
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    }
                }
                KeyVerificationOutcome::QuorumFailed
                | KeyVerificationOutcome::QuorumInconclusive => {
                    q.remove_pending(&key);
                    terminal_keys.push(key);
                }
            }
        }
    }

    update_bootstrap_after_verification(
        &terminal_keys,
        bootstrap_state,
        queues,
        is_bootstrapping,
        bootstrap_complete_notify,
    )
    .await;
}
2138
2139async fn update_bootstrap_after_verification(
2142 terminal_keys: &[XorName],
2143 bootstrap_state: &Arc<RwLock<BootstrapState>>,
2144 queues: &Arc<RwLock<ReplicationQueues>>,
2145 is_bootstrapping: &Arc<RwLock<bool>>,
2146 bootstrap_complete_notify: &Arc<Notify>,
2147) {
2148 if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
2149 return;
2150 }
2151 {
2152 let mut bs = bootstrap_state.write().await;
2153 for key in terminal_keys {
2154 bs.remove_key(key);
2155 }
2156 }
2157 let q = queues.read().await;
2158 if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
2159 complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
2160 }
2161}
2162
2163async fn complete_bootstrap(
2165 is_bootstrapping: &Arc<RwLock<bool>>,
2166 bootstrap_complete_notify: &Arc<Notify>,
2167) {
2168 *is_bootstrapping.write().await = false;
2169 bootstrap_complete_notify.notify_waiters();
2170 info!("Replication bootstrap complete");
2171}
2172
/// Classification of a single fetch attempt, as produced by `execute_single_fetch`.
enum FetchResult {
    /// The record passed every check and was written to local storage.
    Stored,
    /// The peer returned data that failed validation: wrong key, payload larger than
    /// `MAX_CHUNK_SIZE`, or content whose computed address does not match the key.
    IntegrityFailed,
    /// The attempt yielded no usable record: request encoding or sending failed, the
    /// reply could not be decoded or was not a successful fetch response, or the local
    /// store rejected the write.
    SourceFailed,
}
2186
/// Outcome of one fetch attempt for one key, as returned by `execute_single_fetch`.
struct FetchOutcome {
    /// The key that was requested (not the key echoed back by the peer).
    key: XorName,
    /// How the attempt ended.
    result: FetchResult,
}
2193
2194#[allow(clippy::too_many_lines)]
2195async fn execute_single_fetch(
2201 p2p_node: Arc<P2PNode>,
2202 storage: Arc<LmdbStorage>,
2203 config: Arc<ReplicationConfig>,
2204 key: XorName,
2205 source: PeerId,
2206) -> FetchOutcome {
2207 let request = protocol::FetchRequest { key };
2208 let msg = ReplicationMessage {
2209 request_id: rand::thread_rng().gen::<u64>(),
2210 body: ReplicationMessageBody::FetchRequest(request),
2211 };
2212
2213 let encoded = match msg.encode() {
2214 Ok(data) => data,
2215 Err(e) => {
2216 warn!("Failed to encode fetch request: {e}");
2217 return FetchOutcome {
2218 key,
2219 result: FetchResult::SourceFailed,
2220 };
2221 }
2222 };
2223
2224 let result = p2p_node
2225 .send_request(
2226 &source,
2227 REPLICATION_PROTOCOL_ID,
2228 encoded,
2229 config.fetch_request_timeout,
2230 )
2231 .await;
2232
2233 match result {
2234 Ok(response) => {
2235 let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
2236 p2p_node
2237 .report_trust_event(
2238 &source,
2239 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2240 )
2241 .await;
2242 return FetchOutcome {
2243 key,
2244 result: FetchResult::SourceFailed,
2245 };
2246 };
2247
2248 match resp_msg.body {
2249 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
2250 key: resp_key,
2251 data,
2252 }) => {
2253 if resp_key != key {
2258 warn!(
2259 "Fetch response key mismatch: requested {}, got {}",
2260 hex::encode(key),
2261 hex::encode(resp_key)
2262 );
2263 p2p_node
2264 .report_trust_event(
2265 &source,
2266 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2267 )
2268 .await;
2269 return FetchOutcome {
2270 key,
2271 result: FetchResult::IntegrityFailed,
2272 };
2273 }
2274
2275 if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
2279 warn!(
2280 "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
2281 hex::encode(resp_key),
2282 data.len(),
2283 crate::ant_protocol::MAX_CHUNK_SIZE,
2284 );
2285 p2p_node
2286 .report_trust_event(
2287 &source,
2288 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2289 )
2290 .await;
2291 return FetchOutcome {
2292 key,
2293 result: FetchResult::IntegrityFailed,
2294 };
2295 }
2296
2297 let computed = crate::client::compute_address(&data);
2299 if computed != resp_key {
2300 warn!(
2301 "Fetched record integrity check failed: expected {}, got {}",
2302 hex::encode(resp_key),
2303 hex::encode(computed)
2304 );
2305 p2p_node
2306 .report_trust_event(
2307 &source,
2308 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2309 )
2310 .await;
2311 return FetchOutcome {
2312 key,
2313 result: FetchResult::IntegrityFailed,
2314 };
2315 }
2316
2317 if let Err(e) = storage.put(&resp_key, &data).await {
2318 warn!(
2319 "Failed to store fetched record {}: {e}",
2320 hex::encode(resp_key)
2321 );
2322 return FetchOutcome {
2323 key,
2324 result: FetchResult::SourceFailed,
2325 };
2326 }
2327
2328 FetchOutcome {
2329 key,
2330 result: FetchResult::Stored,
2331 }
2332 }
2333 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
2334 ..
2335 }) => {
2336 warn!(
2342 "Fetch: verified source {source} returned NotFound for {}",
2343 hex::encode(key)
2344 );
2345 p2p_node
2346 .report_trust_event(
2347 &source,
2348 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2349 )
2350 .await;
2351 FetchOutcome {
2352 key,
2353 result: FetchResult::SourceFailed,
2354 }
2355 }
2356 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
2357 reason,
2358 ..
2359 }) => {
2360 warn!(
2361 "Fetch: peer {source} returned error for {}: {reason}",
2362 hex::encode(key)
2363 );
2364 p2p_node
2365 .report_trust_event(
2366 &source,
2367 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2368 )
2369 .await;
2370 FetchOutcome {
2371 key,
2372 result: FetchResult::SourceFailed,
2373 }
2374 }
2375 _ => {
2376 p2p_node
2378 .report_trust_event(
2379 &source,
2380 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2381 )
2382 .await;
2383 FetchOutcome {
2384 key,
2385 result: FetchResult::SourceFailed,
2386 }
2387 }
2388 }
2389 }
2390 Err(e) => {
2391 debug!("Fetch request to {source} failed: {e}");
2392 FetchOutcome {
2395 key,
2396 result: FetchResult::SourceFailed,
2397 }
2398 }
2399 }
2400}
2401
2402async fn handle_audit_result(
2408 result: &AuditTickResult,
2409 p2p_node: &Arc<P2PNode>,
2410 sync_state: &Arc<RwLock<NeighborSyncState>>,
2411 config: &ReplicationConfig,
2412) {
2413 match result {
2414 AuditTickResult::Passed {
2415 challenged_peer,
2416 keys_checked,
2417 } => {
2418 debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
2419 {
2422 let mut state = sync_state.write().await;
2423 state.clear_active_bootstrap_claim(challenged_peer);
2424 }
2425 p2p_node
2426 .report_trust_event(
2427 challenged_peer,
2428 TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
2429 )
2430 .await;
2431 }
2432 AuditTickResult::Failed { evidence } => {
2433 if let FailureEvidence::AuditFailure {
2434 challenged_peer,
2435 confirmed_failed_keys,
2436 reason,
2437 ..
2438 } = evidence
2439 {
2440 error!(
2441 "Audit failure for {challenged_peer}: {} confirmed failed keys",
2442 confirmed_failed_keys.len()
2443 );
2444 if audit_failure_clears_bootstrap_claim(reason) {
2445 let mut state = sync_state.write().await;
2448 state.clear_active_bootstrap_claim(challenged_peer);
2449 } else {
2450 debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim");
2451 }
2452 p2p_node
2453 .report_trust_event(
2454 challenged_peer,
2455 TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
2456 )
2457 .await;
2458 }
2459 }
2460 AuditTickResult::BootstrapClaim { peer } => {
2461 let should_report = {
2465 let now = Instant::now();
2466 let mut state = sync_state.write().await;
2467 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
2468 {
2469 BootstrapClaimObservation::WithinGrace { .. } => {
2470 debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
2471 false
2472 }
2473 BootstrapClaimObservation::PastGrace { first_seen } => {
2474 warn!(
2475 "Audit: peer {peer} claiming bootstrap past grace period \
2476 ({:?} > {:?}), reporting abuse",
2477 now.duration_since(first_seen),
2478 config.bootstrap_claim_grace_period,
2479 );
2480 true
2481 }
2482 BootstrapClaimObservation::Repeated { first_seen } => {
2483 warn!(
2484 "Audit: peer {peer} repeated bootstrap claim after previously \
2485 stopping; first claim was {:?} ago, reporting abuse",
2486 now.duration_since(first_seen),
2487 );
2488 true
2489 }
2490 }
2491 };
2492 if should_report {
2493 p2p_node
2494 .report_trust_event(
2495 peer,
2496 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2497 )
2498 .await;
2499 }
2500 }
2501 AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
2502 }
2503}
2504
2505fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
2506 !matches!(reason, AuditFailureReason::Timeout)
2507}
2508
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::audit_failure_clears_bootstrap_claim;
    use crate::replication::types::AuditFailureReason;

    /// A timeout must leave the active bootstrap claim in place.
    #[test]
    fn audit_timeout_preserves_active_bootstrap_claim() {
        let clears = audit_failure_clears_bootstrap_claim(&AuditFailureReason::Timeout);
        assert!(!clears);
    }

    /// Every non-timeout failure reason must clear the active bootstrap claim.
    #[test]
    fn decoded_audit_failures_clear_active_bootstrap_claim() {
        let decoded_failures = [
            AuditFailureReason::MalformedResponse,
            AuditFailureReason::DigestMismatch,
            AuditFailureReason::KeyAbsent,
            AuditFailureReason::Rejected,
        ];

        for reason in &decoded_failures {
            assert!(
                audit_failure_clears_bootstrap_claim(reason),
                "decoded non-bootstrap failure {reason:?} should clear active claim"
            );
        }
    }
}