// allsource_core/infrastructure/cluster/node_registry.rs

//! Node Registry for Distributed Partitioning
//!
//! Manages cluster nodes and partition assignments for horizontal scaling.
//! Based on SierraDB's fixed partition architecture.
//!
//! # Design
//! - **Fixed partitions**: 32 partitions (single-node) or 1024+ (cluster)
//! - **Consistent assignment**: Partitions assigned to nodes deterministically
//! - **Health monitoring**: Track node health status
//! - **Automatic rebalancing**: Reassign partitions on node failures
//!
//! # Cluster Topology
//! - Single-node: All 32 partitions on one node
//! - 2-node: 16 partitions per node
//! - 4-node: 8 partitions per node
//! - 8-node: 4 partitions per node
//!
//! # Example
//! ```ignore
//! let registry = NodeRegistry::new(32);
//!
//! // Register nodes
//! registry.register_node(Node {
//!     id: 0,
//!     address: "node-0:8080".to_string(),
//!     healthy: true,
//!     assigned_partitions: vec![],
//! });
//!
//! // Find node for partition
//! let node_id = registry.node_for_partition(15);
//! ```
33use parking_lot::RwLock;
34use std::collections::HashMap;
35use std::sync::Arc;
36
/// Node in the cluster.
///
/// Plain data snapshot of one cluster member; cloned out of the registry
/// by accessors such as `get_node` / `all_nodes`. Derives `PartialEq`/`Eq`
/// so topology snapshots can be compared directly in tests and
/// reconciliation logic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node {
    /// Unique node ID
    pub id: u32,

    /// Network address (host:port)
    pub address: String,

    /// Health status; unhealthy nodes are excluded from partition assignment
    pub healthy: bool,

    /// Partitions assigned to this node (recomputed on every rebalance)
    pub assigned_partitions: Vec<u32>,
}
52
53/// Node Registry manages cluster topology
54pub struct NodeRegistry {
55    /// Total number of partitions (fixed)
56    partition_count: u32,
57
58    /// Registered nodes
59    nodes: Arc<RwLock<HashMap<u32, Node>>>,
60}
61
62impl NodeRegistry {
63    /// Create new node registry
64    ///
65    /// # Arguments
66    /// - `partition_count`: Total fixed partitions (32 for single-node, 1024+ for cluster)
67    pub fn new(partition_count: u32) -> Self {
68        Self {
69            partition_count,
70            nodes: Arc::new(RwLock::new(HashMap::new())),
71        }
72    }
73
74    /// Register a node in the cluster
75    ///
76    /// Automatically rebalances partitions across healthy nodes.
77    pub fn register_node(&self, mut node: Node) {
78        let mut nodes = self.nodes.write();
79
80        // Clear assigned partitions (will be recalculated)
81        node.assigned_partitions.clear();
82
83        nodes.insert(node.id, node);
84
85        // Rebalance partitions
86        self.rebalance_partitions_locked(&mut nodes);
87    }
88
89    /// Unregister a node from the cluster
90    ///
91    /// Triggers automatic rebalancing to remaining nodes.
92    pub fn unregister_node(&self, node_id: u32) {
93        let mut nodes = self.nodes.write();
94        nodes.remove(&node_id);
95        self.rebalance_partitions_locked(&mut nodes);
96    }
97
98    /// Mark node as healthy or unhealthy
99    ///
100    /// Unhealthy nodes are excluded from partition assignment.
101    pub fn set_node_health(&self, node_id: u32, healthy: bool) {
102        let mut nodes = self.nodes.write();
103
104        if let Some(node) = nodes.get_mut(&node_id) {
105            node.healthy = healthy;
106            self.rebalance_partitions_locked(&mut nodes);
107        }
108    }
109
110    /// Rebalance partitions across healthy nodes
111    ///
112    /// Uses round-robin distribution for even load balancing.
113    fn rebalance_partitions_locked(&self, nodes: &mut HashMap<u32, Node>) {
114        // Clear existing assignments
115        for node in nodes.values_mut() {
116            node.assigned_partitions.clear();
117        }
118
119        // Get healthy nodes sorted by ID for deterministic assignment
120        let mut healthy_nodes: Vec<u32> = nodes
121            .iter()
122            .filter(|(_, n)| n.healthy)
123            .map(|(id, _)| *id)
124            .collect();
125
126        healthy_nodes.sort();
127
128        if healthy_nodes.is_empty() {
129            return; // No healthy nodes available
130        }
131
132        // Distribute partitions evenly using round-robin
133        for partition_id in 0..self.partition_count {
134            let node_idx = (partition_id as usize) % healthy_nodes.len();
135            let node_id = healthy_nodes[node_idx];
136
137            if let Some(node) = nodes.get_mut(&node_id) {
138                node.assigned_partitions.push(partition_id);
139            }
140        }
141    }
142
143    /// Find node responsible for a partition
144    ///
145    /// Returns None if no healthy node is assigned to the partition.
146    pub fn node_for_partition(&self, partition_id: u32) -> Option<u32> {
147        let nodes = self.nodes.read();
148
149        nodes
150            .values()
151            .find(|n| n.healthy && n.assigned_partitions.contains(&partition_id))
152            .map(|n| n.id)
153    }
154
155    /// Get node by ID
156    pub fn get_node(&self, node_id: u32) -> Option<Node> {
157        self.nodes.read().get(&node_id).cloned()
158    }
159
160    /// Get all nodes
161    pub fn all_nodes(&self) -> Vec<Node> {
162        self.nodes.read().values().cloned().collect()
163    }
164
165    /// Get healthy nodes
166    pub fn healthy_nodes(&self) -> Vec<Node> {
167        self.nodes
168            .read()
169            .values()
170            .filter(|n| n.healthy)
171            .cloned()
172            .collect()
173    }
174
175    /// Get partition distribution statistics
176    pub fn partition_distribution(&self) -> HashMap<u32, Vec<u32>> {
177        let nodes = self.nodes.read();
178
179        nodes
180            .iter()
181            .filter(|(_, n)| n.healthy)
182            .map(|(id, n)| (*id, n.assigned_partitions.clone()))
183            .collect()
184    }
185
186    /// Check if cluster is healthy
187    ///
188    /// Returns true if all partitions have at least one healthy node assigned.
189    pub fn is_cluster_healthy(&self) -> bool {
190        let nodes = self.nodes.read();
191
192        for partition_id in 0..self.partition_count {
193            let has_node = nodes
194                .values()
195                .any(|n| n.healthy && n.assigned_partitions.contains(&partition_id));
196
197            if !has_node {
198                return false;
199            }
200        }
201
202        true
203    }
204
205    /// Get node count
206    pub fn node_count(&self) -> usize {
207        self.nodes.read().len()
208    }
209
210    /// Get healthy node count
211    pub fn healthy_node_count(&self) -> usize {
212        self.nodes.read().values().filter(|n| n.healthy).count()
213    }
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a test node with the conventional address format.
    fn test_node(id: u32, healthy: bool) -> Node {
        Node {
            id,
            address: format!("node-{}:8080", id),
            healthy,
            assigned_partitions: vec![],
        }
    }

    #[test]
    fn test_create_registry() {
        let registry = NodeRegistry::new(32);
        assert_eq!(registry.node_count(), 0);
        assert_eq!(registry.healthy_node_count(), 0);
    }

    #[test]
    fn test_register_node() {
        let registry = NodeRegistry::new(32);
        registry.register_node(test_node(0, true));

        assert_eq!(registry.node_count(), 1);
        assert_eq!(registry.healthy_node_count(), 1);

        // All partitions should be assigned to the single node
        let node = registry.get_node(0).unwrap();
        assert_eq!(node.assigned_partitions.len(), 32);
    }

    #[test]
    fn test_two_node_distribution() {
        let registry = NodeRegistry::new(32);
        registry.register_node(test_node(0, true));
        registry.register_node(test_node(1, true));

        // Each node should have exactly half the partitions
        let node0 = registry.get_node(0).unwrap();
        let node1 = registry.get_node(1).unwrap();

        assert_eq!(node0.assigned_partitions.len(), 16);
        assert_eq!(node1.assigned_partitions.len(), 16);

        // Partitions should not overlap
        for partition_id in &node0.assigned_partitions {
            assert!(!node1.assigned_partitions.contains(partition_id));
        }
    }

    #[test]
    fn test_node_for_partition() {
        let registry = NodeRegistry::new(32);
        registry.register_node(test_node(0, true));
        registry.register_node(test_node(1, true));

        // Each partition should map to exactly one node — assert both
        // existence and uniqueness (the original comment promised
        // "exactly one" but only existence was checked).
        for partition_id in 0..32 {
            assert!(registry.node_for_partition(partition_id).is_some());

            let owners = registry
                .all_nodes()
                .iter()
                .filter(|n| n.assigned_partitions.contains(&partition_id))
                .count();
            assert_eq!(owners, 1);
        }

        // Out-of-range partitions are owned by nobody
        assert_eq!(registry.node_for_partition(32), None);
    }

    #[test]
    fn test_unhealthy_node_excluded() {
        let registry = NodeRegistry::new(32);
        registry.register_node(test_node(0, true));
        registry.register_node(test_node(1, false)); // Unhealthy

        // All partitions should go to node 0
        let node0 = registry.get_node(0).unwrap();
        let node1 = registry.get_node(1).unwrap();

        assert_eq!(node0.assigned_partitions.len(), 32);
        assert_eq!(node1.assigned_partitions.len(), 0);
    }

    #[test]
    fn test_rebalance_on_health_change() {
        let registry = NodeRegistry::new(32);
        registry.register_node(test_node(0, true));
        registry.register_node(test_node(1, true));

        // Initially 16/16 split
        let node0_before = registry.get_node(0).unwrap();
        assert_eq!(node0_before.assigned_partitions.len(), 16);

        // Mark node 1 as unhealthy
        registry.set_node_health(1, false);

        // Node 0 should now have all 32 partitions
        let node0_after = registry.get_node(0).unwrap();
        assert_eq!(node0_after.assigned_partitions.len(), 32);
    }

    #[test]
    fn test_cluster_health() {
        let registry = NodeRegistry::new(32);

        // No nodes - unhealthy
        assert!(!registry.is_cluster_healthy());

        // Add one healthy node
        registry.register_node(test_node(0, true));
        assert!(registry.is_cluster_healthy());

        // Mark unhealthy
        registry.set_node_health(0, false);
        assert!(!registry.is_cluster_healthy());
    }

    #[test]
    fn test_partition_distribution() {
        let registry = NodeRegistry::new(32);
        registry.register_node(test_node(0, true));
        registry.register_node(test_node(1, true));

        let distribution = registry.partition_distribution();

        assert_eq!(distribution.len(), 2);
        assert_eq!(distribution.get(&0).unwrap().len(), 16);
        assert_eq!(distribution.get(&1).unwrap().len(), 16);
    }

    #[test]
    fn test_deterministic_assignment() {
        let registry1 = NodeRegistry::new(32);
        let registry2 = NodeRegistry::new(32);

        // Register same nodes in same order
        for i in 0..4 {
            let node = test_node(i, true);
            registry1.register_node(node.clone());
            registry2.register_node(node);
        }

        // Partition assignments should be identical
        for partition_id in 0..32 {
            assert_eq!(
                registry1.node_for_partition(partition_id),
                registry2.node_for_partition(partition_id)
            );
        }
    }
}