// allsource_core/infrastructure/cluster/node_registry.rs
//! Node Registry for Distributed Partitioning
//!
//! Manages cluster nodes and partition assignments for horizontal scaling.
//! Based on SierraDB's fixed partition architecture.
//!
//! # Design
//! - **Fixed partitions**: 32 partitions (single-node) or 1024+ (cluster)
//! - **Consistent assignment**: Partitions assigned to nodes deterministically
//! - **Health monitoring**: Track node health status
//! - **Automatic rebalancing**: Reassign partitions on node failures
//!
//! # Cluster Topology
//! - Single-node: All 32 partitions on one node
//! - 2-node: 16 partitions per node
//! - 4-node: 8 partitions per node
//! - 8-node: 4 partitions per node
//!
//! # Example
//! ```ignore
//! let registry = NodeRegistry::new(32);
//!
//! // Register nodes
//! registry.register_node(Node {
//!     id: 0,
//!     address: "node-0:8080".to_string(),
//!     healthy: true,
//!     assigned_partitions: vec![],
//! });
//!
//! // Find node for partition
//! let node_id = registry.node_for_partition(15);
//! ```
33use dashmap::DashMap;
34use std::{collections::HashMap, sync::Arc};
35
/// Node in the cluster.
///
/// Derives `PartialEq`/`Eq` (all fields are `Eq`) so node snapshots can be
/// compared directly in tests and reconciliation logic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node {
    /// Unique node ID.
    pub id: u32,

    /// Network address (host:port).
    pub address: String,

    /// Health status; unhealthy nodes receive no partition assignments.
    pub healthy: bool,

    /// Partitions assigned to this node (managed by the registry's
    /// rebalancing; caller-supplied values are discarded on registration).
    pub assigned_partitions: Vec<u32>,
}
51
/// Node Registry manages cluster topology: a fixed partition count plus the
/// set of registered nodes and their current partition assignments.
pub struct NodeRegistry {
    /// Total number of partitions (fixed for the lifetime of the registry).
    partition_count: u32,

    /// Registered nodes keyed by node ID - DashMap provides sharded,
    /// per-entry locking for concurrent access.
    /// NOTE(review): the `Arc` is never cloned within this file; if no
    /// external clone exists, a plain `DashMap<u32, Node>` would suffice.
    nodes: Arc<DashMap<u32, Node>>,
}
60
61impl NodeRegistry {
62    /// Create new node registry
63    ///
64    /// # Arguments
65    /// - `partition_count`: Total fixed partitions (32 for single-node, 1024+ for cluster)
66    pub fn new(partition_count: u32) -> Self {
67        Self {
68            partition_count,
69            nodes: Arc::new(DashMap::new()),
70        }
71    }
72
73    /// Register a node in the cluster
74    ///
75    /// Automatically rebalances partitions across healthy nodes.
76    pub fn register_node(&self, mut node: Node) {
77        // Clear assigned partitions (will be recalculated)
78        node.assigned_partitions.clear();
79
80        self.nodes.insert(node.id, node);
81
82        // Rebalance partitions
83        self.rebalance_partitions();
84    }
85
86    /// Unregister a node from the cluster
87    ///
88    /// Triggers automatic rebalancing to remaining nodes.
89    pub fn unregister_node(&self, node_id: u32) {
90        self.nodes.remove(&node_id);
91        self.rebalance_partitions();
92    }
93
94    /// Mark node as healthy or unhealthy
95    ///
96    /// Unhealthy nodes are excluded from partition assignment.
97    pub fn set_node_health(&self, node_id: u32, healthy: bool) {
98        if let Some(mut node) = self.nodes.get_mut(&node_id) {
99            node.healthy = healthy;
100        }
101        self.rebalance_partitions();
102    }
103
104    /// Rebalance partitions across healthy nodes
105    ///
106    /// Uses round-robin distribution for even load balancing.
107    fn rebalance_partitions(&self) {
108        // Clear existing assignments
109        for mut entry in self.nodes.iter_mut() {
110            entry.value_mut().assigned_partitions.clear();
111        }
112
113        // Get healthy nodes sorted by ID for deterministic assignment
114        let mut healthy_nodes: Vec<u32> = self
115            .nodes
116            .iter()
117            .filter(|entry| entry.value().healthy)
118            .map(|entry| *entry.key())
119            .collect();
120
121        healthy_nodes.sort_unstable();
122
123        if healthy_nodes.is_empty() {
124            return; // No healthy nodes available
125        }
126
127        // Distribute partitions evenly using round-robin
128        for partition_id in 0..self.partition_count {
129            let node_idx = (partition_id as usize) % healthy_nodes.len();
130            let node_id = healthy_nodes[node_idx];
131
132            if let Some(mut node) = self.nodes.get_mut(&node_id) {
133                node.assigned_partitions.push(partition_id);
134            }
135        }
136    }
137
138    /// Find node responsible for a partition
139    ///
140    /// Returns None if no healthy node is assigned to the partition.
141    pub fn node_for_partition(&self, partition_id: u32) -> Option<u32> {
142        self.nodes
143            .iter()
144            .find(|entry| {
145                entry.value().healthy && entry.value().assigned_partitions.contains(&partition_id)
146            })
147            .map(|entry| entry.value().id)
148    }
149
150    /// Get node by ID
151    pub fn get_node(&self, node_id: u32) -> Option<Node> {
152        self.nodes.get(&node_id).map(|entry| entry.value().clone())
153    }
154
155    /// Get all nodes
156    pub fn all_nodes(&self) -> Vec<Node> {
157        self.nodes
158            .iter()
159            .map(|entry| entry.value().clone())
160            .collect()
161    }
162
163    /// Get healthy nodes
164    pub fn healthy_nodes(&self) -> Vec<Node> {
165        self.nodes
166            .iter()
167            .filter(|entry| entry.value().healthy)
168            .map(|entry| entry.value().clone())
169            .collect()
170    }
171
172    /// Get partition distribution statistics
173    pub fn partition_distribution(&self) -> HashMap<u32, Vec<u32>> {
174        self.nodes
175            .iter()
176            .filter(|entry| entry.value().healthy)
177            .map(|entry| (*entry.key(), entry.value().assigned_partitions.clone()))
178            .collect()
179    }
180
181    /// Check if cluster is healthy
182    ///
183    /// Returns true if all partitions have at least one healthy node assigned.
184    pub fn is_cluster_healthy(&self) -> bool {
185        for partition_id in 0..self.partition_count {
186            let has_node = self.nodes.iter().any(|entry| {
187                entry.value().healthy && entry.value().assigned_partitions.contains(&partition_id)
188            });
189
190            if !has_node {
191                return false;
192            }
193        }
194
195        true
196    }
197
198    /// Get node count
199    pub fn node_count(&self) -> usize {
200        self.nodes.len()
201    }
202
203    /// Get healthy node count
204    pub fn healthy_node_count(&self) -> usize {
205        self.nodes
206            .iter()
207            .filter(|entry| entry.value().healthy)
208            .count()
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a test node with the conventional `node-{id}:8080` address
    /// and no pre-assigned partitions (the registry assigns them anyway).
    fn make_node(id: u32, healthy: bool) -> Node {
        Node {
            id,
            address: format!("node-{id}:8080"),
            healthy,
            assigned_partitions: vec![],
        }
    }

    #[test]
    fn test_create_registry() {
        let registry = NodeRegistry::new(32);
        assert_eq!(registry.node_count(), 0);
        assert_eq!(registry.healthy_node_count(), 0);
    }

    #[test]
    fn test_register_node() {
        let registry = NodeRegistry::new(32);

        registry.register_node(make_node(0, true));

        assert_eq!(registry.node_count(), 1);
        assert_eq!(registry.healthy_node_count(), 1);

        // All partitions should be assigned to the single node.
        let node = registry.get_node(0).unwrap();
        assert_eq!(node.assigned_partitions.len(), 32);
    }

    #[test]
    fn test_two_node_distribution() {
        let registry = NodeRegistry::new(32);

        registry.register_node(make_node(0, true));
        registry.register_node(make_node(1, true));

        // Each node should hold exactly half the partitions.
        let node0 = registry.get_node(0).unwrap();
        let node1 = registry.get_node(1).unwrap();

        assert_eq!(node0.assigned_partitions.len(), 16);
        assert_eq!(node1.assigned_partitions.len(), 16);

        // Assignments must be disjoint.
        for partition_id in &node0.assigned_partitions {
            assert!(!node1.assigned_partitions.contains(partition_id));
        }
    }

    #[test]
    fn test_node_for_partition() {
        let registry = NodeRegistry::new(32);

        registry.register_node(make_node(0, true));
        registry.register_node(make_node(1, true));

        // Every partition should map to some node.
        for partition_id in 0..32 {
            assert!(registry.node_for_partition(partition_id).is_some());
        }
    }

    #[test]
    fn test_unhealthy_node_excluded() {
        let registry = NodeRegistry::new(32);

        registry.register_node(make_node(0, true));
        registry.register_node(make_node(1, false)); // Unhealthy

        // All partitions should go to the healthy node 0.
        assert_eq!(registry.get_node(0).unwrap().assigned_partitions.len(), 32);
        assert_eq!(registry.get_node(1).unwrap().assigned_partitions.len(), 0);
    }

    #[test]
    fn test_rebalance_on_health_change() {
        let registry = NodeRegistry::new(32);

        registry.register_node(make_node(0, true));
        registry.register_node(make_node(1, true));

        // Initially a 16/16 split.
        assert_eq!(registry.get_node(0).unwrap().assigned_partitions.len(), 16);

        // Marking node 1 unhealthy moves everything to node 0.
        registry.set_node_health(1, false);
        assert_eq!(registry.get_node(0).unwrap().assigned_partitions.len(), 32);
    }

    #[test]
    fn test_cluster_health() {
        let registry = NodeRegistry::new(32);

        // No nodes - unhealthy.
        assert!(!registry.is_cluster_healthy());

        // One healthy node covers all partitions.
        registry.register_node(make_node(0, true));
        assert!(registry.is_cluster_healthy());

        // Marking it unhealthy leaves every partition uncovered.
        registry.set_node_health(0, false);
        assert!(!registry.is_cluster_healthy());
    }

    #[test]
    fn test_partition_distribution() {
        let registry = NodeRegistry::new(32);

        registry.register_node(make_node(0, true));
        registry.register_node(make_node(1, true));

        let distribution = registry.partition_distribution();

        assert_eq!(distribution.len(), 2);
        assert_eq!(distribution.get(&0).unwrap().len(), 16);
        assert_eq!(distribution.get(&1).unwrap().len(), 16);
    }

    #[test]
    fn test_deterministic_assignment() {
        let registry1 = NodeRegistry::new(32);
        let registry2 = NodeRegistry::new(32);

        // Register the same nodes in the same order.
        for i in 0..4 {
            registry1.register_node(make_node(i, true));
            registry2.register_node(make_node(i, true));
        }

        // Partition assignments should be identical across registries.
        for partition_id in 0..32 {
            assert_eq!(
                registry1.node_for_partition(partition_id),
                registry2.node_for_partition(partition_id)
            );
        }
    }
}
429}