Skip to main content

nodedb_cluster/catalog/
ghosts.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Ghost stub refcount persistence — survives restarts so ghost
4//! edges aren't counted twice after a crash.
5
6use crate::error::Result;
7use crate::ghost::GhostTable;
8
9use super::core::ClusterCatalog;
10use super::schema::{GHOST_TABLE, catalog_err};
11
12impl ClusterCatalog {
13    /// Persist ghost stubs for a vShard.
14    ///
15    /// Called after each sweep or after ghost table mutations to ensure
16    /// refcounts survive crash/restart.
17    pub fn save_ghosts(&self, vshard_id: u32, ghost_table: &GhostTable) -> Result<()> {
18        let bytes = ghost_table.to_bytes();
19        let key = format!("ghosts:{vshard_id}");
20
21        let txn = self.db.begin_write().map_err(catalog_err)?;
22        {
23            let mut table = txn.open_table(GHOST_TABLE).map_err(catalog_err)?;
24            table
25                .insert(key.as_str(), bytes.as_slice())
26                .map_err(catalog_err)?;
27        }
28        txn.commit().map_err(catalog_err)?;
29        Ok(())
30    }
31
32    /// Load ghost stubs for a vShard. Returns None if no ghosts persisted.
33    pub fn load_ghosts(&self, vshard_id: u32) -> Result<Option<GhostTable>> {
34        let key = format!("ghosts:{vshard_id}");
35
36        let txn = self.db.begin_read().map_err(catalog_err)?;
37        let table = txn.open_table(GHOST_TABLE).map_err(catalog_err)?;
38
39        match table.get(key.as_str()).map_err(catalog_err)? {
40            Some(guard) => Ok(GhostTable::from_bytes(guard.value())),
41            None => Ok(None),
42        }
43    }
44
45    /// Load all persisted ghost tables across all vShards.
46    ///
47    /// Returns `(vshard_id, GhostTable)` pairs for all vShards that have ghosts.
48    pub fn load_all_ghosts(&self) -> Result<Vec<(u32, GhostTable)>> {
49        let txn = self.db.begin_read().map_err(catalog_err)?;
50        let table = txn.open_table(GHOST_TABLE).map_err(catalog_err)?;
51
52        let mut results = Vec::new();
53        let range = table.range::<&str>(..).map_err(catalog_err)?;
54        for entry in range {
55            let (key, value) = entry.map_err(catalog_err)?;
56            let key_str = key.value();
57            if let Some(id_str) = key_str.strip_prefix("ghosts:")
58                && let Ok(vshard_id) = id_str.parse::<u32>()
59                && let Some(ghost_table) = GhostTable::from_bytes(value.value())
60            {
61                results.push((vshard_id, ghost_table));
62            }
63        }
64        Ok(results)
65    }
66
67    /// Delete persisted ghosts for a vShard (after all ghosts purged).
68    pub fn delete_ghosts(&self, vshard_id: u32) -> Result<()> {
69        let key = format!("ghosts:{vshard_id}");
70
71        let txn = self.db.begin_write().map_err(catalog_err)?;
72        {
73            let mut table = txn.open_table(GHOST_TABLE).map_err(catalog_err)?;
74            let _ = table.remove(key.as_str()).map_err(catalog_err)?;
75        }
76        txn.commit().map_err(catalog_err)?;
77        Ok(())
78    }
79}
80
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ghost::GhostStub;

    /// Open a fresh catalog backed by a file inside a new temporary directory.
    ///
    /// The `TempDir` handle is returned with the catalog so each test can keep
    /// it bound for as long as the catalog is in use.
    fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cluster.redb");
        let catalog = ClusterCatalog::open(&path).unwrap();
        (dir, catalog)
    }

    /// A saved ghost table comes back with the same stubs, targets and refcounts.
    #[test]
    fn ghost_persistence_roundtrip() {
        let (_dir, catalog) = temp_catalog();

        let mut ghosts = GhostTable::new();
        ghosts.insert(GhostStub::new("node-A".into(), 5, 3));
        ghosts.insert(GhostStub::new("node-B".into(), 10, 1));

        catalog.save_ghosts(42, &ghosts).unwrap();

        let loaded = catalog.load_ghosts(42).unwrap().unwrap();
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded.resolve("node-A"), Some(5));
        assert_eq!(loaded.resolve("node-B"), Some(10));
        assert_eq!(loaded.get("node-A").unwrap().refcount, 3);
        assert_eq!(loaded.get("node-B").unwrap().refcount, 1);
    }

    /// `load_all_ghosts` returns every saved vShard, paired with its own table.
    #[test]
    fn ghost_load_all() {
        let (_dir, catalog) = temp_catalog();

        let mut g1 = GhostTable::new();
        g1.insert(GhostStub::new("x".into(), 1, 1));
        catalog.save_ghosts(10, &g1).unwrap();

        let mut g2 = GhostTable::new();
        g2.insert(GhostStub::new("y".into(), 2, 2));
        catalog.save_ghosts(20, &g2).unwrap();

        let mut all = catalog.load_all_ghosts().unwrap();
        // Sort explicitly so the assertions do not depend on iteration order.
        all.sort_by_key(|(vshard_id, _)| *vshard_id);

        let ids: Vec<u32> = all.iter().map(|(vshard_id, _)| *vshard_id).collect();
        assert_eq!(ids, vec![10, 20]);

        // Counting entries alone would not catch ids paired with the wrong table.
        assert_eq!(all[0].1.len(), 1);
        assert_eq!(all[0].1.resolve("x"), Some(1));
        assert_eq!(all[1].1.len(), 1);
        assert_eq!(all[1].1.resolve("y"), Some(2));
        assert_eq!(all[1].1.get("y").unwrap().refcount, 2);
    }

    /// Loading a vShard that was never saved yields `None`, not another vShard's data.
    #[test]
    fn ghost_load_missing_vshard() {
        let (_dir, catalog) = temp_catalog();

        let mut ghosts = GhostTable::new();
        ghosts.insert(GhostStub::new("w".into(), 4, 1));
        catalog.save_ghosts(1, &ghosts).unwrap();

        assert!(catalog.load_ghosts(2).unwrap().is_none());
        // "ghosts:1" is a string prefix of "ghosts:10"; the lookup must be exact.
        assert!(catalog.load_ghosts(10).unwrap().is_none());
    }

    /// Saving twice for the same vShard keeps only the most recent table.
    #[test]
    fn ghost_save_overwrites_previous() {
        let (_dir, catalog) = temp_catalog();

        let mut first = GhostTable::new();
        first.insert(GhostStub::new("old".into(), 1, 1));
        catalog.save_ghosts(7, &first).unwrap();

        let mut second = GhostTable::new();
        second.insert(GhostStub::new("new".into(), 2, 4));
        catalog.save_ghosts(7, &second).unwrap();

        let loaded = catalog.load_ghosts(7).unwrap().unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded.resolve("new"), Some(2));
        assert_eq!(loaded.get("new").unwrap().refcount, 4);
        assert!(loaded.resolve("old").is_none());

        assert_eq!(catalog.load_all_ghosts().unwrap().len(), 1);
    }

    /// Deleting one vShard removes its entry and leaves other vShards intact.
    #[test]
    fn ghost_delete() {
        let (_dir, catalog) = temp_catalog();

        let mut ghosts = GhostTable::new();
        ghosts.insert(GhostStub::new("z".into(), 3, 1));
        catalog.save_ghosts(99, &ghosts).unwrap();

        let mut other = GhostTable::new();
        other.insert(GhostStub::new("keep".into(), 8, 2));
        catalog.save_ghosts(100, &other).unwrap();

        catalog.delete_ghosts(99).unwrap();
        assert!(catalog.load_ghosts(99).unwrap().is_none());

        let survivor = catalog.load_ghosts(100).unwrap().unwrap();
        assert_eq!(survivor.resolve("keep"), Some(8));

        let remaining = catalog.load_all_ghosts().unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].0, 100);
    }
}