allsource_core/infrastructure/cluster/
node_registry.rs1use dashmap::DashMap;
34use std::{collections::HashMap, sync::Arc};
35
/// A cluster member as tracked by the node registry.
///
/// Derives `PartialEq`/`Eq` so that nodes (for example the snapshots returned
/// by the registry's getters) can be compared directly in assertions and
/// change-detection code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node {
    /// Identifier of the node; the registry uses it as the map key.
    pub id: u32,

    /// Address of the node, stored as an opaque string (the tests in this file
    /// use `host:port` values such as `node-0:8080`).
    pub address: String,

    /// Whether the node is currently considered healthy. Only healthy nodes
    /// receive partitions when the registry rebalances.
    pub healthy: bool,

    /// Partition ids currently assigned to this node. The registry overwrites
    /// this list on every rebalance, so any value supplied at registration
    /// time is discarded.
    pub assigned_partitions: Vec<u32>,
}
51
52pub struct NodeRegistry {
54 partition_count: u32,
56
57 nodes: Arc<DashMap<u32, Node>>,
59}
60
61impl NodeRegistry {
62 pub fn new(partition_count: u32) -> Self {
67 Self {
68 partition_count,
69 nodes: Arc::new(DashMap::new()),
70 }
71 }
72
73 pub fn register_node(&self, mut node: Node) {
77 node.assigned_partitions.clear();
79
80 self.nodes.insert(node.id, node);
81
82 self.rebalance_partitions();
84 }
85
86 pub fn unregister_node(&self, node_id: u32) {
90 self.nodes.remove(&node_id);
91 self.rebalance_partitions();
92 }
93
94 pub fn set_node_health(&self, node_id: u32, healthy: bool) {
98 if let Some(mut node) = self.nodes.get_mut(&node_id) {
99 node.healthy = healthy;
100 }
101 self.rebalance_partitions();
102 }
103
104 fn rebalance_partitions(&self) {
108 for mut entry in self.nodes.iter_mut() {
110 entry.value_mut().assigned_partitions.clear();
111 }
112
113 let mut healthy_nodes: Vec<u32> = self
115 .nodes
116 .iter()
117 .filter(|entry| entry.value().healthy)
118 .map(|entry| *entry.key())
119 .collect();
120
121 healthy_nodes.sort();
122
123 if healthy_nodes.is_empty() {
124 return; }
126
127 for partition_id in 0..self.partition_count {
129 let node_idx = (partition_id as usize) % healthy_nodes.len();
130 let node_id = healthy_nodes[node_idx];
131
132 if let Some(mut node) = self.nodes.get_mut(&node_id) {
133 node.assigned_partitions.push(partition_id);
134 }
135 }
136 }
137
138 pub fn node_for_partition(&self, partition_id: u32) -> Option<u32> {
142 self.nodes
143 .iter()
144 .find(|entry| {
145 entry.value().healthy && entry.value().assigned_partitions.contains(&partition_id)
146 })
147 .map(|entry| entry.value().id)
148 }
149
150 pub fn get_node(&self, node_id: u32) -> Option<Node> {
152 self.nodes.get(&node_id).map(|entry| entry.value().clone())
153 }
154
155 pub fn all_nodes(&self) -> Vec<Node> {
157 self.nodes
158 .iter()
159 .map(|entry| entry.value().clone())
160 .collect()
161 }
162
163 pub fn healthy_nodes(&self) -> Vec<Node> {
165 self.nodes
166 .iter()
167 .filter(|entry| entry.value().healthy)
168 .map(|entry| entry.value().clone())
169 .collect()
170 }
171
172 pub fn partition_distribution(&self) -> HashMap<u32, Vec<u32>> {
174 self.nodes
175 .iter()
176 .filter(|entry| entry.value().healthy)
177 .map(|entry| (*entry.key(), entry.value().assigned_partitions.clone()))
178 .collect()
179 }
180
181 pub fn is_cluster_healthy(&self) -> bool {
185 for partition_id in 0..self.partition_count {
186 let has_node = self.nodes.iter().any(|entry| {
187 entry.value().healthy && entry.value().assigned_partitions.contains(&partition_id)
188 });
189
190 if !has_node {
191 return false;
192 }
193 }
194
195 true
196 }
197
198 pub fn node_count(&self) -> usize {
200 self.nodes.len()
201 }
202
203 pub fn healthy_node_count(&self) -> usize {
205 self.nodes
206 .iter()
207 .filter(|entry| entry.value().healthy)
208 .count()
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;

    /// Partition count used by every test registry.
    const PARTITIONS: u32 = 32;

    /// Builds a node with the conventional test address and no partitions.
    fn make_node(id: u32, healthy: bool) -> Node {
        Node {
            id,
            address: format!("node-{}:8080", id),
            healthy,
            assigned_partitions: Vec::new(),
        }
    }

    /// Builds a registry and registers one node per `(id, healthy)` pair, in
    /// the given order.
    fn registry_with(nodes: &[(u32, bool)]) -> NodeRegistry {
        let registry = NodeRegistry::new(PARTITIONS);
        for &(id, healthy) in nodes {
            registry.register_node(make_node(id, healthy));
        }
        registry
    }

    #[test]
    fn test_create_registry() {
        let registry = NodeRegistry::new(PARTITIONS);
        assert_eq!(registry.node_count(), 0);
        assert_eq!(registry.healthy_node_count(), 0);
    }

    #[test]
    fn test_register_node() {
        let registry = registry_with(&[(0, true)]);

        assert_eq!(registry.node_count(), 1);
        assert_eq!(registry.healthy_node_count(), 1);

        // A lone healthy node owns every partition.
        let node = registry.get_node(0).unwrap();
        assert_eq!(node.assigned_partitions.len(), 32);
    }

    #[test]
    fn test_two_node_distribution() {
        let registry = registry_with(&[(0, true), (1, true)]);

        let node0 = registry.get_node(0).unwrap();
        let node1 = registry.get_node(1).unwrap();

        assert_eq!(node0.assigned_partitions.len(), 16);
        assert_eq!(node1.assigned_partitions.len(), 16);

        // The two assignments must not overlap.
        for partition_id in &node0.assigned_partitions {
            assert!(!node1.assigned_partitions.contains(partition_id));
        }
    }

    #[test]
    fn test_uneven_distribution() {
        let registry = registry_with(&[(0, true), (1, true), (2, true)]);

        // 32 partitions over 3 nodes: the remainder goes to the lowest ids.
        assert_eq!(registry.get_node(0).unwrap().assigned_partitions.len(), 11);
        assert_eq!(registry.get_node(1).unwrap().assigned_partitions.len(), 11);
        assert_eq!(registry.get_node(2).unwrap().assigned_partitions.len(), 10);
        assert!(registry.is_cluster_healthy());
    }

    #[test]
    fn test_node_for_partition() {
        let registry = registry_with(&[(0, true), (1, true)]);

        for partition_id in 0..32 {
            let node_id = registry.node_for_partition(partition_id);
            assert!(node_id.is_some());
        }
    }

    #[test]
    fn test_unhealthy_node_excluded() {
        let registry = registry_with(&[(0, true), (1, false)]);

        let node0 = registry.get_node(0).unwrap();
        let node1 = registry.get_node(1).unwrap();

        assert_eq!(node0.assigned_partitions.len(), 32);
        assert_eq!(node1.assigned_partitions.len(), 0);
    }

    #[test]
    fn test_rebalance_on_health_change() {
        let registry = registry_with(&[(0, true), (1, true)]);

        let node0_before = registry.get_node(0).unwrap();
        assert_eq!(node0_before.assigned_partitions.len(), 16);

        registry.set_node_health(1, false);

        // Node 0 takes over everything once node 1 is unhealthy.
        let node0_after = registry.get_node(0).unwrap();
        assert_eq!(node0_after.assigned_partitions.len(), 32);
    }

    #[test]
    fn test_unregister_node() {
        let registry = registry_with(&[(0, true), (1, true)]);

        registry.unregister_node(1);

        assert_eq!(registry.node_count(), 1);
        assert!(registry.get_node(1).is_none());
        assert_eq!(registry.get_node(0).unwrap().assigned_partitions.len(), 32);

        // Removing an unknown id leaves the assignment untouched.
        registry.unregister_node(42);
        assert_eq!(registry.node_count(), 1);
        assert_eq!(registry.get_node(0).unwrap().assigned_partitions.len(), 32);
    }

    #[test]
    fn test_cluster_health() {
        let registry = NodeRegistry::new(PARTITIONS);

        // No nodes: nothing covers the partitions.
        assert!(!registry.is_cluster_healthy());

        registry.register_node(make_node(0, true));
        assert!(registry.is_cluster_healthy());

        registry.set_node_health(0, false);
        assert!(!registry.is_cluster_healthy());
    }

    #[test]
    fn test_partition_distribution() {
        let registry = registry_with(&[(0, true), (1, true)]);

        let distribution = registry.partition_distribution();

        assert_eq!(distribution.len(), 2);
        assert_eq!(distribution.get(&0).unwrap().len(), 16);
        assert_eq!(distribution.get(&1).unwrap().len(), 16);
    }

    #[test]
    fn test_deterministic_assignment() {
        let registry1 = NodeRegistry::new(PARTITIONS);
        let registry2 = NodeRegistry::new(PARTITIONS);

        for i in 0..4 {
            let node = make_node(i, true);
            registry1.register_node(node.clone());
            registry2.register_node(node);
        }

        // Identical inputs must yield identical owners for every partition.
        for partition_id in 0..32 {
            let node1 = registry1.node_for_partition(partition_id);
            let node2 = registry2.node_for_partition(partition_id);

            assert_eq!(node1, node2);
        }
    }
}