//! `nodedb_cluster/ghost.rs` — ghost edge stubs left behind by vShard node migration.

// SPDX-License-Identifier: BUSL-1.1
2
3use std::collections::HashMap;
4use tracing::{debug, info};
5
/// Serializable ghost stub for persistence.
///
/// Mirrors [`GhostStub`] minus the `node_id`, which is stored as the
/// map key when the table is serialized (see `GhostTable::to_bytes` /
/// `GhostTable::from_bytes`).
#[derive(
    Debug,
    Clone,
    serde::Serialize,
    serde::Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
struct PersistedGhostStub {
    // The vShard where the migrated node now lives.
    target_shard: u32,
    // Local edges still referencing the ghost at serialization time.
    refcount: u32,
    // Creation timestamp (ms since Unix epoch), preserved across restarts.
    created_at_ms: u64,
}
20
/// Ghost edge stub.
///
/// When vShard rebalancing relocates a node, the source shard keeps a
/// ghost stub `(node_id, target_shard_id, refcount)` in its place.
/// Traversals that encounter a ghost transparently scatter-gather to
/// the target shard. Once `refcount` — the number of inbound edges
/// from this shard — drops to zero, the ghost is garbage-collected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GhostStub {
    /// The node ID that was moved.
    pub node_id: String,
    /// The target vShard where the node now lives.
    pub target_shard: u32,
    /// Number of local edges that still reference this ghost.
    pub refcount: u32,
    /// When this ghost was created.
    pub created_at_ms: u64,
}

impl GhostStub {
    /// Build a stub for a freshly migrated node, stamped with the current
    /// wall-clock time in milliseconds since the Unix epoch.
    pub fn new(node_id: String, target_shard: u32, initial_refcount: u32) -> Self {
        // A clock set before the epoch yields Err; the error still carries
        // the distance to the epoch, which we use instead of panicking.
        let since_epoch = match std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
        {
            Ok(d) => d,
            Err(e) => e.duration(),
        };
        GhostStub {
            node_id,
            target_shard,
            refcount: initial_refcount,
            created_at_ms: since_epoch.as_millis() as u64,
        }
    }

    /// Drop one reference when a local edge pointing at this ghost is
    /// deleted. Returns true once the refcount has reached zero, meaning
    /// the ghost may be purged. Decrementing at zero stays at zero.
    pub fn decrement_ref(&mut self) -> bool {
        if self.refcount > 0 {
            self.refcount -= 1;
        }
        self.refcount == 0
    }

    /// Add one reference when a local edge pointing at this ghost is added.
    pub fn increment_ref(&mut self) {
        self.refcount = self.refcount.saturating_add(1);
    }
}
65
/// Anti-entropy sweeper result for a single ghost.
///
/// Produced by the `verify_fn` callback handed to `GhostTable::sweep`,
/// which decides whether each stub survives the sweep.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SweepVerdict {
    /// Ghost is still needed (edges exist or target confirms node).
    Keep,
    /// Ghost can be purged (no local edges AND target doesn't have the node).
    Purge,
    /// Couldn't verify (target unreachable). Keep for now, retry later.
    Inconclusive,
}
76
/// Ghost table for a single vShard.
///
/// Tracks ghost stubs left behind after node migration.
/// The anti-entropy sweeper runs periodically to purge stale ghosts.
#[derive(Debug, Clone)]
pub struct GhostTable {
    /// node_id → GhostStub.
    stubs: HashMap<String, GhostStub>,
    /// Cumulative count of ghosts purged (for metrics). Process-local:
    /// reset to zero when a table is restored via `from_bytes`.
    purge_count: u64,
    /// Last sweep timestamp (ms since Unix epoch; 0 = never swept).
    last_sweep_ms: u64,
}
90
91impl GhostTable {
92    pub fn new() -> Self {
93        Self {
94            stubs: HashMap::new(),
95            purge_count: 0,
96            last_sweep_ms: 0,
97        }
98    }
99
100    /// Insert a ghost stub after a node is migrated away.
101    pub fn insert(&mut self, stub: GhostStub) {
102        debug!(
103            node = %stub.node_id,
104            target_shard = stub.target_shard,
105            refcount = stub.refcount,
106            "inserted ghost stub"
107        );
108        self.stubs.insert(stub.node_id.clone(), stub);
109    }
110
111    /// Look up a ghost stub by node ID.
112    pub fn get(&self, node_id: &str) -> Option<&GhostStub> {
113        self.stubs.get(node_id)
114    }
115
116    /// Decrement refcount for a ghost (when a local edge is deleted).
117    /// Returns true if the ghost was purged (refcount reached zero).
118    pub fn decrement_ref(&mut self, node_id: &str) -> bool {
119        if let Some(stub) = self.stubs.get_mut(node_id)
120            && stub.decrement_ref()
121        {
122            self.stubs.remove(node_id);
123            self.purge_count += 1;
124            return true;
125        }
126        false
127    }
128
129    /// Increment refcount for a ghost (when a local edge to a ghost node is added).
130    pub fn increment_ref(&mut self, node_id: &str) {
131        if let Some(stub) = self.stubs.get_mut(node_id) {
132            stub.increment_ref();
133        }
134    }
135
136    /// Run anti-entropy sweep.
137    ///
138    /// For each ghost stub, the caller must verify against the target shard:
139    /// 1. Does the target shard acknowledge the node exists?
140    /// 2. Do any local edges still reference this ghost?
141    ///
142    /// `verify_fn` takes (node_id, target_shard) and returns SweepVerdict.
143    /// The sweeper runs at lowest I/O priority and is rate-limited.
144    pub fn sweep<F>(&mut self, verify_fn: F) -> SweepReport
145    where
146        F: Fn(&str, u32) -> SweepVerdict,
147    {
148        let now_ms = std::time::SystemTime::now()
149            .duration_since(std::time::UNIX_EPOCH)
150            .unwrap_or_default()
151            .as_millis() as u64;
152        self.last_sweep_ms = now_ms;
153
154        let mut report = SweepReport::default();
155        let mut to_purge = Vec::new();
156
157        for (node_id, stub) in &self.stubs {
158            report.checked += 1;
159
160            // Fast path: if refcount is already 0, purge without remote check.
161            if stub.refcount == 0 {
162                to_purge.push(node_id.clone());
163                report.purged += 1;
164                continue;
165            }
166
167            match verify_fn(node_id, stub.target_shard) {
168                SweepVerdict::Purge => {
169                    to_purge.push(node_id.clone());
170                    report.purged += 1;
171                }
172                SweepVerdict::Keep => {
173                    report.kept += 1;
174                }
175                SweepVerdict::Inconclusive => {
176                    report.inconclusive += 1;
177                }
178            }
179        }
180
181        for node_id in to_purge {
182            self.stubs.remove(&node_id);
183            self.purge_count += 1;
184        }
185
186        if report.purged > 0 {
187            info!(
188                purged = report.purged,
189                kept = report.kept,
190                inconclusive = report.inconclusive,
191                total_ghosts = self.stubs.len(),
192                "anti-entropy sweep complete"
193            );
194        }
195
196        report
197    }
198
199    pub fn len(&self) -> usize {
200        self.stubs.len()
201    }
202
203    pub fn is_empty(&self) -> bool {
204        self.stubs.is_empty()
205    }
206
207    pub fn total_purged(&self) -> u64 {
208        self.purge_count
209    }
210
211    pub fn last_sweep_ms(&self) -> u64 {
212        self.last_sweep_ms
213    }
214
215    /// All ghost stubs (for metrics/debugging).
216    pub fn stubs(&self) -> impl Iterator<Item = &GhostStub> {
217        self.stubs.values()
218    }
219
220    /// Resolve a node lookup: if the node is a ghost, return the target shard
221    /// for scatter-gather. Otherwise return None.
222    pub fn resolve(&self, node_id: &str) -> Option<u32> {
223        self.stubs.get(node_id).map(|s| s.target_shard)
224    }
225}
226
227impl GhostTable {
228    /// Serialize all ghost stubs for persistence.
229    ///
230    /// Returns MessagePack bytes that can be stored in the cluster catalog.
231    pub fn to_bytes(&self) -> Vec<u8> {
232        let persisted: HashMap<String, PersistedGhostStub> = self
233            .stubs
234            .iter()
235            .map(|(k, v)| {
236                (
237                    k.clone(),
238                    PersistedGhostStub {
239                        target_shard: v.target_shard,
240                        refcount: v.refcount,
241                        created_at_ms: v.created_at_ms,
242                    },
243                )
244            })
245            .collect();
246        match zerompk::to_msgpack_vec(&persisted) {
247            Ok(bytes) => bytes,
248            Err(e) => {
249                tracing::error!(error = %e, "ghost table serialization failed — state will not persist");
250                Vec::new()
251            }
252        }
253    }
254
255    /// Restore ghost stubs from persisted bytes.
256    ///
257    /// Called on startup to recover ghost state from the cluster catalog.
258    pub fn from_bytes(data: &[u8]) -> Option<Self> {
259        let persisted: HashMap<String, PersistedGhostStub> = zerompk::from_msgpack(data).ok()?;
260        let stubs: HashMap<String, GhostStub> = persisted
261            .into_iter()
262            .map(|(k, v)| {
263                (
264                    k.clone(),
265                    GhostStub {
266                        node_id: k,
267                        target_shard: v.target_shard,
268                        refcount: v.refcount,
269                        created_at_ms: v.created_at_ms,
270                    },
271                )
272            })
273            .collect();
274        Some(Self {
275            stubs,
276            purge_count: 0,
277            last_sweep_ms: 0,
278        })
279    }
280}
281
282impl Default for GhostTable {
283    fn default() -> Self {
284        Self::new()
285    }
286}
287
/// Report from an anti-entropy sweep.
///
/// Counters satisfy `checked == purged + kept + inconclusive`,
/// since `sweep` classifies every examined ghost exactly once.
#[derive(Debug, Default, Clone)]
pub struct SweepReport {
    /// Total ghosts examined this sweep.
    pub checked: usize,
    /// Ghosts removed (zero refcount, or target verified the node is gone).
    pub purged: usize,
    /// Ghosts confirmed still needed.
    pub kept: usize,
    /// Ghosts whose target could not be verified; retained for retry.
    pub inconclusive: usize,
}
296
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ghost_lifecycle() {
        let mut table = GhostTable::new();
        table.insert(GhostStub::new("node-42".into(), 10, 3));
        assert_eq!(table.len(), 1);

        // A ghost lookup yields the shard to scatter-gather to.
        assert_eq!(table.resolve("node-42"), Some(10));
        assert_eq!(table.resolve("nonexistent"), None);

        // Three inbound edges → three decrements until purge.
        assert!(!table.decrement_ref("node-42"));
        assert!(!table.decrement_ref("node-42"));
        assert!(table.decrement_ref("node-42")); // refcount 0 → purged.
        assert!(table.is_empty());
        assert_eq!(table.total_purged(), 1);
    }

    #[test]
    fn ghost_increment_ref() {
        let mut table = GhostTable::new();
        table.insert(GhostStub::new("n1".into(), 5, 1));
        table.increment_ref("n1");
        // Refcount is now 2: first decrement keeps the ghost, second purges.
        assert!(!table.decrement_ref("n1"));
        assert!(table.decrement_ref("n1"));
    }

    #[test]
    fn sweep_purges_stale_ghosts() {
        let mut table = GhostTable::new();
        for (id, shard, refs) in [("stale", 5, 1), ("alive", 6, 2), ("unreachable", 7, 1)] {
            table.insert(GhostStub::new(id.into(), shard, refs));
        }

        let verdicts = |node_id: &str, _target: u32| match node_id {
            "stale" => SweepVerdict::Purge,
            "alive" => SweepVerdict::Keep,
            "unreachable" => SweepVerdict::Inconclusive,
            _ => SweepVerdict::Keep,
        };
        let report = table.sweep(verdicts);

        assert_eq!(report.checked, 3);
        assert_eq!(report.purged, 1);
        assert_eq!(report.kept, 1);
        assert_eq!(report.inconclusive, 1);
        assert_eq!(table.len(), 2); // alive + unreachable remain
    }

    #[test]
    fn sweep_purges_zero_refcount_without_remote() {
        let mut table = GhostTable::new();
        let mut stub = GhostStub::new("zero-ref".into(), 5, 1);
        stub.refcount = 0; // Already zero.
        table.insert(stub);

        // The zero-refcount fast path bypasses the remote check; the
        // verify_fn returns Keep to prove it is never consulted.
        let report = table.sweep(|_, _| SweepVerdict::Keep);
        assert_eq!(report.purged, 1);
        assert!(table.is_empty());
    }

    #[test]
    fn resolve_for_scatter_gather() {
        let mut table = GhostTable::new();
        table.insert(GhostStub::new("migrated-node".into(), 42, 5));

        // Traversal hitting "migrated-node" redirects to shard 42.
        assert_eq!(table.resolve("migrated-node"), Some(42));
    }
}