// nodedb_cluster/catalog/core.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! `ClusterCatalog` struct, `open`, and metadata persistence
4//! (cluster id, CA cert, bootstrap check).
5//!
6//! `open` is the single entry point that walks the format-version
7//! key and delegates to the migration runner — no other file in
8//! this module touches that key directly.
9
10use std::path::Path;
11
12use tracing::info;
13
14use crate::error::Result;
15
16use super::migration::migrate_if_needed;
17use super::schema::{
18    CATALOG_FORMAT_VERSION, GHOST_TABLE, KEY_CA_CERT, KEY_CLUSTER_EPOCH, KEY_CLUSTER_ID,
19    KEY_FORMAT_VERSION, METADATA_TABLE, MIGRATION_STATE_TABLE, ROUTING_TABLE, TOPOLOGY_TABLE,
20    catalog_err,
21};
22
/// Persistent cluster catalog backed by redb.
///
/// Wraps the single [`redb::Database`] handle; all reads and writes in
/// this module go through transactions opened on `db`. Construct via
/// [`ClusterCatalog::open`], which also runs format-version migration.
pub struct ClusterCatalog {
    // `pub(super)` so sibling files in this catalog module (e.g. the
    // migration runner) can open transactions on the same handle.
    pub(super) db: redb::Database,
}
27
28impl ClusterCatalog {
29    /// Open or create the cluster catalog at the given path.
30    ///
31    /// Delegates to [`super::migration::migrate_if_needed`] after
32    /// the redb tables are in place. Fresh catalogs get stamped
33    /// with the current format version; catalogs stamped with a
34    /// higher version than this binary supports are refused with
35    /// a clear error (preventing silent corruption on an
36    /// accidental downgrade). Future schema changes land as
37    /// explicit migration arms in `migration.rs`.
38    pub fn open(path: &Path) -> Result<Self> {
39        let db =
40            redb::Database::create(path).map_err(|e| crate::error::ClusterError::Transport {
41                detail: format!("open cluster catalog {}: {e}", path.display()),
42            })?;
43
44        // Ensure every table exists before we try to read from any of
45        // them. redb requires tables to be created inside a write txn
46        // before they can be opened read-only.
47        let txn = db.begin_write().map_err(catalog_err)?;
48        {
49            let _ = txn.open_table(TOPOLOGY_TABLE).map_err(catalog_err)?;
50            let _ = txn.open_table(ROUTING_TABLE).map_err(catalog_err)?;
51            let _ = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
52            let _ = txn.open_table(GHOST_TABLE).map_err(catalog_err)?;
53            let _ = txn.open_table(MIGRATION_STATE_TABLE).map_err(catalog_err)?;
54        }
55        txn.commit().map_err(catalog_err)?;
56
57        // Stamp or validate the on-disk format version. Every
58        // branch is funnelled through `migrate_if_needed` so there
59        // is a single place to add future migration arms.
60        migrate_if_needed(&db)?;
61
62        info!(
63            path = %path.display(),
64            format_version = CATALOG_FORMAT_VERSION,
65            "cluster catalog opened"
66        );
67
68        Ok(Self { db })
69    }
70
71    // ── Metadata ────────────────────────────────────────────────────
72
73    /// Store the cluster ID (generated at bootstrap, immutable).
74    pub fn save_cluster_id(&self, cluster_id: u64) -> Result<()> {
75        let bytes = cluster_id.to_le_bytes();
76        let txn = self.db.begin_write().map_err(catalog_err)?;
77        {
78            let mut table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
79            table
80                .insert(KEY_CLUSTER_ID, bytes.as_slice())
81                .map_err(catalog_err)?;
82        }
83        txn.commit().map_err(catalog_err)?;
84        Ok(())
85    }
86
87    /// Load the cluster ID. Returns None if not yet bootstrapped.
88    pub fn load_cluster_id(&self) -> Result<Option<u64>> {
89        let txn = self.db.begin_read().map_err(catalog_err)?;
90        let table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
91
92        match table.get(KEY_CLUSTER_ID).map_err(catalog_err)? {
93            Some(guard) => {
94                let bytes = guard.value();
95                if bytes.len() == 8 {
96                    let mut arr = [0u8; 8];
97                    arr.copy_from_slice(bytes);
98                    let id = u64::from_le_bytes(arr);
99                    Ok(Some(id))
100                } else {
101                    Ok(None)
102                }
103            }
104            None => Ok(None),
105        }
106    }
107
108    /// Check if this catalog has been bootstrapped (has a cluster_id).
109    pub fn is_bootstrapped(&self) -> Result<bool> {
110        self.load_cluster_id().map(|id| id.is_some())
111    }
112
113    /// Persist the cluster epoch (the leader-bumped monotonic fence
114    /// token stamped on every Raft RPC). Overwrites any prior value.
115    pub fn save_cluster_epoch(&self, epoch: u64) -> Result<()> {
116        let bytes = epoch.to_le_bytes();
117        let txn = self.db.begin_write().map_err(catalog_err)?;
118        {
119            let mut table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
120            table
121                .insert(KEY_CLUSTER_EPOCH, bytes.as_slice())
122                .map_err(catalog_err)?;
123        }
124        txn.commit().map_err(catalog_err)?;
125        Ok(())
126    }
127
128    /// Load the persisted cluster epoch. Returns `None` on a catalog
129    /// that has never written one (callers treat that as 0).
130    pub fn load_cluster_epoch(&self) -> Result<Option<u64>> {
131        let txn = self.db.begin_read().map_err(catalog_err)?;
132        let table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
133        match table.get(KEY_CLUSTER_EPOCH).map_err(catalog_err)? {
134            Some(guard) => {
135                let bytes = guard.value();
136                if bytes.len() == 8 {
137                    let mut arr = [0u8; 8];
138                    arr.copy_from_slice(bytes);
139                    Ok(Some(u64::from_le_bytes(arr)))
140                } else {
141                    Ok(None)
142                }
143            }
144            None => Ok(None),
145        }
146    }
147
148    // ── TLS Certificates ────────────────────────────────────────────
149
150    /// Store the cluster CA certificate (DER-encoded).
151    pub fn save_ca_cert(&self, ca_cert_der: &[u8]) -> Result<()> {
152        let txn = self.db.begin_write().map_err(catalog_err)?;
153        {
154            let mut table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
155            table
156                .insert(KEY_CA_CERT, ca_cert_der)
157                .map_err(catalog_err)?;
158        }
159        txn.commit().map_err(catalog_err)?;
160        Ok(())
161    }
162
163    /// Load the cluster CA certificate. Returns None if not bootstrapped.
164    pub fn load_ca_cert(&self) -> Result<Option<Vec<u8>>> {
165        let txn = self.db.begin_read().map_err(catalog_err)?;
166        let table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
167        match table.get(KEY_CA_CERT).map_err(catalog_err)? {
168            Some(guard) => Ok(Some(guard.value().to_vec())),
169            None => Ok(None),
170        }
171    }
172}
173
174/// Read the current catalog format version from the metadata table,
175/// or `None` if the key hasn't been written yet.
176///
177/// Shared helper used by both `open` and the migration runner.
178pub(super) fn read_format_version(db: &redb::Database) -> Result<Option<u32>> {
179    let txn = db.begin_read().map_err(catalog_err)?;
180    let table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
181    match table.get(KEY_FORMAT_VERSION).map_err(catalog_err)? {
182        Some(guard) => {
183            let bytes = guard.value();
184            if bytes.len() != 4 {
185                return Ok(None);
186            }
187            let mut arr = [0u8; 4];
188            arr.copy_from_slice(bytes);
189            Ok(Some(u32::from_le_bytes(arr)))
190        }
191        None => Ok(None),
192    }
193}
194
195/// Stamp the catalog with the given format version.
196///
197/// Called from the migration runner — both on fresh catalogs
198/// (to record the initial version) and, in the future, after a
199/// successful upgrade arm.
200pub(super) fn write_format_version(db: &redb::Database, version: u32) -> Result<()> {
201    let bytes = version.to_le_bytes();
202    let txn = db.begin_write().map_err(catalog_err)?;
203    {
204        let mut table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
205        table
206            .insert(KEY_FORMAT_VERSION, bytes.as_slice())
207            .map_err(catalog_err)?;
208    }
209    txn.commit().map_err(catalog_err)?;
210    Ok(())
211}
212
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a catalog inside a fresh temp dir. The `TempDir` guard
    /// is returned so the directory outlives the catalog handle.
    fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
        let dir = tempfile::tempdir().unwrap();
        let catalog = ClusterCatalog::open(&dir.path().join("cluster.redb")).unwrap();
        (dir, catalog)
    }

    #[test]
    fn cluster_id_persistence() {
        let (_dir, catalog) = temp_catalog();

        // Fresh catalog: no cluster id, so not bootstrapped.
        assert_eq!(catalog.load_cluster_id().unwrap(), None);
        assert!(!catalog.is_bootstrapped().unwrap());

        // After saving, both views agree.
        catalog.save_cluster_id(42).unwrap();
        assert_eq!(catalog.load_cluster_id().unwrap(), Some(42));
        assert!(catalog.is_bootstrapped().unwrap());
    }

    #[test]
    fn fresh_catalog_is_stamped_with_current_format_version() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cluster.redb");
        // Open once (and release the handle — redb allows only one
        // open handle per file).
        drop(ClusterCatalog::open(&path).unwrap());
        // Reopen via a bare redb handle and verify the version key
        // is present.
        let raw = redb::Database::create(&path).unwrap();
        assert_eq!(
            read_format_version(&raw).unwrap(),
            Some(CATALOG_FORMAT_VERSION)
        );
    }

    #[test]
    fn reopening_current_catalog_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cluster.redb");
        drop(ClusterCatalog::open(&path).unwrap());
        // Second open must succeed without triggering any migration
        // error and without changing the stamped version.
        drop(ClusterCatalog::open(&path).unwrap());
        let raw = redb::Database::create(&path).unwrap();
        assert_eq!(
            read_format_version(&raw).unwrap(),
            Some(CATALOG_FORMAT_VERSION)
        );
    }

    #[test]
    fn future_format_version_is_refused() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cluster.redb");
        // Create a catalog, then manually stamp a future version to
        // simulate a downgrade attempt.
        drop(ClusterCatalog::open(&path).unwrap());
        {
            let raw = redb::Database::create(&path).unwrap();
            write_format_version(&raw, CATALOG_FORMAT_VERSION + 1).unwrap();
        }
        // Reopening must refuse with a clear error. Match on the
        // `Err` variant directly so we don't require
        // `ClusterCatalog: Debug` for `unwrap_err`.
        let msg = match ClusterCatalog::open(&path) {
            Ok(_) => panic!("expected downgrade refusal, got Ok"),
            Err(e) => e.to_string(),
        };
        assert!(
            msg.contains("newer than supported"),
            "expected a clear downgrade refusal, got: {msg}"
        );
    }
}
292}