Skip to main content

ant_node/replication/
pruning.rs

//! Post-cycle responsibility pruning (Section 11).
//!
//! On `NeighborSyncCycleComplete`: prune stored records and `PaidForList`
//! entries that have been continuously out of range for at least
//! `PRUNE_HYSTERESIS_DURATION`. Stored records are additionally deleted only
//! after the current close group proves that it stores them.
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use crate::logging::{debug, info, warn};
12
13use futures::{stream, StreamExt};
14use rand::Rng;
15use saorsa_core::identity::PeerId;
16use saorsa_core::{DHTNode, P2PNode};
17use tokio::sync::RwLock;
18
19use crate::ant_protocol::XorName;
20use crate::replication::config::{
21    ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS,
22    REPLICATION_PROTOCOL_ID,
23};
24use crate::replication::paid_list::PaidList;
25use crate::replication::protocol::{
26    compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
27    ReplicationMessageBody, ABSENT_KEY_DIGEST,
28};
29use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
30use crate::storage::LmdbStorage;
31
32use super::REPLICATION_TRUST_WEIGHT;
33
/// Upper bound on prune audit challenges kept in flight at the same time.
///
/// Used as the `buffer_unordered` limit when prune proofs are collected.
const MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES: usize = 32;
35
36// ---------------------------------------------------------------------------
37// Result type
38// ---------------------------------------------------------------------------
39
/// Summary of a prune pass.
///
/// The record counters describe the stored-record phase and the `paid_*`
/// counters describe the `PaidForList` phase of the same pass.
///
/// The type is `Clone` and comparable so callers and tests can snapshot a
/// result and check it against an expected value.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PruneResult {
    /// Number of records deleted from storage.
    pub records_pruned: usize,
    /// Number of records whose out-of-range timestamp was newly set.
    pub records_marked_out_of_range: usize,
    /// Number of records whose out-of-range timestamp was cleared because the
    /// record is back in range.
    pub records_cleared: usize,
    /// Number of `PaidForList` entries removed.
    pub paid_entries_pruned: usize,
    /// Number of `PaidForList` entries whose out-of-range timestamp was newly
    /// set.
    pub paid_entries_marked: usize,
    /// Number of `PaidForList` entries whose out-of-range timestamp was
    /// cleared because the entry is back in range.
    pub paid_entries_cleared: usize,
}
56
/// Classification of a decoded prune audit response.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PruneAuditStatus {
    /// The peer returned a digest matching the locally held record bytes.
    Proven,
    /// The response was unusable as proof (mismatch, rejection, wrong shape).
    Failed,
    /// The peer answered the challenge with a bootstrap claim instead of proof.
    Bootstrapping,
}
63
/// Counters accumulated while pruning stored records.
#[derive(Default, Debug)]
struct RecordPruneStats {
    /// Records whose out-of-range timestamp was newly set.
    marked: usize,
    /// Records whose out-of-range timestamp was cleared.
    cleared: usize,
    /// Records deleted from storage.
    pruned: usize,
}
70
/// Counters accumulated while pruning `PaidForList` entries.
#[derive(Default, Debug)]
struct PaidPruneStats {
    /// Entries whose out-of-range timestamp was newly set.
    marked: usize,
    /// Entries whose out-of-range timestamp was cleared.
    cleared: usize,
    /// Entries removed from the `PaidForList`.
    pruned: usize,
}
77
78#[derive(Debug, Clone)]
79struct RecordPruneCandidate {
80    key: XorName,
81    target_peers: Vec<PeerId>,
82}
83
84#[derive(Default)]
85struct PruneAuditReportState {
86    audit_failures: RwLock<HashSet<PeerId>>,
87    bootstrap_abuse: RwLock<HashSet<PeerId>>,
88}
89
90// ---------------------------------------------------------------------------
91// Prune pass
92// ---------------------------------------------------------------------------
93
94/// Execute post-cycle responsibility pruning.
95///
96/// For each stored record K:
97/// - If `IsResponsible(self, K)`: clear `RecordOutOfRangeFirstSeen`.
98/// - If not responsible: set timestamp if not already set; delete if the
99///   timestamp is at least `PRUNE_HYSTERESIS_DURATION` old and the current
100///   close group proves it stores the record.
101///
102/// For each `PaidForList` entry K:
103/// - If self is in `PaidCloseGroup(K)`: clear `PaidOutOfRangeFirstSeen`.
104/// - If not in group: set timestamp if not already set; remove entry if the
105///   timestamp is at least `PRUNE_HYSTERESIS_DURATION` old.
106pub async fn run_prune_pass(
107    self_id: &PeerId,
108    storage: &Arc<LmdbStorage>,
109    paid_list: &Arc<PaidList>,
110    p2p_node: &Arc<P2PNode>,
111    config: &ReplicationConfig,
112    sync_state: &Arc<RwLock<NeighborSyncState>>,
113    allow_remote_prune_audits: bool,
114) -> PruneResult {
115    let (stored_count, record_stats) = prune_stored_records(
116        self_id,
117        storage,
118        paid_list,
119        p2p_node,
120        config,
121        sync_state,
122        allow_remote_prune_audits,
123    )
124    .await;
125    let now = Instant::now();
126    let (paid_count, paid_stats) =
127        prune_paid_entries(self_id, paid_list, p2p_node, config, now).await;
128
129    let result = PruneResult {
130        records_pruned: record_stats.pruned,
131        records_marked_out_of_range: record_stats.marked,
132        records_cleared: record_stats.cleared,
133        paid_entries_pruned: paid_stats.pruned,
134        paid_entries_marked: paid_stats.marked,
135        paid_entries_cleared: paid_stats.cleared,
136    };
137
138    info!(
139        "Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
140        result.records_pruned, stored_count, result.paid_entries_pruned, paid_count,
141    );
142
143    result
144}
145
146async fn prune_stored_records(
147    self_id: &PeerId,
148    storage: &Arc<LmdbStorage>,
149    paid_list: &Arc<PaidList>,
150    p2p_node: &Arc<P2PNode>,
151    config: &ReplicationConfig,
152    sync_state: &Arc<RwLock<NeighborSyncState>>,
153    allow_remote_prune_audits: bool,
154) -> (usize, RecordPruneStats) {
155    let stored_keys = match storage.all_keys().await {
156        Ok(keys) => keys,
157        Err(e) => {
158            warn!("Failed to read stored keys for pruning: {e}");
159            return (0, RecordPruneStats::default());
160        }
161    };
162
163    let now = Instant::now();
164    let dht = p2p_node.dht_manager();
165    let mut stats = RecordPruneStats::default();
166    let mut candidates = Vec::new();
167    let mut audit_challenge_budget = MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS;
168    let mut budget_deferred = 0usize;
169    let mut bootstrap_deferred = 0usize;
170    let scan_start = prune_scan_start(sync_state, stored_keys.len()).await;
171    let mut last_selected_offset = None;
172
173    for offset in 0..stored_keys.len() {
174        let key = &stored_keys[(scan_start + offset) % stored_keys.len()];
175        let closest: Vec<DHTNode> = dht
176            .find_closest_nodes_local_with_self(key, config.close_group_size)
177            .await;
178        let is_responsible = closest.iter().any(|n| n.peer_id == *self_id);
179
180        if is_responsible {
181            if paid_list.record_out_of_range_since(key).is_some() {
182                paid_list.clear_record_out_of_range(key);
183                stats.cleared += 1;
184            }
185        } else {
186            if paid_list.record_out_of_range_since(key).is_none() {
187                stats.marked += 1;
188            }
189            paid_list.set_record_out_of_range(key);
190
191            if let Some(first_seen) = paid_list.record_out_of_range_since(key) {
192                let elapsed = now
193                    .checked_duration_since(first_seen)
194                    .unwrap_or(Duration::ZERO);
195                if elapsed >= config.prune_hysteresis_duration {
196                    if !allow_remote_prune_audits {
197                        bootstrap_deferred = bootstrap_deferred.saturating_add(1);
198                        continue;
199                    }
200                    let target_peers = remote_close_group_peers(&closest, self_id);
201                    if target_peers.is_empty() {
202                        warn!(
203                            "Cannot prune {}: current close group has no remote peers",
204                            hex::encode(key)
205                        );
206                        continue;
207                    }
208                    if target_peers.len() > audit_challenge_budget {
209                        budget_deferred = budget_deferred.saturating_add(1);
210                        continue;
211                    }
212                    audit_challenge_budget -= target_peers.len();
213                    last_selected_offset = Some(offset);
214                    candidates.push(RecordPruneCandidate {
215                        key: *key,
216                        target_peers,
217                    });
218                }
219            }
220        }
221    }
222
223    advance_prune_cursor(
224        sync_state,
225        stored_keys.len(),
226        scan_start,
227        last_selected_offset,
228    )
229    .await;
230
231    if bootstrap_deferred > 0 {
232        debug!(
233            "Deferred {bootstrap_deferred} prune candidates until bootstrap drain allows \
234             remote prune-confirmation audits"
235        );
236    }
237
238    if budget_deferred > 0 {
239        debug!(
240            "Deferred {budget_deferred} prune candidates due to per-pass audit budget \
241             ({MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS} challenges)"
242        );
243    }
244
245    let present_by_key =
246        collect_record_prune_proofs(&candidates, storage, p2p_node, config, sync_state).await;
247    let (keys_to_delete, revalidated_cleared) = revalidated_record_prune_keys(
248        &candidates,
249        &present_by_key,
250        self_id,
251        paid_list,
252        p2p_node,
253        config,
254    )
255    .await;
256    stats.cleared += revalidated_cleared;
257    stats.pruned = delete_stored_records(&keys_to_delete, storage, paid_list).await;
258
259    (stored_keys.len(), stats)
260}
261
262async fn prune_paid_entries(
263    self_id: &PeerId,
264    paid_list: &Arc<PaidList>,
265    p2p_node: &Arc<P2PNode>,
266    config: &ReplicationConfig,
267    now: Instant,
268) -> (usize, PaidPruneStats) {
269    let paid_keys = match paid_list.all_keys() {
270        Ok(keys) => keys,
271        Err(e) => {
272            warn!("Failed to read PaidForList for pruning: {e}");
273            return (0, PaidPruneStats::default());
274        }
275    };
276
277    let dht = p2p_node.dht_manager();
278    let mut stats = PaidPruneStats::default();
279    let mut paid_keys_to_delete = Vec::new();
280
281    for key in &paid_keys {
282        let closest: Vec<DHTNode> = dht
283            .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
284            .await;
285        let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);
286
287        if in_paid_group {
288            if paid_list.paid_out_of_range_since(key).is_some() {
289                paid_list.clear_paid_out_of_range(key);
290                stats.cleared += 1;
291            }
292        } else {
293            if paid_list.paid_out_of_range_since(key).is_none() {
294                stats.marked += 1;
295            }
296            paid_list.set_paid_out_of_range(key);
297
298            if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
299                let elapsed = now
300                    .checked_duration_since(first_seen)
301                    .unwrap_or(Duration::ZERO);
302                if elapsed >= config.prune_hysteresis_duration {
303                    paid_keys_to_delete.push(*key);
304                }
305            }
306        }
307    }
308
309    if !paid_keys_to_delete.is_empty() {
310        match paid_list.remove_batch(&paid_keys_to_delete).await {
311            Ok(count) => {
312                stats.pruned = count;
313                debug!("Pruned {count} out-of-range PaidForList entries");
314            }
315            Err(e) => {
316                warn!("Failed to prune PaidForList entries: {e}");
317            }
318        }
319    }
320
321    (paid_keys.len(), stats)
322}
323
324fn remote_close_group_peers(close_group: &[DHTNode], self_id: &PeerId) -> Vec<PeerId> {
325    close_group
326        .iter()
327        .filter(|node| node.peer_id != *self_id)
328        .map(|node| node.peer_id)
329        .collect()
330}
331
332async fn prune_scan_start(
333    sync_state: &Arc<RwLock<NeighborSyncState>>,
334    stored_key_count: usize,
335) -> usize {
336    if stored_key_count == 0 {
337        return 0;
338    }
339    sync_state.read().await.prune_cursor % stored_key_count
340}
341
342async fn advance_prune_cursor(
343    sync_state: &Arc<RwLock<NeighborSyncState>>,
344    stored_key_count: usize,
345    scan_start: usize,
346    last_selected_offset: Option<usize>,
347) {
348    if stored_key_count == 0 {
349        sync_state.write().await.prune_cursor = 0;
350        return;
351    }
352
353    let advance_by = last_selected_offset.map_or(1, |offset| offset.saturating_add(1));
354    sync_state.write().await.prune_cursor = (scan_start + advance_by) % stored_key_count;
355}
356
357async fn delete_stored_records(
358    keys_to_delete: &[XorName],
359    storage: &Arc<LmdbStorage>,
360    paid_list: &Arc<PaidList>,
361) -> usize {
362    let mut pruned = 0;
363
364    for key in keys_to_delete {
365        if let Err(e) = storage.delete(key).await {
366            warn!("Failed to prune record {}: {e}", hex::encode(key));
367        } else {
368            pruned += 1;
369            paid_list.clear_record_out_of_range(key);
370            // Seed the PaidForList out-of-range timer so the second pass can
371            // prune the entry sooner, closing the re-admission window between
372            // the storage delete and the PaidForList prune pass.
373            paid_list.set_paid_out_of_range(key);
374            debug!("Pruned out-of-range record {}", hex::encode(key));
375        }
376    }
377
378    pruned
379}
380
381/// Collect positive presence reports for prune candidates.
382///
383/// Peers that fail to prove storage block pruning for their keys. The
384/// retained local record continues to participate in normal neighbor-sync
385/// repair because replica hint construction walks all locally stored keys,
386/// including out-of-range keys retained by hysteresis.
387async fn collect_record_prune_proofs(
388    candidates: &[RecordPruneCandidate],
389    storage: &Arc<LmdbStorage>,
390    p2p_node: &Arc<P2PNode>,
391    config: &ReplicationConfig,
392    sync_state: &Arc<RwLock<NeighborSyncState>>,
393) -> HashMap<XorName, HashSet<PeerId>> {
394    if candidates.is_empty() {
395        return HashMap::new();
396    }
397
398    let report_state = PruneAuditReportState::default();
399    let mut requests = stream::iter(build_peer_audit_challenges(candidates))
400        .map(|(peer, key)| {
401            peer_proves_record(
402                peer,
403                key,
404                storage,
405                p2p_node,
406                config,
407                sync_state,
408                &report_state,
409            )
410        })
411        .buffer_unordered(MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES);
412
413    let mut present_by_key = HashMap::<XorName, HashSet<PeerId>>::new();
414    while let Some(proof) = requests.next().await {
415        if let Some((peer, key)) = proof {
416            present_by_key.entry(key).or_default().insert(peer);
417        }
418    }
419
420    present_by_key
421}
422
423async fn revalidated_record_prune_keys(
424    candidates: &[RecordPruneCandidate],
425    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
426    self_id: &PeerId,
427    paid_list: &Arc<PaidList>,
428    p2p_node: &Arc<P2PNode>,
429    config: &ReplicationConfig,
430) -> (Vec<XorName>, usize) {
431    let dht = p2p_node.dht_manager();
432    let mut keys_to_delete = Vec::new();
433    let mut cleared = 0;
434    let now = Instant::now();
435
436    for candidate in candidates {
437        let closest: Vec<DHTNode> = dht
438            .find_closest_nodes_local_with_self(&candidate.key, config.close_group_size)
439            .await;
440
441        if closest.iter().any(|n| n.peer_id == *self_id) {
442            if paid_list
443                .record_out_of_range_since(&candidate.key)
444                .is_some()
445            {
446                paid_list.clear_record_out_of_range(&candidate.key);
447                cleared += 1;
448            }
449            continue;
450        }
451
452        let Some(first_seen) = paid_list.record_out_of_range_since(&candidate.key) else {
453            continue;
454        };
455        let elapsed = now
456            .checked_duration_since(first_seen)
457            .unwrap_or(Duration::ZERO);
458        if elapsed < config.prune_hysteresis_duration {
459            continue;
460        }
461
462        let current_target_peers = remote_close_group_peers(&closest, self_id);
463        if current_target_peers.is_empty() {
464            warn!(
465                "Cannot prune {}: current close group has no remote peers",
466                hex::encode(candidate.key)
467            );
468            continue;
469        }
470
471        if target_peers_reported_present(&candidate.key, &current_target_peers, present_by_key) {
472            keys_to_delete.push(candidate.key);
473        } else {
474            debug!(
475                "Deferring prune for {} until current close group reports it",
476                hex::encode(candidate.key)
477            );
478        }
479    }
480
481    (keys_to_delete, cleared)
482}
483
484fn build_peer_audit_challenges(candidates: &[RecordPruneCandidate]) -> Vec<(PeerId, XorName)> {
485    let mut challenges = Vec::new();
486    for candidate in candidates {
487        for peer in &candidate.target_peers {
488            challenges.push((*peer, candidate.key));
489        }
490    }
491    challenges
492}
493
/// Test helper: keys of the candidates whose own `target_peers` are all
/// listed as present for that key, in candidate order.
#[cfg(test)]
fn confirmed_keys_from_presence(
    candidates: &[RecordPruneCandidate],
    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
) -> Vec<XorName> {
    let mut confirmed = Vec::new();
    for candidate in candidates {
        if target_peers_reported_present(&candidate.key, &candidate.target_peers, present_by_key) {
            confirmed.push(candidate.key);
        }
    }
    confirmed
}
507
508fn target_peers_reported_present(
509    key: &XorName,
510    target_peers: &[PeerId],
511    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
512) -> bool {
513    let Some(present_peers) = present_by_key.get(key) else {
514        return false;
515    };
516    target_peers.iter().all(|peer| present_peers.contains(peer))
517}
518
519/// Challenge a peer to prove it holds the exact record bytes for `key`.
520/// `None` means the peer failed to provide usable proof.
521async fn peer_proves_record(
522    peer: PeerId,
523    key: XorName,
524    storage: &Arc<LmdbStorage>,
525    p2p_node: &Arc<P2PNode>,
526    config: &ReplicationConfig,
527    sync_state: &Arc<RwLock<NeighborSyncState>>,
528    report_state: &PruneAuditReportState,
529) -> Option<(PeerId, XorName)> {
530    let local_bytes = local_record_bytes(&key, storage).await?;
531
532    let (challenge_id, nonce) = {
533        let mut rng = rand::thread_rng();
534        (rng.gen::<u64>(), rng.gen::<[u8; 32]>())
535    };
536    let encoded = encode_prune_audit_challenge(&peer, key, challenge_id, nonce)?;
537    let Some(decoded) = send_prune_audit_challenge(&peer, &key, encoded, p2p_node, config).await
538    else {
539        // No decoded response means we did not observe the peer stop claiming
540        // bootstrap status. Preserve any active claim so a later claim is not
541        // misclassified as repeated abuse.
542        report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
543        return None;
544    };
545
546    let status =
547        prune_audit_response_status(decoded, challenge_id, &peer, &key, &nonce, &local_bytes);
548    if prune_audit_response_clears_bootstrap_claim(status) {
549        clear_prune_bootstrap_claim(&peer, sync_state).await;
550    }
551
552    match status {
553        PruneAuditStatus::Proven => Some((peer, key)),
554        PruneAuditStatus::Bootstrapping => {
555            report_prune_bootstrap_claim(&peer, &key, p2p_node, config, sync_state, report_state)
556                .await;
557            None
558        }
559        PruneAuditStatus::Failed => {
560            report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
561            None
562        }
563    }
564}
565
566fn prune_audit_response_clears_bootstrap_claim(status: PruneAuditStatus) -> bool {
567    matches!(status, PruneAuditStatus::Proven | PruneAuditStatus::Failed)
568}
569
570fn encode_prune_audit_challenge(
571    peer: &PeerId,
572    key: XorName,
573    challenge_id: u64,
574    nonce: [u8; 32],
575) -> Option<Vec<u8>> {
576    let challenge = AuditChallenge {
577        challenge_id,
578        nonce,
579        challenged_peer_id: *peer.as_bytes(),
580        keys: vec![key],
581    };
582    let msg = ReplicationMessage {
583        request_id: challenge_id,
584        body: ReplicationMessageBody::AuditChallenge(challenge),
585    };
586    let encoded = match msg.encode() {
587        Ok(data) => data,
588        Err(e) => {
589            warn!(
590                "Failed to encode prune audit challenge for {} against {peer}: {e}",
591                hex::encode(key),
592            );
593            return None;
594        }
595    };
596    Some(encoded)
597}
598
599async fn send_prune_audit_challenge(
600    peer: &PeerId,
601    key: &XorName,
602    encoded: Vec<u8>,
603    p2p_node: &Arc<P2PNode>,
604    config: &ReplicationConfig,
605) -> Option<ReplicationMessage> {
606    let response = match p2p_node
607        .send_request(
608            peer,
609            REPLICATION_PROTOCOL_ID,
610            encoded,
611            config.audit_response_timeout(1),
612        )
613        .await
614    {
615        Ok(response) => response,
616        Err(e) => {
617            debug!(
618                "Prune audit challenge for {} against {peer} failed: {e}",
619                hex::encode(key)
620            );
621            return None;
622        }
623    };
624
625    let decoded = match ReplicationMessage::decode(&response.data) {
626        Ok(msg) => msg,
627        Err(e) => {
628            warn!("Failed to decode prune audit response from {peer}: {e}");
629            return None;
630        }
631    };
632
633    Some(decoded)
634}
635
636fn prune_audit_response_status(
637    decoded: ReplicationMessage,
638    challenge_id: u64,
639    peer: &PeerId,
640    key: &XorName,
641    nonce: &[u8; 32],
642    local_bytes: &[u8],
643) -> PruneAuditStatus {
644    match decoded.body {
645        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
646            challenge_id: resp_id,
647            digests,
648        }) => {
649            if resp_id != challenge_id {
650                warn!("Prune audit challenge ID mismatch from {peer}");
651                return PruneAuditStatus::Failed;
652            }
653            if digests.len() != 1 {
654                warn!(
655                    "Prune audit response from {peer} returned {} digests for one challenged key",
656                    digests.len(),
657                );
658                return PruneAuditStatus::Failed;
659            }
660
661            if audit_digest_proves_key(peer, key, nonce, local_bytes, &digests[0]) {
662                PruneAuditStatus::Proven
663            } else {
664                warn!(
665                    "Prune audit proof from {peer} failed for {}",
666                    hex::encode(key)
667                );
668                PruneAuditStatus::Failed
669            }
670        }
671        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
672            challenge_id: resp_id,
673        }) => {
674            if resp_id == challenge_id {
675                warn!(
676                    "Prune audit proof for {} blocked by bootstrap claim from {peer}",
677                    hex::encode(key)
678                );
679                PruneAuditStatus::Bootstrapping
680            } else {
681                warn!("Prune audit challenge ID mismatch on Bootstrapping from {peer}");
682                PruneAuditStatus::Failed
683            }
684        }
685        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
686            challenge_id: resp_id,
687            reason,
688        }) => {
689            if resp_id == challenge_id {
690                warn!(
691                    "Prune audit proof for {} rejected by {peer}: {reason}",
692                    hex::encode(key)
693                );
694            } else {
695                warn!("Prune audit challenge ID mismatch on Rejected from {peer}");
696            }
697            PruneAuditStatus::Failed
698        }
699        _ => {
700            warn!("Unexpected prune audit response type from {peer}");
701            PruneAuditStatus::Failed
702        }
703    }
704}
705
706async fn local_record_bytes(key: &XorName, storage: &Arc<LmdbStorage>) -> Option<Vec<u8>> {
707    match storage.get_raw(key).await {
708        Ok(Some(bytes)) => Some(bytes),
709        Ok(None) => {
710            debug!(
711                "Cannot prune-audit {}: local record disappeared",
712                hex::encode(key)
713            );
714            None
715        }
716        Err(e) => {
717            warn!(
718                "Cannot prune-audit {}: failed to read local record: {e}",
719                hex::encode(key)
720            );
721            None
722        }
723    }
724}
725
726fn audit_digest_proves_key(
727    peer: &PeerId,
728    key: &XorName,
729    nonce: &[u8; 32],
730    local_bytes: &[u8],
731    digest: &[u8; 32],
732) -> bool {
733    if *digest == ABSENT_KEY_DIGEST {
734        return false;
735    }
736    let expected = compute_audit_digest(nonce, peer.as_bytes(), key, local_bytes);
737    *digest == expected
738}
739
740async fn report_prune_audit_failure_once(
741    peer: &PeerId,
742    key: &XorName,
743    p2p_node: &Arc<P2PNode>,
744    config: &ReplicationConfig,
745    report_state: &PruneAuditReportState,
746) -> bool {
747    let should_report = peer_is_currently_responsible(peer, key, p2p_node, config).await
748        && reserve_prune_audit_failure_report(report_state, peer).await;
749    if !should_report {
750        return false;
751    }
752
753    p2p_node
754        .report_trust_event(
755            peer,
756            saorsa_core::TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT),
757        )
758        .await;
759    true
760}
761
762async fn reserve_prune_audit_failure_report(
763    report_state: &PruneAuditReportState,
764    peer: &PeerId,
765) -> bool {
766    report_state.audit_failures.write().await.insert(*peer)
767}
768
769async fn reserve_prune_bootstrap_abuse_report(
770    report_state: &PruneAuditReportState,
771    peer: &PeerId,
772) -> bool {
773    report_state.bootstrap_abuse.write().await.insert(*peer)
774}
775
776async fn report_prune_bootstrap_claim(
777    peer: &PeerId,
778    key: &XorName,
779    p2p_node: &Arc<P2PNode>,
780    config: &ReplicationConfig,
781    sync_state: &Arc<RwLock<NeighborSyncState>>,
782    report_state: &PruneAuditReportState,
783) {
784    if !peer_is_currently_responsible(peer, key, p2p_node, config).await {
785        return;
786    }
787
788    let observation = {
789        let now = Instant::now();
790        let mut state = sync_state.write().await;
791        (
792            now,
793            state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period),
794        )
795    };
796
797    let (now, observation) = observation;
798    match observation {
799        BootstrapClaimObservation::WithinGrace { .. } => {
800            debug!("Prune audit: peer {peer} claims bootstrapping (within grace period)");
801            return;
802        }
803        BootstrapClaimObservation::PastGrace { first_seen } => {
804            if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
805                debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
806                return;
807            }
808            warn!(
809                "Prune audit: peer {peer} claiming bootstrap past grace period \
810                 ({:?} > {:?}), reporting abuse",
811                now.duration_since(first_seen),
812                config.bootstrap_claim_grace_period,
813            );
814        }
815        BootstrapClaimObservation::Repeated { first_seen } => {
816            if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
817                debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
818                return;
819            }
820            warn!(
821                "Prune audit: peer {peer} repeated bootstrap claim after previously stopping; \
822                 first claim was {:?} ago, reporting abuse",
823                now.duration_since(first_seen),
824            );
825        }
826    }
827
828    p2p_node
829        .report_trust_event(
830            peer,
831            saorsa_core::TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
832        )
833        .await;
834}
835
836async fn clear_prune_bootstrap_claim(peer: &PeerId, sync_state: &Arc<RwLock<NeighborSyncState>>) {
837    let removed = {
838        let mut state = sync_state.write().await;
839        state.clear_active_bootstrap_claim(peer)
840    };
841    if removed {
842        debug!("Prune audit: cleared active bootstrap claim for {peer}");
843    }
844}
845
846async fn peer_is_currently_responsible(
847    peer: &PeerId,
848    key: &XorName,
849    p2p_node: &Arc<P2PNode>,
850    config: &ReplicationConfig,
851) -> bool {
852    let closest = p2p_node
853        .dht_manager()
854        .find_closest_nodes_local_with_self(key, config.close_group_size)
855        .await;
856    closest.iter().any(|node| node.peer_id == *peer)
857}
858
859#[cfg(test)]
860#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
861mod tests {
862    use super::*;
863
864    fn peer_id_from_byte(b: u8) -> PeerId {
865        let mut bytes = [0u8; 32];
866        bytes[0] = b;
867        PeerId::from_bytes(bytes)
868    }
869
870    fn key_from_byte(b: u8) -> XorName {
871        [b; 32]
872    }
873
874    fn candidate(key: XorName, target_peers: Vec<PeerId>) -> RecordPruneCandidate {
875        RecordPruneCandidate { key, target_peers }
876    }
877
878    #[test]
879    fn prune_audit_challenges_are_one_per_candidate_peer() {
880        let peer_a = peer_id_from_byte(1);
881        let peer_b = peer_id_from_byte(2);
882        let key_a = key_from_byte(0xA);
883        let key_b = key_from_byte(0xB);
884        let candidates = vec![
885            candidate(key_a, vec![peer_a, peer_b]),
886            candidate(key_b, vec![peer_b]),
887        ];
888
889        let mut challenges = build_peer_audit_challenges(&candidates);
890        challenges.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));
891
892        let mut expected = vec![(peer_a, key_a), (peer_b, key_a), (peer_b, key_b)];
893        expected.sort_unstable_by_key(|(peer, key)| (*peer.as_bytes(), *key));
894        assert_eq!(challenges, expected);
895    }
896
897    #[test]
898    fn confirmed_keys_require_all_target_peers_present() {
899        let peer_a = peer_id_from_byte(1);
900        let peer_b = peer_id_from_byte(2);
901        let key = key_from_byte(0xC);
902        let candidates = vec![candidate(key, vec![peer_a, peer_b])];
903        let mut present_by_key = HashMap::new();
904        present_by_key.insert(key, HashSet::from([peer_a, peer_b]));
905
906        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key);
907
908        assert_eq!(confirmed, vec![key]);
909    }
910
911    #[test]
912    fn confirmed_keys_defer_absent_or_missing_peer_evidence() {
913        let peer_a = peer_id_from_byte(1);
914        let peer_b = peer_id_from_byte(2);
915        let complete_key = key_from_byte(0xD);
916        let absent_key = key_from_byte(0xE);
917        let missing_key = key_from_byte(0xF);
918        let candidates = vec![
919            candidate(complete_key, vec![peer_a, peer_b]),
920            candidate(absent_key, vec![peer_a, peer_b]),
921            candidate(missing_key, vec![peer_a, peer_b]),
922        ];
923        let mut present_by_key = HashMap::new();
924        present_by_key.insert(complete_key, HashSet::from([peer_a, peer_b]));
925        present_by_key.insert(absent_key, HashSet::from([peer_a]));
926
927        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key);
928
929        assert_eq!(confirmed, vec![complete_key]);
930    }
931
932    #[test]
933    fn audit_digest_proof_requires_matching_peer_key_nonce_and_bytes() {
934        let peer = peer_id_from_byte(1);
935        let other_peer = peer_id_from_byte(2);
936        let key = key_from_byte(0x11);
937        let other_key = key_from_byte(0x12);
938        let nonce = [0xAA; 32];
939        let other_nonce = [0xBB; 32];
940        let bytes = b"record bytes";
941        let digest = compute_audit_digest(&nonce, peer.as_bytes(), &key, bytes);
942
943        assert!(audit_digest_proves_key(&peer, &key, &nonce, bytes, &digest));
944        assert!(!audit_digest_proves_key(
945            &other_peer,
946            &key,
947            &nonce,
948            bytes,
949            &digest
950        ));
951        assert!(!audit_digest_proves_key(
952            &peer, &other_key, &nonce, bytes, &digest
953        ));
954        assert!(!audit_digest_proves_key(
955            &peer,
956            &key,
957            &other_nonce,
958            bytes,
959            &digest
960        ));
961        assert!(!audit_digest_proves_key(
962            &peer,
963            &key,
964            &nonce,
965            b"different bytes",
966            &digest
967        ));
968        assert!(!audit_digest_proves_key(
969            &peer,
970            &key,
971            &nonce,
972            bytes,
973            &ABSENT_KEY_DIGEST
974        ));
975    }
976
977    #[tokio::test]
978    async fn prune_cursor_advances_past_selected_budget_window() {
979        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
980        state.write().await.prune_cursor = 2;
981
982        let start = prune_scan_start(&state, 10).await;
983        advance_prune_cursor(&state, 10, start, Some(3)).await;
984
985        assert_eq!(state.read().await.prune_cursor, 6);
986    }
987
988    #[tokio::test]
989    async fn prune_cursor_advances_even_when_no_candidate_selected() {
990        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
991        state.write().await.prune_cursor = 9;
992
993        let start = prune_scan_start(&state, 10).await;
994        advance_prune_cursor(&state, 10, start, None).await;
995
996        assert_eq!(state.read().await.prune_cursor, 0);
997    }
998
999    #[tokio::test]
1000    async fn prune_audit_normal_response_clears_stale_bootstrap_claim() {
1001        let peer = peer_id_from_byte(1);
1002        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![peer])));
1003        let first_seen = Instant::now();
1004        state
1005            .write()
1006            .await
1007            .bootstrap_claims
1008            .insert(peer, first_seen);
1009        state
1010            .write()
1011            .await
1012            .bootstrap_claim_history
1013            .insert(peer, first_seen);
1014
1015        clear_prune_bootstrap_claim(&peer, &state).await;
1016
1017        let state = state.read().await;
1018        assert!(!state.bootstrap_claims.contains_key(&peer));
1019        assert!(state.bootstrap_claim_history.contains_key(&peer));
1020    }
1021
1022    #[test]
1023    fn prune_audit_clear_policy_requires_decoded_non_bootstrap_response() {
1024        assert!(prune_audit_response_clears_bootstrap_claim(
1025            PruneAuditStatus::Proven
1026        ));
1027        assert!(prune_audit_response_clears_bootstrap_claim(
1028            PruneAuditStatus::Failed
1029        ));
1030        assert!(!prune_audit_response_clears_bootstrap_claim(
1031            PruneAuditStatus::Bootstrapping
1032        ));
1033    }
1034
1035    #[tokio::test]
1036    async fn prune_audit_failure_penalty_is_reserved_once_per_peer_per_pass() {
1037        let peer = peer_id_from_byte(1);
1038        let other_peer = peer_id_from_byte(2);
1039        let report_state = PruneAuditReportState::default();
1040
1041        assert!(reserve_prune_audit_failure_report(&report_state, &peer).await);
1042        assert!(!reserve_prune_audit_failure_report(&report_state, &peer).await);
1043        assert!(reserve_prune_audit_failure_report(&report_state, &other_peer).await);
1044
1045        let reported = report_state.audit_failures.read().await;
1046        assert_eq!(reported.len(), 2);
1047        assert!(reported.contains(&peer));
1048        assert!(reported.contains(&other_peer));
1049    }
1050
1051    #[tokio::test]
1052    async fn prune_bootstrap_abuse_penalty_is_reserved_once_per_peer_per_pass() {
1053        let peer = peer_id_from_byte(1);
1054        let other_peer = peer_id_from_byte(2);
1055        let report_state = PruneAuditReportState::default();
1056
1057        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
1058        assert!(!reserve_prune_bootstrap_abuse_report(&report_state, &peer).await);
1059        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &other_peer).await);
1060
1061        let reported = report_state.bootstrap_abuse.read().await;
1062        assert_eq!(reported.len(), 2);
1063        assert!(reported.contains(&peer));
1064        assert!(reported.contains(&other_peer));
1065    }
1066}