ant_node/replication/
pruning.rs1use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crate::logging::{debug, info, warn};
11
12use saorsa_core::identity::PeerId;
13use saorsa_core::{DHTNode, P2PNode};
14
15use crate::replication::config::ReplicationConfig;
16use crate::replication::paid_list::PaidList;
17use crate::storage::LmdbStorage;
18
19#[derive(Debug, Default)]
25pub struct PruneResult {
26 pub records_pruned: usize,
28 pub records_marked_out_of_range: usize,
30 pub records_cleared: usize,
32 pub paid_entries_pruned: usize,
34 pub paid_entries_marked: usize,
36 pub paid_entries_cleared: usize,
38}
39
40pub async fn run_prune_pass(
56 self_id: &PeerId,
57 storage: &Arc<LmdbStorage>,
58 paid_list: &Arc<PaidList>,
59 p2p_node: &Arc<P2PNode>,
60 config: &ReplicationConfig,
61) -> PruneResult {
62 let dht = p2p_node.dht_manager();
63 let mut result = PruneResult::default();
64 let now = Instant::now();
65
66 let stored_keys = match storage.all_keys().await {
69 Ok(keys) => keys,
70 Err(e) => {
71 warn!("Failed to read stored keys for pruning: {e}");
72 return result;
73 }
74 };
75
76 let mut keys_to_delete = Vec::new();
77
78 for key in &stored_keys {
79 let closest: Vec<DHTNode> = dht
80 .find_closest_nodes_local_with_self(key, config.close_group_size)
81 .await;
82 let is_responsible = closest.iter().any(|n| n.peer_id == *self_id);
83
84 if is_responsible {
85 if paid_list.record_out_of_range_since(key).is_some() {
86 paid_list.clear_record_out_of_range(key);
87 result.records_cleared += 1;
88 }
89 } else {
90 if paid_list.record_out_of_range_since(key).is_none() {
91 result.records_marked_out_of_range += 1;
92 }
93 paid_list.set_record_out_of_range(key);
94
95 if let Some(first_seen) = paid_list.record_out_of_range_since(key) {
96 let elapsed = now
97 .checked_duration_since(first_seen)
98 .unwrap_or(Duration::ZERO);
99 if elapsed >= config.prune_hysteresis_duration {
100 keys_to_delete.push(*key);
101 }
102 }
103 }
104 }
105
106 for key in &keys_to_delete {
107 if let Err(e) = storage.delete(key).await {
108 warn!("Failed to prune record {}: {e}", hex::encode(key));
109 } else {
110 result.records_pruned += 1;
111 paid_list.clear_record_out_of_range(key);
112 paid_list.set_paid_out_of_range(key);
116 debug!("Pruned out-of-range record {}", hex::encode(key));
117 }
118 }
119
120 let paid_keys = match paid_list.all_keys() {
123 Ok(keys) => keys,
124 Err(e) => {
125 warn!("Failed to read PaidForList for pruning: {e}");
126 return result;
127 }
128 };
129
130 let mut paid_keys_to_delete = Vec::new();
131
132 for key in &paid_keys {
133 let closest: Vec<DHTNode> = dht
134 .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
135 .await;
136 let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);
137
138 if in_paid_group {
139 if paid_list.paid_out_of_range_since(key).is_some() {
140 paid_list.clear_paid_out_of_range(key);
141 result.paid_entries_cleared += 1;
142 }
143 } else {
144 if paid_list.paid_out_of_range_since(key).is_none() {
145 result.paid_entries_marked += 1;
146 }
147 paid_list.set_paid_out_of_range(key);
148
149 if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
150 let elapsed = now
151 .checked_duration_since(first_seen)
152 .unwrap_or(Duration::ZERO);
153 if elapsed >= config.prune_hysteresis_duration {
154 paid_keys_to_delete.push(*key);
155 }
156 }
157 }
158 }
159
160 if !paid_keys_to_delete.is_empty() {
161 match paid_list.remove_batch(&paid_keys_to_delete).await {
162 Ok(count) => {
163 result.paid_entries_pruned = count;
164 debug!("Pruned {count} out-of-range PaidForList entries");
165 }
166 Err(e) => {
167 warn!("Failed to prune PaidForList entries: {e}");
168 }
169 }
170 }
171
172 info!(
173 "Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
174 result.records_pruned,
175 stored_keys.len(),
176 result.paid_entries_pruned,
177 paid_keys.len(),
178 );
179
180 result
181}