use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::logging::{debug, info, warn};
use saorsa_core::identity::PeerId;
use saorsa_core::{DHTNode, P2PNode};
use crate::replication::config::ReplicationConfig;
use crate::replication::paid_list::PaidList;
use crate::storage::LmdbStorage;
#[derive(Debug, Default)]
pub struct PruneResult {
pub records_pruned: usize,
pub records_marked_out_of_range: usize,
pub records_cleared: usize,
pub paid_entries_pruned: usize,
pub paid_entries_marked: usize,
pub paid_entries_cleared: usize,
}
pub async fn run_prune_pass(
self_id: &PeerId,
storage: &Arc<LmdbStorage>,
paid_list: &Arc<PaidList>,
p2p_node: &Arc<P2PNode>,
config: &ReplicationConfig,
) -> PruneResult {
let dht = p2p_node.dht_manager();
let mut result = PruneResult::default();
let now = Instant::now();
let stored_keys = match storage.all_keys().await {
Ok(keys) => keys,
Err(e) => {
warn!("Failed to read stored keys for pruning: {e}");
return result;
}
};
let mut keys_to_delete = Vec::new();
for key in &stored_keys {
let closest: Vec<DHTNode> = dht
.find_closest_nodes_local_with_self(key, config.close_group_size)
.await;
let is_responsible = closest.iter().any(|n| n.peer_id == *self_id);
if is_responsible {
if paid_list.record_out_of_range_since(key).is_some() {
paid_list.clear_record_out_of_range(key);
result.records_cleared += 1;
}
} else {
if paid_list.record_out_of_range_since(key).is_none() {
result.records_marked_out_of_range += 1;
}
paid_list.set_record_out_of_range(key);
if let Some(first_seen) = paid_list.record_out_of_range_since(key) {
let elapsed = now
.checked_duration_since(first_seen)
.unwrap_or(Duration::ZERO);
if elapsed >= config.prune_hysteresis_duration {
keys_to_delete.push(*key);
}
}
}
}
for key in &keys_to_delete {
if let Err(e) = storage.delete(key).await {
warn!("Failed to prune record {}: {e}", hex::encode(key));
} else {
result.records_pruned += 1;
paid_list.clear_record_out_of_range(key);
paid_list.set_paid_out_of_range(key);
debug!("Pruned out-of-range record {}", hex::encode(key));
}
}
let paid_keys = match paid_list.all_keys() {
Ok(keys) => keys,
Err(e) => {
warn!("Failed to read PaidForList for pruning: {e}");
return result;
}
};
let mut paid_keys_to_delete = Vec::new();
for key in &paid_keys {
let closest: Vec<DHTNode> = dht
.find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
.await;
let in_paid_group = closest.iter().any(|n| n.peer_id == *self_id);
if in_paid_group {
if paid_list.paid_out_of_range_since(key).is_some() {
paid_list.clear_paid_out_of_range(key);
result.paid_entries_cleared += 1;
}
} else {
if paid_list.paid_out_of_range_since(key).is_none() {
result.paid_entries_marked += 1;
}
paid_list.set_paid_out_of_range(key);
if let Some(first_seen) = paid_list.paid_out_of_range_since(key) {
let elapsed = now
.checked_duration_since(first_seen)
.unwrap_or(Duration::ZERO);
if elapsed >= config.prune_hysteresis_duration {
paid_keys_to_delete.push(*key);
}
}
}
}
if !paid_keys_to_delete.is_empty() {
match paid_list.remove_batch(&paid_keys_to_delete).await {
Ok(count) => {
result.paid_entries_pruned = count;
debug!("Pruned {count} out-of-range PaidForList entries");
}
Err(e) => {
warn!("Failed to prune PaidForList entries: {e}");
}
}
}
info!(
"Prune pass complete: records={}/{} pruned, paid={}/{} pruned",
result.records_pruned,
stored_keys.len(),
result.paid_entries_pruned,
paid_keys.len(),
);
result
}