1use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use crate::logging::{debug, info, warn};
12
13use futures::{stream, StreamExt};
14use rand::Rng;
15use saorsa_core::identity::PeerId;
16use saorsa_core::{DHTNode, P2PNode};
17use tokio::sync::RwLock;
18
19use crate::ant_protocol::XorName;
20use crate::replication::commitment_state::ResponderCommitmentState;
21use crate::replication::config::{
22 storage_admission_width, ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT,
23 MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS, REPLICATION_PROTOCOL_ID,
24};
25use crate::replication::paid_list::PaidList;
26use crate::replication::protocol::{
27 compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
28 ReplicationMessageBody, ABSENT_KEY_DIGEST,
29};
30use crate::replication::quorum::{self, VerificationTargets};
31use crate::replication::types::{
32 BootstrapClaimObservation, KeyVerificationEvidence, NeighborSyncState, PaidListEvidence,
33 RepairProofs,
34};
35use crate::storage::LmdbStorage;
36
37use super::REPLICATION_TRUST_WEIGHT;
38
/// Maximum number of prune audit challenges kept in flight at once; used as
/// the `buffer_unordered` width when record prune proofs are collected.
const MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES: usize = 32;

/// Maximum number of expired paid-list entries selected for remote
/// verification in a single prune pass.
const MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS: usize = 32;
/// Maximum number of distinct peers reserved for paid-prune verification in a
/// single prune pass; defined as equal to the audit-challenge concurrency cap.
const MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS: usize = MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES;
47
/// Counters summarising what one prune pass did to stored records and to
/// paid-list entries.
#[derive(Debug, Default)]
pub struct PruneResult {
    /// Stored records deleted during the pass.
    pub records_pruned: usize,
    /// Stored records that received a new out-of-range marker during the pass.
    pub records_marked_out_of_range: usize,
    /// Stored records whose out-of-range marker was removed because the local
    /// node is (again) in the storage-admission group.
    pub records_cleared: usize,
    /// Paid-list entries removed during the pass.
    pub paid_entries_pruned: usize,
    /// Paid-list entries that received a new out-of-range marker during the pass.
    pub paid_entries_marked: usize,
    /// Paid-list entries whose out-of-range marker was removed because the
    /// local node is (again) in the paid close group.
    pub paid_entries_cleared: usize,
}
68
/// Borrowed inputs for one prune pass, bundled so the pass helpers can take a
/// single argument.
pub struct PrunePassContext<'a> {
    /// Identity of the local node; used to test close-group membership and to
    /// exclude the local node from remote audit targets.
    pub self_id: &'a PeerId,
    /// Record store that is scanned for prune candidates and deleted from.
    pub storage: &'a Arc<LmdbStorage>,
    /// Paid list; holds the out-of-range markers and the paid entries.
    pub paid_list: &'a Arc<PaidList>,
    /// Network handle used for local closest-node lookups and audit requests.
    pub p2p_node: &'a Arc<P2PNode>,
    /// Replication settings (close-group sizes, hysteresis duration, timeouts).
    pub config: &'a ReplicationConfig,
    /// Shared neighbor-sync state; the record prune cursor is read from and
    /// written to it.
    pub sync_state: &'a Arc<RwLock<NeighborSyncState>>,
    /// Repair proofs consulted to choose which peers are audited for a key.
    pub repair_proofs: &'a Arc<RwLock<RepairProofs>>,
    /// Sync epoch forwarded to the repair-proof maturity check.
    pub current_sync_epoch: u64,
    /// Test-only override for the timestamp passed to the repair-proof
    /// maturity check; `None` falls back to the pass's own clock reading.
    #[cfg(any(test, feature = "test-utils"))]
    pub repair_proof_now: Option<Instant>,
    /// When `false`, entries past the hysteresis window are deferred instead
    /// of being verified against remote peers.
    pub allow_remote_prune_audits: bool,
    /// Optional commitment state; keys it reports as held are not pruned.
    pub commitment_state: Option<&'a Arc<ResponderCommitmentState>>,
}
98
/// Outcome of interpreting one peer's reply to a prune audit challenge.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PruneAuditStatus {
    /// The peer returned a digest matching the locally computed one.
    Proven,
    /// The reply was missing, malformed, mismatched, rejected, or reported
    /// the key as absent.
    Failed,
    /// The peer answered the challenge with a bootstrapping claim.
    Bootstrapping,
}
105
/// Per-pass counters for the stored-record half of the prune pass.
#[derive(Debug, Default)]
struct RecordPruneStats {
    /// Records newly marked out of range.
    marked: usize,
    /// Records whose out-of-range marker was cleared.
    cleared: usize,
    /// Records actually deleted from storage.
    pruned: usize,
    /// Out-of-range records skipped because the commitment state still holds them.
    held_by_commitment: usize,
}
113
/// Per-pass counters for the paid-list half of the prune pass.
#[derive(Debug, Default)]
struct PaidPruneStats {
    /// Paid entries newly marked out of range.
    marked: usize,
    /// Paid entries whose out-of-range marker was cleared.
    cleared: usize,
    /// Paid entries actually removed from the paid list.
    pruned: usize,
}
120
/// Counts of expired paid-list entries that were not verified in this pass,
/// grouped by the reason they were deferred.
#[derive(Debug, Default)]
struct PaidPruneDeferredCounts {
    /// Deferred because the per-pass entry cap was already reached.
    entry_budget: usize,
    /// Deferred because remote prune verification is currently not allowed.
    remote_gate: usize,
    /// Deferred because selecting the entry would exceed the per-pass peer cap.
    peer_budget: usize,
}
127
128impl PaidPruneDeferredCounts {
129 fn log(&self) {
130 if self.entry_budget > 0 {
131 debug!(
132 "Deferred {} expired PaidForList entries beyond the per-pass verification cap \
133 ({MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS})",
134 self.entry_budget,
135 );
136 }
137
138 if self.remote_gate > 0 {
139 debug!(
140 "Deferred {} expired PaidForList entries until bootstrap drain allows remote \
141 paid-prune verification",
142 self.remote_gate,
143 );
144 }
145
146 if self.peer_budget > 0 {
147 debug!(
148 "Deferred {} expired PaidForList entries beyond the per-pass paid-prune peer cap \
149 ({MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS})",
150 self.peer_budget,
151 );
152 }
153 }
154}
155
/// A stored record selected for prune-confirmation audits in this pass.
#[derive(Debug, Clone)]
struct RecordPruneCandidate {
    /// Key of the record that may be deleted.
    key: XorName,
    /// Remote peers that will each be sent an audit challenge for `key`.
    target_peers: Vec<PeerId>,
}
161
/// Result of evaluating a single stored key during the scan.
struct RecordPruneKeyOutcome {
    /// `true` when the key had no out-of-range marker before this evaluation
    /// and one was requested now.
    marked: bool,
    /// What, if anything, the scan loop should account for this key.
    state: RecordPruneKeyState,
}
166
167impl Default for RecordPruneKeyOutcome {
168 fn default() -> Self {
169 Self {
170 marked: false,
171 state: RecordPruneKeyState::None,
172 }
173 }
174}
175
/// Classification of a stored key after one evaluation in the scan loop.
enum RecordPruneKeyState {
    /// Nothing to account for (still in hysteresis, not enough proofs, ...).
    None,
    /// The local node is responsible again and an existing marker was cleared.
    Cleared,
    /// Past hysteresis, but remote prune audits are currently not allowed.
    BootstrapDeferred,
    /// Past hysteresis, but the per-pass audit challenge budget is too small.
    BudgetDeferred,
    /// The commitment state reports the key as held, so it is not pruned.
    HeldByCommitment,
    /// Selected for remote audits; carries the key and the peers to challenge.
    Candidate(RecordPruneCandidate),
}
187
/// Classification of an expired paid-list entry when it is considered for
/// remote verification.
enum PaidPruneKeyState {
    /// The paid close group has no remote peers; nothing can be verified.
    None,
    /// Remote prune verification is currently not allowed.
    RemoteDeferred,
    /// The per-pass entry cap has already been reached.
    EntryBudgetDeferred,
    /// Selecting this entry would exceed the per-pass verification peer cap.
    PeerBudgetDeferred,
    /// Selected for verification against the contained remote peers.
    Candidate(Vec<PeerId>),
}
195
/// Per-pass bookkeeping shared by all concurrent prune audit challenges.
///
/// NOTE(review): presumably used to report each peer at most once per pass;
/// the code that reads and writes these sets is outside this chunk, so
/// confirm against the reporting helpers.
#[derive(Default)]
struct PruneAuditReportState {
    /// Peers tracked for audit-failure reporting.
    audit_failures: RwLock<HashSet<PeerId>>,
    /// Peers tracked for bootstrap-claim reporting.
    bootstrap_abuse: RwLock<HashSet<PeerId>>,
}
201
202pub async fn run_prune_pass(
230 self_id: &PeerId,
231 storage: &Arc<LmdbStorage>,
232 paid_list: &Arc<PaidList>,
233 p2p_node: &Arc<P2PNode>,
234 config: &ReplicationConfig,
235 sync_state: &Arc<RwLock<NeighborSyncState>>,
236 allow_remote_prune_audits: bool,
237) -> PruneResult {
238 let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
239 run_prune_pass_with_context(PrunePassContext {
240 self_id,
241 storage,
242 paid_list,
243 p2p_node,
244 config,
245 sync_state,
246 repair_proofs: &repair_proofs,
247 current_sync_epoch: 0,
248 #[cfg(any(test, feature = "test-utils"))]
249 repair_proof_now: None,
250 allow_remote_prune_audits,
251 commitment_state: None,
252 })
253 .await
254}
255
256pub async fn run_prune_pass_with_context(ctx: PrunePassContext<'_>) -> PruneResult {
258 let (stored_count, record_stats) = prune_stored_records(&ctx).await;
259 let now = Instant::now();
260 let (paid_count, paid_stats) = prune_paid_entries(
261 ctx.self_id,
262 ctx.paid_list,
263 ctx.p2p_node,
264 ctx.config,
265 now,
266 ctx.allow_remote_prune_audits,
267 )
268 .await;
269
270 let result = PruneResult {
271 records_pruned: record_stats.pruned,
272 records_marked_out_of_range: record_stats.marked,
273 records_cleared: record_stats.cleared,
274 paid_entries_pruned: paid_stats.pruned,
275 paid_entries_marked: paid_stats.marked,
276 paid_entries_cleared: paid_stats.cleared,
277 };
278
279 info!(
280 "Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
281 result.records_pruned, stored_count, result.paid_entries_pruned, paid_count,
282 );
283
284 result
285}
286
287#[allow(clippy::too_many_lines)]
290async fn prune_stored_records(ctx: &PrunePassContext<'_>) -> (usize, RecordPruneStats) {
291 let stored_keys = match ctx.storage.all_keys().await {
292 Ok(keys) => keys,
293 Err(e) => {
294 warn!("Failed to read stored keys for pruning: {e}");
295 return (0, RecordPruneStats::default());
296 }
297 };
298
299 let now = Instant::now();
300 let mut stats = RecordPruneStats::default();
301 let mut candidates = Vec::new();
302 let mut audit_challenge_budget = MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS;
303 let mut budget_deferred = 0usize;
304 let mut bootstrap_deferred = 0usize;
305 let scan_start = prune_scan_start(ctx.sync_state, stored_keys.len()).await;
306 let mut last_selected_offset = None;
307
308 for offset in 0..stored_keys.len() {
309 let key = &stored_keys[(scan_start + offset) % stored_keys.len()];
310 let (storage_admission_group, strict_close_group) =
311 record_prune_lookup_groups(key, ctx.p2p_node, ctx.config).await;
312
313 let outcome = evaluate_record_prune_key(
314 ctx,
315 key,
316 &storage_admission_group,
317 &strict_close_group,
318 now,
319 &mut audit_challenge_budget,
320 )
321 .await;
322 if outcome.marked {
323 stats.marked += 1;
324 }
325 match outcome.state {
326 RecordPruneKeyState::None => {}
327 RecordPruneKeyState::Cleared => stats.cleared += 1,
328 RecordPruneKeyState::BootstrapDeferred => {
329 bootstrap_deferred = bootstrap_deferred.saturating_add(1);
330 }
331 RecordPruneKeyState::BudgetDeferred => {
332 budget_deferred = budget_deferred.saturating_add(1);
333 }
334 RecordPruneKeyState::HeldByCommitment => {
335 stats.held_by_commitment = stats.held_by_commitment.saturating_add(1);
336 }
337 RecordPruneKeyState::Candidate(candidate) => {
338 last_selected_offset = Some(offset);
339 candidates.push(candidate);
340 }
341 }
342 }
343
344 advance_prune_cursor(
345 ctx.sync_state,
346 stored_keys.len(),
347 scan_start,
348 last_selected_offset,
349 )
350 .await;
351
352 if bootstrap_deferred > 0 {
353 debug!(
354 "Deferred {bootstrap_deferred} prune candidates until bootstrap drain allows \
355 remote prune-confirmation audits"
356 );
357 }
358
359 if budget_deferred > 0 {
360 debug!(
361 "Deferred {budget_deferred} prune candidates due to per-pass audit budget \
362 ({MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS} challenges)"
363 );
364 }
365
366 if stats.held_by_commitment > 0 {
367 debug!(
368 "Vetoed {} prune candidate(s) still committed under a recently-gossiped \
369 commitment (bounded reprieve until they age out of the retention window)",
370 stats.held_by_commitment
371 );
372 }
373
374 let present_by_key = collect_record_prune_proofs(
375 &candidates,
376 ctx.storage,
377 ctx.p2p_node,
378 ctx.config,
379 ctx.sync_state,
380 )
381 .await;
382 let (keys_to_delete, revalidated_cleared) = revalidated_record_prune_keys(
383 &candidates,
384 &present_by_key,
385 ctx.self_id,
386 ctx.paid_list,
387 ctx.p2p_node,
388 ctx.config,
389 ctx.commitment_state,
390 )
391 .await;
392 stats.cleared += revalidated_cleared;
393 stats.pruned = delete_stored_records(
394 &keys_to_delete,
395 ctx.storage,
396 ctx.paid_list,
397 ctx.repair_proofs,
398 )
399 .await;
400
401 (stored_keys.len(), stats)
402}
403
404async fn record_prune_lookup_groups(
405 key: &XorName,
406 p2p_node: &Arc<P2PNode>,
407 config: &ReplicationConfig,
408) -> (Vec<DHTNode>, Vec<DHTNode>) {
409 let dht = p2p_node.dht_manager();
410 let storage_admission_group = dht
411 .find_closest_nodes_local_with_self(key, storage_admission_width(config.close_group_size))
412 .await;
413 let strict_close_group = dht
414 .find_closest_nodes_local_with_self(key, config.close_group_size)
415 .await;
416 (storage_admission_group, strict_close_group)
417}
418
419async fn evaluate_record_prune_key(
420 ctx: &PrunePassContext<'_>,
421 key: &XorName,
422 storage_admission_group: &[DHTNode],
423 strict_close_group: &[DHTNode],
424 now: Instant,
425 audit_challenge_budget: &mut usize,
426) -> RecordPruneKeyOutcome {
427 let mut outcome = RecordPruneKeyOutcome::default();
428 let is_responsible = storage_admission_group
429 .iter()
430 .any(|node| node.peer_id == *ctx.self_id);
431
432 if is_responsible {
433 if ctx.paid_list.record_out_of_range_since(key).is_some() {
434 ctx.paid_list.clear_record_out_of_range(key);
435 outcome.state = RecordPruneKeyState::Cleared;
436 }
437 return outcome;
438 }
439
440 if let Some(cs) = ctx.commitment_state {
452 if cs.is_held(key) {
453 outcome.state = RecordPruneKeyState::HeldByCommitment;
454 return outcome;
455 }
456 }
457
458 if ctx.paid_list.record_out_of_range_since(key).is_none() {
459 outcome.marked = true;
460 }
461 ctx.paid_list.set_record_out_of_range(key);
462
463 let Some(first_seen) = ctx.paid_list.record_out_of_range_since(key) else {
464 return outcome;
465 };
466 let elapsed = now
467 .checked_duration_since(first_seen)
468 .unwrap_or(Duration::ZERO);
469 if elapsed < ctx.config.prune_hysteresis_duration {
470 return outcome;
471 }
472
473 if !ctx.allow_remote_prune_audits {
474 outcome.state = RecordPruneKeyState::BootstrapDeferred;
475 return outcome;
476 }
477
478 let target_peers = remote_close_group_peers(strict_close_group, ctx.self_id);
479 if target_peers.is_empty() {
480 warn!(
481 "Cannot prune {}: current close group has no remote peers",
482 hex::encode(key)
483 );
484 return outcome;
485 }
486
487 let current_close_peers: HashSet<PeerId> =
491 strict_close_group.iter().map(|node| node.peer_id).collect();
492 #[cfg(any(test, feature = "test-utils"))]
493 let repair_proof_now = ctx.repair_proof_now.unwrap_or(now);
494 #[cfg(not(any(test, feature = "test-utils")))]
495 let repair_proof_now = now;
496 let audit_targets = peers_with_mature_repair_proofs(
497 key,
498 &target_peers,
499 ¤t_close_peers,
500 ctx.repair_proofs,
501 ctx.current_sync_epoch,
502 repair_proof_now,
503 )
504 .await;
505 let proofs_needed = prune_proofs_needed(target_peers.len());
506 if proofs_needed == 0 || audit_targets.len() < proofs_needed {
507 debug!(
508 "Deferring prune for {} until enough of the close group has mature \
509 repair proofs",
510 hex::encode(key)
511 );
512 return outcome;
513 }
514
515 if audit_targets.len() > *audit_challenge_budget {
516 outcome.state = RecordPruneKeyState::BudgetDeferred;
517 return outcome;
518 }
519
520 *audit_challenge_budget -= audit_targets.len();
521 outcome.state = RecordPruneKeyState::Candidate(RecordPruneCandidate {
522 key: *key,
523 target_peers: audit_targets,
524 });
525 outcome
526}
527
528async fn prune_paid_entries(
529 self_id: &PeerId,
530 paid_list: &Arc<PaidList>,
531 p2p_node: &Arc<P2PNode>,
532 config: &ReplicationConfig,
533 now: Instant,
534 allow_remote_prune_audits: bool,
535) -> (usize, PaidPruneStats) {
536 let paid_keys = match paid_list.all_keys() {
537 Ok(keys) => keys,
538 Err(e) => {
539 warn!("Failed to read PaidForList for pruning: {e}");
540 return (0, PaidPruneStats::default());
541 }
542 };
543
544 let dht = p2p_node.dht_manager();
545 let mut stats = PaidPruneStats::default();
546 let mut expired_candidates: Vec<(XorName, Vec<PeerId>)> = Vec::new();
547 let mut deferred_counts = PaidPruneDeferredCounts::default();
548 let mut selected_verification_peers = HashSet::new();
549 let scan_start = paid_list.paid_prune_scan_start(paid_keys.len());
552 let mut last_selected_offset = None;
553
554 for offset in 0..paid_keys.len() {
555 let key = &paid_keys[(scan_start + offset) % paid_keys.len()];
556 let closest: Vec<DHTNode> = dht
557 .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
558 .await;
559 let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);
560
561 if in_paid_group {
562 if paid_list.paid_out_of_range_since(key).is_some() {
563 paid_list.clear_paid_out_of_range(key);
564 stats.cleared += 1;
565 }
566 } else {
567 if paid_list.paid_out_of_range_since(key).is_none() {
568 stats.marked += 1;
569 }
570 paid_list.set_paid_out_of_range(key);
571
572 if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
573 let elapsed = now
574 .checked_duration_since(first_seen)
575 .unwrap_or(Duration::ZERO);
576 if elapsed >= config.prune_hysteresis_duration {
577 match select_paid_prune_candidate(
578 key,
579 &closest,
580 self_id,
581 allow_remote_prune_audits,
582 expired_candidates.len(),
583 &mut selected_verification_peers,
584 ) {
585 PaidPruneKeyState::None => {}
586 PaidPruneKeyState::RemoteDeferred => {
587 deferred_counts.remote_gate =
588 deferred_counts.remote_gate.saturating_add(1);
589 }
590 PaidPruneKeyState::EntryBudgetDeferred => {
591 deferred_counts.entry_budget =
592 deferred_counts.entry_budget.saturating_add(1);
593 }
594 PaidPruneKeyState::PeerBudgetDeferred => {
595 deferred_counts.peer_budget =
596 deferred_counts.peer_budget.saturating_add(1);
597 }
598 PaidPruneKeyState::Candidate(target_peers) => {
599 expired_candidates.push((*key, target_peers));
600 last_selected_offset = Some(offset);
601 }
602 }
603 }
604 }
605 }
606 }
607
608 paid_list.advance_paid_prune_cursor(paid_keys.len(), scan_start, last_selected_offset);
609 deferred_counts.log();
610
611 let confirmed_by_key =
612 collect_paid_prune_confirmations(&expired_candidates, p2p_node, config).await;
613 let (paid_keys_to_delete, revalidated_cleared) = revalidated_paid_prune_keys(
614 &expired_candidates,
615 &confirmed_by_key,
616 self_id,
617 paid_list,
618 p2p_node,
619 config,
620 )
621 .await;
622 stats.cleared += revalidated_cleared;
623 stats.pruned = delete_paid_entries(&paid_keys_to_delete, paid_list).await;
624
625 (paid_keys.len(), stats)
626}
627
628fn select_paid_prune_candidate(
629 key: &XorName,
630 closest: &[DHTNode],
631 self_id: &PeerId,
632 allow_remote_prune_audits: bool,
633 selected_candidate_count: usize,
634 selected_verification_peers: &mut HashSet<PeerId>,
635) -> PaidPruneKeyState {
636 if !allow_remote_prune_audits {
637 return PaidPruneKeyState::RemoteDeferred;
638 }
639
640 let target_peers = remote_close_group_peers(closest, self_id);
641 if target_peers.is_empty() {
642 warn!(
643 "Cannot prune paid entry {}: current paid close group has no remote peers",
644 hex::encode(key)
645 );
646 return PaidPruneKeyState::None;
647 }
648
649 if selected_candidate_count >= MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS {
650 return PaidPruneKeyState::EntryBudgetDeferred;
651 }
652
653 if !reserve_paid_prune_peer_budget(&target_peers, selected_verification_peers) {
654 return PaidPruneKeyState::PeerBudgetDeferred;
655 }
656
657 PaidPruneKeyState::Candidate(target_peers)
658}
659
660async fn delete_paid_entries(keys_to_delete: &[XorName], paid_list: &Arc<PaidList>) -> usize {
661 if keys_to_delete.is_empty() {
662 return 0;
663 }
664
665 match paid_list.remove_batch(keys_to_delete).await {
666 Ok(count) => {
667 debug!("Pruned {count} out-of-range PaidForList entries");
668 count
669 }
670 Err(e) => {
671 warn!("Failed to prune PaidForList entries: {e}");
672 0
673 }
674 }
675}
676
677async fn revalidated_paid_prune_keys(
686 expired_candidates: &[(XorName, Vec<PeerId>)],
687 confirmed_by_key: &HashMap<XorName, HashSet<PeerId>>,
688 self_id: &PeerId,
689 paid_list: &Arc<PaidList>,
690 p2p_node: &Arc<P2PNode>,
691 config: &ReplicationConfig,
692) -> (Vec<XorName>, usize) {
693 let dht = p2p_node.dht_manager();
694 let mut keys_to_delete = Vec::new();
695 let mut cleared = 0;
696 let now = Instant::now();
697
698 for (key, _) in expired_candidates {
699 let closest: Vec<DHTNode> = dht
700 .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
701 .await;
702
703 if closest.iter().any(|n| n.peer_id == *self_id) {
704 if paid_list.paid_out_of_range_since(key).is_some() {
705 paid_list.clear_paid_out_of_range(key);
706 cleared += 1;
707 }
708 continue;
709 }
710
711 let Some(first_seen) = paid_list.paid_out_of_range_since(key) else {
712 continue;
713 };
714 let elapsed = now
715 .checked_duration_since(first_seen)
716 .unwrap_or(Duration::ZERO);
717 if elapsed < config.prune_hysteresis_duration {
718 continue;
719 }
720
721 let current_target_peers = remote_close_group_peers(&closest, self_id);
722 if current_target_peers.is_empty() {
723 warn!(
724 "Cannot prune paid entry {}: current paid close group has no remote peers",
725 hex::encode(key)
726 );
727 continue;
728 }
729
730 let confirmations_needed = paid_prune_confirmations_needed(current_target_peers.len());
731 if target_peers_reported_present(
732 key,
733 ¤t_target_peers,
734 confirmed_by_key,
735 confirmations_needed,
736 ) {
737 keys_to_delete.push(*key);
738 } else {
739 debug!(
740 "Deferring paid-entry prune for {} until enough of the current paid \
741 close group confirm it",
742 hex::encode(key)
743 );
744 }
745 }
746
747 (keys_to_delete, cleared)
748}
749
750fn remote_close_group_peers(close_group: &[DHTNode], self_id: &PeerId) -> Vec<PeerId> {
751 close_group
752 .iter()
753 .filter(|node| node.peer_id != *self_id)
754 .map(|node| node.peer_id)
755 .collect()
756}
757
/// Number of confirmations required from a paid close group of `group_size`
/// remote peers: three quarters of the group, rounded up.
///
/// Computed as `group_size - group_size / 4`, which equals
/// `ceil(3 * group_size / 4)` for every `usize` but cannot overflow the way
/// the intermediate product `3 * group_size` can.
fn paid_prune_confirmations_needed(group_size: usize) -> usize {
    group_size - group_size / 4
}
769
770fn reserve_paid_prune_peer_budget(
771 target_peers: &[PeerId],
772 selected_verification_peers: &mut HashSet<PeerId>,
773) -> bool {
774 let new_peer_count = target_peers
775 .iter()
776 .filter(|peer| !selected_verification_peers.contains(peer))
777 .count();
778 if selected_verification_peers
779 .len()
780 .saturating_add(new_peer_count)
781 > MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS
782 {
783 return false;
784 }
785
786 selected_verification_peers.extend(target_peers.iter().copied());
787 true
788}
789
790async fn collect_paid_prune_confirmations(
797 expired_candidates: &[(XorName, Vec<PeerId>)],
798 p2p_node: &Arc<P2PNode>,
799 config: &ReplicationConfig,
800) -> HashMap<XorName, HashSet<PeerId>> {
801 if expired_candidates.is_empty() {
802 return HashMap::new();
803 }
804
805 let mut targets = VerificationTargets::default();
806 let mut keys = Vec::new();
807 for (key, target_peers) in expired_candidates {
808 if target_peers.is_empty() {
809 warn!(
810 "Cannot prune paid entry {}: current paid close group has no remote peers",
811 hex::encode(key)
812 );
813 continue;
814 }
815 keys.push(*key);
816 for peer in target_peers {
817 targets.all_peers.insert(*peer);
818 targets.peer_to_keys.entry(*peer).or_default().push(*key);
819 targets
820 .peer_to_paid_keys
821 .entry(*peer)
822 .or_default()
823 .insert(*key);
824 }
825 targets.paid_targets.insert(*key, target_peers.clone());
826 targets.paid_group_sizes.insert(*key, target_peers.len());
827 }
828 for keys_list in targets.peer_to_keys.values_mut() {
829 keys_list.sort_unstable();
830 keys_list.dedup();
831 }
832
833 let evidence = quorum::run_verification_round(&keys, &targets, p2p_node, config).await;
834 paid_confirmations_by_key(expired_candidates, &evidence)
835}
836
837fn paid_confirmations_by_key(
842 expired_candidates: &[(XorName, Vec<PeerId>)],
843 evidence: &HashMap<XorName, KeyVerificationEvidence>,
844) -> HashMap<XorName, HashSet<PeerId>> {
845 let mut confirmed_by_key: HashMap<XorName, HashSet<PeerId>> = HashMap::new();
846 for (key, target_peers) in expired_candidates {
847 let Some(key_evidence) = evidence.get(key) else {
848 continue;
849 };
850 let confirmed: HashSet<PeerId> = key_evidence
851 .paid_list
852 .iter()
853 .filter(|&(peer, status)| {
854 *status == PaidListEvidence::Confirmed && target_peers.contains(peer)
855 })
856 .map(|(peer, _)| *peer)
857 .collect();
858 if !confirmed.is_empty() {
859 confirmed_by_key.insert(*key, confirmed);
860 }
861 }
862 confirmed_by_key
863}
864
865async fn peers_with_mature_repair_proofs(
870 key: &XorName,
871 target_peers: &[PeerId],
872 current_close_peers: &HashSet<PeerId>,
873 repair_proofs: &Arc<RwLock<RepairProofs>>,
874 current_sync_epoch: u64,
875 now: Instant,
876) -> Vec<PeerId> {
877 let mut proofs = repair_proofs.write().await;
878 target_peers
879 .iter()
880 .filter(|peer| {
881 proofs.has_mature_replica_hint(peer, key, current_close_peers, current_sync_epoch, now)
882 })
883 .copied()
884 .collect()
885}
886
887async fn prune_scan_start(
888 sync_state: &Arc<RwLock<NeighborSyncState>>,
889 stored_key_count: usize,
890) -> usize {
891 if stored_key_count == 0 {
892 return 0;
893 }
894 sync_state.read().await.prune_cursor % stored_key_count
895}
896
897async fn advance_prune_cursor(
898 sync_state: &Arc<RwLock<NeighborSyncState>>,
899 stored_key_count: usize,
900 scan_start: usize,
901 last_selected_offset: Option<usize>,
902) {
903 if stored_key_count == 0 {
904 sync_state.write().await.prune_cursor = 0;
905 return;
906 }
907
908 let advance_by = last_selected_offset.map_or(1, |offset| offset.saturating_add(1));
909 sync_state.write().await.prune_cursor = (scan_start + advance_by) % stored_key_count;
910}
911
912async fn delete_stored_records(
913 keys_to_delete: &[XorName],
914 storage: &Arc<LmdbStorage>,
915 paid_list: &Arc<PaidList>,
916 repair_proofs: &Arc<RwLock<RepairProofs>>,
917) -> usize {
918 let mut pruned = 0;
919
920 for key in keys_to_delete {
921 if let Err(e) = storage.delete(key).await {
922 warn!("Failed to prune record {}: {e}", hex::encode(key));
923 } else {
924 pruned += 1;
925 paid_list.clear_record_out_of_range(key);
926 repair_proofs.write().await.remove_key(key);
927 paid_list.set_paid_out_of_range(key);
931 debug!("Pruned out-of-range record {}", hex::encode(key));
932 }
933 }
934
935 pruned
936}
937
938async fn collect_record_prune_proofs(
949 candidates: &[RecordPruneCandidate],
950 storage: &Arc<LmdbStorage>,
951 p2p_node: &Arc<P2PNode>,
952 config: &ReplicationConfig,
953 sync_state: &Arc<RwLock<NeighborSyncState>>,
954) -> HashMap<XorName, HashSet<PeerId>> {
955 if candidates.is_empty() {
956 return HashMap::new();
957 }
958
959 let report_state = PruneAuditReportState::default();
960 let mut requests = stream::iter(build_peer_audit_challenges(candidates))
961 .map(|(peer, key)| {
962 peer_proves_record(
963 peer,
964 key,
965 storage,
966 p2p_node,
967 config,
968 sync_state,
969 &report_state,
970 )
971 })
972 .buffer_unordered(MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES);
973
974 let mut present_by_key = HashMap::<XorName, HashSet<PeerId>>::new();
975 while let Some(proof) = requests.next().await {
976 if let Some((peer, key)) = proof {
977 present_by_key.entry(key).or_default().insert(peer);
978 }
979 }
980
981 present_by_key
982}
983
984async fn revalidated_record_prune_keys(
985 candidates: &[RecordPruneCandidate],
986 present_by_key: &HashMap<XorName, HashSet<PeerId>>,
987 self_id: &PeerId,
988 paid_list: &Arc<PaidList>,
989 p2p_node: &Arc<P2PNode>,
990 config: &ReplicationConfig,
991 commitment_state: Option<&Arc<ResponderCommitmentState>>,
992) -> (Vec<XorName>, usize) {
993 let mut keys_to_delete = Vec::new();
994 let mut cleared = 0;
995 let now = Instant::now();
996
997 for candidate in candidates {
998 if let Some(cs) = commitment_state {
1003 if cs.is_held(&candidate.key) {
1004 continue;
1005 }
1006 }
1007
1008 let (storage_admission_group, strict_close_group) =
1009 record_prune_lookup_groups(&candidate.key, p2p_node, config).await;
1010
1011 if storage_admission_group
1012 .iter()
1013 .any(|n| n.peer_id == *self_id)
1014 {
1015 if paid_list
1016 .record_out_of_range_since(&candidate.key)
1017 .is_some()
1018 {
1019 paid_list.clear_record_out_of_range(&candidate.key);
1020 cleared += 1;
1021 }
1022 continue;
1023 }
1024
1025 let Some(first_seen) = paid_list.record_out_of_range_since(&candidate.key) else {
1026 continue;
1027 };
1028 let elapsed = now
1029 .checked_duration_since(first_seen)
1030 .unwrap_or(Duration::ZERO);
1031 if elapsed < config.prune_hysteresis_duration {
1032 continue;
1033 }
1034
1035 let current_target_peers = remote_close_group_peers(&strict_close_group, self_id);
1036 if current_target_peers.is_empty() {
1037 warn!(
1038 "Cannot prune {}: current close group has no remote peers",
1039 hex::encode(candidate.key)
1040 );
1041 continue;
1042 }
1043
1044 let proofs_needed = prune_proofs_needed(current_target_peers.len());
1045 if target_peers_reported_present(
1046 &candidate.key,
1047 ¤t_target_peers,
1048 present_by_key,
1049 proofs_needed,
1050 ) {
1051 keys_to_delete.push(candidate.key);
1052 } else {
1053 debug!(
1054 "Deferring prune for {} until all but one of the current close group \
1055 report it",
1056 hex::encode(candidate.key)
1057 );
1058 }
1059 }
1060
1061 (keys_to_delete, cleared)
1062}
1063
1064fn build_peer_audit_challenges(candidates: &[RecordPruneCandidate]) -> Vec<(PeerId, XorName)> {
1065 let mut challenges = Vec::new();
1066 for candidate in candidates {
1067 for peer in &candidate.target_peers {
1068 challenges.push((*peer, candidate.key));
1069 }
1070 }
1071 challenges
1072}
1073
/// Test helper: returns the keys of the candidates for which at least
/// `proofs_needed` of the candidate's own target peers reported presence.
#[cfg(test)]
fn confirmed_keys_from_presence(
    candidates: &[RecordPruneCandidate],
    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
    proofs_needed: usize,
) -> Vec<XorName> {
    let mut confirmed_keys = Vec::new();
    for candidate in candidates {
        let confirmed = target_peers_reported_present(
            &candidate.key,
            &candidate.target_peers,
            present_by_key,
            proofs_needed,
        );
        if confirmed {
            confirmed_keys.push(candidate.key);
        }
    }
    confirmed_keys
}
1093
/// Number of audit proofs required before a record may be pruned, given
/// `group_size` remote close-group peers.
///
/// Groups of up to two peers need every peer; larger groups tolerate exactly
/// one missing proof.
fn prune_proofs_needed(group_size: usize) -> usize {
    match group_size {
        0..=2 => group_size,
        larger => larger - 1,
    }
}
1111
1112fn target_peers_reported_present(
1117 key: &XorName,
1118 target_peers: &[PeerId],
1119 present_by_key: &HashMap<XorName, HashSet<PeerId>>,
1120 proofs_needed: usize,
1121) -> bool {
1122 if proofs_needed == 0 {
1123 return false;
1124 }
1125 let Some(present_peers) = present_by_key.get(key) else {
1126 return false;
1127 };
1128 let proven = present_peers
1131 .iter()
1132 .filter(|peer| target_peers.contains(peer))
1133 .count();
1134 proven >= proofs_needed
1135}
1136
1137async fn peer_proves_record(
1140 peer: PeerId,
1141 key: XorName,
1142 storage: &Arc<LmdbStorage>,
1143 p2p_node: &Arc<P2PNode>,
1144 config: &ReplicationConfig,
1145 sync_state: &Arc<RwLock<NeighborSyncState>>,
1146 report_state: &PruneAuditReportState,
1147) -> Option<(PeerId, XorName)> {
1148 let local_bytes = local_record_bytes(&key, storage).await?;
1149
1150 let (challenge_id, nonce) = {
1151 let mut rng = rand::thread_rng();
1152 (rng.gen::<u64>(), rng.gen::<[u8; 32]>())
1153 };
1154 let encoded = encode_prune_audit_challenge(&peer, key, challenge_id, nonce)?;
1155 let Some(decoded) = send_prune_audit_challenge(&peer, &key, encoded, p2p_node, config).await
1156 else {
1157 report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
1161 return None;
1162 };
1163
1164 let status =
1165 prune_audit_response_status(decoded, challenge_id, &peer, &key, &nonce, &local_bytes);
1166 if prune_audit_response_clears_bootstrap_claim(status) {
1167 clear_prune_bootstrap_claim(&peer, sync_state).await;
1168 }
1169
1170 match status {
1171 PruneAuditStatus::Proven => Some((peer, key)),
1172 PruneAuditStatus::Bootstrapping => {
1173 report_prune_bootstrap_claim(&peer, &key, p2p_node, config, sync_state, report_state)
1174 .await;
1175 None
1176 }
1177 PruneAuditStatus::Failed => {
1178 report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
1179 None
1180 }
1181 }
1182}
1183
1184fn prune_audit_response_clears_bootstrap_claim(status: PruneAuditStatus) -> bool {
1185 matches!(status, PruneAuditStatus::Proven | PruneAuditStatus::Failed)
1186}
1187
1188fn encode_prune_audit_challenge(
1194 peer: &PeerId,
1195 key: XorName,
1196 challenge_id: u64,
1197 nonce: [u8; 32],
1198) -> Option<Vec<u8>> {
1199 let challenge = AuditChallenge {
1200 challenge_id,
1201 nonce,
1202 challenged_peer_id: *peer.as_bytes(),
1203 keys: vec![key],
1204 };
1205 let msg = ReplicationMessage {
1206 request_id: challenge_id,
1207 body: ReplicationMessageBody::AuditChallenge(challenge),
1208 };
1209 let encoded = match msg.encode() {
1210 Ok(data) => data,
1211 Err(e) => {
1212 warn!(
1213 "Failed to encode prune audit challenge for {} against {peer}: {e}",
1214 hex::encode(key),
1215 );
1216 return None;
1217 }
1218 };
1219 Some(encoded)
1220}
1221
1222async fn send_prune_audit_challenge(
1223 peer: &PeerId,
1224 key: &XorName,
1225 encoded: Vec<u8>,
1226 p2p_node: &Arc<P2PNode>,
1227 config: &ReplicationConfig,
1228) -> Option<ReplicationMessage> {
1229 let response = match p2p_node
1230 .send_request(
1231 peer,
1232 REPLICATION_PROTOCOL_ID,
1233 encoded,
1234 config.prune_audit_response_timeout,
1235 )
1236 .await
1237 {
1238 Ok(response) => response,
1239 Err(e) => {
1240 debug!(
1241 "Prune audit challenge for {} against {peer} failed: {e}",
1242 hex::encode(key)
1243 );
1244 return None;
1245 }
1246 };
1247
1248 let decoded = match ReplicationMessage::decode(&response.data) {
1249 Ok(msg) => msg,
1250 Err(e) => {
1251 warn!("Failed to decode prune audit response from {peer}: {e}");
1252 return None;
1253 }
1254 };
1255
1256 Some(decoded)
1257}
1258
1259fn prune_audit_response_status(
1260 decoded: ReplicationMessage,
1261 challenge_id: u64,
1262 peer: &PeerId,
1263 key: &XorName,
1264 nonce: &[u8; 32],
1265 local_bytes: &[u8],
1266) -> PruneAuditStatus {
1267 match decoded.body {
1268 ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
1269 challenge_id: resp_id,
1270 digests,
1271 }) => {
1272 if resp_id != challenge_id {
1273 warn!("Prune audit challenge ID mismatch from {peer}");
1274 return PruneAuditStatus::Failed;
1275 }
1276 let [digest] = digests.as_slice() else {
1277 warn!(
1278 "Prune audit response from {peer} returned {} digests for one challenged key",
1279 digests.len(),
1280 );
1281 return PruneAuditStatus::Failed;
1282 };
1283 if *digest == ABSENT_KEY_DIGEST {
1284 warn!(
1285 "Prune audit proof from {peer} failed for {}: peer reports key absent",
1286 hex::encode(key)
1287 );
1288 return PruneAuditStatus::Failed;
1289 }
1290 if audit_digest_proves_key(peer, key, nonce, local_bytes, digest) {
1291 PruneAuditStatus::Proven
1292 } else {
1293 warn!(
1294 "Prune audit proof from {peer} failed for {}: digest mismatch",
1295 hex::encode(key)
1296 );
1297 PruneAuditStatus::Failed
1298 }
1299 }
1300 ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
1301 challenge_id: resp_id,
1302 }) => {
1303 if resp_id == challenge_id {
1304 warn!(
1305 "Prune audit proof for {} blocked by bootstrap claim from {peer}",
1306 hex::encode(key)
1307 );
1308 PruneAuditStatus::Bootstrapping
1309 } else {
1310 warn!("Prune audit challenge ID mismatch on Bootstrapping from {peer}");
1311 PruneAuditStatus::Failed
1312 }
1313 }
1314 ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
1315 challenge_id: resp_id,
1316 reason,
1317 }) => {
1318 if resp_id == challenge_id {
1319 warn!(
1320 "Prune audit proof for {} rejected by {peer}: {reason}",
1321 hex::encode(key)
1322 );
1323 } else {
1324 warn!("Prune audit challenge ID mismatch on Rejected from {peer}");
1325 }
1326 PruneAuditStatus::Failed
1327 }
1328 _ => {
1329 warn!("Unexpected prune audit response type from {peer}");
1330 PruneAuditStatus::Failed
1331 }
1332 }
1333}
1334
1335async fn local_record_bytes(key: &XorName, storage: &Arc<LmdbStorage>) -> Option<Vec<u8>> {
1336 match storage.get_raw(key).await {
1337 Ok(Some(bytes)) => Some(bytes),
1338 Ok(None) => {
1339 debug!(
1340 "Cannot prune-audit {}: local record disappeared",
1341 hex::encode(key)
1342 );
1343 None
1344 }
1345 Err(e) => {
1346 warn!(
1347 "Cannot prune-audit {}: failed to read local record: {e}",
1348 hex::encode(key)
1349 );
1350 None
1351 }
1352 }
1353}
1354
1355fn audit_digest_proves_key(
1356 peer: &PeerId,
1357 key: &XorName,
1358 nonce: &[u8; 32],
1359 local_bytes: &[u8],
1360 digest: &[u8; 32],
1361) -> bool {
1362 if *digest == ABSENT_KEY_DIGEST {
1363 return false;
1364 }
1365 let expected = compute_audit_digest(nonce, peer.as_bytes(), key, local_bytes);
1366 *digest == expected
1367}
1368
1369async fn report_prune_audit_failure_once(
1370 peer: &PeerId,
1371 key: &XorName,
1372 p2p_node: &Arc<P2PNode>,
1373 config: &ReplicationConfig,
1374 report_state: &PruneAuditReportState,
1375) -> bool {
1376 let should_report = peer_is_currently_responsible(peer, key, p2p_node, config).await
1377 && reserve_prune_audit_failure_report(report_state, peer).await;
1378 if !should_report {
1379 return false;
1380 }
1381
1382 p2p_node
1383 .report_trust_event(
1384 peer,
1385 saorsa_core::TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT),
1386 )
1387 .await;
1388 true
1389}
1390
1391async fn reserve_prune_audit_failure_report(
1392 report_state: &PruneAuditReportState,
1393 peer: &PeerId,
1394) -> bool {
1395 report_state.audit_failures.write().await.insert(*peer)
1396}
1397
1398async fn reserve_prune_bootstrap_abuse_report(
1399 report_state: &PruneAuditReportState,
1400 peer: &PeerId,
1401) -> bool {
1402 report_state.bootstrap_abuse.write().await.insert(*peer)
1403}
1404
1405async fn report_prune_bootstrap_claim(
1406 peer: &PeerId,
1407 key: &XorName,
1408 p2p_node: &Arc<P2PNode>,
1409 config: &ReplicationConfig,
1410 sync_state: &Arc<RwLock<NeighborSyncState>>,
1411 report_state: &PruneAuditReportState,
1412) {
1413 if !peer_is_currently_responsible(peer, key, p2p_node, config).await {
1414 return;
1415 }
1416
1417 let observation = {
1418 let now = Instant::now();
1419 let mut state = sync_state.write().await;
1420 (
1421 now,
1422 state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period),
1423 )
1424 };
1425
1426 let (now, observation) = observation;
1427 match observation {
1428 BootstrapClaimObservation::WithinGrace { .. } => {
1429 debug!("Prune audit: peer {peer} claims bootstrapping (within grace period)");
1430 return;
1431 }
1432 BootstrapClaimObservation::PastGrace { first_seen } => {
1433 if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
1434 debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
1435 return;
1436 }
1437 warn!(
1438 "Prune audit: peer {peer} claiming bootstrap past grace period \
1439 ({:?} > {:?}), reporting abuse",
1440 now.duration_since(first_seen),
1441 config.bootstrap_claim_grace_period,
1442 );
1443 }
1444 BootstrapClaimObservation::Repeated { first_seen } => {
1445 if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
1446 debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
1447 return;
1448 }
1449 warn!(
1450 "Prune audit: peer {peer} repeated bootstrap claim after previously stopping; \
1451 first claim was {:?} ago, reporting abuse",
1452 now.duration_since(first_seen),
1453 );
1454 }
1455 }
1456
1457 p2p_node
1458 .report_trust_event(
1459 peer,
1460 saorsa_core::TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1461 )
1462 .await;
1463}
1464
1465async fn clear_prune_bootstrap_claim(peer: &PeerId, sync_state: &Arc<RwLock<NeighborSyncState>>) {
1466 let removed = {
1467 let mut state = sync_state.write().await;
1468 state.clear_active_bootstrap_claim(peer)
1469 };
1470 if removed {
1471 debug!("Prune audit: cleared active bootstrap claim for {peer}");
1472 }
1473}
1474
1475async fn peer_is_currently_responsible(
1476 peer: &PeerId,
1477 key: &XorName,
1478 p2p_node: &Arc<P2PNode>,
1479 config: &ReplicationConfig,
1480) -> bool {
1481 let closest = p2p_node
1482 .dht_manager()
1483 .find_closest_nodes_local_with_self(key, config.close_group_size)
1484 .await;
1485 closest.iter().any(|node| node.peer_id == *peer)
1486}
1487
// Unit tests for the prune-pass helpers: challenge fan-out, quorum counting,
// paid-list confirmation filtering, digest verification, cursor advancement,
// bootstrap-claim clearing and once-per-pass penalty reservation.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Builds a `PeerId` from 32 bytes that are all zero except the first,
    /// which is set to `b`.
    fn peer_id_from_byte(b: u8) -> PeerId {
        let mut bytes = [0u8; 32];
        bytes[0] = b;
        PeerId::from_bytes(bytes)
    }

    /// Builds a key whose 32 bytes are all `b`.
    fn key_from_byte(b: u8) -> XorName {
        [b; 32]
    }

    /// Builds `count` distinct peer ids using first bytes `1..=count`.
    ///
    /// Panics (via `expect`) when `idx + 1` does not fit in a `u8`.
    fn peer_ids(count: usize) -> Vec<PeerId> {
        (0..count)
            .map(|idx| peer_id_from_byte(u8::try_from(idx + 1).expect("peer byte")))
            .collect()
    }

    /// Shorthand constructor for a `RecordPruneCandidate`.
    fn candidate(key: XorName, target_peers: Vec<PeerId>) -> RecordPruneCandidate {
        RecordPruneCandidate { key, target_peers }
    }

    /// `build_peer_audit_challenges` must yield one `(peer, key)` pair for
    /// every target peer of every candidate.
    #[test]
    fn prune_audit_challenges_are_one_per_candidate_peer() {
        let peer_a = peer_id_from_byte(1);
        let peer_b = peer_id_from_byte(2);
        let key_a = key_from_byte(0xA);
        let key_b = key_from_byte(0xB);
        let candidates = vec![
            candidate(key_a, vec![peer_a, peer_b]),
            candidate(key_b, vec![peer_b]),
        ];

        // Both sides are sorted the same way so the comparison does not
        // depend on the order in which challenges are produced.
        let mut challenges = build_peer_audit_challenges(&candidates);
        challenges.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));

        let mut expected = vec![(peer_a, key_a), (peer_b, key_a), (peer_b, key_b)];
        expected.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));
        assert_eq!(challenges, expected);
    }

    /// With two of three target peers recorded as present, a required count
    /// of 2 confirms the key and a required count of 3 does not.
    #[test]
    fn confirmed_keys_require_quorum_of_target_peers_present() {
        let peer_a = peer_id_from_byte(1);
        let peer_b = peer_id_from_byte(2);
        let peer_c = peer_id_from_byte(3);
        let key = key_from_byte(0xC);
        let candidates = vec![candidate(key, vec![peer_a, peer_b, peer_c])];
        let mut present_by_key = HashMap::new();
        present_by_key.insert(key, HashSet::from([peer_a, peer_b]));

        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 2);
        assert_eq!(confirmed, vec![key]);

        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 3);
        assert!(confirmed.is_empty());
    }

    /// Only the key with both target peers present is confirmed at a required
    /// count of 2; a key with one present peer and a key with no presence
    /// entry at all are both left out.
    #[test]
    fn confirmed_keys_defer_below_quorum_or_missing_peer_evidence() {
        let peer_a = peer_id_from_byte(1);
        let peer_b = peer_id_from_byte(2);
        let quorum_key = key_from_byte(0xD);
        let below_quorum_key = key_from_byte(0xE);
        let missing_key = key_from_byte(0xF);
        let candidates = vec![
            candidate(quorum_key, vec![peer_a, peer_b]),
            candidate(below_quorum_key, vec![peer_a, peer_b]),
            candidate(missing_key, vec![peer_a, peer_b]),
        ];
        // `missing_key` deliberately gets no entry in the presence map.
        let mut present_by_key = HashMap::new();
        present_by_key.insert(quorum_key, HashSet::from([peer_a, peer_b]));
        present_by_key.insert(below_quorum_key, HashSet::from([peer_a]));

        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 2);

        assert_eq!(confirmed, vec![quorum_key]);
    }

    /// Pins `prune_proofs_needed` for selected target counts: 0, 1 and 2
    /// targets need every proof, while 3, 5 and 7 targets need one fewer.
    #[test]
    fn prune_proofs_needed_tolerates_exactly_one_lagging_peer() {
        assert_eq!(prune_proofs_needed(0), 0);
        assert_eq!(prune_proofs_needed(1), 1);
        assert_eq!(prune_proofs_needed(2), 2);
        assert_eq!(prune_proofs_needed(3), 2);
        assert_eq!(prune_proofs_needed(5), 4);
        assert_eq!(prune_proofs_needed(7), 6);
    }

    /// Pins `paid_prune_confirmations_needed` for selected target counts;
    /// each expected value equals three quarters of the input, rounded up.
    #[test]
    fn paid_prune_confirmations_are_three_quarters_rounded_up() {
        assert_eq!(paid_prune_confirmations_needed(0), 0);
        assert_eq!(paid_prune_confirmations_needed(1), 1);
        assert_eq!(paid_prune_confirmations_needed(2), 2);
        assert_eq!(paid_prune_confirmations_needed(4), 3);
        assert_eq!(paid_prune_confirmations_needed(20), 15);
    }

    /// Once the peer budget is completely filled, reserving peers that are
    /// already selected still succeeds and does not grow the selected set.
    #[test]
    fn paid_prune_peer_budget_allows_overlapping_targets() {
        let peers = peer_ids(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS);
        let mut selected_peers = HashSet::new();

        assert!(reserve_paid_prune_peer_budget(&peers, &mut selected_peers));
        assert_eq!(
            selected_peers.len(),
            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
        );

        let overlapping_targets = vec![peers[0], peers[1]];
        assert!(reserve_paid_prune_peer_budget(
            &overlapping_targets,
            &mut selected_peers,
        ));
        assert_eq!(
            selected_peers.len(),
            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
        );
    }

    /// Once the peer budget is completely filled, reserving a peer that is
    /// not yet selected is rejected and the selected set is left unchanged.
    #[test]
    fn paid_prune_peer_budget_rejects_new_peers_past_cap() {
        let peers = peer_ids(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS + 1);
        let mut selected_peers = HashSet::new();

        assert!(reserve_paid_prune_peer_budget(
            &peers[..MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS],
            &mut selected_peers,
        ));
        assert!(!reserve_paid_prune_peer_budget(
            &peers[MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS..],
            &mut selected_peers,
        ));
        assert_eq!(
            selected_peers.len(),
            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
        );
        assert!(!selected_peers.contains(&peers[MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS]));
    }

    /// `paid_confirmations_by_key` must keep only peers that are both in the
    /// candidate's target list and answered `Confirmed`.
    #[test]
    fn paid_confirmations_count_only_confirmed_target_peers() {
        let confirmed_peer = peer_id_from_byte(1);
        let not_found_peer = peer_id_from_byte(2);
        let unresolved_peer = peer_id_from_byte(3);
        let outsider = peer_id_from_byte(4);
        let key = key_from_byte(0x21);
        let candidates = vec![(key, vec![confirmed_peer, not_found_peer, unresolved_peer])];

        let mut evidence = HashMap::new();
        evidence.insert(
            key,
            KeyVerificationEvidence {
                presence: HashMap::new(),
                paid_list: HashMap::from([
                    (confirmed_peer, PaidListEvidence::Confirmed),
                    (not_found_peer, PaidListEvidence::NotFound),
                    (unresolved_peer, PaidListEvidence::Unresolved),
                    // `outsider` confirms but is not a target peer for this key.
                    (outsider, PaidListEvidence::Confirmed),
                ]),
            },
        );

        let confirmed_by_key = paid_confirmations_by_key(&candidates, &evidence);

        assert_eq!(
            confirmed_by_key.get(&key),
            Some(&HashSet::from([confirmed_peer])),
            "only Confirmed answers from target peers may count",
        );
    }

    /// A candidate key with no evidence entry produces no entry in the result.
    #[test]
    fn paid_confirmations_skip_keys_without_evidence() {
        let peer = peer_id_from_byte(1);
        let key = key_from_byte(0x22);
        let candidates = vec![(key, vec![peer])];

        let confirmed_by_key = paid_confirmations_by_key(&candidates, &HashMap::new());

        assert!(confirmed_by_key.is_empty());
    }

    /// A required count of 0 must not confirm a key, even though the only
    /// target peer is recorded as present.
    #[test]
    fn zero_quorum_never_confirms() {
        let peer_a = peer_id_from_byte(1);
        let key = key_from_byte(0x10);
        let mut present_by_key = HashMap::new();
        present_by_key.insert(key, HashSet::from([peer_a]));

        assert!(!target_peers_reported_present(
            &key,
            &[peer_a],
            &present_by_key,
            0
        ));
    }

    /// Presence recorded for a peer outside the target list must not satisfy
    /// a required count of 1.
    #[test]
    fn proofs_from_non_target_peers_do_not_count_toward_quorum() {
        let target = peer_id_from_byte(1);
        let outsider = peer_id_from_byte(2);
        let key = key_from_byte(0x11);
        let mut present_by_key = HashMap::new();
        present_by_key.insert(key, HashSet::from([outsider]));

        assert!(!target_peers_reported_present(
            &key,
            &[target],
            &present_by_key,
            1
        ));
    }

    /// Listing the same present peer twice in the target list must not
    /// satisfy a required count of 2.
    #[test]
    fn duplicated_target_peer_counts_once_toward_quorum() {
        let peer = peer_id_from_byte(1);
        let key = key_from_byte(0x12);
        let mut present_by_key = HashMap::new();
        present_by_key.insert(key, HashSet::from([peer]));

        assert!(!target_peers_reported_present(
            &key,
            &[peer, peer],
            &present_by_key,
            2
        ));
    }

    /// A digest computed for one (nonce, peer, key, bytes) tuple is accepted
    /// for exactly that tuple. Changing any single input is rejected, and so
    /// is `ABSENT_KEY_DIGEST`.
    #[test]
    fn audit_digest_proof_requires_matching_peer_key_nonce_and_bytes() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let key = key_from_byte(0x11);
        let other_key = key_from_byte(0x12);
        let nonce = [0xAA; 32];
        let other_nonce = [0xBB; 32];
        let bytes = b"record bytes";
        let digest = compute_audit_digest(&nonce, peer.as_bytes(), &key, bytes);

        assert!(audit_digest_proves_key(&peer, &key, &nonce, bytes, &digest));
        assert!(!audit_digest_proves_key(
            &other_peer,
            &key,
            &nonce,
            bytes,
            &digest
        ));
        assert!(!audit_digest_proves_key(
            &peer, &other_key, &nonce, bytes, &digest
        ));
        assert!(!audit_digest_proves_key(
            &peer,
            &key,
            &other_nonce,
            bytes,
            &digest
        ));
        assert!(!audit_digest_proves_key(
            &peer,
            &key,
            &nonce,
            b"different bytes",
            &digest
        ));
        assert!(!audit_digest_proves_key(
            &peer,
            &key,
            &nonce,
            bytes,
            &ABSENT_KEY_DIGEST
        ));
    }

    /// Starting from cursor 2, advancing with arguments `10` and `Some(3)`
    /// must leave the cursor at 6.
    // NOTE(review): presumably `10` is the number of scannable entries and
    // `Some(3)` the offset of the last selected candidate — confirm against
    // `advance_prune_cursor`.
    #[tokio::test]
    async fn prune_cursor_advances_past_selected_budget_window() {
        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
        state.write().await.prune_cursor = 2;

        let start = prune_scan_start(&state, 10).await;
        advance_prune_cursor(&state, 10, start, Some(3)).await;

        assert_eq!(state.read().await.prune_cursor, 6);
    }

    /// Starting from cursor 9, advancing with arguments `10` and `None` must
    /// leave the cursor at 0.
    #[tokio::test]
    async fn prune_cursor_advances_even_when_no_candidate_selected() {
        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
        state.write().await.prune_cursor = 9;

        let start = prune_scan_start(&state, 10).await;
        advance_prune_cursor(&state, 10, start, None).await;

        assert_eq!(state.read().await.prune_cursor, 0);
    }

    /// `clear_prune_bootstrap_claim` removes the peer from `bootstrap_claims`
    /// but keeps its entry in `bootstrap_claim_history`.
    #[tokio::test]
    async fn prune_audit_normal_response_clears_stale_bootstrap_claim() {
        let peer = peer_id_from_byte(1);
        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![peer])));
        let first_seen = Instant::now();
        state
            .write()
            .await
            .bootstrap_claims
            .insert(peer, first_seen);
        state
            .write()
            .await
            .bootstrap_claim_history
            .insert(peer, first_seen);

        clear_prune_bootstrap_claim(&peer, &state).await;

        let state = state.read().await;
        assert!(!state.bootstrap_claims.contains_key(&peer));
        assert!(state.bootstrap_claim_history.contains_key(&peer));
    }

    /// `Proven` and `Failed` statuses clear a bootstrap claim; `Bootstrapping`
    /// does not.
    #[test]
    fn prune_audit_clear_policy_requires_decoded_non_bootstrap_response() {
        assert!(prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Proven
        ));
        assert!(prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Failed
        ));
        assert!(!prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Bootstrapping
        ));
    }

    /// The first audit-failure reservation for a peer succeeds, a second for
    /// the same peer is refused, and a different peer is unaffected.
    #[tokio::test]
    async fn prune_audit_failure_penalty_is_reserved_once_per_peer_per_pass() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let report_state = PruneAuditReportState::default();

        assert!(reserve_prune_audit_failure_report(&report_state, &peer).await);
        assert!(!reserve_prune_audit_failure_report(&report_state, &peer).await);
        assert!(reserve_prune_audit_failure_report(&report_state, &other_peer).await);

        let reported = report_state.audit_failures.read().await;
        assert_eq!(reported.len(), 2);
        assert!(reported.contains(&peer));
        assert!(reported.contains(&other_peer));
    }

    /// The first bootstrap-abuse reservation for a peer succeeds, a second
    /// for the same peer is refused, and a different peer is unaffected.
    #[tokio::test]
    async fn prune_bootstrap_abuse_penalty_is_reserved_once_per_peer_per_pass() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let report_state = PruneAuditReportState::default();

        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
        assert!(!reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &other_peer).await);

        let reported = report_state.bootstrap_abuse.read().await;
        assert_eq!(reported.len(), 2);
        assert!(reported.contains(&peer));
        assert!(reported.contains(&other_peer));
    }
}