Skip to main content

nodedb_cluster/catalog/
cluster_settings.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Cluster-wide settings persisted in the catalog.
4//!
5//! [`ClusterSettings`] is written once at bootstrap (from the operator's
6//! [`ClusterConfig`]) and read back on every open. Changing these values
7//! after the first bootstrap requires a coordinated cluster-wide migration;
8//! the format version guard in `migration.rs` gates upgrades.
9
10use crate::catalog::core::ClusterCatalog;
11use crate::catalog::schema::{METADATA_TABLE, catalog_err};
12use crate::error::Result;
13
/// Key under which the zerompk-encoded `ClusterSettings` blob is stored
/// in the metadata table.
///
/// `save_cluster_settings` writes the blob under this key and
/// `load_cluster_settings` reads it back through the same key, so both
/// sides must keep using this exact string.
pub(super) const KEY_CLUSTER_SETTINGS: &str = "cluster_settings";
17
18/// Which hash algorithm is used to partition keys across vShards.
19///
20/// The value is persisted as a `u8` discriminant — the numeric
21/// representation must remain stable across software versions.
22#[derive(
23    Debug,
24    Clone,
25    Copy,
26    PartialEq,
27    Eq,
28    serde::Serialize,
29    serde::Deserialize,
30    zerompk::ToMessagePack,
31    zerompk::FromMessagePack,
32)]
33#[repr(u8)]
34pub enum PlacementHashId {
35    /// FNV-1a 64-bit — the cluster default prior to launch.
36    Fnv1a = 0,
37    /// xxHash3 64-bit — higher throughput at large key sizes.
38    XxHash3 = 1,
39}
40
41/// Cluster-wide settings that are fixed at bootstrap time.
42#[derive(
43    Debug,
44    Clone,
45    PartialEq,
46    Eq,
47    serde::Serialize,
48    serde::Deserialize,
49    zerompk::ToMessagePack,
50    zerompk::FromMessagePack,
51)]
52pub struct ClusterSettings {
53    /// Hash algorithm used to map row keys to vShards.
54    pub placement_hash_id: PlacementHashId,
55    /// Number of virtual shards in this cluster. Must match the
56    /// `VSHARD_COUNT` constant used when the routing table was built.
57    pub vshard_count: u32,
58    /// Number of replicas per Raft group.
59    pub replication_factor: u32,
60    /// Minimum wire-protocol version that peers must speak.
61    pub min_wire_version: u16,
62}
63
64impl Default for ClusterSettings {
65    fn default() -> Self {
66        Self {
67            placement_hash_id: PlacementHashId::Fnv1a,
68            vshard_count: crate::routing::VSHARD_COUNT,
69            replication_factor: 1,
70            min_wire_version: 1,
71        }
72    }
73}
74
75impl ClusterSettings {
76    /// Construct settings from a [`ClusterConfig`].
77    pub fn from_config(config: &crate::bootstrap::ClusterConfig) -> Self {
78        Self {
79            placement_hash_id: PlacementHashId::Fnv1a,
80            vshard_count: crate::routing::VSHARD_COUNT,
81            replication_factor: config.replication_factor as u32,
82            min_wire_version: 1,
83        }
84    }
85}
86
87/// Dispatch a key through the configured placement hash, returning a
88/// deterministic 64-bit value. The caller is responsible for reducing
89/// the output into a vShard ID (e.g. `% vshard_count`).
90pub fn placement_hash(id: PlacementHashId, key: &[u8]) -> u64 {
91    match id {
92        PlacementHashId::Fnv1a => {
93            // FNV-1a 64-bit (offset basis 0xcbf29ce484222325, prime 0x100000001b3).
94            let mut hash: u64 = 0xcbf29ce484222325;
95            for byte in key {
96                hash ^= *byte as u64;
97                hash = hash.wrapping_mul(0x100000001b3);
98            }
99            hash
100        }
101        PlacementHashId::XxHash3 => xxhash_rust::xxh3::xxh3_64(key),
102    }
103}
104
105// ── ClusterCatalog methods ───────────────────────────────────────────────────
106
107impl ClusterCatalog {
108    /// Persist the cluster settings blob. Called once at bootstrap.
109    pub fn save_cluster_settings(&self, settings: &ClusterSettings) -> Result<()> {
110        let bytes =
111            zerompk::to_msgpack_vec(settings).map_err(|e| crate::error::ClusterError::Codec {
112                detail: format!("serialize ClusterSettings: {e}"),
113            })?;
114        let txn = self.db.begin_write().map_err(catalog_err)?;
115        {
116            let mut table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
117            table
118                .insert(KEY_CLUSTER_SETTINGS, bytes.as_slice())
119                .map_err(catalog_err)?;
120        }
121        txn.commit().map_err(catalog_err)?;
122        Ok(())
123    }
124
125    /// Load the cluster settings. Returns `None` if not yet bootstrapped.
126    pub fn load_cluster_settings(&self) -> Result<Option<ClusterSettings>> {
127        let txn = self.db.begin_read().map_err(catalog_err)?;
128        let table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
129        match table.get(KEY_CLUSTER_SETTINGS).map_err(catalog_err)? {
130            Some(guard) => {
131                let settings = zerompk::from_msgpack(guard.value()).map_err(|e| {
132                    crate::error::ClusterError::Codec {
133                        detail: format!("deserialize ClusterSettings: {e}"),
134                    }
135                })?;
136                Ok(Some(settings))
137            }
138            None => Ok(None),
139        }
140    }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::ClusterCatalog;

    /// Open a fresh catalog inside a new temporary directory.
    ///
    /// The `TempDir` is returned alongside the catalog so the directory
    /// outlives the test body; dropping it removes the files.
    fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cluster.redb");
        let catalog = ClusterCatalog::open(&path).unwrap();
        (dir, catalog)
    }

    // ── zerompk roundtrip ───────────────────────────────────────────

    #[test]
    fn cluster_settings_msgpack_roundtrip_fnv1a() {
        let original = ClusterSettings {
            placement_hash_id: PlacementHashId::Fnv1a,
            vshard_count: 1024,
            replication_factor: 3,
            min_wire_version: 1,
        };
        let bytes = zerompk::to_msgpack_vec(&original).unwrap();
        let decoded: ClusterSettings = zerompk::from_msgpack(&bytes).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn cluster_settings_msgpack_roundtrip_xxhash3() {
        let original = ClusterSettings {
            placement_hash_id: PlacementHashId::XxHash3,
            vshard_count: 512,
            replication_factor: 1,
            min_wire_version: 2,
        };
        let bytes = zerompk::to_msgpack_vec(&original).unwrap();
        let decoded: ClusterSettings = zerompk::from_msgpack(&bytes).unwrap();
        assert_eq!(original, decoded);
    }

    // ── catalog persistence ─────────────────────────────────────────

    #[test]
    fn save_and_load_cluster_settings() {
        let (_dir, catalog) = temp_catalog();
        // Nothing has been saved yet, so the load must report "absent".
        assert_eq!(catalog.load_cluster_settings().unwrap(), None);

        let settings = ClusterSettings {
            placement_hash_id: PlacementHashId::Fnv1a,
            vshard_count: 1024,
            replication_factor: 3,
            min_wire_version: 1,
        };
        catalog.save_cluster_settings(&settings).unwrap();

        let loaded = catalog.load_cluster_settings().unwrap().unwrap();
        assert_eq!(loaded, settings);
    }

    #[test]
    fn save_cluster_settings_overwrite_roundtrip() {
        let (_dir, catalog) = temp_catalog();

        let first = ClusterSettings {
            placement_hash_id: PlacementHashId::Fnv1a,
            vshard_count: 1024,
            replication_factor: 1,
            min_wire_version: 1,
        };
        catalog.save_cluster_settings(&first).unwrap();

        let updated = ClusterSettings {
            placement_hash_id: PlacementHashId::XxHash3,
            vshard_count: 1024,
            replication_factor: 3,
            min_wire_version: 2,
        };
        catalog.save_cluster_settings(&updated).unwrap();

        // A second save replaces the first blob rather than failing.
        let loaded = catalog.load_cluster_settings().unwrap().unwrap();
        assert_eq!(loaded, updated);
    }

    // ── persisted discriminants ─────────────────────────────────────

    #[test]
    fn placement_hash_id_discriminants_are_stable() {
        // The discriminants are persisted; renumbering a variant would make
        // existing catalogs select a different hash algorithm.
        assert_eq!(PlacementHashId::Fnv1a as u8, 0);
        assert_eq!(PlacementHashId::XxHash3 as u8, 1);
    }

    // ── placement_hash dispatch ─────────────────────────────────────

    #[test]
    fn placement_hash_deterministic() {
        let key = b"my-collection-key";
        let a = placement_hash(PlacementHashId::Fnv1a, key);
        let b = placement_hash(PlacementHashId::Fnv1a, key);
        assert_eq!(a, b, "FNV-1a must be deterministic");

        let c = placement_hash(PlacementHashId::XxHash3, key);
        let d = placement_hash(PlacementHashId::XxHash3, key);
        assert_eq!(c, d, "XxHash3 must be deterministic");
    }

    #[test]
    fn placement_hash_different_ids_produce_different_values() {
        let key = b"my-collection-key";
        let fnv = placement_hash(PlacementHashId::Fnv1a, key);
        let xx3 = placement_hash(PlacementHashId::XxHash3, key);
        assert_ne!(fnv, xx3, "FNV-1a and XxHash3 must differ for the same key");
    }

    #[test]
    fn placement_hash_fnv1a_known_answers() {
        // Published FNV-1a 64-bit test vectors. Determinism alone would not
        // notice a changed constant or a swapped XOR/multiply order, either
        // of which would silently remap every key to a different vShard.
        assert_eq!(
            placement_hash(PlacementHashId::Fnv1a, b""),
            0xcbf29ce484222325,
            "empty input must yield the offset basis"
        );
        assert_eq!(
            placement_hash(PlacementHashId::Fnv1a, b"a"),
            0xaf63dc4c8601ec8c,
            "single-byte vector must match the reference value"
        );
    }
}