Skip to main content

ant_node/replication/
pruning.rs

1//! Post-cycle responsibility pruning (Section 11).
2//!
3//! On `NeighborSyncCycleComplete`: prune stored records and `PaidForList`
4//! entries that have been continuously out of range for at least
5//! `PRUNE_HYSTERESIS_DURATION`.
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use crate::logging::{debug, info, warn};
12
13use futures::{stream, StreamExt};
14use rand::Rng;
15use saorsa_core::identity::PeerId;
16use saorsa_core::{DHTNode, P2PNode};
17use tokio::sync::RwLock;
18
19use crate::ant_protocol::XorName;
20use crate::replication::config::{
21    storage_admission_width, ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT,
22    MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS, REPLICATION_PROTOCOL_ID,
23};
24use crate::replication::paid_list::PaidList;
25use crate::replication::protocol::{
26    compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
27    ReplicationMessageBody, ABSENT_KEY_DIGEST,
28};
29use crate::replication::quorum::{self, VerificationTargets};
30use crate::replication::types::{
31    BootstrapClaimObservation, KeyVerificationEvidence, NeighborSyncState, PaidListEvidence,
32    RepairProofs,
33};
34use crate::storage::LmdbStorage;
35
36use super::REPLICATION_TRUST_WEIGHT;
37
/// Maximum record prune-confirmation audit challenges kept in flight at
/// once; used as the `buffer_unordered` width when collecting record prune
/// proofs, and reused below as the paid-list verification peer cap.
const MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES: usize = 32;

/// Maximum expired `PaidForList` entries selected for verification per prune
/// pass. The unique peer fan-out for those entries is capped separately.
const MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS: usize = 32;
/// Maximum unique peers contacted for paid-list verification per prune pass.
/// `quorum::run_verification_round` sends one request per target peer.
const MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS: usize = MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES;
46
47// ---------------------------------------------------------------------------
48// Result type
49// ---------------------------------------------------------------------------
50
/// Summary of a prune pass.
///
/// The record counters are filled from the stored-record phase and the
/// `PaidForList` counters from the paid-entry phase of
/// [`run_prune_pass_with_context`].
#[derive(Debug, Default)]
pub struct PruneResult {
    /// Number of records deleted from storage.
    pub records_pruned: usize,
    /// Number of records with out-of-range timestamp newly set.
    pub records_marked_out_of_range: usize,
    /// Number of records with out-of-range timestamp cleared (back in range),
    /// including records found back in range during pre-delete revalidation.
    pub records_cleared: usize,
    /// Number of `PaidForList` entries removed.
    pub paid_entries_pruned: usize,
    /// Number of `PaidForList` entries with out-of-range timestamp newly set.
    pub paid_entries_marked: usize,
    /// Number of `PaidForList` entries cleared (back in range), including
    /// entries found back in range during pre-delete revalidation.
    pub paid_entries_cleared: usize,
}
67
/// Shared dependencies and switches for one prune pass.
///
/// Every field is a borrow or a plain value, so a context is cheap to build
/// for each pass and is consumed by [`run_prune_pass_with_context`].
pub struct PrunePassContext<'a> {
    /// Local peer id.
    pub self_id: &'a PeerId,
    /// Local record storage.
    pub storage: &'a Arc<LmdbStorage>,
    /// Persistent paid-list state.
    pub paid_list: &'a Arc<PaidList>,
    /// P2P node used for routing lookups and prune-confirmation audits.
    pub p2p_node: &'a Arc<P2PNode>,
    /// Replication configuration.
    pub config: &'a ReplicationConfig,
    /// Neighbor-sync state, including prune cursor and bootstrap claims.
    pub sync_state: &'a Arc<RwLock<NeighborSyncState>>,
    /// Key-specific repair proofs used to gate prune-confirmation audits.
    pub repair_proofs: &'a Arc<RwLock<RepairProofs>>,
    /// Current local neighbor-sync cycle epoch for repair-proof maturity.
    /// The compatibility wrapper [`run_prune_pass`] passes `0`.
    pub current_sync_epoch: u64,
    /// Test-only clock override for repair-proof maturity checks. When
    /// `None`, the pass's own `Instant::now()` reading is used instead.
    #[cfg(any(test, feature = "test-utils"))]
    pub repair_proof_now: Option<Instant>,
    /// Whether remote prune-confirmation audits are allowed this pass. When
    /// `false`, expired records and paid entries are counted as deferred
    /// instead of being selected for remote confirmation.
    pub allow_remote_prune_audits: bool,
}
92
/// Outcome classification for a single prune-confirmation audit.
///
/// NOTE(review): the code that produces and consumes these variants lies
/// outside this section of the file; the per-variant notes are inferred from
/// the names and should be confirmed against the audit code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PruneAuditStatus {
    /// Presumably: the audited peer proved it stores the record.
    Proven,
    /// Presumably: the audit did not yield a valid proof.
    Failed,
    /// Presumably: the audited peer claimed to still be bootstrapping.
    Bootstrapping,
}
99
/// Counters accumulated by the stored-record phase of a prune pass; copied
/// into the `records_*` fields of [`PruneResult`].
#[derive(Debug, Default)]
struct RecordPruneStats {
    /// Records whose out-of-range timestamp was newly set this pass.
    marked: usize,
    /// Records whose out-of-range timestamp was cleared this pass.
    cleared: usize,
    /// Records deleted from storage this pass.
    pruned: usize,
}
106
/// Counters accumulated by the `PaidForList` phase of a prune pass; copied
/// into the `paid_entries_*` fields of [`PruneResult`].
#[derive(Debug, Default)]
struct PaidPruneStats {
    /// Entries whose out-of-range timestamp was newly set this pass.
    marked: usize,
    /// Entries whose out-of-range timestamp was cleared this pass.
    cleared: usize,
    /// Entries removed from the `PaidForList` this pass.
    pruned: usize,
}
113
/// Per-pass tally of expired `PaidForList` entries that were not selected for
/// verification, bucketed by the reason they were deferred.
#[derive(Debug, Default)]
struct PaidPruneDeferredCounts {
    /// Deferred because `MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS` candidates
    /// had already been selected.
    entry_budget: usize,
    /// Deferred because remote prune audits were not allowed this pass.
    remote_gate: usize,
    /// Deferred because their target peers would push the unique-peer count
    /// past `MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS`.
    peer_budget: usize,
}
120
121impl PaidPruneDeferredCounts {
122    fn log(&self) {
123        if self.entry_budget > 0 {
124            debug!(
125                "Deferred {} expired PaidForList entries beyond the per-pass verification cap \
126                 ({MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS})",
127                self.entry_budget,
128            );
129        }
130
131        if self.remote_gate > 0 {
132            debug!(
133                "Deferred {} expired PaidForList entries until bootstrap drain allows remote \
134                 paid-prune verification",
135                self.remote_gate,
136            );
137        }
138
139        if self.peer_budget > 0 {
140            debug!(
141                "Deferred {} expired PaidForList entries beyond the per-pass paid-prune peer cap \
142                 ({MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS})",
143                self.peer_budget,
144            );
145        }
146    }
147}
148
/// A stored record selected for remote prune-confirmation audits this pass.
#[derive(Debug, Clone)]
struct RecordPruneCandidate {
    /// Key of the out-of-range record.
    key: XorName,
    /// Remote close-group peers with a mature repair proof for `key`; these
    /// are the peers that will be audited.
    target_peers: Vec<PeerId>,
}
154
/// Result of evaluating one stored key during the record scan.
struct RecordPruneKeyOutcome {
    /// `true` when the key had no out-of-range timestamp before this
    /// evaluation set one.
    marked: bool,
    /// What the scan should do with the key.
    state: RecordPruneKeyState,
}
159
160impl Default for RecordPruneKeyOutcome {
161    fn default() -> Self {
162        Self {
163            marked: false,
164            state: RecordPruneKeyState::None,
165        }
166    }
167}
168
/// Action decided for one stored key by `evaluate_record_prune_key`.
enum RecordPruneKeyState {
    /// Nothing to do this pass (e.g. hysteresis not yet elapsed, no remote
    /// close-group peers, or too few peers with mature repair proofs).
    None,
    /// Self is back in the storage-admission group and the out-of-range
    /// timestamp was cleared.
    Cleared,
    /// Hysteresis elapsed, but remote prune audits are not allowed this pass.
    BootstrapDeferred,
    /// Auditing this key would exceed the remaining per-pass challenge budget.
    BudgetDeferred,
    /// Selected for remote prune-confirmation audits.
    Candidate(RecordPruneCandidate),
}
176
/// Action decided for one expired `PaidForList` entry by
/// `select_paid_prune_candidate`.
enum PaidPruneKeyState {
    /// Nothing to do: the current paid close group has no remote peers.
    None,
    /// Remote prune audits are not allowed this pass.
    RemoteDeferred,
    /// The per-pass entry cap has already been reached.
    EntryBudgetDeferred,
    /// Contacting this entry's peers would exceed the per-pass peer cap.
    PeerBudgetDeferred,
    /// Selected for verification against these remote paid-close-group peers.
    Candidate(Vec<PeerId>),
}
184
/// Shared state handed by reference to every audit challenge of one record
/// proof-collection round.
///
/// NOTE(review): the readers and writers of these sets lie outside this
/// section; the field notes are inferred from the names — confirm against
/// the audit code.
#[derive(Default)]
struct PruneAuditReportState {
    /// Presumably: peers already recorded for an audit failure this round.
    audit_failures: RwLock<HashSet<PeerId>>,
    /// Presumably: peers already recorded for abusing a bootstrap claim this
    /// round.
    bootstrap_abuse: RwLock<HashSet<PeerId>>,
}
190
191// ---------------------------------------------------------------------------
192// Prune pass
193// ---------------------------------------------------------------------------
194
195/// Execute post-cycle responsibility pruning.
196///
197/// For each stored record K:
198/// - If `self` is within the storage-admission group
199///   (`close_group_size + STORAGE_ADMISSION_MARGIN`): clear
200///   `RecordOutOfRangeFirstSeen`.
201/// - If not in that group: set timestamp if not already set; delete if the
202///   timestamp is at least `PRUNE_HYSTERESIS_DURATION` old and all but one
203///   of the strict current close group prove they store the record.
204///
205/// For each `PaidForList` entry K:
206/// - If self is in `PaidCloseGroup(K)`: clear `PaidOutOfRangeFirstSeen`.
207/// - If not in group: set timestamp if not already set; remove entry if the
208///   timestamp is at least `PRUNE_HYSTERESIS_DURATION` old and three
209///   quarters of the current paid close group (15 of 20 at production
210///   parameters) confirm the key in their own `PaidForList`.
211///
212/// Compatibility wrapper for callers that have not adopted repair-proof
213/// tracking. It preserves the original public signature, but it has no proof
214/// table or advanced sync epoch to pass into record prune-confirmation audits.
215/// Out-of-range records are therefore marked/deferred rather than deleted via
216/// remote confirmation. The replication engine calls
217/// [`run_prune_pass_with_context`] so it can pass real repair proofs.
218pub async fn run_prune_pass(
219    self_id: &PeerId,
220    storage: &Arc<LmdbStorage>,
221    paid_list: &Arc<PaidList>,
222    p2p_node: &Arc<P2PNode>,
223    config: &ReplicationConfig,
224    sync_state: &Arc<RwLock<NeighborSyncState>>,
225    allow_remote_prune_audits: bool,
226) -> PruneResult {
227    let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
228    run_prune_pass_with_context(PrunePassContext {
229        self_id,
230        storage,
231        paid_list,
232        p2p_node,
233        config,
234        sync_state,
235        repair_proofs: &repair_proofs,
236        current_sync_epoch: 0,
237        #[cfg(any(test, feature = "test-utils"))]
238        repair_proof_now: None,
239        allow_remote_prune_audits,
240    })
241    .await
242}
243
244/// Execute one prune pass with repair-proof-gated remote confirmations.
245pub async fn run_prune_pass_with_context(ctx: PrunePassContext<'_>) -> PruneResult {
246    let (stored_count, record_stats) = prune_stored_records(&ctx).await;
247    let now = Instant::now();
248    let (paid_count, paid_stats) = prune_paid_entries(
249        ctx.self_id,
250        ctx.paid_list,
251        ctx.p2p_node,
252        ctx.config,
253        now,
254        ctx.allow_remote_prune_audits,
255    )
256    .await;
257
258    let result = PruneResult {
259        records_pruned: record_stats.pruned,
260        records_marked_out_of_range: record_stats.marked,
261        records_cleared: record_stats.cleared,
262        paid_entries_pruned: paid_stats.pruned,
263        paid_entries_marked: paid_stats.marked,
264        paid_entries_cleared: paid_stats.cleared,
265    };
266
267    info!(
268        "Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
269        result.records_pruned, stored_count, result.paid_entries_pruned, paid_count,
270    );
271
272    result
273}
274
275async fn prune_stored_records(ctx: &PrunePassContext<'_>) -> (usize, RecordPruneStats) {
276    let stored_keys = match ctx.storage.all_keys().await {
277        Ok(keys) => keys,
278        Err(e) => {
279            warn!("Failed to read stored keys for pruning: {e}");
280            return (0, RecordPruneStats::default());
281        }
282    };
283
284    let now = Instant::now();
285    let mut stats = RecordPruneStats::default();
286    let mut candidates = Vec::new();
287    let mut audit_challenge_budget = MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS;
288    let mut budget_deferred = 0usize;
289    let mut bootstrap_deferred = 0usize;
290    let scan_start = prune_scan_start(ctx.sync_state, stored_keys.len()).await;
291    let mut last_selected_offset = None;
292
293    for offset in 0..stored_keys.len() {
294        let key = &stored_keys[(scan_start + offset) % stored_keys.len()];
295        let (storage_admission_group, strict_close_group) =
296            record_prune_lookup_groups(key, ctx.p2p_node, ctx.config).await;
297
298        let outcome = evaluate_record_prune_key(
299            ctx,
300            key,
301            &storage_admission_group,
302            &strict_close_group,
303            now,
304            &mut audit_challenge_budget,
305        )
306        .await;
307        if outcome.marked {
308            stats.marked += 1;
309        }
310        match outcome.state {
311            RecordPruneKeyState::None => {}
312            RecordPruneKeyState::Cleared => stats.cleared += 1,
313            RecordPruneKeyState::BootstrapDeferred => {
314                bootstrap_deferred = bootstrap_deferred.saturating_add(1);
315            }
316            RecordPruneKeyState::BudgetDeferred => {
317                budget_deferred = budget_deferred.saturating_add(1);
318            }
319            RecordPruneKeyState::Candidate(candidate) => {
320                last_selected_offset = Some(offset);
321                candidates.push(candidate);
322            }
323        }
324    }
325
326    advance_prune_cursor(
327        ctx.sync_state,
328        stored_keys.len(),
329        scan_start,
330        last_selected_offset,
331    )
332    .await;
333
334    if bootstrap_deferred > 0 {
335        debug!(
336            "Deferred {bootstrap_deferred} prune candidates until bootstrap drain allows \
337             remote prune-confirmation audits"
338        );
339    }
340
341    if budget_deferred > 0 {
342        debug!(
343            "Deferred {budget_deferred} prune candidates due to per-pass audit budget \
344             ({MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS} challenges)"
345        );
346    }
347
348    let present_by_key = collect_record_prune_proofs(
349        &candidates,
350        ctx.storage,
351        ctx.p2p_node,
352        ctx.config,
353        ctx.sync_state,
354    )
355    .await;
356    let (keys_to_delete, revalidated_cleared) = revalidated_record_prune_keys(
357        &candidates,
358        &present_by_key,
359        ctx.self_id,
360        ctx.paid_list,
361        ctx.p2p_node,
362        ctx.config,
363    )
364    .await;
365    stats.cleared += revalidated_cleared;
366    stats.pruned = delete_stored_records(
367        &keys_to_delete,
368        ctx.storage,
369        ctx.paid_list,
370        ctx.repair_proofs,
371    )
372    .await;
373
374    (stored_keys.len(), stats)
375}
376
377async fn record_prune_lookup_groups(
378    key: &XorName,
379    p2p_node: &Arc<P2PNode>,
380    config: &ReplicationConfig,
381) -> (Vec<DHTNode>, Vec<DHTNode>) {
382    let dht = p2p_node.dht_manager();
383    let storage_admission_group = dht
384        .find_closest_nodes_local_with_self(key, storage_admission_width(config.close_group_size))
385        .await;
386    let strict_close_group = dht
387        .find_closest_nodes_local_with_self(key, config.close_group_size)
388        .await;
389    (storage_admission_group, strict_close_group)
390}
391
392async fn evaluate_record_prune_key(
393    ctx: &PrunePassContext<'_>,
394    key: &XorName,
395    storage_admission_group: &[DHTNode],
396    strict_close_group: &[DHTNode],
397    now: Instant,
398    audit_challenge_budget: &mut usize,
399) -> RecordPruneKeyOutcome {
400    let mut outcome = RecordPruneKeyOutcome::default();
401    let is_responsible = storage_admission_group
402        .iter()
403        .any(|node| node.peer_id == *ctx.self_id);
404
405    if is_responsible {
406        if ctx.paid_list.record_out_of_range_since(key).is_some() {
407            ctx.paid_list.clear_record_out_of_range(key);
408            outcome.state = RecordPruneKeyState::Cleared;
409        }
410        return outcome;
411    }
412
413    if ctx.paid_list.record_out_of_range_since(key).is_none() {
414        outcome.marked = true;
415    }
416    ctx.paid_list.set_record_out_of_range(key);
417
418    let Some(first_seen) = ctx.paid_list.record_out_of_range_since(key) else {
419        return outcome;
420    };
421    let elapsed = now
422        .checked_duration_since(first_seen)
423        .unwrap_or(Duration::ZERO);
424    if elapsed < ctx.config.prune_hysteresis_duration {
425        return outcome;
426    }
427
428    if !ctx.allow_remote_prune_audits {
429        outcome.state = RecordPruneKeyState::BootstrapDeferred;
430        return outcome;
431    }
432
433    let target_peers = remote_close_group_peers(strict_close_group, ctx.self_id);
434    if target_peers.is_empty() {
435        warn!(
436            "Cannot prune {}: current close group has no remote peers",
437            hex::encode(key)
438        );
439        return outcome;
440    }
441
442    // Only peers we have hinted (mature repair proof) may be audited; the
443    // proof threshold must be reachable among them. A never-synced peer in
444    // the close group reduces the audit pool instead of vetoing the prune.
445    let current_close_peers: HashSet<PeerId> =
446        strict_close_group.iter().map(|node| node.peer_id).collect();
447    #[cfg(any(test, feature = "test-utils"))]
448    let repair_proof_now = ctx.repair_proof_now.unwrap_or(now);
449    #[cfg(not(any(test, feature = "test-utils")))]
450    let repair_proof_now = now;
451    let audit_targets = peers_with_mature_repair_proofs(
452        key,
453        &target_peers,
454        &current_close_peers,
455        ctx.repair_proofs,
456        ctx.current_sync_epoch,
457        repair_proof_now,
458    )
459    .await;
460    let proofs_needed = prune_proofs_needed(target_peers.len());
461    if proofs_needed == 0 || audit_targets.len() < proofs_needed {
462        debug!(
463            "Deferring prune for {} until enough of the close group has mature \
464             repair proofs",
465            hex::encode(key)
466        );
467        return outcome;
468    }
469
470    if audit_targets.len() > *audit_challenge_budget {
471        outcome.state = RecordPruneKeyState::BudgetDeferred;
472        return outcome;
473    }
474
475    *audit_challenge_budget -= audit_targets.len();
476    outcome.state = RecordPruneKeyState::Candidate(RecordPruneCandidate {
477        key: *key,
478        target_peers: audit_targets,
479    });
480    outcome
481}
482
483async fn prune_paid_entries(
484    self_id: &PeerId,
485    paid_list: &Arc<PaidList>,
486    p2p_node: &Arc<P2PNode>,
487    config: &ReplicationConfig,
488    now: Instant,
489    allow_remote_prune_audits: bool,
490) -> (usize, PaidPruneStats) {
491    let paid_keys = match paid_list.all_keys() {
492        Ok(keys) => keys,
493        Err(e) => {
494            warn!("Failed to read PaidForList for pruning: {e}");
495            return (0, PaidPruneStats::default());
496        }
497    };
498
499    let dht = p2p_node.dht_manager();
500    let mut stats = PaidPruneStats::default();
501    let mut expired_candidates: Vec<(XorName, Vec<PeerId>)> = Vec::new();
502    let mut deferred_counts = PaidPruneDeferredCounts::default();
503    let mut selected_verification_peers = HashSet::new();
504    // Rotate the scan start so expired entries beyond the per-pass cap are
505    // not starved by the same head-of-list entries every pass.
506    let scan_start = paid_list.paid_prune_scan_start(paid_keys.len());
507    let mut last_selected_offset = None;
508
509    for offset in 0..paid_keys.len() {
510        let key = &paid_keys[(scan_start + offset) % paid_keys.len()];
511        let closest: Vec<DHTNode> = dht
512            .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
513            .await;
514        let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);
515
516        if in_paid_group {
517            if paid_list.paid_out_of_range_since(key).is_some() {
518                paid_list.clear_paid_out_of_range(key);
519                stats.cleared += 1;
520            }
521        } else {
522            if paid_list.paid_out_of_range_since(key).is_none() {
523                stats.marked += 1;
524            }
525            paid_list.set_paid_out_of_range(key);
526
527            if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
528                let elapsed = now
529                    .checked_duration_since(first_seen)
530                    .unwrap_or(Duration::ZERO);
531                if elapsed >= config.prune_hysteresis_duration {
532                    match select_paid_prune_candidate(
533                        key,
534                        &closest,
535                        self_id,
536                        allow_remote_prune_audits,
537                        expired_candidates.len(),
538                        &mut selected_verification_peers,
539                    ) {
540                        PaidPruneKeyState::None => {}
541                        PaidPruneKeyState::RemoteDeferred => {
542                            deferred_counts.remote_gate =
543                                deferred_counts.remote_gate.saturating_add(1);
544                        }
545                        PaidPruneKeyState::EntryBudgetDeferred => {
546                            deferred_counts.entry_budget =
547                                deferred_counts.entry_budget.saturating_add(1);
548                        }
549                        PaidPruneKeyState::PeerBudgetDeferred => {
550                            deferred_counts.peer_budget =
551                                deferred_counts.peer_budget.saturating_add(1);
552                        }
553                        PaidPruneKeyState::Candidate(target_peers) => {
554                            expired_candidates.push((*key, target_peers));
555                            last_selected_offset = Some(offset);
556                        }
557                    }
558                }
559            }
560        }
561    }
562
563    paid_list.advance_paid_prune_cursor(paid_keys.len(), scan_start, last_selected_offset);
564    deferred_counts.log();
565
566    let confirmed_by_key =
567        collect_paid_prune_confirmations(&expired_candidates, p2p_node, config).await;
568    let (paid_keys_to_delete, revalidated_cleared) = revalidated_paid_prune_keys(
569        &expired_candidates,
570        &confirmed_by_key,
571        self_id,
572        paid_list,
573        p2p_node,
574        config,
575    )
576    .await;
577    stats.cleared += revalidated_cleared;
578    stats.pruned = delete_paid_entries(&paid_keys_to_delete, paid_list).await;
579
580    (paid_keys.len(), stats)
581}
582
583fn select_paid_prune_candidate(
584    key: &XorName,
585    closest: &[DHTNode],
586    self_id: &PeerId,
587    allow_remote_prune_audits: bool,
588    selected_candidate_count: usize,
589    selected_verification_peers: &mut HashSet<PeerId>,
590) -> PaidPruneKeyState {
591    if !allow_remote_prune_audits {
592        return PaidPruneKeyState::RemoteDeferred;
593    }
594
595    let target_peers = remote_close_group_peers(closest, self_id);
596    if target_peers.is_empty() {
597        warn!(
598            "Cannot prune paid entry {}: current paid close group has no remote peers",
599            hex::encode(key)
600        );
601        return PaidPruneKeyState::None;
602    }
603
604    if selected_candidate_count >= MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS {
605        return PaidPruneKeyState::EntryBudgetDeferred;
606    }
607
608    if !reserve_paid_prune_peer_budget(&target_peers, selected_verification_peers) {
609        return PaidPruneKeyState::PeerBudgetDeferred;
610    }
611
612    PaidPruneKeyState::Candidate(target_peers)
613}
614
615async fn delete_paid_entries(keys_to_delete: &[XorName], paid_list: &Arc<PaidList>) -> usize {
616    if keys_to_delete.is_empty() {
617        return 0;
618    }
619
620    match paid_list.remove_batch(keys_to_delete).await {
621        Ok(count) => {
622            debug!("Pruned {count} out-of-range PaidForList entries");
623            count
624        }
625        Err(e) => {
626            warn!("Failed to prune PaidForList entries: {e}");
627            0
628        }
629    }
630}
631
632/// Re-check each confirmed candidate against current local state before
633/// deletion.
634///
635/// The network round in [`collect_paid_prune_confirmations`] takes time;
636/// the paid close group may have changed underneath it, including self
637/// moving back into range. Mirrors [`revalidated_record_prune_keys`]:
638/// confirmations only count from peers still in the current paid close
639/// group, against a threshold computed from that current group.
640async fn revalidated_paid_prune_keys(
641    expired_candidates: &[(XorName, Vec<PeerId>)],
642    confirmed_by_key: &HashMap<XorName, HashSet<PeerId>>,
643    self_id: &PeerId,
644    paid_list: &Arc<PaidList>,
645    p2p_node: &Arc<P2PNode>,
646    config: &ReplicationConfig,
647) -> (Vec<XorName>, usize) {
648    let dht = p2p_node.dht_manager();
649    let mut keys_to_delete = Vec::new();
650    let mut cleared = 0;
651    let now = Instant::now();
652
653    for (key, _) in expired_candidates {
654        let closest: Vec<DHTNode> = dht
655            .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
656            .await;
657
658        if closest.iter().any(|n| n.peer_id == *self_id) {
659            if paid_list.paid_out_of_range_since(key).is_some() {
660                paid_list.clear_paid_out_of_range(key);
661                cleared += 1;
662            }
663            continue;
664        }
665
666        let Some(first_seen) = paid_list.paid_out_of_range_since(key) else {
667            continue;
668        };
669        let elapsed = now
670            .checked_duration_since(first_seen)
671            .unwrap_or(Duration::ZERO);
672        if elapsed < config.prune_hysteresis_duration {
673            continue;
674        }
675
676        let current_target_peers = remote_close_group_peers(&closest, self_id);
677        if current_target_peers.is_empty() {
678            warn!(
679                "Cannot prune paid entry {}: current paid close group has no remote peers",
680                hex::encode(key)
681            );
682            continue;
683        }
684
685        let confirmations_needed = paid_prune_confirmations_needed(current_target_peers.len());
686        if target_peers_reported_present(
687            key,
688            &current_target_peers,
689            confirmed_by_key,
690            confirmations_needed,
691        ) {
692            keys_to_delete.push(*key);
693        } else {
694            debug!(
695                "Deferring paid-entry prune for {} until enough of the current paid \
696                 close group confirm it",
697                hex::encode(key)
698            );
699        }
700    }
701
702    (keys_to_delete, cleared)
703}
704
705fn remote_close_group_peers(close_group: &[DHTNode], self_id: &PeerId) -> Vec<PeerId> {
706    close_group
707        .iter()
708        .filter(|node| node.peer_id != *self_id)
709        .map(|node| node.peer_id)
710        .collect()
711}
712
/// Confirmations required before removing an out-of-range `PaidForList`
/// entry: three quarters of the paid close group rounded up, 15 of 20 at
/// production parameters.
///
/// Paid-entry pruning is deliberately gated on the paid lists of the current
/// paid close group, never on chunk possession: the paid list is the
/// authorization record, and a wide confirmed majority must already track
/// the key before this node may forget it.
///
/// Returns `0` for an empty group. The value is computed as
/// `group_size - group_size / 4`, which equals `ceil(3 * group_size / 4)` for
/// every `usize` but never forms the intermediate `3 * group_size`, so it
/// cannot overflow (and thereby silently lower the threshold) for any input.
fn paid_prune_confirmations_needed(group_size: usize) -> usize {
    group_size - group_size / 4
}
724
725fn reserve_paid_prune_peer_budget(
726    target_peers: &[PeerId],
727    selected_verification_peers: &mut HashSet<PeerId>,
728) -> bool {
729    let new_peer_count = target_peers
730        .iter()
731        .filter(|peer| !selected_verification_peers.contains(peer))
732        .count();
733    if selected_verification_peers
734        .len()
735        .saturating_add(new_peer_count)
736        > MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS
737    {
738        return false;
739    }
740
741    selected_verification_peers.extend(target_peers.iter().copied());
742    true
743}
744
745/// Ask the current paid close group whether they track each expired key in
746/// their `PaidForList`, and return the confirming peers per key.
747///
748/// The deletion decision happens afterwards in
749/// [`revalidated_paid_prune_keys`], against the paid close group as it
750/// stands once the network round has completed.
751async fn collect_paid_prune_confirmations(
752    expired_candidates: &[(XorName, Vec<PeerId>)],
753    p2p_node: &Arc<P2PNode>,
754    config: &ReplicationConfig,
755) -> HashMap<XorName, HashSet<PeerId>> {
756    if expired_candidates.is_empty() {
757        return HashMap::new();
758    }
759
760    let mut targets = VerificationTargets::default();
761    let mut keys = Vec::new();
762    for (key, target_peers) in expired_candidates {
763        if target_peers.is_empty() {
764            warn!(
765                "Cannot prune paid entry {}: current paid close group has no remote peers",
766                hex::encode(key)
767            );
768            continue;
769        }
770        keys.push(*key);
771        for peer in target_peers {
772            targets.all_peers.insert(*peer);
773            targets.peer_to_keys.entry(*peer).or_default().push(*key);
774            targets
775                .peer_to_paid_keys
776                .entry(*peer)
777                .or_default()
778                .insert(*key);
779        }
780        targets.paid_targets.insert(*key, target_peers.clone());
781        targets.paid_group_sizes.insert(*key, target_peers.len());
782    }
783    for keys_list in targets.peer_to_keys.values_mut() {
784        keys_list.sort_unstable();
785        keys_list.dedup();
786    }
787
788    let evidence = quorum::run_verification_round(&keys, &targets, p2p_node, config).await;
789    paid_confirmations_by_key(expired_candidates, &evidence)
790}
791
792/// Aggregate `Confirmed` paid-list evidence into per-key peer sets.
793///
794/// Only peers in the candidate's own target set count; `NotFound` and
795/// `Unresolved` answers never confirm.
796fn paid_confirmations_by_key(
797    expired_candidates: &[(XorName, Vec<PeerId>)],
798    evidence: &HashMap<XorName, KeyVerificationEvidence>,
799) -> HashMap<XorName, HashSet<PeerId>> {
800    let mut confirmed_by_key: HashMap<XorName, HashSet<PeerId>> = HashMap::new();
801    for (key, target_peers) in expired_candidates {
802        let Some(key_evidence) = evidence.get(key) else {
803            continue;
804        };
805        let confirmed: HashSet<PeerId> = key_evidence
806            .paid_list
807            .iter()
808            .filter(|&(peer, status)| {
809                *status == PaidListEvidence::Confirmed && target_peers.contains(peer)
810            })
811            .map(|(peer, _)| *peer)
812            .collect();
813        if !confirmed.is_empty() {
814            confirmed_by_key.insert(*key, confirmed);
815        }
816    }
817    confirmed_by_key
818}
819
820/// Filter `target_peers` down to those with a mature repair proof for `key`.
821///
822/// Per design rule 20, peers without a key-specific mature repair hint proof
823/// are never audited for that key.
824async fn peers_with_mature_repair_proofs(
825    key: &XorName,
826    target_peers: &[PeerId],
827    current_close_peers: &HashSet<PeerId>,
828    repair_proofs: &Arc<RwLock<RepairProofs>>,
829    current_sync_epoch: u64,
830    now: Instant,
831) -> Vec<PeerId> {
832    let mut proofs = repair_proofs.write().await;
833    target_peers
834        .iter()
835        .filter(|peer| {
836            proofs.has_mature_replica_hint(peer, key, current_close_peers, current_sync_epoch, now)
837        })
838        .copied()
839        .collect()
840}
841
842async fn prune_scan_start(
843    sync_state: &Arc<RwLock<NeighborSyncState>>,
844    stored_key_count: usize,
845) -> usize {
846    if stored_key_count == 0 {
847        return 0;
848    }
849    sync_state.read().await.prune_cursor % stored_key_count
850}
851
852async fn advance_prune_cursor(
853    sync_state: &Arc<RwLock<NeighborSyncState>>,
854    stored_key_count: usize,
855    scan_start: usize,
856    last_selected_offset: Option<usize>,
857) {
858    if stored_key_count == 0 {
859        sync_state.write().await.prune_cursor = 0;
860        return;
861    }
862
863    let advance_by = last_selected_offset.map_or(1, |offset| offset.saturating_add(1));
864    sync_state.write().await.prune_cursor = (scan_start + advance_by) % stored_key_count;
865}
866
867async fn delete_stored_records(
868    keys_to_delete: &[XorName],
869    storage: &Arc<LmdbStorage>,
870    paid_list: &Arc<PaidList>,
871    repair_proofs: &Arc<RwLock<RepairProofs>>,
872) -> usize {
873    let mut pruned = 0;
874
875    for key in keys_to_delete {
876        if let Err(e) = storage.delete(key).await {
877            warn!("Failed to prune record {}: {e}", hex::encode(key));
878        } else {
879            pruned += 1;
880            paid_list.clear_record_out_of_range(key);
881            repair_proofs.write().await.remove_key(key);
882            // Seed the PaidForList out-of-range timer so the second pass can
883            // prune the entry sooner, closing the re-admission window between
884            // the storage delete and the PaidForList prune pass.
885            paid_list.set_paid_out_of_range(key);
886            debug!("Pruned out-of-range record {}", hex::encode(key));
887        }
888    }
889
890    pruned
891}
892
893/// Collect positive presence reports for prune candidates.
894///
895/// A key is deleted once all but one of the current close group prove
896/// possession ([`prune_proofs_needed`]). Requiring unanimous proofs left
897/// out-of-range records undeletable whenever a single close-group peer
898/// lagged, while the all-but-one threshold still demands more copies than
899/// the storage quorum used elsewhere. Keys below the proof threshold stay
900/// local, and the retained record continues to participate in normal
901/// neighbor-sync repair because replica hint construction walks all locally
902/// stored keys, including out-of-range keys retained by hysteresis.
903async fn collect_record_prune_proofs(
904    candidates: &[RecordPruneCandidate],
905    storage: &Arc<LmdbStorage>,
906    p2p_node: &Arc<P2PNode>,
907    config: &ReplicationConfig,
908    sync_state: &Arc<RwLock<NeighborSyncState>>,
909) -> HashMap<XorName, HashSet<PeerId>> {
910    if candidates.is_empty() {
911        return HashMap::new();
912    }
913
914    let report_state = PruneAuditReportState::default();
915    let mut requests = stream::iter(build_peer_audit_challenges(candidates))
916        .map(|(peer, key)| {
917            peer_proves_record(
918                peer,
919                key,
920                storage,
921                p2p_node,
922                config,
923                sync_state,
924                &report_state,
925            )
926        })
927        .buffer_unordered(MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES);
928
929    let mut present_by_key = HashMap::<XorName, HashSet<PeerId>>::new();
930    while let Some(proof) = requests.next().await {
931        if let Some((peer, key)) = proof {
932            present_by_key.entry(key).or_default().insert(peer);
933        }
934    }
935
936    present_by_key
937}
938
939async fn revalidated_record_prune_keys(
940    candidates: &[RecordPruneCandidate],
941    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
942    self_id: &PeerId,
943    paid_list: &Arc<PaidList>,
944    p2p_node: &Arc<P2PNode>,
945    config: &ReplicationConfig,
946) -> (Vec<XorName>, usize) {
947    let mut keys_to_delete = Vec::new();
948    let mut cleared = 0;
949    let now = Instant::now();
950
951    for candidate in candidates {
952        let (storage_admission_group, strict_close_group) =
953            record_prune_lookup_groups(&candidate.key, p2p_node, config).await;
954
955        if storage_admission_group
956            .iter()
957            .any(|n| n.peer_id == *self_id)
958        {
959            if paid_list
960                .record_out_of_range_since(&candidate.key)
961                .is_some()
962            {
963                paid_list.clear_record_out_of_range(&candidate.key);
964                cleared += 1;
965            }
966            continue;
967        }
968
969        let Some(first_seen) = paid_list.record_out_of_range_since(&candidate.key) else {
970            continue;
971        };
972        let elapsed = now
973            .checked_duration_since(first_seen)
974            .unwrap_or(Duration::ZERO);
975        if elapsed < config.prune_hysteresis_duration {
976            continue;
977        }
978
979        let current_target_peers = remote_close_group_peers(&strict_close_group, self_id);
980        if current_target_peers.is_empty() {
981            warn!(
982                "Cannot prune {}: current close group has no remote peers",
983                hex::encode(candidate.key)
984            );
985            continue;
986        }
987
988        let proofs_needed = prune_proofs_needed(current_target_peers.len());
989        if target_peers_reported_present(
990            &candidate.key,
991            &current_target_peers,
992            present_by_key,
993            proofs_needed,
994        ) {
995            keys_to_delete.push(candidate.key);
996        } else {
997            debug!(
998                "Deferring prune for {} until all but one of the current close group \
999                 report it",
1000                hex::encode(candidate.key)
1001            );
1002        }
1003    }
1004
1005    (keys_to_delete, cleared)
1006}
1007
1008fn build_peer_audit_challenges(candidates: &[RecordPruneCandidate]) -> Vec<(PeerId, XorName)> {
1009    let mut challenges = Vec::new();
1010    for candidate in candidates {
1011        for peer in &candidate.target_peers {
1012            challenges.push((*peer, candidate.key));
1013        }
1014    }
1015    challenges
1016}
1017
/// Test helper: keys of the candidates whose own `target_peers` supplied at
/// least `proofs_needed` presence proofs, in candidate order.
#[cfg(test)]
fn confirmed_keys_from_presence(
    candidates: &[RecordPruneCandidate],
    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
    proofs_needed: usize,
) -> Vec<XorName> {
    let mut confirmed = Vec::new();
    for candidate in candidates {
        let enough_proofs = target_peers_reported_present(
            &candidate.key,
            &candidate.target_peers,
            present_by_key,
            proofs_needed,
        );
        if enough_proofs {
            confirmed.push(candidate.key);
        }
    }
    confirmed
}
1037
/// Number of possession proofs required before an out-of-range record may be
/// deleted: every peer but one in the close group (6 of 7 at production
/// parameters).
///
/// This is deliberately stricter than the storage quorum (`QuorumNeeded`):
/// pruning only happens after `PRUNE_HYSTERESIS_DURATION` out of range, by
/// which point many sync cycles should have spread the record over the whole
/// close group. Allowing exactly one laggard stops a single absent peer from
/// blocking deletion indefinitely, without accepting under-replication.
///
/// Groups of size 0, 1 or 2 need a proof from every member, since tolerating
/// a miss there would allow deletion on a single attestation.
fn prune_proofs_needed(group_size: usize) -> usize {
    match group_size {
        0..=2 => group_size,
        larger => larger - 1,
    }
}
1055
1056/// Whether enough target peers supplied positive evidence to allow deletion.
1057///
1058/// `proofs_needed == 0` means confirmation is impossible (no targets), not
1059/// trivially met.
1060fn target_peers_reported_present(
1061    key: &XorName,
1062    target_peers: &[PeerId],
1063    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
1064    proofs_needed: usize,
1065) -> bool {
1066    if proofs_needed == 0 {
1067        return false;
1068    }
1069    let Some(present_peers) = present_by_key.get(key) else {
1070        return false;
1071    };
1072    // Count distinct proven peers: iterating the present set keeps a
1073    // duplicated entry in `target_peers` from being counted twice.
1074    let proven = present_peers
1075        .iter()
1076        .filter(|peer| target_peers.contains(peer))
1077        .count();
1078    proven >= proofs_needed
1079}
1080
1081/// Challenge a peer to prove it holds the exact record bytes for `key`.
1082/// `None` means the peer failed to provide usable proof.
1083async fn peer_proves_record(
1084    peer: PeerId,
1085    key: XorName,
1086    storage: &Arc<LmdbStorage>,
1087    p2p_node: &Arc<P2PNode>,
1088    config: &ReplicationConfig,
1089    sync_state: &Arc<RwLock<NeighborSyncState>>,
1090    report_state: &PruneAuditReportState,
1091) -> Option<(PeerId, XorName)> {
1092    let local_bytes = local_record_bytes(&key, storage).await?;
1093
1094    let (challenge_id, nonce) = {
1095        let mut rng = rand::thread_rng();
1096        (rng.gen::<u64>(), rng.gen::<[u8; 32]>())
1097    };
1098    let encoded = encode_prune_audit_challenge(&peer, key, challenge_id, nonce)?;
1099    let Some(decoded) = send_prune_audit_challenge(&peer, &key, encoded, p2p_node, config).await
1100    else {
1101        // No decoded response means we did not observe the peer stop claiming
1102        // bootstrap status. Preserve any active claim so a later claim is not
1103        // misclassified as repeated abuse.
1104        report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
1105        return None;
1106    };
1107
1108    let status =
1109        prune_audit_response_status(decoded, challenge_id, &peer, &key, &nonce, &local_bytes);
1110    if prune_audit_response_clears_bootstrap_claim(status) {
1111        clear_prune_bootstrap_claim(&peer, sync_state).await;
1112    }
1113
1114    match status {
1115        PruneAuditStatus::Proven => Some((peer, key)),
1116        PruneAuditStatus::Bootstrapping => {
1117            report_prune_bootstrap_claim(&peer, &key, p2p_node, config, sync_state, report_state)
1118                .await;
1119            None
1120        }
1121        PruneAuditStatus::Failed => {
1122            report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
1123            None
1124        }
1125    }
1126}
1127
1128fn prune_audit_response_clears_bootstrap_claim(status: PruneAuditStatus) -> bool {
1129    matches!(status, PruneAuditStatus::Proven | PruneAuditStatus::Failed)
1130}
1131
1132fn encode_prune_audit_challenge(
1133    peer: &PeerId,
1134    key: XorName,
1135    challenge_id: u64,
1136    nonce: [u8; 32],
1137) -> Option<Vec<u8>> {
1138    let challenge = AuditChallenge {
1139        challenge_id,
1140        nonce,
1141        challenged_peer_id: *peer.as_bytes(),
1142        keys: vec![key],
1143    };
1144    let msg = ReplicationMessage {
1145        request_id: challenge_id,
1146        body: ReplicationMessageBody::AuditChallenge(challenge),
1147    };
1148    let encoded = match msg.encode() {
1149        Ok(data) => data,
1150        Err(e) => {
1151            warn!(
1152                "Failed to encode prune audit challenge for {} against {peer}: {e}",
1153                hex::encode(key),
1154            );
1155            return None;
1156        }
1157    };
1158    Some(encoded)
1159}
1160
1161async fn send_prune_audit_challenge(
1162    peer: &PeerId,
1163    key: &XorName,
1164    encoded: Vec<u8>,
1165    p2p_node: &Arc<P2PNode>,
1166    config: &ReplicationConfig,
1167) -> Option<ReplicationMessage> {
1168    let response = match p2p_node
1169        .send_request(
1170            peer,
1171            REPLICATION_PROTOCOL_ID,
1172            encoded,
1173            config.audit_response_timeout(1),
1174        )
1175        .await
1176    {
1177        Ok(response) => response,
1178        Err(e) => {
1179            debug!(
1180                "Prune audit challenge for {} against {peer} failed: {e}",
1181                hex::encode(key)
1182            );
1183            return None;
1184        }
1185    };
1186
1187    let decoded = match ReplicationMessage::decode(&response.data) {
1188        Ok(msg) => msg,
1189        Err(e) => {
1190            warn!("Failed to decode prune audit response from {peer}: {e}");
1191            return None;
1192        }
1193    };
1194
1195    Some(decoded)
1196}
1197
1198fn prune_audit_response_status(
1199    decoded: ReplicationMessage,
1200    challenge_id: u64,
1201    peer: &PeerId,
1202    key: &XorName,
1203    nonce: &[u8; 32],
1204    local_bytes: &[u8],
1205) -> PruneAuditStatus {
1206    match decoded.body {
1207        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
1208            challenge_id: resp_id,
1209            digests,
1210        }) => {
1211            if resp_id != challenge_id {
1212                warn!("Prune audit challenge ID mismatch from {peer}");
1213                return PruneAuditStatus::Failed;
1214            }
1215            let [digest] = digests.as_slice() else {
1216                warn!(
1217                    "Prune audit response from {peer} returned {} digests for one challenged key",
1218                    digests.len(),
1219                );
1220                return PruneAuditStatus::Failed;
1221            };
1222            if *digest == ABSENT_KEY_DIGEST {
1223                warn!(
1224                    "Prune audit proof from {peer} failed for {}: peer reports key absent",
1225                    hex::encode(key)
1226                );
1227                return PruneAuditStatus::Failed;
1228            }
1229            if audit_digest_proves_key(peer, key, nonce, local_bytes, digest) {
1230                PruneAuditStatus::Proven
1231            } else {
1232                warn!(
1233                    "Prune audit proof from {peer} failed for {}: digest mismatch",
1234                    hex::encode(key)
1235                );
1236                PruneAuditStatus::Failed
1237            }
1238        }
1239        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
1240            challenge_id: resp_id,
1241        }) => {
1242            if resp_id == challenge_id {
1243                warn!(
1244                    "Prune audit proof for {} blocked by bootstrap claim from {peer}",
1245                    hex::encode(key)
1246                );
1247                PruneAuditStatus::Bootstrapping
1248            } else {
1249                warn!("Prune audit challenge ID mismatch on Bootstrapping from {peer}");
1250                PruneAuditStatus::Failed
1251            }
1252        }
1253        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
1254            challenge_id: resp_id,
1255            reason,
1256        }) => {
1257            if resp_id == challenge_id {
1258                warn!(
1259                    "Prune audit proof for {} rejected by {peer}: {reason}",
1260                    hex::encode(key)
1261                );
1262            } else {
1263                warn!("Prune audit challenge ID mismatch on Rejected from {peer}");
1264            }
1265            PruneAuditStatus::Failed
1266        }
1267        _ => {
1268            warn!("Unexpected prune audit response type from {peer}");
1269            PruneAuditStatus::Failed
1270        }
1271    }
1272}
1273
1274async fn local_record_bytes(key: &XorName, storage: &Arc<LmdbStorage>) -> Option<Vec<u8>> {
1275    match storage.get_raw(key).await {
1276        Ok(Some(bytes)) => Some(bytes),
1277        Ok(None) => {
1278            debug!(
1279                "Cannot prune-audit {}: local record disappeared",
1280                hex::encode(key)
1281            );
1282            None
1283        }
1284        Err(e) => {
1285            warn!(
1286                "Cannot prune-audit {}: failed to read local record: {e}",
1287                hex::encode(key)
1288            );
1289            None
1290        }
1291    }
1292}
1293
1294fn audit_digest_proves_key(
1295    peer: &PeerId,
1296    key: &XorName,
1297    nonce: &[u8; 32],
1298    local_bytes: &[u8],
1299    digest: &[u8; 32],
1300) -> bool {
1301    if *digest == ABSENT_KEY_DIGEST {
1302        return false;
1303    }
1304    let expected = compute_audit_digest(nonce, peer.as_bytes(), key, local_bytes);
1305    *digest == expected
1306}
1307
1308async fn report_prune_audit_failure_once(
1309    peer: &PeerId,
1310    key: &XorName,
1311    p2p_node: &Arc<P2PNode>,
1312    config: &ReplicationConfig,
1313    report_state: &PruneAuditReportState,
1314) -> bool {
1315    let should_report = peer_is_currently_responsible(peer, key, p2p_node, config).await
1316        && reserve_prune_audit_failure_report(report_state, peer).await;
1317    if !should_report {
1318        return false;
1319    }
1320
1321    p2p_node
1322        .report_trust_event(
1323            peer,
1324            saorsa_core::TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT),
1325        )
1326        .await;
1327    true
1328}
1329
1330async fn reserve_prune_audit_failure_report(
1331    report_state: &PruneAuditReportState,
1332    peer: &PeerId,
1333) -> bool {
1334    report_state.audit_failures.write().await.insert(*peer)
1335}
1336
1337async fn reserve_prune_bootstrap_abuse_report(
1338    report_state: &PruneAuditReportState,
1339    peer: &PeerId,
1340) -> bool {
1341    report_state.bootstrap_abuse.write().await.insert(*peer)
1342}
1343
1344async fn report_prune_bootstrap_claim(
1345    peer: &PeerId,
1346    key: &XorName,
1347    p2p_node: &Arc<P2PNode>,
1348    config: &ReplicationConfig,
1349    sync_state: &Arc<RwLock<NeighborSyncState>>,
1350    report_state: &PruneAuditReportState,
1351) {
1352    if !peer_is_currently_responsible(peer, key, p2p_node, config).await {
1353        return;
1354    }
1355
1356    let observation = {
1357        let now = Instant::now();
1358        let mut state = sync_state.write().await;
1359        (
1360            now,
1361            state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period),
1362        )
1363    };
1364
1365    let (now, observation) = observation;
1366    match observation {
1367        BootstrapClaimObservation::WithinGrace { .. } => {
1368            debug!("Prune audit: peer {peer} claims bootstrapping (within grace period)");
1369            return;
1370        }
1371        BootstrapClaimObservation::PastGrace { first_seen } => {
1372            if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
1373                debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
1374                return;
1375            }
1376            warn!(
1377                "Prune audit: peer {peer} claiming bootstrap past grace period \
1378                 ({:?} > {:?}), reporting abuse",
1379                now.duration_since(first_seen),
1380                config.bootstrap_claim_grace_period,
1381            );
1382        }
1383        BootstrapClaimObservation::Repeated { first_seen } => {
1384            if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
1385                debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
1386                return;
1387            }
1388            warn!(
1389                "Prune audit: peer {peer} repeated bootstrap claim after previously stopping; \
1390                 first claim was {:?} ago, reporting abuse",
1391                now.duration_since(first_seen),
1392            );
1393        }
1394    }
1395
1396    p2p_node
1397        .report_trust_event(
1398            peer,
1399            saorsa_core::TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1400        )
1401        .await;
1402}
1403
1404async fn clear_prune_bootstrap_claim(peer: &PeerId, sync_state: &Arc<RwLock<NeighborSyncState>>) {
1405    let removed = {
1406        let mut state = sync_state.write().await;
1407        state.clear_active_bootstrap_claim(peer)
1408    };
1409    if removed {
1410        debug!("Prune audit: cleared active bootstrap claim for {peer}");
1411    }
1412}
1413
1414async fn peer_is_currently_responsible(
1415    peer: &PeerId,
1416    key: &XorName,
1417    p2p_node: &Arc<P2PNode>,
1418    config: &ReplicationConfig,
1419) -> bool {
1420    let closest = p2p_node
1421        .dht_manager()
1422        .find_closest_nodes_local_with_self(key, config.close_group_size)
1423        .await;
1424    closest.iter().any(|node| node.peer_id == *peer)
1425}
1426
1427#[cfg(test)]
1428#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1429mod tests {
1430    use super::*;
1431
1432    fn peer_id_from_byte(b: u8) -> PeerId {
1433        let mut bytes = [0u8; 32];
1434        bytes[0] = b;
1435        PeerId::from_bytes(bytes)
1436    }
1437
1438    fn key_from_byte(b: u8) -> XorName {
1439        [b; 32]
1440    }
1441
1442    fn peer_ids(count: usize) -> Vec<PeerId> {
1443        (0..count)
1444            .map(|idx| peer_id_from_byte(u8::try_from(idx + 1).expect("peer byte")))
1445            .collect()
1446    }
1447
1448    fn candidate(key: XorName, target_peers: Vec<PeerId>) -> RecordPruneCandidate {
1449        RecordPruneCandidate { key, target_peers }
1450    }
1451
1452    #[test]
1453    fn prune_audit_challenges_are_one_per_candidate_peer() {
1454        let peer_a = peer_id_from_byte(1);
1455        let peer_b = peer_id_from_byte(2);
1456        let key_a = key_from_byte(0xA);
1457        let key_b = key_from_byte(0xB);
1458        let candidates = vec![
1459            candidate(key_a, vec![peer_a, peer_b]),
1460            candidate(key_b, vec![peer_b]),
1461        ];
1462
1463        let mut challenges = build_peer_audit_challenges(&candidates);
1464        challenges.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));
1465
1466        let mut expected = vec![(peer_a, key_a), (peer_b, key_a), (peer_b, key_b)];
1467        expected.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));
1468        assert_eq!(challenges, expected);
1469    }
1470
1471    #[test]
1472    fn confirmed_keys_require_quorum_of_target_peers_present() {
1473        let peer_a = peer_id_from_byte(1);
1474        let peer_b = peer_id_from_byte(2);
1475        let peer_c = peer_id_from_byte(3);
1476        let key = key_from_byte(0xC);
1477        let candidates = vec![candidate(key, vec![peer_a, peer_b, peer_c])];
1478        let mut present_by_key = HashMap::new();
1479        present_by_key.insert(key, HashSet::from([peer_a, peer_b]));
1480
1481        // Two of three proofs meet a quorum of 2 even though one peer is
1482        // missing — unanimity is not required.
1483        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 2);
1484        assert_eq!(confirmed, vec![key]);
1485
1486        // The same evidence fails a quorum of 3.
1487        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 3);
1488        assert!(confirmed.is_empty());
1489    }
1490
1491    #[test]
1492    fn confirmed_keys_defer_below_quorum_or_missing_peer_evidence() {
1493        let peer_a = peer_id_from_byte(1);
1494        let peer_b = peer_id_from_byte(2);
1495        let quorum_key = key_from_byte(0xD);
1496        let below_quorum_key = key_from_byte(0xE);
1497        let missing_key = key_from_byte(0xF);
1498        let candidates = vec![
1499            candidate(quorum_key, vec![peer_a, peer_b]),
1500            candidate(below_quorum_key, vec![peer_a, peer_b]),
1501            candidate(missing_key, vec![peer_a, peer_b]),
1502        ];
1503        let mut present_by_key = HashMap::new();
1504        present_by_key.insert(quorum_key, HashSet::from([peer_a, peer_b]));
1505        present_by_key.insert(below_quorum_key, HashSet::from([peer_a]));
1506
1507        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 2);
1508
1509        assert_eq!(confirmed, vec![quorum_key]);
1510    }
1511
1512    #[test]
1513    fn prune_proofs_needed_tolerates_exactly_one_lagging_peer() {
1514        assert_eq!(prune_proofs_needed(0), 0);
1515        // Tiny groups require every proof.
1516        assert_eq!(prune_proofs_needed(1), 1);
1517        assert_eq!(prune_proofs_needed(2), 2);
1518        assert_eq!(prune_proofs_needed(3), 2);
1519        assert_eq!(prune_proofs_needed(5), 4);
1520        // Production close group: 6 of 7 proofs required.
1521        assert_eq!(prune_proofs_needed(7), 6);
1522    }
1523
1524    #[test]
1525    fn paid_prune_confirmations_are_three_quarters_rounded_up() {
1526        assert_eq!(paid_prune_confirmations_needed(0), 0);
1527        assert_eq!(paid_prune_confirmations_needed(1), 1);
1528        assert_eq!(paid_prune_confirmations_needed(2), 2);
1529        assert_eq!(paid_prune_confirmations_needed(4), 3);
1530        // Production paid close group: 15 of 20 confirmations required.
1531        assert_eq!(paid_prune_confirmations_needed(20), 15);
1532    }
1533
1534    #[test]
1535    fn paid_prune_peer_budget_allows_overlapping_targets() {
1536        let peers = peer_ids(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS);
1537        let mut selected_peers = HashSet::new();
1538
1539        assert!(reserve_paid_prune_peer_budget(&peers, &mut selected_peers));
1540        assert_eq!(
1541            selected_peers.len(),
1542            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
1543        );
1544
1545        let overlapping_targets = vec![peers[0], peers[1]];
1546        assert!(reserve_paid_prune_peer_budget(
1547            &overlapping_targets,
1548            &mut selected_peers,
1549        ));
1550        assert_eq!(
1551            selected_peers.len(),
1552            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
1553        );
1554    }
1555
1556    #[test]
1557    fn paid_prune_peer_budget_rejects_new_peers_past_cap() {
1558        let peers = peer_ids(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS + 1);
1559        let mut selected_peers = HashSet::new();
1560
1561        assert!(reserve_paid_prune_peer_budget(
1562            &peers[..MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS],
1563            &mut selected_peers,
1564        ));
1565        assert!(!reserve_paid_prune_peer_budget(
1566            &peers[MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS..],
1567            &mut selected_peers,
1568        ));
1569        assert_eq!(
1570            selected_peers.len(),
1571            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
1572        );
1573        assert!(!selected_peers.contains(&peers[MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS]));
1574    }
1575
1576    #[test]
1577    fn paid_confirmations_count_only_confirmed_target_peers() {
1578        let confirmed_peer = peer_id_from_byte(1);
1579        let not_found_peer = peer_id_from_byte(2);
1580        let unresolved_peer = peer_id_from_byte(3);
1581        let outsider = peer_id_from_byte(4);
1582        let key = key_from_byte(0x21);
1583        let candidates = vec![(key, vec![confirmed_peer, not_found_peer, unresolved_peer])];
1584
1585        let mut evidence = HashMap::new();
1586        evidence.insert(
1587            key,
1588            KeyVerificationEvidence {
1589                presence: HashMap::new(),
1590                paid_list: HashMap::from([
1591                    (confirmed_peer, PaidListEvidence::Confirmed),
1592                    (not_found_peer, PaidListEvidence::NotFound),
1593                    (unresolved_peer, PaidListEvidence::Unresolved),
1594                    // Confirmation from a peer outside the target set.
1595                    (outsider, PaidListEvidence::Confirmed),
1596                ]),
1597            },
1598        );
1599
1600        let confirmed_by_key = paid_confirmations_by_key(&candidates, &evidence);
1601
1602        assert_eq!(
1603            confirmed_by_key.get(&key),
1604            Some(&HashSet::from([confirmed_peer])),
1605            "only Confirmed answers from target peers may count",
1606        );
1607    }
1608
1609    #[test]
1610    fn paid_confirmations_skip_keys_without_evidence() {
1611        let peer = peer_id_from_byte(1);
1612        let key = key_from_byte(0x22);
1613        let candidates = vec![(key, vec![peer])];
1614
1615        let confirmed_by_key = paid_confirmations_by_key(&candidates, &HashMap::new());
1616
1617        assert!(confirmed_by_key.is_empty());
1618    }
1619
1620    #[test]
1621    fn zero_quorum_never_confirms() {
1622        let peer_a = peer_id_from_byte(1);
1623        let key = key_from_byte(0x10);
1624        let mut present_by_key = HashMap::new();
1625        present_by_key.insert(key, HashSet::from([peer_a]));
1626
1627        assert!(!target_peers_reported_present(
1628            &key,
1629            &[peer_a],
1630            &present_by_key,
1631            0
1632        ));
1633    }
1634
1635    #[test]
1636    fn proofs_from_non_target_peers_do_not_count_toward_quorum() {
1637        let target = peer_id_from_byte(1);
1638        let outsider = peer_id_from_byte(2);
1639        let key = key_from_byte(0x11);
1640        let mut present_by_key = HashMap::new();
1641        present_by_key.insert(key, HashSet::from([outsider]));
1642
1643        assert!(!target_peers_reported_present(
1644            &key,
1645            &[target],
1646            &present_by_key,
1647            1
1648        ));
1649    }
1650
1651    #[test]
1652    fn duplicated_target_peer_counts_once_toward_quorum() {
1653        let peer = peer_id_from_byte(1);
1654        let key = key_from_byte(0x12);
1655        let mut present_by_key = HashMap::new();
1656        present_by_key.insert(key, HashSet::from([peer]));
1657
1658        assert!(!target_peers_reported_present(
1659            &key,
1660            &[peer, peer],
1661            &present_by_key,
1662            2
1663        ));
1664    }
1665
1666    #[test]
1667    fn audit_digest_proof_requires_matching_peer_key_nonce_and_bytes() {
1668        let peer = peer_id_from_byte(1);
1669        let other_peer = peer_id_from_byte(2);
1670        let key = key_from_byte(0x11);
1671        let other_key = key_from_byte(0x12);
1672        let nonce = [0xAA; 32];
1673        let other_nonce = [0xBB; 32];
1674        let bytes = b"record bytes";
1675        let digest = compute_audit_digest(&nonce, peer.as_bytes(), &key, bytes);
1676
1677        assert!(audit_digest_proves_key(&peer, &key, &nonce, bytes, &digest));
1678        assert!(!audit_digest_proves_key(
1679            &other_peer,
1680            &key,
1681            &nonce,
1682            bytes,
1683            &digest
1684        ));
1685        assert!(!audit_digest_proves_key(
1686            &peer, &other_key, &nonce, bytes, &digest
1687        ));
1688        assert!(!audit_digest_proves_key(
1689            &peer,
1690            &key,
1691            &other_nonce,
1692            bytes,
1693            &digest
1694        ));
1695        assert!(!audit_digest_proves_key(
1696            &peer,
1697            &key,
1698            &nonce,
1699            b"different bytes",
1700            &digest
1701        ));
1702        assert!(!audit_digest_proves_key(
1703            &peer,
1704            &key,
1705            &nonce,
1706            bytes,
1707            &ABSENT_KEY_DIGEST
1708        ));
1709    }
1710
1711    #[tokio::test]
1712    async fn prune_cursor_advances_past_selected_budget_window() {
1713        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
1714        state.write().await.prune_cursor = 2;
1715
1716        let start = prune_scan_start(&state, 10).await;
1717        advance_prune_cursor(&state, 10, start, Some(3)).await;
1718
1719        assert_eq!(state.read().await.prune_cursor, 6);
1720    }
1721
1722    #[tokio::test]
1723    async fn prune_cursor_advances_even_when_no_candidate_selected() {
1724        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
1725        state.write().await.prune_cursor = 9;
1726
1727        let start = prune_scan_start(&state, 10).await;
1728        advance_prune_cursor(&state, 10, start, None).await;
1729
1730        assert_eq!(state.read().await.prune_cursor, 0);
1731    }
1732
1733    #[tokio::test]
1734    async fn prune_audit_normal_response_clears_stale_bootstrap_claim() {
1735        let peer = peer_id_from_byte(1);
1736        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![peer])));
1737        let first_seen = Instant::now();
1738        state
1739            .write()
1740            .await
1741            .bootstrap_claims
1742            .insert(peer, first_seen);
1743        state
1744            .write()
1745            .await
1746            .bootstrap_claim_history
1747            .insert(peer, first_seen);
1748
1749        clear_prune_bootstrap_claim(&peer, &state).await;
1750
1751        let state = state.read().await;
1752        assert!(!state.bootstrap_claims.contains_key(&peer));
1753        assert!(state.bootstrap_claim_history.contains_key(&peer));
1754    }
1755
1756    #[test]
1757    fn prune_audit_clear_policy_requires_decoded_non_bootstrap_response() {
1758        assert!(prune_audit_response_clears_bootstrap_claim(
1759            PruneAuditStatus::Proven
1760        ));
1761        assert!(prune_audit_response_clears_bootstrap_claim(
1762            PruneAuditStatus::Failed
1763        ));
1764        assert!(!prune_audit_response_clears_bootstrap_claim(
1765            PruneAuditStatus::Bootstrapping
1766        ));
1767    }
1768
1769    #[tokio::test]
1770    async fn prune_audit_failure_penalty_is_reserved_once_per_peer_per_pass() {
1771        let peer = peer_id_from_byte(1);
1772        let other_peer = peer_id_from_byte(2);
1773        let report_state = PruneAuditReportState::default();
1774
1775        assert!(reserve_prune_audit_failure_report(&report_state, &peer).await);
1776        assert!(!reserve_prune_audit_failure_report(&report_state, &peer).await);
1777        assert!(reserve_prune_audit_failure_report(&report_state, &other_peer).await);
1778
1779        let reported = report_state.audit_failures.read().await;
1780        assert_eq!(reported.len(), 2);
1781        assert!(reported.contains(&peer));
1782        assert!(reported.contains(&other_peer));
1783    }
1784
1785    #[tokio::test]
1786    async fn prune_bootstrap_abuse_penalty_is_reserved_once_per_peer_per_pass() {
1787        let peer = peer_id_from_byte(1);
1788        let other_peer = peer_id_from_byte(2);
1789        let report_state = PruneAuditReportState::default();
1790
1791        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
1792        assert!(!reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
1793        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &other_peer).await);
1794
1795        let reported = report_state.bootstrap_abuse.read().await;
1796        assert_eq!(reported.len(), 2);
1797        assert!(reported.contains(&peer));
1798        assert!(reported.contains(&other_peer));
1799    }
1800}