nodedb_cluster/catalog/
ghosts.rs1use crate::error::Result;
7use crate::ghost::GhostTable;
8
9use super::core::ClusterCatalog;
10use super::schema::{GHOST_TABLE, catalog_err};
11
12impl ClusterCatalog {
13 pub fn save_ghosts(&self, vshard_id: u32, ghost_table: &GhostTable) -> Result<()> {
18 let bytes = ghost_table.to_bytes();
19 let key = format!("ghosts:{vshard_id}");
20
21 let txn = self.db.begin_write().map_err(catalog_err)?;
22 {
23 let mut table = txn.open_table(GHOST_TABLE).map_err(catalog_err)?;
24 table
25 .insert(key.as_str(), bytes.as_slice())
26 .map_err(catalog_err)?;
27 }
28 txn.commit().map_err(catalog_err)?;
29 Ok(())
30 }
31
32 pub fn load_ghosts(&self, vshard_id: u32) -> Result<Option<GhostTable>> {
34 let key = format!("ghosts:{vshard_id}");
35
36 let txn = self.db.begin_read().map_err(catalog_err)?;
37 let table = txn.open_table(GHOST_TABLE).map_err(catalog_err)?;
38
39 match table.get(key.as_str()).map_err(catalog_err)? {
40 Some(guard) => Ok(GhostTable::from_bytes(guard.value())),
41 None => Ok(None),
42 }
43 }
44
45 pub fn load_all_ghosts(&self) -> Result<Vec<(u32, GhostTable)>> {
49 let txn = self.db.begin_read().map_err(catalog_err)?;
50 let table = txn.open_table(GHOST_TABLE).map_err(catalog_err)?;
51
52 let mut results = Vec::new();
53 let range = table.range::<&str>(..).map_err(catalog_err)?;
54 for entry in range {
55 let (key, value) = entry.map_err(catalog_err)?;
56 let key_str = key.value();
57 if let Some(id_str) = key_str.strip_prefix("ghosts:")
58 && let Ok(vshard_id) = id_str.parse::<u32>()
59 && let Some(ghost_table) = GhostTable::from_bytes(value.value())
60 {
61 results.push((vshard_id, ghost_table));
62 }
63 }
64 Ok(results)
65 }
66
67 pub fn delete_ghosts(&self, vshard_id: u32) -> Result<()> {
69 let key = format!("ghosts:{vshard_id}");
70
71 let txn = self.db.begin_write().map_err(catalog_err)?;
72 {
73 let mut table = txn.open_table(GHOST_TABLE).map_err(catalog_err)?;
74 let _ = table.remove(key.as_str()).map_err(catalog_err)?;
75 }
76 txn.commit().map_err(catalog_err)?;
77 Ok(())
78 }
79}
80
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ghost::GhostStub;

    /// Opens a catalog at `cluster.redb` inside a newly created temporary
    /// directory.
    ///
    /// The directory handle is returned together with the catalog so the
    /// caller can keep it alive while the catalog is in use.
    fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("cluster.redb");
        let catalog = ClusterCatalog::open(&db_path).unwrap();
        (tmp, catalog)
    }

    /// A saved ghost table is read back with the same stubs and refcount.
    #[test]
    fn ghost_persistence_roundtrip() {
        let (_tmp, catalog) = temp_catalog();

        let mut original = GhostTable::new();
        for (node, target, refcount) in [("node-A", 5, 3), ("node-B", 10, 1)] {
            original.insert(GhostStub::new(node.into(), target, refcount));
        }

        catalog.save_ghosts(42, &original).unwrap();

        let restored = catalog.load_ghosts(42).unwrap().unwrap();
        assert_eq!(restored.len(), 2);
        assert_eq!(restored.resolve("node-A"), Some(5));
        assert_eq!(restored.resolve("node-B"), Some(10));
        assert_eq!(restored.get("node-A").unwrap().refcount, 3);
    }

    /// Tables saved under two different vShard ids are both returned.
    #[test]
    fn ghost_load_all() {
        let (_tmp, catalog) = temp_catalog();

        for (vshard_id, node, target, refcount) in [(10, "x", 1, 1), (20, "y", 2, 2)] {
            let mut table = GhostTable::new();
            table.insert(GhostStub::new(node.into(), target, refcount));
            catalog.save_ghosts(vshard_id, &table).unwrap();
        }

        let everything = catalog.load_all_ghosts().unwrap();
        assert_eq!(everything.len(), 2);
    }

    /// After deletion, loading the same vShard id yields `None`.
    #[test]
    fn ghost_delete() {
        let (_tmp, catalog) = temp_catalog();

        let mut table = GhostTable::new();
        table.insert(GhostStub::new("z".into(), 3, 1));
        catalog.save_ghosts(99, &table).unwrap();

        catalog.delete_ghosts(99).unwrap();

        let after_delete = catalog.load_ghosts(99).unwrap();
        assert!(after_delete.is_none());
    }
}