allsource_core/infrastructure/cluster/
node_registry.rs1use parking_lot::RwLock;
34use std::collections::HashMap;
35use std::sync::Arc;
36
/// A single cluster member as tracked by the node registry.
///
/// Equality compares every field, including the current partition assignment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node {
    /// Unique node identifier; the registry keys its map by this value.
    pub id: u32,

    /// Address of the node. The registry stores it verbatim and never parses
    /// it (presumably `host:port` — confirm against callers).
    pub address: String,

    /// Whether the node is currently eligible to own partitions.
    pub healthy: bool,

    /// Partition ids currently owned by this node. The registry overwrites
    /// this list on every rebalance, so values supplied by callers are
    /// discarded on registration.
    pub assigned_partitions: Vec<u32>,
}
52
53pub struct NodeRegistry {
55 partition_count: u32,
57
58 nodes: Arc<RwLock<HashMap<u32, Node>>>,
60}
61
62impl NodeRegistry {
63 pub fn new(partition_count: u32) -> Self {
68 Self {
69 partition_count,
70 nodes: Arc::new(RwLock::new(HashMap::new())),
71 }
72 }
73
74 pub fn register_node(&self, mut node: Node) {
78 let mut nodes = self.nodes.write();
79
80 node.assigned_partitions.clear();
82
83 nodes.insert(node.id, node);
84
85 self.rebalance_partitions_locked(&mut nodes);
87 }
88
89 pub fn unregister_node(&self, node_id: u32) {
93 let mut nodes = self.nodes.write();
94 nodes.remove(&node_id);
95 self.rebalance_partitions_locked(&mut nodes);
96 }
97
98 pub fn set_node_health(&self, node_id: u32, healthy: bool) {
102 let mut nodes = self.nodes.write();
103
104 if let Some(node) = nodes.get_mut(&node_id) {
105 node.healthy = healthy;
106 self.rebalance_partitions_locked(&mut nodes);
107 }
108 }
109
110 fn rebalance_partitions_locked(&self, nodes: &mut HashMap<u32, Node>) {
114 for node in nodes.values_mut() {
116 node.assigned_partitions.clear();
117 }
118
119 let mut healthy_nodes: Vec<u32> = nodes
121 .iter()
122 .filter(|(_, n)| n.healthy)
123 .map(|(id, _)| *id)
124 .collect();
125
126 healthy_nodes.sort();
127
128 if healthy_nodes.is_empty() {
129 return; }
131
132 for partition_id in 0..self.partition_count {
134 let node_idx = (partition_id as usize) % healthy_nodes.len();
135 let node_id = healthy_nodes[node_idx];
136
137 if let Some(node) = nodes.get_mut(&node_id) {
138 node.assigned_partitions.push(partition_id);
139 }
140 }
141 }
142
143 pub fn node_for_partition(&self, partition_id: u32) -> Option<u32> {
147 let nodes = self.nodes.read();
148
149 nodes
150 .values()
151 .find(|n| n.healthy && n.assigned_partitions.contains(&partition_id))
152 .map(|n| n.id)
153 }
154
155 pub fn get_node(&self, node_id: u32) -> Option<Node> {
157 self.nodes.read().get(&node_id).cloned()
158 }
159
160 pub fn all_nodes(&self) -> Vec<Node> {
162 self.nodes.read().values().cloned().collect()
163 }
164
165 pub fn healthy_nodes(&self) -> Vec<Node> {
167 self.nodes
168 .read()
169 .values()
170 .filter(|n| n.healthy)
171 .cloned()
172 .collect()
173 }
174
175 pub fn partition_distribution(&self) -> HashMap<u32, Vec<u32>> {
177 let nodes = self.nodes.read();
178
179 nodes
180 .iter()
181 .filter(|(_, n)| n.healthy)
182 .map(|(id, n)| (*id, n.assigned_partitions.clone()))
183 .collect()
184 }
185
186 pub fn is_cluster_healthy(&self) -> bool {
190 let nodes = self.nodes.read();
191
192 for partition_id in 0..self.partition_count {
193 let has_node = nodes
194 .values()
195 .any(|n| n.healthy && n.assigned_partitions.contains(&partition_id));
196
197 if !has_node {
198 return false;
199 }
200 }
201
202 true
203 }
204
205 pub fn node_count(&self) -> usize {
207 self.nodes.read().len()
208 }
209
210 pub fn healthy_node_count(&self) -> usize {
212 self.nodes.read().values().filter(|n| n.healthy).count()
213 }
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// Partition count used by every test in this module.
    const PARTITIONS: u32 = 32;

    /// Builds a node with the conventional test address and no partitions.
    fn make_node(id: u32, healthy: bool) -> Node {
        Node {
            id,
            address: format!("node-{}:8080", id),
            healthy,
            assigned_partitions: Vec::new(),
        }
    }

    /// Builds a registry holding healthy nodes 0 and 1, registered in order.
    fn two_healthy_nodes() -> NodeRegistry {
        let registry = NodeRegistry::new(PARTITIONS);
        registry.register_node(make_node(0, true));
        registry.register_node(make_node(1, true));
        registry
    }

    /// Number of partitions currently assigned to the node with `id`.
    fn partitions_of(registry: &NodeRegistry, id: u32) -> usize {
        registry.get_node(id).unwrap().assigned_partitions.len()
    }

    #[test]
    fn test_create_registry() {
        let registry = NodeRegistry::new(PARTITIONS);

        assert_eq!(registry.node_count(), 0);
        assert_eq!(registry.healthy_node_count(), 0);
    }

    #[test]
    fn test_register_node() {
        let registry = NodeRegistry::new(PARTITIONS);
        registry.register_node(make_node(0, true));

        assert_eq!(registry.node_count(), 1);
        assert_eq!(registry.healthy_node_count(), 1);

        // A lone healthy node owns every partition.
        assert_eq!(partitions_of(&registry, 0), 32);
    }

    #[test]
    fn test_two_node_distribution() {
        let registry = two_healthy_nodes();

        let first = registry.get_node(0).unwrap();
        let second = registry.get_node(1).unwrap();

        assert_eq!(first.assigned_partitions.len(), 16);
        assert_eq!(second.assigned_partitions.len(), 16);

        // The two halves must be disjoint.
        assert!(first
            .assigned_partitions
            .iter()
            .all(|partition_id| !second.assigned_partitions.contains(partition_id)));
    }

    #[test]
    fn test_node_for_partition() {
        let registry = two_healthy_nodes();

        // Every partition must resolve to some owner.
        for partition_id in 0..32 {
            assert!(registry.node_for_partition(partition_id).is_some());
        }
    }

    #[test]
    fn test_unhealthy_node_excluded() {
        let registry = NodeRegistry::new(PARTITIONS);
        registry.register_node(make_node(0, true));
        registry.register_node(make_node(1, false));

        // The unhealthy node gets nothing; the healthy one takes everything.
        assert_eq!(partitions_of(&registry, 0), 32);
        assert_eq!(partitions_of(&registry, 1), 0);
    }

    #[test]
    fn test_rebalance_on_health_change() {
        let registry = two_healthy_nodes();
        assert_eq!(partitions_of(&registry, 0), 16);

        // Losing node 1 hands its share over to node 0.
        registry.set_node_health(1, false);
        assert_eq!(partitions_of(&registry, 0), 32);
    }

    #[test]
    fn test_cluster_health() {
        let registry = NodeRegistry::new(PARTITIONS);

        // No nodes: nothing covers the partitions.
        assert!(!registry.is_cluster_healthy());

        registry.register_node(make_node(0, true));
        assert!(registry.is_cluster_healthy());

        // The only node going unhealthy leaves every partition uncovered.
        registry.set_node_health(0, false);
        assert!(!registry.is_cluster_healthy());
    }

    #[test]
    fn test_partition_distribution() {
        let distribution = two_healthy_nodes().partition_distribution();

        assert_eq!(distribution.len(), 2);
        assert_eq!(distribution.get(&0).unwrap().len(), 16);
        assert_eq!(distribution.get(&1).unwrap().len(), 16);
    }

    #[test]
    fn test_deterministic_assignment() {
        let left = NodeRegistry::new(PARTITIONS);
        let right = NodeRegistry::new(PARTITIONS);

        // Feed both registries the same nodes in the same order.
        for id in 0..4 {
            let node = make_node(id, true);
            left.register_node(node.clone());
            right.register_node(node);
        }

        // Identical inputs must produce identical ownership.
        for partition_id in 0..32 {
            assert_eq!(
                left.node_for_partition(partition_id),
                right.node_for_partition(partition_id)
            );
        }
    }
}