nodedb_cluster/catalog/
cluster_settings.rs1use crate::catalog::core::ClusterCatalog;
11use crate::catalog::schema::{METADATA_TABLE, catalog_err};
12use crate::error::Result;
13
14pub(super) const KEY_CLUSTER_SETTINGS: &str = "cluster_settings";
17
18#[derive(
23 Debug,
24 Clone,
25 Copy,
26 PartialEq,
27 Eq,
28 serde::Serialize,
29 serde::Deserialize,
30 zerompk::ToMessagePack,
31 zerompk::FromMessagePack,
32)]
33#[repr(u8)]
34pub enum PlacementHashId {
35 Fnv1a = 0,
37 XxHash3 = 1,
39}
40
41#[derive(
43 Debug,
44 Clone,
45 PartialEq,
46 Eq,
47 serde::Serialize,
48 serde::Deserialize,
49 zerompk::ToMessagePack,
50 zerompk::FromMessagePack,
51)]
52pub struct ClusterSettings {
53 pub placement_hash_id: PlacementHashId,
55 pub vshard_count: u32,
58 pub replication_factor: u32,
60 pub min_wire_version: u16,
62}
63
64impl Default for ClusterSettings {
65 fn default() -> Self {
66 Self {
67 placement_hash_id: PlacementHashId::Fnv1a,
68 vshard_count: crate::routing::VSHARD_COUNT,
69 replication_factor: 1,
70 min_wire_version: 1,
71 }
72 }
73}
74
75impl ClusterSettings {
76 pub fn from_config(config: &crate::bootstrap::ClusterConfig) -> Self {
78 Self {
79 placement_hash_id: PlacementHashId::Fnv1a,
80 vshard_count: crate::routing::VSHARD_COUNT,
81 replication_factor: config.replication_factor as u32,
82 min_wire_version: 1,
83 }
84 }
85}
86
87pub fn placement_hash(id: PlacementHashId, key: &[u8]) -> u64 {
91 match id {
92 PlacementHashId::Fnv1a => {
93 let mut hash: u64 = 0xcbf29ce484222325;
95 for byte in key {
96 hash ^= *byte as u64;
97 hash = hash.wrapping_mul(0x100000001b3);
98 }
99 hash
100 }
101 PlacementHashId::XxHash3 => xxhash_rust::xxh3::xxh3_64(key),
102 }
103}
104
105impl ClusterCatalog {
108 pub fn save_cluster_settings(&self, settings: &ClusterSettings) -> Result<()> {
110 let bytes =
111 zerompk::to_msgpack_vec(settings).map_err(|e| crate::error::ClusterError::Codec {
112 detail: format!("serialize ClusterSettings: {e}"),
113 })?;
114 let txn = self.db.begin_write().map_err(catalog_err)?;
115 {
116 let mut table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
117 table
118 .insert(KEY_CLUSTER_SETTINGS, bytes.as_slice())
119 .map_err(catalog_err)?;
120 }
121 txn.commit().map_err(catalog_err)?;
122 Ok(())
123 }
124
125 pub fn load_cluster_settings(&self) -> Result<Option<ClusterSettings>> {
127 let txn = self.db.begin_read().map_err(catalog_err)?;
128 let table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
129 match table.get(KEY_CLUSTER_SETTINGS).map_err(catalog_err)? {
130 Some(guard) => {
131 let settings = zerompk::from_msgpack(guard.value()).map_err(|e| {
132 crate::error::ClusterError::Codec {
133 detail: format!("deserialize ClusterSettings: {e}"),
134 }
135 })?;
136 Ok(Some(settings))
137 }
138 None => Ok(None),
139 }
140 }
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146 use crate::catalog::ClusterCatalog;
147
148 fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
149 let dir = tempfile::tempdir().unwrap();
150 let path = dir.path().join("cluster.redb");
151 let catalog = ClusterCatalog::open(&path).unwrap();
152 (dir, catalog)
153 }
154
155 #[test]
158 fn cluster_settings_msgpack_roundtrip_fnv1a() {
159 let original = ClusterSettings {
160 placement_hash_id: PlacementHashId::Fnv1a,
161 vshard_count: 1024,
162 replication_factor: 3,
163 min_wire_version: 1,
164 };
165 let bytes = zerompk::to_msgpack_vec(&original).unwrap();
166 let decoded: ClusterSettings = zerompk::from_msgpack(&bytes).unwrap();
167 assert_eq!(original, decoded);
168 }
169
170 #[test]
171 fn cluster_settings_msgpack_roundtrip_xxhash3() {
172 let original = ClusterSettings {
173 placement_hash_id: PlacementHashId::XxHash3,
174 vshard_count: 512,
175 replication_factor: 1,
176 min_wire_version: 2,
177 };
178 let bytes = zerompk::to_msgpack_vec(&original).unwrap();
179 let decoded: ClusterSettings = zerompk::from_msgpack(&bytes).unwrap();
180 assert_eq!(original, decoded);
181 }
182
183 #[test]
186 fn save_and_load_cluster_settings() {
187 let (_dir, catalog) = temp_catalog();
188 assert_eq!(catalog.load_cluster_settings().unwrap(), None);
189
190 let settings = ClusterSettings {
191 placement_hash_id: PlacementHashId::Fnv1a,
192 vshard_count: 1024,
193 replication_factor: 3,
194 min_wire_version: 1,
195 };
196 catalog.save_cluster_settings(&settings).unwrap();
197
198 let loaded = catalog.load_cluster_settings().unwrap().unwrap();
199 assert_eq!(loaded, settings);
200 }
201
202 #[test]
203 fn save_cluster_settings_overwrite_roundtrip() {
204 let (_dir, catalog) = temp_catalog();
205
206 let first = ClusterSettings {
207 placement_hash_id: PlacementHashId::Fnv1a,
208 vshard_count: 1024,
209 replication_factor: 1,
210 min_wire_version: 1,
211 };
212 catalog.save_cluster_settings(&first).unwrap();
213
214 let updated = ClusterSettings {
215 placement_hash_id: PlacementHashId::XxHash3,
216 vshard_count: 1024,
217 replication_factor: 3,
218 min_wire_version: 2,
219 };
220 catalog.save_cluster_settings(&updated).unwrap();
221
222 let loaded = catalog.load_cluster_settings().unwrap().unwrap();
223 assert_eq!(loaded, updated);
224 }
225
226 #[test]
229 fn placement_hash_deterministic() {
230 let key = b"my-collection-key";
231 let a = placement_hash(PlacementHashId::Fnv1a, key);
232 let b = placement_hash(PlacementHashId::Fnv1a, key);
233 assert_eq!(a, b, "FNV-1a must be deterministic");
234
235 let c = placement_hash(PlacementHashId::XxHash3, key);
236 let d = placement_hash(PlacementHashId::XxHash3, key);
237 assert_eq!(c, d, "XxHash3 must be deterministic");
238 }
239
240 #[test]
241 fn placement_hash_different_ids_produce_different_values() {
242 let key = b"my-collection-key";
243 let fnv = placement_hash(PlacementHashId::Fnv1a, key);
244 let xx3 = placement_hash(PlacementHashId::XxHash3, key);
245 assert_ne!(fnv, xx3, "FNV-1a and XxHash3 must differ for the same key");
246 }
247}