Skip to main content

nodedb_cluster/
ghost_sweeper.rs

// SPDX-License-Identifier: BUSL-1.1

//! Periodic ghost edge anti-entropy sweeper.
//!
//! Runs as a background task that periodically sweeps all ghost tables
//! to purge stale ghost stubs. This prevents unbounded ghost accumulation
//! after shard migrations.
//!
//! Sweep interval: configurable, default 30 minutes.
//! Sweep logic: for each ghost stub, check its refcount (zero = purge fast
//! path), then verify against the target shard if needed.
12
13use std::sync::{Arc, Mutex};
14use std::time::Duration;
15
16use tracing::{debug, info};
17
18use crate::ghost::{GhostTable, SweepVerdict};
19
/// Interval used between ghost sweeps when none is configured (half an hour).
///
/// Corresponds to `ClusterTransportTuning::ghost_sweep_interval_secs`.
pub const DEFAULT_SWEEP_INTERVAL: Duration = Duration::from_secs(1_800);
24
/// Ghost sweeper configuration.
///
/// Derives `Debug` so the active configuration can be logged, and
/// `Clone`/`PartialEq`/`Eq` so callers can copy and compare settings.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GhostSweeperConfig {
    /// How long to wait between the end of one sweep and the start of the next.
    pub interval: Duration,
}
30
31impl Default for GhostSweeperConfig {
32    fn default() -> Self {
33        Self {
34            interval: DEFAULT_SWEEP_INTERVAL,
35        }
36    }
37}
38
39/// Run the ghost sweeper as a blocking loop (call from a dedicated thread or
40/// tokio::spawn_blocking). Sweeps all ghost tables periodically.
41///
42/// `ghost_tables` is a map of vshard_id → GhostTable. Each vShard on this
43/// node has its own ghost table tracking migrated-away nodes.
44///
45/// `verify_fn` checks whether a ghost should be purged by querying the
46/// target shard. In single-node mode, this always returns `Purge` (no
47/// remote shards to check). In cluster mode, it sends an RPC.
48pub fn run_sweep_loop<V>(
49    ghost_tables: Arc<Mutex<Vec<(u32, GhostTable)>>>,
50    config: GhostSweeperConfig,
51    verify_fn: V,
52    shutdown: Arc<std::sync::atomic::AtomicBool>,
53) where
54    V: Fn(&str, u32) -> SweepVerdict + Send + 'static,
55{
56    info!(
57        interval_secs = config.interval.as_secs(),
58        "ghost sweeper started"
59    );
60
61    loop {
62        std::thread::sleep(config.interval);
63
64        if shutdown.load(std::sync::atomic::Ordering::Relaxed) {
65            info!("ghost sweeper shutting down");
66            break;
67        }
68
69        let mut tables = match ghost_tables.lock() {
70            Ok(t) => t,
71            Err(poisoned) => poisoned.into_inner(),
72        };
73
74        let mut total_purged = 0;
75        let mut total_checked = 0;
76
77        for (vshard_id, table) in tables.iter_mut() {
78            if table.is_empty() {
79                continue;
80            }
81            let report = table.sweep(|node_id, target_shard| verify_fn(node_id, target_shard));
82            total_purged += report.purged;
83            total_checked += report.checked;
84
85            if report.purged > 0 {
86                debug!(
87                    vshard = vshard_id,
88                    purged = report.purged,
89                    remaining = table.len(),
90                    "ghost sweep for vshard"
91                );
92            }
93        }
94
95        if total_checked > 0 {
96            info!(
97                checked = total_checked,
98                purged = total_purged,
99                "ghost sweep cycle complete"
100            );
101        }
102    }
103}
104
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Instant;

    use super::*;
    use crate::ghost::GhostStub;

    /// Build a ghost table holding one stub whose refcount is zero, which the
    /// sweep is expected to purge even when the verifier answers `Keep`.
    fn table_with_zero_refcount_ghost() -> GhostTable {
        let mut table = GhostTable::new();
        let mut stub = GhostStub::new("old-node".into(), 5, 1);
        stub.refcount = 0;
        table.insert(stub);
        table
    }

    #[test]
    fn sweep_purges_zero_refcount() {
        let tables = Arc::new(Mutex::new(vec![(0u32, table_with_zero_refcount_ghost())]));

        // Run one sweep manually. The verifier says `Keep`, so the purge must
        // come from the zero-refcount fast path.
        {
            let mut locked = tables.lock().unwrap();
            for (_, t) in locked.iter_mut() {
                t.sweep(|_, _| SweepVerdict::Keep);
            }
        }

        let locked = tables.lock().unwrap();
        assert!(
            locked[0].1.is_empty(),
            "zero-refcount ghost should be purged"
        );
    }

    /// Drive `run_sweep_loop` end to end: it must purge on its own schedule
    /// and return once the shutdown flag is set.
    #[test]
    fn sweep_loop_purges_and_stops_on_shutdown() {
        let tables = Arc::new(Mutex::new(vec![(0u32, table_with_zero_refcount_ghost())]));
        let shutdown = Arc::new(AtomicBool::new(false));

        let sweeper = {
            let tables = Arc::clone(&tables);
            let shutdown = Arc::clone(&shutdown);
            std::thread::spawn(move || {
                run_sweep_loop(
                    tables,
                    GhostSweeperConfig {
                        interval: Duration::from_millis(5),
                    },
                    |_, _| SweepVerdict::Keep,
                    shutdown,
                );
            })
        };

        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            // Bind the result so the guard is dropped before sleeping.
            let purged = tables.lock().unwrap()[0].1.is_empty();
            if purged {
                break;
            }
            assert!(
                Instant::now() < deadline,
                "sweeper did not purge the zero-refcount ghost in time"
            );
            std::thread::sleep(Duration::from_millis(5));
        }

        shutdown.store(true, Ordering::Release);
        sweeper.join().expect("sweeper thread panicked");
    }
}
133}