nodedb_cluster/
ghost_sweeper.rs1use std::sync::{Arc, Mutex};
14use std::time::Duration;
15
16use tracing::{debug, info};
17
18use crate::ghost::{GhostTable, SweepVerdict};
19
20pub const DEFAULT_SWEEP_INTERVAL: Duration = Duration::from_secs(30 * 60);
24
25pub struct GhostSweeperConfig {
27 pub interval: Duration,
29}
30
31impl Default for GhostSweeperConfig {
32 fn default() -> Self {
33 Self {
34 interval: DEFAULT_SWEEP_INTERVAL,
35 }
36 }
37}
38
39pub fn run_sweep_loop<V>(
49 ghost_tables: Arc<Mutex<Vec<(u32, GhostTable)>>>,
50 config: GhostSweeperConfig,
51 verify_fn: V,
52 shutdown: Arc<std::sync::atomic::AtomicBool>,
53) where
54 V: Fn(&str, u32) -> SweepVerdict + Send + 'static,
55{
56 info!(
57 interval_secs = config.interval.as_secs(),
58 "ghost sweeper started"
59 );
60
61 loop {
62 std::thread::sleep(config.interval);
63
64 if shutdown.load(std::sync::atomic::Ordering::Relaxed) {
65 info!("ghost sweeper shutting down");
66 break;
67 }
68
69 let mut tables = match ghost_tables.lock() {
70 Ok(t) => t,
71 Err(poisoned) => poisoned.into_inner(),
72 };
73
74 let mut total_purged = 0;
75 let mut total_checked = 0;
76
77 for (vshard_id, table) in tables.iter_mut() {
78 if table.is_empty() {
79 continue;
80 }
81 let report = table.sweep(|node_id, target_shard| verify_fn(node_id, target_shard));
82 total_purged += report.purged;
83 total_checked += report.checked;
84
85 if report.purged > 0 {
86 debug!(
87 vshard = vshard_id,
88 purged = report.purged,
89 remaining = table.len(),
90 "ghost sweep for vshard"
91 );
92 }
93 }
94
95 if total_checked > 0 {
96 info!(
97 checked = total_checked,
98 purged = total_purged,
99 "ghost sweep cycle complete"
100 );
101 }
102 }
103}
104
105#[cfg(test)]
106mod tests {
107 use super::*;
108 use crate::ghost::GhostStub;
109
110 #[test]
111 fn sweep_purges_zero_refcount() {
112 let mut table = GhostTable::new();
113 let mut stub = GhostStub::new("old-node".into(), 5, 1);
114 stub.refcount = 0;
115 table.insert(stub);
116
117 let tables = Arc::new(Mutex::new(vec![(0u32, table)]));
118
119 {
121 let mut locked = tables.lock().unwrap();
122 for (_, t) in locked.iter_mut() {
123 t.sweep(|_, _| SweepVerdict::Keep);
124 }
125 }
126
127 let locked = tables.lock().unwrap();
128 assert!(
129 locked[0].1.is_empty(),
130 "zero-refcount ghost should be purged"
131 );
132 }
133}