allsource_core/infrastructure/cluster/
node_registry.rs1use parking_lot::RwLock;
35use std::collections::HashMap;
36use std::sync::Arc;
37
38#[derive(Debug, Clone)]
40pub struct Node {
41 pub id: u32,
43
44 pub address: String,
46
47 pub healthy: bool,
49
50 pub assigned_partitions: Vec<u32>,
52}
53
54pub struct NodeRegistry {
56 partition_count: u32,
58
59 nodes: Arc<RwLock<HashMap<u32, Node>>>,
61}
62
63impl NodeRegistry {
64 pub fn new(partition_count: u32) -> Self {
69 Self {
70 partition_count,
71 nodes: Arc::new(RwLock::new(HashMap::new())),
72 }
73 }
74
75 pub fn register_node(&self, mut node: Node) {
79 let mut nodes = self.nodes.write();
80
81 node.assigned_partitions.clear();
83
84 nodes.insert(node.id, node);
85
86 self.rebalance_partitions_locked(&mut nodes);
88 }
89
90 pub fn unregister_node(&self, node_id: u32) {
94 let mut nodes = self.nodes.write();
95 nodes.remove(&node_id);
96 self.rebalance_partitions_locked(&mut nodes);
97 }
98
99 pub fn set_node_health(&self, node_id: u32, healthy: bool) {
103 let mut nodes = self.nodes.write();
104
105 if let Some(node) = nodes.get_mut(&node_id) {
106 node.healthy = healthy;
107 self.rebalance_partitions_locked(&mut nodes);
108 }
109 }
110
111 fn rebalance_partitions_locked(&self, nodes: &mut HashMap<u32, Node>) {
115 for node in nodes.values_mut() {
117 node.assigned_partitions.clear();
118 }
119
120 let mut healthy_nodes: Vec<u32> = nodes.iter()
122 .filter(|(_, n)| n.healthy)
123 .map(|(id, _)| *id)
124 .collect();
125
126 healthy_nodes.sort();
127
128 if healthy_nodes.is_empty() {
129 return; }
131
132 for partition_id in 0..self.partition_count {
134 let node_idx = (partition_id as usize) % healthy_nodes.len();
135 let node_id = healthy_nodes[node_idx];
136
137 if let Some(node) = nodes.get_mut(&node_id) {
138 node.assigned_partitions.push(partition_id);
139 }
140 }
141 }
142
143 pub fn node_for_partition(&self, partition_id: u32) -> Option<u32> {
147 let nodes = self.nodes.read();
148
149 nodes.values()
150 .find(|n| n.healthy && n.assigned_partitions.contains(&partition_id))
151 .map(|n| n.id)
152 }
153
154 pub fn get_node(&self, node_id: u32) -> Option<Node> {
156 self.nodes.read().get(&node_id).cloned()
157 }
158
159 pub fn all_nodes(&self) -> Vec<Node> {
161 self.nodes.read().values().cloned().collect()
162 }
163
164 pub fn healthy_nodes(&self) -> Vec<Node> {
166 self.nodes.read()
167 .values()
168 .filter(|n| n.healthy)
169 .cloned()
170 .collect()
171 }
172
173 pub fn partition_distribution(&self) -> HashMap<u32, Vec<u32>> {
175 let nodes = self.nodes.read();
176
177 nodes.iter()
178 .filter(|(_, n)| n.healthy)
179 .map(|(id, n)| (*id, n.assigned_partitions.clone()))
180 .collect()
181 }
182
183 pub fn is_cluster_healthy(&self) -> bool {
187 let nodes = self.nodes.read();
188
189 for partition_id in 0..self.partition_count {
190 let has_node = nodes.values()
191 .any(|n| n.healthy && n.assigned_partitions.contains(&partition_id));
192
193 if !has_node {
194 return false;
195 }
196 }
197
198 true
199 }
200
201 pub fn node_count(&self) -> usize {
203 self.nodes.read().len()
204 }
205
206 pub fn healthy_node_count(&self) -> usize {
208 self.nodes.read().values().filter(|n| n.healthy).count()
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use super::*;
215
216 #[test]
217 fn test_create_registry() {
218 let registry = NodeRegistry::new(32);
219 assert_eq!(registry.node_count(), 0);
220 assert_eq!(registry.healthy_node_count(), 0);
221 }
222
223 #[test]
224 fn test_register_node() {
225 let registry = NodeRegistry::new(32);
226
227 let node = Node {
228 id: 0,
229 address: "node-0:8080".to_string(),
230 healthy: true,
231 assigned_partitions: vec![],
232 };
233
234 registry.register_node(node);
235
236 assert_eq!(registry.node_count(), 1);
237 assert_eq!(registry.healthy_node_count(), 1);
238
239 let node = registry.get_node(0).unwrap();
241 assert_eq!(node.assigned_partitions.len(), 32);
242 }
243
244 #[test]
245 fn test_two_node_distribution() {
246 let registry = NodeRegistry::new(32);
247
248 registry.register_node(Node {
249 id: 0,
250 address: "node-0:8080".to_string(),
251 healthy: true,
252 assigned_partitions: vec![],
253 });
254
255 registry.register_node(Node {
256 id: 1,
257 address: "node-1:8080".to_string(),
258 healthy: true,
259 assigned_partitions: vec![],
260 });
261
262 let node0 = registry.get_node(0).unwrap();
264 let node1 = registry.get_node(1).unwrap();
265
266 assert_eq!(node0.assigned_partitions.len(), 16);
267 assert_eq!(node1.assigned_partitions.len(), 16);
268
269 for partition_id in &node0.assigned_partitions {
271 assert!(!node1.assigned_partitions.contains(partition_id));
272 }
273 }
274
275 #[test]
276 fn test_node_for_partition() {
277 let registry = NodeRegistry::new(32);
278
279 registry.register_node(Node {
280 id: 0,
281 address: "node-0:8080".to_string(),
282 healthy: true,
283 assigned_partitions: vec![],
284 });
285
286 registry.register_node(Node {
287 id: 1,
288 address: "node-1:8080".to_string(),
289 healthy: true,
290 assigned_partitions: vec![],
291 });
292
293 for partition_id in 0..32 {
295 let node_id = registry.node_for_partition(partition_id);
296 assert!(node_id.is_some());
297 }
298 }
299
300 #[test]
301 fn test_unhealthy_node_excluded() {
302 let registry = NodeRegistry::new(32);
303
304 registry.register_node(Node {
305 id: 0,
306 address: "node-0:8080".to_string(),
307 healthy: true,
308 assigned_partitions: vec![],
309 });
310
311 registry.register_node(Node {
312 id: 1,
313 address: "node-1:8080".to_string(),
314 healthy: false, assigned_partitions: vec![],
316 });
317
318 let node0 = registry.get_node(0).unwrap();
320 let node1 = registry.get_node(1).unwrap();
321
322 assert_eq!(node0.assigned_partitions.len(), 32);
323 assert_eq!(node1.assigned_partitions.len(), 0);
324 }
325
326 #[test]
327 fn test_rebalance_on_health_change() {
328 let registry = NodeRegistry::new(32);
329
330 registry.register_node(Node {
331 id: 0,
332 address: "node-0:8080".to_string(),
333 healthy: true,
334 assigned_partitions: vec![],
335 });
336
337 registry.register_node(Node {
338 id: 1,
339 address: "node-1:8080".to_string(),
340 healthy: true,
341 assigned_partitions: vec![],
342 });
343
344 let node0_before = registry.get_node(0).unwrap();
346 assert_eq!(node0_before.assigned_partitions.len(), 16);
347
348 registry.set_node_health(1, false);
350
351 let node0_after = registry.get_node(0).unwrap();
353 assert_eq!(node0_after.assigned_partitions.len(), 32);
354 }
355
356 #[test]
357 fn test_cluster_health() {
358 let registry = NodeRegistry::new(32);
359
360 assert!(!registry.is_cluster_healthy());
362
363 registry.register_node(Node {
365 id: 0,
366 address: "node-0:8080".to_string(),
367 healthy: true,
368 assigned_partitions: vec![],
369 });
370
371 assert!(registry.is_cluster_healthy());
372
373 registry.set_node_health(0, false);
375 assert!(!registry.is_cluster_healthy());
376 }
377
378 #[test]
379 fn test_partition_distribution() {
380 let registry = NodeRegistry::new(32);
381
382 registry.register_node(Node {
383 id: 0,
384 address: "node-0:8080".to_string(),
385 healthy: true,
386 assigned_partitions: vec![],
387 });
388
389 registry.register_node(Node {
390 id: 1,
391 address: "node-1:8080".to_string(),
392 healthy: true,
393 assigned_partitions: vec![],
394 });
395
396 let distribution = registry.partition_distribution();
397
398 assert_eq!(distribution.len(), 2);
399 assert_eq!(distribution.get(&0).unwrap().len(), 16);
400 assert_eq!(distribution.get(&1).unwrap().len(), 16);
401 }
402
403 #[test]
404 fn test_deterministic_assignment() {
405 let registry1 = NodeRegistry::new(32);
406 let registry2 = NodeRegistry::new(32);
407
408 for i in 0..4 {
410 let node = Node {
411 id: i,
412 address: format!("node-{}:8080", i),
413 healthy: true,
414 assigned_partitions: vec![],
415 };
416
417 registry1.register_node(node.clone());
418 registry2.register_node(node);
419 }
420
421 for partition_id in 0..32 {
423 let node1 = registry1.node_for_partition(partition_id);
424 let node2 = registry2.node_for_partition(partition_id);
425
426 assert_eq!(node1, node2);
427 }
428 }
429}