Skip to main content

nodedb_cluster/catalog/
ops.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Topology and routing-table save / load.
4
5use crate::error::{ClusterError, Result};
6use crate::routing::RoutingTable;
7use crate::topology::ClusterTopology;
8
9use super::core::ClusterCatalog;
10use super::schema::{KEY_ROUTING, KEY_TOPOLOGY, ROUTING_TABLE, TOPOLOGY_TABLE, catalog_err};
11
12impl ClusterCatalog {
13    /// Persist the cluster topology.
14    pub fn save_topology(&self, topology: &ClusterTopology) -> Result<()> {
15        let bytes = zerompk::to_msgpack_vec(topology).map_err(|e| ClusterError::Codec {
16            detail: format!("serialize topology: {e}"),
17        })?;
18
19        let txn = self.db.begin_write().map_err(catalog_err)?;
20        {
21            let mut table = txn.open_table(TOPOLOGY_TABLE).map_err(catalog_err)?;
22            table
23                .insert(KEY_TOPOLOGY, bytes.as_slice())
24                .map_err(catalog_err)?;
25        }
26        txn.commit().map_err(catalog_err)?;
27        Ok(())
28    }
29
30    /// Load the cluster topology. Returns None if no topology has been saved.
31    pub fn load_topology(&self) -> Result<Option<ClusterTopology>> {
32        let txn = self.db.begin_read().map_err(catalog_err)?;
33        let table = txn.open_table(TOPOLOGY_TABLE).map_err(catalog_err)?;
34
35        match table.get(KEY_TOPOLOGY).map_err(catalog_err)? {
36            Some(guard) => {
37                let bytes = guard.value();
38                let topo: ClusterTopology =
39                    zerompk::from_msgpack(bytes).map_err(|e| ClusterError::Codec {
40                        detail: format!("deserialize topology: {e}"),
41                    })?;
42                Ok(Some(topo))
43            }
44            None => Ok(None),
45        }
46    }
47
48    /// Persist the routing table.
49    pub fn save_routing(&self, routing: &RoutingTable) -> Result<()> {
50        let bytes = zerompk::to_msgpack_vec(routing).map_err(|e| ClusterError::Codec {
51            detail: format!("serialize routing: {e}"),
52        })?;
53
54        let txn = self.db.begin_write().map_err(catalog_err)?;
55        {
56            let mut table = txn.open_table(ROUTING_TABLE).map_err(catalog_err)?;
57            table
58                .insert(KEY_ROUTING, bytes.as_slice())
59                .map_err(catalog_err)?;
60        }
61        txn.commit().map_err(catalog_err)?;
62        Ok(())
63    }
64
65    /// Load the routing table. Returns None if no routing table has been saved.
66    pub fn load_routing(&self) -> Result<Option<RoutingTable>> {
67        let txn = self.db.begin_read().map_err(catalog_err)?;
68        let table = txn.open_table(ROUTING_TABLE).map_err(catalog_err)?;
69
70        match table.get(KEY_ROUTING).map_err(catalog_err)? {
71            Some(guard) => {
72                let bytes = guard.value();
73                let rt: RoutingTable =
74                    zerompk::from_msgpack(bytes).map_err(|e| ClusterError::Codec {
75                        detail: format!("deserialize routing: {e}"),
76                    })?;
77                Ok(Some(rt))
78            }
79            None => Ok(None),
80        }
81    }
82}
83
#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::{NodeInfo, NodeState};

    /// Open a fresh catalog at `cluster.redb` inside a new temporary directory.
    ///
    /// The directory guard is returned alongside the catalog so each test
    /// holds on to it for as long as the catalog is in use.
    fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
        let tmp = tempfile::tempdir().unwrap();
        let catalog = ClusterCatalog::open(&tmp.path().join("cluster.redb")).unwrap();
        (tmp, catalog)
    }

    /// A three-node topology survives a save followed by a load.
    #[test]
    fn topology_save_load_roundtrip() {
        let (_dir, catalog) = temp_catalog();

        let mut original = ClusterTopology::new();
        for (id, addr, state) in [
            (1, "10.0.0.1:9400", NodeState::Active),
            (2, "10.0.0.2:9400", NodeState::Active),
            (3, "10.0.0.3:9400", NodeState::Joining),
        ] {
            original.add_node(NodeInfo::new(id, addr.parse().unwrap(), state));
        }

        catalog.save_topology(&original).unwrap();
        let restored = catalog.load_topology().unwrap().unwrap();

        assert_eq!(restored.node_count(), 3);
        assert_eq!(restored.version(), 3);
        assert_eq!(restored.active_nodes().len(), 2);
        assert_eq!(restored.get_node(1).unwrap().addr, "10.0.0.1:9400");
    }

    /// A uniform routing table maps every vshard to the same group after a
    /// save followed by a load.
    #[test]
    fn routing_save_load_roundtrip() {
        let (_dir, catalog) = temp_catalog();

        // uniform(4, ...) creates 4 data groups + 1 metadata group = 5 total.
        let original = RoutingTable::uniform(4, &[1, 2, 3], 3);
        catalog.save_routing(&original).unwrap();
        let restored = catalog.load_routing().unwrap().unwrap();

        assert_eq!(restored.num_groups(), 5);
        assert_eq!(restored.vshard_to_group().len(), 1024);
        for vshard in 0..1024u32 {
            let expected = original.group_for_vshard(vshard).unwrap();
            let actual = restored.group_for_vshard(vshard).unwrap();
            assert_eq!(expected, actual);
        }
    }

    /// Loading from a freshly opened catalog yields `None` for both records.
    #[test]
    fn empty_catalog_returns_none() {
        let (_dir, catalog) = temp_catalog();

        let topology = catalog.load_topology().unwrap();
        assert!(topology.is_none());

        let routing = catalog.load_routing().unwrap();
        assert!(routing.is_none());
    }

    /// Saving a second topology replaces the first one.
    #[test]
    fn overwrite_topology() {
        let (_dir, catalog) = temp_catalog();

        let mut first = ClusterTopology::new();
        first.add_node(NodeInfo::new(
            1,
            "10.0.0.1:9400".parse().unwrap(),
            NodeState::Active,
        ));
        catalog.save_topology(&first).unwrap();

        let mut second = ClusterTopology::new();
        for (id, addr) in [(1, "10.0.0.1:9400"), (2, "10.0.0.2:9400")] {
            second.add_node(NodeInfo::new(id, addr.parse().unwrap(), NodeState::Active));
        }
        catalog.save_topology(&second).unwrap();

        let restored = catalog.load_topology().unwrap().unwrap();
        assert_eq!(restored.node_count(), 2);
    }
}