1#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod config;
21pub mod fresh;
22pub mod neighbor_sync;
23pub mod paid_list;
24pub mod protocol;
25pub mod pruning;
26pub mod quorum;
27pub mod scheduling;
28pub mod types;
29
30use std::collections::{HashMap, HashSet};
31use std::path::Path;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use std::pin::Pin;
36
37use crate::logging::{debug, error, info, warn};
38use futures::stream::FuturesUnordered;
39use futures::{Future, StreamExt};
40use rand::Rng;
41use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
42use tokio::task::JoinHandle;
43use tokio_util::sync::CancellationToken;
44
45use crate::ant_protocol::XorName;
46use crate::error::{Error, Result};
47use crate::payment::PaymentVerifier;
48use crate::replication::audit::AuditTickResult;
49use crate::replication::config::{
50 max_parallel_fetch, ReplicationConfig, MAX_CONCURRENT_REPLICATION_SENDS,
51 REPLICATION_PROTOCOL_ID,
52};
53use crate::replication::paid_list::PaidList;
54use crate::replication::protocol::{
55 FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
56 VerificationResponse,
57};
58use crate::replication::quorum::KeyVerificationOutcome;
59use crate::replication::scheduling::ReplicationQueues;
60use crate::replication::types::{
61 AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
62 NeighborSyncState, PeerSyncRecord, RepairProofs, VerificationEntry, VerificationState,
63};
64use crate::storage::LmdbStorage;
65use saorsa_core::identity::PeerId;
66use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
67
/// Topic prefix marking request/response-enveloped traffic.
///
/// The message handler treats a topic consisting of this prefix followed by
/// `REPLICATION_PROTOCOL_ID` as an enveloped replication request.
const RR_PREFIX: &str = "/rr/";
74
/// Boxed, pinned, `Send` future tracked by the fetch worker.
///
/// Resolves to the key being fetched plus its outcome; the worker produces
/// `None` for the outcome when the spawned fetch task could not be joined.
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
77
/// Borrowed bundle of shared engine state handed to `run_verification_cycle`
/// by the verification worker on every poll.
struct VerificationCycleContext<'a> {
    /// Network handle.
    p2p_node: &'a Arc<P2PNode>,
    /// Paid-key list.
    paid_list: &'a Arc<PaidList>,
    /// Local chunk storage.
    storage: &'a Arc<LmdbStorage>,
    /// Replication work queues.
    queues: &'a Arc<RwLock<ReplicationQueues>>,
    /// Engine configuration.
    config: &'a ReplicationConfig,
    /// Bootstrap drain bookkeeping.
    bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
    /// Flag that is `true` while the engine is still bootstrapping.
    is_bootstrapping: &'a Arc<RwLock<bool>>,
    /// Notifier used to signal bootstrap completion.
    bootstrap_complete_notify: &'a Arc<Notify>,
}
89
/// Milliseconds the fetch worker sleeps before re-checking the queue when it
/// has no fetches in flight.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Milliseconds between consecutive verification cycles.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Seconds between checks of the bootstrap-drained flag made by the audit
/// loop before it runs its first audit tick.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;

/// Weight attached to `TrustEvent::ApplicationFailure` reports raised by the
/// replication handlers.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
105
/// Owns the shared replication state and the background tasks that drive it.
///
/// Construct with [`ReplicationEngine::new`], launch the tasks with
/// [`ReplicationEngine::start`], and stop them with
/// [`ReplicationEngine::shutdown`].
pub struct ReplicationEngine {
    /// Validated replication configuration.
    config: Arc<ReplicationConfig>,
    /// Network handle used for events, sends and trust reports.
    p2p_node: Arc<P2PNode>,
    /// Local chunk storage.
    storage: Arc<LmdbStorage>,
    /// Paid-key list opened under the `root_dir` passed to `new`.
    paid_list: Arc<PaidList>,
    /// Verifier for proofs of payment on fresh offers and paid notifications.
    payment_verifier: Arc<PaymentVerifier>,
    /// Replication work queues shared by the workers.
    queues: Arc<RwLock<ReplicationQueues>>,
    /// Neighbor-sync cycle state; starts as a cycle over no peers.
    sync_state: Arc<RwLock<NeighborSyncState>>,
    /// Per-peer sync records, updated when a peer's sync request is handled.
    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    /// Sync cycle epoch counter; starts at 0.
    sync_cycle_epoch: Arc<RwLock<u64>>,
    /// Repair proofs; a peer's entries are dropped on `PeerRemoved`.
    repair_proofs: Arc<RwLock<RepairProofs>>,
    /// Bootstrap drain bookkeeping.
    bootstrap_state: Arc<RwLock<BootstrapState>>,
    /// `true` from construction until bootstrap is completed.
    is_bootstrapping: Arc<RwLock<bool>>,
    /// Wakes the neighbor-sync loop ahead of its timer.
    sync_trigger: Arc<Notify>,
    /// Signals waiters in `wait_for_bootstrap_complete`.
    bootstrap_complete_notify: Arc<Notify>,
    /// Semaphore sized by `MAX_CONCURRENT_REPLICATION_SENDS`, passed to
    /// fresh replication.
    send_semaphore: Arc<Semaphore>,
    /// Fresh-write event receiver; taken (left `None`) by the drainer task.
    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
    /// Cancellation token observed by every background task.
    shutdown: CancellationToken,
    /// Join handles of the spawned background tasks; empty until `start`.
    task_handles: Vec<JoinHandle<()>>,
}
157
158impl ReplicationEngine {
159 pub async fn new(
166 config: ReplicationConfig,
167 p2p_node: Arc<P2PNode>,
168 storage: Arc<LmdbStorage>,
169 payment_verifier: Arc<PaymentVerifier>,
170 root_dir: &Path,
171 fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
172 shutdown: CancellationToken,
173 ) -> Result<Self> {
174 config.validate().map_err(Error::Config)?;
175
176 let paid_list = Arc::new(
177 PaidList::new(root_dir)
178 .await
179 .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
180 );
181
182 let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
183 let config = Arc::new(config);
184
185 Ok(Self {
186 config: Arc::clone(&config),
187 p2p_node,
188 storage,
189 paid_list,
190 payment_verifier,
191 queues: Arc::new(RwLock::new(ReplicationQueues::new())),
192 sync_state: Arc::new(RwLock::new(initial_neighbors)),
193 sync_history: Arc::new(RwLock::new(HashMap::new())),
194 sync_cycle_epoch: Arc::new(RwLock::new(0)),
195 repair_proofs: Arc::new(RwLock::new(RepairProofs::new())),
196 bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
197 is_bootstrapping: Arc::new(RwLock::new(true)),
198 sync_trigger: Arc::new(Notify::new()),
199 bootstrap_complete_notify: Arc::new(Notify::new()),
200 send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
201 fresh_write_rx: Some(fresh_write_rx),
202 shutdown,
203 task_handles: Vec::new(),
204 })
205 }
206
/// Returns a reference to the shared paid-key list.
#[must_use]
pub fn paid_list(&self) -> &Arc<PaidList> {
    &self.paid_list
}
212
213 pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
219 if !self.task_handles.is_empty() {
220 error!("ReplicationEngine::start() called while already running — ignoring");
221 return;
222 }
223 info!("Starting replication engine");
224
225 self.start_message_handler();
226 self.start_neighbor_sync_loop();
227 self.start_self_lookup_loop();
228 self.start_audit_loop();
229 self.start_fetch_worker();
230 self.start_verification_worker();
231 self.start_bootstrap_sync(dht_events);
232 self.start_fresh_write_drainer();
233
234 info!(
235 "Replication engine started with {} background tasks",
236 self.task_handles.len()
237 );
238 }
239
240 pub async fn is_bootstrapping(&self) -> bool {
245 *self.is_bootstrapping.read().await
246 }
247
248 pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
257 let notified = self.bootstrap_complete_notify.notified();
260 tokio::pin!(notified);
261 notified.as_mut().enable();
262
263 if !*self.is_bootstrapping.read().await {
264 return true;
265 }
266
267 tokio::time::timeout(timeout, notified).await.is_ok()
268 }
269
270 pub async fn shutdown(&mut self) {
276 self.shutdown.cancel();
277 for (i, mut handle) in self.task_handles.drain(..).enumerate() {
278 match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
279 Ok(Ok(())) => {}
280 Ok(Err(e)) if e.is_cancelled() => {}
281 Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
282 Err(_) => {
283 warn!("Replication task {i} did not stop within 10s, aborting");
284 handle.abort();
285 }
286 }
287 }
288 }
289
/// Wakes the neighbor-sync loop so it runs a round without waiting for its
/// timer to elapse.
pub fn trigger_neighbor_sync(&self) {
    self.sync_trigger.notify_one();
}
298
/// Replicates a freshly written record by delegating to
/// `fresh::replicate_fresh` with the engine's network handle, paid list,
/// configuration and send semaphore.
pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
    fresh::replicate_fresh(
        key,
        data,
        proof_of_payment,
        &self.p2p_node,
        &self.paid_list,
        &self.config,
        &self.send_semaphore,
    )
    .await;
}
312
313 fn start_fresh_write_drainer(&mut self) {
320 let Some(mut rx) = self.fresh_write_rx.take() else {
321 return;
322 };
323 let p2p = Arc::clone(&self.p2p_node);
324 let paid_list = Arc::clone(&self.paid_list);
325 let config = Arc::clone(&self.config);
326 let send_semaphore = Arc::clone(&self.send_semaphore);
327 let shutdown = self.shutdown.clone();
328
329 let handle = tokio::spawn(async move {
330 loop {
331 tokio::select! {
332 () = shutdown.cancelled() => break,
333 event = rx.recv() => {
334 let Some(event) = event else { break };
335 fresh::replicate_fresh(
336 &event.key,
337 &event.data,
338 &event.payment_proof,
339 &p2p,
340 &paid_list,
341 &config,
342 &send_semaphore,
343 )
344 .await;
345 }
346 }
347 }
348 debug!("Fresh-write drainer shut down");
349 });
350 self.task_handles.push(handle);
351 }
352
353 #[allow(clippy::too_many_lines)]
354 fn start_message_handler(&mut self) {
355 let mut p2p_events = self.p2p_node.subscribe_events();
356 let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
357 let p2p = Arc::clone(&self.p2p_node);
358 let storage = Arc::clone(&self.storage);
359 let paid_list = Arc::clone(&self.paid_list);
360 let payment_verifier = Arc::clone(&self.payment_verifier);
361 let queues = Arc::clone(&self.queues);
362 let config = Arc::clone(&self.config);
363 let shutdown = self.shutdown.clone();
364 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
365 let bootstrap_state = Arc::clone(&self.bootstrap_state);
366 let sync_history = Arc::clone(&self.sync_history);
367 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
368 let repair_proofs = Arc::clone(&self.repair_proofs);
369 let sync_trigger = Arc::clone(&self.sync_trigger);
370
371 let handle = tokio::spawn(async move {
372 loop {
373 tokio::select! {
374 () = shutdown.cancelled() => break,
375 event = p2p_events.recv() => {
376 let Ok(event) = event else { continue };
377 if let P2PEvent::Message {
378 topic,
379 source: Some(source),
380 data,
381 ..
382 } = event {
383 let rr_info = if topic == REPLICATION_PROTOCOL_ID {
387 Some((data.clone(), None))
388 } else if topic.starts_with(RR_PREFIX)
389 && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
390 {
391 P2PNode::parse_request_envelope(&data)
392 .filter(|(_, is_resp, _)| !is_resp)
393 .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
394 } else {
395 None
396 };
397 if let Some((payload, rr_message_id)) = rr_info {
398 match handle_replication_message(
399 &source,
400 &payload,
401 &p2p,
402 &storage,
403 &paid_list,
404 &payment_verifier,
405 &queues,
406 &config,
407 &is_bootstrapping,
408 &bootstrap_state,
409 &sync_history,
410 &sync_cycle_epoch,
411 &repair_proofs,
412 rr_message_id.as_deref(),
413 ).await {
414 Ok(()) => {}
415 Err(e) => {
416 debug!(
417 "Replication message from {source} error: {e}"
418 );
419 }
420 }
421 }
422 }
423 }
424 dht_event = dht_events.recv() => {
432 let Ok(dht_event) = dht_event else { continue };
433 match dht_event {
434 DhtNetworkEvent::KClosestPeersChanged { .. } => {
435 debug!(
436 "K-closest peers changed, triggering early neighbor sync"
437 );
438 sync_trigger.notify_one();
439 }
440 DhtNetworkEvent::PeerRemoved { peer_id } => {
441 repair_proofs.write().await.remove_peer(&peer_id);
442 }
443 _ => {}
444 }
445 }
446 }
447 }
448 debug!("Replication message handler shut down");
449 });
450 self.task_handles.push(handle);
451 }
452
453 fn start_neighbor_sync_loop(&mut self) {
454 let p2p = Arc::clone(&self.p2p_node);
455 let storage = Arc::clone(&self.storage);
456 let paid_list = Arc::clone(&self.paid_list);
457 let queues = Arc::clone(&self.queues);
458 let config = Arc::clone(&self.config);
459 let shutdown = self.shutdown.clone();
460 let sync_state = Arc::clone(&self.sync_state);
461 let sync_history = Arc::clone(&self.sync_history);
462 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
463 let repair_proofs = Arc::clone(&self.repair_proofs);
464 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
465 let bootstrap_state = Arc::clone(&self.bootstrap_state);
466 let sync_trigger = Arc::clone(&self.sync_trigger);
467
468 let handle = tokio::spawn(async move {
469 loop {
470 let interval = config.random_neighbor_sync_interval();
471 tokio::select! {
472 () = shutdown.cancelled() => break,
473 () = tokio::time::sleep(interval) => {}
474 () = sync_trigger.notified() => {
475 debug!("Neighbor sync triggered by topology change");
476 }
477 }
478 tokio::select! {
482 () = shutdown.cancelled() => break,
483 () = run_neighbor_sync_round(
484 &p2p,
485 &storage,
486 &paid_list,
487 &queues,
488 &config,
489 &sync_state,
490 &sync_history,
491 &sync_cycle_epoch,
492 &repair_proofs,
493 &is_bootstrapping,
494 &bootstrap_state,
495 ) => {}
496 }
497 }
498 debug!("Neighbor sync loop shut down");
499 });
500 self.task_handles.push(handle);
501 }
502
503 fn start_self_lookup_loop(&mut self) {
504 let p2p = Arc::clone(&self.p2p_node);
505 let config = Arc::clone(&self.config);
506 let shutdown = self.shutdown.clone();
507
508 let handle = tokio::spawn(async move {
509 loop {
510 let interval = config.random_self_lookup_interval();
511 tokio::select! {
512 () = shutdown.cancelled() => break,
513 () = tokio::time::sleep(interval) => {
514 if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
515 debug!("Self-lookup failed: {e}");
516 }
517 }
518 }
519 }
520 debug!("Self-lookup loop shut down");
521 });
522 self.task_handles.push(handle);
523 }
524
525 fn start_audit_loop(&mut self) {
526 let p2p = Arc::clone(&self.p2p_node);
527 let storage = Arc::clone(&self.storage);
528 let config = Arc::clone(&self.config);
529 let shutdown = self.shutdown.clone();
530 let sync_history = Arc::clone(&self.sync_history);
531 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
532 let repair_proofs = Arc::clone(&self.repair_proofs);
533 let bootstrap_state = Arc::clone(&self.bootstrap_state);
534 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
535 let sync_state = Arc::clone(&self.sync_state);
536
537 let handle = tokio::spawn(async move {
538 loop {
540 tokio::select! {
541 () = shutdown.cancelled() => return,
542 () = tokio::time::sleep(
543 std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
544 ) => {
545 if bootstrap_state.read().await.is_drained() {
546 break;
547 }
548 }
549 }
550 }
551
552 {
554 let bootstrapping = *is_bootstrapping.read().await;
555 let result = {
556 let history = sync_history.read().await;
557 let current_sync_epoch = *sync_cycle_epoch.read().await;
558 audit::audit_tick_with_repair_proofs(
559 &p2p,
560 &storage,
561 &config,
562 &history,
563 &repair_proofs,
564 current_sync_epoch,
565 bootstrapping,
566 )
567 .await
568 };
569 handle_audit_result(&result, &p2p, &sync_state, &config).await;
570 }
571
572 loop {
574 let interval = config.random_audit_tick_interval();
575 tokio::select! {
576 () = shutdown.cancelled() => break,
577 () = tokio::time::sleep(interval) => {
578 let bootstrapping = *is_bootstrapping.read().await;
579 let result = {
580 let history = sync_history.read().await;
581 let current_sync_epoch = *sync_cycle_epoch.read().await;
582 audit::audit_tick_with_repair_proofs(
583 &p2p,
584 &storage,
585 &config,
586 &history,
587 &repair_proofs,
588 current_sync_epoch,
589 bootstrapping,
590 )
591 .await
592 };
593 handle_audit_result(&result, &p2p, &sync_state, &config).await;
594 }
595 }
596 }
597 debug!("Audit loop shut down");
598 });
599 self.task_handles.push(handle);
600 }
601
/// Spawns the fetch worker, which keeps up to `max_parallel_fetch()` fetches
/// in flight, retries failed fetches against the next source offered by the
/// queue, and updates bootstrap bookkeeping when a key reaches a terminal
/// state.
#[allow(clippy::too_many_lines, clippy::option_if_let_else)]
fn start_fetch_worker(&mut self) {
    let p2p = Arc::clone(&self.p2p_node);
    let storage = Arc::clone(&self.storage);
    let queues = Arc::clone(&self.queues);
    let config = Arc::clone(&self.config);
    let shutdown = self.shutdown.clone();
    let bootstrap_state = Arc::clone(&self.bootstrap_state);
    let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
    let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
    let concurrency = max_parallel_fetch();

    info!("Fetch worker concurrency set to {concurrency} (hardware threads)");

    let handle = tokio::spawn(async move {
        let mut in_flight = FuturesUnordered::<FetchFuture>::new();

        loop {
            // Fill phase: top up the in-flight set under the queue write
            // lock; the lock is released at the end of this scope.
            {
                let mut q = queues.write().await;
                while in_flight.len() < concurrency {
                    let Some(candidate) = q.dequeue_fetch() else {
                        break;
                    };
                    let Some(&source) = candidate.sources.first() else {
                        warn!(
                            "Fetch candidate {} has no sources — dropping",
                            hex::encode(candidate.key)
                        );
                        continue;
                    };
                    q.start_fetch(candidate.key, source, candidate.sources.clone());

                    let p2p = Arc::clone(&p2p);
                    let storage = Arc::clone(&storage);
                    let config = Arc::clone(&config);
                    let token = shutdown.clone();
                    let fetch_key = candidate.key;
                    in_flight.push(Box::pin(async move {
                        // The fetch runs in its own task so that a failure
                        // to join it is reported as `None` rather than
                        // taking the worker down.
                        let handle = tokio::spawn(async move {
                            tokio::select! {
                                // Shutdown is reported as a source failure.
                                () = token.cancelled() => FetchOutcome {
                                    key: fetch_key,
                                    result: FetchResult::SourceFailed,
                                },
                                outcome = execute_single_fetch(
                                    p2p, storage, config, fetch_key, source,
                                ) => outcome,
                            }
                        });
                        match handle.await {
                            Ok(outcome) => (outcome.key, Some(outcome)),
                            Err(e) => {
                                error!(
                                    "Fetch task for {} panicked: {e}",
                                    hex::encode(fetch_key)
                                );
                                (fetch_key, None)
                            }
                        }
                    }));
                }
            }
            // Nothing in flight: sleep for the poll interval, then retry
            // the fill phase.
            if in_flight.is_empty() {
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    () = tokio::time::sleep(
                        std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
                    ) => continue,
                }
            }

            // Completion phase: handle the next finished fetch.
            tokio::select! {
                () = shutdown.cancelled() => break,
                Some((key, maybe_outcome)) = in_flight.next() => {
                    let mut q = queues.write().await;
                    // `terminal` is true once no further attempt will be
                    // made for this key.
                    let terminal = if let Some(outcome) = maybe_outcome {
                        match outcome.result {
                            FetchResult::Stored => {
                                q.complete_fetch(&key);
                                true
                            }
                            FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
                                // Retry against the next peer if the queue
                                // offers one; otherwise give up on the key.
                                if let Some(next_peer) = q.retry_fetch(&key) {
                                    let p2p = Arc::clone(&p2p);
                                    let storage = Arc::clone(&storage);
                                    let config = Arc::clone(&config);
                                    let token = shutdown.clone();
                                    let fetch_key = key;
                                    in_flight.push(Box::pin(async move {
                                        let handle = tokio::spawn(async move {
                                            tokio::select! {
                                                () = token.cancelled() => FetchOutcome {
                                                    key: fetch_key,
                                                    result: FetchResult::SourceFailed,
                                                },
                                                outcome = execute_single_fetch(
                                                    p2p, storage, config, fetch_key, next_peer,
                                                ) => outcome,
                                            }
                                        });
                                        match handle.await {
                                            Ok(outcome) => (outcome.key, Some(outcome)),
                                            Err(e) => {
                                                error!(
                                                    "Fetch task for {} panicked: {e}",
                                                    hex::encode(fetch_key)
                                                );
                                                (fetch_key, None)
                                            }
                                        }
                                    }));
                                    false
                                } else {
                                    q.complete_fetch(&key);
                                    true
                                }
                            }
                        }
                    } else {
                        // The fetch task could not be joined; no retry.
                        q.complete_fetch(&key);
                        true
                    };

                    if terminal {
                        // Release the queue write lock before touching the
                        // bootstrap state and re-taking the queue for read.
                        drop(q);
                        if !bootstrap_state.read().await.is_drained() {
                            bootstrap_state.write().await.remove_key(&key);
                            let q = queues.read().await;
                            if bootstrap::check_bootstrap_drained(
                                &bootstrap_state,
                                &q,
                            )
                            .await
                            {
                                complete_bootstrap(
                                    &is_bootstrapping,
                                    &bootstrap_complete_notify,
                                ).await;
                            }
                        }
                    }
                }
            }
        }

        // Wait for every remaining in-flight future before exiting.
        while in_flight.next().await.is_some() {}
        debug!("Fetch worker shut down");
    });
    self.task_handles.push(handle);
}
766
767 fn start_verification_worker(&mut self) {
768 let p2p = Arc::clone(&self.p2p_node);
769 let storage = Arc::clone(&self.storage);
770 let queues = Arc::clone(&self.queues);
771 let paid_list = Arc::clone(&self.paid_list);
772 let config = Arc::clone(&self.config);
773 let shutdown = self.shutdown.clone();
774 let bootstrap_state = Arc::clone(&self.bootstrap_state);
775 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
776 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
777
778 let handle = tokio::spawn(async move {
779 loop {
780 tokio::select! {
781 () = shutdown.cancelled() => break,
782 () = tokio::time::sleep(
783 std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
784 ) => {
785 let ctx = VerificationCycleContext {
786 p2p_node: &p2p,
787 paid_list: &paid_list,
788 storage: &storage,
789 queues: &queues,
790 config: &config,
791 bootstrap_state: &bootstrap_state,
792 is_bootstrapping: &is_bootstrapping,
793 bootstrap_complete_notify: &bootstrap_complete_notify,
794 };
795 run_verification_cycle(ctx).await;
796 }
797 }
798 }
799 debug!("Verification worker shut down");
800 });
801 self.task_handles.push(handle);
802 }
803
/// Spawns the one-shot bootstrap sync task.
///
/// The task waits on the bootstrap gate (fed by `dht_events`), snapshots the
/// close neighbors, syncs with each of them in batches, admits and queues
/// the hints they return, and finally completes bootstrap if the bootstrap
/// state reports drained.
#[allow(clippy::too_many_lines)]
fn start_bootstrap_sync(
    &mut self,
    dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
) {
    let p2p = Arc::clone(&self.p2p_node);
    let storage = Arc::clone(&self.storage);
    let paid_list = Arc::clone(&self.paid_list);
    let queues = Arc::clone(&self.queues);
    let config = Arc::clone(&self.config);
    let shutdown = self.shutdown.clone();
    let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
    let bootstrap_state = Arc::clone(&self.bootstrap_state);
    let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
    let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
    let repair_proofs = Arc::clone(&self.repair_proofs);

    let handle = tokio::spawn(async move {
        // Block until the gate resolves; shutdown ends the task here.
        let gate = bootstrap::wait_for_bootstrap_complete(
            dht_events,
            config.bootstrap_complete_timeout_secs,
            &shutdown,
        )
        .await;

        if gate == bootstrap::BootstrapGateResult::Shutdown {
            return;
        }

        let self_id = *p2p.peer_id();
        let neighbors =
            neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
                .await;

        // With nobody to sync from, bootstrap is complete straight away.
        if neighbors.is_empty() {
            info!("Bootstrap sync: no close neighbors found, marking drained");
            bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
            complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
            return;
        }

        let neighbor_count = neighbors.len();
        info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");

        // NOTE(review): `chunks` panics on a chunk size of 0; presumably
        // `config.validate()` rules that out — confirm.
        for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
            if shutdown.is_cancelled() {
                break;
            }

            for peer in batch {
                if shutdown.is_cancelled() {
                    break;
                }

                let bootstrapping = *is_bootstrapping.read().await;

                // Count the request as pending for the duration of the
                // sync call.
                bootstrap::increment_pending_requests(&bootstrap_state, 1).await;

                let outcome = neighbor_sync::sync_with_peer_with_outcome(
                    peer,
                    &p2p,
                    &storage,
                    &paid_list,
                    &config,
                    bootstrapping,
                )
                .await;

                bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;

                if let Some(outcome) = outcome {
                    // Responses from peers that are themselves
                    // bootstrapping are not processed.
                    if !outcome.response.bootstrapping {
                        record_sent_replica_hints(
                            peer,
                            &outcome.sent_replica_hints,
                            &repair_proofs,
                            &sync_cycle_epoch,
                        )
                        .await;
                        // From here on `outcome` is the admission outcome,
                        // shadowing the sync outcome.
                        let outcome = admit_and_queue_hints(
                            &self_id,
                            peer,
                            &outcome.response.replica_hints,
                            &outcome.response.paid_hints,
                            &p2p,
                            &config,
                            &storage,
                            &paid_list,
                            &queues,
                        )
                        .await;

                        if !outcome.discovered.is_empty() {
                            bootstrap::track_discovered_keys(
                                &bootstrap_state,
                                &outcome.discovered,
                            )
                            .await;
                        }

                        if outcome.capacity_rejected_count > 0 {
                            bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
                        } else {
                            bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
                        }
                    }
                }
            }
        }

        // Runs even when the loops above were cut short by shutdown.
        {
            let q = queues.read().await;
            if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
                complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
            }
        }

        info!("Bootstrap sync completed");
    });
    self.task_handles.push(handle);
}
948}
949
950#[allow(clippy::too_many_arguments)]
960async fn handle_replication_message(
961 source: &PeerId,
962 data: &[u8],
963 p2p_node: &Arc<P2PNode>,
964 storage: &Arc<LmdbStorage>,
965 paid_list: &Arc<PaidList>,
966 payment_verifier: &Arc<PaymentVerifier>,
967 queues: &Arc<RwLock<ReplicationQueues>>,
968 config: &ReplicationConfig,
969 is_bootstrapping: &Arc<RwLock<bool>>,
970 bootstrap_state: &Arc<RwLock<BootstrapState>>,
971 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
972 sync_cycle_epoch: &Arc<RwLock<u64>>,
973 repair_proofs: &Arc<RwLock<RepairProofs>>,
974 rr_message_id: Option<&str>,
975) -> Result<()> {
976 let msg = ReplicationMessage::decode(data)
977 .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
978
979 match msg.body {
980 ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
981 handle_fresh_offer(
982 source,
983 offer,
984 storage,
985 paid_list,
986 payment_verifier,
987 p2p_node,
988 config,
989 msg.request_id,
990 rr_message_id,
991 )
992 .await
993 }
994 ReplicationMessageBody::PaidNotify(ref notify) => {
995 handle_paid_notify(
996 source,
997 notify,
998 paid_list,
999 payment_verifier,
1000 p2p_node,
1001 config,
1002 )
1003 .await
1004 }
1005 ReplicationMessageBody::NeighborSyncRequest(ref request) => {
1006 let bootstrapping = *is_bootstrapping.read().await;
1007 handle_neighbor_sync_request(
1008 source,
1009 request,
1010 p2p_node,
1011 storage,
1012 paid_list,
1013 queues,
1014 config,
1015 bootstrapping,
1016 bootstrap_state,
1017 sync_history,
1018 sync_cycle_epoch,
1019 repair_proofs,
1020 msg.request_id,
1021 rr_message_id,
1022 )
1023 .await
1024 }
1025 ReplicationMessageBody::VerificationRequest(ref request) => {
1026 handle_verification_request(
1027 source,
1028 request,
1029 storage,
1030 paid_list,
1031 p2p_node,
1032 msg.request_id,
1033 rr_message_id,
1034 )
1035 .await
1036 }
1037 ReplicationMessageBody::FetchRequest(ref request) => {
1038 handle_fetch_request(
1039 source,
1040 request,
1041 storage,
1042 p2p_node,
1043 msg.request_id,
1044 rr_message_id,
1045 )
1046 .await
1047 }
1048 ReplicationMessageBody::AuditChallenge(ref challenge) => {
1049 let bootstrapping = *is_bootstrapping.read().await;
1050 handle_audit_challenge_msg(
1051 source,
1052 challenge,
1053 storage,
1054 p2p_node,
1055 bootstrapping,
1056 msg.request_id,
1057 rr_message_id,
1058 )
1059 .await
1060 }
1061 ReplicationMessageBody::FreshReplicationResponse(_)
1063 | ReplicationMessageBody::NeighborSyncResponse(_)
1064 | ReplicationMessageBody::VerificationResponse(_)
1065 | ReplicationMessageBody::FetchResponse(_)
1066 | ReplicationMessageBody::AuditResponse(_) => Ok(()),
1067 }
1068}
1069
1070#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1075async fn handle_fresh_offer(
1076 source: &PeerId,
1077 offer: &protocol::FreshReplicationOffer,
1078 storage: &Arc<LmdbStorage>,
1079 paid_list: &Arc<PaidList>,
1080 payment_verifier: &Arc<PaymentVerifier>,
1081 p2p_node: &Arc<P2PNode>,
1082 config: &ReplicationConfig,
1083 request_id: u64,
1084 rr_message_id: Option<&str>,
1085) -> Result<()> {
1086 let self_id = *p2p_node.peer_id();
1087
1088 if offer.proof_of_payment.is_empty() {
1090 send_replication_response(
1091 source,
1092 p2p_node,
1093 request_id,
1094 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1095 key: offer.key,
1096 reason: "Missing proof of payment".to_string(),
1097 }),
1098 rr_message_id,
1099 )
1100 .await;
1101 return Ok(());
1102 }
1103
1104 if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1108 warn!(
1109 "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1110 hex::encode(offer.key),
1111 offer.data.len(),
1112 crate::ant_protocol::MAX_CHUNK_SIZE,
1113 );
1114 p2p_node
1115 .report_trust_event(
1116 source,
1117 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1118 )
1119 .await;
1120 send_replication_response(
1121 source,
1122 p2p_node,
1123 request_id,
1124 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1125 key: offer.key,
1126 reason: format!(
1127 "Data size {} exceeds maximum chunk size {}",
1128 offer.data.len(),
1129 crate::ant_protocol::MAX_CHUNK_SIZE,
1130 ),
1131 }),
1132 rr_message_id,
1133 )
1134 .await;
1135 return Ok(());
1136 }
1137
1138 if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await {
1140 send_replication_response(
1141 source,
1142 p2p_node,
1143 request_id,
1144 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1145 key: offer.key,
1146 reason: "Not responsible for this key".to_string(),
1147 }),
1148 rr_message_id,
1149 )
1150 .await;
1151 return Ok(());
1152 }
1153
1154 match payment_verifier
1156 .verify_payment(&offer.key, Some(&offer.proof_of_payment))
1157 .await
1158 {
1159 Ok(status) if status.can_store() => {
1160 debug!(
1161 "PoP validated for fresh offer key {}",
1162 hex::encode(offer.key)
1163 );
1164 }
1165 Ok(_) => {
1166 send_replication_response(
1167 source,
1168 p2p_node,
1169 request_id,
1170 ReplicationMessageBody::FreshReplicationResponse(
1171 FreshReplicationResponse::Rejected {
1172 key: offer.key,
1173 reason: "Payment verification failed: payment required".to_string(),
1174 },
1175 ),
1176 rr_message_id,
1177 )
1178 .await;
1179 return Ok(());
1180 }
1181 Err(e) => {
1182 warn!(
1183 "PoP verification error for key {}: {e}",
1184 hex::encode(offer.key)
1185 );
1186 send_replication_response(
1187 source,
1188 p2p_node,
1189 request_id,
1190 ReplicationMessageBody::FreshReplicationResponse(
1191 FreshReplicationResponse::Rejected {
1192 key: offer.key,
1193 reason: format!("Payment verification error: {e}"),
1194 },
1195 ),
1196 rr_message_id,
1197 )
1198 .await;
1199 return Ok(());
1200 }
1201 }
1202
1203 if let Err(e) = paid_list.insert(&offer.key).await {
1205 warn!("Failed to add key to PaidForList: {e}");
1206 }
1207
1208 match storage.put(&offer.key, &offer.data).await {
1210 Ok(_) => {
1211 send_replication_response(
1212 source,
1213 p2p_node,
1214 request_id,
1215 ReplicationMessageBody::FreshReplicationResponse(
1216 FreshReplicationResponse::Accepted { key: offer.key },
1217 ),
1218 rr_message_id,
1219 )
1220 .await;
1221 }
1222 Err(e) => {
1223 send_replication_response(
1224 source,
1225 p2p_node,
1226 request_id,
1227 ReplicationMessageBody::FreshReplicationResponse(
1228 FreshReplicationResponse::Rejected {
1229 key: offer.key,
1230 reason: format!("Storage error: {e}"),
1231 },
1232 ),
1233 rr_message_id,
1234 )
1235 .await;
1236 }
1237 }
1238
1239 Ok(())
1240}
1241
1242async fn handle_paid_notify(
1243 _source: &PeerId,
1244 notify: &protocol::PaidNotify,
1245 paid_list: &Arc<PaidList>,
1246 payment_verifier: &Arc<PaymentVerifier>,
1247 p2p_node: &Arc<P2PNode>,
1248 config: &ReplicationConfig,
1249) -> Result<()> {
1250 let self_id = *p2p_node.peer_id();
1251
1252 if notify.proof_of_payment.is_empty() {
1254 return Ok(());
1255 }
1256
1257 if !admission::is_in_paid_close_group(
1259 &self_id,
1260 ¬ify.key,
1261 p2p_node,
1262 config.paid_list_close_group_size,
1263 )
1264 .await
1265 {
1266 return Ok(());
1267 }
1268
1269 match payment_verifier
1271 .verify_payment(¬ify.key, Some(¬ify.proof_of_payment))
1272 .await
1273 {
1274 Ok(status) if status.can_store() => {
1275 debug!(
1276 "PoP validated for paid notify key {}",
1277 hex::encode(notify.key)
1278 );
1279 }
1280 Ok(_) => {
1281 warn!(
1282 "Paid notify rejected: payment required for key {}",
1283 hex::encode(notify.key)
1284 );
1285 return Ok(());
1286 }
1287 Err(e) => {
1288 warn!(
1289 "PoP verification error for paid notify key {}: {e}",
1290 hex::encode(notify.key)
1291 );
1292 return Ok(());
1293 }
1294 }
1295
1296 if let Err(e) = paid_list.insert(¬ify.key).await {
1297 warn!("Failed to add paid notify key to PaidForList: {e}");
1298 }
1299
1300 Ok(())
1301}
1302
1303#[allow(clippy::too_many_arguments)]
1304async fn handle_neighbor_sync_request(
1305 source: &PeerId,
1306 request: &protocol::NeighborSyncRequest,
1307 p2p_node: &Arc<P2PNode>,
1308 storage: &Arc<LmdbStorage>,
1309 paid_list: &Arc<PaidList>,
1310 queues: &Arc<RwLock<ReplicationQueues>>,
1311 config: &ReplicationConfig,
1312 is_bootstrapping: bool,
1313 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1314 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1315 sync_cycle_epoch: &Arc<RwLock<u64>>,
1316 repair_proofs: &Arc<RwLock<RepairProofs>>,
1317 request_id: u64,
1318 rr_message_id: Option<&str>,
1319) -> Result<()> {
1320 let self_id = *p2p_node.peer_id();
1321
1322 let (response, sent_replica_hints, sender_in_rt) =
1330 neighbor_sync::handle_sync_request_with_proofs(
1331 source,
1332 request,
1333 p2p_node,
1334 storage,
1335 paid_list,
1336 config,
1337 is_bootstrapping,
1338 )
1339 .await;
1340
1341 let response_sent = send_replication_response_checked(
1343 source,
1344 p2p_node,
1345 request_id,
1346 ReplicationMessageBody::NeighborSyncResponse(response),
1347 rr_message_id,
1348 )
1349 .await;
1350
1351 if !sender_in_rt {
1353 return Ok(());
1354 }
1355
1356 {
1359 let mut history = sync_history.write().await;
1360 let record = history.entry(*source).or_insert(PeerSyncRecord {
1361 last_sync: None,
1362 cycles_since_sync: 0,
1363 });
1364 record.last_sync = Some(Instant::now());
1365 record.cycles_since_sync = 0;
1366 }
1367
1368 if response_sent && !request.bootstrapping {
1369 record_sent_replica_hints(source, &sent_replica_hints, repair_proofs, sync_cycle_epoch)
1370 .await;
1371 }
1372
1373 let outcome = admit_and_queue_hints(
1375 &self_id,
1376 source,
1377 &request.replica_hints,
1378 &request.paid_hints,
1379 p2p_node,
1380 config,
1381 storage,
1382 paid_list,
1383 queues,
1384 )
1385 .await;
1386
1387 if is_bootstrapping {
1392 if !outcome.discovered.is_empty() {
1393 bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1394 }
1395 if outcome.capacity_rejected_count > 0 {
1396 bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
1397 } else {
1398 bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
1399 }
1400 }
1401
1402 Ok(())
1403}
1404
1405async fn handle_verification_request(
1406 source: &PeerId,
1407 request: &protocol::VerificationRequest,
1408 storage: &Arc<LmdbStorage>,
1409 paid_list: &Arc<PaidList>,
1410 p2p_node: &Arc<P2PNode>,
1411 request_id: u64,
1412 rr_message_id: Option<&str>,
1413) -> Result<()> {
1414 #[allow(clippy::cast_possible_truncation)]
1420 let keys_len = request.keys.len() as u32;
1421 let paid_check_set: HashSet<u32> = request
1422 .paid_list_check_indices
1423 .iter()
1424 .copied()
1425 .filter(|&idx| {
1426 if idx >= keys_len {
1427 warn!(
1428 "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
1429 request.keys.len(),
1430 );
1431 false
1432 } else {
1433 true
1434 }
1435 })
1436 .collect();
1437
1438 let mut results = Vec::with_capacity(request.keys.len());
1439 for (i, key) in request.keys.iter().enumerate() {
1440 let present = storage.exists(key).unwrap_or(false);
1441 let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
1442 Some(paid_list.contains(key).unwrap_or(false))
1443 } else {
1444 None
1445 };
1446 results.push(protocol::KeyVerificationResult {
1447 key: *key,
1448 present,
1449 paid,
1450 });
1451 }
1452
1453 send_replication_response(
1454 source,
1455 p2p_node,
1456 request_id,
1457 ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
1458 rr_message_id,
1459 )
1460 .await;
1461
1462 Ok(())
1463}
1464
1465async fn handle_fetch_request(
1466 source: &PeerId,
1467 request: &protocol::FetchRequest,
1468 storage: &Arc<LmdbStorage>,
1469 p2p_node: &Arc<P2PNode>,
1470 request_id: u64,
1471 rr_message_id: Option<&str>,
1472) -> Result<()> {
1473 let response = match storage.get(&request.key).await {
1474 Ok(Some(data)) => protocol::FetchResponse::Success {
1475 key: request.key,
1476 data,
1477 },
1478 Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
1479 Err(e) => protocol::FetchResponse::Error {
1480 key: request.key,
1481 reason: format!("{e}"),
1482 },
1483 };
1484
1485 send_replication_response(
1486 source,
1487 p2p_node,
1488 request_id,
1489 ReplicationMessageBody::FetchResponse(response),
1490 rr_message_id,
1491 )
1492 .await;
1493
1494 Ok(())
1495}
1496
1497async fn handle_audit_challenge_msg(
1498 source: &PeerId,
1499 challenge: &protocol::AuditChallenge,
1500 storage: &Arc<LmdbStorage>,
1501 p2p_node: &Arc<P2PNode>,
1502 is_bootstrapping: bool,
1503 request_id: u64,
1504 rr_message_id: Option<&str>,
1505) -> Result<()> {
1506 #[allow(clippy::cast_possible_truncation)]
1507 let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
1508 let response = audit::handle_audit_challenge(
1509 challenge,
1510 storage,
1511 p2p_node.peer_id(),
1512 is_bootstrapping,
1513 stored_chunks,
1514 )
1515 .await;
1516
1517 send_replication_response(
1518 source,
1519 p2p_node,
1520 request_id,
1521 ReplicationMessageBody::AuditResponse(response),
1522 rr_message_id,
1523 )
1524 .await;
1525
1526 Ok(())
1527}
1528
1529async fn send_replication_response(
1539 peer: &PeerId,
1540 p2p_node: &Arc<P2PNode>,
1541 request_id: u64,
1542 body: ReplicationMessageBody,
1543 rr_message_id: Option<&str>,
1544) {
1545 let _ =
1546 send_replication_response_checked(peer, p2p_node, request_id, body, rr_message_id).await;
1547}
1548
1549async fn send_replication_response_checked(
1559 peer: &PeerId,
1560 p2p_node: &Arc<P2PNode>,
1561 request_id: u64,
1562 body: ReplicationMessageBody,
1563 rr_message_id: Option<&str>,
1564) -> bool {
1565 let msg = ReplicationMessage { request_id, body };
1566 let encoded = match msg.encode() {
1567 Ok(data) => data,
1568 Err(e) => {
1569 warn!("Failed to encode replication response: {e}");
1570 return false;
1571 }
1572 };
1573 let result = if let Some(msg_id) = rr_message_id {
1574 p2p_node
1575 .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
1576 .await
1577 } else {
1578 p2p_node
1579 .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
1580 .await
1581 };
1582 if let Err(e) = result {
1583 debug!("Failed to send replication response to {peer}: {e}");
1584 return false;
1585 }
1586 true
1587}
1588
1589async fn record_sent_replica_hints(
1590 peer: &PeerId,
1591 hints: &[neighbor_sync::SentReplicaHint],
1592 repair_proofs: &Arc<RwLock<RepairProofs>>,
1593 sync_cycle_epoch: &Arc<RwLock<u64>>,
1594) {
1595 if hints.is_empty() {
1596 return;
1597 }
1598
1599 let hinted_at_epoch = *sync_cycle_epoch.read().await;
1600 let mut proofs = repair_proofs.write().await;
1601 for hint in hints {
1602 if proofs.record_replica_hint_sent(*peer, hint.key, &hint.close_peers, hinted_at_epoch) {
1603 debug!(
1604 "Recorded repair hint proof for peer {peer} and key {}",
1605 hex::encode(hint.key)
1606 );
1607 }
1608 }
1609}
1610
1611#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1617async fn run_neighbor_sync_round(
1618 p2p_node: &Arc<P2PNode>,
1619 storage: &Arc<LmdbStorage>,
1620 paid_list: &Arc<PaidList>,
1621 queues: &Arc<RwLock<ReplicationQueues>>,
1622 config: &ReplicationConfig,
1623 sync_state: &Arc<RwLock<NeighborSyncState>>,
1624 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1625 sync_cycle_epoch: &Arc<RwLock<u64>>,
1626 repair_proofs: &Arc<RwLock<RepairProofs>>,
1627 is_bootstrapping: &Arc<RwLock<bool>>,
1628 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1629) {
1630 let self_id = *p2p_node.peer_id();
1631 let bootstrapping = *is_bootstrapping.read().await;
1632
1633 let cycle_complete = sync_state.read().await.is_cycle_complete();
1637 if cycle_complete {
1638 {
1641 let mut history = sync_history.write().await;
1642 for record in history.values_mut() {
1643 record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
1644 }
1645 }
1646 let current_sync_epoch = {
1647 let mut epoch = sync_cycle_epoch.write().await;
1648 *epoch = epoch.saturating_add(1);
1649 *epoch
1650 };
1651
1652 let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
1656 pruning::run_prune_pass_with_context(pruning::PrunePassContext {
1657 self_id: &self_id,
1658 storage,
1659 paid_list,
1660 p2p_node,
1661 config,
1662 sync_state,
1663 repair_proofs,
1664 current_sync_epoch,
1665 allow_remote_prune_audits,
1666 })
1667 .await;
1668
1669 let neighbors =
1671 neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
1672 .await;
1673
1674 let mut state = sync_state.write().await;
1676 if state.is_cycle_complete() {
1677 let old_sync_times = std::mem::take(&mut state.last_sync_times);
1681 let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
1682 let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
1683 let old_prune_cursor = state.prune_cursor;
1684 *state = NeighborSyncState::new_cycle(neighbors);
1685 state.last_sync_times = old_sync_times;
1686 state.bootstrap_claims = old_bootstrap_claims;
1687 state.bootstrap_claim_history = old_bootstrap_claim_history;
1688 state.prune_cursor = old_prune_cursor;
1689 }
1690 }
1691
1692 let batch = {
1694 let mut state = sync_state.write().await;
1695 neighbor_sync::select_sync_batch(
1696 &mut state,
1697 config.neighbor_sync_peer_count,
1698 config.neighbor_sync_cooldown,
1699 )
1700 };
1701
1702 if batch.is_empty() {
1703 return;
1704 }
1705
1706 debug!("Neighbor sync: syncing with {} peers", batch.len());
1707
1708 for peer in &batch {
1710 let outcome = neighbor_sync::sync_with_peer_with_outcome(
1711 peer,
1712 p2p_node,
1713 storage,
1714 paid_list,
1715 config,
1716 bootstrapping,
1717 )
1718 .await;
1719
1720 if let Some(outcome) = outcome {
1721 handle_sync_response(
1722 &self_id,
1723 peer,
1724 &outcome.response,
1725 &outcome.sent_replica_hints,
1726 p2p_node,
1727 config,
1728 bootstrapping,
1729 bootstrap_state,
1730 storage,
1731 paid_list,
1732 queues,
1733 sync_state,
1734 sync_history,
1735 sync_cycle_epoch,
1736 repair_proofs,
1737 )
1738 .await;
1739 } else {
1740 let replacement = {
1742 let mut state = sync_state.write().await;
1743 neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
1744 };
1745
1746 if let Some(replacement_peer) = replacement {
1748 let replacement_outcome = neighbor_sync::sync_with_peer_with_outcome(
1749 &replacement_peer,
1750 p2p_node,
1751 storage,
1752 paid_list,
1753 config,
1754 bootstrapping,
1755 )
1756 .await;
1757
1758 if let Some(outcome) = replacement_outcome {
1759 handle_sync_response(
1760 &self_id,
1761 &replacement_peer,
1762 &outcome.response,
1763 &outcome.sent_replica_hints,
1764 p2p_node,
1765 config,
1766 bootstrapping,
1767 bootstrap_state,
1768 storage,
1769 paid_list,
1770 queues,
1771 sync_state,
1772 sync_history,
1773 sync_cycle_epoch,
1774 repair_proofs,
1775 )
1776 .await;
1777 }
1778 }
1779 }
1780 }
1781}
1782
1783#[allow(clippy::too_many_arguments)]
1786async fn handle_sync_response(
1787 self_id: &PeerId,
1788 peer: &PeerId,
1789 resp: &NeighborSyncResponse,
1790 sent_replica_hints: &[neighbor_sync::SentReplicaHint],
1791 p2p_node: &Arc<P2PNode>,
1792 config: &ReplicationConfig,
1793 bootstrapping: bool,
1794 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1795 storage: &Arc<LmdbStorage>,
1796 paid_list: &Arc<PaidList>,
1797 queues: &Arc<RwLock<ReplicationQueues>>,
1798 sync_state: &Arc<RwLock<NeighborSyncState>>,
1799 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1800 sync_cycle_epoch: &Arc<RwLock<u64>>,
1801 repair_proofs: &Arc<RwLock<RepairProofs>>,
1802) {
1803 {
1805 let mut state = sync_state.write().await;
1806 neighbor_sync::record_successful_sync(&mut state, peer);
1807 }
1808 {
1809 let mut history = sync_history.write().await;
1810 let record = history.entry(*peer).or_insert(PeerSyncRecord {
1811 last_sync: None,
1812 cycles_since_sync: 0,
1813 });
1814 record.last_sync = Some(Instant::now());
1815 record.cycles_since_sync = 0;
1816 }
1817
1818 if resp.bootstrapping {
1820 let should_report = {
1824 let now = Instant::now();
1825 let mut state = sync_state.write().await;
1826 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
1827 BootstrapClaimObservation::WithinGrace { .. } => false,
1828 BootstrapClaimObservation::PastGrace { first_seen } => {
1829 warn!(
1830 "Peer {peer} has been claiming bootstrap for {:?}, \
1831 exceeding grace period of {:?} — reporting abuse",
1832 now.duration_since(first_seen),
1833 config.bootstrap_claim_grace_period,
1834 );
1835 true
1836 }
1837 BootstrapClaimObservation::Repeated { first_seen } => {
1838 warn!(
1839 "Peer {peer} repeated bootstrap claim after previously stopping; \
1840 first claim was {:?} ago — reporting abuse",
1841 now.duration_since(first_seen),
1842 );
1843 true
1844 }
1845 }
1846 };
1847 if should_report {
1848 p2p_node
1849 .report_trust_event(
1850 peer,
1851 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1852 )
1853 .await;
1854 }
1855 } else {
1856 {
1859 let mut state = sync_state.write().await;
1860 state.clear_active_bootstrap_claim(peer);
1861 }
1862 record_sent_replica_hints(peer, sent_replica_hints, repair_proofs, sync_cycle_epoch).await;
1863 let outcome = admit_and_queue_hints(
1864 self_id,
1865 peer,
1866 &resp.replica_hints,
1867 &resp.paid_hints,
1868 p2p_node,
1869 config,
1870 storage,
1871 paid_list,
1872 queues,
1873 )
1874 .await;
1875
1876 if bootstrapping {
1881 if !outcome.discovered.is_empty() {
1882 bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1883 }
1884 if outcome.capacity_rejected_count > 0 {
1885 bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
1886 } else {
1887 bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
1888 }
1889 }
1890 }
1891}
1892
1893#[allow(clippy::too_many_arguments)]
1898struct AdmissionOutcome {
1906 discovered: HashSet<XorName>,
1907 capacity_rejected_count: usize,
1908}
1909
1910#[allow(clippy::too_many_arguments)]
1911async fn admit_and_queue_hints(
1912 self_id: &PeerId,
1913 source_peer: &PeerId,
1914 replica_hints: &[XorName],
1915 paid_hints: &[XorName],
1916 p2p_node: &Arc<P2PNode>,
1917 config: &ReplicationConfig,
1918 storage: &Arc<LmdbStorage>,
1919 paid_list: &Arc<PaidList>,
1920 queues: &Arc<RwLock<ReplicationQueues>>,
1921) -> AdmissionOutcome {
1922 let pending_keys: HashSet<XorName> = {
1923 let q = queues.read().await;
1924 q.pending_keys().into_iter().collect()
1925 };
1926
1927 let admitted = admission::admit_hints(
1928 self_id,
1929 replica_hints,
1930 paid_hints,
1931 p2p_node,
1932 config,
1933 storage,
1934 paid_list,
1935 &pending_keys,
1936 )
1937 .await;
1938
1939 let mut discovered = HashSet::new();
1940 let mut capacity_rejected_count: usize = 0;
1941 let mut q = queues.write().await;
1942 let now = Instant::now();
1943
1944 for key in admitted.replica_keys {
1945 if !storage.exists(&key).unwrap_or(false) {
1946 let result = q.add_pending_verify(
1947 key,
1948 VerificationEntry {
1949 state: VerificationState::PendingVerify,
1950 pipeline: HintPipeline::Replica,
1951 verified_sources: Vec::new(),
1952 tried_sources: HashSet::new(),
1953 created_at: now,
1954 hint_sender: *source_peer,
1955 },
1956 );
1957 match result {
1958 crate::replication::scheduling::AdmissionResult::Admitted => {
1959 discovered.insert(key);
1960 }
1961 crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
1962 crate::replication::scheduling::AdmissionResult::CapacityRejected => {
1963 capacity_rejected_count += 1;
1964 }
1965 }
1966 }
1967 }
1968
1969 for key in admitted.paid_only_keys {
1970 let result = q.add_pending_verify(
1971 key,
1972 VerificationEntry {
1973 state: VerificationState::PendingVerify,
1974 pipeline: HintPipeline::PaidOnly,
1975 verified_sources: Vec::new(),
1976 tried_sources: HashSet::new(),
1977 created_at: now,
1978 hint_sender: *source_peer,
1979 },
1980 );
1981 match result {
1982 crate::replication::scheduling::AdmissionResult::Admitted => {
1983 discovered.insert(key);
1984 }
1985 crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
1986 crate::replication::scheduling::AdmissionResult::CapacityRejected => {
1987 capacity_rejected_count += 1;
1988 }
1989 }
1990 }
1991
1992 if capacity_rejected_count > 0 {
1993 debug!(
1994 "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
1995 rejected at queue capacity; source will need to re-hint after pending_verify drains"
1996 );
1997 }
1998
1999 AdmissionOutcome {
2000 discovered,
2001 capacity_rejected_count,
2002 }
2003}
2004
2005#[allow(clippy::too_many_lines)]
2011async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
2012 let VerificationCycleContext {
2013 p2p_node,
2014 paid_list,
2015 storage,
2016 queues,
2017 config,
2018 bootstrap_state,
2019 is_bootstrapping,
2020 bootstrap_complete_notify,
2021 } = ctx;
2022
2023 {
2026 let mut q = queues.write().await;
2027 q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
2028 }
2029
2030 let pending_keys = {
2031 let q = queues.read().await;
2032 q.pending_keys()
2033 };
2034
2035 if pending_keys.is_empty() {
2036 return;
2037 }
2038
2039 let self_id = *p2p_node.peer_id();
2040
2041 let mut local_paid_presence_probe_keys = Vec::new();
2044 let mut local_paid_paid_only_keys = Vec::new();
2045 let mut keys_needing_network = Vec::new();
2046 let mut terminal_keys: Vec<XorName> = Vec::new();
2047 {
2048 let mut q = queues.write().await;
2049 for key in &pending_keys {
2050 if paid_list.contains(key).unwrap_or(false) {
2051 if let Some(pipeline) =
2052 q.set_pending_state(key, VerificationState::PaidListVerified)
2053 {
2054 match pipeline {
2055 HintPipeline::PaidOnly => {
2056 local_paid_paid_only_keys.push(*key);
2061 }
2062 HintPipeline::Replica => {
2063 local_paid_presence_probe_keys.push(*key);
2068 }
2069 }
2070 }
2071 } else {
2072 keys_needing_network.push(*key);
2073 }
2074 }
2075 }
2076
2077 if !local_paid_paid_only_keys.is_empty() {
2078 let mut terminal_paid_only = Vec::new();
2079 for key in local_paid_paid_only_keys {
2080 if storage.exists(&key).unwrap_or(false) {
2081 terminal_paid_only.push(key);
2082 } else if admission::is_responsible(&self_id, &key, p2p_node, config.close_group_size)
2083 .await
2084 {
2085 local_paid_presence_probe_keys.push(key);
2086 } else {
2087 terminal_paid_only.push(key);
2088 }
2089 }
2090
2091 if !terminal_paid_only.is_empty() {
2092 let mut q = queues.write().await;
2093 for key in terminal_paid_only {
2094 q.remove_pending(&key);
2095 terminal_keys.push(key);
2096 }
2097 }
2098 }
2099
2100 if !local_paid_presence_probe_keys.is_empty() {
2104 let targets = quorum::compute_presence_targets(
2105 &local_paid_presence_probe_keys,
2106 p2p_node,
2107 config,
2108 &self_id,
2109 )
2110 .await;
2111 let evidence = quorum::run_verification_round(
2112 &local_paid_presence_probe_keys,
2113 &targets,
2114 p2p_node,
2115 config,
2116 )
2117 .await;
2118
2119 let mut q = queues.write().await;
2120 for key in local_paid_presence_probe_keys {
2121 if storage.exists(&key).unwrap_or(false) {
2122 q.remove_pending(&key);
2123 terminal_keys.push(key);
2124 continue;
2125 }
2126 let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
2127 quorum::present_sources_for_key(&key, ev, &targets)
2128 });
2129 if sources.is_empty() {
2130 q.remove_pending(&key);
2132 warn!(
2133 "Locally paid key {} has no responding holders (possible data loss)",
2134 hex::encode(key)
2135 );
2136 terminal_keys.push(key);
2137 } else {
2138 let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
2139 let _ = q.promote_pending_to_fetch(key, distance, sources);
2143 }
2144 }
2145 }
2146
2147 if !keys_needing_network.is_empty() {
2149 let targets =
2151 quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
2152 .await;
2153
2154 let evidence =
2155 quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;
2156
2157 let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
2160 {
2161 let q = queues.read().await;
2162 for key in &keys_needing_network {
2163 let Some(ev) = evidence.get(key) else {
2164 continue;
2165 };
2166 let Some(entry) = q.get_pending(key) else {
2167 continue;
2168 };
2169 let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config);
2170 evaluated.push((*key, outcome, entry.pipeline));
2171 }
2172 } let mut paid_insert_keys: Vec<XorName> = Vec::new();
2176 for (key, outcome, _) in &evaluated {
2177 if matches!(
2178 outcome,
2179 KeyVerificationOutcome::QuorumVerified { .. }
2180 | KeyVerificationOutcome::PaidListVerified { .. }
2181 ) {
2182 paid_insert_keys.push(*key);
2183 }
2184 }
2185 for key in &paid_insert_keys {
2186 if let Err(e) = paid_list.insert(key).await {
2187 warn!("Failed to add verified key to PaidForList: {e}");
2188 }
2189 }
2190
2191 let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
2196 for (key, outcome, pipeline) in &evaluated {
2197 if *pipeline == HintPipeline::PaidOnly
2198 && matches!(
2199 outcome,
2200 KeyVerificationOutcome::QuorumVerified { .. }
2201 | KeyVerificationOutcome::PaidListVerified { .. }
2202 )
2203 && !storage.exists(key).unwrap_or(false)
2204 && admission::is_responsible(&self_id, key, p2p_node, config.close_group_size).await
2205 {
2206 paid_only_fetch_keys.insert(*key);
2207 }
2208 }
2209
2210 let mut q = queues.write().await;
2212 for (key, outcome, pipeline) in evaluated {
2213 match outcome {
2214 KeyVerificationOutcome::QuorumVerified { sources }
2215 | KeyVerificationOutcome::PaidListVerified { sources } => {
2216 let fetch_eligible =
2217 pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
2218 if fetch_eligible && !sources.is_empty() {
2219 let distance =
2220 crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
2221 let _ = q.promote_pending_to_fetch(key, distance, sources);
2225 } else if fetch_eligible && sources.is_empty() {
2228 warn!(
2229 "Verified responsible key {} has no holders (possible data loss)",
2230 hex::encode(key)
2231 );
2232 q.remove_pending(&key);
2233 terminal_keys.push(key);
2234 } else {
2235 q.remove_pending(&key);
2236 terminal_keys.push(key);
2237 }
2238 }
2239 KeyVerificationOutcome::QuorumFailed
2240 | KeyVerificationOutcome::QuorumInconclusive => {
2241 q.remove_pending(&key);
2242 terminal_keys.push(key);
2243 }
2244 }
2245 }
2246 }
2247
2248 update_bootstrap_after_verification(
2251 &terminal_keys,
2252 bootstrap_state,
2253 queues,
2254 is_bootstrapping,
2255 bootstrap_complete_notify,
2256 )
2257 .await;
2258}
2259
2260async fn update_bootstrap_after_verification(
2263 terminal_keys: &[XorName],
2264 bootstrap_state: &Arc<RwLock<BootstrapState>>,
2265 queues: &Arc<RwLock<ReplicationQueues>>,
2266 is_bootstrapping: &Arc<RwLock<bool>>,
2267 bootstrap_complete_notify: &Arc<Notify>,
2268) {
2269 if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
2270 return;
2271 }
2272 {
2273 let mut bs = bootstrap_state.write().await;
2274 for key in terminal_keys {
2275 bs.remove_key(key);
2276 }
2277 }
2278 let q = queues.read().await;
2279 if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
2280 complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
2281 }
2282}
2283
2284async fn complete_bootstrap(
2286 is_bootstrapping: &Arc<RwLock<bool>>,
2287 bootstrap_complete_notify: &Arc<Notify>,
2288) {
2289 *is_bootstrapping.write().await = false;
2290 bootstrap_complete_notify.notify_waiters();
2291 info!("Replication bootstrap complete");
2292}
2293
/// Classification of a single fetch attempt against one source peer.
///
/// Derives `Debug`/`Clone`/`Copy`/`PartialEq`/`Eq` so results can be logged
/// and compared directly; the enum carries no data, so these are free.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FetchResult {
    /// The record passed all checks and was written to local storage.
    Stored,
    /// The source answered with data that failed validation (wrong key,
    /// oversized payload, or content that does not hash to the key).
    IntegrityFailed,
    /// The attempt failed without usable data: the request could not be
    /// encoded or sent, the reply was undecodable / not-found / an error /
    /// unexpected, or the local store rejected the record.
    SourceFailed,
}
2307
/// Result of one fetch attempt, tagged with the key it was for.
struct FetchOutcome {
    /// The key that was requested (always the requested key, even when the
    /// source replied with a different one).
    key: XorName,
    /// How the attempt ended.
    result: FetchResult,
}
2314
2315#[allow(clippy::too_many_lines)]
2316async fn execute_single_fetch(
2322 p2p_node: Arc<P2PNode>,
2323 storage: Arc<LmdbStorage>,
2324 config: Arc<ReplicationConfig>,
2325 key: XorName,
2326 source: PeerId,
2327) -> FetchOutcome {
2328 let request = protocol::FetchRequest { key };
2329 let msg = ReplicationMessage {
2330 request_id: rand::thread_rng().gen::<u64>(),
2331 body: ReplicationMessageBody::FetchRequest(request),
2332 };
2333
2334 let encoded = match msg.encode() {
2335 Ok(data) => data,
2336 Err(e) => {
2337 warn!("Failed to encode fetch request: {e}");
2338 return FetchOutcome {
2339 key,
2340 result: FetchResult::SourceFailed,
2341 };
2342 }
2343 };
2344
2345 let result = p2p_node
2346 .send_request(
2347 &source,
2348 REPLICATION_PROTOCOL_ID,
2349 encoded,
2350 config.fetch_request_timeout,
2351 )
2352 .await;
2353
2354 match result {
2355 Ok(response) => {
2356 let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
2357 p2p_node
2358 .report_trust_event(
2359 &source,
2360 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2361 )
2362 .await;
2363 return FetchOutcome {
2364 key,
2365 result: FetchResult::SourceFailed,
2366 };
2367 };
2368
2369 match resp_msg.body {
2370 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
2371 key: resp_key,
2372 data,
2373 }) => {
2374 if resp_key != key {
2379 warn!(
2380 "Fetch response key mismatch: requested {}, got {}",
2381 hex::encode(key),
2382 hex::encode(resp_key)
2383 );
2384 p2p_node
2385 .report_trust_event(
2386 &source,
2387 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2388 )
2389 .await;
2390 return FetchOutcome {
2391 key,
2392 result: FetchResult::IntegrityFailed,
2393 };
2394 }
2395
2396 if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
2400 warn!(
2401 "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
2402 hex::encode(resp_key),
2403 data.len(),
2404 crate::ant_protocol::MAX_CHUNK_SIZE,
2405 );
2406 p2p_node
2407 .report_trust_event(
2408 &source,
2409 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2410 )
2411 .await;
2412 return FetchOutcome {
2413 key,
2414 result: FetchResult::IntegrityFailed,
2415 };
2416 }
2417
2418 let computed = crate::client::compute_address(&data);
2420 if computed != resp_key {
2421 warn!(
2422 "Fetched record integrity check failed: expected {}, got {}",
2423 hex::encode(resp_key),
2424 hex::encode(computed)
2425 );
2426 p2p_node
2427 .report_trust_event(
2428 &source,
2429 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2430 )
2431 .await;
2432 return FetchOutcome {
2433 key,
2434 result: FetchResult::IntegrityFailed,
2435 };
2436 }
2437
2438 if let Err(e) = storage.put(&resp_key, &data).await {
2439 warn!(
2440 "Failed to store fetched record {}: {e}",
2441 hex::encode(resp_key)
2442 );
2443 return FetchOutcome {
2444 key,
2445 result: FetchResult::SourceFailed,
2446 };
2447 }
2448
2449 FetchOutcome {
2450 key,
2451 result: FetchResult::Stored,
2452 }
2453 }
2454 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
2455 ..
2456 }) => {
2457 warn!(
2463 "Fetch: verified source {source} returned NotFound for {}",
2464 hex::encode(key)
2465 );
2466 p2p_node
2467 .report_trust_event(
2468 &source,
2469 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2470 )
2471 .await;
2472 FetchOutcome {
2473 key,
2474 result: FetchResult::SourceFailed,
2475 }
2476 }
2477 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
2478 reason,
2479 ..
2480 }) => {
2481 warn!(
2482 "Fetch: peer {source} returned error for {}: {reason}",
2483 hex::encode(key)
2484 );
2485 p2p_node
2486 .report_trust_event(
2487 &source,
2488 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2489 )
2490 .await;
2491 FetchOutcome {
2492 key,
2493 result: FetchResult::SourceFailed,
2494 }
2495 }
2496 _ => {
2497 p2p_node
2499 .report_trust_event(
2500 &source,
2501 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2502 )
2503 .await;
2504 FetchOutcome {
2505 key,
2506 result: FetchResult::SourceFailed,
2507 }
2508 }
2509 }
2510 }
2511 Err(e) => {
2512 debug!("Fetch request to {source} failed: {e}");
2513 FetchOutcome {
2516 key,
2517 result: FetchResult::SourceFailed,
2518 }
2519 }
2520 }
2521}
2522
2523async fn handle_audit_result(
2529 result: &AuditTickResult,
2530 p2p_node: &Arc<P2PNode>,
2531 sync_state: &Arc<RwLock<NeighborSyncState>>,
2532 config: &ReplicationConfig,
2533) {
2534 match result {
2535 AuditTickResult::Passed {
2536 challenged_peer,
2537 keys_checked,
2538 } => {
2539 debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
2540 {
2543 let mut state = sync_state.write().await;
2544 state.clear_active_bootstrap_claim(challenged_peer);
2545 }
2546 p2p_node
2547 .report_trust_event(
2548 challenged_peer,
2549 TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
2550 )
2551 .await;
2552 }
2553 AuditTickResult::Failed { evidence } => {
2554 if let FailureEvidence::AuditFailure {
2555 challenged_peer,
2556 confirmed_failed_keys,
2557 reason,
2558 ..
2559 } = evidence
2560 {
2561 error!(
2562 "Audit failure for {challenged_peer}: {} confirmed failed keys",
2563 confirmed_failed_keys.len()
2564 );
2565 if audit_failure_clears_bootstrap_claim(reason) {
2566 let mut state = sync_state.write().await;
2569 state.clear_active_bootstrap_claim(challenged_peer);
2570 } else {
2571 debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim");
2572 }
2573 p2p_node
2574 .report_trust_event(
2575 challenged_peer,
2576 TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
2577 )
2578 .await;
2579 }
2580 }
2581 AuditTickResult::BootstrapClaim { peer } => {
2582 let should_report = {
2586 let now = Instant::now();
2587 let mut state = sync_state.write().await;
2588 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
2589 {
2590 BootstrapClaimObservation::WithinGrace { .. } => {
2591 debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
2592 false
2593 }
2594 BootstrapClaimObservation::PastGrace { first_seen } => {
2595 warn!(
2596 "Audit: peer {peer} claiming bootstrap past grace period \
2597 ({:?} > {:?}), reporting abuse",
2598 now.duration_since(first_seen),
2599 config.bootstrap_claim_grace_period,
2600 );
2601 true
2602 }
2603 BootstrapClaimObservation::Repeated { first_seen } => {
2604 warn!(
2605 "Audit: peer {peer} repeated bootstrap claim after previously \
2606 stopping; first claim was {:?} ago, reporting abuse",
2607 now.duration_since(first_seen),
2608 );
2609 true
2610 }
2611 }
2612 };
2613 if should_report {
2614 p2p_node
2615 .report_trust_event(
2616 peer,
2617 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2618 )
2619 .await;
2620 }
2621 }
2622 AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
2623 }
2624}
2625
2626fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
2627 !matches!(reason, AuditFailureReason::Timeout)
2628}
2629
/// Unit tests for the bootstrap-claim handling of audit failures.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::audit_failure_clears_bootstrap_claim;
    use crate::replication::types::AuditFailureReason;

    /// A timeout must leave the active claim in place.
    #[test]
    fn audit_timeout_preserves_active_bootstrap_claim() {
        let clears = audit_failure_clears_bootstrap_claim(&AuditFailureReason::Timeout);
        assert!(!clears);
    }

    /// Every non-timeout failure reason must clear the active claim.
    #[test]
    fn decoded_audit_failures_clear_active_bootstrap_claim() {
        let decoded_failures = [
            AuditFailureReason::MalformedResponse,
            AuditFailureReason::DigestMismatch,
            AuditFailureReason::KeyAbsent,
            AuditFailureReason::Rejected,
        ];

        for reason in &decoded_failures {
            let clears = audit_failure_clears_bootstrap_claim(reason);
            assert!(
                clears,
                "decoded non-bootstrap failure {reason:?} should clear active claim"
            );
        }
    }
}