1#![allow(clippy::significant_drop_tightening)]
16
17pub mod admission;
18pub mod audit;
19pub mod bootstrap;
20pub mod config;
21pub mod fresh;
22pub mod neighbor_sync;
23pub mod paid_list;
24pub mod protocol;
25pub mod pruning;
26pub mod quorum;
27pub mod scheduling;
28pub mod types;
29
30use std::collections::{HashMap, HashSet};
31use std::path::Path;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use std::pin::Pin;
36
37use crate::logging::{debug, error, info, warn};
38use futures::stream::FuturesUnordered;
39use futures::{Future, StreamExt};
40use rand::Rng;
41use tokio::sync::{mpsc, Notify, RwLock, Semaphore};
42use tokio::task::JoinHandle;
43use tokio_util::sync::CancellationToken;
44
45use crate::ant_protocol::XorName;
46use crate::error::{Error, Result};
47use crate::payment::{PaymentVerifier, VerificationContext};
48use crate::replication::audit::AuditTickResult;
49use crate::replication::config::{
50 max_parallel_fetch, ReplicationConfig, MAX_CONCURRENT_REPLICATION_SENDS,
51 REPLICATION_PROTOCOL_ID,
52};
53use crate::replication::paid_list::PaidList;
54use crate::replication::protocol::{
55 FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
56 VerificationResponse,
57};
58use crate::replication::quorum::KeyVerificationOutcome;
59use crate::replication::scheduling::ReplicationQueues;
60use crate::replication::types::{
61 AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline,
62 NeighborSyncState, PeerSyncRecord, RepairProofs, VerificationEntry, VerificationState,
63};
64use crate::storage::LmdbStorage;
65use saorsa_core::identity::PeerId;
66use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent};
67
/// Topic prefix checked by the message handler: a topic that starts with this
/// prefix and whose remainder equals `REPLICATION_PROTOCOL_ID` is parsed as a
/// request envelope rather than as a bare replication payload.
const RR_PREFIX: &str = "/rr/";
74
/// Boxed, sendable future tracked by the fetch worker for each in-flight fetch.
///
/// Resolves to the key being fetched together with the fetch outcome, or
/// `None` when the spawned fetch task could not be joined.
type FetchFuture = Pin<Box<dyn Future<Output = (XorName, Option<FetchOutcome>)> + Send>>;
77
/// Borrowed bundle of shared engine state handed to `run_verification_cycle`
/// on every tick of the verification worker.
struct VerificationCycleContext<'a> {
    /// Network node handle.
    p2p_node: &'a Arc<P2PNode>,
    /// Paid list shared with the rest of the engine.
    paid_list: &'a Arc<PaidList>,
    /// Local chunk storage.
    storage: &'a Arc<LmdbStorage>,
    /// Shared replication queues.
    queues: &'a Arc<RwLock<ReplicationQueues>>,
    /// Replication configuration.
    config: &'a ReplicationConfig,
    /// Bootstrap tracking state.
    bootstrap_state: &'a Arc<RwLock<BootstrapState>>,
    /// Flag that is `true` while the engine is still bootstrapping.
    is_bootstrapping: &'a Arc<RwLock<bool>>,
    /// Notifier used to signal that bootstrap has completed.
    bootstrap_complete_notify: &'a Arc<Notify>,
}
89
/// Milliseconds the fetch worker sleeps before polling the queues again when
/// it has no fetches in flight.
const FETCH_WORKER_POLL_MS: u64 = 100;

/// Milliseconds the verification worker sleeps between verification cycles.
const VERIFICATION_WORKER_POLL_MS: u64 = 250;

/// Seconds between the audit loop's checks of whether bootstrap has drained.
const BOOTSTRAP_DRAIN_CHECK_SECS: u64 = 5;

/// Weight attached to `TrustEvent::ApplicationFailure` reports raised by the
/// replication handlers.
const REPLICATION_TRUST_WEIGHT: f64 = 1.0;
105
/// Owns the shared replication state and the background tasks that drive
/// replication (message handling, neighbor sync, audits, fetches,
/// verification, bootstrap sync and fresh-write draining).
pub struct ReplicationEngine {
    /// Validated replication configuration, shared with every task.
    config: Arc<ReplicationConfig>,
    /// Network node used for events, DHT access and messaging.
    p2p_node: Arc<P2PNode>,
    /// Local chunk storage.
    storage: Arc<LmdbStorage>,
    /// Paid list opened from the root directory passed to `new`.
    paid_list: Arc<PaidList>,
    /// Verifier for proofs of payment carried by incoming messages.
    payment_verifier: Arc<PaymentVerifier>,
    /// Replication queues shared by the workers.
    queues: Arc<RwLock<ReplicationQueues>>,
    /// Neighbor sync cycle state; starts as a cycle over no peers.
    sync_state: Arc<RwLock<NeighborSyncState>>,
    /// Per-peer sync records, refreshed when an inbound sync request is handled.
    sync_history: Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    /// Sync cycle epoch counter; starts at 0.
    sync_cycle_epoch: Arc<RwLock<u64>>,
    /// Repair proofs; a peer's entries are dropped when the DHT reports the
    /// peer as removed.
    repair_proofs: Arc<RwLock<RepairProofs>>,
    /// Bootstrap tracking state.
    bootstrap_state: Arc<RwLock<BootstrapState>>,
    /// `true` from construction until bootstrap completes.
    is_bootstrapping: Arc<RwLock<bool>>,
    /// Wakes the neighbor sync loop ahead of its timer.
    sync_trigger: Arc<Notify>,
    /// Signalled when bootstrap completes.
    bootstrap_complete_notify: Arc<Notify>,
    /// Semaphore created with `MAX_CONCURRENT_REPLICATION_SENDS` permits and
    /// passed to fresh replication.
    send_semaphore: Arc<Semaphore>,
    /// Receiver of fresh-write events; taken by the drainer task when it starts.
    fresh_write_rx: Option<mpsc::UnboundedReceiver<fresh::FreshWriteEvent>>,
    /// Cancellation token observed by every background task.
    shutdown: CancellationToken,
    /// Join handles of the spawned tasks; non-empty means the engine is running.
    task_handles: Vec<JoinHandle<()>>,
}
157
158impl ReplicationEngine {
159 pub async fn new(
166 config: ReplicationConfig,
167 p2p_node: Arc<P2PNode>,
168 storage: Arc<LmdbStorage>,
169 payment_verifier: Arc<PaymentVerifier>,
170 root_dir: &Path,
171 fresh_write_rx: mpsc::UnboundedReceiver<fresh::FreshWriteEvent>,
172 shutdown: CancellationToken,
173 ) -> Result<Self> {
174 config.validate().map_err(Error::Config)?;
175
176 let paid_list = Arc::new(
177 PaidList::new(root_dir)
178 .await
179 .map_err(|e| Error::Storage(format!("Failed to open PaidList: {e}")))?,
180 );
181
182 let initial_neighbors = NeighborSyncState::new_cycle(Vec::new());
183 let config = Arc::new(config);
184
185 Ok(Self {
186 config: Arc::clone(&config),
187 p2p_node,
188 storage,
189 paid_list,
190 payment_verifier,
191 queues: Arc::new(RwLock::new(ReplicationQueues::new())),
192 sync_state: Arc::new(RwLock::new(initial_neighbors)),
193 sync_history: Arc::new(RwLock::new(HashMap::new())),
194 sync_cycle_epoch: Arc::new(RwLock::new(0)),
195 repair_proofs: Arc::new(RwLock::new(RepairProofs::new())),
196 bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())),
197 is_bootstrapping: Arc::new(RwLock::new(true)),
198 sync_trigger: Arc::new(Notify::new()),
199 bootstrap_complete_notify: Arc::new(Notify::new()),
200 send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)),
201 fresh_write_rx: Some(fresh_write_rx),
202 shutdown,
203 task_handles: Vec::new(),
204 })
205 }
206
/// Returns the shared handle to the engine's [`PaidList`].
#[must_use]
pub fn paid_list(&self) -> &Arc<PaidList> {
    &self.paid_list
}
212
213 pub fn start(&mut self, dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>) {
219 if !self.task_handles.is_empty() {
220 error!("ReplicationEngine::start() called while already running — ignoring");
221 return;
222 }
223 info!("Starting replication engine");
224
225 self.start_message_handler();
226 self.start_neighbor_sync_loop();
227 self.start_self_lookup_loop();
228 self.start_audit_loop();
229 self.start_fetch_worker();
230 self.start_verification_worker();
231 self.start_bootstrap_sync(dht_events);
232 self.start_fresh_write_drainer();
233
234 info!(
235 "Replication engine started with {} background tasks",
236 self.task_handles.len()
237 );
238 }
239
240 pub async fn is_bootstrapping(&self) -> bool {
245 *self.is_bootstrapping.read().await
246 }
247
248 pub async fn wait_for_bootstrap_complete(&self, timeout: Duration) -> bool {
257 let notified = self.bootstrap_complete_notify.notified();
260 tokio::pin!(notified);
261 notified.as_mut().enable();
262
263 if !*self.is_bootstrapping.read().await {
264 return true;
265 }
266
267 tokio::time::timeout(timeout, notified).await.is_ok()
268 }
269
270 pub async fn shutdown(&mut self) {
276 self.shutdown.cancel();
277 for (i, mut handle) in self.task_handles.drain(..).enumerate() {
278 match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
279 Ok(Ok(())) => {}
280 Ok(Err(e)) if e.is_cancelled() => {}
281 Ok(Err(e)) => warn!("Replication task {i} panicked during shutdown: {e}"),
282 Err(_) => {
283 warn!("Replication task {i} did not stop within 10s, aborting");
284 handle.abort();
285 }
286 }
287 }
288 }
289
/// Wakes the neighbor sync loop so it runs a round without waiting for its
/// timer to expire.
pub fn trigger_neighbor_sync(&self) {
    self.sync_trigger.notify_one();
}
298
/// Replicates a fresh record by delegating to [`fresh::replicate_fresh`]
/// with the engine's node, paid list, configuration and send semaphore.
pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) {
    fresh::replicate_fresh(
        key,
        data,
        proof_of_payment,
        &self.p2p_node,
        &self.paid_list,
        &self.config,
        &self.send_semaphore,
    )
    .await;
}
312
313 fn start_fresh_write_drainer(&mut self) {
320 let Some(mut rx) = self.fresh_write_rx.take() else {
321 return;
322 };
323 let p2p = Arc::clone(&self.p2p_node);
324 let paid_list = Arc::clone(&self.paid_list);
325 let config = Arc::clone(&self.config);
326 let send_semaphore = Arc::clone(&self.send_semaphore);
327 let shutdown = self.shutdown.clone();
328
329 let handle = tokio::spawn(async move {
330 loop {
331 tokio::select! {
332 () = shutdown.cancelled() => break,
333 event = rx.recv() => {
334 let Some(event) = event else { break };
335 fresh::replicate_fresh(
336 &event.key,
337 &event.data,
338 &event.payment_proof,
339 &p2p,
340 &paid_list,
341 &config,
342 &send_semaphore,
343 )
344 .await;
345 }
346 }
347 }
348 debug!("Fresh-write drainer shut down");
349 });
350 self.task_handles.push(handle);
351 }
352
353 #[allow(clippy::too_many_lines)]
354 fn start_message_handler(&mut self) {
355 let mut p2p_events = self.p2p_node.subscribe_events();
356 let mut dht_events = self.p2p_node.dht_manager().subscribe_events();
357 let p2p = Arc::clone(&self.p2p_node);
358 let storage = Arc::clone(&self.storage);
359 let paid_list = Arc::clone(&self.paid_list);
360 let payment_verifier = Arc::clone(&self.payment_verifier);
361 let queues = Arc::clone(&self.queues);
362 let config = Arc::clone(&self.config);
363 let shutdown = self.shutdown.clone();
364 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
365 let bootstrap_state = Arc::clone(&self.bootstrap_state);
366 let sync_history = Arc::clone(&self.sync_history);
367 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
368 let repair_proofs = Arc::clone(&self.repair_proofs);
369 let sync_trigger = Arc::clone(&self.sync_trigger);
370
371 let handle = tokio::spawn(async move {
372 loop {
373 tokio::select! {
374 () = shutdown.cancelled() => break,
375 event = p2p_events.recv() => {
376 let Ok(event) = event else { continue };
377 if let P2PEvent::Message {
378 topic,
379 source: Some(source),
380 data,
381 ..
382 } = event {
383 let rr_info = if topic == REPLICATION_PROTOCOL_ID {
387 Some((data.clone(), None))
388 } else if topic.starts_with(RR_PREFIX)
389 && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID
390 {
391 P2PNode::parse_request_envelope(&data)
392 .filter(|(_, is_resp, _)| !is_resp)
393 .map(|(msg_id, _, payload)| (payload, Some(msg_id)))
394 } else {
395 None
396 };
397 if let Some((payload, rr_message_id)) = rr_info {
398 match handle_replication_message(
399 &source,
400 &payload,
401 &p2p,
402 &storage,
403 &paid_list,
404 &payment_verifier,
405 &queues,
406 &config,
407 &is_bootstrapping,
408 &bootstrap_state,
409 &sync_history,
410 &sync_cycle_epoch,
411 &repair_proofs,
412 rr_message_id.as_deref(),
413 ).await {
414 Ok(()) => {}
415 Err(e) => {
416 debug!(
417 "Replication message from {source} error: {e}"
418 );
419 }
420 }
421 }
422 }
423 }
424 dht_event = dht_events.recv() => {
432 let Ok(dht_event) = dht_event else { continue };
433 match dht_event {
434 DhtNetworkEvent::KClosestPeersChanged { .. } => {
435 debug!(
436 "K-closest peers changed, triggering early neighbor sync"
437 );
438 sync_trigger.notify_one();
439 }
440 DhtNetworkEvent::PeerRemoved { peer_id } => {
441 repair_proofs.write().await.remove_peer(&peer_id);
442 }
443 _ => {}
444 }
445 }
446 }
447 }
448 debug!("Replication message handler shut down");
449 });
450 self.task_handles.push(handle);
451 }
452
453 fn start_neighbor_sync_loop(&mut self) {
454 let p2p = Arc::clone(&self.p2p_node);
455 let storage = Arc::clone(&self.storage);
456 let paid_list = Arc::clone(&self.paid_list);
457 let queues = Arc::clone(&self.queues);
458 let config = Arc::clone(&self.config);
459 let shutdown = self.shutdown.clone();
460 let sync_state = Arc::clone(&self.sync_state);
461 let sync_history = Arc::clone(&self.sync_history);
462 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
463 let repair_proofs = Arc::clone(&self.repair_proofs);
464 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
465 let bootstrap_state = Arc::clone(&self.bootstrap_state);
466 let sync_trigger = Arc::clone(&self.sync_trigger);
467
468 let handle = tokio::spawn(async move {
469 loop {
470 let interval = config.random_neighbor_sync_interval();
471 tokio::select! {
472 () = shutdown.cancelled() => break,
473 () = tokio::time::sleep(interval) => {}
474 () = sync_trigger.notified() => {
475 debug!("Neighbor sync triggered by topology change");
476 }
477 }
478 tokio::select! {
482 () = shutdown.cancelled() => break,
483 () = run_neighbor_sync_round(
484 &p2p,
485 &storage,
486 &paid_list,
487 &queues,
488 &config,
489 &sync_state,
490 &sync_history,
491 &sync_cycle_epoch,
492 &repair_proofs,
493 &is_bootstrapping,
494 &bootstrap_state,
495 ) => {}
496 }
497 }
498 debug!("Neighbor sync loop shut down");
499 });
500 self.task_handles.push(handle);
501 }
502
503 fn start_self_lookup_loop(&mut self) {
504 let p2p = Arc::clone(&self.p2p_node);
505 let config = Arc::clone(&self.config);
506 let shutdown = self.shutdown.clone();
507
508 let handle = tokio::spawn(async move {
509 loop {
510 let interval = config.random_self_lookup_interval();
511 tokio::select! {
512 () = shutdown.cancelled() => break,
513 () = tokio::time::sleep(interval) => {
514 if let Err(e) = p2p.dht_manager().trigger_self_lookup().await {
515 debug!("Self-lookup failed: {e}");
516 }
517 }
518 }
519 }
520 debug!("Self-lookup loop shut down");
521 });
522 self.task_handles.push(handle);
523 }
524
525 fn start_audit_loop(&mut self) {
526 let p2p = Arc::clone(&self.p2p_node);
527 let storage = Arc::clone(&self.storage);
528 let config = Arc::clone(&self.config);
529 let shutdown = self.shutdown.clone();
530 let sync_history = Arc::clone(&self.sync_history);
531 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
532 let repair_proofs = Arc::clone(&self.repair_proofs);
533 let bootstrap_state = Arc::clone(&self.bootstrap_state);
534 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
535 let sync_state = Arc::clone(&self.sync_state);
536
537 let handle = tokio::spawn(async move {
538 loop {
540 tokio::select! {
541 () = shutdown.cancelled() => return,
542 () = tokio::time::sleep(
543 std::time::Duration::from_secs(BOOTSTRAP_DRAIN_CHECK_SECS)
544 ) => {
545 if bootstrap_state.read().await.is_drained() {
546 break;
547 }
548 }
549 }
550 }
551
552 {
554 let bootstrapping = *is_bootstrapping.read().await;
555 let result = {
556 let history = sync_history.read().await;
557 let current_sync_epoch = *sync_cycle_epoch.read().await;
558 audit::audit_tick_with_repair_proofs(
559 &p2p,
560 &storage,
561 &config,
562 &history,
563 &repair_proofs,
564 current_sync_epoch,
565 bootstrapping,
566 )
567 .await
568 };
569 handle_audit_result(&result, &p2p, &sync_state, &config).await;
570 }
571
572 loop {
574 let interval = config.random_audit_tick_interval();
575 tokio::select! {
576 () = shutdown.cancelled() => break,
577 () = tokio::time::sleep(interval) => {
578 let bootstrapping = *is_bootstrapping.read().await;
579 let result = {
580 let history = sync_history.read().await;
581 let current_sync_epoch = *sync_cycle_epoch.read().await;
582 audit::audit_tick_with_repair_proofs(
583 &p2p,
584 &storage,
585 &config,
586 &history,
587 &repair_proofs,
588 current_sync_epoch,
589 bootstrapping,
590 )
591 .await
592 };
593 handle_audit_result(&result, &p2p, &sync_state, &config).await;
594 }
595 }
596 }
597 debug!("Audit loop shut down");
598 });
599 self.task_handles.push(handle);
600 }
601
602 #[allow(clippy::too_many_lines, clippy::option_if_let_else)]
603 fn start_fetch_worker(&mut self) {
604 let p2p = Arc::clone(&self.p2p_node);
605 let storage = Arc::clone(&self.storage);
606 let queues = Arc::clone(&self.queues);
607 let config = Arc::clone(&self.config);
608 let shutdown = self.shutdown.clone();
609 let bootstrap_state = Arc::clone(&self.bootstrap_state);
610 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
611 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
612 let concurrency = max_parallel_fetch();
613
614 info!("Fetch worker concurrency set to {concurrency} (hardware threads)");
615
616 let handle = tokio::spawn(async move {
617 let mut in_flight = FuturesUnordered::<FetchFuture>::new();
620
621 loop {
622 {
624 let mut q = queues.write().await;
625 while in_flight.len() < concurrency {
626 let Some(candidate) = q.dequeue_fetch() else {
627 break;
628 };
629 let Some(&source) = candidate.sources.first() else {
630 warn!(
631 "Fetch candidate {} has no sources — dropping",
632 hex::encode(candidate.key)
633 );
634 continue;
635 };
636 q.start_fetch(candidate.key, source, candidate.sources.clone());
637
638 let p2p = Arc::clone(&p2p);
639 let storage = Arc::clone(&storage);
640 let config = Arc::clone(&config);
641 let token = shutdown.clone();
642 let fetch_key = candidate.key;
643 in_flight.push(Box::pin(async move {
644 let handle = tokio::spawn(async move {
645 tokio::select! {
647 () = token.cancelled() => FetchOutcome {
648 key: fetch_key,
649 result: FetchResult::SourceFailed,
650 },
651 outcome = execute_single_fetch(
652 p2p, storage, config, fetch_key, source,
653 ) => outcome,
654 }
655 });
656 match handle.await {
657 Ok(outcome) => (outcome.key, Some(outcome)),
658 Err(e) => {
659 error!(
660 "Fetch task for {} panicked: {e}",
661 hex::encode(fetch_key)
662 );
663 (fetch_key, None)
664 }
665 }
666 }));
667 }
668 } if in_flight.is_empty() {
671 tokio::select! {
673 () = shutdown.cancelled() => break,
674 () = tokio::time::sleep(
675 std::time::Duration::from_millis(FETCH_WORKER_POLL_MS)
676 ) => continue,
677 }
678 }
679
680 tokio::select! {
682 () = shutdown.cancelled() => break,
683 Some((key, maybe_outcome)) = in_flight.next() => {
684 let mut q = queues.write().await;
685 let terminal = if let Some(outcome) = maybe_outcome {
686 match outcome.result {
687 FetchResult::Stored => {
688 q.complete_fetch(&key);
689 true
690 }
691 FetchResult::IntegrityFailed | FetchResult::SourceFailed => {
692 if let Some(next_peer) = q.retry_fetch(&key) {
693 let p2p = Arc::clone(&p2p);
695 let storage = Arc::clone(&storage);
696 let config = Arc::clone(&config);
697 let token = shutdown.clone();
698 let fetch_key = key;
699 in_flight.push(Box::pin(async move {
700 let handle = tokio::spawn(async move {
701 tokio::select! {
702 () = token.cancelled() => FetchOutcome {
703 key: fetch_key,
704 result: FetchResult::SourceFailed,
705 },
706 outcome = execute_single_fetch(
707 p2p, storage, config, fetch_key, next_peer,
708 ) => outcome,
709 }
710 });
711 match handle.await {
712 Ok(outcome) => (outcome.key, Some(outcome)),
713 Err(e) => {
714 error!(
715 "Fetch task for {} panicked: {e}",
716 hex::encode(fetch_key)
717 );
718 (fetch_key, None)
719 }
720 }
721 }));
722 false
723 } else {
724 q.complete_fetch(&key);
725 true
726 }
727 }
728 }
729 } else {
730 q.complete_fetch(&key);
732 true
733 };
734
735 if terminal {
737 drop(q); if !bootstrap_state.read().await.is_drained() {
739 bootstrap_state.write().await.remove_key(&key);
740 let q = queues.read().await;
741 if bootstrap::check_bootstrap_drained(
742 &bootstrap_state,
743 &q,
744 )
745 .await
746 {
747 complete_bootstrap(
748 &is_bootstrapping,
749 &bootstrap_complete_notify,
750 ).await;
751 }
752 }
753 }
754 }
755 }
756 }
757
758 while in_flight.next().await.is_some() {}
762 debug!("Fetch worker shut down");
763 });
764 self.task_handles.push(handle);
765 }
766
767 fn start_verification_worker(&mut self) {
768 let p2p = Arc::clone(&self.p2p_node);
769 let storage = Arc::clone(&self.storage);
770 let queues = Arc::clone(&self.queues);
771 let paid_list = Arc::clone(&self.paid_list);
772 let config = Arc::clone(&self.config);
773 let shutdown = self.shutdown.clone();
774 let bootstrap_state = Arc::clone(&self.bootstrap_state);
775 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
776 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
777
778 let handle = tokio::spawn(async move {
779 loop {
780 tokio::select! {
781 () = shutdown.cancelled() => break,
782 () = tokio::time::sleep(
783 std::time::Duration::from_millis(VERIFICATION_WORKER_POLL_MS)
784 ) => {
785 let ctx = VerificationCycleContext {
786 p2p_node: &p2p,
787 paid_list: &paid_list,
788 storage: &storage,
789 queues: &queues,
790 config: &config,
791 bootstrap_state: &bootstrap_state,
792 is_bootstrapping: &is_bootstrapping,
793 bootstrap_complete_notify: &bootstrap_complete_notify,
794 };
795 run_verification_cycle(ctx).await;
796 }
797 }
798 }
799 debug!("Verification worker shut down");
800 });
801 self.task_handles.push(handle);
802 }
803
804 #[allow(clippy::too_many_lines)]
816 fn start_bootstrap_sync(
817 &mut self,
818 dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
819 ) {
820 let p2p = Arc::clone(&self.p2p_node);
821 let storage = Arc::clone(&self.storage);
822 let paid_list = Arc::clone(&self.paid_list);
823 let queues = Arc::clone(&self.queues);
824 let config = Arc::clone(&self.config);
825 let shutdown = self.shutdown.clone();
826 let is_bootstrapping = Arc::clone(&self.is_bootstrapping);
827 let bootstrap_state = Arc::clone(&self.bootstrap_state);
828 let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify);
829 let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch);
830 let repair_proofs = Arc::clone(&self.repair_proofs);
831
832 let handle = tokio::spawn(async move {
833 let gate = bootstrap::wait_for_bootstrap_complete(
837 dht_events,
838 config.bootstrap_complete_timeout_secs,
839 &shutdown,
840 )
841 .await;
842
843 if gate == bootstrap::BootstrapGateResult::Shutdown {
844 return;
845 }
846
847 let self_id = *p2p.peer_id();
848 let neighbors =
849 neighbor_sync::snapshot_close_neighbors(&p2p, &self_id, config.neighbor_sync_scope)
850 .await;
851
852 if neighbors.is_empty() {
853 info!("Bootstrap sync: no close neighbors found, marking drained");
854 bootstrap::mark_bootstrap_drained(&bootstrap_state).await;
855 complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
856 return;
857 }
858
859 let neighbor_count = neighbors.len();
860 info!("Bootstrap sync: syncing with {neighbor_count} close neighbors");
861
862 for batch in neighbors.chunks(config.neighbor_sync_peer_count) {
864 if shutdown.is_cancelled() {
865 break;
866 }
867
868 for peer in batch {
869 if shutdown.is_cancelled() {
870 break;
871 }
872
873 let bootstrapping = *is_bootstrapping.read().await;
875
876 bootstrap::increment_pending_requests(&bootstrap_state, 1).await;
877
878 let outcome = neighbor_sync::sync_with_peer_with_outcome(
879 peer,
880 &p2p,
881 &storage,
882 &paid_list,
883 &config,
884 bootstrapping,
885 )
886 .await;
887
888 bootstrap::decrement_pending_requests(&bootstrap_state, 1).await;
889
890 if let Some(outcome) = outcome {
891 if !outcome.response.bootstrapping {
892 record_sent_replica_hints(
893 peer,
894 &outcome.sent_replica_hints,
895 &repair_proofs,
896 &sync_cycle_epoch,
897 )
898 .await;
899 let outcome = admit_and_queue_hints(
901 &self_id,
902 peer,
903 &outcome.response.replica_hints,
904 &outcome.response.paid_hints,
905 &p2p,
906 &config,
907 &storage,
908 &paid_list,
909 &queues,
910 )
911 .await;
912
913 if !outcome.discovered.is_empty() {
915 bootstrap::track_discovered_keys(
916 &bootstrap_state,
917 &outcome.discovered,
918 )
919 .await;
920 }
921
922 if outcome.capacity_rejected_count > 0 {
927 bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await;
928 } else {
929 bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await;
930 }
931 }
932 }
933 }
934 }
935
936 {
938 let q = queues.read().await;
939 if bootstrap::check_bootstrap_drained(&bootstrap_state, &q).await {
940 complete_bootstrap(&is_bootstrapping, &bootstrap_complete_notify).await;
941 }
942 }
943
944 info!("Bootstrap sync completed");
945 });
946 self.task_handles.push(handle);
947 }
948}
949
950#[allow(clippy::too_many_arguments)]
960async fn handle_replication_message(
961 source: &PeerId,
962 data: &[u8],
963 p2p_node: &Arc<P2PNode>,
964 storage: &Arc<LmdbStorage>,
965 paid_list: &Arc<PaidList>,
966 payment_verifier: &Arc<PaymentVerifier>,
967 queues: &Arc<RwLock<ReplicationQueues>>,
968 config: &ReplicationConfig,
969 is_bootstrapping: &Arc<RwLock<bool>>,
970 bootstrap_state: &Arc<RwLock<BootstrapState>>,
971 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
972 sync_cycle_epoch: &Arc<RwLock<u64>>,
973 repair_proofs: &Arc<RwLock<RepairProofs>>,
974 rr_message_id: Option<&str>,
975) -> Result<()> {
976 let msg = ReplicationMessage::decode(data)
977 .map_err(|e| Error::Protocol(format!("Failed to decode replication message: {e}")))?;
978
979 match msg.body {
980 ReplicationMessageBody::FreshReplicationOffer(ref offer) => {
981 handle_fresh_offer(
982 source,
983 offer,
984 storage,
985 paid_list,
986 payment_verifier,
987 p2p_node,
988 config,
989 msg.request_id,
990 rr_message_id,
991 )
992 .await
993 }
994 ReplicationMessageBody::PaidNotify(ref notify) => {
995 handle_paid_notify(
996 source,
997 notify,
998 paid_list,
999 payment_verifier,
1000 p2p_node,
1001 config,
1002 )
1003 .await
1004 }
1005 ReplicationMessageBody::NeighborSyncRequest(ref request) => {
1006 let bootstrapping = *is_bootstrapping.read().await;
1007 handle_neighbor_sync_request(
1008 source,
1009 request,
1010 p2p_node,
1011 storage,
1012 paid_list,
1013 queues,
1014 config,
1015 bootstrapping,
1016 bootstrap_state,
1017 sync_history,
1018 sync_cycle_epoch,
1019 repair_proofs,
1020 msg.request_id,
1021 rr_message_id,
1022 )
1023 .await
1024 }
1025 ReplicationMessageBody::VerificationRequest(ref request) => {
1026 handle_verification_request(
1027 source,
1028 request,
1029 storage,
1030 paid_list,
1031 p2p_node,
1032 msg.request_id,
1033 rr_message_id,
1034 )
1035 .await
1036 }
1037 ReplicationMessageBody::FetchRequest(ref request) => {
1038 handle_fetch_request(
1039 source,
1040 request,
1041 storage,
1042 p2p_node,
1043 msg.request_id,
1044 rr_message_id,
1045 )
1046 .await
1047 }
1048 ReplicationMessageBody::AuditChallenge(ref challenge) => {
1049 let bootstrapping = *is_bootstrapping.read().await;
1050 handle_audit_challenge_msg(
1051 source,
1052 challenge,
1053 storage,
1054 p2p_node,
1055 bootstrapping,
1056 msg.request_id,
1057 rr_message_id,
1058 )
1059 .await
1060 }
1061 ReplicationMessageBody::FreshReplicationResponse(_)
1063 | ReplicationMessageBody::NeighborSyncResponse(_)
1064 | ReplicationMessageBody::VerificationResponse(_)
1065 | ReplicationMessageBody::FetchResponse(_)
1066 | ReplicationMessageBody::AuditResponse(_) => Ok(()),
1067 }
1068}
1069
1070#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1075async fn handle_fresh_offer(
1076 source: &PeerId,
1077 offer: &protocol::FreshReplicationOffer,
1078 storage: &Arc<LmdbStorage>,
1079 paid_list: &Arc<PaidList>,
1080 payment_verifier: &Arc<PaymentVerifier>,
1081 p2p_node: &Arc<P2PNode>,
1082 config: &ReplicationConfig,
1083 request_id: u64,
1084 rr_message_id: Option<&str>,
1085) -> Result<()> {
1086 let self_id = *p2p_node.peer_id();
1087
1088 if offer.proof_of_payment.is_empty() {
1090 send_replication_response(
1091 source,
1092 p2p_node,
1093 request_id,
1094 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1095 key: offer.key,
1096 reason: "Missing proof of payment".to_string(),
1097 }),
1098 rr_message_id,
1099 )
1100 .await;
1101 return Ok(());
1102 }
1103
1104 if offer.data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
1108 warn!(
1109 "Rejecting fresh offer for key {}: data size {} exceeds MAX_CHUNK_SIZE {}",
1110 hex::encode(offer.key),
1111 offer.data.len(),
1112 crate::ant_protocol::MAX_CHUNK_SIZE,
1113 );
1114 p2p_node
1115 .report_trust_event(
1116 source,
1117 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1118 )
1119 .await;
1120 send_replication_response(
1121 source,
1122 p2p_node,
1123 request_id,
1124 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1125 key: offer.key,
1126 reason: format!(
1127 "Data size {} exceeds maximum chunk size {}",
1128 offer.data.len(),
1129 crate::ant_protocol::MAX_CHUNK_SIZE,
1130 ),
1131 }),
1132 rr_message_id,
1133 )
1134 .await;
1135 return Ok(());
1136 }
1137
1138 if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await {
1140 send_replication_response(
1141 source,
1142 p2p_node,
1143 request_id,
1144 ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
1145 key: offer.key,
1146 reason: "Not responsible for this key".to_string(),
1147 }),
1148 rr_message_id,
1149 )
1150 .await;
1151 return Ok(());
1152 }
1153
1154 match payment_verifier
1161 .verify_payment(
1162 &offer.key,
1163 Some(&offer.proof_of_payment),
1164 VerificationContext::Replication,
1165 )
1166 .await
1167 {
1168 Ok(status) if status.can_store() => {
1169 debug!(
1170 "PoP validated for fresh offer key {}",
1171 hex::encode(offer.key)
1172 );
1173 }
1174 Ok(_) => {
1175 send_replication_response(
1176 source,
1177 p2p_node,
1178 request_id,
1179 ReplicationMessageBody::FreshReplicationResponse(
1180 FreshReplicationResponse::Rejected {
1181 key: offer.key,
1182 reason: "Payment verification failed: payment required".to_string(),
1183 },
1184 ),
1185 rr_message_id,
1186 )
1187 .await;
1188 return Ok(());
1189 }
1190 Err(e) => {
1191 warn!(
1192 "PoP verification error for key {}: {e}",
1193 hex::encode(offer.key)
1194 );
1195 send_replication_response(
1196 source,
1197 p2p_node,
1198 request_id,
1199 ReplicationMessageBody::FreshReplicationResponse(
1200 FreshReplicationResponse::Rejected {
1201 key: offer.key,
1202 reason: format!("Payment verification error: {e}"),
1203 },
1204 ),
1205 rr_message_id,
1206 )
1207 .await;
1208 return Ok(());
1209 }
1210 }
1211
1212 if let Err(e) = paid_list.insert(&offer.key).await {
1214 warn!("Failed to add key to PaidForList: {e}");
1215 }
1216
1217 match storage.put(&offer.key, &offer.data).await {
1219 Ok(_) => {
1220 send_replication_response(
1221 source,
1222 p2p_node,
1223 request_id,
1224 ReplicationMessageBody::FreshReplicationResponse(
1225 FreshReplicationResponse::Accepted { key: offer.key },
1226 ),
1227 rr_message_id,
1228 )
1229 .await;
1230 }
1231 Err(e) => {
1232 send_replication_response(
1233 source,
1234 p2p_node,
1235 request_id,
1236 ReplicationMessageBody::FreshReplicationResponse(
1237 FreshReplicationResponse::Rejected {
1238 key: offer.key,
1239 reason: format!("Storage error: {e}"),
1240 },
1241 ),
1242 rr_message_id,
1243 )
1244 .await;
1245 }
1246 }
1247
1248 Ok(())
1249}
1250
1251async fn handle_paid_notify(
1252 _source: &PeerId,
1253 notify: &protocol::PaidNotify,
1254 paid_list: &Arc<PaidList>,
1255 payment_verifier: &Arc<PaymentVerifier>,
1256 p2p_node: &Arc<P2PNode>,
1257 config: &ReplicationConfig,
1258) -> Result<()> {
1259 let self_id = *p2p_node.peer_id();
1260
1261 if notify.proof_of_payment.is_empty() {
1263 return Ok(());
1264 }
1265
1266 if !admission::is_in_paid_close_group(
1268 &self_id,
1269 ¬ify.key,
1270 p2p_node,
1271 config.paid_list_close_group_size,
1272 )
1273 .await
1274 {
1275 return Ok(());
1276 }
1277
1278 match payment_verifier
1281 .verify_payment(
1282 ¬ify.key,
1283 Some(¬ify.proof_of_payment),
1284 VerificationContext::Replication,
1285 )
1286 .await
1287 {
1288 Ok(status) if status.can_store() => {
1289 debug!(
1290 "PoP validated for paid notify key {}",
1291 hex::encode(notify.key)
1292 );
1293 }
1294 Ok(_) => {
1295 warn!(
1296 "Paid notify rejected: payment required for key {}",
1297 hex::encode(notify.key)
1298 );
1299 return Ok(());
1300 }
1301 Err(e) => {
1302 warn!(
1303 "PoP verification error for paid notify key {}: {e}",
1304 hex::encode(notify.key)
1305 );
1306 return Ok(());
1307 }
1308 }
1309
1310 if let Err(e) = paid_list.insert(¬ify.key).await {
1311 warn!("Failed to add paid notify key to PaidForList: {e}");
1312 }
1313
1314 Ok(())
1315}
1316
1317#[allow(clippy::too_many_arguments)]
1318async fn handle_neighbor_sync_request(
1319 source: &PeerId,
1320 request: &protocol::NeighborSyncRequest,
1321 p2p_node: &Arc<P2PNode>,
1322 storage: &Arc<LmdbStorage>,
1323 paid_list: &Arc<PaidList>,
1324 queues: &Arc<RwLock<ReplicationQueues>>,
1325 config: &ReplicationConfig,
1326 is_bootstrapping: bool,
1327 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1328 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1329 sync_cycle_epoch: &Arc<RwLock<u64>>,
1330 repair_proofs: &Arc<RwLock<RepairProofs>>,
1331 request_id: u64,
1332 rr_message_id: Option<&str>,
1333) -> Result<()> {
1334 let self_id = *p2p_node.peer_id();
1335
1336 let (response, sent_replica_hints, sender_in_rt) =
1344 neighbor_sync::handle_sync_request_with_proofs(
1345 source,
1346 request,
1347 p2p_node,
1348 storage,
1349 paid_list,
1350 config,
1351 is_bootstrapping,
1352 )
1353 .await;
1354
1355 let response_sent = send_replication_response_checked(
1357 source,
1358 p2p_node,
1359 request_id,
1360 ReplicationMessageBody::NeighborSyncResponse(response),
1361 rr_message_id,
1362 )
1363 .await;
1364
1365 if !sender_in_rt {
1367 return Ok(());
1368 }
1369
1370 {
1373 let mut history = sync_history.write().await;
1374 let record = history.entry(*source).or_insert(PeerSyncRecord {
1375 last_sync: None,
1376 cycles_since_sync: 0,
1377 });
1378 record.last_sync = Some(Instant::now());
1379 record.cycles_since_sync = 0;
1380 }
1381
1382 if response_sent && !request.bootstrapping {
1383 record_sent_replica_hints(source, &sent_replica_hints, repair_proofs, sync_cycle_epoch)
1384 .await;
1385 }
1386
1387 let outcome = admit_and_queue_hints(
1389 &self_id,
1390 source,
1391 &request.replica_hints,
1392 &request.paid_hints,
1393 p2p_node,
1394 config,
1395 storage,
1396 paid_list,
1397 queues,
1398 )
1399 .await;
1400
1401 if is_bootstrapping {
1406 if !outcome.discovered.is_empty() {
1407 bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1408 }
1409 if outcome.capacity_rejected_count > 0 {
1410 bootstrap::note_capacity_rejected(bootstrap_state, *source).await;
1411 } else {
1412 bootstrap::clear_capacity_rejected(bootstrap_state, source).await;
1413 }
1414 }
1415
1416 Ok(())
1417}
1418
1419async fn handle_verification_request(
1420 source: &PeerId,
1421 request: &protocol::VerificationRequest,
1422 storage: &Arc<LmdbStorage>,
1423 paid_list: &Arc<PaidList>,
1424 p2p_node: &Arc<P2PNode>,
1425 request_id: u64,
1426 rr_message_id: Option<&str>,
1427) -> Result<()> {
1428 #[allow(clippy::cast_possible_truncation)]
1434 let keys_len = request.keys.len() as u32;
1435 let paid_check_set: HashSet<u32> = request
1436 .paid_list_check_indices
1437 .iter()
1438 .copied()
1439 .filter(|&idx| {
1440 if idx >= keys_len {
1441 warn!(
1442 "Verification request from {source}: paid_list_check_index {idx} out of bounds (keys.len() = {})",
1443 request.keys.len(),
1444 );
1445 false
1446 } else {
1447 true
1448 }
1449 })
1450 .collect();
1451
1452 let mut results = Vec::with_capacity(request.keys.len());
1453 for (i, key) in request.keys.iter().enumerate() {
1454 let present = storage.exists(key).unwrap_or(false);
1455 let paid = if paid_check_set.contains(&u32::try_from(i).unwrap_or(u32::MAX)) {
1456 Some(paid_list.contains(key).unwrap_or(false))
1457 } else {
1458 None
1459 };
1460 results.push(protocol::KeyVerificationResult {
1461 key: *key,
1462 present,
1463 paid,
1464 });
1465 }
1466
1467 send_replication_response(
1468 source,
1469 p2p_node,
1470 request_id,
1471 ReplicationMessageBody::VerificationResponse(VerificationResponse { results }),
1472 rr_message_id,
1473 )
1474 .await;
1475
1476 Ok(())
1477}
1478
1479async fn handle_fetch_request(
1480 source: &PeerId,
1481 request: &protocol::FetchRequest,
1482 storage: &Arc<LmdbStorage>,
1483 p2p_node: &Arc<P2PNode>,
1484 request_id: u64,
1485 rr_message_id: Option<&str>,
1486) -> Result<()> {
1487 let response = match storage.get(&request.key).await {
1488 Ok(Some(data)) => protocol::FetchResponse::Success {
1489 key: request.key,
1490 data,
1491 },
1492 Ok(None) => protocol::FetchResponse::NotFound { key: request.key },
1493 Err(e) => protocol::FetchResponse::Error {
1494 key: request.key,
1495 reason: format!("{e}"),
1496 },
1497 };
1498
1499 send_replication_response(
1500 source,
1501 p2p_node,
1502 request_id,
1503 ReplicationMessageBody::FetchResponse(response),
1504 rr_message_id,
1505 )
1506 .await;
1507
1508 Ok(())
1509}
1510
1511async fn handle_audit_challenge_msg(
1512 source: &PeerId,
1513 challenge: &protocol::AuditChallenge,
1514 storage: &Arc<LmdbStorage>,
1515 p2p_node: &Arc<P2PNode>,
1516 is_bootstrapping: bool,
1517 request_id: u64,
1518 rr_message_id: Option<&str>,
1519) -> Result<()> {
1520 #[allow(clippy::cast_possible_truncation)]
1521 let stored_chunks = storage.current_chunks().map_or(0, |c| c as usize);
1522 let response = audit::handle_audit_challenge(
1523 challenge,
1524 storage,
1525 p2p_node.peer_id(),
1526 is_bootstrapping,
1527 stored_chunks,
1528 )
1529 .await;
1530
1531 send_replication_response(
1532 source,
1533 p2p_node,
1534 request_id,
1535 ReplicationMessageBody::AuditResponse(response),
1536 rr_message_id,
1537 )
1538 .await;
1539
1540 Ok(())
1541}
1542
1543async fn send_replication_response(
1553 peer: &PeerId,
1554 p2p_node: &Arc<P2PNode>,
1555 request_id: u64,
1556 body: ReplicationMessageBody,
1557 rr_message_id: Option<&str>,
1558) {
1559 let _ =
1560 send_replication_response_checked(peer, p2p_node, request_id, body, rr_message_id).await;
1561}
1562
1563async fn send_replication_response_checked(
1573 peer: &PeerId,
1574 p2p_node: &Arc<P2PNode>,
1575 request_id: u64,
1576 body: ReplicationMessageBody,
1577 rr_message_id: Option<&str>,
1578) -> bool {
1579 let msg = ReplicationMessage { request_id, body };
1580 let encoded = match msg.encode() {
1581 Ok(data) => data,
1582 Err(e) => {
1583 warn!("Failed to encode replication response: {e}");
1584 return false;
1585 }
1586 };
1587 let result = if let Some(msg_id) = rr_message_id {
1588 p2p_node
1589 .send_response(peer, REPLICATION_PROTOCOL_ID, msg_id, encoded)
1590 .await
1591 } else {
1592 p2p_node
1593 .send_message(peer, REPLICATION_PROTOCOL_ID, encoded, &[])
1594 .await
1595 };
1596 if let Err(e) = result {
1597 debug!("Failed to send replication response to {peer}: {e}");
1598 return false;
1599 }
1600 true
1601}
1602
1603async fn record_sent_replica_hints(
1604 peer: &PeerId,
1605 hints: &[neighbor_sync::SentReplicaHint],
1606 repair_proofs: &Arc<RwLock<RepairProofs>>,
1607 sync_cycle_epoch: &Arc<RwLock<u64>>,
1608) {
1609 if hints.is_empty() {
1610 return;
1611 }
1612
1613 let hinted_at_epoch = *sync_cycle_epoch.read().await;
1614 let mut proofs = repair_proofs.write().await;
1615 for hint in hints {
1616 if proofs.record_replica_hint_sent(*peer, hint.key, &hint.close_peers, hinted_at_epoch) {
1617 debug!(
1618 "Recorded repair hint proof for peer {peer} and key {}",
1619 hex::encode(hint.key)
1620 );
1621 }
1622 }
1623}
1624
/// Runs one round of neighbor synchronisation.
///
/// If the current sync cycle is complete, the round first performs the
/// end-of-cycle work: it ages every sync-history record, advances the
/// sync-cycle epoch, runs a prune pass, takes a fresh snapshot of close
/// neighbors and starts a new cycle from that snapshot.
///
/// It then selects a batch of peers and syncs with them one after another.
/// A successful sync is processed by `handle_sync_response`. When a sync
/// yields no outcome, `neighbor_sync::handle_sync_failure` may name a
/// replacement peer, which is tried once.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn run_neighbor_sync_round(
    p2p_node: &Arc<P2PNode>,
    storage: &Arc<LmdbStorage>,
    paid_list: &Arc<PaidList>,
    queues: &Arc<RwLock<ReplicationQueues>>,
    config: &ReplicationConfig,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
    sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
    sync_cycle_epoch: &Arc<RwLock<u64>>,
    repair_proofs: &Arc<RwLock<RepairProofs>>,
    is_bootstrapping: &Arc<RwLock<bool>>,
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
) {
    let self_id = *p2p_node.peer_id();
    // Snapshot of the flag; the same value is used for the whole round.
    let bootstrapping = *is_bootstrapping.read().await;

    // Checked under a read lock only; re-checked under the write lock below
    // before the cycle state is actually replaced.
    let cycle_complete = sync_state.read().await.is_cycle_complete();
    if cycle_complete {
        {
            // One more cycle has passed for every peer in the history.
            let mut history = sync_history.write().await;
            for record in history.values_mut() {
                record.cycles_since_sync = record.cycles_since_sync.saturating_add(1);
            }
        }
        // Advance the epoch and keep the new value for the prune pass.
        let current_sync_epoch = {
            let mut epoch = sync_cycle_epoch.write().await;
            *epoch = epoch.saturating_add(1);
            *epoch
        };

        // Remote prune audits are only allowed once this node is neither
        // bootstrapping nor still draining its bootstrap state.
        let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained();
        pruning::run_prune_pass_with_context(pruning::PrunePassContext {
            self_id: &self_id,
            storage,
            paid_list,
            p2p_node,
            config,
            sync_state,
            repair_proofs,
            current_sync_epoch,
            allow_remote_prune_audits,
        })
        .await;

        // Taken before the write lock so the lock is not held across this await.
        let neighbors =
            neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope)
                .await;

        let mut state = sync_state.write().await;
        // Re-check: the state may have changed while no lock was held.
        if state.is_cycle_complete() {
            // Carry these fields over into the new cycle; everything else is
            // reset by `new_cycle`.
            let old_sync_times = std::mem::take(&mut state.last_sync_times);
            let old_bootstrap_claims = std::mem::take(&mut state.bootstrap_claims);
            let old_bootstrap_claim_history = std::mem::take(&mut state.bootstrap_claim_history);
            let old_prune_cursor = state.prune_cursor;
            *state = NeighborSyncState::new_cycle(neighbors);
            state.last_sync_times = old_sync_times;
            state.bootstrap_claims = old_bootstrap_claims;
            state.bootstrap_claim_history = old_bootstrap_claim_history;
            state.prune_cursor = old_prune_cursor;
        }
    }

    // Pick this round's peers; the write guard is scoped to the selection.
    let batch = {
        let mut state = sync_state.write().await;
        neighbor_sync::select_sync_batch(
            &mut state,
            config.neighbor_sync_peer_count,
            config.neighbor_sync_cooldown,
        )
    };

    if batch.is_empty() {
        return;
    }

    debug!("Neighbor sync: syncing with {} peers", batch.len());

    // Peers are synced sequentially, one at a time.
    for peer in &batch {
        let outcome = neighbor_sync::sync_with_peer_with_outcome(
            peer,
            p2p_node,
            storage,
            paid_list,
            config,
            bootstrapping,
        )
        .await;

        if let Some(outcome) = outcome {
            handle_sync_response(
                &self_id,
                peer,
                &outcome.response,
                &outcome.sent_replica_hints,
                p2p_node,
                config,
                bootstrapping,
                bootstrap_state,
                storage,
                paid_list,
                queues,
                sync_state,
                sync_history,
                sync_cycle_epoch,
                repair_proofs,
            )
            .await;
        } else {
            // No outcome: record the failure and ask for a replacement peer.
            let replacement = {
                let mut state = sync_state.write().await;
                neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown)
            };

            if let Some(replacement_peer) = replacement {
                // The replacement gets a single attempt; if it also yields no
                // outcome, nothing further is done for it here.
                let replacement_outcome = neighbor_sync::sync_with_peer_with_outcome(
                    &replacement_peer,
                    p2p_node,
                    storage,
                    paid_list,
                    config,
                    bootstrapping,
                )
                .await;

                if let Some(outcome) = replacement_outcome {
                    handle_sync_response(
                        &self_id,
                        &replacement_peer,
                        &outcome.response,
                        &outcome.sent_replica_hints,
                        p2p_node,
                        config,
                        bootstrapping,
                        bootstrap_state,
                        storage,
                        paid_list,
                        queues,
                        sync_state,
                        sync_history,
                        sync_cycle_epoch,
                        repair_proofs,
                    )
                    .await;
                }
            }
        }
    }
}
1796
1797#[allow(clippy::too_many_arguments)]
1800async fn handle_sync_response(
1801 self_id: &PeerId,
1802 peer: &PeerId,
1803 resp: &NeighborSyncResponse,
1804 sent_replica_hints: &[neighbor_sync::SentReplicaHint],
1805 p2p_node: &Arc<P2PNode>,
1806 config: &ReplicationConfig,
1807 bootstrapping: bool,
1808 bootstrap_state: &Arc<RwLock<BootstrapState>>,
1809 storage: &Arc<LmdbStorage>,
1810 paid_list: &Arc<PaidList>,
1811 queues: &Arc<RwLock<ReplicationQueues>>,
1812 sync_state: &Arc<RwLock<NeighborSyncState>>,
1813 sync_history: &Arc<RwLock<HashMap<PeerId, PeerSyncRecord>>>,
1814 sync_cycle_epoch: &Arc<RwLock<u64>>,
1815 repair_proofs: &Arc<RwLock<RepairProofs>>,
1816) {
1817 {
1819 let mut state = sync_state.write().await;
1820 neighbor_sync::record_successful_sync(&mut state, peer);
1821 }
1822 {
1823 let mut history = sync_history.write().await;
1824 let record = history.entry(*peer).or_insert(PeerSyncRecord {
1825 last_sync: None,
1826 cycles_since_sync: 0,
1827 });
1828 record.last_sync = Some(Instant::now());
1829 record.cycles_since_sync = 0;
1830 }
1831
1832 if resp.bootstrapping {
1834 let should_report = {
1838 let now = Instant::now();
1839 let mut state = sync_state.write().await;
1840 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period) {
1841 BootstrapClaimObservation::WithinGrace { .. } => false,
1842 BootstrapClaimObservation::PastGrace { first_seen } => {
1843 warn!(
1844 "Peer {peer} has been claiming bootstrap for {:?}, \
1845 exceeding grace period of {:?} — reporting abuse",
1846 now.duration_since(first_seen),
1847 config.bootstrap_claim_grace_period,
1848 );
1849 true
1850 }
1851 BootstrapClaimObservation::Repeated { first_seen } => {
1852 warn!(
1853 "Peer {peer} repeated bootstrap claim after previously stopping; \
1854 first claim was {:?} ago — reporting abuse",
1855 now.duration_since(first_seen),
1856 );
1857 true
1858 }
1859 }
1860 };
1861 if should_report {
1862 p2p_node
1863 .report_trust_event(
1864 peer,
1865 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1866 )
1867 .await;
1868 }
1869 } else {
1870 {
1873 let mut state = sync_state.write().await;
1874 state.clear_active_bootstrap_claim(peer);
1875 }
1876 record_sent_replica_hints(peer, sent_replica_hints, repair_proofs, sync_cycle_epoch).await;
1877 let outcome = admit_and_queue_hints(
1878 self_id,
1879 peer,
1880 &resp.replica_hints,
1881 &resp.paid_hints,
1882 p2p_node,
1883 config,
1884 storage,
1885 paid_list,
1886 queues,
1887 )
1888 .await;
1889
1890 if bootstrapping {
1895 if !outcome.discovered.is_empty() {
1896 bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await;
1897 }
1898 if outcome.capacity_rejected_count > 0 {
1899 bootstrap::note_capacity_rejected(bootstrap_state, *peer).await;
1900 } else {
1901 bootstrap::clear_capacity_rejected(bootstrap_state, peer).await;
1902 }
1903 }
1904 }
1905}
1906
1907#[allow(clippy::too_many_arguments)]
1912struct AdmissionOutcome {
1920 discovered: HashSet<XorName>,
1921 capacity_rejected_count: usize,
1922}
1923
1924#[allow(clippy::too_many_arguments)]
1925async fn admit_and_queue_hints(
1926 self_id: &PeerId,
1927 source_peer: &PeerId,
1928 replica_hints: &[XorName],
1929 paid_hints: &[XorName],
1930 p2p_node: &Arc<P2PNode>,
1931 config: &ReplicationConfig,
1932 storage: &Arc<LmdbStorage>,
1933 paid_list: &Arc<PaidList>,
1934 queues: &Arc<RwLock<ReplicationQueues>>,
1935) -> AdmissionOutcome {
1936 let pending_keys: HashSet<XorName> = {
1937 let q = queues.read().await;
1938 q.pending_keys().into_iter().collect()
1939 };
1940
1941 let admitted = admission::admit_hints(
1942 self_id,
1943 replica_hints,
1944 paid_hints,
1945 p2p_node,
1946 config,
1947 storage,
1948 paid_list,
1949 &pending_keys,
1950 )
1951 .await;
1952
1953 let mut discovered = HashSet::new();
1954 let mut capacity_rejected_count: usize = 0;
1955 let mut q = queues.write().await;
1956 let now = Instant::now();
1957
1958 for key in admitted.replica_keys {
1959 if !storage.exists(&key).unwrap_or(false) {
1960 let result = q.add_pending_verify(
1961 key,
1962 VerificationEntry {
1963 state: VerificationState::PendingVerify,
1964 pipeline: HintPipeline::Replica,
1965 verified_sources: Vec::new(),
1966 tried_sources: HashSet::new(),
1967 created_at: now,
1968 hint_sender: *source_peer,
1969 },
1970 );
1971 match result {
1972 crate::replication::scheduling::AdmissionResult::Admitted => {
1973 discovered.insert(key);
1974 }
1975 crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
1976 crate::replication::scheduling::AdmissionResult::CapacityRejected => {
1977 capacity_rejected_count += 1;
1978 }
1979 }
1980 }
1981 }
1982
1983 for key in admitted.paid_only_keys {
1984 let result = q.add_pending_verify(
1985 key,
1986 VerificationEntry {
1987 state: VerificationState::PendingVerify,
1988 pipeline: HintPipeline::PaidOnly,
1989 verified_sources: Vec::new(),
1990 tried_sources: HashSet::new(),
1991 created_at: now,
1992 hint_sender: *source_peer,
1993 },
1994 );
1995 match result {
1996 crate::replication::scheduling::AdmissionResult::Admitted => {
1997 discovered.insert(key);
1998 }
1999 crate::replication::scheduling::AdmissionResult::AlreadyPresent => {}
2000 crate::replication::scheduling::AdmissionResult::CapacityRejected => {
2001 capacity_rejected_count += 1;
2002 }
2003 }
2004 }
2005
2006 if capacity_rejected_count > 0 {
2007 debug!(
2008 "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \
2009 rejected at queue capacity; source will need to re-hint after pending_verify drains"
2010 );
2011 }
2012
2013 AdmissionOutcome {
2014 discovered,
2015 capacity_rejected_count,
2016 }
2017}
2018
/// Runs one verification cycle over every key currently pending verification.
///
/// Phases, in order:
///
/// 1. Evict stale pending entries, then snapshot the pending keys.
/// 2. Split the keys: those already in the local paid list are marked
///    `PaidListVerified` and routed by pipeline; the rest need a network
///    verification round.
/// 3. Locally paid, paid-only keys are terminal if already stored or if this
///    node is not responsible for them; otherwise they join the presence probe.
/// 4. Presence probe: ask peers who holds each key and promote keys with at
///    least one holder to the fetch queue.
/// 5. Network verification: evaluate the gathered evidence, add verified keys
///    to the paid list, and promote, drop, or finish each key.
/// 6. Report every key that reached a terminal state to the bootstrap tracker.
#[allow(clippy::too_many_lines)]
async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
    let VerificationCycleContext {
        p2p_node,
        paid_list,
        storage,
        queues,
        config,
        bootstrap_state,
        is_bootstrapping,
        bootstrap_complete_notify,
    } = ctx;

    {
        // Drop entries older than PENDING_VERIFY_MAX_AGE before doing any work.
        let mut q = queues.write().await;
        q.evict_stale(config::PENDING_VERIFY_MAX_AGE);
    }

    // Snapshot of the pending keys; the queue lock is released right away.
    let pending_keys = {
        let q = queues.read().await;
        q.pending_keys()
    };

    if pending_keys.is_empty() {
        return;
    }

    let self_id = *p2p_node.peer_id();

    // Locally paid keys that still need a holder lookup before fetching.
    let mut local_paid_presence_probe_keys = Vec::new();
    // Locally paid keys that arrived through the paid-only pipeline.
    let mut local_paid_paid_only_keys = Vec::new();
    // Keys not in the local paid list: need a network verification round.
    let mut keys_needing_network = Vec::new();
    // Keys removed from the pending queue during this cycle.
    let mut terminal_keys: Vec<XorName> = Vec::new();
    {
        let mut q = queues.write().await;
        for key in &pending_keys {
            // Paid-list lookup errors are treated as "not paid".
            if paid_list.contains(key).unwrap_or(false) {
                // `None` means the key is no longer pending; it is then skipped.
                if let Some(pipeline) =
                    q.set_pending_state(key, VerificationState::PaidListVerified)
                {
                    match pipeline {
                        HintPipeline::PaidOnly => {
                            local_paid_paid_only_keys.push(*key);
                        }
                        HintPipeline::Replica => {
                            local_paid_presence_probe_keys.push(*key);
                        }
                    }
                }
            } else {
                keys_needing_network.push(*key);
            }
        }
    }

    if !local_paid_paid_only_keys.is_empty() {
        let mut terminal_paid_only = Vec::new();
        for key in local_paid_paid_only_keys {
            if storage.exists(&key).unwrap_or(false) {
                // Already stored locally: nothing left to do.
                terminal_paid_only.push(key);
            } else if admission::is_responsible(&self_id, &key, p2p_node, config.close_group_size)
                .await
            {
                // Responsible but not stored: look for holders to fetch from.
                local_paid_presence_probe_keys.push(key);
            } else {
                // Not responsible for this key: nothing to fetch.
                terminal_paid_only.push(key);
            }
        }

        if !terminal_paid_only.is_empty() {
            let mut q = queues.write().await;
            for key in terminal_paid_only {
                q.remove_pending(&key);
                terminal_keys.push(key);
            }
        }
    }

    if !local_paid_presence_probe_keys.is_empty() {
        let targets = quorum::compute_presence_targets(
            &local_paid_presence_probe_keys,
            p2p_node,
            config,
            &self_id,
        )
        .await;
        let evidence = quorum::run_verification_round(
            &local_paid_presence_probe_keys,
            &targets,
            p2p_node,
            config,
        )
        .await;

        let mut q = queues.write().await;
        for key in local_paid_presence_probe_keys {
            // Checked again here: the key may have been stored while the
            // round was in flight.
            if storage.exists(&key).unwrap_or(false) {
                q.remove_pending(&key);
                terminal_keys.push(key);
                continue;
            }
            // No evidence for the key is treated as "no sources".
            let sources = evidence.get(&key).map_or_else(Vec::new, |ev| {
                quorum::present_sources_for_key(&key, ev, &targets)
            });
            if sources.is_empty() {
                q.remove_pending(&key);
                warn!(
                    "Locally paid key {} has no responding holders (possible data loss)",
                    hex::encode(key)
                );
                terminal_keys.push(key);
            } else {
                let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                // The promotion result is intentionally ignored.
                let _ = q.promote_pending_to_fetch(key, distance, sources);
            }
        }
    }

    if !keys_needing_network.is_empty() {
        let targets =
            quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id)
                .await;

        let evidence =
            quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await;

        let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new();
        {
            // Read lock only: evaluation needs each entry's pipeline but does
            // not modify the queue.
            let q = queues.read().await;
            for key in &keys_needing_network {
                // Keys without evidence are skipped and stay pending.
                let Some(ev) = evidence.get(key) else {
                    continue;
                };
                // Keys that are no longer pending are skipped.
                let Some(entry) = q.get_pending(key) else {
                    continue;
                };
                let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config);
                evaluated.push((*key, outcome, entry.pipeline));
            }
        }
        // Verified keys are added to the paid list while no queue lock is held.
        let mut paid_insert_keys: Vec<XorName> = Vec::new();
        for (key, outcome, _) in &evaluated {
            if matches!(
                outcome,
                KeyVerificationOutcome::QuorumVerified { .. }
                    | KeyVerificationOutcome::PaidListVerified { .. }
            ) {
                paid_insert_keys.push(*key);
            }
        }
        for key in &paid_insert_keys {
            if let Err(e) = paid_list.insert(key).await {
                warn!("Failed to add verified key to PaidForList: {e}");
            }
        }

        // Paid-only keys that should nevertheless be fetched: verified, not
        // stored locally, and this node is responsible for them. Computed
        // before the queue write lock is taken because the check awaits.
        let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
        for (key, outcome, pipeline) in &evaluated {
            if *pipeline == HintPipeline::PaidOnly
                && matches!(
                    outcome,
                    KeyVerificationOutcome::QuorumVerified { .. }
                        | KeyVerificationOutcome::PaidListVerified { .. }
                )
                && !storage.exists(key).unwrap_or(false)
                && admission::is_responsible(&self_id, key, p2p_node, config.close_group_size).await
            {
                paid_only_fetch_keys.insert(*key);
            }
        }

        let mut q = queues.write().await;
        for (key, outcome, pipeline) in evaluated {
            match outcome {
                KeyVerificationOutcome::QuorumVerified { sources }
                | KeyVerificationOutcome::PaidListVerified { sources } => {
                    let fetch_eligible =
                        pipeline == HintPipeline::Replica || paid_only_fetch_keys.contains(&key);
                    if fetch_eligible && !sources.is_empty() {
                        let distance =
                            crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes());
                        // The promotion result is intentionally ignored.
                        let _ = q.promote_pending_to_fetch(key, distance, sources);
                    } else if fetch_eligible && sources.is_empty() {
                        warn!(
                            "Verified responsible key {} has no holders (possible data loss)",
                            hex::encode(key)
                        );
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    } else {
                        // Verified but not fetch-eligible: done with this key.
                        q.remove_pending(&key);
                        terminal_keys.push(key);
                    }
                }
                KeyVerificationOutcome::QuorumFailed
                | KeyVerificationOutcome::QuorumInconclusive => {
                    q.remove_pending(&key);
                    terminal_keys.push(key);
                }
            }
        }
    }

    // Let the bootstrap tracker know which keys finished in this cycle.
    update_bootstrap_after_verification(
        &terminal_keys,
        bootstrap_state,
        queues,
        is_bootstrapping,
        bootstrap_complete_notify,
    )
    .await;
}
2273
2274async fn update_bootstrap_after_verification(
2277 terminal_keys: &[XorName],
2278 bootstrap_state: &Arc<RwLock<BootstrapState>>,
2279 queues: &Arc<RwLock<ReplicationQueues>>,
2280 is_bootstrapping: &Arc<RwLock<bool>>,
2281 bootstrap_complete_notify: &Arc<Notify>,
2282) {
2283 if terminal_keys.is_empty() || bootstrap_state.read().await.is_drained() {
2284 return;
2285 }
2286 {
2287 let mut bs = bootstrap_state.write().await;
2288 for key in terminal_keys {
2289 bs.remove_key(key);
2290 }
2291 }
2292 let q = queues.read().await;
2293 if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await {
2294 complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await;
2295 }
2296}
2297
2298async fn complete_bootstrap(
2300 is_bootstrapping: &Arc<RwLock<bool>>,
2301 bootstrap_complete_notify: &Arc<Notify>,
2302) {
2303 *is_bootstrapping.write().await = false;
2304 bootstrap_complete_notify.notify_waiters();
2305 info!("Replication bootstrap complete");
2306}
2307
/// How a single fetch attempt ended (produced by `execute_single_fetch`).
enum FetchResult {
    /// The record was fetched, passed all checks, and was written to storage.
    Stored,
    /// The source answered with data that failed a check: wrong key, larger
    /// than `MAX_CHUNK_SIZE`, or content that does not hash to the key.
    IntegrityFailed,
    /// The attempt failed for any other reason: the request could not be
    /// encoded or sent, the reply was undecodable or not a successful fetch
    /// response, or the local store write failed.
    SourceFailed,
}
2321
/// Outcome of one fetch attempt, tagged with the key that was requested.
struct FetchOutcome {
    /// The key that was requested (not the key echoed back by the source).
    key: XorName,
    /// How the attempt ended.
    result: FetchResult,
}
2328
2329#[allow(clippy::too_many_lines)]
2330async fn execute_single_fetch(
2336 p2p_node: Arc<P2PNode>,
2337 storage: Arc<LmdbStorage>,
2338 config: Arc<ReplicationConfig>,
2339 key: XorName,
2340 source: PeerId,
2341) -> FetchOutcome {
2342 let request = protocol::FetchRequest { key };
2343 let msg = ReplicationMessage {
2344 request_id: rand::thread_rng().gen::<u64>(),
2345 body: ReplicationMessageBody::FetchRequest(request),
2346 };
2347
2348 let encoded = match msg.encode() {
2349 Ok(data) => data,
2350 Err(e) => {
2351 warn!("Failed to encode fetch request: {e}");
2352 return FetchOutcome {
2353 key,
2354 result: FetchResult::SourceFailed,
2355 };
2356 }
2357 };
2358
2359 let result = p2p_node
2360 .send_request(
2361 &source,
2362 REPLICATION_PROTOCOL_ID,
2363 encoded,
2364 config.fetch_request_timeout,
2365 )
2366 .await;
2367
2368 match result {
2369 Ok(response) => {
2370 let Ok(resp_msg) = ReplicationMessage::decode(&response.data) else {
2371 p2p_node
2372 .report_trust_event(
2373 &source,
2374 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2375 )
2376 .await;
2377 return FetchOutcome {
2378 key,
2379 result: FetchResult::SourceFailed,
2380 };
2381 };
2382
2383 match resp_msg.body {
2384 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success {
2385 key: resp_key,
2386 data,
2387 }) => {
2388 if resp_key != key {
2393 warn!(
2394 "Fetch response key mismatch: requested {}, got {}",
2395 hex::encode(key),
2396 hex::encode(resp_key)
2397 );
2398 p2p_node
2399 .report_trust_event(
2400 &source,
2401 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2402 )
2403 .await;
2404 return FetchOutcome {
2405 key,
2406 result: FetchResult::IntegrityFailed,
2407 };
2408 }
2409
2410 if data.len() > crate::ant_protocol::MAX_CHUNK_SIZE {
2414 warn!(
2415 "Fetched record {} exceeds MAX_CHUNK_SIZE ({} > {})",
2416 hex::encode(resp_key),
2417 data.len(),
2418 crate::ant_protocol::MAX_CHUNK_SIZE,
2419 );
2420 p2p_node
2421 .report_trust_event(
2422 &source,
2423 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2424 )
2425 .await;
2426 return FetchOutcome {
2427 key,
2428 result: FetchResult::IntegrityFailed,
2429 };
2430 }
2431
2432 let computed = crate::client::compute_address(&data);
2434 if computed != resp_key {
2435 warn!(
2436 "Fetched record integrity check failed: expected {}, got {}",
2437 hex::encode(resp_key),
2438 hex::encode(computed)
2439 );
2440 p2p_node
2441 .report_trust_event(
2442 &source,
2443 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2444 )
2445 .await;
2446 return FetchOutcome {
2447 key,
2448 result: FetchResult::IntegrityFailed,
2449 };
2450 }
2451
2452 if let Err(e) = storage.put(&resp_key, &data).await {
2453 warn!(
2454 "Failed to store fetched record {}: {e}",
2455 hex::encode(resp_key)
2456 );
2457 return FetchOutcome {
2458 key,
2459 result: FetchResult::SourceFailed,
2460 };
2461 }
2462
2463 FetchOutcome {
2464 key,
2465 result: FetchResult::Stored,
2466 }
2467 }
2468 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::NotFound {
2469 ..
2470 }) => {
2471 warn!(
2477 "Fetch: verified source {source} returned NotFound for {}",
2478 hex::encode(key)
2479 );
2480 p2p_node
2481 .report_trust_event(
2482 &source,
2483 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2484 )
2485 .await;
2486 FetchOutcome {
2487 key,
2488 result: FetchResult::SourceFailed,
2489 }
2490 }
2491 ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error {
2492 reason,
2493 ..
2494 }) => {
2495 warn!(
2496 "Fetch: peer {source} returned error for {}: {reason}",
2497 hex::encode(key)
2498 );
2499 p2p_node
2500 .report_trust_event(
2501 &source,
2502 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2503 )
2504 .await;
2505 FetchOutcome {
2506 key,
2507 result: FetchResult::SourceFailed,
2508 }
2509 }
2510 _ => {
2511 p2p_node
2513 .report_trust_event(
2514 &source,
2515 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2516 )
2517 .await;
2518 FetchOutcome {
2519 key,
2520 result: FetchResult::SourceFailed,
2521 }
2522 }
2523 }
2524 }
2525 Err(e) => {
2526 debug!("Fetch request to {source} failed: {e}");
2527 FetchOutcome {
2530 key,
2531 result: FetchResult::SourceFailed,
2532 }
2533 }
2534 }
2535}
2536
2537async fn handle_audit_result(
2543 result: &AuditTickResult,
2544 p2p_node: &Arc<P2PNode>,
2545 sync_state: &Arc<RwLock<NeighborSyncState>>,
2546 config: &ReplicationConfig,
2547) {
2548 match result {
2549 AuditTickResult::Passed {
2550 challenged_peer,
2551 keys_checked,
2552 } => {
2553 debug!("Audit passed for {challenged_peer} ({keys_checked} keys)");
2554 {
2557 let mut state = sync_state.write().await;
2558 state.clear_active_bootstrap_claim(challenged_peer);
2559 }
2560 p2p_node
2561 .report_trust_event(
2562 challenged_peer,
2563 TrustEvent::ApplicationSuccess(REPLICATION_TRUST_WEIGHT),
2564 )
2565 .await;
2566 }
2567 AuditTickResult::Failed { evidence } => {
2568 if let FailureEvidence::AuditFailure {
2569 challenged_peer,
2570 confirmed_failed_keys,
2571 summary,
2572 reason,
2573 ..
2574 } = evidence
2575 {
2576 error!(
2577 "Audit failure for {challenged_peer}: reason={reason:?}, confirmed_failed_keys={}, challenged_keys={}, absent_keys={}, digest_mismatch_keys={}",
2578 confirmed_failed_keys.len(),
2579 summary.challenged_keys,
2580 summary.absent_keys,
2581 summary.digest_mismatch_keys,
2582 );
2583 if audit_failure_clears_bootstrap_claim(reason) {
2584 let mut state = sync_state.write().await;
2587 state.clear_active_bootstrap_claim(challenged_peer);
2588 } else {
2589 debug!("Audit timeout for {challenged_peer}; retaining active bootstrap claim");
2590 }
2591 p2p_node
2592 .report_trust_event(
2593 challenged_peer,
2594 TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT),
2595 )
2596 .await;
2597 }
2598 }
2599 AuditTickResult::BootstrapClaim { peer } => {
2600 let should_report = {
2604 let now = Instant::now();
2605 let mut state = sync_state.write().await;
2606 match state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period)
2607 {
2608 BootstrapClaimObservation::WithinGrace { .. } => {
2609 debug!("Audit: peer {peer} claims bootstrapping (within grace period)");
2610 false
2611 }
2612 BootstrapClaimObservation::PastGrace { first_seen } => {
2613 warn!(
2614 "Audit: peer {peer} claiming bootstrap past grace period \
2615 ({:?} > {:?}), reporting abuse",
2616 now.duration_since(first_seen),
2617 config.bootstrap_claim_grace_period,
2618 );
2619 true
2620 }
2621 BootstrapClaimObservation::Repeated { first_seen } => {
2622 warn!(
2623 "Audit: peer {peer} repeated bootstrap claim after previously \
2624 stopping; first claim was {:?} ago, reporting abuse",
2625 now.duration_since(first_seen),
2626 );
2627 true
2628 }
2629 }
2630 };
2631 if should_report {
2632 p2p_node
2633 .report_trust_event(
2634 peer,
2635 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
2636 )
2637 .await;
2638 }
2639 }
2640 AuditTickResult::Idle | AuditTickResult::InsufficientKeys => {}
2641 }
2642}
2643
2644fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool {
2645 !matches!(reason, AuditFailureReason::Timeout)
2646}
2647
/// Unit tests for the bootstrap-claim handling of audit failures.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::audit_failure_clears_bootstrap_claim;
    use crate::replication::types::AuditFailureReason;

    /// A timeout must leave the active bootstrap claim untouched.
    #[test]
    fn audit_timeout_preserves_active_bootstrap_claim() {
        let clears = audit_failure_clears_bootstrap_claim(&AuditFailureReason::Timeout);
        assert!(!clears);
    }

    /// Each of the listed non-timeout failure reasons clears the claim.
    #[test]
    fn decoded_audit_failures_clear_active_bootstrap_claim() {
        let decoded_failures = [
            AuditFailureReason::MalformedResponse,
            AuditFailureReason::DigestMismatch,
            AuditFailureReason::KeyAbsent,
            AuditFailureReason::Rejected,
        ];

        for reason in decoded_failures {
            let clears = audit_failure_clears_bootstrap_claim(&reason);
            assert!(
                clears,
                "decoded non-bootstrap failure {reason:?} should clear active claim"
            );
        }
    }
}