nodedb_cluster/
ghost_sweeper.rs1use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14use tracing::{debug, info};
15
16use crate::ghost::{GhostTable, SweepVerdict};
17
/// Default pause between two sweep cycles: 1 800 seconds, i.e. 30 minutes.
pub const DEFAULT_SWEEP_INTERVAL: Duration = Duration::from_secs(1_800);
22
/// Settings for the background ghost sweep loop.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so a configuration can be
/// logged, duplicated and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GhostSweeperConfig {
    /// Time the sweep loop waits before each sweep cycle.
    pub interval: Duration,
}
28
29impl Default for GhostSweeperConfig {
30 fn default() -> Self {
31 Self {
32 interval: DEFAULT_SWEEP_INTERVAL,
33 }
34 }
35}
36
37pub fn run_sweep_loop<V>(
47 ghost_tables: Arc<Mutex<Vec<(u16, GhostTable)>>>,
48 config: GhostSweeperConfig,
49 verify_fn: V,
50 shutdown: Arc<std::sync::atomic::AtomicBool>,
51) where
52 V: Fn(&str, u16) -> SweepVerdict + Send + 'static,
53{
54 info!(
55 interval_secs = config.interval.as_secs(),
56 "ghost sweeper started"
57 );
58
59 loop {
60 std::thread::sleep(config.interval);
61
62 if shutdown.load(std::sync::atomic::Ordering::Relaxed) {
63 info!("ghost sweeper shutting down");
64 break;
65 }
66
67 let mut tables = match ghost_tables.lock() {
68 Ok(t) => t,
69 Err(poisoned) => poisoned.into_inner(),
70 };
71
72 let mut total_purged = 0;
73 let mut total_checked = 0;
74
75 for (vshard_id, table) in tables.iter_mut() {
76 if table.is_empty() {
77 continue;
78 }
79 let report = table.sweep(|node_id, target_shard| verify_fn(node_id, target_shard));
80 total_purged += report.purged;
81 total_checked += report.checked;
82
83 if report.purged > 0 {
84 debug!(
85 vshard = vshard_id,
86 purged = report.purged,
87 remaining = table.len(),
88 "ghost sweep for vshard"
89 );
90 }
91 }
92
93 if total_checked > 0 {
94 info!(
95 checked = total_checked,
96 purged = total_purged,
97 "ghost sweep cycle complete"
98 );
99 }
100 }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ghost::GhostStub;

    /// A stub whose refcount is zero must be gone after one sweep, even when
    /// the verdict callback answers `Keep` for every entry.
    #[test]
    fn sweep_purges_zero_refcount() {
        let mut orphan = GhostStub::new("old-node".into(), 5, 1);
        orphan.refcount = 0;

        let mut table = GhostTable::new();
        table.insert(orphan);

        let shared = Arc::new(Mutex::new(vec![(0u16, table)]));

        // The guard is a temporary of this statement, so the lock is released
        // before it is taken again for the assertion below.
        shared.lock().unwrap().iter_mut().for_each(|(_, t)| {
            t.sweep(|_, _| SweepVerdict::Keep);
        });

        let guard = shared.lock().unwrap();
        let (_, swept) = &guard[0];
        assert!(swept.is_empty(), "zero-refcount ghost should be purged");
    }
}