1use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use crate::logging::{debug, info, warn};
12
13use futures::{stream, StreamExt};
14use rand::Rng;
15use saorsa_core::identity::PeerId;
16use saorsa_core::{DHTNode, P2PNode};
17use tokio::sync::RwLock;
18
19use crate::ant_protocol::XorName;
20use crate::replication::config::{
21 ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS,
22 REPLICATION_PROTOCOL_ID,
23};
24use crate::replication::paid_list::PaidList;
25use crate::replication::protocol::{
26 compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
27 ReplicationMessageBody, ABSENT_KEY_DIGEST,
28};
29use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState, RepairProofs};
30use crate::storage::LmdbStorage;
31
32use super::REPLICATION_TRUST_WEIGHT;
33
/// Maximum number of prune audit challenges kept in flight at the same time.
///
/// Used as the `buffer_unordered` width when prune proofs are collected.
const MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES: usize = 32;
35
/// Counters describing what one prune pass did to stored records and to
/// paid-list entries.
#[derive(Debug, Default)]
pub struct PruneResult {
    /// Stored records that were successfully deleted.
    pub records_pruned: usize,
    /// Stored records that received an out-of-range marker for the first time.
    pub records_marked_out_of_range: usize,
    /// Stored records whose out-of-range marker was removed because this node
    /// is in the close group again.
    pub records_cleared: usize,
    /// Paid-list entries removed by the batch delete.
    pub paid_entries_pruned: usize,
    /// Paid-list entries that received an out-of-range marker for the first time.
    pub paid_entries_marked: usize,
    /// Paid-list entries whose out-of-range marker was removed.
    pub paid_entries_cleared: usize,
}
56
57pub struct PrunePassContext<'a> {
59 pub self_id: &'a PeerId,
61 pub storage: &'a Arc<LmdbStorage>,
63 pub paid_list: &'a Arc<PaidList>,
65 pub p2p_node: &'a Arc<P2PNode>,
67 pub config: &'a ReplicationConfig,
69 pub sync_state: &'a Arc<RwLock<NeighborSyncState>>,
71 pub repair_proofs: &'a Arc<RwLock<RepairProofs>>,
73 pub current_sync_epoch: u64,
75 pub allow_remote_prune_audits: bool,
77}
78
/// Verdict derived from a decoded prune audit response.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PruneAuditStatus {
    /// The peer returned a digest matching the locally stored bytes.
    Proven,
    /// The response was mismatched, malformed, rejected, or the digest was wrong.
    Failed,
    /// The peer answered the challenge with a bootstrapping claim.
    Bootstrapping,
}
85
/// Running totals for the stored-record half of a prune pass.
#[derive(Debug, Default)]
struct RecordPruneStats {
    /// Records newly marked out of range.
    marked: usize,
    /// Records whose out-of-range marker was cleared.
    cleared: usize,
    /// Records deleted from storage.
    pruned: usize,
}
92
/// Running totals for the paid-list half of a prune pass.
#[derive(Debug, Default)]
struct PaidPruneStats {
    /// Paid entries newly marked out of range.
    marked: usize,
    /// Paid entries whose out-of-range marker was cleared.
    cleared: usize,
    /// Paid entries removed from the paid list.
    pruned: usize,
}
99
100#[derive(Debug, Clone)]
101struct RecordPruneCandidate {
102 key: XorName,
103 target_peers: Vec<PeerId>,
104}
105
106struct RecordPruneKeyOutcome {
107 marked: bool,
108 state: RecordPruneKeyState,
109}
110
111impl Default for RecordPruneKeyOutcome {
112 fn default() -> Self {
113 Self {
114 marked: false,
115 state: RecordPruneKeyState::None,
116 }
117 }
118}
119
120enum RecordPruneKeyState {
121 None,
122 Cleared,
123 BootstrapDeferred,
124 BudgetDeferred,
125 Candidate(RecordPruneCandidate),
126}
127
128#[derive(Default)]
129struct PruneAuditReportState {
130 audit_failures: RwLock<HashSet<PeerId>>,
131 bootstrap_abuse: RwLock<HashSet<PeerId>>,
132}
133
134pub async fn run_prune_pass(
158 self_id: &PeerId,
159 storage: &Arc<LmdbStorage>,
160 paid_list: &Arc<PaidList>,
161 p2p_node: &Arc<P2PNode>,
162 config: &ReplicationConfig,
163 sync_state: &Arc<RwLock<NeighborSyncState>>,
164 allow_remote_prune_audits: bool,
165) -> PruneResult {
166 let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
167 run_prune_pass_with_context(PrunePassContext {
168 self_id,
169 storage,
170 paid_list,
171 p2p_node,
172 config,
173 sync_state,
174 repair_proofs: &repair_proofs,
175 current_sync_epoch: 0,
176 allow_remote_prune_audits,
177 })
178 .await
179}
180
181pub async fn run_prune_pass_with_context(ctx: PrunePassContext<'_>) -> PruneResult {
183 let (stored_count, record_stats) = prune_stored_records(&ctx).await;
184 let now = Instant::now();
185 let (paid_count, paid_stats) =
186 prune_paid_entries(ctx.self_id, ctx.paid_list, ctx.p2p_node, ctx.config, now).await;
187
188 let result = PruneResult {
189 records_pruned: record_stats.pruned,
190 records_marked_out_of_range: record_stats.marked,
191 records_cleared: record_stats.cleared,
192 paid_entries_pruned: paid_stats.pruned,
193 paid_entries_marked: paid_stats.marked,
194 paid_entries_cleared: paid_stats.cleared,
195 };
196
197 info!(
198 "Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
199 result.records_pruned, stored_count, result.paid_entries_pruned, paid_count,
200 );
201
202 result
203}
204
205async fn prune_stored_records(ctx: &PrunePassContext<'_>) -> (usize, RecordPruneStats) {
206 let stored_keys = match ctx.storage.all_keys().await {
207 Ok(keys) => keys,
208 Err(e) => {
209 warn!("Failed to read stored keys for pruning: {e}");
210 return (0, RecordPruneStats::default());
211 }
212 };
213
214 let now = Instant::now();
215 let dht = ctx.p2p_node.dht_manager();
216 let mut stats = RecordPruneStats::default();
217 let mut candidates = Vec::new();
218 let mut audit_challenge_budget = MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS;
219 let mut budget_deferred = 0usize;
220 let mut bootstrap_deferred = 0usize;
221 let scan_start = prune_scan_start(ctx.sync_state, stored_keys.len()).await;
222 let mut last_selected_offset = None;
223
224 for offset in 0..stored_keys.len() {
225 let key = &stored_keys[(scan_start + offset) % stored_keys.len()];
226 let closest: Vec<DHTNode> = dht
227 .find_closest_nodes_local_with_self(key, ctx.config.close_group_size)
228 .await;
229
230 let outcome =
231 evaluate_record_prune_key(ctx, key, &closest, now, &mut audit_challenge_budget).await;
232 if outcome.marked {
233 stats.marked += 1;
234 }
235 match outcome.state {
236 RecordPruneKeyState::None => {}
237 RecordPruneKeyState::Cleared => stats.cleared += 1,
238 RecordPruneKeyState::BootstrapDeferred => {
239 bootstrap_deferred = bootstrap_deferred.saturating_add(1);
240 }
241 RecordPruneKeyState::BudgetDeferred => {
242 budget_deferred = budget_deferred.saturating_add(1);
243 }
244 RecordPruneKeyState::Candidate(candidate) => {
245 last_selected_offset = Some(offset);
246 candidates.push(candidate);
247 }
248 }
249 }
250
251 advance_prune_cursor(
252 ctx.sync_state,
253 stored_keys.len(),
254 scan_start,
255 last_selected_offset,
256 )
257 .await;
258
259 if bootstrap_deferred > 0 {
260 debug!(
261 "Deferred {bootstrap_deferred} prune candidates until bootstrap drain allows \
262 remote prune-confirmation audits"
263 );
264 }
265
266 if budget_deferred > 0 {
267 debug!(
268 "Deferred {budget_deferred} prune candidates due to per-pass audit budget \
269 ({MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS} challenges)"
270 );
271 }
272
273 let present_by_key = collect_record_prune_proofs(
274 &candidates,
275 ctx.storage,
276 ctx.p2p_node,
277 ctx.config,
278 ctx.sync_state,
279 )
280 .await;
281 let (keys_to_delete, revalidated_cleared) = revalidated_record_prune_keys(
282 &candidates,
283 &present_by_key,
284 ctx.self_id,
285 ctx.paid_list,
286 ctx.p2p_node,
287 ctx.config,
288 )
289 .await;
290 stats.cleared += revalidated_cleared;
291 stats.pruned = delete_stored_records(
292 &keys_to_delete,
293 ctx.storage,
294 ctx.paid_list,
295 ctx.repair_proofs,
296 )
297 .await;
298
299 (stored_keys.len(), stats)
300}
301
302async fn evaluate_record_prune_key(
303 ctx: &PrunePassContext<'_>,
304 key: &XorName,
305 closest: &[DHTNode],
306 now: Instant,
307 audit_challenge_budget: &mut usize,
308) -> RecordPruneKeyOutcome {
309 let mut outcome = RecordPruneKeyOutcome::default();
310 let is_responsible = closest.iter().any(|node| node.peer_id == *ctx.self_id);
311
312 if is_responsible {
313 if ctx.paid_list.record_out_of_range_since(key).is_some() {
314 ctx.paid_list.clear_record_out_of_range(key);
315 outcome.state = RecordPruneKeyState::Cleared;
316 }
317 return outcome;
318 }
319
320 if ctx.paid_list.record_out_of_range_since(key).is_none() {
321 outcome.marked = true;
322 }
323 ctx.paid_list.set_record_out_of_range(key);
324
325 let Some(first_seen) = ctx.paid_list.record_out_of_range_since(key) else {
326 return outcome;
327 };
328 let elapsed = now
329 .checked_duration_since(first_seen)
330 .unwrap_or(Duration::ZERO);
331 if elapsed < ctx.config.prune_hysteresis_duration {
332 return outcome;
333 }
334
335 if !ctx.allow_remote_prune_audits {
336 outcome.state = RecordPruneKeyState::BootstrapDeferred;
337 return outcome;
338 }
339
340 let target_peers = remote_close_group_peers(closest, ctx.self_id);
341 if target_peers.is_empty() {
342 warn!(
343 "Cannot prune {}: current close group has no remote peers",
344 hex::encode(key)
345 );
346 return outcome;
347 }
348
349 let current_close_peers: HashSet<PeerId> = closest.iter().map(|node| node.peer_id).collect();
350 if !target_peers_have_mature_repair_proofs(
351 key,
352 &target_peers,
353 ¤t_close_peers,
354 ctx.repair_proofs,
355 ctx.current_sync_epoch,
356 )
357 .await
358 {
359 debug!(
360 "Deferring prune for {} until current close group has mature repair proofs",
361 hex::encode(key)
362 );
363 return outcome;
364 }
365
366 if target_peers.len() > *audit_challenge_budget {
367 outcome.state = RecordPruneKeyState::BudgetDeferred;
368 return outcome;
369 }
370
371 *audit_challenge_budget -= target_peers.len();
372 outcome.state = RecordPruneKeyState::Candidate(RecordPruneCandidate {
373 key: *key,
374 target_peers,
375 });
376 outcome
377}
378
379async fn prune_paid_entries(
380 self_id: &PeerId,
381 paid_list: &Arc<PaidList>,
382 p2p_node: &Arc<P2PNode>,
383 config: &ReplicationConfig,
384 now: Instant,
385) -> (usize, PaidPruneStats) {
386 let paid_keys = match paid_list.all_keys() {
387 Ok(keys) => keys,
388 Err(e) => {
389 warn!("Failed to read PaidForList for pruning: {e}");
390 return (0, PaidPruneStats::default());
391 }
392 };
393
394 let dht = p2p_node.dht_manager();
395 let mut stats = PaidPruneStats::default();
396 let mut paid_keys_to_delete = Vec::new();
397
398 for key in &paid_keys {
399 let closest: Vec<DHTNode> = dht
400 .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
401 .await;
402 let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);
403
404 if in_paid_group {
405 if paid_list.paid_out_of_range_since(key).is_some() {
406 paid_list.clear_paid_out_of_range(key);
407 stats.cleared += 1;
408 }
409 } else {
410 if paid_list.paid_out_of_range_since(key).is_none() {
411 stats.marked += 1;
412 }
413 paid_list.set_paid_out_of_range(key);
414
415 if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
416 let elapsed = now
417 .checked_duration_since(first_seen)
418 .unwrap_or(Duration::ZERO);
419 if elapsed >= config.prune_hysteresis_duration {
420 paid_keys_to_delete.push(*key);
421 }
422 }
423 }
424 }
425
426 if !paid_keys_to_delete.is_empty() {
427 match paid_list.remove_batch(&paid_keys_to_delete).await {
428 Ok(count) => {
429 stats.pruned = count;
430 debug!("Pruned {count} out-of-range PaidForList entries");
431 }
432 Err(e) => {
433 warn!("Failed to prune PaidForList entries: {e}");
434 }
435 }
436 }
437
438 (paid_keys.len(), stats)
439}
440
441fn remote_close_group_peers(close_group: &[DHTNode], self_id: &PeerId) -> Vec<PeerId> {
442 close_group
443 .iter()
444 .filter(|node| node.peer_id != *self_id)
445 .map(|node| node.peer_id)
446 .collect()
447}
448
449async fn target_peers_have_mature_repair_proofs(
450 key: &XorName,
451 target_peers: &[PeerId],
452 current_close_peers: &HashSet<PeerId>,
453 repair_proofs: &Arc<RwLock<RepairProofs>>,
454 current_sync_epoch: u64,
455) -> bool {
456 let mut proofs = repair_proofs.write().await;
457 target_peers.iter().all(|peer| {
458 proofs.has_mature_replica_hint(peer, key, current_close_peers, current_sync_epoch)
459 })
460}
461
462async fn prune_scan_start(
463 sync_state: &Arc<RwLock<NeighborSyncState>>,
464 stored_key_count: usize,
465) -> usize {
466 if stored_key_count == 0 {
467 return 0;
468 }
469 sync_state.read().await.prune_cursor % stored_key_count
470}
471
472async fn advance_prune_cursor(
473 sync_state: &Arc<RwLock<NeighborSyncState>>,
474 stored_key_count: usize,
475 scan_start: usize,
476 last_selected_offset: Option<usize>,
477) {
478 if stored_key_count == 0 {
479 sync_state.write().await.prune_cursor = 0;
480 return;
481 }
482
483 let advance_by = last_selected_offset.map_or(1, |offset| offset.saturating_add(1));
484 sync_state.write().await.prune_cursor = (scan_start + advance_by) % stored_key_count;
485}
486
487async fn delete_stored_records(
488 keys_to_delete: &[XorName],
489 storage: &Arc<LmdbStorage>,
490 paid_list: &Arc<PaidList>,
491 repair_proofs: &Arc<RwLock<RepairProofs>>,
492) -> usize {
493 let mut pruned = 0;
494
495 for key in keys_to_delete {
496 if let Err(e) = storage.delete(key).await {
497 warn!("Failed to prune record {}: {e}", hex::encode(key));
498 } else {
499 pruned += 1;
500 paid_list.clear_record_out_of_range(key);
501 repair_proofs.write().await.remove_key(key);
502 paid_list.set_paid_out_of_range(key);
506 debug!("Pruned out-of-range record {}", hex::encode(key));
507 }
508 }
509
510 pruned
511}
512
513async fn collect_record_prune_proofs(
520 candidates: &[RecordPruneCandidate],
521 storage: &Arc<LmdbStorage>,
522 p2p_node: &Arc<P2PNode>,
523 config: &ReplicationConfig,
524 sync_state: &Arc<RwLock<NeighborSyncState>>,
525) -> HashMap<XorName, HashSet<PeerId>> {
526 if candidates.is_empty() {
527 return HashMap::new();
528 }
529
530 let report_state = PruneAuditReportState::default();
531 let mut requests = stream::iter(build_peer_audit_challenges(candidates))
532 .map(|(peer, key)| {
533 peer_proves_record(
534 peer,
535 key,
536 storage,
537 p2p_node,
538 config,
539 sync_state,
540 &report_state,
541 )
542 })
543 .buffer_unordered(MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES);
544
545 let mut present_by_key = HashMap::<XorName, HashSet<PeerId>>::new();
546 while let Some(proof) = requests.next().await {
547 if let Some((peer, key)) = proof {
548 present_by_key.entry(key).or_default().insert(peer);
549 }
550 }
551
552 present_by_key
553}
554
555async fn revalidated_record_prune_keys(
556 candidates: &[RecordPruneCandidate],
557 present_by_key: &HashMap<XorName, HashSet<PeerId>>,
558 self_id: &PeerId,
559 paid_list: &Arc<PaidList>,
560 p2p_node: &Arc<P2PNode>,
561 config: &ReplicationConfig,
562) -> (Vec<XorName>, usize) {
563 let dht = p2p_node.dht_manager();
564 let mut keys_to_delete = Vec::new();
565 let mut cleared = 0;
566 let now = Instant::now();
567
568 for candidate in candidates {
569 let closest: Vec<DHTNode> = dht
570 .find_closest_nodes_local_with_self(&candidate.key, config.close_group_size)
571 .await;
572
573 if closest.iter().any(|n| n.peer_id == *self_id) {
574 if paid_list
575 .record_out_of_range_since(&candidate.key)
576 .is_some()
577 {
578 paid_list.clear_record_out_of_range(&candidate.key);
579 cleared += 1;
580 }
581 continue;
582 }
583
584 let Some(first_seen) = paid_list.record_out_of_range_since(&candidate.key) else {
585 continue;
586 };
587 let elapsed = now
588 .checked_duration_since(first_seen)
589 .unwrap_or(Duration::ZERO);
590 if elapsed < config.prune_hysteresis_duration {
591 continue;
592 }
593
594 let current_target_peers = remote_close_group_peers(&closest, self_id);
595 if current_target_peers.is_empty() {
596 warn!(
597 "Cannot prune {}: current close group has no remote peers",
598 hex::encode(candidate.key)
599 );
600 continue;
601 }
602
603 if target_peers_reported_present(&candidate.key, ¤t_target_peers, present_by_key) {
604 keys_to_delete.push(candidate.key);
605 } else {
606 debug!(
607 "Deferring prune for {} until current close group reports it",
608 hex::encode(candidate.key)
609 );
610 }
611 }
612
613 (keys_to_delete, cleared)
614}
615
616fn build_peer_audit_challenges(candidates: &[RecordPruneCandidate]) -> Vec<(PeerId, XorName)> {
617 let mut challenges = Vec::new();
618 for candidate in candidates {
619 for peer in &candidate.target_peers {
620 challenges.push((*peer, candidate.key));
621 }
622 }
623 challenges
624}
625
/// Test helper: returns, in candidate order, the keys whose own target peers
/// are all listed as present in `present_by_key`.
#[cfg(test)]
fn confirmed_keys_from_presence(
    candidates: &[RecordPruneCandidate],
    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
) -> Vec<XorName> {
    let mut confirmed = Vec::new();
    for candidate in candidates {
        let fully_present =
            target_peers_reported_present(&candidate.key, &candidate.target_peers, present_by_key);
        if fully_present {
            confirmed.push(candidate.key);
        }
    }
    confirmed
}
639
640fn target_peers_reported_present(
641 key: &XorName,
642 target_peers: &[PeerId],
643 present_by_key: &HashMap<XorName, HashSet<PeerId>>,
644) -> bool {
645 let Some(present_peers) = present_by_key.get(key) else {
646 return false;
647 };
648 target_peers.iter().all(|peer| present_peers.contains(peer))
649}
650
651async fn peer_proves_record(
654 peer: PeerId,
655 key: XorName,
656 storage: &Arc<LmdbStorage>,
657 p2p_node: &Arc<P2PNode>,
658 config: &ReplicationConfig,
659 sync_state: &Arc<RwLock<NeighborSyncState>>,
660 report_state: &PruneAuditReportState,
661) -> Option<(PeerId, XorName)> {
662 let local_bytes = local_record_bytes(&key, storage).await?;
663
664 let (challenge_id, nonce) = {
665 let mut rng = rand::thread_rng();
666 (rng.gen::<u64>(), rng.gen::<[u8; 32]>())
667 };
668 let encoded = encode_prune_audit_challenge(&peer, key, challenge_id, nonce)?;
669 let Some(decoded) = send_prune_audit_challenge(&peer, &key, encoded, p2p_node, config).await
670 else {
671 report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
675 return None;
676 };
677
678 let status =
679 prune_audit_response_status(decoded, challenge_id, &peer, &key, &nonce, &local_bytes);
680 if prune_audit_response_clears_bootstrap_claim(status) {
681 clear_prune_bootstrap_claim(&peer, sync_state).await;
682 }
683
684 match status {
685 PruneAuditStatus::Proven => Some((peer, key)),
686 PruneAuditStatus::Bootstrapping => {
687 report_prune_bootstrap_claim(&peer, &key, p2p_node, config, sync_state, report_state)
688 .await;
689 None
690 }
691 PruneAuditStatus::Failed => {
692 report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
693 None
694 }
695 }
696}
697
698fn prune_audit_response_clears_bootstrap_claim(status: PruneAuditStatus) -> bool {
699 matches!(status, PruneAuditStatus::Proven | PruneAuditStatus::Failed)
700}
701
702fn encode_prune_audit_challenge(
703 peer: &PeerId,
704 key: XorName,
705 challenge_id: u64,
706 nonce: [u8; 32],
707) -> Option<Vec<u8>> {
708 let challenge = AuditChallenge {
709 challenge_id,
710 nonce,
711 challenged_peer_id: *peer.as_bytes(),
712 keys: vec![key],
713 };
714 let msg = ReplicationMessage {
715 request_id: challenge_id,
716 body: ReplicationMessageBody::AuditChallenge(challenge),
717 };
718 let encoded = match msg.encode() {
719 Ok(data) => data,
720 Err(e) => {
721 warn!(
722 "Failed to encode prune audit challenge for {} against {peer}: {e}",
723 hex::encode(key),
724 );
725 return None;
726 }
727 };
728 Some(encoded)
729}
730
731async fn send_prune_audit_challenge(
732 peer: &PeerId,
733 key: &XorName,
734 encoded: Vec<u8>,
735 p2p_node: &Arc<P2PNode>,
736 config: &ReplicationConfig,
737) -> Option<ReplicationMessage> {
738 let response = match p2p_node
739 .send_request(
740 peer,
741 REPLICATION_PROTOCOL_ID,
742 encoded,
743 config.audit_response_timeout(1),
744 )
745 .await
746 {
747 Ok(response) => response,
748 Err(e) => {
749 debug!(
750 "Prune audit challenge for {} against {peer} failed: {e}",
751 hex::encode(key)
752 );
753 return None;
754 }
755 };
756
757 let decoded = match ReplicationMessage::decode(&response.data) {
758 Ok(msg) => msg,
759 Err(e) => {
760 warn!("Failed to decode prune audit response from {peer}: {e}");
761 return None;
762 }
763 };
764
765 Some(decoded)
766}
767
768fn prune_audit_response_status(
769 decoded: ReplicationMessage,
770 challenge_id: u64,
771 peer: &PeerId,
772 key: &XorName,
773 nonce: &[u8; 32],
774 local_bytes: &[u8],
775) -> PruneAuditStatus {
776 match decoded.body {
777 ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
778 challenge_id: resp_id,
779 digests,
780 }) => {
781 if resp_id != challenge_id {
782 warn!("Prune audit challenge ID mismatch from {peer}");
783 return PruneAuditStatus::Failed;
784 }
785 if digests.len() != 1 {
786 warn!(
787 "Prune audit response from {peer} returned {} digests for one challenged key",
788 digests.len(),
789 );
790 return PruneAuditStatus::Failed;
791 }
792
793 if audit_digest_proves_key(peer, key, nonce, local_bytes, &digests[0]) {
794 PruneAuditStatus::Proven
795 } else {
796 warn!(
797 "Prune audit proof from {peer} failed for {}",
798 hex::encode(key)
799 );
800 PruneAuditStatus::Failed
801 }
802 }
803 ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
804 challenge_id: resp_id,
805 }) => {
806 if resp_id == challenge_id {
807 warn!(
808 "Prune audit proof for {} blocked by bootstrap claim from {peer}",
809 hex::encode(key)
810 );
811 PruneAuditStatus::Bootstrapping
812 } else {
813 warn!("Prune audit challenge ID mismatch on Bootstrapping from {peer}");
814 PruneAuditStatus::Failed
815 }
816 }
817 ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
818 challenge_id: resp_id,
819 reason,
820 }) => {
821 if resp_id == challenge_id {
822 warn!(
823 "Prune audit proof for {} rejected by {peer}: {reason}",
824 hex::encode(key)
825 );
826 } else {
827 warn!("Prune audit challenge ID mismatch on Rejected from {peer}");
828 }
829 PruneAuditStatus::Failed
830 }
831 _ => {
832 warn!("Unexpected prune audit response type from {peer}");
833 PruneAuditStatus::Failed
834 }
835 }
836}
837
838async fn local_record_bytes(key: &XorName, storage: &Arc<LmdbStorage>) -> Option<Vec<u8>> {
839 match storage.get_raw(key).await {
840 Ok(Some(bytes)) => Some(bytes),
841 Ok(None) => {
842 debug!(
843 "Cannot prune-audit {}: local record disappeared",
844 hex::encode(key)
845 );
846 None
847 }
848 Err(e) => {
849 warn!(
850 "Cannot prune-audit {}: failed to read local record: {e}",
851 hex::encode(key)
852 );
853 None
854 }
855 }
856}
857
858fn audit_digest_proves_key(
859 peer: &PeerId,
860 key: &XorName,
861 nonce: &[u8; 32],
862 local_bytes: &[u8],
863 digest: &[u8; 32],
864) -> bool {
865 if *digest == ABSENT_KEY_DIGEST {
866 return false;
867 }
868 let expected = compute_audit_digest(nonce, peer.as_bytes(), key, local_bytes);
869 *digest == expected
870}
871
872async fn report_prune_audit_failure_once(
873 peer: &PeerId,
874 key: &XorName,
875 p2p_node: &Arc<P2PNode>,
876 config: &ReplicationConfig,
877 report_state: &PruneAuditReportState,
878) -> bool {
879 let should_report = peer_is_currently_responsible(peer, key, p2p_node, config).await
880 && reserve_prune_audit_failure_report(report_state, peer).await;
881 if !should_report {
882 return false;
883 }
884
885 p2p_node
886 .report_trust_event(
887 peer,
888 saorsa_core::TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT),
889 )
890 .await;
891 true
892}
893
894async fn reserve_prune_audit_failure_report(
895 report_state: &PruneAuditReportState,
896 peer: &PeerId,
897) -> bool {
898 report_state.audit_failures.write().await.insert(*peer)
899}
900
901async fn reserve_prune_bootstrap_abuse_report(
902 report_state: &PruneAuditReportState,
903 peer: &PeerId,
904) -> bool {
905 report_state.bootstrap_abuse.write().await.insert(*peer)
906}
907
908async fn report_prune_bootstrap_claim(
909 peer: &PeerId,
910 key: &XorName,
911 p2p_node: &Arc<P2PNode>,
912 config: &ReplicationConfig,
913 sync_state: &Arc<RwLock<NeighborSyncState>>,
914 report_state: &PruneAuditReportState,
915) {
916 if !peer_is_currently_responsible(peer, key, p2p_node, config).await {
917 return;
918 }
919
920 let observation = {
921 let now = Instant::now();
922 let mut state = sync_state.write().await;
923 (
924 now,
925 state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period),
926 )
927 };
928
929 let (now, observation) = observation;
930 match observation {
931 BootstrapClaimObservation::WithinGrace { .. } => {
932 debug!("Prune audit: peer {peer} claims bootstrapping (within grace period)");
933 return;
934 }
935 BootstrapClaimObservation::PastGrace { first_seen } => {
936 if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
937 debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
938 return;
939 }
940 warn!(
941 "Prune audit: peer {peer} claiming bootstrap past grace period \
942 ({:?} > {:?}), reporting abuse",
943 now.duration_since(first_seen),
944 config.bootstrap_claim_grace_period,
945 );
946 }
947 BootstrapClaimObservation::Repeated { first_seen } => {
948 if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
949 debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
950 return;
951 }
952 warn!(
953 "Prune audit: peer {peer} repeated bootstrap claim after previously stopping; \
954 first claim was {:?} ago, reporting abuse",
955 now.duration_since(first_seen),
956 );
957 }
958 }
959
960 p2p_node
961 .report_trust_event(
962 peer,
963 saorsa_core::TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
964 )
965 .await;
966}
967
968async fn clear_prune_bootstrap_claim(peer: &PeerId, sync_state: &Arc<RwLock<NeighborSyncState>>) {
969 let removed = {
970 let mut state = sync_state.write().await;
971 state.clear_active_bootstrap_claim(peer)
972 };
973 if removed {
974 debug!("Prune audit: cleared active bootstrap claim for {peer}");
975 }
976}
977
978async fn peer_is_currently_responsible(
979 peer: &PeerId,
980 key: &XorName,
981 p2p_node: &Arc<P2PNode>,
982 config: &ReplicationConfig,
983) -> bool {
984 let closest = p2p_node
985 .dht_manager()
986 .find_closest_nodes_local_with_self(key, config.close_group_size)
987 .await;
988 closest.iter().any(|node| node.peer_id == *peer)
989}
990
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Peer id whose first byte is `b` and whose remaining bytes are zero.
    fn peer_id_from_byte(b: u8) -> PeerId {
        let bytes: [u8; 32] = std::array::from_fn(|i| if i == 0 { b } else { 0 });
        PeerId::from_bytes(bytes)
    }

    /// Key made of 32 copies of `b`.
    fn key_from_byte(b: u8) -> XorName {
        [b; 32]
    }

    /// Shorthand for building a prune candidate.
    fn candidate(key: XorName, target_peers: Vec<PeerId>) -> RecordPruneCandidate {
        RecordPruneCandidate { key, target_peers }
    }

    /// Sync state with no neighbours and the prune cursor preset to `cursor`.
    async fn sync_state_with_cursor(cursor: usize) -> Arc<RwLock<NeighborSyncState>> {
        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(Vec::new())));
        state.write().await.prune_cursor = cursor;
        state
    }

    #[test]
    fn prune_audit_challenges_are_one_per_candidate_peer() {
        let (peer_a, peer_b) = (peer_id_from_byte(1), peer_id_from_byte(2));
        let (key_a, key_b) = (key_from_byte(0xA), key_from_byte(0xB));
        let candidates = vec![
            candidate(key_a, vec![peer_a, peer_b]),
            candidate(key_b, vec![peer_b]),
        ];
        // Order is irrelevant here, so both sides are sorted the same way.
        let by_peer_then_key = |(peer, key): &(PeerId, XorName)| (*peer.as_bytes(), *key);

        let mut challenges = build_peer_audit_challenges(&candidates);
        challenges.sort_unstable_by_key(by_peer_then_key);

        let mut expected = vec![(peer_a, key_a), (peer_b, key_a), (peer_b, key_b)];
        expected.sort_unstable_by_key(by_peer_then_key);

        assert_eq!(challenges, expected);
    }

    #[test]
    fn confirmed_keys_require_all_target_peers_present() {
        let (peer_a, peer_b) = (peer_id_from_byte(1), peer_id_from_byte(2));
        let key = key_from_byte(0xC);
        let candidates = vec![candidate(key, vec![peer_a, peer_b])];
        let present_by_key = HashMap::from([(key, HashSet::from([peer_a, peer_b]))]);

        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key);

        assert_eq!(confirmed, vec![key]);
    }

    #[test]
    fn confirmed_keys_defer_absent_or_missing_peer_evidence() {
        let (peer_a, peer_b) = (peer_id_from_byte(1), peer_id_from_byte(2));
        let complete_key = key_from_byte(0xD);
        let absent_key = key_from_byte(0xE);
        let missing_key = key_from_byte(0xF);
        let candidates: Vec<RecordPruneCandidate> = [complete_key, absent_key, missing_key]
            .into_iter()
            .map(|key| candidate(key, vec![peer_a, peer_b]))
            .collect();
        // `absent_key` lacks one peer; `missing_key` has no entry at all.
        let present_by_key = HashMap::from([
            (complete_key, HashSet::from([peer_a, peer_b])),
            (absent_key, HashSet::from([peer_a])),
        ]);

        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key);

        assert_eq!(confirmed, vec![complete_key]);
    }

    #[test]
    fn audit_digest_proof_requires_matching_peer_key_nonce_and_bytes() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let key = key_from_byte(0x11);
        let other_key = key_from_byte(0x12);
        let nonce = [0xAA; 32];
        let other_nonce = [0xBB; 32];
        let bytes: &[u8] = b"record bytes";
        let digest = compute_audit_digest(&nonce, peer.as_bytes(), &key, bytes);

        assert!(audit_digest_proves_key(&peer, &key, &nonce, bytes, &digest));

        // Each case changes exactly one input relative to the valid proof.
        let rejected: [(&PeerId, &XorName, &[u8; 32], &[u8], &[u8; 32]); 5] = [
            (&other_peer, &key, &nonce, bytes, &digest),
            (&peer, &other_key, &nonce, bytes, &digest),
            (&peer, &key, &other_nonce, bytes, &digest),
            (&peer, &key, &nonce, b"different bytes", &digest),
            (&peer, &key, &nonce, bytes, &ABSENT_KEY_DIGEST),
        ];
        for (case_peer, case_key, case_nonce, case_bytes, case_digest) in rejected {
            assert!(!audit_digest_proves_key(
                case_peer,
                case_key,
                case_nonce,
                case_bytes,
                case_digest
            ));
        }
    }

    #[tokio::test]
    async fn prune_cursor_advances_past_selected_budget_window() {
        let state = sync_state_with_cursor(2).await;

        let start = prune_scan_start(&state, 10).await;
        advance_prune_cursor(&state, 10, start, Some(3)).await;

        let cursor = state.read().await.prune_cursor;
        assert_eq!(cursor, 6);
    }

    #[tokio::test]
    async fn prune_cursor_advances_even_when_no_candidate_selected() {
        let state = sync_state_with_cursor(9).await;

        let start = prune_scan_start(&state, 10).await;
        advance_prune_cursor(&state, 10, start, None).await;

        let cursor = state.read().await.prune_cursor;
        assert_eq!(cursor, 0);
    }

    #[tokio::test]
    async fn prune_audit_normal_response_clears_stale_bootstrap_claim() {
        let peer = peer_id_from_byte(1);
        let state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![peer])));
        let first_seen = Instant::now();
        {
            let mut guard = state.write().await;
            guard.bootstrap_claims.insert(peer, first_seen);
            guard.bootstrap_claim_history.insert(peer, first_seen);
        }

        clear_prune_bootstrap_claim(&peer, &state).await;

        let snapshot = state.read().await;
        assert!(!snapshot.bootstrap_claims.contains_key(&peer));
        assert!(snapshot.bootstrap_claim_history.contains_key(&peer));
    }

    #[test]
    fn prune_audit_clear_policy_requires_decoded_non_bootstrap_response() {
        for clearing in [PruneAuditStatus::Proven, PruneAuditStatus::Failed] {
            assert!(prune_audit_response_clears_bootstrap_claim(clearing));
        }
        assert!(!prune_audit_response_clears_bootstrap_claim(
            PruneAuditStatus::Bootstrapping
        ));
    }

    #[tokio::test]
    async fn prune_audit_failure_penalty_is_reserved_once_per_peer_per_pass() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let report_state = PruneAuditReportState::default();

        let first = reserve_prune_audit_failure_report(&report_state, &peer).await;
        let repeat = reserve_prune_audit_failure_report(&report_state, &peer).await;
        let other = reserve_prune_audit_failure_report(&report_state, &other_peer).await;
        assert!(first);
        assert!(!repeat);
        assert!(other);

        let reported = report_state.audit_failures.read().await;
        assert_eq!(reported.len(), 2);
        assert!(reported.contains(&peer));
        assert!(reported.contains(&other_peer));
    }

    #[tokio::test]
    async fn prune_bootstrap_abuse_penalty_is_reserved_once_per_peer_per_pass() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let report_state = PruneAuditReportState::default();

        let first = reserve_prune_bootstrap_abuse_report(&report_state, &peer).await;
        let repeat = reserve_prune_bootstrap_abuse_report(&report_state, &peer).await;
        let other = reserve_prune_bootstrap_abuse_report(&report_state, &other_peer).await;
        assert!(first);
        assert!(!repeat);
        assert!(other);

        let reported = report_state.bootstrap_abuse.read().await;
        assert_eq!(reported.len(), 2);
        assert!(reported.contains(&peer));
        assert!(reported.contains(&other_peer));
    }
}