// ant_node/replication/pruning.rs
//! Post-cycle responsibility pruning (Section 11).
//!
//! On `NeighborSyncCycleComplete`: prune stored records and `PaidForList`
//! entries that have been continuously out of range for at least
//! `PRUNE_HYSTERESIS_DURATION`.

use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};

use saorsa_core::identity::PeerId;
use saorsa_core::{DHTNode, P2PNode};

use crate::logging::{debug, info, warn};
use crate::replication::config::ReplicationConfig;
use crate::replication::paid_list::PaidList;
use crate::storage::LmdbStorage;

// ---------------------------------------------------------------------------
// Result type
// ---------------------------------------------------------------------------

/// Summary of a prune pass.
///
/// All counters start at zero (via [`Default`]) and are incremented by
/// [`run_prune_pass`] while it walks stored records and `PaidForList` entries.
/// The type is a plain bag of counters, so it is cheap to copy and compare.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct PruneResult {
    /// Number of records deleted from storage.
    pub records_pruned: usize,
    /// Number of records with out-of-range timestamp newly set.
    pub records_marked_out_of_range: usize,
    /// Number of records with out-of-range timestamp cleared (back in range).
    pub records_cleared: usize,
    /// Number of `PaidForList` entries removed.
    pub paid_entries_pruned: usize,
    /// Number of `PaidForList` entries with out-of-range timestamp newly set.
    pub paid_entries_marked: usize,
    /// Number of `PaidForList` entries cleared (back in range).
    pub paid_entries_cleared: usize,
}

40// ---------------------------------------------------------------------------
41// Prune pass
42// ---------------------------------------------------------------------------
43
44/// Execute post-cycle responsibility pruning.
45///
46/// For each stored record K:
47/// - If `IsResponsible(self, K)`: clear `RecordOutOfRangeFirstSeen`.
48/// - If not responsible: set timestamp if not already set; delete if the
49///   timestamp is at least `PRUNE_HYSTERESIS_DURATION` old.
50///
51/// For each `PaidForList` entry K:
52/// - If self is in `PaidCloseGroup(K)`: clear `PaidOutOfRangeFirstSeen`.
53/// - If not in group: set timestamp if not already set; remove entry if the
54///   timestamp is at least `PRUNE_HYSTERESIS_DURATION` old.
55pub async fn run_prune_pass(
56    self_id: &PeerId,
57    storage: &Arc<LmdbStorage>,
58    paid_list: &Arc<PaidList>,
59    p2p_node: &Arc<P2PNode>,
60    config: &ReplicationConfig,
61) -> PruneResult {
62    let dht = p2p_node.dht_manager();
63    let mut result = PruneResult::default();
64    let now = Instant::now();
65
66    // -- Prune stored records ---------------------------------------------
67
68    let stored_keys = match storage.all_keys().await {
69        Ok(keys) => keys,
70        Err(e) => {
71            warn!("Failed to read stored keys for pruning: {e}");
72            return result;
73        }
74    };
75
76    let mut keys_to_delete = Vec::new();
77
78    for key in &stored_keys {
79        let closest: Vec<DHTNode> = dht
80            .find_closest_nodes_local_with_self(key, config.close_group_size)
81            .await;
82        let is_responsible = closest.iter().any(|n| n.peer_id == *self_id);
83
84        if is_responsible {
85            if paid_list.record_out_of_range_since(key).is_some() {
86                paid_list.clear_record_out_of_range(key);
87                result.records_cleared += 1;
88            }
89        } else {
90            if paid_list.record_out_of_range_since(key).is_none() {
91                result.records_marked_out_of_range += 1;
92            }
93            paid_list.set_record_out_of_range(key);
94
95            if let Some(first_seen) = paid_list.record_out_of_range_since(key) {
96                let elapsed = now
97                    .checked_duration_since(first_seen)
98                    .unwrap_or(Duration::ZERO);
99                if elapsed >= config.prune_hysteresis_duration {
100                    keys_to_delete.push(*key);
101                }
102            }
103        }
104    }
105
106    for key in &keys_to_delete {
107        if let Err(e) = storage.delete(key).await {
108            warn!("Failed to prune record {}: {e}", hex::encode(key));
109        } else {
110            result.records_pruned += 1;
111            paid_list.clear_record_out_of_range(key);
112            // Seed the PaidForList out-of-range timer so the second pass can
113            // prune the entry sooner, closing the re-admission window between
114            // the storage delete and the PaidForList prune pass.
115            paid_list.set_paid_out_of_range(key);
116            debug!("Pruned out-of-range record {}", hex::encode(key));
117        }
118    }
119
120    // -- Prune PaidForList entries -----------------------------------------
121
122    let paid_keys = match paid_list.all_keys() {
123        Ok(keys) => keys,
124        Err(e) => {
125            warn!("Failed to read PaidForList for pruning: {e}");
126            return result;
127        }
128    };
129
130    let mut paid_keys_to_delete = Vec::new();
131
132    for key in &paid_keys {
133        let closest: Vec<DHTNode> = dht
134            .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
135            .await;
136        let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);
137
138        if in_paid_group {
139            if paid_list.paid_out_of_range_since(key).is_some() {
140                paid_list.clear_paid_out_of_range(key);
141                result.paid_entries_cleared += 1;
142            }
143        } else {
144            if paid_list.paid_out_of_range_since(key).is_none() {
145                result.paid_entries_marked += 1;
146            }
147            paid_list.set_paid_out_of_range(key);
148
149            if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
150                let elapsed = now
151                    .checked_duration_since(first_seen)
152                    .unwrap_or(Duration::ZERO);
153                if elapsed >= config.prune_hysteresis_duration {
154                    paid_keys_to_delete.push(*key);
155                }
156            }
157        }
158    }
159
160    if !paid_keys_to_delete.is_empty() {
161        match paid_list.remove_batch(&paid_keys_to_delete).await {
162            Ok(count) => {
163                result.paid_entries_pruned = count;
164                debug!("Pruned {count} out-of-range PaidForList entries");
165            }
166            Err(e) => {
167                warn!("Failed to prune PaidForList entries: {e}");
168            }
169        }
170    }
171
172    info!(
173        "Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
174        result.records_pruned,
175        stored_keys.len(),
176        result.paid_entries_pruned,
177        paid_keys.len(),
178    );
179
180    result
181}