Skip to main content

ant_node/replication/
pruning.rs

1//! Post-cycle responsibility pruning (Section 11).
2//!
3//! On `NeighborSyncCycleComplete`: prune stored records and `PaidForList`
4//! entries that have been continuously out of range for at least
5//! `PRUNE_HYSTERESIS_DURATION`.
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use crate::logging::{debug, info, warn};
12
13use futures::{stream, StreamExt};
14use rand::Rng;
15use saorsa_core::identity::PeerId;
16use saorsa_core::{DHTNode, P2PNode};
17use tokio::sync::RwLock;
18
19use crate::ant_protocol::XorName;
20use crate::replication::commitment_state::ResponderCommitmentState;
21use crate::replication::config::{
22    storage_admission_width, ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT,
23    MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS, REPLICATION_PROTOCOL_ID,
24};
25use crate::replication::paid_list::PaidList;
26use crate::replication::protocol::{
27    compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
28    ReplicationMessageBody, ABSENT_KEY_DIGEST,
29};
30use crate::replication::quorum::{self, VerificationTargets};
31use crate::replication::types::{
32    BootstrapClaimObservation, KeyVerificationEvidence, NeighborSyncState, PaidListEvidence,
33    RepairProofs,
34};
35use crate::storage::LmdbStorage;
36
37use super::REPLICATION_TRUST_WEIGHT;
38
39const MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES: usize = 32;
40
41/// Maximum expired `PaidForList` entries selected for verification per prune
42/// pass. The unique peer fan-out for those entries is capped separately.
43const MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS: usize = 32;
44/// Maximum unique peers contacted for paid-list verification per prune pass.
45/// `quorum::run_verification_round` sends one request per target peer.
46const MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS: usize = MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES;
47
48// ---------------------------------------------------------------------------
49// Result type
50// ---------------------------------------------------------------------------
51
52/// Summary of a prune pass.
53#[derive(Debug, Default)]
54pub struct PruneResult {
55    /// Number of records deleted from storage.
56    pub records_pruned: usize,
57    /// Number of records with out-of-range timestamp newly set.
58    pub records_marked_out_of_range: usize,
59    /// Number of records with out-of-range timestamp cleared (back in range).
60    pub records_cleared: usize,
61    /// Number of `PaidForList` entries removed.
62    pub paid_entries_pruned: usize,
63    /// Number of `PaidForList` entries with out-of-range timestamp newly set.
64    pub paid_entries_marked: usize,
65    /// Number of `PaidForList` entries cleared (back in range).
66    pub paid_entries_cleared: usize,
67}
68
69/// Shared dependencies and switches for one prune pass.
70pub struct PrunePassContext<'a> {
71    /// Local peer id.
72    pub self_id: &'a PeerId,
73    /// Local record storage.
74    pub storage: &'a Arc<LmdbStorage>,
75    /// Persistent paid-list state.
76    pub paid_list: &'a Arc<PaidList>,
77    /// P2P node used for routing lookups and prune-confirmation audits.
78    pub p2p_node: &'a Arc<P2PNode>,
79    /// Replication configuration.
80    pub config: &'a ReplicationConfig,
81    /// Neighbor-sync state, including prune cursor and bootstrap claims.
82    pub sync_state: &'a Arc<RwLock<NeighborSyncState>>,
83    /// Key-specific repair proofs used to gate prune-confirmation audits.
84    pub repair_proofs: &'a Arc<RwLock<RepairProofs>>,
85    /// Current local neighbor-sync cycle epoch for repair-proof maturity.
86    pub current_sync_epoch: u64,
87    /// Test-only clock override for repair-proof maturity checks.
88    #[cfg(any(test, feature = "test-utils"))]
89    pub repair_proof_now: Option<Instant>,
90    /// Whether remote prune-confirmation audits are allowed this pass.
91    pub allow_remote_prune_audits: bool,
92    /// Responder commitment state, used to veto deleting a chunk still held
93    /// under a recently-gossiped commitment (so the storage-commitment audit's
94    /// round-2 byte challenge cannot false-positive an honest node). `None` on
95    /// the legacy/test-only prune path, which keeps the pre-retention behavior.
96    pub commitment_state: Option<&'a Arc<ResponderCommitmentState>>,
97}
98
99#[derive(Debug, Clone, Copy, PartialEq, Eq)]
100enum PruneAuditStatus {
101    Proven,
102    Failed,
103    Bootstrapping,
104}
105
106#[derive(Debug, Default)]
107struct RecordPruneStats {
108    marked: usize,
109    cleared: usize,
110    pruned: usize,
111    held_by_commitment: usize,
112}
113
114#[derive(Debug, Default)]
115struct PaidPruneStats {
116    marked: usize,
117    cleared: usize,
118    pruned: usize,
119}
120
121#[derive(Debug, Default)]
122struct PaidPruneDeferredCounts {
123    entry_budget: usize,
124    remote_gate: usize,
125    peer_budget: usize,
126}
127
128impl PaidPruneDeferredCounts {
129    fn log(&self) {
130        if self.entry_budget > 0 {
131            debug!(
132                "Deferred {} expired PaidForList entries beyond the per-pass verification cap \
133                 ({MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS})",
134                self.entry_budget,
135            );
136        }
137
138        if self.remote_gate > 0 {
139            debug!(
140                "Deferred {} expired PaidForList entries until bootstrap drain allows remote \
141                 paid-prune verification",
142                self.remote_gate,
143            );
144        }
145
146        if self.peer_budget > 0 {
147            debug!(
148                "Deferred {} expired PaidForList entries beyond the per-pass paid-prune peer cap \
149                 ({MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS})",
150                self.peer_budget,
151            );
152        }
153    }
154}
155
156#[derive(Debug, Clone)]
157struct RecordPruneCandidate {
158    key: XorName,
159    target_peers: Vec<PeerId>,
160}
161
162struct RecordPruneKeyOutcome {
163    marked: bool,
164    state: RecordPruneKeyState,
165}
166
167impl Default for RecordPruneKeyOutcome {
168    fn default() -> Self {
169        Self {
170            marked: false,
171            state: RecordPruneKeyState::None,
172        }
173    }
174}
175
176enum RecordPruneKeyState {
177    None,
178    Cleared,
179    BootstrapDeferred,
180    BudgetDeferred,
181    /// Out of range, but still committed under a recently-gossiped commitment:
182    /// deletion is vetoed (and the out-of-range hysteresis clock is not even
183    /// started) until the key ages out of the last-2-gossiped window.
184    HeldByCommitment,
185    Candidate(RecordPruneCandidate),
186}
187
188enum PaidPruneKeyState {
189    None,
190    RemoteDeferred,
191    EntryBudgetDeferred,
192    PeerBudgetDeferred,
193    Candidate(Vec<PeerId>),
194}
195
196#[derive(Default)]
197struct PruneAuditReportState {
198    audit_failures: RwLock<HashSet<PeerId>>,
199    bootstrap_abuse: RwLock<HashSet<PeerId>>,
200}
201
202// ---------------------------------------------------------------------------
203// Prune pass
204// ---------------------------------------------------------------------------
205
206/// Execute post-cycle responsibility pruning.
207///
208/// For each stored record K:
209/// - If `self` is within the storage-admission group
210///   (`close_group_size + STORAGE_ADMISSION_MARGIN`): clear
211///   `RecordOutOfRangeFirstSeen`.
212/// - If not in that group: set timestamp if not already set; delete if the
213///   timestamp is at least `PRUNE_HYSTERESIS_DURATION` old and all but one
214///   of the strict current close group prove they store the record.
215///
216/// For each `PaidForList` entry K:
217/// - If self is in `PaidCloseGroup(K)`: clear `PaidOutOfRangeFirstSeen`.
218/// - If not in group: set timestamp if not already set; remove entry if the
219///   timestamp is at least `PRUNE_HYSTERESIS_DURATION` old and three
220///   quarters of the current paid close group (15 of 20 at production
221///   parameters) confirm the key in their own `PaidForList`.
222///
223/// Compatibility wrapper for callers that have not adopted repair-proof
224/// tracking. It preserves the original public signature, but it has no proof
225/// table or advanced sync epoch to pass into record prune-confirmation audits.
226/// Out-of-range records are therefore marked/deferred rather than deleted via
227/// remote confirmation. The replication engine calls
228/// [`run_prune_pass_with_context`] so it can pass real repair proofs.
229pub async fn run_prune_pass(
230    self_id: &PeerId,
231    storage: &Arc<LmdbStorage>,
232    paid_list: &Arc<PaidList>,
233    p2p_node: &Arc<P2PNode>,
234    config: &ReplicationConfig,
235    sync_state: &Arc<RwLock<NeighborSyncState>>,
236    allow_remote_prune_audits: bool,
237) -> PruneResult {
238    let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
239    run_prune_pass_with_context(PrunePassContext {
240        self_id,
241        storage,
242        paid_list,
243        p2p_node,
244        config,
245        sync_state,
246        repair_proofs: &repair_proofs,
247        current_sync_epoch: 0,
248        #[cfg(any(test, feature = "test-utils"))]
249        repair_proof_now: None,
250        allow_remote_prune_audits,
251        commitment_state: None,
252    })
253    .await
254}
255
256/// Execute one prune pass with repair-proof-gated remote confirmations.
257pub async fn run_prune_pass_with_context(ctx: PrunePassContext<'_>) -> PruneResult {
258    let (stored_count, record_stats) = prune_stored_records(&ctx).await;
259    let now = Instant::now();
260    let (paid_count, paid_stats) = prune_paid_entries(
261        ctx.self_id,
262        ctx.paid_list,
263        ctx.p2p_node,
264        ctx.config,
265        now,
266        ctx.allow_remote_prune_audits,
267    )
268    .await;
269
270    let result = PruneResult {
271        records_pruned: record_stats.pruned,
272        records_marked_out_of_range: record_stats.marked,
273        records_cleared: record_stats.cleared,
274        paid_entries_pruned: paid_stats.pruned,
275        paid_entries_marked: paid_stats.marked,
276        paid_entries_cleared: paid_stats.cleared,
277    };
278
279    info!(
280        "Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
281        result.records_pruned, stored_count, result.paid_entries_pruned, paid_count,
282    );
283
284    result
285}
286
287// Combines main's prune-proof/admission gate with ADR-0002's commitment
288// retention veto in one pass; the merged body runs just over the line budget.
289#[allow(clippy::too_many_lines)]
290async fn prune_stored_records(ctx: &PrunePassContext<'_>) -> (usize, RecordPruneStats) {
291    let stored_keys = match ctx.storage.all_keys().await {
292        Ok(keys) => keys,
293        Err(e) => {
294            warn!("Failed to read stored keys for pruning: {e}");
295            return (0, RecordPruneStats::default());
296        }
297    };
298
299    let now = Instant::now();
300    let mut stats = RecordPruneStats::default();
301    let mut candidates = Vec::new();
302    let mut audit_challenge_budget = MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS;
303    let mut budget_deferred = 0usize;
304    let mut bootstrap_deferred = 0usize;
305    let scan_start = prune_scan_start(ctx.sync_state, stored_keys.len()).await;
306    let mut last_selected_offset = None;
307
308    for offset in 0..stored_keys.len() {
309        let key = &stored_keys[(scan_start + offset) % stored_keys.len()];
310        let (storage_admission_group, strict_close_group) =
311            record_prune_lookup_groups(key, ctx.p2p_node, ctx.config).await;
312
313        let outcome = evaluate_record_prune_key(
314            ctx,
315            key,
316            &storage_admission_group,
317            &strict_close_group,
318            now,
319            &mut audit_challenge_budget,
320        )
321        .await;
322        if outcome.marked {
323            stats.marked += 1;
324        }
325        match outcome.state {
326            RecordPruneKeyState::None => {}
327            RecordPruneKeyState::Cleared => stats.cleared += 1,
328            RecordPruneKeyState::BootstrapDeferred => {
329                bootstrap_deferred = bootstrap_deferred.saturating_add(1);
330            }
331            RecordPruneKeyState::BudgetDeferred => {
332                budget_deferred = budget_deferred.saturating_add(1);
333            }
334            RecordPruneKeyState::HeldByCommitment => {
335                stats.held_by_commitment = stats.held_by_commitment.saturating_add(1);
336            }
337            RecordPruneKeyState::Candidate(candidate) => {
338                last_selected_offset = Some(offset);
339                candidates.push(candidate);
340            }
341        }
342    }
343
344    advance_prune_cursor(
345        ctx.sync_state,
346        stored_keys.len(),
347        scan_start,
348        last_selected_offset,
349    )
350    .await;
351
352    if bootstrap_deferred > 0 {
353        debug!(
354            "Deferred {bootstrap_deferred} prune candidates until bootstrap drain allows \
355             remote prune-confirmation audits"
356        );
357    }
358
359    if budget_deferred > 0 {
360        debug!(
361            "Deferred {budget_deferred} prune candidates due to per-pass audit budget \
362             ({MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS} challenges)"
363        );
364    }
365
366    if stats.held_by_commitment > 0 {
367        debug!(
368            "Vetoed {} prune candidate(s) still committed under a recently-gossiped \
369             commitment (bounded reprieve until they age out of the retention window)",
370            stats.held_by_commitment
371        );
372    }
373
374    let present_by_key = collect_record_prune_proofs(
375        &candidates,
376        ctx.storage,
377        ctx.p2p_node,
378        ctx.config,
379        ctx.sync_state,
380    )
381    .await;
382    let (keys_to_delete, revalidated_cleared) = revalidated_record_prune_keys(
383        &candidates,
384        &present_by_key,
385        ctx.self_id,
386        ctx.paid_list,
387        ctx.p2p_node,
388        ctx.config,
389        ctx.commitment_state,
390    )
391    .await;
392    stats.cleared += revalidated_cleared;
393    stats.pruned = delete_stored_records(
394        &keys_to_delete,
395        ctx.storage,
396        ctx.paid_list,
397        ctx.repair_proofs,
398    )
399    .await;
400
401    (stored_keys.len(), stats)
402}
403
404async fn record_prune_lookup_groups(
405    key: &XorName,
406    p2p_node: &Arc<P2PNode>,
407    config: &ReplicationConfig,
408) -> (Vec<DHTNode>, Vec<DHTNode>) {
409    let dht = p2p_node.dht_manager();
410    let storage_admission_group = dht
411        .find_closest_nodes_local_with_self(key, storage_admission_width(config.close_group_size))
412        .await;
413    let strict_close_group = dht
414        .find_closest_nodes_local_with_self(key, config.close_group_size)
415        .await;
416    (storage_admission_group, strict_close_group)
417}
418
419async fn evaluate_record_prune_key(
420    ctx: &PrunePassContext<'_>,
421    key: &XorName,
422    storage_admission_group: &[DHTNode],
423    strict_close_group: &[DHTNode],
424    now: Instant,
425    audit_challenge_budget: &mut usize,
426) -> RecordPruneKeyOutcome {
427    let mut outcome = RecordPruneKeyOutcome::default();
428    let is_responsible = storage_admission_group
429        .iter()
430        .any(|node| node.peer_id == *ctx.self_id);
431
432    if is_responsible {
433        if ctx.paid_list.record_out_of_range_since(key).is_some() {
434            ctx.paid_list.clear_record_out_of_range(key);
435            outcome.state = RecordPruneKeyState::Cleared;
436        }
437        return outcome;
438    }
439
440    // Retention veto: the key has left our close group, but if it is still
441    // committed under a recently-gossiped commitment a neighbour can pin that
442    // root and demand its bytes in a round-2 byte challenge. Deleting it now
443    // would turn an honest node's response into `Absent` → a confirmed audit
444    // failure. Veto deletion AND do not even start the out-of-range hysteresis
445    // clock yet: the commitment rebuild only commits to keys we are still
446    // responsible for, so this key drops out of the next rebuilt commitment and
447    // ages out of the last-2-gossiped window within at most
448    // `RETAINED_GOSSIPED_COMMITMENTS` gossip rotations, after which `is_held`
449    // returns false and the key prunes through the normal path. This is a
450    // bounded reprieve, not a permanent pin.
451    if let Some(cs) = ctx.commitment_state {
452        if cs.is_held(key) {
453            outcome.state = RecordPruneKeyState::HeldByCommitment;
454            return outcome;
455        }
456    }
457
458    if ctx.paid_list.record_out_of_range_since(key).is_none() {
459        outcome.marked = true;
460    }
461    ctx.paid_list.set_record_out_of_range(key);
462
463    let Some(first_seen) = ctx.paid_list.record_out_of_range_since(key) else {
464        return outcome;
465    };
466    let elapsed = now
467        .checked_duration_since(first_seen)
468        .unwrap_or(Duration::ZERO);
469    if elapsed < ctx.config.prune_hysteresis_duration {
470        return outcome;
471    }
472
473    if !ctx.allow_remote_prune_audits {
474        outcome.state = RecordPruneKeyState::BootstrapDeferred;
475        return outcome;
476    }
477
478    let target_peers = remote_close_group_peers(strict_close_group, ctx.self_id);
479    if target_peers.is_empty() {
480        warn!(
481            "Cannot prune {}: current close group has no remote peers",
482            hex::encode(key)
483        );
484        return outcome;
485    }
486
487    // Only peers we have hinted (mature repair proof) may be audited; the
488    // proof threshold must be reachable among them. A never-synced peer in
489    // the close group reduces the audit pool instead of vetoing the prune.
490    let current_close_peers: HashSet<PeerId> =
491        strict_close_group.iter().map(|node| node.peer_id).collect();
492    #[cfg(any(test, feature = "test-utils"))]
493    let repair_proof_now = ctx.repair_proof_now.unwrap_or(now);
494    #[cfg(not(any(test, feature = "test-utils")))]
495    let repair_proof_now = now;
496    let audit_targets = peers_with_mature_repair_proofs(
497        key,
498        &target_peers,
499        &current_close_peers,
500        ctx.repair_proofs,
501        ctx.current_sync_epoch,
502        repair_proof_now,
503    )
504    .await;
505    let proofs_needed = prune_proofs_needed(target_peers.len());
506    if proofs_needed == 0 || audit_targets.len() < proofs_needed {
507        debug!(
508            "Deferring prune for {} until enough of the close group has mature \
509             repair proofs",
510            hex::encode(key)
511        );
512        return outcome;
513    }
514
515    if audit_targets.len() > *audit_challenge_budget {
516        outcome.state = RecordPruneKeyState::BudgetDeferred;
517        return outcome;
518    }
519
520    *audit_challenge_budget -= audit_targets.len();
521    outcome.state = RecordPruneKeyState::Candidate(RecordPruneCandidate {
522        key: *key,
523        target_peers: audit_targets,
524    });
525    outcome
526}
527
528async fn prune_paid_entries(
529    self_id: &PeerId,
530    paid_list: &Arc<PaidList>,
531    p2p_node: &Arc<P2PNode>,
532    config: &ReplicationConfig,
533    now: Instant,
534    allow_remote_prune_audits: bool,
535) -> (usize, PaidPruneStats) {
536    let paid_keys = match paid_list.all_keys() {
537        Ok(keys) => keys,
538        Err(e) => {
539            warn!("Failed to read PaidForList for pruning: {e}");
540            return (0, PaidPruneStats::default());
541        }
542    };
543
544    let dht = p2p_node.dht_manager();
545    let mut stats = PaidPruneStats::default();
546    let mut expired_candidates: Vec<(XorName, Vec<PeerId>)> = Vec::new();
547    let mut deferred_counts = PaidPruneDeferredCounts::default();
548    let mut selected_verification_peers = HashSet::new();
549    // Rotate the scan start so expired entries beyond the per-pass cap are
550    // not starved by the same head-of-list entries every pass.
551    let scan_start = paid_list.paid_prune_scan_start(paid_keys.len());
552    let mut last_selected_offset = None;
553
554    for offset in 0..paid_keys.len() {
555        let key = &paid_keys[(scan_start + offset) % paid_keys.len()];
556        let closest: Vec<DHTNode> = dht
557            .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
558            .await;
559        let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);
560
561        if in_paid_group {
562            if paid_list.paid_out_of_range_since(key).is_some() {
563                paid_list.clear_paid_out_of_range(key);
564                stats.cleared += 1;
565            }
566        } else {
567            if paid_list.paid_out_of_range_since(key).is_none() {
568                stats.marked += 1;
569            }
570            paid_list.set_paid_out_of_range(key);
571
572            if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
573                let elapsed = now
574                    .checked_duration_since(first_seen)
575                    .unwrap_or(Duration::ZERO);
576                if elapsed >= config.prune_hysteresis_duration {
577                    match select_paid_prune_candidate(
578                        key,
579                        &closest,
580                        self_id,
581                        allow_remote_prune_audits,
582                        expired_candidates.len(),
583                        &mut selected_verification_peers,
584                    ) {
585                        PaidPruneKeyState::None => {}
586                        PaidPruneKeyState::RemoteDeferred => {
587                            deferred_counts.remote_gate =
588                                deferred_counts.remote_gate.saturating_add(1);
589                        }
590                        PaidPruneKeyState::EntryBudgetDeferred => {
591                            deferred_counts.entry_budget =
592                                deferred_counts.entry_budget.saturating_add(1);
593                        }
594                        PaidPruneKeyState::PeerBudgetDeferred => {
595                            deferred_counts.peer_budget =
596                                deferred_counts.peer_budget.saturating_add(1);
597                        }
598                        PaidPruneKeyState::Candidate(target_peers) => {
599                            expired_candidates.push((*key, target_peers));
600                            last_selected_offset = Some(offset);
601                        }
602                    }
603                }
604            }
605        }
606    }
607
608    paid_list.advance_paid_prune_cursor(paid_keys.len(), scan_start, last_selected_offset);
609    deferred_counts.log();
610
611    let confirmed_by_key =
612        collect_paid_prune_confirmations(&expired_candidates, p2p_node, config).await;
613    let (paid_keys_to_delete, revalidated_cleared) = revalidated_paid_prune_keys(
614        &expired_candidates,
615        &confirmed_by_key,
616        self_id,
617        paid_list,
618        p2p_node,
619        config,
620    )
621    .await;
622    stats.cleared += revalidated_cleared;
623    stats.pruned = delete_paid_entries(&paid_keys_to_delete, paid_list).await;
624
625    (paid_keys.len(), stats)
626}
627
628fn select_paid_prune_candidate(
629    key: &XorName,
630    closest: &[DHTNode],
631    self_id: &PeerId,
632    allow_remote_prune_audits: bool,
633    selected_candidate_count: usize,
634    selected_verification_peers: &mut HashSet<PeerId>,
635) -> PaidPruneKeyState {
636    if !allow_remote_prune_audits {
637        return PaidPruneKeyState::RemoteDeferred;
638    }
639
640    let target_peers = remote_close_group_peers(closest, self_id);
641    if target_peers.is_empty() {
642        warn!(
643            "Cannot prune paid entry {}: current paid close group has no remote peers",
644            hex::encode(key)
645        );
646        return PaidPruneKeyState::None;
647    }
648
649    if selected_candidate_count >= MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS {
650        return PaidPruneKeyState::EntryBudgetDeferred;
651    }
652
653    if !reserve_paid_prune_peer_budget(&target_peers, selected_verification_peers) {
654        return PaidPruneKeyState::PeerBudgetDeferred;
655    }
656
657    PaidPruneKeyState::Candidate(target_peers)
658}
659
660async fn delete_paid_entries(keys_to_delete: &[XorName], paid_list: &Arc<PaidList>) -> usize {
661    if keys_to_delete.is_empty() {
662        return 0;
663    }
664
665    match paid_list.remove_batch(keys_to_delete).await {
666        Ok(count) => {
667            debug!("Pruned {count} out-of-range PaidForList entries");
668            count
669        }
670        Err(e) => {
671            warn!("Failed to prune PaidForList entries: {e}");
672            0
673        }
674    }
675}
676
677/// Re-check each confirmed candidate against current local state before
678/// deletion.
679///
680/// The network round in [`collect_paid_prune_confirmations`] takes time;
681/// the paid close group may have changed underneath it, including self
682/// moving back into range. Mirrors [`revalidated_record_prune_keys`]:
683/// confirmations only count from peers still in the current paid close
684/// group, against a threshold computed from that current group.
685async fn revalidated_paid_prune_keys(
686    expired_candidates: &[(XorName, Vec<PeerId>)],
687    confirmed_by_key: &HashMap<XorName, HashSet<PeerId>>,
688    self_id: &PeerId,
689    paid_list: &Arc<PaidList>,
690    p2p_node: &Arc<P2PNode>,
691    config: &ReplicationConfig,
692) -> (Vec<XorName>, usize) {
693    let dht = p2p_node.dht_manager();
694    let mut keys_to_delete = Vec::new();
695    let mut cleared = 0;
696    let now = Instant::now();
697
698    for (key, _) in expired_candidates {
699        let closest: Vec<DHTNode> = dht
700            .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
701            .await;
702
703        if closest.iter().any(|n| n.peer_id == *self_id) {
704            if paid_list.paid_out_of_range_since(key).is_some() {
705                paid_list.clear_paid_out_of_range(key);
706                cleared += 1;
707            }
708            continue;
709        }
710
711        let Some(first_seen) = paid_list.paid_out_of_range_since(key) else {
712            continue;
713        };
714        let elapsed = now
715            .checked_duration_since(first_seen)
716            .unwrap_or(Duration::ZERO);
717        if elapsed < config.prune_hysteresis_duration {
718            continue;
719        }
720
721        let current_target_peers = remote_close_group_peers(&closest, self_id);
722        if current_target_peers.is_empty() {
723            warn!(
724                "Cannot prune paid entry {}: current paid close group has no remote peers",
725                hex::encode(key)
726            );
727            continue;
728        }
729
730        let confirmations_needed = paid_prune_confirmations_needed(current_target_peers.len());
731        if target_peers_reported_present(
732            key,
733            &current_target_peers,
734            confirmed_by_key,
735            confirmations_needed,
736        ) {
737            keys_to_delete.push(*key);
738        } else {
739            debug!(
740                "Deferring paid-entry prune for {} until enough of the current paid \
741                 close group confirm it",
742                hex::encode(key)
743            );
744        }
745    }
746
747    (keys_to_delete, cleared)
748}
749
750fn remote_close_group_peers(close_group: &[DHTNode], self_id: &PeerId) -> Vec<PeerId> {
751    close_group
752        .iter()
753        .filter(|node| node.peer_id != *self_id)
754        .map(|node| node.peer_id)
755        .collect()
756}
757
758/// Confirmations required before removing an out-of-range `PaidForList`
759/// entry: three quarters of the paid close group rounded up, 15 of 20 at
760/// production parameters.
761///
762/// Paid-entry pruning is deliberately gated on the paid lists of the current
763/// paid close group, never on chunk possession: the paid list is the
764/// authorization record, and a wide confirmed majority must already track
765/// the key before this node may forget it.
766fn paid_prune_confirmations_needed(group_size: usize) -> usize {
767    (3 * group_size).div_ceil(4)
768}
769
770fn reserve_paid_prune_peer_budget(
771    target_peers: &[PeerId],
772    selected_verification_peers: &mut HashSet<PeerId>,
773) -> bool {
774    let new_peer_count = target_peers
775        .iter()
776        .filter(|peer| !selected_verification_peers.contains(peer))
777        .count();
778    if selected_verification_peers
779        .len()
780        .saturating_add(new_peer_count)
781        > MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS
782    {
783        return false;
784    }
785
786    selected_verification_peers.extend(target_peers.iter().copied());
787    true
788}
789
790/// Ask the current paid close group whether they track each expired key in
791/// their `PaidForList`, and return the confirming peers per key.
792///
793/// The deletion decision happens afterwards in
794/// [`revalidated_paid_prune_keys`], against the paid close group as it
795/// stands once the network round has completed.
796async fn collect_paid_prune_confirmations(
797    expired_candidates: &[(XorName, Vec<PeerId>)],
798    p2p_node: &Arc<P2PNode>,
799    config: &ReplicationConfig,
800) -> HashMap<XorName, HashSet<PeerId>> {
801    if expired_candidates.is_empty() {
802        return HashMap::new();
803    }
804
805    let mut targets = VerificationTargets::default();
806    let mut keys = Vec::new();
807    for (key, target_peers) in expired_candidates {
808        if target_peers.is_empty() {
809            warn!(
810                "Cannot prune paid entry {}: current paid close group has no remote peers",
811                hex::encode(key)
812            );
813            continue;
814        }
815        keys.push(*key);
816        for peer in target_peers {
817            targets.all_peers.insert(*peer);
818            targets.peer_to_keys.entry(*peer).or_default().push(*key);
819            targets
820                .peer_to_paid_keys
821                .entry(*peer)
822                .or_default()
823                .insert(*key);
824        }
825        targets.paid_targets.insert(*key, target_peers.clone());
826        targets.paid_group_sizes.insert(*key, target_peers.len());
827    }
828    for keys_list in targets.peer_to_keys.values_mut() {
829        keys_list.sort_unstable();
830        keys_list.dedup();
831    }
832
833    let evidence = quorum::run_verification_round(&keys, &targets, p2p_node, config).await;
834    paid_confirmations_by_key(expired_candidates, &evidence)
835}
836
837/// Aggregate `Confirmed` paid-list evidence into per-key peer sets.
838///
839/// Only peers in the candidate's own target set count; `NotFound` and
840/// `Unresolved` answers never confirm.
841fn paid_confirmations_by_key(
842    expired_candidates: &[(XorName, Vec<PeerId>)],
843    evidence: &HashMap<XorName, KeyVerificationEvidence>,
844) -> HashMap<XorName, HashSet<PeerId>> {
845    let mut confirmed_by_key: HashMap<XorName, HashSet<PeerId>> = HashMap::new();
846    for (key, target_peers) in expired_candidates {
847        let Some(key_evidence) = evidence.get(key) else {
848            continue;
849        };
850        let confirmed: HashSet<PeerId> = key_evidence
851            .paid_list
852            .iter()
853            .filter(|&(peer, status)| {
854                *status == PaidListEvidence::Confirmed && target_peers.contains(peer)
855            })
856            .map(|(peer, _)| *peer)
857            .collect();
858        if !confirmed.is_empty() {
859            confirmed_by_key.insert(*key, confirmed);
860        }
861    }
862    confirmed_by_key
863}
864
865/// Filter `target_peers` down to those with a mature repair proof for `key`.
866///
867/// Per design rule 20, peers without a key-specific mature repair hint proof
868/// are never audited for that key.
869async fn peers_with_mature_repair_proofs(
870    key: &XorName,
871    target_peers: &[PeerId],
872    current_close_peers: &HashSet<PeerId>,
873    repair_proofs: &Arc<RwLock<RepairProofs>>,
874    current_sync_epoch: u64,
875    now: Instant,
876) -> Vec<PeerId> {
877    let mut proofs = repair_proofs.write().await;
878    target_peers
879        .iter()
880        .filter(|peer| {
881            proofs.has_mature_replica_hint(peer, key, current_close_peers, current_sync_epoch, now)
882        })
883        .copied()
884        .collect()
885}
886
887async fn prune_scan_start(
888    sync_state: &Arc<RwLock<NeighborSyncState>>,
889    stored_key_count: usize,
890) -> usize {
891    if stored_key_count == 0 {
892        return 0;
893    }
894    sync_state.read().await.prune_cursor % stored_key_count
895}
896
897async fn advance_prune_cursor(
898    sync_state: &Arc<RwLock<NeighborSyncState>>,
899    stored_key_count: usize,
900    scan_start: usize,
901    last_selected_offset: Option<usize>,
902) {
903    if stored_key_count == 0 {
904        sync_state.write().await.prune_cursor = 0;
905        return;
906    }
907
908    let advance_by = last_selected_offset.map_or(1, |offset| offset.saturating_add(1));
909    sync_state.write().await.prune_cursor = (scan_start + advance_by) % stored_key_count;
910}
911
912async fn delete_stored_records(
913    keys_to_delete: &[XorName],
914    storage: &Arc<LmdbStorage>,
915    paid_list: &Arc<PaidList>,
916    repair_proofs: &Arc<RwLock<RepairProofs>>,
917) -> usize {
918    let mut pruned = 0;
919
920    for key in keys_to_delete {
921        if let Err(e) = storage.delete(key).await {
922            warn!("Failed to prune record {}: {e}", hex::encode(key));
923        } else {
924            pruned += 1;
925            paid_list.clear_record_out_of_range(key);
926            repair_proofs.write().await.remove_key(key);
927            // Seed the PaidForList out-of-range timer so the second pass can
928            // prune the entry sooner, closing the re-admission window between
929            // the storage delete and the PaidForList prune pass.
930            paid_list.set_paid_out_of_range(key);
931            debug!("Pruned out-of-range record {}", hex::encode(key));
932        }
933    }
934
935    pruned
936}
937
938/// Collect positive presence reports for prune candidates.
939///
940/// A key is deleted once all but one of the current close group prove
941/// possession ([`prune_proofs_needed`]). Requiring unanimous proofs left
942/// out-of-range records undeletable whenever a single close-group peer
943/// lagged, while the all-but-one threshold still demands more copies than
944/// the storage quorum used elsewhere. Keys below the proof threshold stay
945/// local, and the retained record continues to participate in normal
946/// neighbor-sync repair because replica hint construction walks all locally
947/// stored keys, including out-of-range keys retained by hysteresis.
948async fn collect_record_prune_proofs(
949    candidates: &[RecordPruneCandidate],
950    storage: &Arc<LmdbStorage>,
951    p2p_node: &Arc<P2PNode>,
952    config: &ReplicationConfig,
953    sync_state: &Arc<RwLock<NeighborSyncState>>,
954) -> HashMap<XorName, HashSet<PeerId>> {
955    if candidates.is_empty() {
956        return HashMap::new();
957    }
958
959    let report_state = PruneAuditReportState::default();
960    let mut requests = stream::iter(build_peer_audit_challenges(candidates))
961        .map(|(peer, key)| {
962            peer_proves_record(
963                peer,
964                key,
965                storage,
966                p2p_node,
967                config,
968                sync_state,
969                &report_state,
970            )
971        })
972        .buffer_unordered(MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES);
973
974    let mut present_by_key = HashMap::<XorName, HashSet<PeerId>>::new();
975    while let Some(proof) = requests.next().await {
976        if let Some((peer, key)) = proof {
977            present_by_key.entry(key).or_default().insert(peer);
978        }
979    }
980
981    present_by_key
982}
983
984async fn revalidated_record_prune_keys(
985    candidates: &[RecordPruneCandidate],
986    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
987    self_id: &PeerId,
988    paid_list: &Arc<PaidList>,
989    p2p_node: &Arc<P2PNode>,
990    config: &ReplicationConfig,
991    commitment_state: Option<&Arc<ResponderCommitmentState>>,
992) -> (Vec<XorName>, usize) {
993    let mut keys_to_delete = Vec::new();
994    let mut cleared = 0;
995    let now = Instant::now();
996
997    for candidate in candidates {
998        // TOCTOU guard: a rotation/gossip may have (re-)committed this key
999        // between candidate selection and now. Re-check retention immediately
1000        // before scheduling deletion so we never delete bytes a recently
1001        // gossiped commitment still owes in a round-2 byte challenge.
1002        if let Some(cs) = commitment_state {
1003            if cs.is_held(&candidate.key) {
1004                continue;
1005            }
1006        }
1007
1008        let (storage_admission_group, strict_close_group) =
1009            record_prune_lookup_groups(&candidate.key, p2p_node, config).await;
1010
1011        if storage_admission_group
1012            .iter()
1013            .any(|n| n.peer_id == *self_id)
1014        {
1015            if paid_list
1016                .record_out_of_range_since(&candidate.key)
1017                .is_some()
1018            {
1019                paid_list.clear_record_out_of_range(&candidate.key);
1020                cleared += 1;
1021            }
1022            continue;
1023        }
1024
1025        let Some(first_seen) = paid_list.record_out_of_range_since(&candidate.key) else {
1026            continue;
1027        };
1028        let elapsed = now
1029            .checked_duration_since(first_seen)
1030            .unwrap_or(Duration::ZERO);
1031        if elapsed < config.prune_hysteresis_duration {
1032            continue;
1033        }
1034
1035        let current_target_peers = remote_close_group_peers(&strict_close_group, self_id);
1036        if current_target_peers.is_empty() {
1037            warn!(
1038                "Cannot prune {}: current close group has no remote peers",
1039                hex::encode(candidate.key)
1040            );
1041            continue;
1042        }
1043
1044        let proofs_needed = prune_proofs_needed(current_target_peers.len());
1045        if target_peers_reported_present(
1046            &candidate.key,
1047            &current_target_peers,
1048            present_by_key,
1049            proofs_needed,
1050        ) {
1051            keys_to_delete.push(candidate.key);
1052        } else {
1053            debug!(
1054                "Deferring prune for {} until all but one of the current close group \
1055                 report it",
1056                hex::encode(candidate.key)
1057            );
1058        }
1059    }
1060
1061    (keys_to_delete, cleared)
1062}
1063
1064fn build_peer_audit_challenges(candidates: &[RecordPruneCandidate]) -> Vec<(PeerId, XorName)> {
1065    let mut challenges = Vec::new();
1066    for candidate in candidates {
1067        for peer in &candidate.target_peers {
1068            challenges.push((*peer, candidate.key));
1069        }
1070    }
1071    challenges
1072}
1073
1074#[cfg(test)]
1075fn confirmed_keys_from_presence(
1076    candidates: &[RecordPruneCandidate],
1077    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
1078    proofs_needed: usize,
1079) -> Vec<XorName> {
1080    candidates
1081        .iter()
1082        .filter(|candidate| {
1083            target_peers_reported_present(
1084                &candidate.key,
1085                &candidate.target_peers,
1086                present_by_key,
1087                proofs_needed,
1088            )
1089        })
1090        .map(|candidate| candidate.key)
1091        .collect()
1092}
1093
1094/// Proofs required before deleting an out-of-range record: all but one of
1095/// the close group (6 of 7 at production parameters).
1096///
1097/// Stricter than the storage quorum (`QuorumNeeded`) because pruning only
1098/// runs after `PRUNE_HYSTERESIS_DURATION` out of range, by which time many
1099/// sync cycles should have replicated the record across the whole close
1100/// group. Tolerating exactly one lagging peer keeps a single absent peer
1101/// from vetoing deletion forever without accepting under-replication.
1102/// Groups of one or two peers require every proof: tolerating a miss there
1103/// would allow deletion on a single attestation.
1104fn prune_proofs_needed(group_size: usize) -> usize {
1105    if group_size <= 2 {
1106        group_size
1107    } else {
1108        group_size - 1
1109    }
1110}
1111
1112/// Whether enough target peers supplied positive evidence to allow deletion.
1113///
1114/// `proofs_needed == 0` means confirmation is impossible (no targets), not
1115/// trivially met.
1116fn target_peers_reported_present(
1117    key: &XorName,
1118    target_peers: &[PeerId],
1119    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
1120    proofs_needed: usize,
1121) -> bool {
1122    if proofs_needed == 0 {
1123        return false;
1124    }
1125    let Some(present_peers) = present_by_key.get(key) else {
1126        return false;
1127    };
1128    // Count distinct proven peers: iterating the present set keeps a
1129    // duplicated entry in `target_peers` from being counted twice.
1130    let proven = present_peers
1131        .iter()
1132        .filter(|peer| target_peers.contains(peer))
1133        .count();
1134    proven >= proofs_needed
1135}
1136
1137/// Challenge a peer to prove it holds the exact record bytes for `key`.
1138/// `None` means the peer failed to provide usable proof.
1139async fn peer_proves_record(
1140    peer: PeerId,
1141    key: XorName,
1142    storage: &Arc<LmdbStorage>,
1143    p2p_node: &Arc<P2PNode>,
1144    config: &ReplicationConfig,
1145    sync_state: &Arc<RwLock<NeighborSyncState>>,
1146    report_state: &PruneAuditReportState,
1147) -> Option<(PeerId, XorName)> {
1148    let local_bytes = local_record_bytes(&key, storage).await?;
1149
1150    let (challenge_id, nonce) = {
1151        let mut rng = rand::thread_rng();
1152        (rng.gen::<u64>(), rng.gen::<[u8; 32]>())
1153    };
1154    let encoded = encode_prune_audit_challenge(&peer, key, challenge_id, nonce)?;
1155    let Some(decoded) = send_prune_audit_challenge(&peer, &key, encoded, p2p_node, config).await
1156    else {
1157        // No decoded response means a timeout or malformed reply. Prune
1158        // confirmation reuses `AuditChallenge` semantics, so this is an immediate
1159        // audit failure just like a decoded bad proof below.
1160        report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
1161        return None;
1162    };
1163
1164    let status =
1165        prune_audit_response_status(decoded, challenge_id, &peer, &key, &nonce, &local_bytes);
1166    if prune_audit_response_clears_bootstrap_claim(status) {
1167        clear_prune_bootstrap_claim(&peer, sync_state).await;
1168    }
1169
1170    match status {
1171        PruneAuditStatus::Proven => Some((peer, key)),
1172        PruneAuditStatus::Bootstrapping => {
1173            report_prune_bootstrap_claim(&peer, &key, p2p_node, config, sync_state, report_state)
1174                .await;
1175            None
1176        }
1177        PruneAuditStatus::Failed => {
1178            report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
1179            None
1180        }
1181    }
1182}
1183
1184fn prune_audit_response_clears_bootstrap_claim(status: PruneAuditStatus) -> bool {
1185    matches!(status, PruneAuditStatus::Proven | PruneAuditStatus::Failed)
1186}
1187
1188// The responder for an incoming `AuditChallenge` (including prune-confirmation
1189// challenges, which reuse the same wire message) lives in
1190// `super::handle_audit_challenge_msg` -> `audit::handle_audit_challenge`, the
1191// responsible-chunk audit responder. No separate prune-only responder is needed.
1192
1193fn encode_prune_audit_challenge(
1194    peer: &PeerId,
1195    key: XorName,
1196    challenge_id: u64,
1197    nonce: [u8; 32],
1198) -> Option<Vec<u8>> {
1199    let challenge = AuditChallenge {
1200        challenge_id,
1201        nonce,
1202        challenged_peer_id: *peer.as_bytes(),
1203        keys: vec![key],
1204    };
1205    let msg = ReplicationMessage {
1206        request_id: challenge_id,
1207        body: ReplicationMessageBody::AuditChallenge(challenge),
1208    };
1209    let encoded = match msg.encode() {
1210        Ok(data) => data,
1211        Err(e) => {
1212            warn!(
1213                "Failed to encode prune audit challenge for {} against {peer}: {e}",
1214                hex::encode(key),
1215            );
1216            return None;
1217        }
1218    };
1219    Some(encoded)
1220}
1221
1222async fn send_prune_audit_challenge(
1223    peer: &PeerId,
1224    key: &XorName,
1225    encoded: Vec<u8>,
1226    p2p_node: &Arc<P2PNode>,
1227    config: &ReplicationConfig,
1228) -> Option<ReplicationMessage> {
1229    let response = match p2p_node
1230        .send_request(
1231            peer,
1232            REPLICATION_PROTOCOL_ID,
1233            encoded,
1234            config.prune_audit_response_timeout,
1235        )
1236        .await
1237    {
1238        Ok(response) => response,
1239        Err(e) => {
1240            debug!(
1241                "Prune audit challenge for {} against {peer} failed: {e}",
1242                hex::encode(key)
1243            );
1244            return None;
1245        }
1246    };
1247
1248    let decoded = match ReplicationMessage::decode(&response.data) {
1249        Ok(msg) => msg,
1250        Err(e) => {
1251            warn!("Failed to decode prune audit response from {peer}: {e}");
1252            return None;
1253        }
1254    };
1255
1256    Some(decoded)
1257}
1258
1259fn prune_audit_response_status(
1260    decoded: ReplicationMessage,
1261    challenge_id: u64,
1262    peer: &PeerId,
1263    key: &XorName,
1264    nonce: &[u8; 32],
1265    local_bytes: &[u8],
1266) -> PruneAuditStatus {
1267    match decoded.body {
1268        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
1269            challenge_id: resp_id,
1270            digests,
1271        }) => {
1272            if resp_id != challenge_id {
1273                warn!("Prune audit challenge ID mismatch from {peer}");
1274                return PruneAuditStatus::Failed;
1275            }
1276            let [digest] = digests.as_slice() else {
1277                warn!(
1278                    "Prune audit response from {peer} returned {} digests for one challenged key",
1279                    digests.len(),
1280                );
1281                return PruneAuditStatus::Failed;
1282            };
1283            if *digest == ABSENT_KEY_DIGEST {
1284                warn!(
1285                    "Prune audit proof from {peer} failed for {}: peer reports key absent",
1286                    hex::encode(key)
1287                );
1288                return PruneAuditStatus::Failed;
1289            }
1290            if audit_digest_proves_key(peer, key, nonce, local_bytes, digest) {
1291                PruneAuditStatus::Proven
1292            } else {
1293                warn!(
1294                    "Prune audit proof from {peer} failed for {}: digest mismatch",
1295                    hex::encode(key)
1296                );
1297                PruneAuditStatus::Failed
1298            }
1299        }
1300        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
1301            challenge_id: resp_id,
1302        }) => {
1303            if resp_id == challenge_id {
1304                warn!(
1305                    "Prune audit proof for {} blocked by bootstrap claim from {peer}",
1306                    hex::encode(key)
1307                );
1308                PruneAuditStatus::Bootstrapping
1309            } else {
1310                warn!("Prune audit challenge ID mismatch on Bootstrapping from {peer}");
1311                PruneAuditStatus::Failed
1312            }
1313        }
1314        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
1315            challenge_id: resp_id,
1316            reason,
1317        }) => {
1318            if resp_id == challenge_id {
1319                warn!(
1320                    "Prune audit proof for {} rejected by {peer}: {reason}",
1321                    hex::encode(key)
1322                );
1323            } else {
1324                warn!("Prune audit challenge ID mismatch on Rejected from {peer}");
1325            }
1326            PruneAuditStatus::Failed
1327        }
1328        _ => {
1329            warn!("Unexpected prune audit response type from {peer}");
1330            PruneAuditStatus::Failed
1331        }
1332    }
1333}
1334
1335async fn local_record_bytes(key: &XorName, storage: &Arc<LmdbStorage>) -> Option<Vec<u8>> {
1336    match storage.get_raw(key).await {
1337        Ok(Some(bytes)) => Some(bytes),
1338        Ok(None) => {
1339            debug!(
1340                "Cannot prune-audit {}: local record disappeared",
1341                hex::encode(key)
1342            );
1343            None
1344        }
1345        Err(e) => {
1346            warn!(
1347                "Cannot prune-audit {}: failed to read local record: {e}",
1348                hex::encode(key)
1349            );
1350            None
1351        }
1352    }
1353}
1354
1355fn audit_digest_proves_key(
1356    peer: &PeerId,
1357    key: &XorName,
1358    nonce: &[u8; 32],
1359    local_bytes: &[u8],
1360    digest: &[u8; 32],
1361) -> bool {
1362    if *digest == ABSENT_KEY_DIGEST {
1363        return false;
1364    }
1365    let expected = compute_audit_digest(nonce, peer.as_bytes(), key, local_bytes);
1366    *digest == expected
1367}
1368
1369async fn report_prune_audit_failure_once(
1370    peer: &PeerId,
1371    key: &XorName,
1372    p2p_node: &Arc<P2PNode>,
1373    config: &ReplicationConfig,
1374    report_state: &PruneAuditReportState,
1375) -> bool {
1376    let should_report = peer_is_currently_responsible(peer, key, p2p_node, config).await
1377        && reserve_prune_audit_failure_report(report_state, peer).await;
1378    if !should_report {
1379        return false;
1380    }
1381
1382    p2p_node
1383        .report_trust_event(
1384            peer,
1385            saorsa_core::TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT),
1386        )
1387        .await;
1388    true
1389}
1390
1391async fn reserve_prune_audit_failure_report(
1392    report_state: &PruneAuditReportState,
1393    peer: &PeerId,
1394) -> bool {
1395    report_state.audit_failures.write().await.insert(*peer)
1396}
1397
1398async fn reserve_prune_bootstrap_abuse_report(
1399    report_state: &PruneAuditReportState,
1400    peer: &PeerId,
1401) -> bool {
1402    report_state.bootstrap_abuse.write().await.insert(*peer)
1403}
1404
1405async fn report_prune_bootstrap_claim(
1406    peer: &PeerId,
1407    key: &XorName,
1408    p2p_node: &Arc<P2PNode>,
1409    config: &ReplicationConfig,
1410    sync_state: &Arc<RwLock<NeighborSyncState>>,
1411    report_state: &PruneAuditReportState,
1412) {
1413    if !peer_is_currently_responsible(peer, key, p2p_node, config).await {
1414        return;
1415    }
1416
1417    let observation = {
1418        let now = Instant::now();
1419        let mut state = sync_state.write().await;
1420        (
1421            now,
1422            state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period),
1423        )
1424    };
1425
1426    let (now, observation) = observation;
1427    match observation {
1428        BootstrapClaimObservation::WithinGrace { .. } => {
1429            debug!("Prune audit: peer {peer} claims bootstrapping (within grace period)");
1430            return;
1431        }
1432        BootstrapClaimObservation::PastGrace { first_seen } => {
1433            if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
1434                debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
1435                return;
1436            }
1437            warn!(
1438                "Prune audit: peer {peer} claiming bootstrap past grace period \
1439                 ({:?} > {:?}), reporting abuse",
1440                now.duration_since(first_seen),
1441                config.bootstrap_claim_grace_period,
1442            );
1443        }
1444        BootstrapClaimObservation::Repeated { first_seen } => {
1445            if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
1446                debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
1447                return;
1448            }
1449            warn!(
1450                "Prune audit: peer {peer} repeated bootstrap claim after previously stopping; \
1451                 first claim was {:?} ago, reporting abuse",
1452                now.duration_since(first_seen),
1453            );
1454        }
1455    }
1456
1457    p2p_node
1458        .report_trust_event(
1459            peer,
1460            saorsa_core::TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1461        )
1462        .await;
1463}
1464
1465async fn clear_prune_bootstrap_claim(peer: &PeerId, sync_state: &Arc<RwLock<NeighborSyncState>>) {
1466    let removed = {
1467        let mut state = sync_state.write().await;
1468        state.clear_active_bootstrap_claim(peer)
1469    };
1470    if removed {
1471        debug!("Prune audit: cleared active bootstrap claim for {peer}");
1472    }
1473}
1474
1475async fn peer_is_currently_responsible(
1476    peer: &PeerId,
1477    key: &XorName,
1478    p2p_node: &Arc<P2PNode>,
1479    config: &ReplicationConfig,
1480) -> bool {
1481    let closest = p2p_node
1482        .dht_manager()
1483        .find_closest_nodes_local_with_self(key, config.close_group_size)
1484        .await;
1485    closest.iter().any(|node| node.peer_id == *peer)
1486}
1487
1488#[cfg(test)]
1489#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1490mod tests {
1491    use super::*;
1492
1493    fn peer_id_from_byte(b: u8) -> PeerId {
1494        let mut bytes = [0u8; 32];
1495        bytes[0] = b;
1496        PeerId::from_bytes(bytes)
1497    }
1498
1499    fn key_from_byte(b: u8) -> XorName {
1500        [b; 32]
1501    }
1502
1503    fn peer_ids(count: usize) -> Vec<PeerId> {
1504        (0..count)
1505            .map(|idx| peer_id_from_byte(u8::try_from(idx + 1).expect("peer byte")))
1506            .collect()
1507    }
1508
1509    fn candidate(key: XorName, target_peers: Vec<PeerId>) -> RecordPruneCandidate {
1510        RecordPruneCandidate { key, target_peers }
1511    }
1512
1513    #[test]
1514    fn prune_audit_challenges_are_one_per_candidate_peer() {
1515        let peer_a = peer_id_from_byte(1);
1516        let peer_b = peer_id_from_byte(2);
1517        let key_a = key_from_byte(0xA);
1518        let key_b = key_from_byte(0xB);
1519        let candidates = vec![
1520            candidate(key_a, vec![peer_a, peer_b]),
1521            candidate(key_b, vec![peer_b]),
1522        ];
1523
1524        let mut challenges = build_peer_audit_challenges(&candidates);
1525        challenges.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));
1526
1527        let mut expected = vec![(peer_a, key_a), (peer_b, key_a), (peer_b, key_b)];
1528        expected.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));
1529        assert_eq!(challenges, expected);
1530    }
1531
1532    #[test]
1533    fn confirmed_keys_require_quorum_of_target_peers_present() {
1534        let peer_a = peer_id_from_byte(1);
1535        let peer_b = peer_id_from_byte(2);
1536        let peer_c = peer_id_from_byte(3);
1537        let key = key_from_byte(0xC);
1538        let candidates = vec![candidate(key, vec![peer_a, peer_b, peer_c])];
1539        let mut present_by_key = HashMap::new();
1540        present_by_key.insert(key, HashSet::from([peer_a, peer_b]));
1541
1542        // Two of three proofs meet a quorum of 2 even though one peer is
1543        // missing — unanimity is not required.
1544        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 2);
1545        assert_eq!(confirmed, vec![key]);
1546
1547        // The same evidence fails a quorum of 3.
1548        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 3);
1549        assert!(confirmed.is_empty());
1550    }
1551
1552    #[test]
1553    fn confirmed_keys_defer_below_quorum_or_missing_peer_evidence() {
1554        let peer_a = peer_id_from_byte(1);
1555        let peer_b = peer_id_from_byte(2);
1556        let quorum_key = key_from_byte(0xD);
1557        let below_quorum_key = key_from_byte(0xE);
1558        let missing_key = key_from_byte(0xF);
1559        let candidates = vec![
1560            candidate(quorum_key, vec![peer_a, peer_b]),
1561            candidate(below_quorum_key, vec![peer_a, peer_b]),
1562            candidate(missing_key, vec![peer_a, peer_b]),
1563        ];
1564        let mut present_by_key = HashMap::new();
1565        present_by_key.insert(quorum_key, HashSet::from([peer_a, peer_b]));
1566        present_by_key.insert(below_quorum_key, HashSet::from([peer_a]));
1567
1568        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 2);
1569
1570        assert_eq!(confirmed, vec![quorum_key]);
1571    }
1572
1573    #[test]
1574    fn prune_proofs_needed_tolerates_exactly_one_lagging_peer() {
1575        assert_eq!(prune_proofs_needed(0), 0);
1576        // Tiny groups require every proof.
1577        assert_eq!(prune_proofs_needed(1), 1);
1578        assert_eq!(prune_proofs_needed(2), 2);
1579        assert_eq!(prune_proofs_needed(3), 2);
1580        assert_eq!(prune_proofs_needed(5), 4);
1581        // Production close group: 6 of 7 proofs required.
1582        assert_eq!(prune_proofs_needed(7), 6);
1583    }
1584
1585    #[test]
1586    fn paid_prune_confirmations_are_three_quarters_rounded_up() {
1587        assert_eq!(paid_prune_confirmations_needed(0), 0);
1588        assert_eq!(paid_prune_confirmations_needed(1), 1);
1589        assert_eq!(paid_prune_confirmations_needed(2), 2);
1590        assert_eq!(paid_prune_confirmations_needed(4), 3);
1591        // Production paid close group: 15 of 20 confirmations required.
1592        assert_eq!(paid_prune_confirmations_needed(20), 15);
1593    }
1594
1595    #[test]
1596    fn paid_prune_peer_budget_allows_overlapping_targets() {
1597        let peers = peer_ids(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS);
1598        let mut selected_peers = HashSet::new();
1599
1600        assert!(reserve_paid_prune_peer_budget(&peers, &mut selected_peers));
1601        assert_eq!(
1602            selected_peers.len(),
1603            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
1604        );
1605
1606        let overlapping_targets = vec![peers[0], peers[1]];
1607        assert!(reserve_paid_prune_peer_budget(
1608            &overlapping_targets,
1609            &mut selected_peers,
1610        ));
1611        assert_eq!(
1612            selected_peers.len(),
1613            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
1614        );
1615    }
1616
1617    #[test]
1618    fn paid_prune_peer_budget_rejects_new_peers_past_cap() {
1619        let peers = peer_ids(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS + 1);
1620        let mut selected_peers = HashSet::new();
1621
1622        assert!(reserve_paid_prune_peer_budget(
1623            &peers[..MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS],
1624            &mut selected_peers,
1625        ));
1626        assert!(!reserve_paid_prune_peer_budget(
1627            &peers[MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS..],
1628            &mut selected_peers,
1629        ));
1630        assert_eq!(
1631            selected_peers.len(),
1632            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
1633        );
1634        assert!(!selected_peers.contains(&peers[MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS]));
1635    }
1636
1637    #[test]
1638    fn paid_confirmations_count_only_confirmed_target_peers() {
1639        let confirmed_peer = peer_id_from_byte(1);
1640        let not_found_peer = peer_id_from_byte(2);
1641        let unresolved_peer = peer_id_from_byte(3);
1642        let outsider = peer_id_from_byte(4);
1643        let key = key_from_byte(0x21);
1644        let candidates = vec![(key, vec![confirmed_peer, not_found_peer, unresolved_peer])];
1645
1646        let mut evidence = HashMap::new();
1647        evidence.insert(
1648            key,
1649            KeyVerificationEvidence {
1650                presence: HashMap::new(),
1651                paid_list: HashMap::from([
1652                    (confirmed_peer, PaidListEvidence::Confirmed),
1653                    (not_found_peer, PaidListEvidence::NotFound),
1654                    (unresolved_peer, PaidListEvidence::Unresolved),
1655                    // Confirmation from a peer outside the target set.
1656                    (outsider, PaidListEvidence::Confirmed),
1657                ]),
1658            },
1659        );
1660
1661        let confirmed_by_key = paid_confirmations_by_key(&candidates, &evidence);
1662
1663        assert_eq!(
1664            confirmed_by_key.get(&key),
1665            Some(&HashSet::from([confirmed_peer])),
1666            "only Confirmed answers from target peers may count",
1667        );
1668    }
1669
1670    #[test]
1671    fn paid_confirmations_skip_keys_without_evidence() {
1672        let peer = peer_id_from_byte(1);
1673        let key = key_from_byte(0x22);
1674        let candidates = vec![(key, vec![peer])];
1675
1676        let confirmed_by_key = paid_confirmations_by_key(&candidates, &HashMap::new());
1677
1678        assert!(confirmed_by_key.is_empty());
1679    }
1680
1681    #[test]
1682    fn zero_quorum_never_confirms() {
1683        let peer_a = peer_id_from_byte(1);
1684        let key = key_from_byte(0x10);
1685        let mut present_by_key = HashMap::new();
1686        present_by_key.insert(key, HashSet::from([peer_a]));
1687
1688        assert!(!target_peers_reported_present(
1689            &key,
1690            &[peer_a],
1691            &present_by_key,
1692            0
1693        ));
1694    }
1695
1696    #[test]
1697    fn proofs_from_non_target_peers_do_not_count_toward_quorum() {
1698        let target = peer_id_from_byte(1);
1699        let outsider = peer_id_from_byte(2);
1700        let key = key_from_byte(0x11);
1701        let mut present_by_key = HashMap::new();
1702        present_by_key.insert(key, HashSet::from([outsider]));
1703
1704        assert!(!target_peers_reported_present(
1705            &key,
1706            &[target],
1707            &present_by_key,
1708            1
1709        ));
1710    }
1711
1712    #[test]
1713    fn duplicated_target_peer_counts_once_toward_quorum() {
1714        let peer = peer_id_from_byte(1);
1715        let key = key_from_byte(0x12);
1716        let mut present_by_key = HashMap::new();
1717        present_by_key.insert(key, HashSet::from([peer]));
1718
1719        assert!(!target_peers_reported_present(
1720            &key,
1721            &[peer, peer],
1722            &present_by_key,
1723            2
1724        ));
1725    }
1726
1727    #[test]
1728    fn audit_digest_proof_requires_matching_peer_key_nonce_and_bytes() {
1729        let peer = peer_id_from_byte(1);
1730        let other_peer = peer_id_from_byte(2);
1731        let key = key_from_byte(0x11);
1732        let other_key = key_from_byte(0x12);
1733        let nonce = [0xAA; 32];
1734        let other_nonce = [0xBB; 32];
1735        let bytes = b"record bytes";
1736        let digest = compute_audit_digest(&nonce, peer.as_bytes(), &key, bytes);
1737
1738        assert!(audit_digest_proves_key(&peer, &key, &nonce, bytes, &digest));
1739        assert!(!audit_digest_proves_key(
1740            &other_peer,
1741            &key,
1742            &nonce,
1743            bytes,
1744            &digest
1745        ));
1746        assert!(!audit_digest_proves_key(
1747            &peer, &other_key, &nonce, bytes, &digest
1748        ));
1749        assert!(!audit_digest_proves_key(
1750            &peer,
1751            &key,
1752            &other_nonce,
1753            bytes,
1754            &digest
1755        ));
1756        assert!(!audit_digest_proves_key(
1757            &peer,
1758            &key,
1759            &nonce,
1760            b"different bytes",
1761            &digest
1762        ));
1763        assert!(!audit_digest_proves_key(
1764            &peer,
1765            &key,
1766            &nonce,
1767            bytes,
1768            &ABSENT_KEY_DIGEST
1769        ));
1770    }
1771
1772    #[tokio::test]
1773    async fn prune_cursor_advances_past_selected_budget_window() {
1774        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
1775        state.write().await.prune_cursor = 2;
1776
1777        let start = prune_scan_start(&state, 10).await;
1778        advance_prune_cursor(&state, 10, start, Some(3)).await;
1779
1780        assert_eq!(state.read().await.prune_cursor, 6);
1781    }
1782
1783    #[tokio::test]
1784    async fn prune_cursor_advances_even_when_no_candidate_selected() {
1785        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
1786        state.write().await.prune_cursor = 9;
1787
1788        let start = prune_scan_start(&state, 10).await;
1789        advance_prune_cursor(&state, 10, start, None).await;
1790
1791        assert_eq!(state.read().await.prune_cursor, 0);
1792    }
1793
1794    #[tokio::test]
1795    async fn prune_audit_normal_response_clears_stale_bootstrap_claim() {
1796        let peer = peer_id_from_byte(1);
1797        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![peer])));
1798        let first_seen = Instant::now();
1799        state
1800            .write()
1801            .await
1802            .bootstrap_claims
1803            .insert(peer, first_seen);
1804        state
1805            .write()
1806            .await
1807            .bootstrap_claim_history
1808            .insert(peer, first_seen);
1809
1810        clear_prune_bootstrap_claim(&peer, &state).await;
1811
1812        let state = state.read().await;
1813        assert!(!state.bootstrap_claims.contains_key(&peer));
1814        assert!(state.bootstrap_claim_history.contains_key(&peer));
1815    }
1816
1817    #[test]
1818    fn prune_audit_clear_policy_requires_decoded_non_bootstrap_response() {
1819        assert!(prune_audit_response_clears_bootstrap_claim(
1820            PruneAuditStatus::Proven
1821        ));
1822        assert!(prune_audit_response_clears_bootstrap_claim(
1823            PruneAuditStatus::Failed
1824        ));
1825        assert!(!prune_audit_response_clears_bootstrap_claim(
1826            PruneAuditStatus::Bootstrapping
1827        ));
1828    }
1829
1830    #[tokio::test]
1831    async fn prune_audit_failure_penalty_is_reserved_once_per_peer_per_pass() {
1832        let peer = peer_id_from_byte(1);
1833        let other_peer = peer_id_from_byte(2);
1834        let report_state = PruneAuditReportState::default();
1835
1836        assert!(reserve_prune_audit_failure_report(&report_state, &peer).await);
1837        assert!(!reserve_prune_audit_failure_report(&report_state, &peer).await);
1838        assert!(reserve_prune_audit_failure_report(&report_state, &other_peer).await);
1839
1840        let reported = report_state.audit_failures.read().await;
1841        assert_eq!(reported.len(), 2);
1842        assert!(reported.contains(&peer));
1843        assert!(reported.contains(&other_peer));
1844    }
1845
1846    #[tokio::test]
1847    async fn prune_bootstrap_abuse_penalty_is_reserved_once_per_peer_per_pass() {
1848        let peer = peer_id_from_byte(1);
1849        let other_peer = peer_id_from_byte(2);
1850        let report_state = PruneAuditReportState::default();
1851
1852        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
1853        assert!(!reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
1854        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &other_peer).await);
1855
1856        let reported = report_state.bootstrap_abuse.read().await;
1857        assert_eq!(reported.len(), 2);
1858        assert!(reported.contains(&peer));
1859        assert!(reported.contains(&other_peer));
1860    }
1861}