allsource_core/infrastructure/cluster/
node_registry.rs1use dashmap::DashMap;
34use std::collections::HashMap;
35use std::sync::Arc;
36
/// A cluster member tracked by the node registry.
#[derive(Debug, Clone)]
pub struct Node {
    /// Identifier of the node; the registry uses it as the map key.
    pub id: u32,

    /// Address of the node as a string (the tests in this file use a
    /// `host:port` form such as `node-0:8080`).
    pub address: String,

    /// Whether the node is currently considered healthy. Only healthy nodes
    /// are given partitions when the registry rebalances.
    pub healthy: bool,

    /// Partition ids currently assigned to this node. The registry clears
    /// this on registration and recomputes it on every rebalance.
    pub assigned_partitions: Vec<u32>,
}
52
/// Registry of cluster nodes that assigns partitions `0..partition_count`
/// to the healthy nodes it knows about.
pub struct NodeRegistry {
    /// Total number of partitions distributed across healthy nodes.
    partition_count: u32,

    /// Registered nodes, keyed by node id.
    nodes: Arc<DashMap<u32, Node>>,
}
61
62impl NodeRegistry {
63 pub fn new(partition_count: u32) -> Self {
68 Self {
69 partition_count,
70 nodes: Arc::new(DashMap::new()),
71 }
72 }
73
74 pub fn register_node(&self, mut node: Node) {
78 node.assigned_partitions.clear();
80
81 self.nodes.insert(node.id, node);
82
83 self.rebalance_partitions();
85 }
86
87 pub fn unregister_node(&self, node_id: u32) {
91 self.nodes.remove(&node_id);
92 self.rebalance_partitions();
93 }
94
95 pub fn set_node_health(&self, node_id: u32, healthy: bool) {
99 if let Some(mut node) = self.nodes.get_mut(&node_id) {
100 node.healthy = healthy;
101 }
102 self.rebalance_partitions();
103 }
104
105 fn rebalance_partitions(&self) {
109 for mut entry in self.nodes.iter_mut() {
111 entry.value_mut().assigned_partitions.clear();
112 }
113
114 let mut healthy_nodes: Vec<u32> = self.nodes
116 .iter()
117 .filter(|entry| entry.value().healthy)
118 .map(|entry| *entry.key())
119 .collect();
120
121 healthy_nodes.sort();
122
123 if healthy_nodes.is_empty() {
124 return; }
126
127 for partition_id in 0..self.partition_count {
129 let node_idx = (partition_id as usize) % healthy_nodes.len();
130 let node_id = healthy_nodes[node_idx];
131
132 if let Some(mut node) = self.nodes.get_mut(&node_id) {
133 node.assigned_partitions.push(partition_id);
134 }
135 }
136 }
137
138 pub fn node_for_partition(&self, partition_id: u32) -> Option<u32> {
142 self.nodes
143 .iter()
144 .find(|entry| entry.value().healthy && entry.value().assigned_partitions.contains(&partition_id))
145 .map(|entry| entry.value().id)
146 }
147
148 pub fn get_node(&self, node_id: u32) -> Option<Node> {
150 self.nodes.get(&node_id).map(|entry| entry.value().clone())
151 }
152
153 pub fn all_nodes(&self) -> Vec<Node> {
155 self.nodes.iter().map(|entry| entry.value().clone()).collect()
156 }
157
158 pub fn healthy_nodes(&self) -> Vec<Node> {
160 self.nodes
161 .iter()
162 .filter(|entry| entry.value().healthy)
163 .map(|entry| entry.value().clone())
164 .collect()
165 }
166
167 pub fn partition_distribution(&self) -> HashMap<u32, Vec<u32>> {
169 self.nodes
170 .iter()
171 .filter(|entry| entry.value().healthy)
172 .map(|entry| (*entry.key(), entry.value().assigned_partitions.clone()))
173 .collect()
174 }
175
176 pub fn is_cluster_healthy(&self) -> bool {
180 for partition_id in 0..self.partition_count {
181 let has_node = self.nodes
182 .iter()
183 .any(|entry| entry.value().healthy && entry.value().assigned_partitions.contains(&partition_id));
184
185 if !has_node {
186 return false;
187 }
188 }
189
190 true
191 }
192
193 pub fn node_count(&self) -> usize {
195 self.nodes.len()
196 }
197
198 pub fn healthy_node_count(&self) -> usize {
200 self.nodes.iter().filter(|entry| entry.value().healthy).count()
201 }
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a node with the `node-<id>:8080` address used throughout these
    /// tests and no pre-assigned partitions.
    fn test_node(id: u32, healthy: bool) -> Node {
        Node {
            id,
            address: format!("node-{}:8080", id),
            healthy,
            assigned_partitions: vec![],
        }
    }

    #[test]
    fn test_create_registry() {
        let registry = NodeRegistry::new(32);
        assert_eq!(registry.node_count(), 0);
        assert_eq!(registry.healthy_node_count(), 0);
    }

    #[test]
    fn test_register_node() {
        let registry = NodeRegistry::new(32);

        registry.register_node(test_node(0, true));

        assert_eq!(registry.node_count(), 1);
        assert_eq!(registry.healthy_node_count(), 1);

        // A lone healthy node owns every partition.
        let node = registry.get_node(0).unwrap();
        assert_eq!(node.assigned_partitions.len(), 32);
    }

    #[test]
    fn test_two_node_distribution() {
        let registry = NodeRegistry::new(32);

        registry.register_node(test_node(0, true));
        registry.register_node(test_node(1, true));

        let node0 = registry.get_node(0).unwrap();
        let node1 = registry.get_node(1).unwrap();

        assert_eq!(node0.assigned_partitions.len(), 16);
        assert_eq!(node1.assigned_partitions.len(), 16);

        // No partition may be owned by both nodes.
        for partition_id in &node0.assigned_partitions {
            assert!(!node1.assigned_partitions.contains(partition_id));
        }
    }

    #[test]
    fn test_node_for_partition() {
        let registry = NodeRegistry::new(32);

        registry.register_node(test_node(0, true));
        registry.register_node(test_node(1, true));

        for partition_id in 0..32 {
            let node_id = registry.node_for_partition(partition_id);
            assert!(node_id.is_some());
        }
    }

    #[test]
    fn test_node_for_partition_out_of_range() {
        let registry = NodeRegistry::new(32);

        registry.register_node(test_node(0, true));

        // Partition ids at or beyond the partition count are never assigned.
        assert_eq!(registry.node_for_partition(32), None);
    }

    #[test]
    fn test_unhealthy_node_excluded() {
        let registry = NodeRegistry::new(32);

        registry.register_node(test_node(0, true));
        registry.register_node(test_node(1, false));

        let node0 = registry.get_node(0).unwrap();
        let node1 = registry.get_node(1).unwrap();

        assert_eq!(node0.assigned_partitions.len(), 32);
        assert_eq!(node1.assigned_partitions.len(), 0);
    }

    #[test]
    fn test_rebalance_on_health_change() {
        let registry = NodeRegistry::new(32);

        registry.register_node(test_node(0, true));
        registry.register_node(test_node(1, true));

        let node0_before = registry.get_node(0).unwrap();
        assert_eq!(node0_before.assigned_partitions.len(), 16);

        registry.set_node_health(1, false);

        // With node 1 unhealthy, node 0 takes over everything.
        let node0_after = registry.get_node(0).unwrap();
        assert_eq!(node0_after.assigned_partitions.len(), 32);
    }

    #[test]
    fn test_unregister_node() {
        let registry = NodeRegistry::new(32);

        registry.register_node(test_node(0, true));
        registry.register_node(test_node(1, true));

        registry.unregister_node(1);

        assert_eq!(registry.node_count(), 1);
        assert!(registry.get_node(1).is_none());

        // The remaining node inherits the removed node's partitions.
        let node0 = registry.get_node(0).unwrap();
        assert_eq!(node0.assigned_partitions.len(), 32);
        assert!(registry.is_cluster_healthy());
    }

    #[test]
    fn test_cluster_health() {
        let registry = NodeRegistry::new(32);

        // No nodes: nothing can own the partitions.
        assert!(!registry.is_cluster_healthy());

        registry.register_node(test_node(0, true));

        assert!(registry.is_cluster_healthy());

        registry.set_node_health(0, false);
        assert!(!registry.is_cluster_healthy());
    }

    #[test]
    fn test_partition_distribution() {
        let registry = NodeRegistry::new(32);

        registry.register_node(test_node(0, true));
        registry.register_node(test_node(1, true));

        let distribution = registry.partition_distribution();

        assert_eq!(distribution.len(), 2);
        assert_eq!(distribution.get(&0).unwrap().len(), 16);
        assert_eq!(distribution.get(&1).unwrap().len(), 16);
    }

    #[test]
    fn test_deterministic_assignment() {
        let registry1 = NodeRegistry::new(32);
        let registry2 = NodeRegistry::new(32);

        for i in 0..4 {
            let node = test_node(i, true);

            registry1.register_node(node.clone());
            registry2.register_node(node);
        }

        // Two registries fed the same nodes must agree on every partition.
        for partition_id in 0..32 {
            let node1 = registry1.node_for_partition(partition_id);
            let node2 = registry2.node_for_partition(partition_id);

            assert_eq!(node1, node2);
        }
    }
}