Skip to main content

ant_node/replication/
pruning.rs

1//! Post-cycle responsibility pruning (Section 11).
2//!
3//! On `NeighborSyncCycleComplete`: prune stored records and `PaidForList`
4//! entries that have been continuously out of range for at least
5//! `PRUNE_HYSTERESIS_DURATION`.
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use crate::logging::{debug, info, warn};
12
13use futures::{stream, StreamExt};
14use rand::Rng;
15use saorsa_core::identity::PeerId;
16use saorsa_core::{DHTNode, P2PNode};
17use tokio::sync::RwLock;
18
19use crate::ant_protocol::XorName;
20use crate::replication::commitment_state::ResponderCommitmentState;
21use crate::replication::config::{
22    storage_admission_width, ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT,
23    MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS, REPLICATION_PROTOCOL_ID,
24};
25use crate::replication::paid_list::PaidList;
26use crate::replication::protocol::{
27    compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
28    ReplicationMessageBody, ABSENT_KEY_DIGEST,
29};
30use crate::replication::quorum::{self, VerificationTargets};
31use crate::replication::types::{
32    BootstrapClaimObservation, KeyVerificationEvidence, NeighborSyncState, PaidListEvidence,
33    RepairProofs,
34};
35use crate::storage::LmdbStorage;
36
37use super::REPLICATION_TRUST_WEIGHT;
38
39const MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES: usize = 32;
40
41/// Maximum expired `PaidForList` entries selected for verification per prune
42/// pass. The unique peer fan-out for those entries is capped separately.
43const MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS: usize = 32;
44/// Maximum unique peers contacted for paid-list verification per prune pass.
45/// `quorum::run_verification_round` sends one request per target peer.
46const MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS: usize = MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES;
47
48// ---------------------------------------------------------------------------
49// Result type
50// ---------------------------------------------------------------------------
51
52/// Summary of a prune pass.
53#[derive(Debug, Default)]
54pub struct PruneResult {
55    /// Number of records deleted from storage.
56    pub records_pruned: usize,
57    /// Number of records with out-of-range timestamp newly set.
58    pub records_marked_out_of_range: usize,
59    /// Number of records with out-of-range timestamp cleared (back in range).
60    pub records_cleared: usize,
61    /// Number of `PaidForList` entries removed.
62    pub paid_entries_pruned: usize,
63    /// Number of `PaidForList` entries with out-of-range timestamp newly set.
64    pub paid_entries_marked: usize,
65    /// Number of `PaidForList` entries cleared (back in range).
66    pub paid_entries_cleared: usize,
67}
68
69/// Shared dependencies and switches for one prune pass.
70pub struct PrunePassContext<'a> {
71    /// Local peer id.
72    pub self_id: &'a PeerId,
73    /// Local record storage.
74    pub storage: &'a Arc<LmdbStorage>,
75    /// Persistent paid-list state.
76    pub paid_list: &'a Arc<PaidList>,
77    /// P2P node used for routing lookups and prune-confirmation audits.
78    pub p2p_node: &'a Arc<P2PNode>,
79    /// Replication configuration.
80    pub config: &'a ReplicationConfig,
81    /// Neighbor-sync state, including prune cursor and bootstrap claims.
82    pub sync_state: &'a Arc<RwLock<NeighborSyncState>>,
83    /// Key-specific repair proofs used to gate prune-confirmation audits.
84    pub repair_proofs: &'a Arc<RwLock<RepairProofs>>,
85    /// Current local neighbor-sync cycle epoch for repair-proof maturity.
86    pub current_sync_epoch: u64,
87    /// Test-only clock override for repair-proof maturity checks.
88    #[cfg(any(test, feature = "test-utils"))]
89    pub repair_proof_now: Option<Instant>,
90    /// Whether remote prune-confirmation audits are allowed this pass.
91    pub allow_remote_prune_audits: bool,
92    /// Responder commitment state, used to veto deleting a chunk still held
93    /// under a recently-gossiped commitment (so the storage-commitment audit's
94    /// round-2 byte challenge cannot false-positive an honest node). `None` on
95    /// the legacy/test-only prune path, which keeps the pre-retention behavior.
96    pub commitment_state: Option<&'a Arc<ResponderCommitmentState>>,
97}
98
99#[derive(Debug, Clone, Copy, PartialEq, Eq)]
100enum PruneAuditStatus {
101    Proven,
102    Failed,
103    Bootstrapping,
104}
105
106#[derive(Debug, Default)]
107struct RecordPruneStats {
108    marked: usize,
109    cleared: usize,
110    pruned: usize,
111    held_by_commitment: usize,
112}
113
114#[derive(Debug, Default)]
115struct PaidPruneStats {
116    marked: usize,
117    cleared: usize,
118    pruned: usize,
119}
120
121#[derive(Debug, Default)]
122struct PaidPruneDeferredCounts {
123    entry_budget: usize,
124    remote_gate: usize,
125    peer_budget: usize,
126}
127
128impl PaidPruneDeferredCounts {
129    fn log(&self) {
130        if self.entry_budget > 0 {
131            debug!(
132                "Deferred {} expired PaidForList entries beyond the per-pass verification cap \
133                 ({MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS})",
134                self.entry_budget,
135            );
136        }
137
138        if self.remote_gate > 0 {
139            debug!(
140                "Deferred {} expired PaidForList entries until bootstrap drain allows remote \
141                 paid-prune verification",
142                self.remote_gate,
143            );
144        }
145
146        if self.peer_budget > 0 {
147            debug!(
148                "Deferred {} expired PaidForList entries beyond the per-pass paid-prune peer cap \
149                 ({MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS})",
150                self.peer_budget,
151            );
152        }
153    }
154}
155
156#[derive(Debug, Clone)]
157struct RecordPruneCandidate {
158    key: XorName,
159    target_peers: Vec<PeerId>,
160}
161
162struct RecordPruneKeyOutcome {
163    marked: bool,
164    state: RecordPruneKeyState,
165}
166
167impl Default for RecordPruneKeyOutcome {
168    fn default() -> Self {
169        Self {
170            marked: false,
171            state: RecordPruneKeyState::None,
172        }
173    }
174}
175
176enum RecordPruneKeyState {
177    None,
178    Cleared,
179    BootstrapDeferred,
180    BudgetDeferred,
181    /// Out of range, but still committed under a recently-gossiped commitment:
182    /// deletion is vetoed (and the out-of-range hysteresis clock is not even
183    /// started) until the key ages out of the last-2-gossiped window.
184    HeldByCommitment,
185    Candidate(RecordPruneCandidate),
186}
187
188enum PaidPruneKeyState {
189    None,
190    RemoteDeferred,
191    EntryBudgetDeferred,
192    PeerBudgetDeferred,
193    Candidate(Vec<PeerId>),
194}
195
196#[derive(Default)]
197struct PruneAuditReportState {
198    audit_failures: RwLock<HashSet<PeerId>>,
199    bootstrap_abuse: RwLock<HashSet<PeerId>>,
200}
201
202// ---------------------------------------------------------------------------
203// Prune pass
204// ---------------------------------------------------------------------------
205
206/// Execute post-cycle responsibility pruning.
207///
208/// For each stored record K:
209/// - If `self` is within the storage-admission group
210///   (`close_group_size + STORAGE_ADMISSION_MARGIN`): clear
211///   `RecordOutOfRangeFirstSeen`.
212/// - If not in that group: set timestamp if not already set; delete if the
213///   timestamp is at least `PRUNE_HYSTERESIS_DURATION` old and all but one
214///   of the strict current close group prove they store the record.
215///
216/// For each `PaidForList` entry K:
217/// - If self is in `PaidCloseGroup(K)`: clear `PaidOutOfRangeFirstSeen`.
218/// - If not in group: set timestamp if not already set; remove entry if the
219///   timestamp is at least `PRUNE_HYSTERESIS_DURATION` old and three
220///   quarters of the current paid close group (15 of 20 at production
221///   parameters) confirm the key in their own `PaidForList`.
222///
223/// Compatibility wrapper for callers that have not adopted repair-proof
224/// tracking. It preserves the original public signature, but it has no proof
225/// table or advanced sync epoch to pass into record prune-confirmation audits.
226/// Out-of-range records are therefore marked/deferred rather than deleted via
227/// remote confirmation. The replication engine calls
228/// [`run_prune_pass_with_context`] so it can pass real repair proofs.
229pub async fn run_prune_pass(
230    self_id: &PeerId,
231    storage: &Arc<LmdbStorage>,
232    paid_list: &Arc<PaidList>,
233    p2p_node: &Arc<P2PNode>,
234    config: &ReplicationConfig,
235    sync_state: &Arc<RwLock<NeighborSyncState>>,
236    allow_remote_prune_audits: bool,
237) -> PruneResult {
238    let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
239    run_prune_pass_with_context(PrunePassContext {
240        self_id,
241        storage,
242        paid_list,
243        p2p_node,
244        config,
245        sync_state,
246        repair_proofs: &repair_proofs,
247        current_sync_epoch: 0,
248        #[cfg(any(test, feature = "test-utils"))]
249        repair_proof_now: None,
250        allow_remote_prune_audits,
251        commitment_state: None,
252    })
253    .await
254}
255
256/// Execute one prune pass with repair-proof-gated remote confirmations.
257pub async fn run_prune_pass_with_context(ctx: PrunePassContext<'_>) -> PruneResult {
258    let (stored_count, record_stats) = prune_stored_records(&ctx).await;
259    let now = Instant::now();
260    let (paid_count, paid_stats) = prune_paid_entries(
261        ctx.self_id,
262        ctx.paid_list,
263        ctx.p2p_node,
264        ctx.config,
265        now,
266        ctx.allow_remote_prune_audits,
267    )
268    .await;
269
270    let result = PruneResult {
271        records_pruned: record_stats.pruned,
272        records_marked_out_of_range: record_stats.marked,
273        records_cleared: record_stats.cleared,
274        paid_entries_pruned: paid_stats.pruned,
275        paid_entries_marked: paid_stats.marked,
276        paid_entries_cleared: paid_stats.cleared,
277    };
278
279    info!(
280        "Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
281        result.records_pruned, stored_count, result.paid_entries_pruned, paid_count,
282    );
283
284    result
285}
286
287// Combines main's prune-proof/admission gate with ADR-0002's commitment
288// retention veto in one pass; the merged body runs just over the line budget.
289#[allow(clippy::too_many_lines)]
290async fn prune_stored_records(ctx: &PrunePassContext<'_>) -> (usize, RecordPruneStats) {
291    let stored_keys = match ctx.storage.all_keys().await {
292        Ok(keys) => keys,
293        Err(e) => {
294            warn!("Failed to read stored keys for pruning: {e}");
295            return (0, RecordPruneStats::default());
296        }
297    };
298
299    let now = Instant::now();
300    let mut stats = RecordPruneStats::default();
301    let mut candidates = Vec::new();
302    let mut audit_challenge_budget = MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS;
303    let mut budget_deferred = 0usize;
304    let mut bootstrap_deferred = 0usize;
305    let scan_start = prune_scan_start(ctx.sync_state, stored_keys.len()).await;
306    let mut last_selected_offset = None;
307
308    for offset in 0..stored_keys.len() {
309        let key = &stored_keys[(scan_start + offset) % stored_keys.len()];
310        let (storage_admission_group, strict_close_group) =
311            record_prune_lookup_groups(key, ctx.p2p_node, ctx.config).await;
312
313        let outcome = evaluate_record_prune_key(
314            ctx,
315            key,
316            &storage_admission_group,
317            &strict_close_group,
318            now,
319            &mut audit_challenge_budget,
320        )
321        .await;
322        if outcome.marked {
323            stats.marked += 1;
324        }
325        match outcome.state {
326            RecordPruneKeyState::None => {}
327            RecordPruneKeyState::Cleared => stats.cleared += 1,
328            RecordPruneKeyState::BootstrapDeferred => {
329                bootstrap_deferred = bootstrap_deferred.saturating_add(1);
330            }
331            RecordPruneKeyState::BudgetDeferred => {
332                budget_deferred = budget_deferred.saturating_add(1);
333            }
334            RecordPruneKeyState::HeldByCommitment => {
335                stats.held_by_commitment = stats.held_by_commitment.saturating_add(1);
336            }
337            RecordPruneKeyState::Candidate(candidate) => {
338                last_selected_offset = Some(offset);
339                candidates.push(candidate);
340            }
341        }
342    }
343
344    advance_prune_cursor(
345        ctx.sync_state,
346        stored_keys.len(),
347        scan_start,
348        last_selected_offset,
349    )
350    .await;
351
352    if bootstrap_deferred > 0 {
353        debug!(
354            "Deferred {bootstrap_deferred} prune candidates until bootstrap drain allows \
355             remote prune-confirmation audits"
356        );
357    }
358
359    if budget_deferred > 0 {
360        debug!(
361            "Deferred {budget_deferred} prune candidates due to per-pass audit budget \
362             ({MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS} challenges)"
363        );
364    }
365
366    if stats.held_by_commitment > 0 {
367        debug!(
368            "Vetoed {} prune candidate(s) still committed under a recently-gossiped \
369             commitment (bounded reprieve until they age out of the retention window)",
370            stats.held_by_commitment
371        );
372    }
373
374    let present_by_key = collect_record_prune_proofs(
375        &candidates,
376        ctx.storage,
377        ctx.p2p_node,
378        ctx.config,
379        ctx.sync_state,
380    )
381    .await;
382    let (keys_to_delete, revalidated_cleared) = revalidated_record_prune_keys(
383        &candidates,
384        &present_by_key,
385        ctx.self_id,
386        ctx.paid_list,
387        ctx.p2p_node,
388        ctx.config,
389        ctx.commitment_state,
390    )
391    .await;
392    stats.cleared += revalidated_cleared;
393    stats.pruned = delete_stored_records(
394        &keys_to_delete,
395        ctx.storage,
396        ctx.paid_list,
397        ctx.repair_proofs,
398    )
399    .await;
400
401    (stored_keys.len(), stats)
402}
403
404async fn record_prune_lookup_groups(
405    key: &XorName,
406    p2p_node: &Arc<P2PNode>,
407    config: &ReplicationConfig,
408) -> (Vec<DHTNode>, Vec<DHTNode>) {
409    let dht = p2p_node.dht_manager();
410    let storage_admission_group = dht
411        .find_closest_nodes_local_with_self(key, storage_admission_width(config.close_group_size))
412        .await;
413    let strict_close_group = dht
414        .find_closest_nodes_local_with_self(key, config.close_group_size)
415        .await;
416    (storage_admission_group, strict_close_group)
417}
418
419async fn evaluate_record_prune_key(
420    ctx: &PrunePassContext<'_>,
421    key: &XorName,
422    storage_admission_group: &[DHTNode],
423    strict_close_group: &[DHTNode],
424    now: Instant,
425    audit_challenge_budget: &mut usize,
426) -> RecordPruneKeyOutcome {
427    let mut outcome = RecordPruneKeyOutcome::default();
428    let is_responsible = storage_admission_group
429        .iter()
430        .any(|node| node.peer_id == *ctx.self_id);
431
432    if is_responsible {
433        if ctx.paid_list.record_out_of_range_since(key).is_some() {
434            ctx.paid_list.clear_record_out_of_range(key);
435            outcome.state = RecordPruneKeyState::Cleared;
436        }
437        return outcome;
438    }
439
440    // Retention veto: the key has left our close group, but if it is still
441    // committed under a recently-gossiped commitment a neighbour can pin that
442    // root and demand its bytes in a round-2 byte challenge. Deleting it now
443    // would turn an honest node's response into `Absent` → a confirmed audit
444    // failure. Veto deletion AND do not even start the out-of-range hysteresis
445    // clock yet: the commitment rebuild only commits to keys we are still
446    // responsible for, so this key drops out of the next rebuilt commitment and
447    // ages out of the last-2-gossiped window within at most
448    // `RETAINED_GOSSIPED_COMMITMENTS` gossip rotations, after which `is_held`
449    // returns false and the key prunes through the normal path. This is a
450    // bounded reprieve, not a permanent pin.
451    if let Some(cs) = ctx.commitment_state {
452        if cs.is_held(key) {
453            outcome.state = RecordPruneKeyState::HeldByCommitment;
454            return outcome;
455        }
456    }
457
458    if ctx.paid_list.record_out_of_range_since(key).is_none() {
459        outcome.marked = true;
460    }
461    ctx.paid_list.set_record_out_of_range(key);
462
463    let Some(first_seen) = ctx.paid_list.record_out_of_range_since(key) else {
464        return outcome;
465    };
466    let elapsed = now
467        .checked_duration_since(first_seen)
468        .unwrap_or(Duration::ZERO);
469    if elapsed < ctx.config.prune_hysteresis_duration {
470        return outcome;
471    }
472
473    if !ctx.allow_remote_prune_audits {
474        outcome.state = RecordPruneKeyState::BootstrapDeferred;
475        return outcome;
476    }
477
478    let target_peers = remote_close_group_peers(strict_close_group, ctx.self_id);
479    if target_peers.is_empty() {
480        warn!(
481            "Cannot prune {}: current close group has no remote peers",
482            hex::encode(key)
483        );
484        return outcome;
485    }
486
487    // Only peers we have hinted (mature repair proof) may be audited; the
488    // proof threshold must be reachable among them. A never-synced peer in
489    // the close group reduces the audit pool instead of vetoing the prune.
490    let current_close_peers: HashSet<PeerId> =
491        strict_close_group.iter().map(|node| node.peer_id).collect();
492    #[cfg(any(test, feature = "test-utils"))]
493    let repair_proof_now = ctx.repair_proof_now.unwrap_or(now);
494    #[cfg(not(any(test, feature = "test-utils")))]
495    let repair_proof_now = now;
496    let audit_targets = peers_with_mature_repair_proofs(
497        key,
498        &target_peers,
499        &current_close_peers,
500        ctx.repair_proofs,
501        ctx.current_sync_epoch,
502        repair_proof_now,
503    )
504    .await;
505    let proofs_needed = prune_proofs_needed(target_peers.len());
506    if proofs_needed == 0 || audit_targets.len() < proofs_needed {
507        debug!(
508            "Deferring prune for {} until enough of the close group has mature \
509             repair proofs",
510            hex::encode(key)
511        );
512        return outcome;
513    }
514
515    if audit_targets.len() > *audit_challenge_budget {
516        outcome.state = RecordPruneKeyState::BudgetDeferred;
517        return outcome;
518    }
519
520    *audit_challenge_budget -= audit_targets.len();
521    outcome.state = RecordPruneKeyState::Candidate(RecordPruneCandidate {
522        key: *key,
523        target_peers: audit_targets,
524    });
525    outcome
526}
527
528async fn prune_paid_entries(
529    self_id: &PeerId,
530    paid_list: &Arc<PaidList>,
531    p2p_node: &Arc<P2PNode>,
532    config: &ReplicationConfig,
533    now: Instant,
534    allow_remote_prune_audits: bool,
535) -> (usize, PaidPruneStats) {
536    let paid_keys = match paid_list.all_keys() {
537        Ok(keys) => keys,
538        Err(e) => {
539            warn!("Failed to read PaidForList for pruning: {e}");
540            return (0, PaidPruneStats::default());
541        }
542    };
543
544    let dht = p2p_node.dht_manager();
545    let mut stats = PaidPruneStats::default();
546    let mut expired_candidates: Vec<(XorName, Vec<PeerId>)> = Vec::new();
547    let mut deferred_counts = PaidPruneDeferredCounts::default();
548    let mut selected_verification_peers = HashSet::new();
549    // Rotate the scan start so expired entries beyond the per-pass cap are
550    // not starved by the same head-of-list entries every pass.
551    let scan_start = paid_list.paid_prune_scan_start(paid_keys.len());
552    let mut last_selected_offset = None;
553
554    for offset in 0..paid_keys.len() {
555        let key = &paid_keys[(scan_start + offset) % paid_keys.len()];
556        let closest: Vec<DHTNode> = dht
557            .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
558            .await;
559        let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);
560
561        if in_paid_group {
562            if paid_list.paid_out_of_range_since(key).is_some() {
563                paid_list.clear_paid_out_of_range(key);
564                stats.cleared += 1;
565            }
566        } else {
567            if paid_list.paid_out_of_range_since(key).is_none() {
568                stats.marked += 1;
569            }
570            paid_list.set_paid_out_of_range(key);
571
572            if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
573                let elapsed = now
574                    .checked_duration_since(first_seen)
575                    .unwrap_or(Duration::ZERO);
576                if elapsed >= config.prune_hysteresis_duration {
577                    match select_paid_prune_candidate(
578                        key,
579                        &closest,
580                        self_id,
581                        allow_remote_prune_audits,
582                        expired_candidates.len(),
583                        &mut selected_verification_peers,
584                    ) {
585                        PaidPruneKeyState::None => {}
586                        PaidPruneKeyState::RemoteDeferred => {
587                            deferred_counts.remote_gate =
588                                deferred_counts.remote_gate.saturating_add(1);
589                        }
590                        PaidPruneKeyState::EntryBudgetDeferred => {
591                            deferred_counts.entry_budget =
592                                deferred_counts.entry_budget.saturating_add(1);
593                        }
594                        PaidPruneKeyState::PeerBudgetDeferred => {
595                            deferred_counts.peer_budget =
596                                deferred_counts.peer_budget.saturating_add(1);
597                        }
598                        PaidPruneKeyState::Candidate(target_peers) => {
599                            expired_candidates.push((*key, target_peers));
600                            last_selected_offset = Some(offset);
601                        }
602                    }
603                }
604            }
605        }
606    }
607
608    paid_list.advance_paid_prune_cursor(paid_keys.len(), scan_start, last_selected_offset);
609    deferred_counts.log();
610
611    let confirmed_by_key =
612        collect_paid_prune_confirmations(&expired_candidates, p2p_node, config).await;
613    let (paid_keys_to_delete, revalidated_cleared) = revalidated_paid_prune_keys(
614        &expired_candidates,
615        &confirmed_by_key,
616        self_id,
617        paid_list,
618        p2p_node,
619        config,
620    )
621    .await;
622    stats.cleared += revalidated_cleared;
623    stats.pruned = delete_paid_entries(&paid_keys_to_delete, paid_list).await;
624
625    (paid_keys.len(), stats)
626}
627
628fn select_paid_prune_candidate(
629    key: &XorName,
630    closest: &[DHTNode],
631    self_id: &PeerId,
632    allow_remote_prune_audits: bool,
633    selected_candidate_count: usize,
634    selected_verification_peers: &mut HashSet<PeerId>,
635) -> PaidPruneKeyState {
636    if !allow_remote_prune_audits {
637        return PaidPruneKeyState::RemoteDeferred;
638    }
639
640    let target_peers = remote_close_group_peers(closest, self_id);
641    if target_peers.is_empty() {
642        warn!(
643            "Cannot prune paid entry {}: current paid close group has no remote peers",
644            hex::encode(key)
645        );
646        return PaidPruneKeyState::None;
647    }
648
649    if selected_candidate_count >= MAX_PAID_PRUNE_VERIFICATIONS_PER_PASS {
650        return PaidPruneKeyState::EntryBudgetDeferred;
651    }
652
653    if !reserve_paid_prune_peer_budget(&target_peers, selected_verification_peers) {
654        return PaidPruneKeyState::PeerBudgetDeferred;
655    }
656
657    PaidPruneKeyState::Candidate(target_peers)
658}
659
660async fn delete_paid_entries(keys_to_delete: &[XorName], paid_list: &Arc<PaidList>) -> usize {
661    if keys_to_delete.is_empty() {
662        return 0;
663    }
664
665    match paid_list.remove_batch(keys_to_delete).await {
666        Ok(count) => {
667            debug!("Pruned {count} out-of-range PaidForList entries");
668            count
669        }
670        Err(e) => {
671            warn!("Failed to prune PaidForList entries: {e}");
672            0
673        }
674    }
675}
676
677/// Re-check each confirmed candidate against current local state before
678/// deletion.
679///
680/// The network round in [`collect_paid_prune_confirmations`] takes time;
681/// the paid close group may have changed underneath it, including self
682/// moving back into range. Mirrors [`revalidated_record_prune_keys`]:
683/// confirmations only count from peers still in the current paid close
684/// group, against a threshold computed from that current group.
685async fn revalidated_paid_prune_keys(
686    expired_candidates: &[(XorName, Vec<PeerId>)],
687    confirmed_by_key: &HashMap<XorName, HashSet<PeerId>>,
688    self_id: &PeerId,
689    paid_list: &Arc<PaidList>,
690    p2p_node: &Arc<P2PNode>,
691    config: &ReplicationConfig,
692) -> (Vec<XorName>, usize) {
693    let dht = p2p_node.dht_manager();
694    let mut keys_to_delete = Vec::new();
695    let mut cleared = 0;
696    let now = Instant::now();
697
698    for (key, _) in expired_candidates {
699        let closest: Vec<DHTNode> = dht
700            .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
701            .await;
702
703        if closest.iter().any(|n| n.peer_id == *self_id) {
704            if paid_list.paid_out_of_range_since(key).is_some() {
705                paid_list.clear_paid_out_of_range(key);
706                cleared += 1;
707            }
708            continue;
709        }
710
711        let Some(first_seen) = paid_list.paid_out_of_range_since(key) else {
712            continue;
713        };
714        let elapsed = now
715            .checked_duration_since(first_seen)
716            .unwrap_or(Duration::ZERO);
717        if elapsed < config.prune_hysteresis_duration {
718            continue;
719        }
720
721        let current_target_peers = remote_close_group_peers(&closest, self_id);
722        if current_target_peers.is_empty() {
723            warn!(
724                "Cannot prune paid entry {}: current paid close group has no remote peers",
725                hex::encode(key)
726            );
727            continue;
728        }
729
730        let confirmations_needed = paid_prune_confirmations_needed(current_target_peers.len());
731        if target_peers_reported_present(
732            key,
733            &current_target_peers,
734            confirmed_by_key,
735            confirmations_needed,
736        ) {
737            keys_to_delete.push(*key);
738        } else {
739            debug!(
740                "Deferring paid-entry prune for {} until enough of the current paid \
741                 close group confirm it",
742                hex::encode(key)
743            );
744        }
745    }
746
747    (keys_to_delete, cleared)
748}
749
750fn remote_close_group_peers(close_group: &[DHTNode], self_id: &PeerId) -> Vec<PeerId> {
751    close_group
752        .iter()
753        .filter(|node| node.peer_id != *self_id)
754        .map(|node| node.peer_id)
755        .collect()
756}
757
758/// Confirmations required before removing an out-of-range `PaidForList`
759/// entry: three quarters of the paid close group rounded up, 15 of 20 at
760/// production parameters.
761///
762/// Paid-entry pruning is deliberately gated on the paid lists of the current
763/// paid close group, never on chunk possession: the paid list is the
764/// authorization record, and a wide confirmed majority must already track
765/// the key before this node may forget it.
766fn paid_prune_confirmations_needed(group_size: usize) -> usize {
767    (3 * group_size).div_ceil(4)
768}
769
770fn reserve_paid_prune_peer_budget(
771    target_peers: &[PeerId],
772    selected_verification_peers: &mut HashSet<PeerId>,
773) -> bool {
774    let new_peer_count = target_peers
775        .iter()
776        .filter(|peer| !selected_verification_peers.contains(peer))
777        .count();
778    if selected_verification_peers
779        .len()
780        .saturating_add(new_peer_count)
781        > MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS
782    {
783        return false;
784    }
785
786    selected_verification_peers.extend(target_peers.iter().copied());
787    true
788}
789
790/// Ask the current paid close group whether they track each expired key in
791/// their `PaidForList`, and return the confirming peers per key.
792///
793/// The deletion decision happens afterwards in
794/// [`revalidated_paid_prune_keys`], against the paid close group as it
795/// stands once the network round has completed.
796async fn collect_paid_prune_confirmations(
797    expired_candidates: &[(XorName, Vec<PeerId>)],
798    p2p_node: &Arc<P2PNode>,
799    config: &ReplicationConfig,
800) -> HashMap<XorName, HashSet<PeerId>> {
801    if expired_candidates.is_empty() {
802        return HashMap::new();
803    }
804
805    let mut targets = VerificationTargets::default();
806    let mut keys = Vec::new();
807    for (key, target_peers) in expired_candidates {
808        if target_peers.is_empty() {
809            warn!(
810                "Cannot prune paid entry {}: current paid close group has no remote peers",
811                hex::encode(key)
812            );
813            continue;
814        }
815        keys.push(*key);
816        for peer in target_peers {
817            targets.all_peers.insert(*peer);
818            targets.peer_to_keys.entry(*peer).or_default().push(*key);
819            targets
820                .peer_to_paid_keys
821                .entry(*peer)
822                .or_default()
823                .insert(*key);
824        }
825        targets.paid_targets.insert(*key, target_peers.clone());
826        targets.paid_group_sizes.insert(*key, target_peers.len());
827    }
828    for keys_list in targets.peer_to_keys.values_mut() {
829        keys_list.sort_unstable();
830        keys_list.dedup();
831    }
832
833    let evidence = quorum::run_verification_round(&keys, &targets, p2p_node, config).await;
834    paid_confirmations_by_key(expired_candidates, &evidence)
835}
836
837/// Aggregate `Confirmed` paid-list evidence into per-key peer sets.
838///
839/// Only peers in the candidate's own target set count; `NotFound` and
840/// `Unresolved` answers never confirm.
841fn paid_confirmations_by_key(
842    expired_candidates: &[(XorName, Vec<PeerId>)],
843    evidence: &HashMap<XorName, KeyVerificationEvidence>,
844) -> HashMap<XorName, HashSet<PeerId>> {
845    let mut confirmed_by_key: HashMap<XorName, HashSet<PeerId>> = HashMap::new();
846    for (key, target_peers) in expired_candidates {
847        let Some(key_evidence) = evidence.get(key) else {
848            continue;
849        };
850        let confirmed: HashSet<PeerId> = key_evidence
851            .paid_list
852            .iter()
853            .filter(|&(peer, status)| {
854                *status == PaidListEvidence::Confirmed && target_peers.contains(peer)
855            })
856            .map(|(peer, _)| *peer)
857            .collect();
858        if !confirmed.is_empty() {
859            confirmed_by_key.insert(*key, confirmed);
860        }
861    }
862    confirmed_by_key
863}
864
865/// Filter `target_peers` down to those with a mature repair proof for `key`.
866///
867/// Per design rule 20, peers without a key-specific mature repair hint proof
868/// are never audited for that key.
869async fn peers_with_mature_repair_proofs(
870    key: &XorName,
871    target_peers: &[PeerId],
872    current_close_peers: &HashSet<PeerId>,
873    repair_proofs: &Arc<RwLock<RepairProofs>>,
874    current_sync_epoch: u64,
875    now: Instant,
876) -> Vec<PeerId> {
877    let mut proofs = repair_proofs.write().await;
878    target_peers
879        .iter()
880        .filter(|peer| {
881            proofs.has_mature_replica_hint(peer, key, current_close_peers, current_sync_epoch, now)
882        })
883        .copied()
884        .collect()
885}
886
887async fn prune_scan_start(
888    sync_state: &Arc<RwLock<NeighborSyncState>>,
889    stored_key_count: usize,
890) -> usize {
891    if stored_key_count == 0 {
892        return 0;
893    }
894    sync_state.read().await.prune_cursor % stored_key_count
895}
896
897async fn advance_prune_cursor(
898    sync_state: &Arc<RwLock<NeighborSyncState>>,
899    stored_key_count: usize,
900    scan_start: usize,
901    last_selected_offset: Option<usize>,
902) {
903    if stored_key_count == 0 {
904        sync_state.write().await.prune_cursor = 0;
905        return;
906    }
907
908    let advance_by = last_selected_offset.map_or(1, |offset| offset.saturating_add(1));
909    sync_state.write().await.prune_cursor = (scan_start + advance_by) % stored_key_count;
910}
911
912async fn delete_stored_records(
913    keys_to_delete: &[XorName],
914    storage: &Arc<LmdbStorage>,
915    paid_list: &Arc<PaidList>,
916    repair_proofs: &Arc<RwLock<RepairProofs>>,
917) -> usize {
918    let mut pruned = 0;
919
920    for key in keys_to_delete {
921        if let Err(e) = storage.delete(key).await {
922            warn!("Failed to prune record {}: {e}", hex::encode(key));
923        } else {
924            pruned += 1;
925            paid_list.clear_record_out_of_range(key);
926            repair_proofs.write().await.remove_key(key);
927            // Seed the PaidForList out-of-range timer so the second pass can
928            // prune the entry sooner, closing the re-admission window between
929            // the storage delete and the PaidForList prune pass.
930            paid_list.set_paid_out_of_range(key);
931            debug!("Pruned out-of-range record {}", hex::encode(key));
932        }
933    }
934
935    pruned
936}
937
938/// Collect positive presence reports for prune candidates.
939///
940/// A key is deleted once all but one of the current close group prove
941/// possession ([`prune_proofs_needed`]). Requiring unanimous proofs left
942/// out-of-range records undeletable whenever a single close-group peer
943/// lagged, while the all-but-one threshold still demands more copies than
944/// the storage quorum used elsewhere. Keys below the proof threshold stay
945/// local, and the retained record continues to participate in normal
946/// neighbor-sync repair because replica hint construction walks all locally
947/// stored keys, including out-of-range keys retained by hysteresis.
948async fn collect_record_prune_proofs(
949    candidates: &[RecordPruneCandidate],
950    storage: &Arc<LmdbStorage>,
951    p2p_node: &Arc<P2PNode>,
952    config: &ReplicationConfig,
953    sync_state: &Arc<RwLock<NeighborSyncState>>,
954) -> HashMap<XorName, HashSet<PeerId>> {
955    if candidates.is_empty() {
956        return HashMap::new();
957    }
958
959    let report_state = PruneAuditReportState::default();
960    let mut requests = stream::iter(build_peer_audit_challenges(candidates))
961        .map(|(peer, key)| {
962            peer_proves_record(
963                peer,
964                key,
965                storage,
966                p2p_node,
967                config,
968                sync_state,
969                &report_state,
970            )
971        })
972        .buffer_unordered(MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES);
973
974    let mut present_by_key = HashMap::<XorName, HashSet<PeerId>>::new();
975    while let Some(proof) = requests.next().await {
976        if let Some((peer, key)) = proof {
977            present_by_key.entry(key).or_default().insert(peer);
978        }
979    }
980
981    present_by_key
982}
983
984async fn revalidated_record_prune_keys(
985    candidates: &[RecordPruneCandidate],
986    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
987    self_id: &PeerId,
988    paid_list: &Arc<PaidList>,
989    p2p_node: &Arc<P2PNode>,
990    config: &ReplicationConfig,
991    commitment_state: Option<&Arc<ResponderCommitmentState>>,
992) -> (Vec<XorName>, usize) {
993    let mut keys_to_delete = Vec::new();
994    let mut cleared = 0;
995    let now = Instant::now();
996
997    for candidate in candidates {
998        // TOCTOU guard: a rotation/gossip may have (re-)committed this key
999        // between candidate selection and now. Re-check retention immediately
1000        // before scheduling deletion so we never delete bytes a recently
1001        // gossiped commitment still owes in a round-2 byte challenge.
1002        if let Some(cs) = commitment_state {
1003            if cs.is_held(&candidate.key) {
1004                continue;
1005            }
1006        }
1007
1008        let (storage_admission_group, strict_close_group) =
1009            record_prune_lookup_groups(&candidate.key, p2p_node, config).await;
1010
1011        if storage_admission_group
1012            .iter()
1013            .any(|n| n.peer_id == *self_id)
1014        {
1015            if paid_list
1016                .record_out_of_range_since(&candidate.key)
1017                .is_some()
1018            {
1019                paid_list.clear_record_out_of_range(&candidate.key);
1020                cleared += 1;
1021            }
1022            continue;
1023        }
1024
1025        let Some(first_seen) = paid_list.record_out_of_range_since(&candidate.key) else {
1026            continue;
1027        };
1028        let elapsed = now
1029            .checked_duration_since(first_seen)
1030            .unwrap_or(Duration::ZERO);
1031        if elapsed < config.prune_hysteresis_duration {
1032            continue;
1033        }
1034
1035        let current_target_peers = remote_close_group_peers(&strict_close_group, self_id);
1036        if current_target_peers.is_empty() {
1037            warn!(
1038                "Cannot prune {}: current close group has no remote peers",
1039                hex::encode(candidate.key)
1040            );
1041            continue;
1042        }
1043
1044        let proofs_needed = prune_proofs_needed(current_target_peers.len());
1045        if target_peers_reported_present(
1046            &candidate.key,
1047            &current_target_peers,
1048            present_by_key,
1049            proofs_needed,
1050        ) {
1051            keys_to_delete.push(candidate.key);
1052        } else {
1053            debug!(
1054                "Deferring prune for {} until all but one of the current close group \
1055                 report it",
1056                hex::encode(candidate.key)
1057            );
1058        }
1059    }
1060
1061    (keys_to_delete, cleared)
1062}
1063
1064fn build_peer_audit_challenges(candidates: &[RecordPruneCandidate]) -> Vec<(PeerId, XorName)> {
1065    let mut challenges = Vec::new();
1066    for candidate in candidates {
1067        for peer in &candidate.target_peers {
1068            challenges.push((*peer, candidate.key));
1069        }
1070    }
1071    challenges
1072}
1073
1074#[cfg(test)]
1075fn confirmed_keys_from_presence(
1076    candidates: &[RecordPruneCandidate],
1077    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
1078    proofs_needed: usize,
1079) -> Vec<XorName> {
1080    candidates
1081        .iter()
1082        .filter(|candidate| {
1083            target_peers_reported_present(
1084                &candidate.key,
1085                &candidate.target_peers,
1086                present_by_key,
1087                proofs_needed,
1088            )
1089        })
1090        .map(|candidate| candidate.key)
1091        .collect()
1092}
1093
1094/// Proofs required before deleting an out-of-range record: all but one of
1095/// the close group (6 of 7 at production parameters).
1096///
1097/// Stricter than the storage quorum (`QuorumNeeded`) because pruning only
1098/// runs after `PRUNE_HYSTERESIS_DURATION` out of range, by which time many
1099/// sync cycles should have replicated the record across the whole close
1100/// group. Tolerating exactly one lagging peer keeps a single absent peer
1101/// from vetoing deletion forever without accepting under-replication.
1102/// Groups of one or two peers require every proof: tolerating a miss there
1103/// would allow deletion on a single attestation.
1104fn prune_proofs_needed(group_size: usize) -> usize {
1105    if group_size <= 2 {
1106        group_size
1107    } else {
1108        group_size - 1
1109    }
1110}
1111
1112/// Whether enough target peers supplied positive evidence to allow deletion.
1113///
1114/// `proofs_needed == 0` means confirmation is impossible (no targets), not
1115/// trivially met.
1116fn target_peers_reported_present(
1117    key: &XorName,
1118    target_peers: &[PeerId],
1119    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
1120    proofs_needed: usize,
1121) -> bool {
1122    if proofs_needed == 0 {
1123        return false;
1124    }
1125    let Some(present_peers) = present_by_key.get(key) else {
1126        return false;
1127    };
1128    // Count distinct proven peers: iterating the present set keeps a
1129    // duplicated entry in `target_peers` from being counted twice.
1130    let proven = present_peers
1131        .iter()
1132        .filter(|peer| target_peers.contains(peer))
1133        .count();
1134    proven >= proofs_needed
1135}
1136
1137/// Challenge a peer to prove it holds the exact record bytes for `key`.
1138/// `None` means the peer failed to provide usable proof.
1139async fn peer_proves_record(
1140    peer: PeerId,
1141    key: XorName,
1142    storage: &Arc<LmdbStorage>,
1143    p2p_node: &Arc<P2PNode>,
1144    config: &ReplicationConfig,
1145    sync_state: &Arc<RwLock<NeighborSyncState>>,
1146    report_state: &PruneAuditReportState,
1147) -> Option<(PeerId, XorName)> {
1148    let local_bytes = local_record_bytes(&key, storage).await?;
1149
1150    let (challenge_id, nonce) = {
1151        let mut rng = rand::thread_rng();
1152        (rng.gen::<u64>(), rng.gen::<[u8; 32]>())
1153    };
1154    let encoded = encode_prune_audit_challenge(&peer, key, challenge_id, nonce)?;
1155    let Some(decoded) = send_prune_audit_challenge(&peer, &key, encoded, p2p_node, config).await
1156    else {
1157        // No decoded response means a timeout or an undecodable reply — the
1158        // same "no response" case the main audit path treats as a timeout. The
1159        // penalty is gated behind `TIMEOUT_EVICTION_ENABLED` (off this
1160        // release — a not-yet-upgraded or briefly-slow peer must not be evicted
1161        // by a no-response during the breaking rollout). Mirrors the suppressed
1162        // timeout penalty in handle_failed_audit; only a DECODED
1163        // PruneAuditStatus::Failed below (a peer that answered with bad/absent
1164        // bytes) is penalised regardless of the gate.
1165        if crate::replication::config::TIMEOUT_EVICTION_ENABLED {
1166            report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
1167        } else {
1168            debug!(
1169                "Prune audit for {peer} key {} got no decodable response \
1170                 (eviction disabled this release — not penalising)",
1171                hex::encode(key)
1172            );
1173        }
1174        return None;
1175    };
1176
1177    let status =
1178        prune_audit_response_status(decoded, challenge_id, &peer, &key, &nonce, &local_bytes);
1179    if prune_audit_response_clears_bootstrap_claim(status) {
1180        clear_prune_bootstrap_claim(&peer, sync_state).await;
1181    }
1182
1183    match status {
1184        PruneAuditStatus::Proven => Some((peer, key)),
1185        PruneAuditStatus::Bootstrapping => {
1186            report_prune_bootstrap_claim(&peer, &key, p2p_node, config, sync_state, report_state)
1187                .await;
1188            None
1189        }
1190        PruneAuditStatus::Failed => {
1191            report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
1192            None
1193        }
1194    }
1195}
1196
1197fn prune_audit_response_clears_bootstrap_claim(status: PruneAuditStatus) -> bool {
1198    matches!(status, PruneAuditStatus::Proven | PruneAuditStatus::Failed)
1199}
1200
1201// The responder for an incoming `AuditChallenge` (including prune-confirmation
1202// challenges, which reuse the same wire message) lives in
1203// `super::handle_audit_challenge_msg` -> `audit::handle_audit_challenge`, the
1204// responsible-chunk audit responder. No separate prune-only responder is needed.
1205
1206fn encode_prune_audit_challenge(
1207    peer: &PeerId,
1208    key: XorName,
1209    challenge_id: u64,
1210    nonce: [u8; 32],
1211) -> Option<Vec<u8>> {
1212    let challenge = AuditChallenge {
1213        challenge_id,
1214        nonce,
1215        challenged_peer_id: *peer.as_bytes(),
1216        keys: vec![key],
1217    };
1218    let msg = ReplicationMessage {
1219        request_id: challenge_id,
1220        body: ReplicationMessageBody::AuditChallenge(challenge),
1221    };
1222    let encoded = match msg.encode() {
1223        Ok(data) => data,
1224        Err(e) => {
1225            warn!(
1226                "Failed to encode prune audit challenge for {} against {peer}: {e}",
1227                hex::encode(key),
1228            );
1229            return None;
1230        }
1231    };
1232    Some(encoded)
1233}
1234
1235async fn send_prune_audit_challenge(
1236    peer: &PeerId,
1237    key: &XorName,
1238    encoded: Vec<u8>,
1239    p2p_node: &Arc<P2PNode>,
1240    config: &ReplicationConfig,
1241) -> Option<ReplicationMessage> {
1242    let response = match p2p_node
1243        .send_request(
1244            peer,
1245            REPLICATION_PROTOCOL_ID,
1246            encoded,
1247            config.prune_audit_response_timeout,
1248        )
1249        .await
1250    {
1251        Ok(response) => response,
1252        Err(e) => {
1253            debug!(
1254                "Prune audit challenge for {} against {peer} failed: {e}",
1255                hex::encode(key)
1256            );
1257            return None;
1258        }
1259    };
1260
1261    let decoded = match ReplicationMessage::decode(&response.data) {
1262        Ok(msg) => msg,
1263        Err(e) => {
1264            warn!("Failed to decode prune audit response from {peer}: {e}");
1265            return None;
1266        }
1267    };
1268
1269    Some(decoded)
1270}
1271
1272fn prune_audit_response_status(
1273    decoded: ReplicationMessage,
1274    challenge_id: u64,
1275    peer: &PeerId,
1276    key: &XorName,
1277    nonce: &[u8; 32],
1278    local_bytes: &[u8],
1279) -> PruneAuditStatus {
1280    match decoded.body {
1281        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
1282            challenge_id: resp_id,
1283            digests,
1284        }) => {
1285            if resp_id != challenge_id {
1286                warn!("Prune audit challenge ID mismatch from {peer}");
1287                return PruneAuditStatus::Failed;
1288            }
1289            let [digest] = digests.as_slice() else {
1290                warn!(
1291                    "Prune audit response from {peer} returned {} digests for one challenged key",
1292                    digests.len(),
1293                );
1294                return PruneAuditStatus::Failed;
1295            };
1296            if *digest == ABSENT_KEY_DIGEST {
1297                warn!(
1298                    "Prune audit proof from {peer} failed for {}: peer reports key absent",
1299                    hex::encode(key)
1300                );
1301                return PruneAuditStatus::Failed;
1302            }
1303            if audit_digest_proves_key(peer, key, nonce, local_bytes, digest) {
1304                PruneAuditStatus::Proven
1305            } else {
1306                warn!(
1307                    "Prune audit proof from {peer} failed for {}: digest mismatch",
1308                    hex::encode(key)
1309                );
1310                PruneAuditStatus::Failed
1311            }
1312        }
1313        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
1314            challenge_id: resp_id,
1315        }) => {
1316            if resp_id == challenge_id {
1317                warn!(
1318                    "Prune audit proof for {} blocked by bootstrap claim from {peer}",
1319                    hex::encode(key)
1320                );
1321                PruneAuditStatus::Bootstrapping
1322            } else {
1323                warn!("Prune audit challenge ID mismatch on Bootstrapping from {peer}");
1324                PruneAuditStatus::Failed
1325            }
1326        }
1327        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
1328            challenge_id: resp_id,
1329            reason,
1330        }) => {
1331            if resp_id == challenge_id {
1332                warn!(
1333                    "Prune audit proof for {} rejected by {peer}: {reason}",
1334                    hex::encode(key)
1335                );
1336            } else {
1337                warn!("Prune audit challenge ID mismatch on Rejected from {peer}");
1338            }
1339            PruneAuditStatus::Failed
1340        }
1341        _ => {
1342            warn!("Unexpected prune audit response type from {peer}");
1343            PruneAuditStatus::Failed
1344        }
1345    }
1346}
1347
1348async fn local_record_bytes(key: &XorName, storage: &Arc<LmdbStorage>) -> Option<Vec<u8>> {
1349    match storage.get_raw(key).await {
1350        Ok(Some(bytes)) => Some(bytes),
1351        Ok(None) => {
1352            debug!(
1353                "Cannot prune-audit {}: local record disappeared",
1354                hex::encode(key)
1355            );
1356            None
1357        }
1358        Err(e) => {
1359            warn!(
1360                "Cannot prune-audit {}: failed to read local record: {e}",
1361                hex::encode(key)
1362            );
1363            None
1364        }
1365    }
1366}
1367
1368fn audit_digest_proves_key(
1369    peer: &PeerId,
1370    key: &XorName,
1371    nonce: &[u8; 32],
1372    local_bytes: &[u8],
1373    digest: &[u8; 32],
1374) -> bool {
1375    if *digest == ABSENT_KEY_DIGEST {
1376        return false;
1377    }
1378    let expected = compute_audit_digest(nonce, peer.as_bytes(), key, local_bytes);
1379    *digest == expected
1380}
1381
1382async fn report_prune_audit_failure_once(
1383    peer: &PeerId,
1384    key: &XorName,
1385    p2p_node: &Arc<P2PNode>,
1386    config: &ReplicationConfig,
1387    report_state: &PruneAuditReportState,
1388) -> bool {
1389    let should_report = peer_is_currently_responsible(peer, key, p2p_node, config).await
1390        && reserve_prune_audit_failure_report(report_state, peer).await;
1391    if !should_report {
1392        return false;
1393    }
1394
1395    p2p_node
1396        .report_trust_event(
1397            peer,
1398            saorsa_core::TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT),
1399        )
1400        .await;
1401    true
1402}
1403
1404async fn reserve_prune_audit_failure_report(
1405    report_state: &PruneAuditReportState,
1406    peer: &PeerId,
1407) -> bool {
1408    report_state.audit_failures.write().await.insert(*peer)
1409}
1410
1411async fn reserve_prune_bootstrap_abuse_report(
1412    report_state: &PruneAuditReportState,
1413    peer: &PeerId,
1414) -> bool {
1415    report_state.bootstrap_abuse.write().await.insert(*peer)
1416}
1417
1418async fn report_prune_bootstrap_claim(
1419    peer: &PeerId,
1420    key: &XorName,
1421    p2p_node: &Arc<P2PNode>,
1422    config: &ReplicationConfig,
1423    sync_state: &Arc<RwLock<NeighborSyncState>>,
1424    report_state: &PruneAuditReportState,
1425) {
1426    if !peer_is_currently_responsible(peer, key, p2p_node, config).await {
1427        return;
1428    }
1429
1430    let observation = {
1431        let now = Instant::now();
1432        let mut state = sync_state.write().await;
1433        (
1434            now,
1435            state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period),
1436        )
1437    };
1438
1439    let (now, observation) = observation;
1440    match observation {
1441        BootstrapClaimObservation::WithinGrace { .. } => {
1442            debug!("Prune audit: peer {peer} claims bootstrapping (within grace period)");
1443            return;
1444        }
1445        BootstrapClaimObservation::PastGrace { first_seen } => {
1446            if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
1447                debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
1448                return;
1449            }
1450            warn!(
1451                "Prune audit: peer {peer} claiming bootstrap past grace period \
1452                 ({:?} > {:?}), reporting abuse",
1453                now.duration_since(first_seen),
1454                config.bootstrap_claim_grace_period,
1455            );
1456        }
1457        BootstrapClaimObservation::Repeated { first_seen } => {
1458            if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
1459                debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
1460                return;
1461            }
1462            warn!(
1463                "Prune audit: peer {peer} repeated bootstrap claim after previously stopping; \
1464                 first claim was {:?} ago, reporting abuse",
1465                now.duration_since(first_seen),
1466            );
1467        }
1468    }
1469
1470    p2p_node
1471        .report_trust_event(
1472            peer,
1473            saorsa_core::TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
1474        )
1475        .await;
1476}
1477
1478async fn clear_prune_bootstrap_claim(peer: &PeerId, sync_state: &Arc<RwLock<NeighborSyncState>>) {
1479    let removed = {
1480        let mut state = sync_state.write().await;
1481        state.clear_active_bootstrap_claim(peer)
1482    };
1483    if removed {
1484        debug!("Prune audit: cleared active bootstrap claim for {peer}");
1485    }
1486}
1487
1488async fn peer_is_currently_responsible(
1489    peer: &PeerId,
1490    key: &XorName,
1491    p2p_node: &Arc<P2PNode>,
1492    config: &ReplicationConfig,
1493) -> bool {
1494    let closest = p2p_node
1495        .dht_manager()
1496        .find_closest_nodes_local_with_self(key, config.close_group_size)
1497        .await;
1498    closest.iter().any(|node| node.peer_id == *peer)
1499}
1500
1501#[cfg(test)]
1502#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1503mod tests {
1504    use super::*;
1505
1506    fn peer_id_from_byte(b: u8) -> PeerId {
1507        let mut bytes = [0u8; 32];
1508        bytes[0] = b;
1509        PeerId::from_bytes(bytes)
1510    }
1511
1512    fn key_from_byte(b: u8) -> XorName {
1513        [b; 32]
1514    }
1515
1516    fn peer_ids(count: usize) -> Vec<PeerId> {
1517        (0..count)
1518            .map(|idx| peer_id_from_byte(u8::try_from(idx + 1).expect("peer byte")))
1519            .collect()
1520    }
1521
1522    fn candidate(key: XorName, target_peers: Vec<PeerId>) -> RecordPruneCandidate {
1523        RecordPruneCandidate { key, target_peers }
1524    }
1525
1526    #[test]
1527    fn prune_audit_challenges_are_one_per_candidate_peer() {
1528        let peer_a = peer_id_from_byte(1);
1529        let peer_b = peer_id_from_byte(2);
1530        let key_a = key_from_byte(0xA);
1531        let key_b = key_from_byte(0xB);
1532        let candidates = vec![
1533            candidate(key_a, vec![peer_a, peer_b]),
1534            candidate(key_b, vec![peer_b]),
1535        ];
1536
1537        let mut challenges = build_peer_audit_challenges(&candidates);
1538        challenges.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));
1539
1540        let mut expected = vec![(peer_a, key_a), (peer_b, key_a), (peer_b, key_b)];
1541        expected.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));
1542        assert_eq!(challenges, expected);
1543    }
1544
1545    #[test]
1546    fn confirmed_keys_require_quorum_of_target_peers_present() {
1547        let peer_a = peer_id_from_byte(1);
1548        let peer_b = peer_id_from_byte(2);
1549        let peer_c = peer_id_from_byte(3);
1550        let key = key_from_byte(0xC);
1551        let candidates = vec![candidate(key, vec![peer_a, peer_b, peer_c])];
1552        let mut present_by_key = HashMap::new();
1553        present_by_key.insert(key, HashSet::from([peer_a, peer_b]));
1554
1555        // Two of three proofs meet a quorum of 2 even though one peer is
1556        // missing — unanimity is not required.
1557        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 2);
1558        assert_eq!(confirmed, vec![key]);
1559
1560        // The same evidence fails a quorum of 3.
1561        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 3);
1562        assert!(confirmed.is_empty());
1563    }
1564
1565    #[test]
1566    fn confirmed_keys_defer_below_quorum_or_missing_peer_evidence() {
1567        let peer_a = peer_id_from_byte(1);
1568        let peer_b = peer_id_from_byte(2);
1569        let quorum_key = key_from_byte(0xD);
1570        let below_quorum_key = key_from_byte(0xE);
1571        let missing_key = key_from_byte(0xF);
1572        let candidates = vec![
1573            candidate(quorum_key, vec![peer_a, peer_b]),
1574            candidate(below_quorum_key, vec![peer_a, peer_b]),
1575            candidate(missing_key, vec![peer_a, peer_b]),
1576        ];
1577        let mut present_by_key = HashMap::new();
1578        present_by_key.insert(quorum_key, HashSet::from([peer_a, peer_b]));
1579        present_by_key.insert(below_quorum_key, HashSet::from([peer_a]));
1580
1581        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key, 2);
1582
1583        assert_eq!(confirmed, vec![quorum_key]);
1584    }
1585
1586    #[test]
1587    fn prune_proofs_needed_tolerates_exactly_one_lagging_peer() {
1588        assert_eq!(prune_proofs_needed(0), 0);
1589        // Tiny groups require every proof.
1590        assert_eq!(prune_proofs_needed(1), 1);
1591        assert_eq!(prune_proofs_needed(2), 2);
1592        assert_eq!(prune_proofs_needed(3), 2);
1593        assert_eq!(prune_proofs_needed(5), 4);
1594        // Production close group: 6 of 7 proofs required.
1595        assert_eq!(prune_proofs_needed(7), 6);
1596    }
1597
1598    #[test]
1599    fn paid_prune_confirmations_are_three_quarters_rounded_up() {
1600        assert_eq!(paid_prune_confirmations_needed(0), 0);
1601        assert_eq!(paid_prune_confirmations_needed(1), 1);
1602        assert_eq!(paid_prune_confirmations_needed(2), 2);
1603        assert_eq!(paid_prune_confirmations_needed(4), 3);
1604        // Production paid close group: 15 of 20 confirmations required.
1605        assert_eq!(paid_prune_confirmations_needed(20), 15);
1606    }
1607
1608    #[test]
1609    fn paid_prune_peer_budget_allows_overlapping_targets() {
1610        let peers = peer_ids(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS);
1611        let mut selected_peers = HashSet::new();
1612
1613        assert!(reserve_paid_prune_peer_budget(&peers, &mut selected_peers));
1614        assert_eq!(
1615            selected_peers.len(),
1616            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
1617        );
1618
1619        let overlapping_targets = vec![peers[0], peers[1]];
1620        assert!(reserve_paid_prune_peer_budget(
1621            &overlapping_targets,
1622            &mut selected_peers,
1623        ));
1624        assert_eq!(
1625            selected_peers.len(),
1626            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
1627        );
1628    }
1629
1630    #[test]
1631    fn paid_prune_peer_budget_rejects_new_peers_past_cap() {
1632        let peers = peer_ids(MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS + 1);
1633        let mut selected_peers = HashSet::new();
1634
1635        assert!(reserve_paid_prune_peer_budget(
1636            &peers[..MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS],
1637            &mut selected_peers,
1638        ));
1639        assert!(!reserve_paid_prune_peer_budget(
1640            &peers[MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS..],
1641            &mut selected_peers,
1642        ));
1643        assert_eq!(
1644            selected_peers.len(),
1645            MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS,
1646        );
1647        assert!(!selected_peers.contains(&peers[MAX_PAID_PRUNE_VERIFICATION_PEERS_PER_PASS]));
1648    }
1649
1650    #[test]
1651    fn paid_confirmations_count_only_confirmed_target_peers() {
1652        let confirmed_peer = peer_id_from_byte(1);
1653        let not_found_peer = peer_id_from_byte(2);
1654        let unresolved_peer = peer_id_from_byte(3);
1655        let outsider = peer_id_from_byte(4);
1656        let key = key_from_byte(0x21);
1657        let candidates = vec![(key, vec![confirmed_peer, not_found_peer, unresolved_peer])];
1658
1659        let mut evidence = HashMap::new();
1660        evidence.insert(
1661            key,
1662            KeyVerificationEvidence {
1663                presence: HashMap::new(),
1664                paid_list: HashMap::from([
1665                    (confirmed_peer, PaidListEvidence::Confirmed),
1666                    (not_found_peer, PaidListEvidence::NotFound),
1667                    (unresolved_peer, PaidListEvidence::Unresolved),
1668                    // Confirmation from a peer outside the target set.
1669                    (outsider, PaidListEvidence::Confirmed),
1670                ]),
1671            },
1672        );
1673
1674        let confirmed_by_key = paid_confirmations_by_key(&candidates, &evidence);
1675
1676        assert_eq!(
1677            confirmed_by_key.get(&key),
1678            Some(&HashSet::from([confirmed_peer])),
1679            "only Confirmed answers from target peers may count",
1680        );
1681    }
1682
1683    #[test]
1684    fn paid_confirmations_skip_keys_without_evidence() {
1685        let peer = peer_id_from_byte(1);
1686        let key = key_from_byte(0x22);
1687        let candidates = vec![(key, vec![peer])];
1688
1689        let confirmed_by_key = paid_confirmations_by_key(&candidates, &HashMap::new());
1690
1691        assert!(confirmed_by_key.is_empty());
1692    }
1693
1694    #[test]
1695    fn zero_quorum_never_confirms() {
1696        let peer_a = peer_id_from_byte(1);
1697        let key = key_from_byte(0x10);
1698        let mut present_by_key = HashMap::new();
1699        present_by_key.insert(key, HashSet::from([peer_a]));
1700
1701        assert!(!target_peers_reported_present(
1702            &key,
1703            &[peer_a],
1704            &present_by_key,
1705            0
1706        ));
1707    }
1708
1709    #[test]
1710    fn proofs_from_non_target_peers_do_not_count_toward_quorum() {
1711        let target = peer_id_from_byte(1);
1712        let outsider = peer_id_from_byte(2);
1713        let key = key_from_byte(0x11);
1714        let mut present_by_key = HashMap::new();
1715        present_by_key.insert(key, HashSet::from([outsider]));
1716
1717        assert!(!target_peers_reported_present(
1718            &key,
1719            &[target],
1720            &present_by_key,
1721            1
1722        ));
1723    }
1724
1725    #[test]
1726    fn duplicated_target_peer_counts_once_toward_quorum() {
1727        let peer = peer_id_from_byte(1);
1728        let key = key_from_byte(0x12);
1729        let mut present_by_key = HashMap::new();
1730        present_by_key.insert(key, HashSet::from([peer]));
1731
1732        assert!(!target_peers_reported_present(
1733            &key,
1734            &[peer, peer],
1735            &present_by_key,
1736            2
1737        ));
1738    }
1739
1740    #[test]
1741    fn audit_digest_proof_requires_matching_peer_key_nonce_and_bytes() {
1742        let peer = peer_id_from_byte(1);
1743        let other_peer = peer_id_from_byte(2);
1744        let key = key_from_byte(0x11);
1745        let other_key = key_from_byte(0x12);
1746        let nonce = [0xAA; 32];
1747        let other_nonce = [0xBB; 32];
1748        let bytes = b"record bytes";
1749        let digest = compute_audit_digest(&nonce, peer.as_bytes(), &key, bytes);
1750
1751        assert!(audit_digest_proves_key(&peer, &key, &nonce, bytes, &digest));
1752        assert!(!audit_digest_proves_key(
1753            &other_peer,
1754            &key,
1755            &nonce,
1756            bytes,
1757            &digest
1758        ));
1759        assert!(!audit_digest_proves_key(
1760            &peer, &other_key, &nonce, bytes, &digest
1761        ));
1762        assert!(!audit_digest_proves_key(
1763            &peer,
1764            &key,
1765            &other_nonce,
1766            bytes,
1767            &digest
1768        ));
1769        assert!(!audit_digest_proves_key(
1770            &peer,
1771            &key,
1772            &nonce,
1773            b"different bytes",
1774            &digest
1775        ));
1776        assert!(!audit_digest_proves_key(
1777            &peer,
1778            &key,
1779            &nonce,
1780            bytes,
1781            &ABSENT_KEY_DIGEST
1782        ));
1783    }
1784
1785    #[tokio::test]
1786    async fn prune_cursor_advances_past_selected_budget_window() {
1787        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
1788        state.write().await.prune_cursor = 2;
1789
1790        let start = prune_scan_start(&state, 10).await;
1791        advance_prune_cursor(&state, 10, start, Some(3)).await;
1792
1793        assert_eq!(state.read().await.prune_cursor, 6);
1794    }
1795
1796    #[tokio::test]
1797    async fn prune_cursor_advances_even_when_no_candidate_selected() {
1798        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
1799        state.write().await.prune_cursor = 9;
1800
1801        let start = prune_scan_start(&state, 10).await;
1802        advance_prune_cursor(&state, 10, start, None).await;
1803
1804        assert_eq!(state.read().await.prune_cursor, 0);
1805    }
1806
1807    #[tokio::test]
1808    async fn prune_audit_normal_response_clears_stale_bootstrap_claim() {
1809        let peer = peer_id_from_byte(1);
1810        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![peer])));
1811        let first_seen = Instant::now();
1812        state
1813            .write()
1814            .await
1815            .bootstrap_claims
1816            .insert(peer, first_seen);
1817        state
1818            .write()
1819            .await
1820            .bootstrap_claim_history
1821            .insert(peer, first_seen);
1822
1823        clear_prune_bootstrap_claim(&peer, &state).await;
1824
1825        let state = state.read().await;
1826        assert!(!state.bootstrap_claims.contains_key(&peer));
1827        assert!(state.bootstrap_claim_history.contains_key(&peer));
1828    }
1829
1830    #[test]
1831    fn prune_audit_clear_policy_requires_decoded_non_bootstrap_response() {
1832        assert!(prune_audit_response_clears_bootstrap_claim(
1833            PruneAuditStatus::Proven
1834        ));
1835        assert!(prune_audit_response_clears_bootstrap_claim(
1836            PruneAuditStatus::Failed
1837        ));
1838        assert!(!prune_audit_response_clears_bootstrap_claim(
1839            PruneAuditStatus::Bootstrapping
1840        ));
1841    }
1842
1843    #[tokio::test]
1844    async fn prune_audit_failure_penalty_is_reserved_once_per_peer_per_pass() {
1845        let peer = peer_id_from_byte(1);
1846        let other_peer = peer_id_from_byte(2);
1847        let report_state = PruneAuditReportState::default();
1848
1849        assert!(reserve_prune_audit_failure_report(&report_state, &peer).await);
1850        assert!(!reserve_prune_audit_failure_report(&report_state, &peer).await);
1851        assert!(reserve_prune_audit_failure_report(&report_state, &other_peer).await);
1852
1853        let reported = report_state.audit_failures.read().await;
1854        assert_eq!(reported.len(), 2);
1855        assert!(reported.contains(&peer));
1856        assert!(reported.contains(&other_peer));
1857    }
1858
1859    #[tokio::test]
1860    async fn prune_bootstrap_abuse_penalty_is_reserved_once_per_peer_per_pass() {
1861        let peer = peer_id_from_byte(1);
1862        let other_peer = peer_id_from_byte(2);
1863        let report_state = PruneAuditReportState::default();
1864
1865        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
1866        assert!(!reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
1867        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &other_peer).await);
1868
1869        let reported = report_state.bootstrap_abuse.read().await;
1870        assert_eq!(reported.len(), 2);
1871        assert!(reported.contains(&peer));
1872        assert!(reported.contains(&other_peer));
1873    }
1874}