1use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use crate::logging::{debug, info, warn};
12
13use futures::{stream, StreamExt};
14use rand::Rng;
15use saorsa_core::identity::PeerId;
16use saorsa_core::{DHTNode, P2PNode};
17use tokio::sync::RwLock;
18
19use crate::ant_protocol::XorName;
20use crate::replication::config::{
21 storage_admission_width, ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT,
22 MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS, REPLICATION_PROTOCOL_ID,
23};
24use crate::replication::paid_list::PaidList;
25use crate::replication::protocol::{
26 compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
27 ReplicationMessageBody, ABSENT_KEY_DIGEST,
28};
29use crate::replication::quorum::{self, VerificationTargets};
30use crate::replication::types::{
31 BootstrapClaimObservation, KeyVerificationEvidence, NeighborSyncState, PaidListEvidence,
32 RepairProofs,
33};
34use crate::storage::LmdbStorage;
35
36use super::REPLICATION_TRUST_WEIGHT;
37
/// Upper bound on prune audit challenges kept in flight at once by the
/// `buffer_unordered` stream in `collect_record_prune_proofs`.
const MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES: usize = 32;

/// Maximum number of expired `PaidForList` entries selected for remote
/// verification in a single prune pass; further entries are deferred.
const MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS: usize = 32;
/// Maximum number of distinct peers that may be reserved for paid-prune
/// verification in a single pass (see `reserve_paid_prune_peer_budget`).
/// Deliberately tied to the concurrent audit challenge cap.
const MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS: usize = MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES;
46
/// Counters summarising one prune pass over stored records and
/// `PaidForList` entries.
#[derive(Debug, Default)]
pub struct PruneResult {
    /// Stored records deleted during this pass.
    pub records_pruned: usize,
    /// Stored records that received an out-of-range marker for the first
    /// time during this pass.
    pub records_marked_out_of_range: usize,
    /// Stored records whose out-of-range marker was cleared because this node
    /// is (again) inside the storage admission group for the key.
    pub records_cleared: usize,
    /// `PaidForList` entries removed during this pass.
    pub paid_entries_pruned: usize,
    /// `PaidForList` entries that received an out-of-range marker for the
    /// first time during this pass.
    pub paid_entries_marked: usize,
    /// `PaidForList` entries whose out-of-range marker was cleared because
    /// this node is (again) inside the paid close group for the key.
    pub paid_entries_cleared: usize,
}
67
/// Borrowed inputs for one prune pass (see `run_prune_pass_with_context`).
pub struct PrunePassContext<'a> {
    /// This node's peer id; compared against local DHT lookup results to
    /// decide whether the node belongs to a key's group.
    pub self_id: &'a PeerId,
    /// Local record store: source of the stored-key list, of raw record bytes
    /// for audit digests, and the target of record deletion.
    pub storage: &'a Arc<LmdbStorage>,
    /// Paid list; also holds the record and paid out-of-range markers that
    /// drive the prune hysteresis.
    pub paid_list: &'a Arc<PaidList>,
    /// Network handle used for local DHT lookups, audit requests and trust
    /// reports.
    pub p2p_node: &'a Arc<P2PNode>,
    /// Replication configuration (group sizes, hysteresis duration, timeouts).
    pub config: &'a ReplicationConfig,
    /// Neighbor sync state; holds the record prune cursor and is handed to
    /// the audit path for bootstrap-claim bookkeeping.
    pub sync_state: &'a Arc<RwLock<NeighborSyncState>>,
    /// Repair-proof store consulted for mature replica hints; entries for
    /// deleted keys are removed from it.
    pub repair_proofs: &'a Arc<RwLock<RepairProofs>>,
    /// Sync epoch forwarded to the repair-proof maturity check.
    pub current_sync_epoch: u64,
    /// Test-only override for the instant used in the repair-proof maturity
    /// check; when `None` the pass's own `now` is used.
    #[cfg(any(test, feature = "test-utils"))]
    pub repair_proof_now: Option<Instant>,
    /// When `false`, candidates that passed hysteresis are deferred instead
    /// of being audited / verified against remote peers.
    pub allow_remote_prune_audits: bool,
}
92
/// Classification of a peer's reply to a single-key prune audit challenge
/// (produced by `prune_audit_response_status`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PruneAuditStatus {
    /// The returned digest matched the digest computed from the local bytes.
    Proven,
    /// Any other definitive outcome: id mismatch, wrong digest count, absent
    /// key, digest mismatch, rejection, or an unexpected message type.
    Failed,
    /// The peer answered `Bootstrapping` with a matching challenge id.
    Bootstrapping,
}
99
/// Per-pass counters for the stored-record half of the prune pass.
#[derive(Debug, Default)]
struct RecordPruneStats {
    /// Records newly marked out of range.
    marked: usize,
    /// Records whose out-of-range marker was cleared.
    cleared: usize,
    /// Records deleted from local storage.
    pruned: usize,
}
106
/// Per-pass counters for the `PaidForList` half of the prune pass.
#[derive(Debug, Default)]
struct PaidPruneStats {
    /// Paid entries newly marked out of range.
    marked: usize,
    /// Paid entries whose out-of-range marker was cleared.
    cleared: usize,
    /// Paid entries removed from the paid list.
    pruned: usize,
}
113
/// Tallies of expired `PaidForList` entries that were not selected for
/// verification in this pass, grouped by the reason they were deferred.
#[derive(Debug, Default)]
struct PaidPruneDeferredCounts {
    /// Deferred because the per-pass entry cap was already reached.
    entry_budget: usize,
    /// Deferred because remote prune audits are currently not allowed.
    remote_gate: usize,
    /// Deferred because selecting the entry would exceed the per-pass cap on
    /// distinct verification peers.
    peer_budget: usize,
}
120
121impl PaidPruneDeferredCounts {
122 fn log(&self) {
123 if self.entry_budget > 0 {
124 debug!(
125 "Deferred {} expired PaidForList entries beyond the per-pass verification cap \
126 ({MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS})",
127 self.entry_budget,
128 );
129 }
130
131 if self.remote_gate > 0 {
132 debug!(
133 "Deferred {} expired PaidForList entries until bootstrap drain allows remote \
134 paid-prune verification",
135 self.remote_gate,
136 );
137 }
138
139 if self.peer_budget > 0 {
140 debug!(
141 "Deferred {} expired PaidForList entries beyond the per-pass paid-prune peer cap \
142 ({MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS})",
143 self.peer_budget,
144 );
145 }
146 }
147}
148
/// A stored record selected for prune-confirmation audits in this pass.
#[derive(Debug, Clone)]
struct RecordPruneCandidate {
    /// Key of the record that may be pruned.
    key: XorName,
    /// Remote peers that will each receive an audit challenge for `key`.
    target_peers: Vec<PeerId>,
}
154
/// Result of evaluating one stored key in `evaluate_record_prune_key`.
struct RecordPruneKeyOutcome {
    /// `true` when the key had no out-of-range marker before this evaluation
    /// and the node was found to be outside the storage admission group.
    marked: bool,
    /// What the scan loop should do with the key.
    state: RecordPruneKeyState,
}
159
160impl Default for RecordPruneKeyOutcome {
161 fn default() -> Self {
162 Self {
163 marked: false,
164 state: RecordPruneKeyState::None,
165 }
166 }
167}
168
/// Follow-up action for one stored key after evaluation.
enum RecordPruneKeyState {
    /// Nothing further to do for this key in this pass.
    None,
    /// An existing out-of-range marker was cleared.
    Cleared,
    /// Past hysteresis, but remote prune audits are not currently allowed.
    BootstrapDeferred,
    /// Past hysteresis, but the per-pass audit challenge budget cannot cover
    /// the key's audit targets.
    BudgetDeferred,
    /// Selected for prune-confirmation audits.
    Candidate(RecordPruneCandidate),
}
176
/// Selection result for one expired `PaidForList` entry
/// (see `select_paid_prune_candidate`).
enum PaidPruneKeyState {
    /// Not selectable: the current paid close group has no remote peers.
    None,
    /// Remote prune audits are not currently allowed.
    RemoteDeferred,
    /// The per-pass entry cap has already been reached.
    EntryBudgetDeferred,
    /// Selecting the entry would exceed the per-pass distinct-peer cap.
    PeerBudgetDeferred,
    /// Selected; carries the remote peers to ask for confirmation.
    Candidate(Vec<PeerId>),
}
184
/// Sets of peers already reserved for a report during the current round of
/// record prune audits; a peer can be inserted into each set at most once,
/// which is what the `reserve_*` helpers rely on.
#[derive(Default)]
struct PruneAuditReportState {
    /// Peers for which an audit-failure report has been reserved.
    audit_failures: RwLock<HashSet<PeerId>>,
    /// Peers for which a bootstrap-abuse report has been reserved.
    bootstrap_abuse: RwLock<HashSet<PeerId>>,
}
190
191pub async fn run_prune_pass(
219 self_id: &PeerId,
220 storage: &Arc<LmdbStorage>,
221 paid_list: &Arc<PaidList>,
222 p2p_node: &Arc<P2PNode>,
223 config: &ReplicationConfig,
224 sync_state: &Arc<RwLock<NeighborSyncState>>,
225 allow_remote_prune_audits: bool,
226) -> PruneResult {
227 let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
228 run_prune_pass_with_context(PrunePassContext {
229 self_id,
230 storage,
231 paid_list,
232 p2p_node,
233 config,
234 sync_state,
235 repair_proofs: &repair_proofs,
236 current_sync_epoch: 0,
237 #[cfg(any(test, feature = "test-utils"))]
238 repair_proof_now: None,
239 allow_remote_prune_audits,
240 })
241 .await
242}
243
244pub async fn run_prune_pass_with_context(ctx: PrunePassContext<'_>) -> PruneResult {
246 let (stored_count, record_stats) = prune_stored_records(&ctx).await;
247 let now = Instant::now();
248 let (paid_count, paid_stats) = prune_paid_entries(
249 ctx.self_id,
250 ctx.paid_list,
251 ctx.p2p_node,
252 ctx.config,
253 now,
254 ctx.allow_remote_prune_audits,
255 )
256 .await;
257
258 let result = PruneResult {
259 records_pruned: record_stats.pruned,
260 records_marked_out_of_range: record_stats.marked,
261 records_cleared: record_stats.cleared,
262 paid_entries_pruned: paid_stats.pruned,
263 paid_entries_marked: paid_stats.marked,
264 paid_entries_cleared: paid_stats.cleared,
265 };
266
267 info!(
268 "Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
269 result.records_pruned, stored_count, result.paid_entries_pruned, paid_count,
270 );
271
272 result
273}
274
/// Scans every stored record key once, updates out-of-range markers, audits
/// the selected candidates and deletes the records that pass revalidation.
///
/// Returns the number of stored keys scanned and the per-pass counters. If
/// the key list cannot be read, a warning is logged and `(0, default)` is
/// returned.
async fn prune_stored_records(ctx: &PrunePassContext<'_>) -> (usize, RecordPruneStats) {
    let stored_keys = match ctx.storage.all_keys().await {
        Ok(keys) => keys,
        Err(e) => {
            warn!("Failed to read stored keys for pruning: {e}");
            return (0, RecordPruneStats::default());
        }
    };

    let now = Instant::now();
    let mut stats = RecordPruneStats::default();
    let mut candidates = Vec::new();
    // Shared across all keys of this pass; each candidate consumes one unit
    // per audit target.
    let mut audit_challenge_budget = MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS;
    let mut budget_deferred = 0usize;
    let mut bootstrap_deferred = 0usize;
    // Start from the persisted cursor so successive passes do not always
    // favour the same keys.
    let scan_start = prune_scan_start(ctx.sync_state, stored_keys.len()).await;
    let mut last_selected_offset = None;

    // Visit every key exactly once, wrapping around from `scan_start`.
    for offset in 0..stored_keys.len() {
        let key = &stored_keys[(scan_start + offset) % stored_keys.len()];
        let (storage_admission_group, strict_close_group) =
            record_prune_lookup_groups(key, ctx.p2p_node, ctx.config).await;

        let outcome = evaluate_record_prune_key(
            ctx,
            key,
            &storage_admission_group,
            &strict_close_group,
            now,
            &mut audit_challenge_budget,
        )
        .await;
        if outcome.marked {
            stats.marked += 1;
        }
        match outcome.state {
            RecordPruneKeyState::None => {}
            RecordPruneKeyState::Cleared => stats.cleared += 1,
            RecordPruneKeyState::BootstrapDeferred => {
                bootstrap_deferred = bootstrap_deferred.saturating_add(1);
            }
            RecordPruneKeyState::BudgetDeferred => {
                budget_deferred = budget_deferred.saturating_add(1);
            }
            RecordPruneKeyState::Candidate(candidate) => {
                // Remember how far the scan got when it last selected a key;
                // the cursor is advanced just past this offset.
                last_selected_offset = Some(offset);
                candidates.push(candidate);
            }
        }
    }

    advance_prune_cursor(
        ctx.sync_state,
        stored_keys.len(),
        scan_start,
        last_selected_offset,
    )
    .await;

    if bootstrap_deferred > 0 {
        debug!(
            "Deferred {bootstrap_deferred} prune candidates until bootstrap drain allows \
             remote prune-confirmation audits"
        );
    }

    if budget_deferred > 0 {
        debug!(
            "Deferred {budget_deferred} prune candidates due to per-pass audit budget \
             ({MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS} challenges)"
        );
    }

    // Challenge the selected peers, then re-check responsibility, hysteresis
    // and the current close group before anything is deleted.
    let present_by_key = collect_record_prune_proofs(
        &candidates,
        ctx.storage,
        ctx.p2p_node,
        ctx.config,
        ctx.sync_state,
    )
    .await;
    let (keys_to_delete, revalidated_cleared) = revalidated_record_prune_keys(
        &candidates,
        &present_by_key,
        ctx.self_id,
        ctx.paid_list,
        ctx.p2p_node,
        ctx.config,
    )
    .await;
    stats.cleared += revalidated_cleared;
    stats.pruned = delete_stored_records(
        &keys_to_delete,
        ctx.storage,
        ctx.paid_list,
        ctx.repair_proofs,
    )
    .await;

    (stored_keys.len(), stats)
}
376
377async fn record_prune_lookup_groups(
378 key: &XorName,
379 p2p_node: &Arc<P2PNode>,
380 config: &ReplicationConfig,
381) -> (Vec<DHTNode>, Vec<DHTNode>) {
382 let dht = p2p_node.dht_manager();
383 let storage_admission_group = dht
384 .find_closest_nodes_local_with_self(key, storage_admission_width(config.close_group_size))
385 .await;
386 let strict_close_group = dht
387 .find_closest_nodes_local_with_self(key, config.close_group_size)
388 .await;
389 (storage_admission_group, strict_close_group)
390}
391
392async fn evaluate_record_prune_key(
393 ctx: &PrunePassContext<'_>,
394 key: &XorName,
395 storage_admission_group: &[DHTNode],
396 strict_close_group: &[DHTNode],
397 now: Instant,
398 audit_challenge_budget: &mut usize,
399) -> RecordPruneKeyOutcome {
400 let mut outcome = RecordPruneKeyOutcome::default();
401 let is_responsible = storage_admission_group
402 .iter()
403 .any(|node| node.peer_id == *ctx.self_id);
404
405 if is_responsible {
406 if ctx.paid_list.record_out_of_range_since(key).is_some() {
407 ctx.paid_list.clear_record_out_of_range(key);
408 outcome.state = RecordPruneKeyState::Cleared;
409 }
410 return outcome;
411 }
412
413 if ctx.paid_list.record_out_of_range_since(key).is_none() {
414 outcome.marked = true;
415 }
416 ctx.paid_list.set_record_out_of_range(key);
417
418 let Some(first_seen) = ctx.paid_list.record_out_of_range_since(key) else {
419 return outcome;
420 };
421 let elapsed = now
422 .checked_duration_since(first_seen)
423 .unwrap_or(Duration::ZERO);
424 if elapsed < ctx.config.prune_hysteresis_duration {
425 return outcome;
426 }
427
428 if !ctx.allow_remote_prune_audits {
429 outcome.state = RecordPruneKeyState::BootstrapDeferred;
430 return outcome;
431 }
432
433 let target_peers = remote_close_group_peers(strict_close_group, ctx.self_id);
434 if target_peers.is_empty() {
435 warn!(
436 "Cannot prune {}: current close group has no remote peers",
437 hex::encode(key)
438 );
439 return outcome;
440 }
441
442 let current_close_peers: HashSet<PeerId> =
446 strict_close_group.iter().map(|node| node.peer_id).collect();
447 #[cfg(any(test, feature = "test-utils"))]
448 let repair_proof_now = ctx.repair_proof_now.unwrap_or(now);
449 #[cfg(not(any(test, feature = "test-utils")))]
450 let repair_proof_now = now;
451 let audit_targets = peers_with_mature_repair_proofs(
452 key,
453 &target_peers,
454 ¤t_close_peers,
455 ctx.repair_proofs,
456 ctx.current_sync_epoch,
457 repair_proof_now,
458 )
459 .await;
460 let proofs_needed = prune_proofs_needed(target_peers.len());
461 if proofs_needed == 0 || audit_targets.len() < proofs_needed {
462 debug!(
463 "Deferring prune for {} until enough of the close group has mature \
464 repair proofs",
465 hex::encode(key)
466 );
467 return outcome;
468 }
469
470 if audit_targets.len() > *audit_challenge_budget {
471 outcome.state = RecordPruneKeyState::BudgetDeferred;
472 return outcome;
473 }
474
475 *audit_challenge_budget -= audit_targets.len();
476 outcome.state = RecordPruneKeyState::Candidate(RecordPruneCandidate {
477 key: *key,
478 target_peers: audit_targets,
479 });
480 outcome
481}
482
/// Scans every `PaidForList` key once, updates paid out-of-range markers,
/// asks remote peers to confirm the selected expired entries and removes the
/// entries that pass revalidation.
///
/// Returns the number of paid keys scanned and the per-pass counters. If the
/// paid list cannot be read, a warning is logged and `(0, default)` is
/// returned.
async fn prune_paid_entries(
    self_id: &PeerId,
    paid_list: &Arc<PaidList>,
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
    now: Instant,
    allow_remote_prune_audits: bool,
) -> (usize, PaidPruneStats) {
    let paid_keys = match paid_list.all_keys() {
        Ok(keys) => keys,
        Err(e) => {
            warn!("Failed to read PaidForList for pruning: {e}");
            return (0, PaidPruneStats::default());
        }
    };

    let dht = p2p_node.dht_manager();
    let mut stats = PaidPruneStats::default();
    let mut expired_candidates: Vec<(XorName, Vec<PeerId>)> = Vec::new();
    let mut deferred_counts = PaidPruneDeferredCounts::default();
    // Distinct peers already reserved for verification in this pass.
    let mut selected_verification_peers = HashSet::new();
    let scan_start = paid_list.paid_prune_scan_start(paid_keys.len());
    let mut last_selected_offset = None;

    // Visit every key exactly once, wrapping around from `scan_start`.
    for offset in 0..paid_keys.len() {
        let key = &paid_keys[(scan_start + offset) % paid_keys.len()];
        let closest: Vec<DHTNode> = dht
            .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
            .await;
        let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);

        if in_paid_group {
            // Back in range: drop any existing marker.
            if paid_list.paid_out_of_range_since(key).is_some() {
                paid_list.clear_paid_out_of_range(key);
                stats.cleared += 1;
            }
        } else {
            // Only a marker that did not exist before counts as newly marked.
            if paid_list.paid_out_of_range_since(key).is_none() {
                stats.marked += 1;
            }
            paid_list.set_paid_out_of_range(key);

            if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
                // A marker timestamp later than `now` counts as zero elapsed.
                let elapsed = now
                    .checked_duration_since(first_seen)
                    .unwrap_or(Duration::ZERO);
                if elapsed >= config.prune_hysteresis_duration {
                    match select_paid_prune_candidate(
                        key,
                        &closest,
                        self_id,
                        allow_remote_prune_audits,
                        expired_candidates.len(),
                        &mut selected_verification_peers,
                    ) {
                        PaidPruneKeyState::None => {}
                        PaidPruneKeyState::RemoteDeferred => {
                            deferred_counts.remote_gate =
                                deferred_counts.remote_gate.saturating_add(1);
                        }
                        PaidPruneKeyState::EntryBudgetDeferred => {
                            deferred_counts.entry_budget =
                                deferred_counts.entry_budget.saturating_add(1);
                        }
                        PaidPruneKeyState::PeerBudgetDeferred => {
                            deferred_counts.peer_budget =
                                deferred_counts.peer_budget.saturating_add(1);
                        }
                        PaidPruneKeyState::Candidate(target_peers) => {
                            expired_candidates.push((*key, target_peers));
                            last_selected_offset = Some(offset);
                        }
                    }
                }
            }
        }
    }

    paid_list.advance_paid_prune_cursor(paid_keys.len(), scan_start, last_selected_offset);
    deferred_counts.log();

    // Ask the selected peers, then re-check membership, hysteresis and the
    // current paid close group before anything is removed.
    let confirmed_by_key =
        collect_paid_prune_confirmations(&expired_candidates, p2p_node, config).await;
    let (paid_keys_to_delete, revalidated_cleared) = revalidated_paid_prune_keys(
        &expired_candidates,
        &confirmed_by_key,
        self_id,
        paid_list,
        p2p_node,
        config,
    )
    .await;
    stats.cleared += revalidated_cleared;
    stats.pruned = delete_paid_entries(&paid_keys_to_delete, paid_list).await;

    (paid_keys.len(), stats)
}
582
583fn select_paid_prune_candidate(
584 key: &XorName,
585 closest: &[DHTNode],
586 self_id: &PeerId,
587 allow_remote_prune_audits: bool,
588 selected_candidate_count: usize,
589 selected_verification_peers: &mut HashSet<PeerId>,
590) -> PaidPruneKeyState {
591 if !allow_remote_prune_audits {
592 return PaidPruneKeyState::RemoteDeferred;
593 }
594
595 let target_peers = remote_close_group_peers(closest, self_id);
596 if target_peers.is_empty() {
597 warn!(
598 "Cannot prune paid entry {}: current paid close group has no remote peers",
599 hex::encode(key)
600 );
601 return PaidPruneKeyState::None;
602 }
603
604 if selected_candidate_count >= MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS {
605 return PaidPruneKeyState::EntryBudgetDeferred;
606 }
607
608 if !reserve_paid_prune_peer_budget(&target_peers, selected_verification_peers) {
609 return PaidPruneKeyState::PeerBudgetDeferred;
610 }
611
612 PaidPruneKeyState::Candidate(target_peers)
613}
614
615async fn delete_paid_entries(keys_to_delete: &[XorName], paid_list: &Arc<PaidList>) -> usize {
616 if keys_to_delete.is_empty() {
617 return 0;
618 }
619
620 match paid_list.remove_batch(keys_to_delete).await {
621 Ok(count) => {
622 debug!("Pruned {count} out-of-range PaidForList entries");
623 count
624 }
625 Err(e) => {
626 warn!("Failed to prune PaidForList entries: {e}");
627 0
628 }
629 }
630}
631
632async fn revalidated_paid_prune_keys(
641 expired_candidates: &[(XorName, Vec<PeerId>)],
642 confirmed_by_key: &HashMap<XorName, HashSet<PeerId>>,
643 self_id: &PeerId,
644 paid_list: &Arc<PaidList>,
645 p2p_node: &Arc<P2PNode>,
646 config: &ReplicationConfig,
647) -> (Vec<XorName>, usize) {
648 let dht = p2p_node.dht_manager();
649 let mut keys_to_delete = Vec::new();
650 let mut cleared = 0;
651 let now = Instant::now();
652
653 for (key, _) in expired_candidates {
654 let closest: Vec<DHTNode> = dht
655 .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
656 .await;
657
658 if closest.iter().any(|n| n.peer_id == *self_id) {
659 if paid_list.paid_out_of_range_since(key).is_some() {
660 paid_list.clear_paid_out_of_range(key);
661 cleared += 1;
662 }
663 continue;
664 }
665
666 let Some(first_seen) = paid_list.paid_out_of_range_since(key) else {
667 continue;
668 };
669 let elapsed = now
670 .checked_duration_since(first_seen)
671 .unwrap_or(Duration::ZERO);
672 if elapsed < config.prune_hysteresis_duration {
673 continue;
674 }
675
676 let current_target_peers = remote_close_group_peers(&closest, self_id);
677 if current_target_peers.is_empty() {
678 warn!(
679 "Cannot prune paid entry {}: current paid close group has no remote peers",
680 hex::encode(key)
681 );
682 continue;
683 }
684
685 let confirmations_needed = paid_prune_confirmations_needed(current_target_peers.len());
686 if target_peers_reported_present(
687 key,
688 ¤t_target_peers,
689 confirmed_by_key,
690 confirmations_needed,
691 ) {
692 keys_to_delete.push(*key);
693 } else {
694 debug!(
695 "Deferring paid-entry prune for {} until enough of the current paid \
696 close group confirm it",
697 hex::encode(key)
698 );
699 }
700 }
701
702 (keys_to_delete, cleared)
703}
704
705fn remote_close_group_peers(close_group: &[DHTNode], self_id: &PeerId) -> Vec<PeerId> {
706 close_group
707 .iter()
708 .filter(|node| node.peer_id != *self_id)
709 .map(|node| node.peer_id)
710 .collect()
711}
712
/// Number of confirmations required before a paid entry may be pruned:
/// three quarters of `group_size`, rounded up.
fn paid_prune_confirmations_needed(group_size: usize) -> usize {
    const NUMERATOR: usize = 3;
    const DENOMINATOR: usize = 4;

    let scaled = NUMERATOR * group_size;
    scaled.div_ceil(DENOMINATOR)
}
724
725fn reserve_paid_prune_peer_budget(
726 target_peers: &[PeerId],
727 selected_verification_peers: &mut HashSet<PeerId>,
728) -> bool {
729 let new_peer_count = target_peers
730 .iter()
731 .filter(|peer| !selected_verification_peers.contains(peer))
732 .count();
733 if selected_verification_peers
734 .len()
735 .saturating_add(new_peer_count)
736 > MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS
737 {
738 return false;
739 }
740
741 selected_verification_peers.extend(target_peers.iter().copied());
742 true
743}
744
/// Runs one quorum verification round for the selected paid-prune candidates
/// and returns, per key, the target peers that confirmed the paid entry.
///
/// Returns an empty map without contacting anyone when there are no
/// candidates. Candidates with no target peers are skipped with a warning.
async fn collect_paid_prune_confirmations(
    expired_candidates: &[(XorName, Vec<PeerId>)],
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
) -> HashMap<XorName, HashSet<PeerId>> {
    if expired_candidates.is_empty() {
        return HashMap::new();
    }

    let mut targets = VerificationTargets::default();
    let mut keys = Vec::new();
    for (key, target_peers) in expired_candidates {
        if target_peers.is_empty() {
            warn!(
                "Cannot prune paid entry {}: current paid close group has no remote peers",
                hex::encode(key)
            );
            continue;
        }
        keys.push(*key);
        // Register every (peer, key) pair in both the general and the
        // paid-list lookup tables of the verification targets.
        for peer in target_peers {
            targets.all_peers.insert(*peer);
            targets.peer_to_keys.entry(*peer).or_default().push(*key);
            targets
                .peer_to_paid_keys
                .entry(*peer)
                .or_default()
                .insert(*key);
        }
        targets.paid_targets.insert(*key, target_peers.clone());
        targets.paid_group_sizes.insert(*key, target_peers.len());
    }
    // Each peer's key list is sorted and stripped of duplicates before the
    // round is started.
    for keys_list in targets.peer_to_keys.values_mut() {
        keys_list.sort_unstable();
        keys_list.dedup();
    }

    let evidence = quorum::run_verification_round(&keys, &targets, p2p_node, config).await;
    paid_confirmations_by_key(expired_candidates, &evidence)
}
791
792fn paid_confirmations_by_key(
797 expired_candidates: &[(XorName, Vec<PeerId>)],
798 evidence: &HashMap<XorName, KeyVerificationEvidence>,
799) -> HashMap<XorName, HashSet<PeerId>> {
800 let mut confirmed_by_key: HashMap<XorName, HashSet<PeerId>> = HashMap::new();
801 for (key, target_peers) in expired_candidates {
802 let Some(key_evidence) = evidence.get(key) else {
803 continue;
804 };
805 let confirmed: HashSet<PeerId> = key_evidence
806 .paid_list
807 .iter()
808 .filter(|&(peer, status)| {
809 *status == PaidListEvidence::Confirmed && target_peers.contains(peer)
810 })
811 .map(|(peer, _)| *peer)
812 .collect();
813 if !confirmed.is_empty() {
814 confirmed_by_key.insert(*key, confirmed);
815 }
816 }
817 confirmed_by_key
818}
819
820async fn peers_with_mature_repair_proofs(
825 key: &XorName,
826 target_peers: &[PeerId],
827 current_close_peers: &HashSet<PeerId>,
828 repair_proofs: &Arc<RwLock<RepairProofs>>,
829 current_sync_epoch: u64,
830 now: Instant,
831) -> Vec<PeerId> {
832 let mut proofs = repair_proofs.write().await;
833 target_peers
834 .iter()
835 .filter(|peer| {
836 proofs.has_mature_replica_hint(peer, key, current_close_peers, current_sync_epoch, now)
837 })
838 .copied()
839 .collect()
840}
841
842async fn prune_scan_start(
843 sync_state: &Arc<RwLock<NeighborSyncState>>,
844 stored_key_count: usize,
845) -> usize {
846 if stored_key_count == 0 {
847 return 0;
848 }
849 sync_state.read().await.prune_cursor % stored_key_count
850}
851
852async fn advance_prune_cursor(
853 sync_state: &Arc<RwLock<NeighborSyncState>>,
854 stored_key_count: usize,
855 scan_start: usize,
856 last_selected_offset: Option<usize>,
857) {
858 if stored_key_count == 0 {
859 sync_state.write().await.prune_cursor = 0;
860 return;
861 }
862
863 let advance_by = last_selected_offset.map_or(1, |offset| offset.saturating_add(1));
864 sync_state.write().await.prune_cursor = (scan_start + advance_by) % stored_key_count;
865}
866
867async fn delete_stored_records(
868 keys_to_delete: &[XorName],
869 storage: &Arc<LmdbStorage>,
870 paid_list: &Arc<PaidList>,
871 repair_proofs: &Arc<RwLock<RepairProofs>>,
872) -> usize {
873 let mut pruned = 0;
874
875 for key in keys_to_delete {
876 if let Err(e) = storage.delete(key).await {
877 warn!("Failed to prune record {}: {e}", hex::encode(key));
878 } else {
879 pruned += 1;
880 paid_list.clear_record_out_of_range(key);
881 repair_proofs.write().await.remove_key(key);
882 paid_list.set_paid_out_of_range(key);
886 debug!("Pruned out-of-range record {}", hex::encode(key));
887 }
888 }
889
890 pruned
891}
892
893async fn collect_record_prune_proofs(
904 candidates: &[RecordPruneCandidate],
905 storage: &Arc<LmdbStorage>,
906 p2p_node: &Arc<P2PNode>,
907 config: &ReplicationConfig,
908 sync_state: &Arc<RwLock<NeighborSyncState>>,
909) -> HashMap<XorName, HashSet<PeerId>> {
910 if candidates.is_empty() {
911 return HashMap::new();
912 }
913
914 let report_state = PruneAuditReportState::default();
915 let mut requests = stream::iter(build_peer_audit_challenges(candidates))
916 .map(|(peer, key)| {
917 peer_proves_record(
918 peer,
919 key,
920 storage,
921 p2p_node,
922 config,
923 sync_state,
924 &report_state,
925 )
926 })
927 .buffer_unordered(MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES);
928
929 let mut present_by_key = HashMap::<XorName, HashSet<PeerId>>::new();
930 while let Some(proof) = requests.next().await {
931 if let Some((peer, key)) = proof {
932 present_by_key.entry(key).or_default().insert(peer);
933 }
934 }
935
936 present_by_key
937}
938
939async fn revalidated_record_prune_keys(
940 candidates: &[RecordPruneCandidate],
941 present_by_key: &HashMap<XorName, HashSet<PeerId>>,
942 self_id: &PeerId,
943 paid_list: &Arc<PaidList>,
944 p2p_node: &Arc<P2PNode>,
945 config: &ReplicationConfig,
946) -> (Vec<XorName>, usize) {
947 let mut keys_to_delete = Vec::new();
948 let mut cleared = 0;
949 let now = Instant::now();
950
951 for candidate in candidates {
952 let (storage_admission_group, strict_close_group) =
953 record_prune_lookup_groups(&candidate.key, p2p_node, config).await;
954
955 if storage_admission_group
956 .iter()
957 .any(|n| n.peer_id == *self_id)
958 {
959 if paid_list
960 .record_out_of_range_since(&candidate.key)
961 .is_some()
962 {
963 paid_list.clear_record_out_of_range(&candidate.key);
964 cleared += 1;
965 }
966 continue;
967 }
968
969 let Some(first_seen) = paid_list.record_out_of_range_since(&candidate.key) else {
970 continue;
971 };
972 let elapsed = now
973 .checked_duration_since(first_seen)
974 .unwrap_or(Duration::ZERO);
975 if elapsed < config.prune_hysteresis_duration {
976 continue;
977 }
978
979 let current_target_peers = remote_close_group_peers(&strict_close_group, self_id);
980 if current_target_peers.is_empty() {
981 warn!(
982 "Cannot prune {}: current close group has no remote peers",
983 hex::encode(candidate.key)
984 );
985 continue;
986 }
987
988 let proofs_needed = prune_proofs_needed(current_target_peers.len());
989 if target_peers_reported_present(
990 &candidate.key,
991 ¤t_target_peers,
992 present_by_key,
993 proofs_needed,
994 ) {
995 keys_to_delete.push(candidate.key);
996 } else {
997 debug!(
998 "Deferring prune for {} until all but one of the current close group \
999 report it",
1000 hex::encode(candidate.key)
1001 );
1002 }
1003 }
1004
1005 (keys_to_delete, cleared)
1006}
1007
1008fn build_peer_audit_challenges(candidates: &[RecordPruneCandidate]) -> Vec<(PeerId, XorName)> {
1009 let mut challenges = Vec::new();
1010 for candidate in candidates {
1011 for peer in &candidate.target_peers {
1012 challenges.push((*peer, candidate.key));
1013 }
1014 }
1015 challenges
1016}
1017
/// Test helper: returns, in candidate order, the keys whose own target peers
/// supply at least `proofs_needed` presence reports in `present_by_key`.
#[cfg(test)]
fn confirmed_keys_from_presence(
    candidates: &[RecordPruneCandidate],
    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
    proofs_needed: usize,
) -> Vec<XorName> {
    let mut confirmed = Vec::new();
    for candidate in candidates {
        let enough = target_peers_reported_present(
            &candidate.key,
            &candidate.target_peers,
            present_by_key,
            proofs_needed,
        );
        if enough {
            confirmed.push(candidate.key);
        }
    }
    confirmed
}
1037
/// Number of presence proofs required before a record may be pruned: every
/// peer for groups of up to two, otherwise all but one.
fn prune_proofs_needed(group_size: usize) -> usize {
    match group_size {
        0..=2 => group_size,
        larger => larger - 1,
    }
}
1055
1056fn target_peers_reported_present(
1061 key: &XorName,
1062 target_peers: &[PeerId],
1063 present_by_key: &HashMap<XorName, HashSet<PeerId>>,
1064 proofs_needed: usize,
1065) -> bool {
1066 if proofs_needed == 0 {
1067 return false;
1068 }
1069 let Some(present_peers) = present_by_key.get(key) else {
1070 return false;
1071 };
1072 let proven = present_peers
1075 .iter()
1076 .filter(|peer| target_peers.contains(peer))
1077 .count();
1078 proven >= proofs_needed
1079}
1080
/// Challenges `peer` to prove it holds the record for `key` and returns
/// `Some((peer, key))` only when the proof checks out against the local
/// bytes.
///
/// Returns `None` without contacting the peer when the local record cannot
/// be read or the challenge cannot be encoded. A missing or undecodable
/// response and a `Failed` status are routed to the audit-failure report; a
/// `Bootstrapping` status is routed to the bootstrap-claim report.
async fn peer_proves_record(
    peer: PeerId,
    key: XorName,
    storage: &Arc<LmdbStorage>,
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
    report_state: &PruneAuditReportState,
) -> Option<(PeerId, XorName)> {
    // The local copy is needed to compute the expected digest.
    let local_bytes = local_record_bytes(&key, storage).await?;

    // The RNG is confined to this block so it does not live across the
    // awaits below.
    let (challenge_id, nonce) = {
        let mut rng = rand::thread_rng();
        (rng.gen::<u64>(), rng.gen::<[u8; 32]>())
    };
    let encoded = encode_prune_audit_challenge(&peer, key, challenge_id, nonce)?;
    let Some(decoded) = send_prune_audit_challenge(&peer, &key, encoded, p2p_node, config).await
    else {
        report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
        return None;
    };

    let status =
        prune_audit_response_status(decoded, challenge_id, &peer, &key, &nonce, &local_bytes);
    // Any definitive answer (proof or failure) ends a pending bootstrap
    // claim for this peer.
    if prune_audit_response_clears_bootstrap_claim(status) {
        clear_prune_bootstrap_claim(&peer, sync_state).await;
    }

    match status {
        PruneAuditStatus::Proven => Some((peer, key)),
        PruneAuditStatus::Bootstrapping => {
            report_prune_bootstrap_claim(&peer, &key, p2p_node, config, sync_state, report_state)
                .await;
            None
        }
        PruneAuditStatus::Failed => {
            report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
            None
        }
    }
}
1127
1128fn prune_audit_response_clears_bootstrap_claim(status: PruneAuditStatus) -> bool {
1129 matches!(status, PruneAuditStatus::Proven | PruneAuditStatus::Failed)
1130}
1131
1132fn encode_prune_audit_challenge(
1133 peer: &PeerId,
1134 key: XorName,
1135 challenge_id: u64,
1136 nonce: [u8; 32],
1137) -> Option<Vec<u8>> {
1138 let challenge = AuditChallenge {
1139 challenge_id,
1140 nonce,
1141 challenged_peer_id: *peer.as_bytes(),
1142 keys: vec![key],
1143 };
1144 let msg = ReplicationMessage {
1145 request_id: challenge_id,
1146 body: ReplicationMessageBody::AuditChallenge(challenge),
1147 };
1148 let encoded = match msg.encode() {
1149 Ok(data) => data,
1150 Err(e) => {
1151 warn!(
1152 "Failed to encode prune audit challenge for {} against {peer}: {e}",
1153 hex::encode(key),
1154 );
1155 return None;
1156 }
1157 };
1158 Some(encoded)
1159}
1160
1161async fn send_prune_audit_challenge(
1162 peer: &PeerId,
1163 key: &XorName,
1164 encoded: Vec<u8>,
1165 p2p_node: &Arc<P2PNode>,
1166 config: &ReplicationConfig,
1167) -> Option<ReplicationMessage> {
1168 let response = match p2p_node
1169 .send_request(
1170 peer,
1171 REPLICATION_PROTOCOL_ID,
1172 encoded,
1173 config.audit_response_timeout(1),
1174 )
1175 .await
1176 {
1177 Ok(response) => response,
1178 Err(e) => {
1179 debug!(
1180 "Prune audit challenge for {} against {peer} failed: {e}",
1181 hex::encode(key)
1182 );
1183 return None;
1184 }
1185 };
1186
1187 let decoded = match ReplicationMessage::decode(&response.data) {
1188 Ok(msg) => msg,
1189 Err(e) => {
1190 warn!("Failed to decode prune audit response from {peer}: {e}");
1191 return None;
1192 }
1193 };
1194
1195 Some(decoded)
1196}
1197
/// Classifies a decoded reply to a single-key prune audit challenge.
///
/// `Proven` requires a `Digests` response with the matching challenge id,
/// exactly one digest, and that digest equal to the one computed from
/// `local_bytes`. `Bootstrapping` requires a `Bootstrapping` response with
/// the matching challenge id. Everything else is `Failed`; each failure path
/// logs a warning naming the reason.
fn prune_audit_response_status(
    decoded: ReplicationMessage,
    challenge_id: u64,
    peer: &PeerId,
    key: &XorName,
    nonce: &[u8; 32],
    local_bytes: &[u8],
) -> PruneAuditStatus {
    match decoded.body {
        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
            challenge_id: resp_id,
            digests,
        }) => {
            if resp_id != challenge_id {
                warn!("Prune audit challenge ID mismatch from {peer}");
                return PruneAuditStatus::Failed;
            }
            // Exactly one key was challenged, so exactly one digest is
            // acceptable.
            let [digest] = digests.as_slice() else {
                warn!(
                    "Prune audit response from {peer} returned {} digests for one challenged key",
                    digests.len(),
                );
                return PruneAuditStatus::Failed;
            };
            // The sentinel digest means the peer says it does not hold the key.
            if *digest == ABSENT_KEY_DIGEST {
                warn!(
                    "Prune audit proof from {peer} failed for {}: peer reports key absent",
                    hex::encode(key)
                );
                return PruneAuditStatus::Failed;
            }
            if audit_digest_proves_key(peer, key, nonce, local_bytes, digest) {
                PruneAuditStatus::Proven
            } else {
                warn!(
                    "Prune audit proof from {peer} failed for {}: digest mismatch",
                    hex::encode(key)
                );
                PruneAuditStatus::Failed
            }
        }
        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
            challenge_id: resp_id,
        }) => {
            // A bootstrap claim is only honoured for the challenge it answers.
            if resp_id == challenge_id {
                warn!(
                    "Prune audit proof for {} blocked by bootstrap claim from {peer}",
                    hex::encode(key)
                );
                PruneAuditStatus::Bootstrapping
            } else {
                warn!("Prune audit challenge ID mismatch on Bootstrapping from {peer}");
                PruneAuditStatus::Failed
            }
        }
        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
            challenge_id: resp_id,
            reason,
        }) => {
            // Both branches fail; the id check only selects the log message.
            if resp_id == challenge_id {
                warn!(
                    "Prune audit proof for {} rejected by {peer}: {reason}",
                    hex::encode(key)
                );
            } else {
                warn!("Prune audit challenge ID mismatch on Rejected from {peer}");
            }
            PruneAuditStatus::Failed
        }
        _ => {
            warn!("Unexpected prune audit response type from {peer}");
            PruneAuditStatus::Failed
        }
    }
}
1273
1274async fn local_record_bytes(key: &XorName, storage: &Arc<LmdbStorage>) -> Option<Vec<u8>> {
1275 match storage.get_raw(key).await {
1276 Ok(Some(bytes)) => Some(bytes),
1277 Ok(None) => {
1278 debug!(
1279 "Cannot prune-audit {}: local record disappeared",
1280 hex::encode(key)
1281 );
1282 None
1283 }
1284 Err(e) => {
1285 warn!(
1286 "Cannot prune-audit {}: failed to read local record: {e}",
1287 hex::encode(key)
1288 );
1289 None
1290 }
1291 }
1292}
1293
1294fn audit_digest_proves_key(
1295 peer: &PeerId,
1296 key: &XorName,
1297 nonce: &[u8; 32],
1298 local_bytes: &[u8],
1299 digest: &[u8; 32],
1300) -> bool {
1301 if *digest == ABSENT_KEY_DIGEST {
1302 return false;
1303 }
1304 let expected = compute_audit_digest(nonce, peer.as_bytes(), key, local_bytes);
1305 *digest == expected
1306}
1307
1308async fn report_prune_audit_failure_once(
1309 peer: &PeerId,
1310 key: &XorName,
1311 p2p_node: &Arc<P2PNode>,
1312 config: &ReplicationConfig,
1313 report_state: &PruneAuditReportState,
1314) -> bool {
1315 let should_report = peer_is_currently_responsible(peer, key, p2p_node, config).await
1316 && reserve_prune_audit_failure_report(report_state, peer).await;
1317 if !should_report {
1318 return false;
1319 }
1320
1321 p2p_node
1322 .report_trust_event(
1323 peer,
1324 saorsa_core::TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT),
1325 )
1326 .await;
1327 true
1328}
1329
1330async fn reserve_prune_audit_failure_report(
1331 report_state: &PruneAuditReportState,
1332 peer: &PeerId,
1333) -> bool {
1334 report_state.audit_failures.write().await.insert(*peer)
1335}
1336
1337async fn reserve_prune_bootstrap_abuse_report(
1338 report_state: &PruneAuditReportState,
1339 peer: &PeerId,
1340) -> bool {
1341 report_state.bootstrap_abuse.write().await.insert(*peer)
1342}
1343
1344async fn report_prune_bootstrap_claim(
1345 peer: &PeerId,
1346 key: &XorName,
1347 p2p_node: &Arc<P2PNode>,
1348 config: &ReplicationConfig,
1349 sync_state: &Arc<RwLock<NeighborSyncState>>,
1350 report_state: &PruneAuditReportState,
1351) {
1352 if !peer_is_currently_responsible(peer, key, p2p_node, config).await {
1353 return;
1354 }
1355
1356 let observation = {
1357 let now = Instant::now();
1358 let mut state = sync_state.write().await;
1359 (
1360 now,
1361 state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period),
1362 )
1363 };
1364
1365 let (now, observation) = observation;
1366 match observation {
1367 BootstrapClaimObservation::WithinGrace { .. } => {
1368 debug!("Prune audit: peer {peer} claims bootstrapping (within grace period)");
1369 return;
1370 }
1371 BootstrapClaimObservation::PastGrace { first_seen } => {
1372 if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
1373 debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
1374 return;
1375 }
1376 warn!(
1377 "Prune audit: peer {peer} claiming bootstrap past grace period \
1378 ({:?} > {:?}), reporting abuse",
1379 now.duration_since(first_seen),
1380 config.bootstrap_claim_grace_period,
1381 );
1382 }
1383 BootstrapClaimObservation::Repeated { first_seen } => {
1384 if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
1385 debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
1386 return;
1387 }
1388 warn!(
1389 "Prune audit: peer {peer} repeated bootstrap claim after previously stopping; \
1390 first claim was {:?} ago, reporting abuse",
1391 now.duration_since(first_seen),
1392 );
1393 }
1394 }
1395
1396 p2p_node
1397 .report_trust_event(
1398 peer,
1399 saorsa_core::TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1400 )
1401 .await;
1402}
1403
1404async fn clear_prune_bootstrap_claim(peer: &PeerId, sync_state: &Arc<RwLock<NeighborSyncState>>) {
1405 let removed = {
1406 let mut state = sync_state.write().await;
1407 state.clear_active_bootstrap_claim(peer)
1408 };
1409 if removed {
1410 debug!("Prune audit: cleared active bootstrap claim for {peer}");
1411 }
1412}
1413
1414async fn peer_is_currently_responsible(
1415 peer: &PeerId,
1416 key: &XorName,
1417 p2p_node: &Arc<P2PNode>,
1418 config: &ReplicationConfig,
1419) -> bool {
1420 let closest = p2p_node
1421 .dht_manager()
1422 .find_closest_nodes_local_with_self(key, config.close_group_size)
1423 .await;
1424 closest.iter().any(|node| node.peer_id == *peer)
1425}
1426
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Builds a deterministic peer ID whose first byte is `b` and whose
    /// remaining bytes are zero.
    fn peer_id_from_byte(b: u8) -> PeerId {
        let mut raw = [0u8; 32];
        raw[0] = b;
        PeerId::from_bytes(raw)
    }

    /// Builds a key with every byte set to `b`.
    fn key_from_byte(b: u8) -> XorName {
        [b; 32]
    }

    /// Builds `count` distinct peer IDs using the first bytes `1..=count`.
    fn peer_ids(count: usize) -> Vec<PeerId> {
        (1..=count)
            .map(|ordinal| u8::try_from(ordinal).expect("peer byte"))
            .map(peer_id_from_byte)
            .collect()
    }

    /// Shorthand constructor for a record prune candidate.
    fn candidate(key: XorName, target_peers: Vec<PeerId>) -> RecordPruneCandidate {
        RecordPruneCandidate { key, target_peers }
    }

    #[test]
    fn prune_audit_challenges_are_one_per_candidate_peer() {
        let first_peer = peer_id_from_byte(1);
        let second_peer = peer_id_from_byte(2);
        let first_key = key_from_byte(0xA);
        let second_key = key_from_byte(0xB);
        let candidates = vec![
            candidate(first_key, vec![first_peer, second_peer]),
            candidate(second_key, vec![second_peer]),
        ];

        // Every (peer, key) pairing from the candidates is expected exactly once.
        let mut expected = vec![
            (first_peer, first_key),
            (second_peer, first_key),
            (second_peer, second_key),
        ];
        expected.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));

        // Both sides are sorted so the comparison does not depend on output order.
        let mut challenges = build_peer_audit_challenges(&candidates);
        challenges.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));

        assert_eq!(challenges, expected);
    }

    #[test]
    fn confirmed_keys_require_quorum_of_target_peers_present() {
        let peer_a = peer_id_from_byte(1);
        let peer_b = peer_id_from_byte(2);
        let peer_c = peer_id_from_byte(3);
        let key = key_from_byte(0xC);
        let candidates = vec![candidate(key, vec![peer_a, peer_b, peer_c])];
        // Two of the three target peers have reported the key as present.
        let present_by_key = HashMap::from([(key, HashSet::from([peer_a, peer_b]))]);

        // Two present peers satisfy a quorum of two.
        let with_quorum_of_two = confirmed_keys_from_presence(&candidates, &present_by_key, 2);
        assert_eq!(with_quorum_of_two, vec![key]);

        // The same evidence is insufficient for a quorum of three.
        let with_quorum_of_three = confirmed_keys_from_presence(&candidates, &present_by_key, 3);
        assert!(with_quorum_of_three.is_empty());
    }

    #[test]
    fn confirmed_keys_defer_below_quorum_or_missing_peer_evidence() {
        let peer_a = peer_id_from_byte(1);
        let peer_b = peer_id_from_byte(2);
        let quorum_key = key_from_byte(0xD);
        let below_quorum_key = key_from_byte(0xE);
        let missing_key = key_from_byte(0xF);
        let candidates = vec![
            candidate(quorum_key, vec![peer_a, peer_b]),
            candidate(below_quorum_key, vec![peer_a, peer_b]),
            candidate(missing_key, vec![peer_a, peer_b]),
        ];
        // `missing_key` deliberately has no presence entry at all.
        let present_by_key = HashMap::from([
            (quorum_key, HashSet::from([peer_a, peer_b])),
            (below_quorum_key, HashSet::from([peer_a])),
        ]);

        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 2);

        assert_eq!(confirmed, vec![quorum_key]);
    }

    #[test]
    fn prune_proofs_needed_tolerates_exactly_one_lagging_peer() {
        // Zero, one and two targets require a proof from every target.
        assert_eq!(prune_proofs_needed(0), 0);
        assert_eq!(prune_proofs_needed(1), 1);
        assert_eq!(prune_proofs_needed(2), 2);
        // From three targets upward, one proof may be missing.
        assert_eq!(prune_proofs_needed(3), 2);
        assert_eq!(prune_proofs_needed(5), 4);
        assert_eq!(prune_proofs_needed(7), 6);
    }

    #[test]
    fn paid_prune_confirmations_are_three_quarters_rounded_up() {
        assert_eq!(paid_prune_confirmations_needed(0), 0);
        assert_eq!(paid_prune_confirmations_needed(1), 1);
        assert_eq!(paid_prune_confirmations_needed(2), 2);
        assert_eq!(paid_prune_confirmations_needed(4), 3);
        assert_eq!(paid_prune_confirmations_needed(20), 15);
    }

    #[test]
    fn paid_prune_peer_budget_allows_overlapping_targets() {
        let peers = peer_ids(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS);
        let mut selected_peers = HashSet::new();

        // Fill the budget completely.
        assert!(reserve_paid_prune_peer_budget(&peers, &mut selected_peers));
        assert_eq!(
            selected_peers.len(),
            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
        );

        // Targets that are already selected must still be accepted and must
        // not grow the selection.
        assert!(reserve_paid_prune_peer_budget(
            &peers[..2],
            &mut selected_peers,
        ));
        assert_eq!(
            selected_peers.len(),
            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
        );
    }

    #[test]
    fn paid_prune_peer_budget_rejects_new_peers_past_cap() {
        let peers = peer_ids(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS + 1);
        let (within_cap, overflow) = peers.split_at(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS);
        let mut selected_peers = HashSet::new();

        assert!(reserve_paid_prune_peer_budget(
            within_cap,
            &mut selected_peers,
        ));
        // One additional, previously unseen peer no longer fits.
        assert!(!reserve_paid_prune_peer_budget(
            overflow,
            &mut selected_peers,
        ));

        // A rejected reservation leaves the selection untouched.
        assert_eq!(
            selected_peers.len(),
            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
        );
        assert!(!selected_peers.contains(&overflow[0]));
    }

    #[test]
    fn paid_confirmations_count_only_confirmed_target_peers() {
        let confirmed_peer = peer_id_from_byte(1);
        let not_found_peer = peer_id_from_byte(2);
        let unresolved_peer = peer_id_from_byte(3);
        let outsider = peer_id_from_byte(4);
        let key = key_from_byte(0x21);
        let candidates = vec![(key, vec![confirmed_peer, not_found_peer, unresolved_peer])];

        // `outsider` answers Confirmed but is not one of the key's target peers.
        let paid_list = HashMap::from([
            (confirmed_peer, PaidListEvidence::Confirmed),
            (not_found_peer, PaidListEvidence::NotFound),
            (unresolved_peer, PaidListEvidence::Unresolved),
            (outsider, PaidListEvidence::Confirmed),
        ]);
        let evidence = HashMap::from([(
            key,
            KeyVerificationEvidence {
                presence: HashMap::new(),
                paid_list,
            },
        )]);

        let confirmed_by_key = paid_confirmations_by_key(&candidates, &evidence);

        assert_eq!(
            confirmed_by_key.get(&key),
            Some(&HashSet::from([confirmed_peer])),
            "only Confirmed answers from target peers may count",
        );
    }

    #[test]
    fn paid_confirmations_skip_keys_without_evidence() {
        let peer = peer_id_from_byte(1);
        let key = key_from_byte(0x22);
        let candidates = vec![(key, vec![peer])];

        let confirmed_by_key = paid_confirmations_by_key(&candidates, &HashMap::new());

        assert!(confirmed_by_key.is_empty());
    }

    #[test]
    fn zero_quorum_never_confirms() {
        let peer_a = peer_id_from_byte(1);
        let key = key_from_byte(0x10);
        let present_by_key = HashMap::from([(key, HashSet::from([peer_a]))]);

        // Even with a present target peer, a required count of zero must not confirm.
        assert!(!target_peers_reported_present(
            &key,
            &[peer_a],
            &present_by_key,
            0
        ));
    }

    #[test]
    fn proofs_from_non_target_peers_do_not_count_toward_quorum() {
        let target = peer_id_from_byte(1);
        let outsider = peer_id_from_byte(2);
        let key = key_from_byte(0x11);
        // Only the non-target peer reported the key as present.
        let present_by_key = HashMap::from([(key, HashSet::from([outsider]))]);

        assert!(!target_peers_reported_present(
            &key,
            &[target],
            &present_by_key,
            1
        ));
    }

    #[test]
    fn duplicated_target_peer_counts_once_toward_quorum() {
        let peer = peer_id_from_byte(1);
        let key = key_from_byte(0x12);
        let present_by_key = HashMap::from([(key, HashSet::from([peer]))]);

        // Listing the same peer twice must not satisfy a quorum of two.
        assert!(!target_peers_reported_present(
            &key,
            &[peer, peer],
            &present_by_key,
            2
        ));
    }

    #[test]
    fn audit_digest_proof_requires_matching_peer_key_nonce_and_bytes() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let key = key_from_byte(0x11);
        let other_key = key_from_byte(0x12);
        let nonce = [0xAA; 32];
        let other_nonce = [0xBB; 32];
        let bytes = b"record bytes";
        let digest = compute_audit_digest(&nonce, peer.as_bytes(), &key, bytes);

        // The digest computed from the exact same inputs is accepted.
        assert!(audit_digest_proves_key(&peer, &key, &nonce, bytes, &digest));

        // Changing any single input invalidates the proof: peer ...
        assert!(!audit_digest_proves_key(
            &other_peer,
            &key,
            &nonce,
            bytes,
            &digest
        ));
        // ... key ...
        assert!(!audit_digest_proves_key(
            &peer, &other_key, &nonce, bytes, &digest
        ));
        // ... nonce ...
        assert!(!audit_digest_proves_key(
            &peer,
            &key,
            &other_nonce,
            bytes,
            &digest
        ));
        // ... or record bytes.
        assert!(!audit_digest_proves_key(
            &peer,
            &key,
            &nonce,
            b"different bytes",
            &digest
        ));

        // The absent-key marker is never a valid proof.
        assert!(!audit_digest_proves_key(
            &peer,
            &key,
            &nonce,
            bytes,
            &ABSENT_KEY_DIGEST
        ));
    }

    #[tokio::test]
    async fn prune_cursor_advances_past_selected_budget_window() {
        let mut initial = NeighborSyncState::new_cycle(vec![]);
        initial.prune_cursor = 2;
        let state = Arc::new(RwLock::new(initial));

        let start = prune_scan_start(&state, 10).await;
        advance_prune_cursor(&state, 10, start, Some(3)).await;

        // Starting at 2 with a last selected offset of 3, the cursor is expected at 6.
        assert_eq!(state.read().await.prune_cursor, 6);
    }

    #[tokio::test]
    async fn prune_cursor_advances_even_when_no_candidate_selected() {
        let mut initial = NeighborSyncState::new_cycle(vec![]);
        initial.prune_cursor = 9;
        let state = Arc::new(RwLock::new(initial));

        let start = prune_scan_start(&state, 10).await;
        advance_prune_cursor(&state, 10, start, None).await;

        // Starting at 9 of 10 with nothing selected, the cursor is expected back at 0.
        assert_eq!(state.read().await.prune_cursor, 0);
    }

    #[tokio::test]
    async fn prune_audit_normal_response_clears_stale_bootstrap_claim() {
        let peer = peer_id_from_byte(1);
        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![peer])));
        let first_seen = Instant::now();
        {
            // Seed both the active claim and the claim history for the peer.
            let mut seeded = state.write().await;
            seeded.bootstrap_claims.insert(peer, first_seen);
            seeded.bootstrap_claim_history.insert(peer, first_seen);
        }

        clear_prune_bootstrap_claim(&peer, &state).await;

        // Only the active claim is removed; the history entry stays.
        let snapshot = state.read().await;
        assert!(!snapshot.bootstrap_claims.contains_key(&peer));
        assert!(snapshot.bootstrap_claim_history.contains_key(&peer));
    }

    #[test]
    fn prune_audit_clear_policy_requires_decoded_non_bootstrap_response() {
        // Proven and Failed outcomes clear the claim.
        assert!(prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Proven
        ));
        assert!(prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Failed
        ));
        // A Bootstrapping outcome must not clear it.
        assert!(!prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Bootstrapping
        ));
    }

    #[tokio::test]
    async fn prune_audit_failure_penalty_is_reserved_once_per_peer_per_pass() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let report_state = PruneAuditReportState::default();

        // First reservation succeeds, a repeat for the same peer does not,
        // and a different peer is tracked independently.
        assert!(reserve_prune_audit_failure_report(&report_state, &peer).await);
        assert!(!reserve_prune_audit_failure_report(&report_state, &peer).await);
        assert!(reserve_prune_audit_failure_report(&report_state, &other_peer).await);

        let recorded = report_state.audit_failures.read().await;
        assert_eq!(recorded.len(), 2);
        assert!(recorded.contains(&peer));
        assert!(recorded.contains(&other_peer));
    }

    #[tokio::test]
    async fn prune_bootstrap_abuse_penalty_is_reserved_once_per_peer_per_pass() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let report_state = PruneAuditReportState::default();

        // First reservation succeeds, a repeat for the same peer does not,
        // and a different peer is tracked independently.
        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
        assert!(!reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &other_peer).await);

        let recorded = report_state.bootstrap_abuse.read().await;
        assert_eq!(recorded.len(), 2);
        assert!(recorded.contains(&peer));
        assert!(recorded.contains(&other_peer));
    }
}