// nodedb_cluster/ghost.rs
use std::collections::HashMap;
use tracing::{debug, info};

4/// Serializable ghost stub for persistence.
5#[derive(
6    Debug,
7    Clone,
8    serde::Serialize,
9    serde::Deserialize,
10    zerompk::ToMessagePack,
11    zerompk::FromMessagePack,
12)]
13struct PersistedGhostStub {
14    target_shard: u16,
15    refcount: u32,
16    created_at_ms: u64,
17}
18
/// Ghost edge stub.
///
/// When vShard rebalancing moves a node to a new shard, a ghost stub
/// `(node_id, target_shard_id, refcount)` remains on the source.
/// During traversal, ghost stubs trigger transparent scatter-gather
/// to the target shard. Ghosts are garbage-collected when `refcount`
/// (number of inbound edges from this shard) reaches zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GhostStub {
    /// The node ID that was moved.
    pub node_id: String,
    /// The target vShard where the node now lives.
    pub target_shard: u16,
    /// Number of local edges that still reference this ghost.
    pub refcount: u32,
    /// When this ghost was created (ms since the Unix epoch).
    pub created_at_ms: u64,
}

impl GhostStub {
    /// Build a stub for a freshly migrated node, stamped with the current
    /// wall-clock time.
    pub fn new(node_id: String, target_shard: u16, initial_refcount: u32) -> Self {
        // A clock set before the epoch makes `duration_since` fail; the
        // error carries the magnitude of the offset, which we use rather
        // than panicking.
        let created_at_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_else(|e| e.duration())
            .as_millis() as u64;
        GhostStub {
            node_id,
            target_shard,
            refcount: initial_refcount,
            created_at_ms,
        }
    }

    /// Decrement refcount when a local edge pointing to this ghost is deleted.
    /// Returns true if refcount reached zero (ghost can be purged).
    pub fn decrement_ref(&mut self) -> bool {
        // Saturating: an already-zero refcount stays at zero (and reports
        // "purgeable") instead of wrapping.
        self.refcount = self.refcount.saturating_sub(1);
        self.refcount == 0
    }

    /// Increment refcount when a local edge pointing to this ghost is added.
    pub fn increment_ref(&mut self) {
        self.refcount = self.refcount.saturating_add(1);
    }
}

/// Anti-entropy sweeper result for a single ghost.
///
/// Produced by the caller-supplied `verify_fn` in `GhostTable::sweep`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SweepVerdict {
    /// Ghost is still needed (edges exist or target confirms node).
    Keep,
    /// Ghost can be purged (no local edges AND target doesn't have the node).
    Purge,
    /// Couldn't verify (target unreachable). Keep for now, retry later.
    Inconclusive,
}

75/// Ghost table for a single vShard.
76///
77/// Tracks ghost stubs left behind after node migration.
78/// The anti-entropy sweeper runs periodically to purge stale ghosts.
79#[derive(Debug, Clone)]
80pub struct GhostTable {
81    /// node_id → GhostStub.
82    stubs: HashMap<String, GhostStub>,
83    /// Cumulative count of ghosts purged (for metrics).
84    purge_count: u64,
85    /// Last sweep timestamp.
86    last_sweep_ms: u64,
87}
88
89impl GhostTable {
90    pub fn new() -> Self {
91        Self {
92            stubs: HashMap::new(),
93            purge_count: 0,
94            last_sweep_ms: 0,
95        }
96    }
97
98    /// Insert a ghost stub after a node is migrated away.
99    pub fn insert(&mut self, stub: GhostStub) {
100        debug!(
101            node = %stub.node_id,
102            target_shard = stub.target_shard,
103            refcount = stub.refcount,
104            "inserted ghost stub"
105        );
106        self.stubs.insert(stub.node_id.clone(), stub);
107    }
108
109    /// Look up a ghost stub by node ID.
110    pub fn get(&self, node_id: &str) -> Option<&GhostStub> {
111        self.stubs.get(node_id)
112    }
113
114    /// Decrement refcount for a ghost (when a local edge is deleted).
115    /// Returns true if the ghost was purged (refcount reached zero).
116    pub fn decrement_ref(&mut self, node_id: &str) -> bool {
117        if let Some(stub) = self.stubs.get_mut(node_id)
118            && stub.decrement_ref()
119        {
120            self.stubs.remove(node_id);
121            self.purge_count += 1;
122            return true;
123        }
124        false
125    }
126
127    /// Increment refcount for a ghost (when a local edge to a ghost node is added).
128    pub fn increment_ref(&mut self, node_id: &str) {
129        if let Some(stub) = self.stubs.get_mut(node_id) {
130            stub.increment_ref();
131        }
132    }
133
134    /// Run anti-entropy sweep.
135    ///
136    /// For each ghost stub, the caller must verify against the target shard:
137    /// 1. Does the target shard acknowledge the node exists?
138    /// 2. Do any local edges still reference this ghost?
139    ///
140    /// `verify_fn` takes (node_id, target_shard) and returns SweepVerdict.
141    /// The sweeper runs at lowest I/O priority and is rate-limited.
142    pub fn sweep<F>(&mut self, verify_fn: F) -> SweepReport
143    where
144        F: Fn(&str, u16) -> SweepVerdict,
145    {
146        let now_ms = std::time::SystemTime::now()
147            .duration_since(std::time::UNIX_EPOCH)
148            .unwrap_or_default()
149            .as_millis() as u64;
150        self.last_sweep_ms = now_ms;
151
152        let mut report = SweepReport::default();
153        let mut to_purge = Vec::new();
154
155        for (node_id, stub) in &self.stubs {
156            report.checked += 1;
157
158            // Fast path: if refcount is already 0, purge without remote check.
159            if stub.refcount == 0 {
160                to_purge.push(node_id.clone());
161                report.purged += 1;
162                continue;
163            }
164
165            match verify_fn(node_id, stub.target_shard) {
166                SweepVerdict::Purge => {
167                    to_purge.push(node_id.clone());
168                    report.purged += 1;
169                }
170                SweepVerdict::Keep => {
171                    report.kept += 1;
172                }
173                SweepVerdict::Inconclusive => {
174                    report.inconclusive += 1;
175                }
176            }
177        }
178
179        for node_id in to_purge {
180            self.stubs.remove(&node_id);
181            self.purge_count += 1;
182        }
183
184        if report.purged > 0 {
185            info!(
186                purged = report.purged,
187                kept = report.kept,
188                inconclusive = report.inconclusive,
189                total_ghosts = self.stubs.len(),
190                "anti-entropy sweep complete"
191            );
192        }
193
194        report
195    }
196
197    pub fn len(&self) -> usize {
198        self.stubs.len()
199    }
200
201    pub fn is_empty(&self) -> bool {
202        self.stubs.is_empty()
203    }
204
205    pub fn total_purged(&self) -> u64 {
206        self.purge_count
207    }
208
209    pub fn last_sweep_ms(&self) -> u64 {
210        self.last_sweep_ms
211    }
212
213    /// All ghost stubs (for metrics/debugging).
214    pub fn stubs(&self) -> impl Iterator<Item = &GhostStub> {
215        self.stubs.values()
216    }
217
218    /// Resolve a node lookup: if the node is a ghost, return the target shard
219    /// for scatter-gather. Otherwise return None.
220    pub fn resolve(&self, node_id: &str) -> Option<u16> {
221        self.stubs.get(node_id).map(|s| s.target_shard)
222    }
223}
224
225impl GhostTable {
226    /// Serialize all ghost stubs for persistence.
227    ///
228    /// Returns MessagePack bytes that can be stored in the cluster catalog.
229    pub fn to_bytes(&self) -> Vec<u8> {
230        let persisted: HashMap<String, PersistedGhostStub> = self
231            .stubs
232            .iter()
233            .map(|(k, v)| {
234                (
235                    k.clone(),
236                    PersistedGhostStub {
237                        target_shard: v.target_shard,
238                        refcount: v.refcount,
239                        created_at_ms: v.created_at_ms,
240                    },
241                )
242            })
243            .collect();
244        match zerompk::to_msgpack_vec(&persisted) {
245            Ok(bytes) => bytes,
246            Err(e) => {
247                tracing::error!(error = %e, "ghost table serialization failed — state will not persist");
248                Vec::new()
249            }
250        }
251    }
252
253    /// Restore ghost stubs from persisted bytes.
254    ///
255    /// Called on startup to recover ghost state from the cluster catalog.
256    pub fn from_bytes(data: &[u8]) -> Option<Self> {
257        let persisted: HashMap<String, PersistedGhostStub> = zerompk::from_msgpack(data).ok()?;
258        let stubs: HashMap<String, GhostStub> = persisted
259            .into_iter()
260            .map(|(k, v)| {
261                (
262                    k.clone(),
263                    GhostStub {
264                        node_id: k,
265                        target_shard: v.target_shard,
266                        refcount: v.refcount,
267                        created_at_ms: v.created_at_ms,
268                    },
269                )
270            })
271            .collect();
272        Some(Self {
273            stubs,
274            purge_count: 0,
275            last_sweep_ms: 0,
276        })
277    }
278}
279
280impl Default for GhostTable {
281    fn default() -> Self {
282        Self::new()
283    }
284}
285
/// Report from an anti-entropy sweep.
///
/// `checked` counts every ghost examined; `purged`, `kept`, and
/// `inconclusive` record how the verdicts fell.
#[derive(Debug, Default, Clone)]
pub struct SweepReport {
    /// Total ghosts examined during the sweep.
    pub checked: usize,
    /// Ghosts removed (zero refcount, or verifier said Purge).
    pub purged: usize,
    /// Ghosts retained (verifier said Keep).
    pub kept: usize,
    /// Ghosts whose target shard could not be verified.
    pub inconclusive: usize,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ghost_lifecycle() {
        let mut table = GhostTable::new();
        table.insert(GhostStub::new("node-42".into(), 10, 3));
        assert_eq!(table.len(), 1);

        // Ghosts resolve to their target shard; unknown IDs do not resolve.
        assert_eq!(table.resolve("node-42"), Some(10));
        assert_eq!(table.resolve("nonexistent"), None);

        // Refcount 3: the first two decrements keep the ghost, the third purges it.
        assert!(!table.decrement_ref("node-42"));
        assert!(!table.decrement_ref("node-42"));
        assert!(table.decrement_ref("node-42"));
        assert!(table.is_empty());
        assert_eq!(table.total_purged(), 1);
    }

    #[test]
    fn ghost_increment_ref() {
        let mut table = GhostTable::new();
        table.insert(GhostStub::new("n1".into(), 5, 1));
        table.increment_ref("n1");

        // Refcount was bumped to 2, so purging takes two decrements.
        assert!(!table.decrement_ref("n1"));
        assert!(table.decrement_ref("n1"));
    }

    #[test]
    fn sweep_purges_stale_ghosts() {
        let mut table = GhostTable::new();
        for (id, shard, refs) in [("stale", 5, 1), ("alive", 6, 2), ("unreachable", 7, 1)] {
            table.insert(GhostStub::new(id.into(), shard, refs));
        }

        let report = table.sweep(|id, _shard| match id {
            "stale" => SweepVerdict::Purge,
            "alive" => SweepVerdict::Keep,
            "unreachable" => SweepVerdict::Inconclusive,
            _ => SweepVerdict::Keep,
        });

        assert_eq!(report.checked, 3);
        assert_eq!(report.purged, 1);
        assert_eq!(report.kept, 1);
        assert_eq!(report.inconclusive, 1);
        assert_eq!(table.len(), 2); // "alive" and "unreachable" survive.
    }

    #[test]
    fn sweep_purges_zero_refcount_without_remote() {
        let mut table = GhostTable::new();
        let mut stub = GhostStub::new("zero-ref".into(), 5, 1);
        stub.refcount = 0; // Force the fast-path condition.
        table.insert(stub);

        // The fast path skips verification entirely; returning Keep here
        // proves the callback was bypassed for zero-refcount ghosts.
        let report = table.sweep(|_, _| SweepVerdict::Keep);
        assert_eq!(report.purged, 1);
        assert!(table.is_empty());
    }

    #[test]
    fn resolve_for_scatter_gather() {
        let mut table = GhostTable::new();
        table.insert(GhostStub::new("migrated-node".into(), 42, 5));

        // Traversal hitting "migrated-node" is redirected to shard 42.
        assert_eq!(table.resolve("migrated-node"), Some(42));
    }
}