1use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use crate::logging::{debug, info, warn};
12
13use futures::{stream, StreamExt};
14use rand::Rng;
15use saorsa_core::identity::PeerId;
16use saorsa_core::{DHTNode, P2PNode};
17use tokio::sync::RwLock;
18
19use crate::ant_protocol::XorName;
20use crate::replication::config::{
21 ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS,
22 REPLICATION_PROTOCOL_ID,
23};
24use crate::replication::paid_list::PaidList;
25use crate::replication::protocol::{
26 compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
27 ReplicationMessageBody, ABSENT_KEY_DIGEST,
28};
29use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
30use crate::storage::LmdbStorage;
31
32use super::REPLICATION_TRUST_WEIGHT;
33
/// Maximum number of prune audit challenges kept in flight at the same time while proofs
/// are being collected (the `buffer_unordered` width in `collect_record_prune_proofs`).
const MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES: usize = 32;
35
/// Counters summarising one prune pass, as returned by [`run_prune_pass`].
///
/// `Clone`, `PartialEq` and `Eq` are derived so callers and tests can copy and compare
/// results directly.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PruneResult {
    /// Stored records that were deleted during the pass.
    pub records_pruned: usize,
    /// Stored records that received a fresh out-of-range marker during the pass.
    pub records_marked_out_of_range: usize,
    /// Stored records whose existing out-of-range marker was cleared during the pass.
    pub records_cleared: usize,
    /// `PaidForList` entries that were removed during the pass.
    pub paid_entries_pruned: usize,
    /// `PaidForList` entries that received a fresh out-of-range marker during the pass.
    pub paid_entries_marked: usize,
    /// `PaidForList` entries whose existing out-of-range marker was cleared during the pass.
    pub paid_entries_cleared: usize,
}
56
/// Outcome of interpreting a single decoded prune audit response.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PruneAuditStatus {
    /// The peer returned a digest that matches the one computed from the local record bytes.
    Proven,
    /// The response was mismatched, rejected, of an unexpected type, or carried a wrong digest.
    Failed,
    /// The peer answered this challenge with a bootstrapping claim instead of a digest.
    Bootstrapping,
}
63
/// Running totals for the stored-record half of a prune pass.
#[derive(Debug, Default)]
struct RecordPruneStats {
    /// Records that were newly marked out of range.
    marked: usize,
    /// Records whose out-of-range marker was cleared.
    cleared: usize,
    /// Records that were deleted from storage.
    pruned: usize,
}
70
/// Running totals for the `PaidForList` half of a prune pass.
#[derive(Debug, Default)]
struct PaidPruneStats {
    /// Paid entries that were newly marked out of range.
    marked: usize,
    /// Paid entries whose out-of-range marker was cleared.
    cleared: usize,
    /// Paid entries that were removed from the list.
    pruned: usize,
}
77
78#[derive(Debug, Clone)]
79struct RecordPruneCandidate {
80 key: XorName,
81 target_peers: Vec<PeerId>,
82}
83
84#[derive(Default)]
85struct PruneAuditReportState {
86 audit_failures: RwLock<HashSet<PeerId>>,
87 bootstrap_abuse: RwLock<HashSet<PeerId>>,
88}
89
90pub async fn run_prune_pass(
107 self_id: &PeerId,
108 storage: &Arc<LmdbStorage>,
109 paid_list: &Arc<PaidList>,
110 p2p_node: &Arc<P2PNode>,
111 config: &ReplicationConfig,
112 sync_state: &Arc<RwLock<NeighborSyncState>>,
113 allow_remote_prune_audits: bool,
114) -> PruneResult {
115 let (stored_count, record_stats) = prune_stored_records(
116 self_id,
117 storage,
118 paid_list,
119 p2p_node,
120 config,
121 sync_state,
122 allow_remote_prune_audits,
123 )
124 .await;
125 let now = Instant::now();
126 let (paid_count, paid_stats) =
127 prune_paid_entries(self_id, paid_list, p2p_node, config, now).await;
128
129 let result = PruneResult {
130 records_pruned: record_stats.pruned,
131 records_marked_out_of_range: record_stats.marked,
132 records_cleared: record_stats.cleared,
133 paid_entries_pruned: paid_stats.pruned,
134 paid_entries_marked: paid_stats.marked,
135 paid_entries_cleared: paid_stats.cleared,
136 };
137
138 info!(
139 "Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
140 result.records_pruned, stored_count, result.paid_entries_pruned, paid_count,
141 );
142
143 result
144}
145
146async fn prune_stored_records(
147 self_id: &PeerId,
148 storage: &Arc<LmdbStorage>,
149 paid_list: &Arc<PaidList>,
150 p2p_node: &Arc<P2PNode>,
151 config: &ReplicationConfig,
152 sync_state: &Arc<RwLock<NeighborSyncState>>,
153 allow_remote_prune_audits: bool,
154) -> (usize, RecordPruneStats) {
155 let stored_keys = match storage.all_keys().await {
156 Ok(keys) => keys,
157 Err(e) => {
158 warn!("Failed to read stored keys for pruning: {e}");
159 return (0, RecordPruneStats::default());
160 }
161 };
162
163 let now = Instant::now();
164 let dht = p2p_node.dht_manager();
165 let mut stats = RecordPruneStats::default();
166 let mut candidates = Vec::new();
167 let mut audit_challenge_budget = MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS;
168 let mut budget_deferred = 0usize;
169 let mut bootstrap_deferred = 0usize;
170 let scan_start = prune_scan_start(sync_state, stored_keys.len()).await;
171 let mut last_selected_offset = None;
172
173 for offset in 0..stored_keys.len() {
174 let key = &stored_keys[(scan_start + offset) % stored_keys.len()];
175 let closest: Vec<DHTNode> = dht
176 .find_closest_nodes_local_with_self(key, config.close_group_size)
177 .await;
178 let is_responsible = closest.iter().any(|n| n.peer_id == *self_id);
179
180 if is_responsible {
181 if paid_list.record_out_of_range_since(key).is_some() {
182 paid_list.clear_record_out_of_range(key);
183 stats.cleared += 1;
184 }
185 } else {
186 if paid_list.record_out_of_range_since(key).is_none() {
187 stats.marked += 1;
188 }
189 paid_list.set_record_out_of_range(key);
190
191 if let Some(first_seen) = paid_list.record_out_of_range_since(key) {
192 let elapsed = now
193 .checked_duration_since(first_seen)
194 .unwrap_or(Duration::ZERO);
195 if elapsed >= config.prune_hysteresis_duration {
196 if !allow_remote_prune_audits {
197 bootstrap_deferred = bootstrap_deferred.saturating_add(1);
198 continue;
199 }
200 let target_peers = remote_close_group_peers(&closest, self_id);
201 if target_peers.is_empty() {
202 warn!(
203 "Cannot prune {}: current close group has no remote peers",
204 hex::encode(key)
205 );
206 continue;
207 }
208 if target_peers.len() > audit_challenge_budget {
209 budget_deferred = budget_deferred.saturating_add(1);
210 continue;
211 }
212 audit_challenge_budget -= target_peers.len();
213 last_selected_offset = Some(offset);
214 candidates.push(RecordPruneCandidate {
215 key: *key,
216 target_peers,
217 });
218 }
219 }
220 }
221 }
222
223 advance_prune_cursor(
224 sync_state,
225 stored_keys.len(),
226 scan_start,
227 last_selected_offset,
228 )
229 .await;
230
231 if bootstrap_deferred > 0 {
232 debug!(
233 "Deferred {bootstrap_deferred} prune candidates until bootstrap drain allows \
234 remote prune-confirmation audits"
235 );
236 }
237
238 if budget_deferred > 0 {
239 debug!(
240 "Deferred {budget_deferred} prune candidates due to per-pass audit budget \
241 ({MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS} challenges)"
242 );
243 }
244
245 let present_by_key =
246 collect_record_prune_proofs(&candidates, storage, p2p_node, config, sync_state).await;
247 let (keys_to_delete, revalidated_cleared) = revalidated_record_prune_keys(
248 &candidates,
249 &present_by_key,
250 self_id,
251 paid_list,
252 p2p_node,
253 config,
254 )
255 .await;
256 stats.cleared += revalidated_cleared;
257 stats.pruned = delete_stored_records(&keys_to_delete, storage, paid_list).await;
258
259 (stored_keys.len(), stats)
260}
261
262async fn prune_paid_entries(
263 self_id: &PeerId,
264 paid_list: &Arc<PaidList>,
265 p2p_node: &Arc<P2PNode>,
266 config: &ReplicationConfig,
267 now: Instant,
268) -> (usize, PaidPruneStats) {
269 let paid_keys = match paid_list.all_keys() {
270 Ok(keys) => keys,
271 Err(e) => {
272 warn!("Failed to read PaidForList for pruning: {e}");
273 return (0, PaidPruneStats::default());
274 }
275 };
276
277 let dht = p2p_node.dht_manager();
278 let mut stats = PaidPruneStats::default();
279 let mut paid_keys_to_delete = Vec::new();
280
281 for key in &paid_keys {
282 let closest: Vec<DHTNode> = dht
283 .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
284 .await;
285 let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);
286
287 if in_paid_group {
288 if paid_list.paid_out_of_range_since(key).is_some() {
289 paid_list.clear_paid_out_of_range(key);
290 stats.cleared += 1;
291 }
292 } else {
293 if paid_list.paid_out_of_range_since(key).is_none() {
294 stats.marked += 1;
295 }
296 paid_list.set_paid_out_of_range(key);
297
298 if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
299 let elapsed = now
300 .checked_duration_since(first_seen)
301 .unwrap_or(Duration::ZERO);
302 if elapsed >= config.prune_hysteresis_duration {
303 paid_keys_to_delete.push(*key);
304 }
305 }
306 }
307 }
308
309 if !paid_keys_to_delete.is_empty() {
310 match paid_list.remove_batch(&paid_keys_to_delete).await {
311 Ok(count) => {
312 stats.pruned = count;
313 debug!("Pruned {count} out-of-range PaidForList entries");
314 }
315 Err(e) => {
316 warn!("Failed to prune PaidForList entries: {e}");
317 }
318 }
319 }
320
321 (paid_keys.len(), stats)
322}
323
324fn remote_close_group_peers(close_group: &[DHTNode], self_id: &PeerId) -> Vec<PeerId> {
325 close_group
326 .iter()
327 .filter(|node| node.peer_id != *self_id)
328 .map(|node| node.peer_id)
329 .collect()
330}
331
332async fn prune_scan_start(
333 sync_state: &Arc<RwLock<NeighborSyncState>>,
334 stored_key_count: usize,
335) -> usize {
336 if stored_key_count == 0 {
337 return 0;
338 }
339 sync_state.read().await.prune_cursor % stored_key_count
340}
341
342async fn advance_prune_cursor(
343 sync_state: &Arc<RwLock<NeighborSyncState>>,
344 stored_key_count: usize,
345 scan_start: usize,
346 last_selected_offset: Option<usize>,
347) {
348 if stored_key_count == 0 {
349 sync_state.write().await.prune_cursor = 0;
350 return;
351 }
352
353 let advance_by = last_selected_offset.map_or(1, |offset| offset.saturating_add(1));
354 sync_state.write().await.prune_cursor = (scan_start + advance_by) % stored_key_count;
355}
356
357async fn delete_stored_records(
358 keys_to_delete: &[XorName],
359 storage: &Arc<LmdbStorage>,
360 paid_list: &Arc<PaidList>,
361) -> usize {
362 let mut pruned = 0;
363
364 for key in keys_to_delete {
365 if let Err(e) = storage.delete(key).await {
366 warn!("Failed to prune record {}: {e}", hex::encode(key));
367 } else {
368 pruned += 1;
369 paid_list.clear_record_out_of_range(key);
370 paid_list.set_paid_out_of_range(key);
374 debug!("Pruned out-of-range record {}", hex::encode(key));
375 }
376 }
377
378 pruned
379}
380
381async fn collect_record_prune_proofs(
388 candidates: &[RecordPruneCandidate],
389 storage: &Arc<LmdbStorage>,
390 p2p_node: &Arc<P2PNode>,
391 config: &ReplicationConfig,
392 sync_state: &Arc<RwLock<NeighborSyncState>>,
393) -> HashMap<XorName, HashSet<PeerId>> {
394 if candidates.is_empty() {
395 return HashMap::new();
396 }
397
398 let report_state = PruneAuditReportState::default();
399 let mut requests = stream::iter(build_peer_audit_challenges(candidates))
400 .map(|(peer, key)| {
401 peer_proves_record(
402 peer,
403 key,
404 storage,
405 p2p_node,
406 config,
407 sync_state,
408 &report_state,
409 )
410 })
411 .buffer_unordered(MAX_CONCURRENT_PRUNE_AUDIT_CHALLENGES);
412
413 let mut present_by_key = HashMap::<XorName, HashSet<PeerId>>::new();
414 while let Some(proof) = requests.next().await {
415 if let Some((peer, key)) = proof {
416 present_by_key.entry(key).or_default().insert(peer);
417 }
418 }
419
420 present_by_key
421}
422
423async fn revalidated_record_prune_keys(
424 candidates: &[RecordPruneCandidate],
425 present_by_key: &HashMap<XorName, HashSet<PeerId>>,
426 self_id: &PeerId,
427 paid_list: &Arc<PaidList>,
428 p2p_node: &Arc<P2PNode>,
429 config: &ReplicationConfig,
430) -> (Vec<XorName>, usize) {
431 let dht = p2p_node.dht_manager();
432 let mut keys_to_delete = Vec::new();
433 let mut cleared = 0;
434 let now = Instant::now();
435
436 for candidate in candidates {
437 let closest: Vec<DHTNode> = dht
438 .find_closest_nodes_local_with_self(&candidate.key, config.close_group_size)
439 .await;
440
441 if closest.iter().any(|n| n.peer_id == *self_id) {
442 if paid_list
443 .record_out_of_range_since(&candidate.key)
444 .is_some()
445 {
446 paid_list.clear_record_out_of_range(&candidate.key);
447 cleared += 1;
448 }
449 continue;
450 }
451
452 let Some(first_seen) = paid_list.record_out_of_range_since(&candidate.key) else {
453 continue;
454 };
455 let elapsed = now
456 .checked_duration_since(first_seen)
457 .unwrap_or(Duration::ZERO);
458 if elapsed < config.prune_hysteresis_duration {
459 continue;
460 }
461
462 let current_target_peers = remote_close_group_peers(&closest, self_id);
463 if current_target_peers.is_empty() {
464 warn!(
465 "Cannot prune {}: current close group has no remote peers",
466 hex::encode(candidate.key)
467 );
468 continue;
469 }
470
471 if target_peers_reported_present(&candidate.key, ¤t_target_peers, present_by_key) {
472 keys_to_delete.push(candidate.key);
473 } else {
474 debug!(
475 "Deferring prune for {} until current close group reports it",
476 hex::encode(candidate.key)
477 );
478 }
479 }
480
481 (keys_to_delete, cleared)
482}
483
484fn build_peer_audit_challenges(candidates: &[RecordPruneCandidate]) -> Vec<(PeerId, XorName)> {
485 let mut challenges = Vec::new();
486 for candidate in candidates {
487 for peer in &candidate.target_peers {
488 challenges.push((*peer, candidate.key));
489 }
490 }
491 challenges
492}
493
/// Test helper: returns, in candidate order, the keys whose recorded target peers all
/// appear in `present_by_key`.
#[cfg(test)]
fn confirmed_keys_from_presence(
    candidates: &[RecordPruneCandidate],
    present_by_key: &HashMap<XorName, HashSet<PeerId>>,
) -> Vec<XorName> {
    let mut confirmed = Vec::new();
    for candidate in candidates {
        let fully_reported =
            target_peers_reported_present(&candidate.key, &candidate.target_peers, present_by_key);
        if fully_reported {
            confirmed.push(candidate.key);
        }
    }
    confirmed
}
507
508fn target_peers_reported_present(
509 key: &XorName,
510 target_peers: &[PeerId],
511 present_by_key: &HashMap<XorName, HashSet<PeerId>>,
512) -> bool {
513 let Some(present_peers) = present_by_key.get(key) else {
514 return false;
515 };
516 target_peers.iter().all(|peer| present_peers.contains(peer))
517}
518
519async fn peer_proves_record(
522 peer: PeerId,
523 key: XorName,
524 storage: &Arc<LmdbStorage>,
525 p2p_node: &Arc<P2PNode>,
526 config: &ReplicationConfig,
527 sync_state: &Arc<RwLock<NeighborSyncState>>,
528 report_state: &PruneAuditReportState,
529) -> Option<(PeerId, XorName)> {
530 let local_bytes = local_record_bytes(&key, storage).await?;
531
532 let (challenge_id, nonce) = {
533 let mut rng = rand::thread_rng();
534 (rng.gen::<u64>(), rng.gen::<[u8; 32]>())
535 };
536 let encoded = encode_prune_audit_challenge(&peer, key, challenge_id, nonce)?;
537 let Some(decoded) = send_prune_audit_challenge(&peer, &key, encoded, p2p_node, config).await
538 else {
539 report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
543 return None;
544 };
545
546 let status =
547 prune_audit_response_status(decoded, challenge_id, &peer, &key, &nonce, &local_bytes);
548 if prune_audit_response_clears_bootstrap_claim(status) {
549 clear_prune_bootstrap_claim(&peer, sync_state).await;
550 }
551
552 match status {
553 PruneAuditStatus::Proven => Some((peer, key)),
554 PruneAuditStatus::Bootstrapping => {
555 report_prune_bootstrap_claim(&peer, &key, p2p_node, config, sync_state, report_state)
556 .await;
557 None
558 }
559 PruneAuditStatus::Failed => {
560 report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await;
561 None
562 }
563 }
564}
565
566fn prune_audit_response_clears_bootstrap_claim(status: PruneAuditStatus) -> bool {
567 matches!(status, PruneAuditStatus::Proven | PruneAuditStatus::Failed)
568}
569
570fn encode_prune_audit_challenge(
571 peer: &PeerId,
572 key: XorName,
573 challenge_id: u64,
574 nonce: [u8; 32],
575) -> Option<Vec<u8>> {
576 let challenge = AuditChallenge {
577 challenge_id,
578 nonce,
579 challenged_peer_id: *peer.as_bytes(),
580 keys: vec![key],
581 };
582 let msg = ReplicationMessage {
583 request_id: challenge_id,
584 body: ReplicationMessageBody::AuditChallenge(challenge),
585 };
586 let encoded = match msg.encode() {
587 Ok(data) => data,
588 Err(e) => {
589 warn!(
590 "Failed to encode prune audit challenge for {} against {peer}: {e}",
591 hex::encode(key),
592 );
593 return None;
594 }
595 };
596 Some(encoded)
597}
598
599async fn send_prune_audit_challenge(
600 peer: &PeerId,
601 key: &XorName,
602 encoded: Vec<u8>,
603 p2p_node: &Arc<P2PNode>,
604 config: &ReplicationConfig,
605) -> Option<ReplicationMessage> {
606 let response = match p2p_node
607 .send_request(
608 peer,
609 REPLICATION_PROTOCOL_ID,
610 encoded,
611 config.audit_response_timeout(1),
612 )
613 .await
614 {
615 Ok(response) => response,
616 Err(e) => {
617 debug!(
618 "Prune audit challenge for {} against {peer} failed: {e}",
619 hex::encode(key)
620 );
621 return None;
622 }
623 };
624
625 let decoded = match ReplicationMessage::decode(&response.data) {
626 Ok(msg) => msg,
627 Err(e) => {
628 warn!("Failed to decode prune audit response from {peer}: {e}");
629 return None;
630 }
631 };
632
633 Some(decoded)
634}
635
636fn prune_audit_response_status(
637 decoded: ReplicationMessage,
638 challenge_id: u64,
639 peer: &PeerId,
640 key: &XorName,
641 nonce: &[u8; 32],
642 local_bytes: &[u8],
643) -> PruneAuditStatus {
644 match decoded.body {
645 ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
646 challenge_id: resp_id,
647 digests,
648 }) => {
649 if resp_id != challenge_id {
650 warn!("Prune audit challenge ID mismatch from {peer}");
651 return PruneAuditStatus::Failed;
652 }
653 if digests.len() != 1 {
654 warn!(
655 "Prune audit response from {peer} returned {} digests for one challenged key",
656 digests.len(),
657 );
658 return PruneAuditStatus::Failed;
659 }
660
661 if audit_digest_proves_key(peer, key, nonce, local_bytes, &digests[0]) {
662 PruneAuditStatus::Proven
663 } else {
664 warn!(
665 "Prune audit proof from {peer} failed for {}",
666 hex::encode(key)
667 );
668 PruneAuditStatus::Failed
669 }
670 }
671 ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
672 challenge_id: resp_id,
673 }) => {
674 if resp_id == challenge_id {
675 warn!(
676 "Prune audit proof for {} blocked by bootstrap claim from {peer}",
677 hex::encode(key)
678 );
679 PruneAuditStatus::Bootstrapping
680 } else {
681 warn!("Prune audit challenge ID mismatch on Bootstrapping from {peer}");
682 PruneAuditStatus::Failed
683 }
684 }
685 ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
686 challenge_id: resp_id,
687 reason,
688 }) => {
689 if resp_id == challenge_id {
690 warn!(
691 "Prune audit proof for {} rejected by {peer}: {reason}",
692 hex::encode(key)
693 );
694 } else {
695 warn!("Prune audit challenge ID mismatch on Rejected from {peer}");
696 }
697 PruneAuditStatus::Failed
698 }
699 _ => {
700 warn!("Unexpected prune audit response type from {peer}");
701 PruneAuditStatus::Failed
702 }
703 }
704}
705
706async fn local_record_bytes(key: &XorName, storage: &Arc<LmdbStorage>) -> Option<Vec<u8>> {
707 match storage.get_raw(key).await {
708 Ok(Some(bytes)) => Some(bytes),
709 Ok(None) => {
710 debug!(
711 "Cannot prune-audit {}: local record disappeared",
712 hex::encode(key)
713 );
714 None
715 }
716 Err(e) => {
717 warn!(
718 "Cannot prune-audit {}: failed to read local record: {e}",
719 hex::encode(key)
720 );
721 None
722 }
723 }
724}
725
726fn audit_digest_proves_key(
727 peer: &PeerId,
728 key: &XorName,
729 nonce: &[u8; 32],
730 local_bytes: &[u8],
731 digest: &[u8; 32],
732) -> bool {
733 if *digest == ABSENT_KEY_DIGEST {
734 return false;
735 }
736 let expected = compute_audit_digest(nonce, peer.as_bytes(), key, local_bytes);
737 *digest == expected
738}
739
740async fn report_prune_audit_failure_once(
741 peer: &PeerId,
742 key: &XorName,
743 p2p_node: &Arc<P2PNode>,
744 config: &ReplicationConfig,
745 report_state: &PruneAuditReportState,
746) -> bool {
747 let should_report = peer_is_currently_responsible(peer, key, p2p_node, config).await
748 && reserve_prune_audit_failure_report(report_state, peer).await;
749 if !should_report {
750 return false;
751 }
752
753 p2p_node
754 .report_trust_event(
755 peer,
756 saorsa_core::TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT),
757 )
758 .await;
759 true
760}
761
762async fn reserve_prune_audit_failure_report(
763 report_state: &PruneAuditReportState,
764 peer: &PeerId,
765) -> bool {
766 report_state.audit_failures.write().await.insert(*peer)
767}
768
769async fn reserve_prune_bootstrap_abuse_report(
770 report_state: &PruneAuditReportState,
771 peer: &PeerId,
772) -> bool {
773 report_state.bootstrap_abuse.write().await.insert(*peer)
774}
775
776async fn report_prune_bootstrap_claim(
777 peer: &PeerId,
778 key: &XorName,
779 p2p_node: &Arc<P2PNode>,
780 config: &ReplicationConfig,
781 sync_state: &Arc<RwLock<NeighborSyncState>>,
782 report_state: &PruneAuditReportState,
783) {
784 if !peer_is_currently_responsible(peer, key, p2p_node, config).await {
785 return;
786 }
787
788 let observation = {
789 let now = Instant::now();
790 let mut state = sync_state.write().await;
791 (
792 now,
793 state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period),
794 )
795 };
796
797 let (now, observation) = observation;
798 match observation {
799 BootstrapClaimObservation::WithinGrace { .. } => {
800 debug!("Prune audit: peer {peer} claims bootstrapping (within grace period)");
801 return;
802 }
803 BootstrapClaimObservation::PastGrace { first_seen } => {
804 if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
805 debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
806 return;
807 }
808 warn!(
809 "Prune audit: peer {peer} claiming bootstrap past grace period \
810 ({:?} > {:?}), reporting abuse",
811 now.duration_since(first_seen),
812 config.bootstrap_claim_grace_period,
813 );
814 }
815 BootstrapClaimObservation::Repeated { first_seen } => {
816 if !reserve_prune_bootstrap_abuse_report(report_state, peer).await {
817 debug!("Prune audit: peer {peer} bootstrap abuse already reported this pass");
818 return;
819 }
820 warn!(
821 "Prune audit: peer {peer} repeated bootstrap claim after previously stopping; \
822 first claim was {:?} ago, reporting abuse",
823 now.duration_since(first_seen),
824 );
825 }
826 }
827
828 p2p_node
829 .report_trust_event(
830 peer,
831 saorsa_core::TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
832 )
833 .await;
834}
835
836async fn clear_prune_bootstrap_claim(peer: &PeerId, sync_state: &Arc<RwLock<NeighborSyncState>>) {
837 let removed = {
838 let mut state = sync_state.write().await;
839 state.clear_active_bootstrap_claim(peer)
840 };
841 if removed {
842 debug!("Prune audit: cleared active bootstrap claim for {peer}");
843 }
844}
845
846async fn peer_is_currently_responsible(
847 peer: &PeerId,
848 key: &XorName,
849 p2p_node: &Arc<P2PNode>,
850 config: &ReplicationConfig,
851) -> bool {
852 let closest = p2p_node
853 .dht_manager()
854 .find_closest_nodes_local_with_self(key, config.close_group_size)
855 .await;
856 closest.iter().any(|node| node.peer_id == *peer)
857}
858
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Peer id whose first byte is `b` and whose remaining bytes are zero.
    fn peer_id_from_byte(b: u8) -> PeerId {
        let mut raw = [0u8; 32];
        raw[0] = b;
        PeerId::from_bytes(raw)
    }

    /// Key consisting of 32 copies of `b`.
    fn key_from_byte(b: u8) -> XorName {
        [b; 32]
    }

    /// Shorthand constructor for a prune candidate.
    fn candidate(key: XorName, target_peers: Vec<PeerId>) -> RecordPruneCandidate {
        RecordPruneCandidate { key, target_peers }
    }

    #[test]
    fn prune_audit_challenges_are_one_per_candidate_peer() {
        let (peer_a, peer_b) = (peer_id_from_byte(1), peer_id_from_byte(2));
        let (key_a, key_b) = (key_from_byte(0xA), key_from_byte(0xB));
        let candidates = vec![
            candidate(key_a, vec![peer_a, peer_b]),
            candidate(key_b, vec![peer_b]),
        ];

        // Compare as sorted lists so the assertion does not depend on emission order.
        let order = |(peer, key): &(PeerId, XorName)| (*peer.as_bytes(), *key);
        let mut actual = build_peer_audit_challenges(&candidates);
        actual.sort_unstable_by_key(order);
        let mut expected = vec![(peer_a, key_a), (peer_b, key_a), (peer_b, key_b)];
        expected.sort_unstable_by_key(order);

        assert_eq!(actual, expected);
    }

    #[test]
    fn confirmed_keys_require_all_target_peers_present() {
        let (peer_a, peer_b) = (peer_id_from_byte(1), peer_id_from_byte(2));
        let key = key_from_byte(0xC);
        let candidates = vec![candidate(key, vec![peer_a, peer_b])];
        let present_by_key = HashMap::from([(key, HashSet::from([peer_a, peer_b]))]);

        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key);

        assert_eq!(confirmed, vec![key]);
    }

    #[test]
    fn confirmed_keys_defer_absent_or_missing_peer_evidence() {
        let (peer_a, peer_b) = (peer_id_from_byte(1), peer_id_from_byte(2));
        let complete_key = key_from_byte(0xD);
        let absent_key = key_from_byte(0xE);
        let missing_key = key_from_byte(0xF);
        let candidates = vec![
            candidate(complete_key, vec![peer_a, peer_b]),
            candidate(absent_key, vec![peer_a, peer_b]),
            candidate(missing_key, vec![peer_a, peer_b]),
        ];
        // `absent_key` lacks peer_b's proof; `missing_key` has no entry at all.
        let present_by_key = HashMap::from([
            (complete_key, HashSet::from([peer_a, peer_b])),
            (absent_key, HashSet::from([peer_a])),
        ]);

        let confirmed = confirmed_keys_from_presence(&candidates, &present_by_key);

        assert_eq!(confirmed, vec![complete_key]);
    }

    #[test]
    fn audit_digest_proof_requires_matching_peer_key_nonce_and_bytes() {
        let peer = peer_id_from_byte(1);
        let other_peer = peer_id_from_byte(2);
        let key = key_from_byte(0x11);
        let other_key = key_from_byte(0x12);
        let nonce = [0xAA; 32];
        let other_nonce = [0xBB; 32];
        let bytes: &[u8] = b"record bytes";
        let other_bytes: &[u8] = b"different bytes";
        let digest = compute_audit_digest(&nonce, peer.as_bytes(), &key, bytes);

        assert!(audit_digest_proves_key(&peer, &key, &nonce, bytes, &digest));

        // Each case changes exactly one input relative to the valid proof above.
        let mismatches = [
            (&other_peer, &key, &nonce, bytes, &digest),
            (&peer, &other_key, &nonce, bytes, &digest),
            (&peer, &key, &other_nonce, bytes, &digest),
            (&peer, &key, &nonce, other_bytes, &digest),
            (&peer, &key, &nonce, bytes, &ABSENT_KEY_DIGEST),
        ];
        for (index, (p, k, n, b, d)) in mismatches.into_iter().enumerate() {
            assert!(
                !audit_digest_proves_key(p, k, n, b, d),
                "mismatch case {index} must not prove the key"
            );
        }
    }

    #[tokio::test]
    async fn prune_cursor_advances_past_selected_budget_window() {
        let sync_state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
        sync_state.write().await.prune_cursor = 2;

        let scan_start = prune_scan_start(&sync_state, 10).await;
        advance_prune_cursor(&sync_state, 10, scan_start, Some(3)).await;

        let cursor = sync_state.read().await.prune_cursor;
        assert_eq!(cursor, 6);
    }

    #[tokio::test]
    async fn prune_cursor_advances_even_when_no_candidate_selected() {
        let sync_state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![])));
        sync_state.write().await.prune_cursor = 9;

        let scan_start = prune_scan_start(&sync_state, 10).await;
        advance_prune_cursor(&sync_state, 10, scan_start, None).await;

        let cursor = sync_state.read().await.prune_cursor;
        assert_eq!(cursor, 0);
    }

    #[tokio::test]
    async fn prune_audit_normal_response_clears_stale_bootstrap_claim() {
        let peer = peer_id_from_byte(1);
        let sync_state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![peer])));
        let first_seen = Instant::now();
        {
            // Release the write guard before the function under test takes its own.
            let mut guard = sync_state.write().await;
            guard.bootstrap_claims.insert(peer, first_seen);
            guard.bootstrap_claim_history.insert(peer, first_seen);
        }

        clear_prune_bootstrap_claim(&peer, &sync_state).await;

        let snapshot = sync_state.read().await;
        assert!(!snapshot.bootstrap_claims.contains_key(&peer));
        assert!(snapshot.bootstrap_claim_history.contains_key(&peer));
    }

    #[test]
    fn prune_audit_clear_policy_requires_decoded_non_bootstrap_response() {
        let expectations = [
            (PruneAuditStatus::Proven, true),
            (PruneAuditStatus::Failed, true),
            (PruneAuditStatus::Bootstrapping, false),
        ];
        for (status, should_clear) in expectations {
            assert_eq!(
                prune_audit_response_clears_bootstrap_claim(status),
                should_clear,
                "unexpected clear policy for {status:?}"
            );
        }
    }

    #[tokio::test]
    async fn prune_audit_failure_penalty_is_reserved_once_per_peer_per_pass() {
        let first = peer_id_from_byte(1);
        let second = peer_id_from_byte(2);
        let report_state = PruneAuditReportState::default();

        assert!(reserve_prune_audit_failure_report(&report_state, &first).await);
        assert!(!reserve_prune_audit_failure_report(&report_state, &first).await);
        assert!(reserve_prune_audit_failure_report(&report_state, &second).await);

        let reserved = report_state.audit_failures.read().await;
        assert_eq!(reserved.len(), 2);
        assert!(reserved.contains(&first));
        assert!(reserved.contains(&second));
    }

    #[tokio::test]
    async fn prune_bootstrap_abuse_penalty_is_reserved_once_per_peer_per_pass() {
        let first = peer_id_from_byte(1);
        let second = peer_id_from_byte(2);
        let report_state = PruneAuditReportState::default();

        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &first).await);
        assert!(!reserve_prune_bootstrap_abuse_report(&report_state, &first).await);
        assert!(reserve_prune_bootstrap_abuse_report(&report_state, &second).await);

        let reserved = report_state.bootstrap_abuse.read().await;
        assert_eq!(reserved.len(), 2);
        assert!(reserved.contains(&first));
        assert!(reserved.contains(&second));
    }
}
1066}