nodedb_cluster/catalog/
core.rs1use std::path::Path;
11
12use tracing::info;
13
14use crate::error::Result;
15
16use super::migration::migrate_if_needed;
17use super::schema::{
18 CATALOG_FORMAT_VERSION, GHOST_TABLE, KEY_CA_CERT, KEY_CLUSTER_EPOCH, KEY_CLUSTER_ID,
19 KEY_FORMAT_VERSION, METADATA_TABLE, MIGRATION_STATE_TABLE, ROUTING_TABLE, TOPOLOGY_TABLE,
20 catalog_err,
21};
22
/// Persistent cluster catalog backed by a single redb database file.
///
/// Opened via [`ClusterCatalog::open`], which creates all tables up front
/// (topology, routing, metadata, ghost, migration state).
pub struct ClusterCatalog {
    // `pub(super)` so sibling modules (e.g. the migration module) can run
    // their own transactions against the same database.
    pub(super) db: redb::Database,
}
27
28impl ClusterCatalog {
29 pub fn open(path: &Path) -> Result<Self> {
39 let db =
40 redb::Database::create(path).map_err(|e| crate::error::ClusterError::Transport {
41 detail: format!("open cluster catalog {}: {e}", path.display()),
42 })?;
43
44 let txn = db.begin_write().map_err(catalog_err)?;
48 {
49 let _ = txn.open_table(TOPOLOGY_TABLE).map_err(catalog_err)?;
50 let _ = txn.open_table(ROUTING_TABLE).map_err(catalog_err)?;
51 let _ = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
52 let _ = txn.open_table(GHOST_TABLE).map_err(catalog_err)?;
53 let _ = txn.open_table(MIGRATION_STATE_TABLE).map_err(catalog_err)?;
54 }
55 txn.commit().map_err(catalog_err)?;
56
57 migrate_if_needed(&db)?;
61
62 info!(
63 path = %path.display(),
64 format_version = CATALOG_FORMAT_VERSION,
65 "cluster catalog opened"
66 );
67
68 Ok(Self { db })
69 }
70
71 pub fn save_cluster_id(&self, cluster_id: u64) -> Result<()> {
75 let bytes = cluster_id.to_le_bytes();
76 let txn = self.db.begin_write().map_err(catalog_err)?;
77 {
78 let mut table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
79 table
80 .insert(KEY_CLUSTER_ID, bytes.as_slice())
81 .map_err(catalog_err)?;
82 }
83 txn.commit().map_err(catalog_err)?;
84 Ok(())
85 }
86
87 pub fn load_cluster_id(&self) -> Result<Option<u64>> {
89 let txn = self.db.begin_read().map_err(catalog_err)?;
90 let table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
91
92 match table.get(KEY_CLUSTER_ID).map_err(catalog_err)? {
93 Some(guard) => {
94 let bytes = guard.value();
95 if bytes.len() == 8 {
96 let mut arr = [0u8; 8];
97 arr.copy_from_slice(bytes);
98 let id = u64::from_le_bytes(arr);
99 Ok(Some(id))
100 } else {
101 Ok(None)
102 }
103 }
104 None => Ok(None),
105 }
106 }
107
108 pub fn is_bootstrapped(&self) -> Result<bool> {
110 self.load_cluster_id().map(|id| id.is_some())
111 }
112
113 pub fn save_cluster_epoch(&self, epoch: u64) -> Result<()> {
116 let bytes = epoch.to_le_bytes();
117 let txn = self.db.begin_write().map_err(catalog_err)?;
118 {
119 let mut table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
120 table
121 .insert(KEY_CLUSTER_EPOCH, bytes.as_slice())
122 .map_err(catalog_err)?;
123 }
124 txn.commit().map_err(catalog_err)?;
125 Ok(())
126 }
127
128 pub fn load_cluster_epoch(&self) -> Result<Option<u64>> {
131 let txn = self.db.begin_read().map_err(catalog_err)?;
132 let table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
133 match table.get(KEY_CLUSTER_EPOCH).map_err(catalog_err)? {
134 Some(guard) => {
135 let bytes = guard.value();
136 if bytes.len() == 8 {
137 let mut arr = [0u8; 8];
138 arr.copy_from_slice(bytes);
139 Ok(Some(u64::from_le_bytes(arr)))
140 } else {
141 Ok(None)
142 }
143 }
144 None => Ok(None),
145 }
146 }
147
148 pub fn save_ca_cert(&self, ca_cert_der: &[u8]) -> Result<()> {
152 let txn = self.db.begin_write().map_err(catalog_err)?;
153 {
154 let mut table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
155 table
156 .insert(KEY_CA_CERT, ca_cert_der)
157 .map_err(catalog_err)?;
158 }
159 txn.commit().map_err(catalog_err)?;
160 Ok(())
161 }
162
163 pub fn load_ca_cert(&self) -> Result<Option<Vec<u8>>> {
165 let txn = self.db.begin_read().map_err(catalog_err)?;
166 let table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
167 match table.get(KEY_CA_CERT).map_err(catalog_err)? {
168 Some(guard) => Ok(Some(guard.value().to_vec())),
169 None => Ok(None),
170 }
171 }
172}
173
174pub(super) fn read_format_version(db: &redb::Database) -> Result<Option<u32>> {
179 let txn = db.begin_read().map_err(catalog_err)?;
180 let table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
181 match table.get(KEY_FORMAT_VERSION).map_err(catalog_err)? {
182 Some(guard) => {
183 let bytes = guard.value();
184 if bytes.len() != 4 {
185 return Ok(None);
186 }
187 let mut arr = [0u8; 4];
188 arr.copy_from_slice(bytes);
189 Ok(Some(u32::from_le_bytes(arr)))
190 }
191 None => Ok(None),
192 }
193}
194
195pub(super) fn write_format_version(db: &redb::Database, version: u32) -> Result<()> {
201 let bytes = version.to_le_bytes();
202 let txn = db.begin_write().map_err(catalog_err)?;
203 {
204 let mut table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
205 table
206 .insert(KEY_FORMAT_VERSION, bytes.as_slice())
207 .map_err(catalog_err)?;
208 }
209 txn.commit().map_err(catalog_err)?;
210 Ok(())
211}
212
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a catalog inside a fresh temp dir; the `TempDir` guard is
    /// returned so the directory outlives the catalog.
    fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
        let dir = tempfile::tempdir().unwrap();
        let catalog = ClusterCatalog::open(&dir.path().join("cluster.redb")).unwrap();
        (dir, catalog)
    }

    #[test]
    fn cluster_id_persistence() {
        let (_dir, catalog) = temp_catalog();

        // A fresh catalog holds no id and is therefore not bootstrapped.
        assert_eq!(catalog.load_cluster_id().unwrap(), None);
        assert!(!catalog.is_bootstrapped().unwrap());

        catalog.save_cluster_id(42).unwrap();
        assert_eq!(catalog.load_cluster_id().unwrap(), Some(42));
        assert!(catalog.is_bootstrapped().unwrap());
    }

    #[test]
    fn fresh_catalog_is_stamped_with_current_format_version() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cluster.redb");
        drop(ClusterCatalog::open(&path).unwrap());

        // Reopen the raw database and inspect the stamped version.
        let db = redb::Database::create(&path).unwrap();
        assert_eq!(
            read_format_version(&db).unwrap(),
            Some(CATALOG_FORMAT_VERSION)
        );
    }

    #[test]
    fn reopening_current_catalog_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cluster.redb");
        for _ in 0..2 {
            let _ = ClusterCatalog::open(&path).unwrap();
        }
        let db = redb::Database::create(&path).unwrap();
        assert_eq!(
            read_format_version(&db).unwrap(),
            Some(CATALOG_FORMAT_VERSION)
        );
    }

    #[test]
    fn future_format_version_is_refused() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cluster.redb");
        drop(ClusterCatalog::open(&path).unwrap());
        {
            // Forge a version stamp from the future.
            let db = redb::Database::create(&path).unwrap();
            write_format_version(&db, CATALOG_FORMAT_VERSION + 1).unwrap();
        }
        match ClusterCatalog::open(&path) {
            Err(e) => {
                let msg = e.to_string();
                assert!(
                    msg.contains("newer than supported"),
                    "expected a clear downgrade refusal, got: {msg}"
                );
            }
            Ok(_) => panic!("expected downgrade refusal, got Ok"),
        }
    }
}