nodedb_cluster/catalog/
ops.rs1use crate::error::{ClusterError, Result};
6use crate::routing::RoutingTable;
7use crate::topology::ClusterTopology;
8
9use super::core::ClusterCatalog;
10use super::schema::{KEY_ROUTING, KEY_TOPOLOGY, ROUTING_TABLE, TOPOLOGY_TABLE, catalog_err};
11
12impl ClusterCatalog {
13 pub fn save_topology(&self, topology: &ClusterTopology) -> Result<()> {
15 let bytes = zerompk::to_msgpack_vec(topology).map_err(|e| ClusterError::Codec {
16 detail: format!("serialize topology: {e}"),
17 })?;
18
19 let txn = self.db.begin_write().map_err(catalog_err)?;
20 {
21 let mut table = txn.open_table(TOPOLOGY_TABLE).map_err(catalog_err)?;
22 table
23 .insert(KEY_TOPOLOGY, bytes.as_slice())
24 .map_err(catalog_err)?;
25 }
26 txn.commit().map_err(catalog_err)?;
27 Ok(())
28 }
29
30 pub fn load_topology(&self) -> Result<Option<ClusterTopology>> {
32 let txn = self.db.begin_read().map_err(catalog_err)?;
33 let table = txn.open_table(TOPOLOGY_TABLE).map_err(catalog_err)?;
34
35 match table.get(KEY_TOPOLOGY).map_err(catalog_err)? {
36 Some(guard) => {
37 let bytes = guard.value();
38 let topo: ClusterTopology =
39 zerompk::from_msgpack(bytes).map_err(|e| ClusterError::Codec {
40 detail: format!("deserialize topology: {e}"),
41 })?;
42 Ok(Some(topo))
43 }
44 None => Ok(None),
45 }
46 }
47
48 pub fn save_routing(&self, routing: &RoutingTable) -> Result<()> {
50 let bytes = zerompk::to_msgpack_vec(routing).map_err(|e| ClusterError::Codec {
51 detail: format!("serialize routing: {e}"),
52 })?;
53
54 let txn = self.db.begin_write().map_err(catalog_err)?;
55 {
56 let mut table = txn.open_table(ROUTING_TABLE).map_err(catalog_err)?;
57 table
58 .insert(KEY_ROUTING, bytes.as_slice())
59 .map_err(catalog_err)?;
60 }
61 txn.commit().map_err(catalog_err)?;
62 Ok(())
63 }
64
65 pub fn load_routing(&self) -> Result<Option<RoutingTable>> {
67 let txn = self.db.begin_read().map_err(catalog_err)?;
68 let table = txn.open_table(ROUTING_TABLE).map_err(catalog_err)?;
69
70 match table.get(KEY_ROUTING).map_err(catalog_err)? {
71 Some(guard) => {
72 let bytes = guard.value();
73 let rt: RoutingTable =
74 zerompk::from_msgpack(bytes).map_err(|e| ClusterError::Codec {
75 detail: format!("deserialize routing: {e}"),
76 })?;
77 Ok(Some(rt))
78 }
79 None => Ok(None),
80 }
81 }
82}
83
84#[cfg(test)]
85mod tests {
86 use super::*;
87 use crate::topology::{NodeInfo, NodeState};
88
89 fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
90 let dir = tempfile::tempdir().unwrap();
91 let path = dir.path().join("cluster.redb");
92 let catalog = ClusterCatalog::open(&path).unwrap();
93 (dir, catalog)
94 }
95
96 #[test]
97 fn topology_save_load_roundtrip() {
98 let (_dir, catalog) = temp_catalog();
99
100 let mut topo = ClusterTopology::new();
101 topo.add_node(NodeInfo::new(
102 1,
103 "10.0.0.1:9400".parse().unwrap(),
104 NodeState::Active,
105 ));
106 topo.add_node(NodeInfo::new(
107 2,
108 "10.0.0.2:9400".parse().unwrap(),
109 NodeState::Active,
110 ));
111 topo.add_node(NodeInfo::new(
112 3,
113 "10.0.0.3:9400".parse().unwrap(),
114 NodeState::Joining,
115 ));
116
117 catalog.save_topology(&topo).unwrap();
118 let loaded = catalog.load_topology().unwrap().unwrap();
119
120 assert_eq!(loaded.node_count(), 3);
121 assert_eq!(loaded.version(), 3);
122 assert_eq!(loaded.active_nodes().len(), 2);
123 assert_eq!(loaded.get_node(1).unwrap().addr, "10.0.0.1:9400");
124 }
125
126 #[test]
127 fn routing_save_load_roundtrip() {
128 let (_dir, catalog) = temp_catalog();
129
130 let rt = RoutingTable::uniform(4, &[1, 2, 3], 3);
132 catalog.save_routing(&rt).unwrap();
133 let loaded = catalog.load_routing().unwrap().unwrap();
134
135 assert_eq!(loaded.num_groups(), 5);
136 assert_eq!(loaded.vshard_to_group().len(), 1024);
137 for i in 0..1024u32 {
138 assert_eq!(
139 rt.group_for_vshard(i).unwrap(),
140 loaded.group_for_vshard(i).unwrap()
141 );
142 }
143 }
144
145 #[test]
146 fn empty_catalog_returns_none() {
147 let (_dir, catalog) = temp_catalog();
148
149 assert!(catalog.load_topology().unwrap().is_none());
150 assert!(catalog.load_routing().unwrap().is_none());
151 }
152
153 #[test]
154 fn overwrite_topology() {
155 let (_dir, catalog) = temp_catalog();
156
157 let mut topo1 = ClusterTopology::new();
158 topo1.add_node(NodeInfo::new(
159 1,
160 "10.0.0.1:9400".parse().unwrap(),
161 NodeState::Active,
162 ));
163 catalog.save_topology(&topo1).unwrap();
164
165 let mut topo2 = ClusterTopology::new();
166 topo2.add_node(NodeInfo::new(
167 1,
168 "10.0.0.1:9400".parse().unwrap(),
169 NodeState::Active,
170 ));
171 topo2.add_node(NodeInfo::new(
172 2,
173 "10.0.0.2:9400".parse().unwrap(),
174 NodeState::Active,
175 ));
176 catalog.save_topology(&topo2).unwrap();
177
178 let loaded = catalog.load_topology().unwrap().unwrap();
179 assert_eq!(loaded.node_count(), 2);
180 }
181}