Skip to main content

nodedb_cluster/
ghost_sweeper.rs

//! Periodic ghost edge anti-entropy sweeper.
//!
//! Runs as a background task that periodically sweeps all ghost tables
//! to purge stale ghost stubs. This prevents unbounded ghost accumulation
//! after shard migrations.
//!
//! Sweep interval: configurable; defaults to 30 minutes.
//! Sweep logic: for each ghost stub, check its refcount first (a zero
//! refcount is purged immediately, the fast path), then verify the stub
//! against the target shard if needed.
10
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14use tracing::{debug, info};
15
16use crate::ghost::{GhostTable, SweepVerdict};
17
/// Sweep cadence used by [`GhostSweeperConfig::default`]: 30 minutes.
///
/// Corresponds to `ClusterTransportTuning::ghost_sweep_interval_secs`.
pub const DEFAULT_SWEEP_INTERVAL: Duration = Duration::from_secs(1_800); // 30 min x 60 s
22
/// Ghost sweeper configuration.
///
/// Derives `Debug` and `Clone` so the configuration can be logged and
/// handed to more than one consumer.
#[derive(Debug, Clone)]
pub struct GhostSweeperConfig {
    /// How often to run the sweep (time waited between two sweep cycles).
    pub interval: Duration,
}
28
29impl Default for GhostSweeperConfig {
30    fn default() -> Self {
31        Self {
32            interval: DEFAULT_SWEEP_INTERVAL,
33        }
34    }
35}
36
37/// Run the ghost sweeper as a blocking loop (call from a dedicated thread or
38/// tokio::spawn_blocking). Sweeps all ghost tables periodically.
39///
40/// `ghost_tables` is a map of vshard_id → GhostTable. Each vShard on this
41/// node has its own ghost table tracking migrated-away nodes.
42///
43/// `verify_fn` checks whether a ghost should be purged by querying the
44/// target shard. In single-node mode, this always returns `Purge` (no
45/// remote shards to check). In cluster mode, it sends an RPC.
46pub fn run_sweep_loop<V>(
47    ghost_tables: Arc<Mutex<Vec<(u16, GhostTable)>>>,
48    config: GhostSweeperConfig,
49    verify_fn: V,
50    shutdown: Arc<std::sync::atomic::AtomicBool>,
51) where
52    V: Fn(&str, u16) -> SweepVerdict + Send + 'static,
53{
54    info!(
55        interval_secs = config.interval.as_secs(),
56        "ghost sweeper started"
57    );
58
59    loop {
60        std::thread::sleep(config.interval);
61
62        if shutdown.load(std::sync::atomic::Ordering::Relaxed) {
63            info!("ghost sweeper shutting down");
64            break;
65        }
66
67        let mut tables = match ghost_tables.lock() {
68            Ok(t) => t,
69            Err(poisoned) => poisoned.into_inner(),
70        };
71
72        let mut total_purged = 0;
73        let mut total_checked = 0;
74
75        for (vshard_id, table) in tables.iter_mut() {
76            if table.is_empty() {
77                continue;
78            }
79            let report = table.sweep(|node_id, target_shard| verify_fn(node_id, target_shard));
80            total_purged += report.purged;
81            total_checked += report.checked;
82
83            if report.purged > 0 {
84                debug!(
85                    vshard = vshard_id,
86                    purged = report.purged,
87                    remaining = table.len(),
88                    "ghost sweep for vshard"
89                );
90            }
91        }
92
93        if total_checked > 0 {
94            info!(
95                checked = total_checked,
96                purged = total_purged,
97                "ghost sweep cycle complete"
98            );
99        }
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ghost::GhostStub;

    /// A stub whose refcount is zero is expected to be purged by a sweep even
    /// when the verifier callback answers `Keep`.
    #[test]
    fn sweep_purges_zero_refcount() {
        let mut stub = GhostStub::new("old-node".into(), 5, 1);
        stub.refcount = 0;

        let mut table = GhostTable::new();
        table.insert(stub);

        let tables = Arc::new(Mutex::new(vec![(0u16, table)]));

        // One manual pass over every table; the guard is released at the end
        // of this statement.
        tables.lock().unwrap().iter_mut().for_each(|(_, t)| {
            t.sweep(|_, _| SweepVerdict::Keep);
        });

        let locked = tables.lock().unwrap();
        assert!(
            locked[0].1.is_empty(),
            "zero-refcount ghost should be purged"
        );
    }
}