// allsource_core/infrastructure/cluster/node_registry.rs

//! Node Registry for Distributed Partitioning
//!
//! Manages cluster nodes and partition assignments for horizontal scaling.
//! Based on SierraDB's fixed partition architecture.
//!
//! # Design
//! - **Fixed partitions**: 32 partitions (single-node) or 1024+ (cluster)
//! - **Consistent assignment**: Partitions assigned to nodes deterministically
//! - **Health monitoring**: Track node health status
//! - **Automatic rebalancing**: Reassign partitions on node failures
//!
//! # Cluster Topology
//! - Single-node: All 32 partitions on one node
//! - 2-node: 16 partitions per node
//! - 4-node: 8 partitions per node
//! - 8-node: 4 partitions per node
//!
//! # Example
//! ```ignore
//! let registry = NodeRegistry::new(32);
//!
//! // Register nodes
//! registry.register_node(Node {
//!     id: 0,
//!     address: "node-0:8080".to_string(),
//!     healthy: true,
//!     assigned_partitions: vec![],
//! });
//!
//! // Find node for partition
//! let node_id = registry.node_for_partition(15);
//! ```
34use parking_lot::RwLock;
35use std::collections::HashMap;
36use std::sync::Arc;
37
38/// Node in the cluster
/// Node in the cluster
///
/// Plain data record describing one cluster member. `PartialEq`/`Eq` are
/// derived so callers (and tests) can compare snapshots returned by
/// `NodeRegistry::get_node` / `all_nodes` by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node {
    /// Unique node ID
    pub id: u32,

    /// Network address (host:port)
    pub address: String,

    /// Health status; unhealthy nodes are excluded from partition assignment
    pub healthy: bool,

    /// Partitions assigned to this node (managed by the registry's rebalancer)
    pub assigned_partitions: Vec<u32>,
}
53
54/// Node Registry manages cluster topology
55pub struct NodeRegistry {
56    /// Total number of partitions (fixed)
57    partition_count: u32,
58
59    /// Registered nodes
60    nodes: Arc<RwLock<HashMap<u32, Node>>>,
61}
62
63impl NodeRegistry {
64    /// Create new node registry
65    ///
66    /// # Arguments
67    /// - `partition_count`: Total fixed partitions (32 for single-node, 1024+ for cluster)
68    pub fn new(partition_count: u32) -> Self {
69        Self {
70            partition_count,
71            nodes: Arc::new(RwLock::new(HashMap::new())),
72        }
73    }
74
75    /// Register a node in the cluster
76    ///
77    /// Automatically rebalances partitions across healthy nodes.
78    pub fn register_node(&self, mut node: Node) {
79        let mut nodes = self.nodes.write();
80
81        // Clear assigned partitions (will be recalculated)
82        node.assigned_partitions.clear();
83
84        nodes.insert(node.id, node);
85
86        // Rebalance partitions
87        self.rebalance_partitions_locked(&mut nodes);
88    }
89
90    /// Unregister a node from the cluster
91    ///
92    /// Triggers automatic rebalancing to remaining nodes.
93    pub fn unregister_node(&self, node_id: u32) {
94        let mut nodes = self.nodes.write();
95        nodes.remove(&node_id);
96        self.rebalance_partitions_locked(&mut nodes);
97    }
98
99    /// Mark node as healthy or unhealthy
100    ///
101    /// Unhealthy nodes are excluded from partition assignment.
102    pub fn set_node_health(&self, node_id: u32, healthy: bool) {
103        let mut nodes = self.nodes.write();
104
105        if let Some(node) = nodes.get_mut(&node_id) {
106            node.healthy = healthy;
107            self.rebalance_partitions_locked(&mut nodes);
108        }
109    }
110
111    /// Rebalance partitions across healthy nodes
112    ///
113    /// Uses round-robin distribution for even load balancing.
114    fn rebalance_partitions_locked(&self, nodes: &mut HashMap<u32, Node>) {
115        // Clear existing assignments
116        for node in nodes.values_mut() {
117            node.assigned_partitions.clear();
118        }
119
120        // Get healthy nodes sorted by ID for deterministic assignment
121        let mut healthy_nodes: Vec<u32> = nodes.iter()
122            .filter(|(_, n)| n.healthy)
123            .map(|(id, _)| *id)
124            .collect();
125
126        healthy_nodes.sort();
127
128        if healthy_nodes.is_empty() {
129            return; // No healthy nodes available
130        }
131
132        // Distribute partitions evenly using round-robin
133        for partition_id in 0..self.partition_count {
134            let node_idx = (partition_id as usize) % healthy_nodes.len();
135            let node_id = healthy_nodes[node_idx];
136
137            if let Some(node) = nodes.get_mut(&node_id) {
138                node.assigned_partitions.push(partition_id);
139            }
140        }
141    }
142
143    /// Find node responsible for a partition
144    ///
145    /// Returns None if no healthy node is assigned to the partition.
146    pub fn node_for_partition(&self, partition_id: u32) -> Option<u32> {
147        let nodes = self.nodes.read();
148
149        nodes.values()
150            .find(|n| n.healthy && n.assigned_partitions.contains(&partition_id))
151            .map(|n| n.id)
152    }
153
154    /// Get node by ID
155    pub fn get_node(&self, node_id: u32) -> Option<Node> {
156        self.nodes.read().get(&node_id).cloned()
157    }
158
159    /// Get all nodes
160    pub fn all_nodes(&self) -> Vec<Node> {
161        self.nodes.read().values().cloned().collect()
162    }
163
164    /// Get healthy nodes
165    pub fn healthy_nodes(&self) -> Vec<Node> {
166        self.nodes.read()
167            .values()
168            .filter(|n| n.healthy)
169            .cloned()
170            .collect()
171    }
172
173    /// Get partition distribution statistics
174    pub fn partition_distribution(&self) -> HashMap<u32, Vec<u32>> {
175        let nodes = self.nodes.read();
176
177        nodes.iter()
178            .filter(|(_, n)| n.healthy)
179            .map(|(id, n)| (*id, n.assigned_partitions.clone()))
180            .collect()
181    }
182
183    /// Check if cluster is healthy
184    ///
185    /// Returns true if all partitions have at least one healthy node assigned.
186    pub fn is_cluster_healthy(&self) -> bool {
187        let nodes = self.nodes.read();
188
189        for partition_id in 0..self.partition_count {
190            let has_node = nodes.values()
191                .any(|n| n.healthy && n.assigned_partitions.contains(&partition_id));
192
193            if !has_node {
194                return false;
195            }
196        }
197
198        true
199    }
200
201    /// Get node count
202    pub fn node_count(&self) -> usize {
203        self.nodes.read().len()
204    }
205
206    /// Get healthy node count
207    pub fn healthy_node_count(&self) -> usize {
208        self.nodes.read().values().filter(|n| n.healthy).count()
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a test node with the given ID and health flag.
    fn make_node(id: u32, healthy: bool) -> Node {
        Node {
            id,
            address: format!("node-{}:8080", id),
            healthy,
            assigned_partitions: vec![],
        }
    }

    #[test]
    fn test_create_registry() {
        let registry = NodeRegistry::new(32);

        // A fresh registry has no members at all.
        assert_eq!(registry.node_count(), 0);
        assert_eq!(registry.healthy_node_count(), 0);
    }

    #[test]
    fn test_register_node() {
        let registry = NodeRegistry::new(32);
        registry.register_node(make_node(0, true));

        assert_eq!(registry.node_count(), 1);
        assert_eq!(registry.healthy_node_count(), 1);

        // A lone node owns every partition.
        let node = registry.get_node(0).unwrap();
        assert_eq!(node.assigned_partitions.len(), 32);
    }

    #[test]
    fn test_two_node_distribution() {
        let registry = NodeRegistry::new(32);
        registry.register_node(make_node(0, true));
        registry.register_node(make_node(1, true));

        let node0 = registry.get_node(0).unwrap();
        let node1 = registry.get_node(1).unwrap();

        // Even 16/16 split across the two nodes.
        assert_eq!(node0.assigned_partitions.len(), 16);
        assert_eq!(node1.assigned_partitions.len(), 16);

        // No partition is owned by both nodes.
        assert!(node0
            .assigned_partitions
            .iter()
            .all(|p| !node1.assigned_partitions.contains(p)));
    }

    #[test]
    fn test_node_for_partition() {
        let registry = NodeRegistry::new(32);
        registry.register_node(make_node(0, true));
        registry.register_node(make_node(1, true));

        // Every partition resolves to some node.
        assert!((0..32).all(|p| registry.node_for_partition(p).is_some()));
    }

    #[test]
    fn test_unhealthy_node_excluded() {
        let registry = NodeRegistry::new(32);
        registry.register_node(make_node(0, true));
        registry.register_node(make_node(1, false)); // unhealthy from the start

        // The healthy node absorbs every partition; the sick one gets none.
        assert_eq!(registry.get_node(0).unwrap().assigned_partitions.len(), 32);
        assert_eq!(registry.get_node(1).unwrap().assigned_partitions.len(), 0);
    }

    #[test]
    fn test_rebalance_on_health_change() {
        let registry = NodeRegistry::new(32);
        registry.register_node(make_node(0, true));
        registry.register_node(make_node(1, true));

        // Starts as an even 16/16 split.
        assert_eq!(registry.get_node(0).unwrap().assigned_partitions.len(), 16);

        // Failing node 1 shifts all of its partitions onto node 0.
        registry.set_node_health(1, false);
        assert_eq!(registry.get_node(0).unwrap().assigned_partitions.len(), 32);
    }

    #[test]
    fn test_cluster_health() {
        let registry = NodeRegistry::new(32);

        // An empty cluster covers no partitions.
        assert!(!registry.is_cluster_healthy());

        // A single healthy node covers everything...
        registry.register_node(make_node(0, true));
        assert!(registry.is_cluster_healthy());

        // ...until it goes down.
        registry.set_node_health(0, false);
        assert!(!registry.is_cluster_healthy());
    }

    #[test]
    fn test_partition_distribution() {
        let registry = NodeRegistry::new(32);
        registry.register_node(make_node(0, true));
        registry.register_node(make_node(1, true));

        let distribution = registry.partition_distribution();

        assert_eq!(distribution.len(), 2);
        for id in [0u32, 1] {
            assert_eq!(distribution[&id].len(), 16);
        }
    }

    #[test]
    fn test_deterministic_assignment() {
        let registry1 = NodeRegistry::new(32);
        let registry2 = NodeRegistry::new(32);

        // Register the same nodes in the same order on both registries.
        for i in 0..4 {
            registry1.register_node(make_node(i, true));
            registry2.register_node(make_node(i, true));
        }

        // Both registries must agree on every partition's owner.
        for p in 0..32 {
            assert_eq!(registry1.node_for_partition(p), registry2.node_for_partition(p));
        }
    }
}