1use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use crate::logging::{debug, info, warn};
12
13use futures::{stream, StreamExt};
14use rand::Rng;
15use saorsa_core::identity::PeerId;
16use saorsa_core::{DHTNode, P2PNode};
17use tokio::sync::RwLock;
18
19use crate::ant_protocol::XorName;
20use crate::replication::commitment_state::ResponderCommitmentState;
21use crate::replication::config::{
22 storage_admission_width, ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT,
23 MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS, REPLICATION_PROTOCOL_ID,
24};
25use crate::replication::paid_list::PaidList;
26use crate::replication::protocol::{
27 compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
28 ReplicationMessageBody, ABSENT_KEY_DIGEST,
29};
30use crate::replication::quorum::{self, VerificationTargets};
31use crate::replication::types::{
32 BootstrapClaimObservation, KeyVerificationEvidence, NeighborSyncState, PaidListEvidence,
33 RepairProofs,
34};
35use crate::storage::LmdbStorage;
36
37use super::REPLICATION_TRUST_WEIGHT;
38
/// Upper bound on prune-audit challenges driven concurrently; used as the
/// `buffer_unordered` width when collecting record prune proofs.
const MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES: usize = 32;

/// Maximum number of expired `PaidForList` entries selected for remote
/// verification in a single prune pass.
const MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS: usize = 32;
/// Maximum number of distinct peers that paid-prune verification may select
/// in a single pass; kept equal to the prune-audit concurrency limit.
const MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS: usize = MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES;
47
/// Counters reported by one prune pass.
#[derive(Debug, Default)]
pub struct PruneResult {
    /// Stored records successfully deleted during the pass.
    pub records_pruned: usize,
    /// Stored records that received a fresh out-of-range mark during the pass.
    pub records_marked_out_of_range: usize,
    /// Stored records whose out-of-range mark was cleared because this node is
    /// back in the storage-admission group.
    pub records_cleared: usize,
    /// `PaidForList` entries removed during the pass.
    pub paid_entries_pruned: usize,
    /// `PaidForList` entries that received a fresh out-of-range mark.
    pub paid_entries_marked: usize,
    /// `PaidForList` entries whose out-of-range mark was cleared because this
    /// node is back in the paid close group.
    pub paid_entries_cleared: usize,
}
68
/// Borrowed inputs for one prune pass (consumed by
/// `run_prune_pass_with_context`).
pub struct PrunePassContext<'a> {
    /// This node's peer id; group membership is decided by comparing it with
    /// the `peer_id` of nodes returned by local DHT lookups.
    pub self_id: &'a PeerId,
    /// Record store whose keys are scanned and, once confirmed, deleted.
    pub storage: &'a Arc<LmdbStorage>,
    /// Paid list; also queried for the per-key out-of-range timestamps that
    /// drive prune hysteresis.
    pub paid_list: &'a Arc<PaidList>,
    /// P2P node used for local closest-node lookups and audit requests.
    pub p2p_node: &'a Arc<P2PNode>,
    /// Replication settings (close-group sizes, hysteresis duration, timeouts).
    pub config: &'a ReplicationConfig,
    /// Neighbor-sync state; holds the record prune cursor.
    pub sync_state: &'a Arc<RwLock<NeighborSyncState>>,
    /// Repair-proof tracker consulted for mature replica hints before a record
    /// can become a prune candidate.
    pub repair_proofs: &'a Arc<RwLock<RepairProofs>>,
    /// Sync epoch forwarded to the repair-proof maturity check.
    pub current_sync_epoch: u64,
    /// Test-only override for the instant used by the repair-proof maturity
    /// check; when `None` the pass's own `now` is used.
    #[cfg(any(test, feature = "test-utils"))]
    pub repair_proof_now: Option<Instant>,
    /// When `false`, keys past hysteresis are deferred instead of being
    /// verified against remote peers.
    pub allow_remote_prune_audits: bool,
    /// Optional commitment state; keys it reports as held are not selected
    /// for pruning.
    pub commitment_state: Option<&'a Arc<ResponderCommitmentState>>,
}
98
/// Outcome of interpreting one peer's response to a prune audit challenge.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PruneAuditStatus {
    /// The peer returned a digest matching the one computed from local bytes.
    Proven,
    /// The response was mismatched, malformed, rejected, absent, or unexpected.
    Failed,
    /// The peer answered with a bootstrapping claim for this challenge.
    Bootstrapping,
}
105
/// Per-pass counters for stored-record pruning.
#[derive(Debug, Default)]
struct RecordPruneStats {
    /// Keys newly marked out of range.
    marked: usize,
    /// Keys whose out-of-range mark was cleared.
    cleared: usize,
    /// Records actually deleted from storage.
    pruned: usize,
    /// Keys skipped because the commitment state reports them as held.
    held_by_commitment: usize,
}
113
/// Per-pass counters for `PaidForList` pruning.
#[derive(Debug, Default)]
struct PaidPruneStats {
    /// Entries newly marked out of range.
    marked: usize,
    /// Entries whose out-of-range mark was cleared.
    cleared: usize,
    /// Entries removed from the paid list.
    pruned: usize,
}
120
/// Counts of expired paid entries that were not selected this pass, grouped
/// by the reason they were deferred.
#[derive(Debug, Default)]
struct PaidPruneDeferredCounts {
    /// Deferred because the per-pass entry cap was already reached.
    entry_budget: usize,
    /// Deferred because remote prune verification is not allowed yet.
    remote_gate: usize,
    /// Deferred because selecting the entry would exceed the per-pass peer cap.
    peer_budget: usize,
}
127
128impl PaidPruneDeferredCounts {
129 fn log(&self) {
130 if self.entry_budget > 0 {
131 debug!(
132 "Deferred {} expired PaidForList entries beyond the per-pass verification cap \
133 ({MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS})",
134 self.entry_budget,
135 );
136 }
137
138 if self.remote_gate > 0 {
139 debug!(
140 "Deferred {} expired PaidForList entries until bootstrap drain allows remote \
141 paid-prune verification",
142 self.remote_gate,
143 );
144 }
145
146 if self.peer_budget > 0 {
147 debug!(
148 "Deferred {} expired PaidForList entries beyond the per-pass paid-prune peer cap \
149 ({MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS})",
150 self.peer_budget,
151 );
152 }
153 }
154}
155
/// A stored key selected for prune-confirmation audits this pass.
#[derive(Debug, Clone)]
struct RecordPruneCandidate {
    /// Key of the record that may be pruned.
    key: XorName,
    /// Peers that will be challenged to prove they hold the record.
    target_peers: Vec<PeerId>,
}
161
/// Result of evaluating a single stored key during the scan.
struct RecordPruneKeyOutcome {
    /// `true` when the key had no out-of-range mark before this evaluation
    /// marked it.
    marked: bool,
    /// What the scan should do with the key.
    state: RecordPruneKeyState,
}
166
167impl Default for RecordPruneKeyOutcome {
168 fn default() -> Self {
169 Self {
170 marked: false,
171 state: RecordPruneKeyState::None,
172 }
173 }
174}
175
/// Classification of a stored key produced by `evaluate_record_prune_key`.
enum RecordPruneKeyState {
    /// Nothing further to do for this key in this pass.
    None,
    /// An existing out-of-range mark was cleared.
    Cleared,
    /// Past hysteresis, but remote prune audits are not allowed yet.
    BootstrapDeferred,
    /// Past hysteresis, but the per-pass audit challenge budget is too small.
    BudgetDeferred,
    /// The commitment state reports the key as held, so it is not pruned.
    HeldByCommitment,
    /// Selected for prune-confirmation audits.
    Candidate(RecordPruneCandidate),
}
187
/// Classification of an expired paid entry produced by
/// `select_paid_prune_candidate`.
enum PaidPruneKeyState {
    /// Not selectable (the paid close group has no remote peers).
    None,
    /// Remote prune verification is not allowed yet.
    RemoteDeferred,
    /// The per-pass entry cap was already reached.
    EntryBudgetDeferred,
    /// Selecting the entry would exceed the per-pass peer cap.
    PeerBudgetDeferred,
    /// Selected; carries the remote peers to query.
    Candidate(Vec<PeerId>),
}
195
/// Peer sets shared across the audit challenges of one pass and handed to the
/// reporting helpers.
// NOTE(review): presumably these sets de-duplicate reports so each peer is
// reported at most once per pass — confirm against the report helpers.
#[derive(Default)]
struct PruneAuditReportState {
    /// Peers associated with an audit failure in this pass.
    audit_failures: RwLock<HashSet<PeerId>>,
    /// Peers associated with a bootstrap claim in this pass.
    bootstrap_abuse: RwLock<HashSet<PeerId>>,
}
201
202pub async fn run_prune_pass(
230 self_id: &PeerId,
231 storage: &Arc<LmdbStorage>,
232 paid_list: &Arc<PaidList>,
233 p2p_node: &Arc<P2PNode>,
234 config: &ReplicationConfig,
235 sync_state: &Arc<RwLock<NeighborSyncState>>,
236 allow_remote_prune_audits: bool,
237) -> PruneResult {
238 let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
239 run_prune_pass_with_context(PrunePassContext {
240 self_id,
241 storage,
242 paid_list,
243 p2p_node,
244 config,
245 sync_state,
246 repair_proofs: &repair_proofs,
247 current_sync_epoch: 0,
248 #[cfg(any(test, feature = "test-utils"))]
249 repair_proof_now: None,
250 allow_remote_prune_audits,
251 commitment_state: None,
252 })
253 .await
254}
255
256pub async fn run_prune_pass_with_context(ctx: PrunePassContext<'_>) -> PruneResult {
258 let (stored_count, record_stats) = prune_stored_records(&ctx).await;
259 let now = Instant::now();
260 let (paid_count, paid_stats) = prune_paid_entries(
261 ctx.self_id,
262 ctx.paid_list,
263 ctx.p2p_node,
264 ctx.config,
265 now,
266 ctx.allow_remote_prune_audits,
267 )
268 .await;
269
270 let result = PruneResult {
271 records_pruned: record_stats.pruned,
272 records_marked_out_of_range: record_stats.marked,
273 records_cleared: record_stats.cleared,
274 paid_entries_pruned: paid_stats.pruned,
275 paid_entries_marked: paid_stats.marked,
276 paid_entries_cleared: paid_stats.cleared,
277 };
278
279 info!(
280 "Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
281 result.records_pruned, stored_count, result.paid_entries_pruned, paid_count,
282 );
283
284 result
285}
286
/// Scans every stored key, selects prune candidates, audits them against
/// remote peers, and deletes the records that are confirmed.
///
/// Returns `(number_of_stored_keys, stats)`. If the key listing fails the
/// function logs a warning and returns `(0, default stats)`.
// The function is intentionally kept as a single linear pipeline.
#[allow(clippy::too_many_lines)]
async fn prune_stored_records(ctx: &PrunePassContext<'_>) -> (usize, RecordPruneStats) {
    let stored_keys = match ctx.storage.all_keys().await {
        Ok(keys) => keys,
        Err(e) => {
            warn!("Failed to read stored keys for pruning: {e}");
            return (0, RecordPruneStats::default());
        }
    };

    // One timestamp is used for every hysteresis check in the scan.
    let now = Instant::now();
    let mut stats = RecordPruneStats::default();
    let mut candidates = Vec::new();
    // Remaining audit challenges this pass may still issue.
    let mut audit_challenge_budget = MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS;
    let mut budget_deferred = 0usize;
    let mut bootstrap_deferred = 0usize;
    // The scan starts at the persisted cursor and wraps around the key list.
    let scan_start = prune_scan_start(ctx.sync_state, stored_keys.len()).await;
    let mut last_selected_offset = None;

    for offset in 0..stored_keys.len() {
        let key = &stored_keys[(scan_start + offset) % stored_keys.len()];
        let (storage_admission_group, strict_close_group) =
            record_prune_lookup_groups(key, ctx.p2p_node, ctx.config).await;

        let outcome = evaluate_record_prune_key(
            ctx,
            key,
            &storage_admission_group,
            &strict_close_group,
            now,
            &mut audit_challenge_budget,
        )
        .await;
        if outcome.marked {
            stats.marked += 1;
        }
        match outcome.state {
            RecordPruneKeyState::None => {}
            RecordPruneKeyState::Cleared => stats.cleared += 1,
            RecordPruneKeyState::BootstrapDeferred => {
                bootstrap_deferred = bootstrap_deferred.saturating_add(1);
            }
            RecordPruneKeyState::BudgetDeferred => {
                budget_deferred = budget_deferred.saturating_add(1);
            }
            RecordPruneKeyState::HeldByCommitment => {
                stats.held_by_commitment = stats.held_by_commitment.saturating_add(1);
            }
            RecordPruneKeyState::Candidate(candidate) => {
                // Remember the furthest selected key so the cursor can move
                // past it.
                last_selected_offset = Some(offset);
                candidates.push(candidate);
            }
        }
    }

    // Persist the cursor before any network round-trips.
    advance_prune_cursor(
        ctx.sync_state,
        stored_keys.len(),
        scan_start,
        last_selected_offset,
    )
    .await;

    if bootstrap_deferred > 0 {
        debug!(
            "Deferred {bootstrap_deferred} prune candidates until bootstrap drain allows \
             remote prune-confirmation audits"
        );
    }

    if budget_deferred > 0 {
        debug!(
            "Deferred {budget_deferred} prune candidates due to per-pass audit budget \
             ({MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS} challenges)"
        );
    }

    if stats.held_by_commitment > 0 {
        debug!(
            "Vetoed {} prune candidate(s) still committed under a recently-gossiped \
             commitment (bounded reprieve until they age out of the retention window)",
            stats.held_by_commitment
        );
    }

    // Challenge the selected peers, then re-check every candidate against the
    // current routing view before deleting anything.
    let present_by_key = collect_record_prune_proofs(
        &candidates,
        ctx.storage,
        ctx.p2p_node,
        ctx.config,
        ctx.sync_state,
    )
    .await;
    let (keys_to_delete, revalidated_cleared) = revalidated_record_prune_keys(
        &candidates,
        &present_by_key,
        ctx.self_id,
        ctx.paid_list,
        ctx.p2p_node,
        ctx.config,
        ctx.commitment_state,
    )
    .await;
    stats.cleared += revalidated_cleared;
    stats.pruned = delete_stored_records(
        &keys_to_delete,
        ctx.storage,
        ctx.paid_list,
        ctx.repair_proofs,
    )
    .await;

    (stored_keys.len(), stats)
}
403
404async fn record_prune_lookup_groups(
405 key: &XorName,
406 p2p_node: &Arc<P2PNode>,
407 config: &ReplicationConfig,
408) -> (Vec<DHTNode>, Vec<DHTNode>) {
409 let dht = p2p_node.dht_manager();
410 let storage_admission_group = dht
411 .find_closest_nodes_local_with_self(key, storage_admission_width(config.close_group_size))
412 .await;
413 let strict_close_group = dht
414 .find_closest_nodes_local_with_self(key, config.close_group_size)
415 .await;
416 (storage_admission_group, strict_close_group)
417}
418
419async fn evaluate_record_prune_key(
420 ctx: &PrunePassContext<'_>,
421 key: &XorName,
422 storage_admission_group: &[DHTNode],
423 strict_close_group: &[DHTNode],
424 now: Instant,
425 audit_challenge_budget: &mut usize,
426) -> RecordPruneKeyOutcome {
427 let mut outcome = RecordPruneKeyOutcome::default();
428 let is_responsible = storage_admission_group
429 .iter()
430 .any(|node| node.peer_id == *ctx.self_id);
431
432 if is_responsible {
433 if ctx.paid_list.record_out_of_range_since(key).is_some() {
434 ctx.paid_list.clear_record_out_of_range(key);
435 outcome.state = RecordPruneKeyState::Cleared;
436 }
437 return outcome;
438 }
439
440 if let Some(cs) = ctx.commitment_state {
452 if cs.is_held(key) {
453 outcome.state = RecordPruneKeyState::HeldByCommitment;
454 return outcome;
455 }
456 }
457
458 if ctx.paid_list.record_out_of_range_since(key).is_none() {
459 outcome.marked = true;
460 }
461 ctx.paid_list.set_record_out_of_range(key);
462
463 let Some(first_seen) = ctx.paid_list.record_out_of_range_since(key) else {
464 return outcome;
465 };
466 let elapsed = now
467 .checked_duration_since(first_seen)
468 .unwrap_or(Duration::ZERO);
469 if elapsed < ctx.config.prune_hysteresis_duration {
470 return outcome;
471 }
472
473 if !ctx.allow_remote_prune_audits {
474 outcome.state = RecordPruneKeyState::BootstrapDeferred;
475 return outcome;
476 }
477
478 let target_peers = remote_close_group_peers(strict_close_group, ctx.self_id);
479 if target_peers.is_empty() {
480 warn!(
481 "Cannot prune {}: current close group has no remote peers",
482 hex::encode(key)
483 );
484 return outcome;
485 }
486
487 let current_close_peers: HashSet<PeerId> =
491 strict_close_group.iter().map(|node| node.peer_id).collect();
492 #[cfg(any(test, feature = "test-utils"))]
493 let repair_proof_now = ctx.repair_proof_now.unwrap_or(now);
494 #[cfg(not(any(test, feature = "test-utils")))]
495 let repair_proof_now = now;
496 let audit_targets = peers_with_mature_repair_proofs(
497 key,
498 &target_peers,
499 ¤t_close_peers,
500 ctx.repair_proofs,
501 ctx.current_sync_epoch,
502 repair_proof_now,
503 )
504 .await;
505 let proofs_needed = prune_proofs_needed(target_peers.len());
506 if proofs_needed == 0 || audit_targets.len() < proofs_needed {
507 debug!(
508 "Deferring prune for {} until enough of the close group has mature \
509 repair proofs",
510 hex::encode(key)
511 );
512 return outcome;
513 }
514
515 if audit_targets.len() > *audit_challenge_budget {
516 outcome.state = RecordPruneKeyState::BudgetDeferred;
517 return outcome;
518 }
519
520 *audit_challenge_budget -= audit_targets.len();
521 outcome.state = RecordPruneKeyState::Candidate(RecordPruneCandidate {
522 key: *key,
523 target_peers: audit_targets,
524 });
525 outcome
526}
527
528async fn prune_paid_entries(
529 self_id: &PeerId,
530 paid_list: &Arc<PaidList>,
531 p2p_node: &Arc<P2PNode>,
532 config: &ReplicationConfig,
533 now: Instant,
534 allow_remote_prune_audits: bool,
535) -> (usize, PaidPruneStats) {
536 let paid_keys = match paid_list.all_keys() {
537 Ok(keys) => keys,
538 Err(e) => {
539 warn!("Failed to read PaidForList for pruning: {e}");
540 return (0, PaidPruneStats::default());
541 }
542 };
543
544 let dht = p2p_node.dht_manager();
545 let mut stats = PaidPruneStats::default();
546 let mut expired_candidates: Vec<(XorName, Vec<PeerId>)> = Vec::new();
547 let mut deferred_counts = PaidPruneDeferredCounts::default();
548 let mut selected_verification_peers = HashSet::new();
549 let scan_start = paid_list.paid_prune_scan_start(paid_keys.len());
552 let mut last_selected_offset = None;
553
554 for offset in 0..paid_keys.len() {
555 let key = &paid_keys[(scan_start + offset) % paid_keys.len()];
556 let closest: Vec<DHTNode> = dht
557 .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
558 .await;
559 let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);
560
561 if in_paid_group {
562 if paid_list.paid_out_of_range_since(key).is_some() {
563 paid_list.clear_paid_out_of_range(key);
564 stats.cleared += 1;
565 }
566 } else {
567 if paid_list.paid_out_of_range_since(key).is_none() {
568 stats.marked += 1;
569 }
570 paid_list.set_paid_out_of_range(key);
571
572 if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
573 let elapsed = now
574 .checked_duration_since(first_seen)
575 .unwrap_or(Duration::ZERO);
576 if elapsed >= config.prune_hysteresis_duration {
577 match select_paid_prune_candidate(
578 key,
579 &closest,
580 self_id,
581 allow_remote_prune_audits,
582 expired_candidates.len(),
583 &mut selected_verification_peers,
584 ) {
585 PaidPruneKeyState::None => {}
586 PaidPruneKeyState::RemoteDeferred => {
587 deferred_counts.remote_gate =
588 deferred_counts.remote_gate.saturating_add(1);
589 }
590 PaidPruneKeyState::EntryBudgetDeferred => {
591 deferred_counts.entry_budget =
592 deferred_counts.entry_budget.saturating_add(1);
593 }
594 PaidPruneKeyState::PeerBudgetDeferred => {
595 deferred_counts.peer_budget =
596 deferred_counts.peer_budget.saturating_add(1);
597 }
598 PaidPruneKeyState::Candidate(target_peers) => {
599 expired_candidates.push((*key, target_peers));
600 last_selected_offset = Some(offset);
601 }
602 }
603 }
604 }
605 }
606 }
607
608 paid_list.advance_paid_prune_cursor(paid_keys.len(), scan_start, last_selected_offset);
609 deferred_counts.log();
610
611 let confirmed_by_key =
612 collect_paid_prune_confirmations(&expired_candidates, p2p_node, config).await;
613 let (paid_keys_to_delete, revalidated_cleared) = revalidated_paid_prune_keys(
614 &expired_candidates,
615 &confirmed_by_key,
616 self_id,
617 paid_list,
618 p2p_node,
619 config,
620 )
621 .await;
622 stats.cleared += revalidated_cleared;
623 stats.pruned = delete_paid_entries(&paid_keys_to_delete, paid_list).await;
624
625 (paid_keys.len(), stats)
626}
627
628fn select_paid_prune_candidate(
629 key: &XorName,
630 closest: &[DHTNode],
631 self_id: &PeerId,
632 allow_remote_prune_audits: bool,
633 selected_candidate_count: usize,
634 selected_verification_peers: &mut HashSet<PeerId>,
635) -> PaidPruneKeyState {
636 if !allow_remote_prune_audits {
637 return PaidPruneKeyState::RemoteDeferred;
638 }
639
640 let target_peers = remote_close_group_peers(closest, self_id);
641 if target_peers.is_empty() {
642 warn!(
643 "Cannot prune paid entry {}: current paid close group has no remote peers",
644 hex::encode(key)
645 );
646 return PaidPruneKeyState::None;
647 }
648
649 if selected_candidate_count >= MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS {
650 return PaidPruneKeyState::EntryBudgetDeferred;
651 }
652
653 if !reserve_paid_prune_peer_budget(&target_peers, selected_verification_peers) {
654 return PaidPruneKeyState::PeerBudgetDeferred;
655 }
656
657 PaidPruneKeyState::Candidate(target_peers)
658}
659
660async fn delete_paid_entries(keys_to_delete: &[XorName], paid_list: &Arc<PaidList>) -> usize {
661 if keys_to_delete.is_empty() {
662 return 0;
663 }
664
665 match paid_list.remove_batch(keys_to_delete).await {
666 Ok(count) => {
667 debug!("Pruned {count} out-of-range PaidForList entries");
668 count
669 }
670 Err(e) => {
671 warn!("Failed to prune PaidForList entries: {e}");
672 0
673 }
674 }
675}
676
677async fn revalidated_paid_prune_keys(
686 expired_candidates: &[(XorName, Vec<PeerId>)],
687 confirmed_by_key: &HashMap<XorName, HashSet<PeerId>>,
688 self_id: &PeerId,
689 paid_list: &Arc<PaidList>,
690 p2p_node: &Arc<P2PNode>,
691 config: &ReplicationConfig,
692) -> (Vec<XorName>, usize) {
693 let dht = p2p_node.dht_manager();
694 let mut keys_to_delete = Vec::new();
695 let mut cleared = 0;
696 let now = Instant::now();
697
698 for (key, _) in expired_candidates {
699 let closest: Vec<DHTNode> = dht
700 .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
701 .await;
702
703 if closest.iter().any(|n| n.peer_id == *self_id) {
704 if paid_list.paid_out_of_range_since(key).is_some() {
705 paid_list.clear_paid_out_of_range(key);
706 cleared += 1;
707 }
708 continue;
709 }
710
711 let Some(first_seen) = paid_list.paid_out_of_range_since(key) else {
712 continue;
713 };
714 let elapsed = now
715 .checked_duration_since(first_seen)
716 .unwrap_or(Duration::ZERO);
717 if elapsed < config.prune_hysteresis_duration {
718 continue;
719 }
720
721 let current_target_peers = remote_close_group_peers(&closest, self_id);
722 if current_target_peers.is_empty() {
723 warn!(
724 "Cannot prune paid entry {}: current paid close group has no remote peers",
725 hex::encode(key)
726 );
727 continue;
728 }
729
730 let confirmations_needed = paid_prune_confirmations_needed(current_target_peers.len());
731 if target_peers_reported_present(
732 key,
733 ¤t_target_peers,
734 confirmed_by_key,
735 confirmations_needed,
736 ) {
737 keys_to_delete.push(*key);
738 } else {
739 debug!(
740 "Deferring paid-entry prune for {} until enough of the current paid \
741 close group confirm it",
742 hex::encode(key)
743 );
744 }
745 }
746
747 (keys_to_delete, cleared)
748}
749
750fn remote_close_group_peers(close_group: &[DHTNode], self_id: &PeerId) -> Vec<PeerId> {
751 close_group
752 .iter()
753 .filter(|node| node.peer_id != *self_id)
754 .map(|node| node.peer_id)
755 .collect()
756}
757
/// Number of confirmations required from a paid close group of `group_size`
/// remote peers: three quarters of the group, rounded up.
///
/// Computed as `group_size - group_size / 4`, which equals
/// `ceil(3 * group_size / 4)` for every input but cannot overflow, unlike
/// forming `3 * group_size` first.
fn paid_prune_confirmations_needed(group_size: usize) -> usize {
    group_size - group_size / 4
}
769
770fn reserve_paid_prune_peer_budget(
771 target_peers: &[PeerId],
772 selected_verification_peers: &mut HashSet<PeerId>,
773) -> bool {
774 let new_peer_count = target_peers
775 .iter()
776 .filter(|peer| !selected_verification_peers.contains(peer))
777 .count();
778 if selected_verification_peers
779 .len()
780 .saturating_add(new_peer_count)
781 > MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS
782 {
783 return false;
784 }
785
786 selected_verification_peers.extend(target_peers.iter().copied());
787 true
788}
789
790async fn collect_paid_prune_confirmations(
797 expired_candidates: &[(XorName, Vec<PeerId>)],
798 p2p_node: &Arc<P2PNode>,
799 config: &ReplicationConfig,
800) -> HashMap<XorName, HashSet<PeerId>> {
801 if expired_candidates.is_empty() {
802 return HashMap::new();
803 }
804
805 let mut targets = VerificationTargets::default();
806 let mut keys = Vec::new();
807 for (key, target_peers) in expired_candidates {
808 if target_peers.is_empty() {
809 warn!(
810 "Cannot prune paid entry {}: current paid close group has no remote peers",
811 hex::encode(key)
812 );
813 continue;
814 }
815 keys.push(*key);
816 for peer in target_peers {
817 targets.all_peers.insert(*peer);
818 targets.peer_to_keys.entry(*peer).or_default().push(*key);
819 targets
820 .peer_to_paid_keys
821 .entry(*peer)
822 .or_default()
823 .insert(*key);
824 }
825 targets.paid_targets.insert(*key, target_peers.clone());
826 targets.paid_group_sizes.insert(*key, target_peers.len());
827 }
828 for keys_list in targets.peer_to_keys.values_mut() {
829 keys_list.sort_unstable();
830 keys_list.dedup();
831 }
832
833 let evidence = quorum::run_verification_round(&keys, &targets, p2p_node, config).await;
834 paid_confirmations_by_key(expired_candidates, &evidence)
835}
836
837fn paid_confirmations_by_key(
842 expired_candidates: &[(XorName, Vec<PeerId>)],
843 evidence: &HashMap<XorName, KeyVerificationEvidence>,
844) -> HashMap<XorName, HashSet<PeerId>> {
845 let mut confirmed_by_key: HashMap<XorName, HashSet<PeerId>> = HashMap::new();
846 for (key, target_peers) in expired_candidates {
847 let Some(key_evidence) = evidence.get(key) else {
848 continue;
849 };
850 let confirmed: HashSet<PeerId> = key_evidence
851 .paid_list
852 .iter()
853 .filter(|&(peer, status)| {
854 *status == PaidListEvidence::Confirmed && target_peers.contains(peer)
855 })
856 .map(|(peer, _)| *peer)
857 .collect();
858 if !confirmed.is_empty() {
859 confirmed_by_key.insert(*key, confirmed);
860 }
861 }
862 confirmed_by_key
863}
864
865async fn peers_with_mature_repair_proofs(
870 key: &XorName,
871 target_peers: &[PeerId],
872 current_close_peers: &HashSet<PeerId>,
873 repair_proofs: &Arc<RwLock<RepairProofs>>,
874 current_sync_epoch: u64,
875 now: Instant,
876) -> Vec<PeerId> {
877 let mut proofs = repair_proofs.write().await;
878 target_peers
879 .iter()
880 .filter(|peer| {
881 proofs.has_mature_replica_hint(peer, key, current_close_peers, current_sync_epoch, now)
882 })
883 .copied()
884 .collect()
885}
886
887async fn prune_scan_start(
888 sync_state: &Arc<RwLock<NeighborSyncState>>,
889 stored_key_count: usize,
890) -> usize {
891 if stored_key_count == 0 {
892 return 0;
893 }
894 sync_state.read().await.prune_cursor % stored_key_count
895}
896
897async fn advance_prune_cursor(
898 sync_state: &Arc<RwLock<NeighborSyncState>>,
899 stored_key_count: usize,
900 scan_start: usize,
901 last_selected_offset: Option<usize>,
902) {
903 if stored_key_count == 0 {
904 sync_state.write().await.prune_cursor = 0;
905 return;
906 }
907
908 let advance_by = last_selected_offset.map_or(1, |offset| offset.saturating_add(1));
909 sync_state.write().await.prune_cursor = (scan_start + advance_by) % stored_key_count;
910}
911
912async fn delete_stored_records(
913 keys_to_delete: &[XorName],
914 storage: &Arc<LmdbStorage>,
915 paid_list: &Arc<PaidList>,
916 repair_proofs: &Arc<RwLock<RepairProofs>>,
917) -> usize {
918 let mut pruned = 0;
919
920 for key in keys_to_delete {
921 if let Err(e) = storage.delete(key).await {
922 warn!("Failed to prune record {}: {e}", hex::encode(key));
923 } else {
924 pruned += 1;
925 paid_list.clear_record_out_of_range(key);
926 repair_proofs.write().await.remove_key(key);
927 paid_list.set_paid_out_of_range(key);
931 debug!("Pruned out-of-range record {}", hex::encode(key));
932 }
933 }
934
935 pruned
936}
937
938async fn collect_record_prune_proofs(
949 candidates: &[RecordPruneCandidate],
950 storage: &Arc<LmdbStorage>,
951 p2p_node: &Arc<P2PNode>,
952 config: &ReplicationConfig,
953 sync_state: &Arc<RwLock<NeighborSyncState>>,
954) -> HashMap<XorName, HashSet<PeerId>> {
955 if candidates.is_empty() {
956 return HashMap::new();
957 }
958
959 let report_state = PruneAuditReportState::default();
960 let mut requests = stream::iter(build_peer_audit_challenges(candidates))
961 .map(|(peer, key)| {
962 peer_proves_record(
963 peer,
964 key,
965 storage,
966 p2p_node,
967 config,
968 sync_state,
969 &report_state,
970 )
971 })
972 .buffer_unordered(MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES);
973
974 let mut present_by_key = HashMap::<XorName, HashSet<PeerId>>::new();
975 while let Some(proof) = requests.next().await {
976 if let Some((peer, key)) = proof {
977 present_by_key.entry(key).or_default().insert(peer);
978 }
979 }
980
981 present_by_key
982}
983
984async fn revalidated_record_prune_keys(
985 candidates: &[RecordPruneCandidate],
986 present_by_key: &HashMap<XorName, HashSet<PeerId>>,
987 self_id: &PeerId,
988 paid_list: &Arc<PaidList>,
989 p2p_node: &Arc<P2PNode>,
990 config: &ReplicationConfig,
991 commitment_state: Option<&Arc<ResponderCommitmentState>>,
992) -> (Vec<XorName>, usize) {
993 let mut keys_to_delete = Vec::new();
994 let mut cleared = 0;
995 let now = Instant::now();
996
997 for candidate in candidates {
998 if let Some(cs) = commitment_state {
1003 if cs.is_held(&candidate.key) {
1004 continue;
1005 }
1006 }
1007
1008 let (storage_admission_group, strict_close_group) =
1009 record_prune_lookup_groups(&candidate.key, p2p_node, config).await;
1010
1011 if storage_admission_group
1012 .iter()
1013 .any(|n| n.peer_id == *self_id)
1014 {
1015 if paid_list
1016 .record_out_of_range_since(&candidate.key)
1017 .is_some()
1018 {
1019 paid_list.clear_record_out_of_range(&candidate.key);
1020 cleared += 1;
1021 }
1022 continue;
1023 }
1024
1025 let Some(first_seen) = paid_list.record_out_of_range_since(&candidate.key) else {
1026 continue;
1027 };
1028 let elapsed = now
1029 .checked_duration_since(first_seen)
1030 .unwrap_or(Duration::ZERO);
1031 if elapsed < config.prune_hysteresis_duration {
1032 continue;
1033 }
1034
1035 let current_target_peers = remote_close_group_peers(&strict_close_group, self_id);
1036 if current_target_peers.is_empty() {
1037 warn!(
1038 "Cannot prune {}: current close group has no remote peers",
1039 hex::encode(candidate.key)
1040 );
1041 continue;
1042 }
1043
1044 let proofs_needed = prune_proofs_needed(current_target_peers.len());
1045 if target_peers_reported_present(
1046 &candidate.key,
1047 ¤t_target_peers,
1048 present_by_key,
1049 proofs_needed,
1050 ) {
1051 keys_to_delete.push(candidate.key);
1052 } else {
1053 debug!(
1054 "Deferring prune for {} until all but one of the current close group \
1055 report it",
1056 hex::encode(candidate.key)
1057 );
1058 }
1059 }
1060
1061 (keys_to_delete, cleared)
1062}
1063
1064fn build_peer_audit_challenges(candidates: &[RecordPruneCandidate]) -> Vec<(PeerId, XorName)> {
1065 let mut challenges = Vec::new();
1066 for candidate in candidates {
1067 for peer in &candidate.target_peers {
1068 challenges.push((*peer, candidate.key));
1069 }
1070 }
1071 challenges
1072}
1073
/// Test helper: returns the keys of the candidates for which at least
/// `proofs_needed` of their own target peers appear in `present_by_key`.
#[cfg(test)]
fn confirmed_keys_from_presence(
    candidates: &[RecordPruneCandidate],
    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
    proofs_needed: usize,
) -> Vec<XorName> {
    let mut confirmed = Vec::new();
    for candidate in candidates {
        let enough = target_peers_reported_present(
            &candidate.key,
            &candidate.target_peers,
            present_by_key,
            proofs_needed,
        );
        if enough {
            confirmed.push(candidate.key);
        }
    }
    confirmed
}
1093
/// Number of possession proofs required from `group_size` remote peers.
///
/// Groups of up to two peers must all prove possession; larger groups may
/// miss exactly one proof.
fn prune_proofs_needed(group_size: usize) -> usize {
    match group_size {
        0..=2 => group_size,
        larger => larger - 1,
    }
}
1111
1112fn target_peers_reported_present(
1117 key: &XorName,
1118 target_peers: &[PeerId],
1119 present_by_key: &HashMap<XorName, HashSet<PeerId>>,
1120 proofs_needed: usize,
1121) -> bool {
1122 if proofs_needed == 0 {
1123 return false;
1124 }
1125 let Some(present_peers) = present_by_key.get(key) else {
1126 return false;
1127 };
1128 let proven = present_peers
1131 .iter()
1132 .filter(|peer| target_peers.contains(peer))
1133 .count();
1134 proven >= proofs_needed
1135}
1136
1137async fn peer_proves_record(
1140 peer: PeerId,
1141 key: XorName,
1142 storage: &Arc<LmdbStorage>,
1143 p2p_node: &Arc<P2PNode>,
1144 config: &ReplicationConfig,
1145 sync_state: &Arc<RwLock<NeighborSyncState>>,
1146 report_state: &PruneAuditReportState,
1147) -> Option<(PeerId, XorName)> {
1148 let local_bytes = local_record_bytes(&key, storage).await?;
1149
1150 let (challenge_id, nonce) = {
1151 let mut rng = rand::thread_rng();
1152 (rng.gen::<u64>(), rng.gen::<[u8; 32]>())
1153 };
1154 let encoded = encode_prune_audit_challenge(&peer, key, challenge_id, nonce)?;
1155 let Some(decoded) = send_prune_audit_challenge(&peer, &key, encoded, p2p_node, config).await
1156 else {
1157 if crate::replication::config::TIMEOUT_EVICTION_ENABLED {
1166 report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
1167 } else {
1168 debug!(
1169 "Prune audit for {peer} key {} got no decodable response \
1170 (eviction disabled this release — not penalising)",
1171 hex::encode(key)
1172 );
1173 }
1174 return None;
1175 };
1176
1177 let status =
1178 prune_audit_response_status(decoded, challenge_id, &peer, &key, &nonce, &local_bytes);
1179 if prune_audit_response_clears_bootstrap_claim(status) {
1180 clear_prune_bootstrap_claim(&peer, sync_state).await;
1181 }
1182
1183 match status {
1184 PruneAuditStatus::Proven => Some((peer, key)),
1185 PruneAuditStatus::Bootstrapping => {
1186 report_prune_bootstrap_claim(&peer, &key, p2p_node, config, sync_state, report_state)
1187 .await;
1188 None
1189 }
1190 PruneAuditStatus::Failed => {
1191 report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
1192 None
1193 }
1194 }
1195}
1196
1197fn prune_audit_response_clears_bootstrap_claim(status: PruneAuditStatus) -> bool {
1198 matches!(status, PruneAuditStatus::Proven | PruneAuditStatus::Failed)
1199}
1200
1201fn encode_prune_audit_challenge(
1207 peer: &PeerId,
1208 key: XorName,
1209 challenge_id: u64,
1210 nonce: [u8; 32],
1211) -> Option<Vec<u8>> {
1212 let challenge = AuditChallenge {
1213 challenge_id,
1214 nonce,
1215 challenged_peer_id: *peer.as_bytes(),
1216 keys: vec![key],
1217 };
1218 let msg = ReplicationMessage {
1219 request_id: challenge_id,
1220 body: ReplicationMessageBody::AuditChallenge(challenge),
1221 };
1222 let encoded = match msg.encode() {
1223 Ok(data) => data,
1224 Err(e) => {
1225 warn!(
1226 "Failed to encode prune audit challenge for {} against {peer}: {e}",
1227 hex::encode(key),
1228 );
1229 return None;
1230 }
1231 };
1232 Some(encoded)
1233}
1234
1235async fn send_prune_audit_challenge(
1236 peer: &PeerId,
1237 key: &XorName,
1238 encoded: Vec<u8>,
1239 p2p_node: &Arc<P2PNode>,
1240 config: &ReplicationConfig,
1241) -> Option<ReplicationMessage> {
1242 let response = match p2p_node
1243 .send_request(
1244 peer,
1245 REPLICATION_PROTOCOL_ID,
1246 encoded,
1247 config.prune_audit_response_timeout,
1248 )
1249 .await
1250 {
1251 Ok(response) => response,
1252 Err(e) => {
1253 debug!(
1254 "Prune audit challenge for {} against {peer} failed: {e}",
1255 hex::encode(key)
1256 );
1257 return None;
1258 }
1259 };
1260
1261 let decoded = match ReplicationMessage::decode(&response.data) {
1262 Ok(msg) => msg,
1263 Err(e) => {
1264 warn!("Failed to decode prune audit response from {peer}: {e}");
1265 return None;
1266 }
1267 };
1268
1269 Some(decoded)
1270}
1271
1272fn prune_audit_response_status(
1273 decoded: ReplicationMessage,
1274 challenge_id: u64,
1275 peer: &PeerId,
1276 key: &XorName,
1277 nonce: &[u8; 32],
1278 local_bytes: &[u8],
1279) -> PruneAuditStatus {
1280 match decoded.body {
1281 ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
1282 challenge_id: resp_id,
1283 digests,
1284 }) => {
1285 if resp_id != challenge_id {
1286 warn!("Prune audit challenge ID mismatch from {peer}");
1287 return PruneAuditStatus::Failed;
1288 }
1289 let [digest] = digests.as_slice() else {
1290 warn!(
1291 "Prune audit response from {peer} returned {} digests for one challenged key",
1292 digests.len(),
1293 );
1294 return PruneAuditStatus::Failed;
1295 };
1296 if *digest == ABSENT_KEY_DIGEST {
1297 warn!(
1298 "Prune audit proof from {peer} failed for {}: peer reports key absent",
1299 hex::encode(key)
1300 );
1301 return PruneAuditStatus::Failed;
1302 }
1303 if audit_digest_proves_key(peer, key, nonce, local_bytes, digest) {
1304 PruneAuditStatus::Proven
1305 } else {
1306 warn!(
1307 "Prune audit proof from {peer} failed for {}: digest mismatch",
1308 hex::encode(key)
1309 );
1310 PruneAuditStatus::Failed
1311 }
1312 }
1313 ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
1314 challenge_id: resp_id,
1315 }) => {
1316 if resp_id == challenge_id {
1317 warn!(
1318 "Prune audit proof for {} blocked by bootstrap claim from {peer}",
1319 hex::encode(key)
1320 );
1321 PruneAuditStatus::Bootstrapping
1322 } else {
1323 warn!("Prune audit challenge ID mismatch on Bootstrapping from {peer}");
1324 PruneAuditStatus::Failed
1325 }
1326 }
1327 ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
1328 challenge_id: resp_id,
1329 reason,
1330 }) => {
1331 if resp_id == challenge_id {
1332 warn!(
1333 "Prune audit proof for {} rejected by {peer}: {reason}",
1334 hex::encode(key)
1335 );
1336 } else {
1337 warn!("Prune audit challenge ID mismatch on Rejected from {peer}");
1338 }
1339 PruneAuditStatus::Failed
1340 }
1341 _ => {
1342 warn!("Unexpected prune audit response type from {peer}");
1343 PruneAuditStatus::Failed
1344 }
1345 }
1346}
1347
1348async fn local_record_bytes(key: &XorName, storage: &Arc<LmdbStorage>) -> Option<Vec<u8>> {
1349 match storage.get_raw(key).await {
1350 Ok(Some(bytes)) => Some(bytes),
1351 Ok(None) => {
1352 debug!(
1353 "Cannot prune-audit {}: local record disappeared",
1354 hex::encode(key)
1355 );
1356 None
1357 }
1358 Err(e) => {
1359 warn!(
1360 "Cannot prune-audit {}: failed to read local record: {e}",
1361 hex::encode(key)
1362 );
1363 None
1364 }
1365 }
1366}
1367
1368fn audit_digest_proves_key(
1369 peer: &PeerId,
1370 key: &XorName,
1371 nonce: &[u8; 32],
1372 local_bytes: &[u8],
1373 digest: &[u8; 32],
1374) -> bool {
1375 if *digest == ABSENT_KEY_DIGEST {
1376 return false;
1377 }
1378 let expected = compute_audit_digest(nonce, peer.as_bytes(), key, local_bytes);
1379 *digest == expected
1380}
1381
1382async fn report_prune_audit_failure_once(
1383 peer: &PeerId,
1384 key: &XorName,
1385 p2p_node: &Arc<P2PNode>,
1386 config: &ReplicationConfig,
1387 report_state: &PruneAuditReportState,
1388) -> bool {
1389 let should_report = peer_is_currently_responsible(peer, key, p2p_node, config).await
1390 && reserve_prune_audit_failure_report(report_state, peer).await;
1391 if !should_report {
1392 return false;
1393 }
1394
1395 p2p_node
1396 .report_trust_event(
1397 peer,
1398 saorsa_core::TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT),
1399 )
1400 .await;
1401 true
1402}
1403
1404async fn reserve_prune_audit_failure_report(
1405 report_state: &PruneAuditReportState,
1406 peer: &PeerId,
1407) -> bool {
1408 report_state.audit_failures.write().await.insert(*peer)
1409}
1410
1411async fn reserve_prune_bootstrap_abuse_report(
1412 report_state: &PruneAuditReportState,
1413 peer: &PeerId,
1414) -> bool {
1415 report_state.bootstrap_abuse.write().await.insert(*peer)
1416}
1417
1418async fn report_prune_bootstrap_claim(
1419 peer: &PeerId,
1420 key: &XorName,
1421 p2p_node: &Arc<P2PNode>,
1422 config: &ReplicationConfig,
1423 sync_state: &Arc<RwLock<NeighborSyncState>>,
1424 report_state: &PruneAuditReportState,
1425) {
1426 if !peer_is_currently_responsible(peer, key, p2p_node, config).await {
1427 return;
1428 }
1429
1430 let observation = {
1431 let now = Instant::now();
1432 let mut state = sync_state.write().await;
1433 (
1434 now,
1435 state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period),
1436 )
1437 };
1438
1439 let (now, observation) = observation;
1440 match observation {
1441 BootstrapClaimObservation::WithinGrace { .. } => {
1442 debug!("Prune audit: peer {peer} claims bootstrapping (within grace period)");
1443 return;
1444 }
1445 BootstrapClaimObservation::PastGrace { first_seen } => {
1446 if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
1447 debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
1448 return;
1449 }
1450 warn!(
1451 "Prune audit: peer {peer} claiming bootstrap past grace period \
1452 ({:?} > {:?}), reporting abuse",
1453 now.duration_since(first_seen),
1454 config.bootstrap_claim_grace_period,
1455 );
1456 }
1457 BootstrapClaimObservation::Repeated { first_seen } => {
1458 if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
1459 debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
1460 return;
1461 }
1462 warn!(
1463 "Prune audit: peer {peer} repeated bootstrap claim after previously stopping; \
1464 first claim was {:?} ago, reporting abuse",
1465 now.duration_since(first_seen),
1466 );
1467 }
1468 }
1469
1470 p2p_node
1471 .report_trust_event(
1472 peer,
1473 saorsa_core::TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1474 )
1475 .await;
1476}
1477
1478async fn clear_prune_bootstrap_claim(peer: &PeerId, sync_state: &Arc<RwLock<NeighborSyncState>>) {
1479 let removed = {
1480 let mut state = sync_state.write().await;
1481 state.clear_active_bootstrap_claim(peer)
1482 };
1483 if removed {
1484 debug!("Prune audit: cleared active bootstrap claim for {peer}");
1485 }
1486}
1487
1488async fn peer_is_currently_responsible(
1489 peer: &PeerId,
1490 key: &XorName,
1491 p2p_node: &Arc<P2PNode>,
1492 config: &ReplicationConfig,
1493) -> bool {
1494 let closest = p2p_node
1495 .dht_manager()
1496 .find_closest_nodes_local_with_self(key, config.close_group_size)
1497 .await;
1498 closest.iter().any(|node| node.peer_id == *peer)
1499}
1500
// Unit tests for the prune-pass helpers: challenge construction, quorum and
// confirmation accounting, peer budgets, digest proofs, cursor movement, and
// once-per-pass penalty reservation.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Builds a `PeerId` whose first byte is `b` and whose other bytes are zero.
    fn peer_id_from_byte(b: u8) -> PeerId {
        let mut bytes = [0u8; 32];
        bytes[0] = b;
        PeerId::from_bytes(bytes)
    }

    /// Builds a 32-byte key with every byte set to `b`.
    fn key_from_byte(b: u8) -> XorName {
        [b; 32]
    }

    /// Builds `count` peers with first bytes `1..=count`.
    ///
    /// Panics if an index does not fit in a `u8`.
    fn peer_ids(count: usize) -> Vec<PeerId> {
        (0..count)
            .map(|idx| peer_id_from_byte(u8::try_from(idx + 1).expect("peer byte")))
            .collect()
    }

    /// Shorthand constructor for a `RecordPruneCandidate`.
    fn candidate(key: XorName, target_peers: Vec<PeerId>) -> RecordPruneCandidate {
        RecordPruneCandidate { key, target_peers }
    }

    /// Two candidates with three (peer, key) target pairings in total must
    /// produce exactly those three challenges. Order is not significant, so
    /// both sides are sorted before comparing.
    #[test]
    fn prune_audit_challenges_are_one_per_candidate_peer() {
        let peer_a = peer_id_from_byte(1);
        let peer_b = peer_id_from_byte(2);
        let key_a = key_from_byte(0xA);
        let key_b = key_from_byte(0xB);
        let candidates = vec![
            candidate(key_a, vec![peer_a, peer_b]),
            candidate(key_b, vec![peer_b]),
        ];

        let mut challenges = build_peer_audit_challenges(&candidates);
        challenges.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));

        let mut expected = vec![(peer_a, key_a), (peer_b, key_a), (peer_b, key_b)];
        expected.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));
        assert_eq!(challenges, expected);
    }

    /// With two of three target peers reporting presence, the key is
    /// confirmed at quorum 2 and not confirmed at quorum 3.
    #[test]
    fn confirmed_keys_require_quorum_of_target_peers_present() {
        let peer_a = peer_id_from_byte(1);
        let peer_b = peer_id_from_byte(2);
        let peer_c = peer_id_from_byte(3);
        let key = key_from_byte(0xC);
        let candidates = vec![candidate(key, vec![peer_a, peer_b, peer_c])];
        let mut present_by_key = HashMap::new();
        present_by_key.insert(key, HashSet::from([peer_a, peer_b]));

        // Two present peers satisfy a quorum of two.
        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 2);
        assert_eq!(confirmed, vec![key]);

        // The same evidence does not satisfy a quorum of three.
        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 3);
        assert!(confirmed.is_empty());
    }

    /// At quorum 2, only the key with two present target peers is confirmed;
    /// a key with one present peer and a key with no presence entry at all
    /// are both left out.
    #[test]
    fn confirmed_keys_defer_below_quorum_or_missing_peer_evidence() {
        let peer_a = peer_id_from_byte(1);
        let peer_b = peer_id_from_byte(2);
        let quorum_key = key_from_byte(0xD);
        let below_quorum_key = key_from_byte(0xE);
        let missing_key = key_from_byte(0xF);
        let candidates = vec![
            candidate(quorum_key, vec![peer_a, peer_b]),
            candidate(below_quorum_key, vec![peer_a, peer_b]),
            candidate(missing_key, vec![peer_a, peer_b]),
        ];
        let mut present_by_key = HashMap::new();
        present_by_key.insert(quorum_key, HashSet::from([peer_a, peer_b]));
        present_by_key.insert(below_quorum_key, HashSet::from([peer_a]));

        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 2);

        assert_eq!(confirmed, vec![quorum_key]);
    }

    /// Pins `prune_proofs_needed` for small target counts: 0, 1 and 2 targets
    /// need all of them; 3, 5 and 7 targets need one fewer than the count.
    #[test]
    fn prune_proofs_needed_tolerates_exactly_one_lagging_peer() {
        assert_eq!(prune_proofs_needed(0), 0);
        assert_eq!(prune_proofs_needed(1), 1);
        assert_eq!(prune_proofs_needed(2), 2);
        assert_eq!(prune_proofs_needed(3), 2);
        assert_eq!(prune_proofs_needed(5), 4);
        assert_eq!(prune_proofs_needed(7), 6);
    }

    /// Pins `paid_prune_confirmations_needed` at sample sizes; each expected
    /// value equals three quarters of the input, rounded up.
    #[test]
    fn paid_prune_confirmations_are_three_quarters_rounded_up() {
        assert_eq!(paid_prune_confirmations_needed(0), 0);
        assert_eq!(paid_prune_confirmations_needed(1), 1);
        assert_eq!(paid_prune_confirmations_needed(2), 2);
        assert_eq!(paid_prune_confirmations_needed(4), 3);
        assert_eq!(paid_prune_confirmations_needed(20), 15);
    }

    /// Once the budget is filled to the cap, targets that are already
    /// selected are still accepted and do not grow the selected set.
    #[test]
    fn paid_prune_peer_budget_allows_overlapping_targets() {
        let peers = peer_ids(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS);
        let mut selected_peers = HashSet::new();

        assert!(reserve_paid_prune_peer_budget(&peers, &mut selected_peers));
        assert_eq!(
            selected_peers.len(),
            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
        );

        let overlapping_targets = vec![peers[0], peers[1]];
        assert!(reserve_paid_prune_peer_budget(
            &overlapping_targets,
            &mut selected_peers,
        ));
        assert_eq!(
            selected_peers.len(),
            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
        );
    }

    /// Once the budget is filled to the cap, a previously unseen peer is
    /// rejected and is not added to the selected set.
    #[test]
    fn paid_prune_peer_budget_rejects_new_peers_past_cap() {
        let peers = peer_ids(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS + 1);
        let mut selected_peers = HashSet::new();

        assert!(reserve_paid_prune_peer_budget(
            &peers[..MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS],
            &mut selected_peers,
        ));
        assert!(!reserve_paid_prune_peer_budget(
            &peers[MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS..],
            &mut selected_peers,
        ));
        assert_eq!(
            selected_peers.len(),
            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
        );
        assert!(!selected_peers.contains(&peers[MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS]));
    }

    /// Only a `Confirmed` answer from a target peer is counted. `NotFound`
    /// and `Unresolved` answers are ignored, and so is a `Confirmed` answer
    /// from a peer that is not in the candidate's target list.
    #[test]
    fn paid_confirmations_count_only_confirmed_target_peers() {
        let confirmed_peer = peer_id_from_byte(1);
        let not_found_peer = peer_id_from_byte(2);
        let unresolved_peer = peer_id_from_byte(3);
        let outsider = peer_id_from_byte(4);
        let key = key_from_byte(0x21);
        let candidates = vec![(key, vec![confirmed_peer, not_found_peer, unresolved_peer])];

        let mut evidence = HashMap::new();
        evidence.insert(
            key,
            KeyVerificationEvidence {
                presence: HashMap::new(),
                paid_list: HashMap::from([
                    (confirmed_peer, PaidListEvidence::Confirmed),
                    (not_found_peer, PaidListEvidence::NotFound),
                    (unresolved_peer, PaidListEvidence::Unresolved),
                    // Not a target peer for this key, so it must not count.
                    (outsider, PaidListEvidence::Confirmed),
                ]),
            },
        );

        let confirmed_by_key = paid_confirmations_by_key(&candidates, &evidence);

        assert_eq!(
            confirmed_by_key.get(&key),
            Some(&HashSet::from([confirmed_peer])),
            "only Confirmed answers from target peers may count",
        );
    }

    /// A candidate key with no evidence entry produces no confirmation entry.
    #[test]
    fn paid_confirmations_skip_keys_without_evidence() {
        let peer = peer_id_from_byte(1);
        let key = key_from_byte(0x22);
        let candidates = vec![(key, vec![peer])];

        let confirmed_by_key = paid_confirmations_by_key(&candidates, &HashMap::new());

        assert!(confirmed_by_key.is_empty());
    }

    /// A required count of zero is not treated as satisfied, even when a
    /// target peer reported the key present.
    #[test]
    fn zero_quorum_never_confirms() {
        let peer_a = peer_id_from_byte(1);
        let key = key_from_byte(0x10);
        let mut present_by_key = HashMap::new();
        present_by_key.insert(key, HashSet::from([peer_a]));

        assert!(!target_peers_reported_present(
            &key,
            &[peer_a],
            &present_by_key,
            0
        ));
    }

    /// Presence reported by a peer outside the target list does not satisfy
    /// a quorum of one.
    #[test]
    fn proofs_from_non_target_peers_do_not_count_toward_quorum() {
        let target = peer_id_from_byte(1);
        let outsider = peer_id_from_byte(2);
        let key = key_from_byte(0x11);
        let mut present_by_key = HashMap::new();
        present_by_key.insert(key, HashSet::from([outsider]));

        assert!(!target_peers_reported_present(
            &key,
            &[target],
            &present_by_key,
            1
        ));
    }

    /// Listing the same present peer twice in the target slice does not
    /// satisfy a quorum of two.
    #[test]
    fn duplicated_target_peer_counts_once_toward_quorum() {
        let peer = peer_id_from_byte(1);
        let key = key_from_byte(0x12);
        let mut present_by_key = HashMap::new();
        present_by_key.insert(key, HashSet::from([peer]));

        assert!(!target_peers_reported_present(
            &key,
            &[peer, peer],
            &present_by_key,
            2
        ));
    }

    /// A digest computed for one (peer, key, nonce, bytes) tuple is accepted
    /// for that tuple and rejected when any single input differs.
    /// `ABSENT_KEY_DIGEST` is rejected as well.
    #[test]
    fn audit_digest_proof_requires_matching_peer_key_nonce_and_bytes() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let key = key_from_byte(0x11);
        let other_key = key_from_byte(0x12);
        let nonce = [0xAA; 32];
        let other_nonce = [0xBB; 32];
        let bytes = b"record bytes";
        let digest = compute_audit_digest(&nonce, peer.as_bytes(), &key, bytes);

        assert!(audit_digest_proves_key(&peer, &key, &nonce, bytes, &digest));
        assert!(!audit_digest_proves_key(
            &other_peer,
            &key,
            &nonce,
            bytes,
            &digest
        ));
        assert!(!audit_digest_proves_key(
            &peer, &other_key, &nonce, bytes, &digest
        ));
        assert!(!audit_digest_proves_key(
            &peer,
            &key,
            &other_nonce,
            bytes,
            &digest
        ));
        assert!(!audit_digest_proves_key(
            &peer,
            &key,
            &nonce,
            b"different bytes",
            &digest
        ));
        assert!(!audit_digest_proves_key(
            &peer,
            &key,
            &nonce,
            bytes,
            &ABSENT_KEY_DIGEST
        ));
    }

    /// Starting from cursor 2 over 10 entries, advancing with `Some(3)`
    /// leaves the cursor at 6.
    #[tokio::test]
    async fn prune_cursor_advances_past_selected_budget_window() {
        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
        state.write().await.prune_cursor = 2;

        let start = prune_scan_start(&state, 10).await;
        advance_prune_cursor(&state, 10, start, Some(3)).await;

        assert_eq!(state.read().await.prune_cursor, 6);
    }

    /// Starting from cursor 9 over 10 entries, advancing with `None` leaves
    /// the cursor at 0.
    #[tokio::test]
    async fn prune_cursor_advances_even_when_no_candidate_selected() {
        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
        state.write().await.prune_cursor = 9;

        let start = prune_scan_start(&state, 10).await;
        advance_prune_cursor(&state, 10, start, None).await;

        assert_eq!(state.read().await.prune_cursor, 0);
    }

    /// Clearing a peer's bootstrap claim removes it from `bootstrap_claims`
    /// but leaves its entry in `bootstrap_claim_history`.
    #[tokio::test]
    async fn prune_audit_normal_response_clears_stale_bootstrap_claim() {
        let peer = peer_id_from_byte(1);
        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![peer])));
        let first_seen = Instant::now();
        state
            .write()
            .await
            .bootstrap_claims
            .insert(peer, first_seen);
        state
            .write()
            .await
            .bootstrap_claim_history
            .insert(peer, first_seen);

        clear_prune_bootstrap_claim(&peer, &state).await;

        let state = state.read().await;
        assert!(!state.bootstrap_claims.contains_key(&peer));
        assert!(state.bootstrap_claim_history.contains_key(&peer));
    }

    /// `Proven` and `Failed` statuses clear a bootstrap claim;
    /// `Bootstrapping` does not.
    #[test]
    fn prune_audit_clear_policy_requires_decoded_non_bootstrap_response() {
        assert!(prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Proven
        ));
        assert!(prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Failed
        ));
        assert!(!prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Bootstrapping
        ));
    }

    /// The first audit-failure reservation for a peer succeeds, a second one
    /// for the same peer is refused, and a different peer is unaffected.
    #[tokio::test]
    async fn prune_audit_failure_penalty_is_reserved_once_per_peer_per_pass() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let report_state = PruneAuditReportState::default();

        assert!(reserve_prune_audit_failure_report(&report_state, &peer).await);
        assert!(!reserve_prune_audit_failure_report(&report_state, &peer).await);
        assert!(reserve_prune_audit_failure_report(&report_state, &other_peer).await);

        let reported = report_state.audit_failures.read().await;
        assert_eq!(reported.len(), 2);
        assert!(reported.contains(&peer));
        assert!(reported.contains(&other_peer));
    }

    /// The first bootstrap-abuse reservation for a peer succeeds, a second
    /// one for the same peer is refused, and a different peer is unaffected.
    #[tokio::test]
    async fn prune_bootstrap_abuse_penalty_is_reserved_once_per_peer_per_pass() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let report_state = PruneAuditReportState::default();

        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
        assert!(!reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &other_peer).await);

        let reported = report_state.bootstrap_abuse.read().await;
        assert_eq!(reported.len(), 2);
        assert!(reported.contains(&peer));
        assert!(reported.contains(&other_peer));
    }
}