Skip to main content

ant_node/replication/
pruning.rs

1//! Post-cycle responsibility pruning (Section 11).
2//!
3//! On `NeighborSyncCycleComplete`: prune stored records and `PaidForList`
4//! entries that have been continuously out of range for at least
5//! `PRUNE_HYSTERESIS_DURATION`.
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use crate::logging::{debug, info, warn};
12
13use futures::{stream, StreamExt};
14use rand::Rng;
15use saorsa_core::identity::PeerId;
16use saorsa_core::{DHTNode, P2PNode};
17use tokio::sync::RwLock;
18
19use crate::ant_protocol::XorName;
20use crate::replication::config::{
21    ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS,
22    REPLICATION_PROTOCOL_ID,
23};
24use crate::replication::paid_list::PaidList;
25use crate::replication::protocol::{
26    compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
27    ReplicationMessageBody, ABSENT_KEY_DIGEST,
28};
29use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState, RepairProofs};
30use crate::storage::LmdbStorage;
31
32use super::REPLICATION_TRUST_WEIGHT;
33
/// Maximum number of prune audit challenges kept in flight at the same time.
///
/// Used as the `buffer_unordered` limit in `collect_record_prune_proofs`.
const MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES: usize = 32;
35
36// ---------------------------------------------------------------------------
37// Result type
38// ---------------------------------------------------------------------------
39
/// Summary of a prune pass.
///
/// Returned by [`run_prune_pass`] and [`run_prune_pass_with_context`]. The
/// `records_*` counters come from the stored-record scan and the `paid_*`
/// counters from the `PaidForList` scan.
#[derive(Debug, Default)]
pub struct PruneResult {
    /// Number of records successfully deleted from storage.
    pub records_pruned: usize,
    /// Number of records whose out-of-range timestamp was newly set this pass.
    pub records_marked_out_of_range: usize,
    /// Number of records whose out-of-range timestamp was cleared because the
    /// local node is back in range.
    pub records_cleared: usize,
    /// Number of `PaidForList` entries removed.
    pub paid_entries_pruned: usize,
    /// Number of `PaidForList` entries whose out-of-range timestamp was newly
    /// set this pass.
    pub paid_entries_marked: usize,
    /// Number of `PaidForList` entries whose out-of-range timestamp was
    /// cleared because the local node is back in the paid close group.
    pub paid_entries_cleared: usize,
}
56
/// Shared dependencies and switches for one prune pass.
///
/// Every field is borrowed (or `Copy`), so the context is cheap to build for
/// each call to [`run_prune_pass_with_context`].
pub struct PrunePassContext<'a> {
    /// Local peer id; compared against close-group members to decide
    /// responsibility.
    pub self_id: &'a PeerId,
    /// Local record storage that is scanned and pruned.
    pub storage: &'a Arc<LmdbStorage>,
    /// Persistent paid-list state, which also holds the out-of-range
    /// timestamps for both records and paid entries.
    pub paid_list: &'a Arc<PaidList>,
    /// P2P node used for routing lookups and prune-confirmation audits.
    pub p2p_node: &'a Arc<P2PNode>,
    /// Replication configuration (close-group sizes, hysteresis, timeouts).
    pub config: &'a ReplicationConfig,
    /// Neighbor-sync state, including prune cursor and bootstrap claims.
    pub sync_state: &'a Arc<RwLock<NeighborSyncState>>,
    /// Key-specific repair proofs used to gate prune-confirmation audits.
    pub repair_proofs: &'a Arc<RwLock<RepairProofs>>,
    /// Current local neighbor-sync cycle epoch, forwarded to the repair-proof
    /// maturity check.
    pub current_sync_epoch: u64,
    /// Whether remote prune-confirmation audits are allowed this pass. When
    /// `false`, records past hysteresis are deferred instead of audited.
    pub allow_remote_prune_audits: bool,
}
78
/// Classification of a decoded prune audit response for one `(peer, key)`
/// challenge, as produced by `prune_audit_response_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PruneAuditStatus {
    /// The peer returned exactly one digest and it matched the digest
    /// computed from the local record bytes.
    Proven,
    /// The response was unusable: challenge-id mismatch, wrong digest count,
    /// digest mismatch, explicit rejection, or an unexpected message type.
    Failed,
    /// The peer answered the challenge with a bootstrapping claim carrying
    /// the matching challenge id.
    Bootstrapping,
}
85
/// Counters accumulated by `prune_stored_records` for stored records.
#[derive(Debug, Default)]
struct RecordPruneStats {
    /// Records whose out-of-range timestamp was newly set.
    marked: usize,
    /// Records whose out-of-range timestamp was cleared, either during the
    /// scan or during post-audit revalidation.
    cleared: usize,
    /// Records successfully deleted from storage.
    pruned: usize,
}
92
/// Counters accumulated by `prune_paid_entries` for `PaidForList` entries.
#[derive(Debug, Default)]
struct PaidPruneStats {
    /// Entries whose out-of-range timestamp was newly set.
    marked: usize,
    /// Entries whose out-of-range timestamp was cleared.
    cleared: usize,
    /// Entries removed, as reported by the batch removal.
    pruned: usize,
}
99
/// A stored record selected for prune-confirmation audits in this pass.
#[derive(Debug, Clone)]
struct RecordPruneCandidate {
    /// Key of the out-of-range record.
    key: XorName,
    /// Remote members of the key's close group at evaluation time; each one
    /// receives an audit challenge for `key`.
    target_peers: Vec<PeerId>,
}
105
/// Result of evaluating a single stored key in `evaluate_record_prune_key`.
struct RecordPruneKeyOutcome {
    /// `true` when the key had no out-of-range timestamp before this
    /// evaluation and the local node is not in its close group.
    marked: bool,
    /// What the scan should do with the key next.
    state: RecordPruneKeyState,
}
110
111impl Default for RecordPruneKeyOutcome {
112    fn default() -> Self {
113        Self {
114            marked: false,
115            state: RecordPruneKeyState::None,
116        }
117    }
118}
119
/// Follow-up action decided for one stored key during the prune scan.
enum RecordPruneKeyState {
    /// Nothing further to do for this key in this pass.
    None,
    /// The local node is responsible again and the out-of-range timestamp was
    /// cleared.
    Cleared,
    /// Hysteresis has elapsed but remote prune audits are not allowed this
    /// pass.
    BootstrapDeferred,
    /// The key needs more audit challenges than the remaining per-pass
    /// budget allows.
    BudgetDeferred,
    /// The key was selected for prune-confirmation audits.
    Candidate(RecordPruneCandidate),
}
127
/// Per-audit-round bookkeeping of which peers already had a trust event
/// reserved, so each peer is reported at most once per category.
///
/// A fresh instance is created by `collect_record_prune_proofs` for every
/// round of challenges.
#[derive(Default)]
struct PruneAuditReportState {
    /// Peers for which an audit-failure report has been reserved.
    audit_failures: RwLock<HashSet<PeerId>>,
    /// Peers for which a bootstrap-abuse report has been reserved.
    bootstrap_abuse: RwLock<HashSet<PeerId>>,
}
133
134// ---------------------------------------------------------------------------
135// Prune pass
136// ---------------------------------------------------------------------------
137
138/// Execute post-cycle responsibility pruning.
139///
140/// For each stored record K:
141/// - If `IsResponsible(self, K)`: clear `RecordOutOfRangeFirstSeen`.
142/// - If not responsible: set timestamp if not already set; delete if the
143///   timestamp is at least `PRUNE_HYSTERESIS_DURATION` old and the current
144///   close group proves it stores the record.
145///
146/// For each `PaidForList` entry K:
147/// - If self is in `PaidCloseGroup(K)`: clear `PaidOutOfRangeFirstSeen`.
148/// - If not in group: set timestamp if not already set; remove entry if the
149///   timestamp is at least `PRUNE_HYSTERESIS_DURATION` old.
150///
151/// Compatibility wrapper for callers that have not adopted repair-proof
152/// tracking. It preserves the original public signature, but it has no proof
153/// table or advanced sync epoch to pass into record prune-confirmation audits.
154/// Out-of-range records are therefore marked/deferred rather than deleted via
155/// remote confirmation. The replication engine calls
156/// [`run_prune_pass_with_context`] so it can pass real repair proofs.
157pub async fn run_prune_pass(
158    self_id: &PeerId,
159    storage: &Arc<LmdbStorage>,
160    paid_list: &Arc<PaidList>,
161    p2p_node: &Arc<P2PNode>,
162    config: &ReplicationConfig,
163    sync_state: &Arc<RwLock<NeighborSyncState>>,
164    allow_remote_prune_audits: bool,
165) -> PruneResult {
166    let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
167    run_prune_pass_with_context(PrunePassContext {
168        self_id,
169        storage,
170        paid_list,
171        p2p_node,
172        config,
173        sync_state,
174        repair_proofs: &repair_proofs,
175        current_sync_epoch: 0,
176        allow_remote_prune_audits,
177    })
178    .await
179}
180
181/// Execute one prune pass with repair-proof-gated remote confirmations.
182pub async fn run_prune_pass_with_context(ctx: PrunePassContext<'_>) -> PruneResult {
183    let (stored_count, record_stats) = prune_stored_records(&ctx).await;
184    let now = Instant::now();
185    let (paid_count, paid_stats) =
186        prune_paid_entries(ctx.self_id, ctx.paid_list, ctx.p2p_node, ctx.config, now).await;
187
188    let result = PruneResult {
189        records_pruned: record_stats.pruned,
190        records_marked_out_of_range: record_stats.marked,
191        records_cleared: record_stats.cleared,
192        paid_entries_pruned: paid_stats.pruned,
193        paid_entries_marked: paid_stats.marked,
194        paid_entries_cleared: paid_stats.cleared,
195    };
196
197    info!(
198        "Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
199        result.records_pruned, stored_count, result.paid_entries_pruned, paid_count,
200    );
201
202    result
203}
204
205async fn prune_stored_records(ctx: &PrunePassContext<'_>) -> (usize, RecordPruneStats) {
206    let stored_keys = match ctx.storage.all_keys().await {
207        Ok(keys) => keys,
208        Err(e) => {
209            warn!("Failed to read stored keys for pruning: {e}");
210            return (0, RecordPruneStats::default());
211        }
212    };
213
214    let now = Instant::now();
215    let dht = ctx.p2p_node.dht_manager();
216    let mut stats = RecordPruneStats::default();
217    let mut candidates = Vec::new();
218    let mut audit_challenge_budget = MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS;
219    let mut budget_deferred = 0usize;
220    let mut bootstrap_deferred = 0usize;
221    let scan_start = prune_scan_start(ctx.sync_state, stored_keys.len()).await;
222    let mut last_selected_offset = None;
223
224    for offset in 0..stored_keys.len() {
225        let key = &stored_keys[(scan_start + offset) % stored_keys.len()];
226        let closest: Vec<DHTNode> = dht
227            .find_closest_nodes_local_with_self(key, ctx.config.close_group_size)
228            .await;
229
230        let outcome =
231            evaluate_record_prune_key(ctx, key, &closest, now, &mut audit_challenge_budget).await;
232        if outcome.marked {
233            stats.marked += 1;
234        }
235        match outcome.state {
236            RecordPruneKeyState::None => {}
237            RecordPruneKeyState::Cleared => stats.cleared += 1,
238            RecordPruneKeyState::BootstrapDeferred => {
239                bootstrap_deferred = bootstrap_deferred.saturating_add(1);
240            }
241            RecordPruneKeyState::BudgetDeferred => {
242                budget_deferred = budget_deferred.saturating_add(1);
243            }
244            RecordPruneKeyState::Candidate(candidate) => {
245                last_selected_offset = Some(offset);
246                candidates.push(candidate);
247            }
248        }
249    }
250
251    advance_prune_cursor(
252        ctx.sync_state,
253        stored_keys.len(),
254        scan_start,
255        last_selected_offset,
256    )
257    .await;
258
259    if bootstrap_deferred > 0 {
260        debug!(
261            "Deferred {bootstrap_deferred} prune candidates until bootstrap drain allows \
262             remote prune-confirmation audits"
263        );
264    }
265
266    if budget_deferred > 0 {
267        debug!(
268            "Deferred {budget_deferred} prune candidates due to per-pass audit budget \
269             ({MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS} challenges)"
270        );
271    }
272
273    let present_by_key = collect_record_prune_proofs(
274        &candidates,
275        ctx.storage,
276        ctx.p2p_node,
277        ctx.config,
278        ctx.sync_state,
279    )
280    .await;
281    let (keys_to_delete, revalidated_cleared) = revalidated_record_prune_keys(
282        &candidates,
283        &present_by_key,
284        ctx.self_id,
285        ctx.paid_list,
286        ctx.p2p_node,
287        ctx.config,
288    )
289    .await;
290    stats.cleared += revalidated_cleared;
291    stats.pruned = delete_stored_records(
292        &keys_to_delete,
293        ctx.storage,
294        ctx.paid_list,
295        ctx.repair_proofs,
296    )
297    .await;
298
299    (stored_keys.len(), stats)
300}
301
302async fn evaluate_record_prune_key(
303    ctx: &PrunePassContext<'_>,
304    key: &XorName,
305    closest: &[DHTNode],
306    now: Instant,
307    audit_challenge_budget: &mut usize,
308) -> RecordPruneKeyOutcome {
309    let mut outcome = RecordPruneKeyOutcome::default();
310    let is_responsible = closest.iter().any(|node| node.peer_id == *ctx.self_id);
311
312    if is_responsible {
313        if ctx.paid_list.record_out_of_range_since(key).is_some() {
314            ctx.paid_list.clear_record_out_of_range(key);
315            outcome.state = RecordPruneKeyState::Cleared;
316        }
317        return outcome;
318    }
319
320    if ctx.paid_list.record_out_of_range_since(key).is_none() {
321        outcome.marked = true;
322    }
323    ctx.paid_list.set_record_out_of_range(key);
324
325    let Some(first_seen) = ctx.paid_list.record_out_of_range_since(key) else {
326        return outcome;
327    };
328    let elapsed = now
329        .checked_duration_since(first_seen)
330        .unwrap_or(Duration::ZERO);
331    if elapsed < ctx.config.prune_hysteresis_duration {
332        return outcome;
333    }
334
335    if !ctx.allow_remote_prune_audits {
336        outcome.state = RecordPruneKeyState::BootstrapDeferred;
337        return outcome;
338    }
339
340    let target_peers = remote_close_group_peers(closest, ctx.self_id);
341    if target_peers.is_empty() {
342        warn!(
343            "Cannot prune {}: current close group has no remote peers",
344            hex::encode(key)
345        );
346        return outcome;
347    }
348
349    let current_close_peers: HashSet<PeerId> = closest.iter().map(|node| node.peer_id).collect();
350    if !target_peers_have_mature_repair_proofs(
351        key,
352        &target_peers,
353        &current_close_peers,
354        ctx.repair_proofs,
355        ctx.current_sync_epoch,
356    )
357    .await
358    {
359        debug!(
360            "Deferring prune for {} until current close group has mature repair proofs",
361            hex::encode(key)
362        );
363        return outcome;
364    }
365
366    if target_peers.len() > *audit_challenge_budget {
367        outcome.state = RecordPruneKeyState::BudgetDeferred;
368        return outcome;
369    }
370
371    *audit_challenge_budget -= target_peers.len();
372    outcome.state = RecordPruneKeyState::Candidate(RecordPruneCandidate {
373        key: *key,
374        target_peers,
375    });
376    outcome
377}
378
379async fn prune_paid_entries(
380    self_id: &PeerId,
381    paid_list: &Arc<PaidList>,
382    p2p_node: &Arc<P2PNode>,
383    config: &ReplicationConfig,
384    now: Instant,
385) -> (usize, PaidPruneStats) {
386    let paid_keys = match paid_list.all_keys() {
387        Ok(keys) => keys,
388        Err(e) => {
389            warn!("Failed to read PaidForList for pruning: {e}");
390            return (0, PaidPruneStats::default());
391        }
392    };
393
394    let dht = p2p_node.dht_manager();
395    let mut stats = PaidPruneStats::default();
396    let mut paid_keys_to_delete = Vec::new();
397
398    for key in &paid_keys {
399        let closest: Vec<DHTNode> = dht
400            .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
401            .await;
402        let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);
403
404        if in_paid_group {
405            if paid_list.paid_out_of_range_since(key).is_some() {
406                paid_list.clear_paid_out_of_range(key);
407                stats.cleared += 1;
408            }
409        } else {
410            if paid_list.paid_out_of_range_since(key).is_none() {
411                stats.marked += 1;
412            }
413            paid_list.set_paid_out_of_range(key);
414
415            if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
416                let elapsed = now
417                    .checked_duration_since(first_seen)
418                    .unwrap_or(Duration::ZERO);
419                if elapsed >= config.prune_hysteresis_duration {
420                    paid_keys_to_delete.push(*key);
421                }
422            }
423        }
424    }
425
426    if !paid_keys_to_delete.is_empty() {
427        match paid_list.remove_batch(&paid_keys_to_delete).await {
428            Ok(count) => {
429                stats.pruned = count;
430                debug!("Pruned {count} out-of-range PaidForList entries");
431            }
432            Err(e) => {
433                warn!("Failed to prune PaidForList entries: {e}");
434            }
435        }
436    }
437
438    (paid_keys.len(), stats)
439}
440
441fn remote_close_group_peers(close_group: &[DHTNode], self_id: &PeerId) -> Vec<PeerId> {
442    close_group
443        .iter()
444        .filter(|node| node.peer_id != *self_id)
445        .map(|node| node.peer_id)
446        .collect()
447}
448
449async fn target_peers_have_mature_repair_proofs(
450    key: &XorName,
451    target_peers: &[PeerId],
452    current_close_peers: &HashSet<PeerId>,
453    repair_proofs: &Arc<RwLock<RepairProofs>>,
454    current_sync_epoch: u64,
455) -> bool {
456    let mut proofs = repair_proofs.write().await;
457    target_peers.iter().all(|peer| {
458        proofs.has_mature_replica_hint(peer, key, current_close_peers, current_sync_epoch)
459    })
460}
461
462async fn prune_scan_start(
463    sync_state: &Arc<RwLock<NeighborSyncState>>,
464    stored_key_count: usize,
465) -> usize {
466    if stored_key_count == 0 {
467        return 0;
468    }
469    sync_state.read().await.prune_cursor % stored_key_count
470}
471
472async fn advance_prune_cursor(
473    sync_state: &Arc<RwLock<NeighborSyncState>>,
474    stored_key_count: usize,
475    scan_start: usize,
476    last_selected_offset: Option<usize>,
477) {
478    if stored_key_count == 0 {
479        sync_state.write().await.prune_cursor = 0;
480        return;
481    }
482
483    let advance_by = last_selected_offset.map_or(1, |offset| offset.saturating_add(1));
484    sync_state.write().await.prune_cursor = (scan_start + advance_by) % stored_key_count;
485}
486
487async fn delete_stored_records(
488    keys_to_delete: &[XorName],
489    storage: &Arc<LmdbStorage>,
490    paid_list: &Arc<PaidList>,
491    repair_proofs: &Arc<RwLock<RepairProofs>>,
492) -> usize {
493    let mut pruned = 0;
494
495    for key in keys_to_delete {
496        if let Err(e) = storage.delete(key).await {
497            warn!("Failed to prune record {}: {e}", hex::encode(key));
498        } else {
499            pruned += 1;
500            paid_list.clear_record_out_of_range(key);
501            repair_proofs.write().await.remove_key(key);
502            // Seed the PaidForList out-of-range timer so the second pass can
503            // prune the entry sooner, closing the re-admission window between
504            // the storage delete and the PaidForList prune pass.
505            paid_list.set_paid_out_of_range(key);
506            debug!("Pruned out-of-range record {}", hex::encode(key));
507        }
508    }
509
510    pruned
511}
512
513/// Collect positive presence reports for prune candidates.
514///
515/// Peers that fail to prove storage block pruning for their keys. The
516/// retained local record continues to participate in normal neighbor-sync
517/// repair because replica hint construction walks all locally stored keys,
518/// including out-of-range keys retained by hysteresis.
519async fn collect_record_prune_proofs(
520    candidates: &[RecordPruneCandidate],
521    storage: &Arc<LmdbStorage>,
522    p2p_node: &Arc<P2PNode>,
523    config: &ReplicationConfig,
524    sync_state: &Arc<RwLock<NeighborSyncState>>,
525) -> HashMap<XorName, HashSet<PeerId>> {
526    if candidates.is_empty() {
527        return HashMap::new();
528    }
529
530    let report_state = PruneAuditReportState::default();
531    let mut requests = stream::iter(build_peer_audit_challenges(candidates))
532        .map(|(peer, key)| {
533            peer_proves_record(
534                peer,
535                key,
536                storage,
537                p2p_node,
538                config,
539                sync_state,
540                &report_state,
541            )
542        })
543        .buffer_unordered(MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES);
544
545    let mut present_by_key = HashMap::<XorName, HashSet<PeerId>>::new();
546    while let Some(proof) = requests.next().await {
547        if let Some((peer, key)) = proof {
548            present_by_key.entry(key).or_default().insert(peer);
549        }
550    }
551
552    present_by_key
553}
554
555async fn revalidated_record_prune_keys(
556    candidates: &[RecordPruneCandidate],
557    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
558    self_id: &PeerId,
559    paid_list: &Arc<PaidList>,
560    p2p_node: &Arc<P2PNode>,
561    config: &ReplicationConfig,
562) -> (Vec<XorName>, usize) {
563    let dht = p2p_node.dht_manager();
564    let mut keys_to_delete = Vec::new();
565    let mut cleared = 0;
566    let now = Instant::now();
567
568    for candidate in candidates {
569        let closest: Vec<DHTNode> = dht
570            .find_closest_nodes_local_with_self(&candidate.key, config.close_group_size)
571            .await;
572
573        if closest.iter().any(|n| n.peer_id == *self_id) {
574            if paid_list
575                .record_out_of_range_since(&candidate.key)
576                .is_some()
577            {
578                paid_list.clear_record_out_of_range(&candidate.key);
579                cleared += 1;
580            }
581            continue;
582        }
583
584        let Some(first_seen) = paid_list.record_out_of_range_since(&candidate.key) else {
585            continue;
586        };
587        let elapsed = now
588            .checked_duration_since(first_seen)
589            .unwrap_or(Duration::ZERO);
590        if elapsed < config.prune_hysteresis_duration {
591            continue;
592        }
593
594        let current_target_peers = remote_close_group_peers(&closest, self_id);
595        if current_target_peers.is_empty() {
596            warn!(
597                "Cannot prune {}: current close group has no remote peers",
598                hex::encode(candidate.key)
599            );
600            continue;
601        }
602
603        if target_peers_reported_present(&candidate.key, &current_target_peers, present_by_key) {
604            keys_to_delete.push(candidate.key);
605        } else {
606            debug!(
607                "Deferring prune for {} until current close group reports it",
608                hex::encode(candidate.key)
609            );
610        }
611    }
612
613    (keys_to_delete, cleared)
614}
615
616fn build_peer_audit_challenges(candidates: &[RecordPruneCandidate]) -> Vec<(PeerId, XorName)> {
617    let mut challenges = Vec::new();
618    for candidate in candidates {
619        for peer in &candidate.target_peers {
620            challenges.push((*peer, candidate.key));
621        }
622    }
623    challenges
624}
625
/// Test-only helper: keys of the candidates whose originally selected target
/// peers all appear in `present_by_key`, in candidate order.
#[cfg(test)]
fn confirmed_keys_from_presence(
    candidates: &[RecordPruneCandidate],
    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
) -> Vec<XorName> {
    let mut confirmed = Vec::new();
    for candidate in candidates {
        let all_present =
            target_peers_reported_present(&candidate.key, &candidate.target_peers, present_by_key);
        if all_present {
            confirmed.push(candidate.key);
        }
    }
    confirmed
}
639
640fn target_peers_reported_present(
641    key: &XorName,
642    target_peers: &[PeerId],
643    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
644) -> bool {
645    let Some(present_peers) = present_by_key.get(key) else {
646        return false;
647    };
648    target_peers.iter().all(|peer| present_peers.contains(peer))
649}
650
651/// Challenge a peer to prove it holds the exact record bytes for `key`.
652/// `None` means the peer failed to provide usable proof.
653async fn peer_proves_record(
654    peer: PeerId,
655    key: XorName,
656    storage: &Arc<LmdbStorage>,
657    p2p_node: &Arc<P2PNode>,
658    config: &ReplicationConfig,
659    sync_state: &Arc<RwLock<NeighborSyncState>>,
660    report_state: &PruneAuditReportState,
661) -> Option<(PeerId, XorName)> {
662    let local_bytes = local_record_bytes(&key, storage).await?;
663
664    let (challenge_id, nonce) = {
665        let mut rng = rand::thread_rng();
666        (rng.gen::<u64>(), rng.gen::<[u8; 32]>())
667    };
668    let encoded = encode_prune_audit_challenge(&peer, key, challenge_id, nonce)?;
669    let Some(decoded) = send_prune_audit_challenge(&peer, &key, encoded, p2p_node, config).await
670    else {
671        // No decoded response means we did not observe the peer stop claiming
672        // bootstrap status. Preserve any active claim so a later claim is not
673        // misclassified as repeated abuse.
674        report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
675        return None;
676    };
677
678    let status =
679        prune_audit_response_status(decoded, challenge_id, &peer, &key, &nonce, &local_bytes);
680    if prune_audit_response_clears_bootstrap_claim(status) {
681        clear_prune_bootstrap_claim(&peer, sync_state).await;
682    }
683
684    match status {
685        PruneAuditStatus::Proven => Some((peer, key)),
686        PruneAuditStatus::Bootstrapping => {
687            report_prune_bootstrap_claim(&peer, &key, p2p_node, config, sync_state, report_state)
688                .await;
689            None
690        }
691        PruneAuditStatus::Failed => {
692            report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
693            None
694        }
695    }
696}
697
698fn prune_audit_response_clears_bootstrap_claim(status: PruneAuditStatus) -> bool {
699    matches!(status, PruneAuditStatus::Proven | PruneAuditStatus::Failed)
700}
701
702fn encode_prune_audit_challenge(
703    peer: &PeerId,
704    key: XorName,
705    challenge_id: u64,
706    nonce: [u8; 32],
707) -> Option<Vec<u8>> {
708    let challenge = AuditChallenge {
709        challenge_id,
710        nonce,
711        challenged_peer_id: *peer.as_bytes(),
712        keys: vec![key],
713    };
714    let msg = ReplicationMessage {
715        request_id: challenge_id,
716        body: ReplicationMessageBody::AuditChallenge(challenge),
717    };
718    let encoded = match msg.encode() {
719        Ok(data) => data,
720        Err(e) => {
721            warn!(
722                "Failed to encode prune audit challenge for {} against {peer}: {e}",
723                hex::encode(key),
724            );
725            return None;
726        }
727    };
728    Some(encoded)
729}
730
731async fn send_prune_audit_challenge(
732    peer: &PeerId,
733    key: &XorName,
734    encoded: Vec<u8>,
735    p2p_node: &Arc<P2PNode>,
736    config: &ReplicationConfig,
737) -> Option<ReplicationMessage> {
738    let response = match p2p_node
739        .send_request(
740            peer,
741            REPLICATION_PROTOCOL_ID,
742            encoded,
743            config.audit_response_timeout(1),
744        )
745        .await
746    {
747        Ok(response) => response,
748        Err(e) => {
749            debug!(
750                "Prune audit challenge for {} against {peer} failed: {e}",
751                hex::encode(key)
752            );
753            return None;
754        }
755    };
756
757    let decoded = match ReplicationMessage::decode(&response.data) {
758        Ok(msg) => msg,
759        Err(e) => {
760            warn!("Failed to decode prune audit response from {peer}: {e}");
761            return None;
762        }
763    };
764
765    Some(decoded)
766}
767
768fn prune_audit_response_status(
769    decoded: ReplicationMessage,
770    challenge_id: u64,
771    peer: &PeerId,
772    key: &XorName,
773    nonce: &[u8; 32],
774    local_bytes: &[u8],
775) -> PruneAuditStatus {
776    match decoded.body {
777        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
778            challenge_id: resp_id,
779            digests,
780        }) => {
781            if resp_id != challenge_id {
782                warn!("Prune audit challenge ID mismatch from {peer}");
783                return PruneAuditStatus::Failed;
784            }
785            if digests.len() != 1 {
786                warn!(
787                    "Prune audit response from {peer} returned {} digests for one challenged key",
788                    digests.len(),
789                );
790                return PruneAuditStatus::Failed;
791            }
792
793            if audit_digest_proves_key(peer, key, nonce, local_bytes, &digests[0]) {
794                PruneAuditStatus::Proven
795            } else {
796                warn!(
797                    "Prune audit proof from {peer} failed for {}",
798                    hex::encode(key)
799                );
800                PruneAuditStatus::Failed
801            }
802        }
803        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
804            challenge_id: resp_id,
805        }) => {
806            if resp_id == challenge_id {
807                warn!(
808                    "Prune audit proof for {} blocked by bootstrap claim from {peer}",
809                    hex::encode(key)
810                );
811                PruneAuditStatus::Bootstrapping
812            } else {
813                warn!("Prune audit challenge ID mismatch on Bootstrapping from {peer}");
814                PruneAuditStatus::Failed
815            }
816        }
817        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
818            challenge_id: resp_id,
819            reason,
820        }) => {
821            if resp_id == challenge_id {
822                warn!(
823                    "Prune audit proof for {} rejected by {peer}: {reason}",
824                    hex::encode(key)
825                );
826            } else {
827                warn!("Prune audit challenge ID mismatch on Rejected from {peer}");
828            }
829            PruneAuditStatus::Failed
830        }
831        _ => {
832            warn!("Unexpected prune audit response type from {peer}");
833            PruneAuditStatus::Failed
834        }
835    }
836}
837
838async fn local_record_bytes(key: &XorName, storage: &Arc<LmdbStorage>) -> Option<Vec<u8>> {
839    match storage.get_raw(key).await {
840        Ok(Some(bytes)) => Some(bytes),
841        Ok(None) => {
842            debug!(
843                "Cannot prune-audit {}: local record disappeared",
844                hex::encode(key)
845            );
846            None
847        }
848        Err(e) => {
849            warn!(
850                "Cannot prune-audit {}: failed to read local record: {e}",
851                hex::encode(key)
852            );
853            None
854        }
855    }
856}
857
858fn audit_digest_proves_key(
859    peer: &PeerId,
860    key: &XorName,
861    nonce: &[u8; 32],
862    local_bytes: &[u8],
863    digest: &[u8; 32],
864) -> bool {
865    if *digest == ABSENT_KEY_DIGEST {
866        return false;
867    }
868    let expected = compute_audit_digest(nonce, peer.as_bytes(), key, local_bytes);
869    *digest == expected
870}
871
872async fn report_prune_audit_failure_once(
873    peer: &PeerId,
874    key: &XorName,
875    p2p_node: &Arc<P2PNode>,
876    config: &ReplicationConfig,
877    report_state: &PruneAuditReportState,
878) -> bool {
879    let should_report = peer_is_currently_responsible(peer, key, p2p_node, config).await
880        && reserve_prune_audit_failure_report(report_state, peer).await;
881    if !should_report {
882        return false;
883    }
884
885    p2p_node
886        .report_trust_event(
887            peer,
888            saorsa_core::TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT),
889        )
890        .await;
891    true
892}
893
894async fn reserve_prune_audit_failure_report(
895    report_state: &PruneAuditReportState,
896    peer: &PeerId,
897) -> bool {
898    report_state.audit_failures.write().await.insert(*peer)
899}
900
901async fn reserve_prune_bootstrap_abuse_report(
902    report_state: &PruneAuditReportState,
903    peer: &PeerId,
904) -> bool {
905    report_state.bootstrap_abuse.write().await.insert(*peer)
906}
907
908async fn report_prune_bootstrap_claim(
909    peer: &PeerId,
910    key: &XorName,
911    p2p_node: &Arc<P2PNode>,
912    config: &ReplicationConfig,
913    sync_state: &Arc<RwLock<NeighborSyncState>>,
914    report_state: &PruneAuditReportState,
915) {
916    if !peer_is_currently_responsible(peer, key, p2p_node, config).await {
917        return;
918    }
919
920    let observation = {
921        let now = Instant::now();
922        let mut state = sync_state.write().await;
923        (
924            now,
925            state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period),
926        )
927    };
928
929    let (now, observation) = observation;
930    match observation {
931        BootstrapClaimObservation::WithinGrace { .. } => {
932            debug!("Prune audit: peer {peer} claims bootstrapping (within grace period)");
933            return;
934        }
935        BootstrapClaimObservation::PastGrace { first_seen } => {
936            if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
937                debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
938                return;
939            }
940            warn!(
941                "Prune audit: peer {peer} claiming bootstrap past grace period \
942                 ({:?} > {:?}), reporting abuse",
943                now.duration_since(first_seen),
944                config.bootstrap_claim_grace_period,
945            );
946        }
947        BootstrapClaimObservation::Repeated { first_seen } => {
948            if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
949                debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
950                return;
951            }
952            warn!(
953                "Prune audit: peer {peer} repeated bootstrap claim after previously stopping; \
954                 first claim was {:?} ago, reporting abuse",
955                now.duration_since(first_seen),
956            );
957        }
958    }
959
960    p2p_node
961        .report_trust_event(
962            peer,
963            saorsa_core::TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
964        )
965        .await;
966}
967
968async fn clear_prune_bootstrap_claim(peer: &PeerId, sync_state: &Arc<RwLock<NeighborSyncState>>) {
969    let removed = {
970        let mut state = sync_state.write().await;
971        state.clear_active_bootstrap_claim(peer)
972    };
973    if removed {
974        debug!("Prune audit: cleared active bootstrap claim for {peer}");
975    }
976}
977
978async fn peer_is_currently_responsible(
979    peer: &PeerId,
980    key: &XorName,
981    p2p_node: &Arc<P2PNode>,
982    config: &ReplicationConfig,
983) -> bool {
984    let closest = p2p_node
985        .dht_manager()
986        .find_closest_nodes_local_with_self(key, config.close_group_size)
987        .await;
988    closest.iter().any(|node| node.peer_id == *peer)
989}
990
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Builds a peer id whose first byte is `b` and whose remaining bytes are zero.
    fn peer_id_from_byte(b: u8) -> PeerId {
        let mut raw = [0u8; 32];
        raw[0] = b;
        PeerId::from_bytes(raw)
    }

    /// Builds a key with every byte set to `b`.
    fn key_from_byte(b: u8) -> XorName {
        [b; 32]
    }

    /// Shorthand for constructing a prune candidate.
    fn candidate(key: XorName, target_peers: Vec<PeerId>) -> RecordPruneCandidate {
        RecordPruneCandidate { key, target_peers }
    }

    #[test]
    fn prune_audit_challenges_are_one_per_candidate_peer() {
        let (peer_a, peer_b) = (peer_id_from_byte(1), peer_id_from_byte(2));
        let (key_a, key_b) = (key_from_byte(0xA), key_from_byte(0xB));
        let candidates = vec![
            candidate(key_a, vec![peer_a, peer_b]),
            candidate(key_b, vec![peer_b]),
        ];

        // Sort both sides the same way so the comparison ignores output order.
        let mut expected = vec![(peer_a, key_a), (peer_b, key_a), (peer_b, key_b)];
        expected.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));

        let mut actual = build_peer_audit_challenges(&candidates);
        actual.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));

        assert_eq!(actual, expected);
    }

    #[test]
    fn confirmed_keys_require_all_target_peers_present() {
        let peer_a = peer_id_from_byte(1);
        let peer_b = peer_id_from_byte(2);
        let key = key_from_byte(0xC);
        let candidates = vec![candidate(key, vec![peer_a, peer_b])];
        let present_by_key = HashMap::from([(key, HashSet::from([peer_a, peer_b]))]);

        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key);

        assert_eq!(confirmed, vec![key]);
    }

    #[test]
    fn confirmed_keys_defer_absent_or_missing_peer_evidence() {
        let peer_a = peer_id_from_byte(1);
        let peer_b = peer_id_from_byte(2);
        let complete_key = key_from_byte(0xD);
        let absent_key = key_from_byte(0xE);
        let missing_key = key_from_byte(0xF);
        let candidates = vec![
            candidate(complete_key, vec![peer_a, peer_b]),
            candidate(absent_key, vec![peer_a, peer_b]),
            candidate(missing_key, vec![peer_a, peer_b]),
        ];
        // `absent_key` has evidence from only one peer; `missing_key` has no entry at all.
        let present_by_key = HashMap::from([
            (complete_key, HashSet::from([peer_a, peer_b])),
            (absent_key, HashSet::from([peer_a])),
        ]);

        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key);

        assert_eq!(confirmed, vec![complete_key]);
    }

    #[test]
    fn audit_digest_proof_requires_matching_peer_key_nonce_and_bytes() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let key = key_from_byte(0x11);
        let other_key = key_from_byte(0x12);
        let nonce = [0xAA; 32];
        let other_nonce = [0xBB; 32];
        let bytes = b"record bytes";
        let digest = compute_audit_digest(&nonce, peer.as_bytes(), &key, bytes);

        // The digest proves the key only for the exact inputs it was computed from.
        assert!(audit_digest_proves_key(&peer, &key, &nonce, bytes, &digest));

        // Wrong peer.
        assert!(!audit_digest_proves_key(
            &other_peer,
            &key,
            &nonce,
            bytes,
            &digest
        ));
        // Wrong key.
        assert!(!audit_digest_proves_key(
            &peer, &other_key, &nonce, bytes, &digest
        ));
        // Wrong nonce.
        assert!(!audit_digest_proves_key(
            &peer,
            &key,
            &other_nonce,
            bytes,
            &digest
        ));
        // Wrong record bytes.
        assert!(!audit_digest_proves_key(
            &peer,
            &key,
            &nonce,
            b"different bytes",
            &digest
        ));
        // The absent-key sentinel digest never counts as proof.
        assert!(!audit_digest_proves_key(
            &peer,
            &key,
            &nonce,
            bytes,
            &ABSENT_KEY_DIGEST
        ));
    }

    #[tokio::test]
    async fn prune_cursor_advances_past_selected_budget_window() {
        let sync_state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
        sync_state.write().await.prune_cursor = 2;

        let scan_start = prune_scan_start(&sync_state, 10).await;
        advance_prune_cursor(&sync_state, 10, scan_start, Some(3)).await;

        assert_eq!(sync_state.read().await.prune_cursor, 6);
    }

    #[tokio::test]
    async fn prune_cursor_advances_even_when_no_candidate_selected() {
        let sync_state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
        sync_state.write().await.prune_cursor = 9;

        let scan_start = prune_scan_start(&sync_state, 10).await;
        advance_prune_cursor(&sync_state, 10, scan_start, None).await;

        assert_eq!(sync_state.read().await.prune_cursor, 0);
    }

    #[tokio::test]
    async fn prune_audit_normal_response_clears_stale_bootstrap_claim() {
        let peer = peer_id_from_byte(1);
        let sync_state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![peer])));
        let first_seen = Instant::now();
        {
            // Seed both the active claim and the claim history for the peer.
            let mut seeded = sync_state.write().await;
            seeded.bootstrap_claims.insert(peer, first_seen);
            seeded.bootstrap_claim_history.insert(peer, first_seen);
        }

        clear_prune_bootstrap_claim(&peer, &sync_state).await;

        // The active claim is gone, but the history entry must survive.
        let snapshot = sync_state.read().await;
        assert!(!snapshot.bootstrap_claims.contains_key(&peer));
        assert!(snapshot.bootstrap_claim_history.contains_key(&peer));
    }

    #[test]
    fn prune_audit_clear_policy_requires_decoded_non_bootstrap_response() {
        assert!(prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Proven
        ));
        assert!(prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Failed
        ));
        assert!(!prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Bootstrapping
        ));
    }

    #[tokio::test]
    async fn prune_audit_failure_penalty_is_reserved_once_per_peer_per_pass() {
        let first = peer_id_from_byte(1);
        let second = peer_id_from_byte(2);
        let report_state = PruneAuditReportState::default();

        // First reservation succeeds, a repeat is rejected, a new peer succeeds.
        assert!(reserve_prune_audit_failure_report(&report_state, &first).await);
        assert!(!reserve_prune_audit_failure_report(&report_state, &first).await);
        assert!(reserve_prune_audit_failure_report(&report_state, &second).await);

        let recorded = report_state.audit_failures.read().await;
        assert_eq!(recorded.len(), 2);
        assert!(recorded.contains(&first));
        assert!(recorded.contains(&second));
    }

    #[tokio::test]
    async fn prune_bootstrap_abuse_penalty_is_reserved_once_per_peer_per_pass() {
        let first = peer_id_from_byte(1);
        let second = peer_id_from_byte(2);
        let report_state = PruneAuditReportState::default();

        // First reservation succeeds, a repeat is rejected, a new peer succeeds.
        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &first).await);
        assert!(!reserve_prune_bootstrap_abuse_report(&report_state, &first).await);
        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &second).await);

        let recorded = report_state.bootstrap_abuse.read().await;
        assert_eq!(recorded.len(), 2);
        assert!(recorded.contains(&first));
        assert!(recorded.contains(&second));
    }
}