
allsource_core/infrastructure/cluster/node_registry.rs

/// Node Registry for Distributed Partitioning
///
/// Manages cluster nodes and partition assignments for horizontal scaling.
/// Based on SierraDB's fixed partition architecture.
///
/// # Design
/// - **Fixed partitions**: 32 partitions (single-node) or 1024+ (cluster)
/// - **Consistent assignment**: Partitions assigned to nodes deterministically
/// - **Health monitoring**: Track node health status
/// - **Automatic rebalancing**: Reassign partitions on node failures
///
/// # Cluster Topology
/// - Single-node: All 32 partitions on one node
/// - 2-node: 16 partitions per node
/// - 4-node: 8 partitions per node
/// - 8-node: 4 partitions per node
///
/// # Example
/// ```ignore
/// let registry = NodeRegistry::new(32);
///
/// // Register nodes
/// registry.register_node(Node {
///     id: 0,
///     address: "node-0:8080".to_string(),
///     healthy: true,
///     assigned_partitions: vec![],
/// });
///
/// // Find node for partition
/// let node_id = registry.node_for_partition(15);
/// ```
use dashmap::DashMap;
use std::collections::HashMap;
use std::sync::Arc;

/// Node in the cluster
#[derive(Debug, Clone)]
pub struct Node {
    /// Unique node ID
    pub id: u32,

    /// Network address (host:port)
    pub address: String,

    /// Health status
    pub healthy: bool,

    /// Partitions assigned to this node
    pub assigned_partitions: Vec<u32>,
}

/// Node Registry manages cluster topology
pub struct NodeRegistry {
    /// Total number of partitions (fixed)
    partition_count: u32,

    /// Registered nodes - DashMap's sharded locking gives low-contention concurrent access
    nodes: Arc<DashMap<u32, Node>>,
}

impl NodeRegistry {
    /// Create a new node registry
    ///
    /// # Arguments
    /// - `partition_count`: Total fixed partitions (32 for single-node, 1024+ for cluster)
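    ///
    /// # Example
    /// A minimal sketch, mirroring the module-level example:
    /// ```ignore
    /// // 32 partitions is the single-node default used throughout this module.
    /// let registry = NodeRegistry::new(32);
    /// assert_eq!(registry.node_count(), 0);
    /// ```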
    pub fn new(partition_count: u32) -> Self {
        Self {
            partition_count,
            nodes: Arc::new(DashMap::new()),
        }
    }

    /// Register a node in the cluster
    ///
    /// Automatically rebalances partitions across healthy nodes.
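    ///
    /// # Example
    /// A minimal sketch; whatever is passed in `assigned_partitions` is
    /// discarded and recalculated by the rebalance:
    /// ```ignore
    /// registry.register_node(Node {
    ///     id: 0,
    ///     address: "node-0:8080".to_string(),
    ///     healthy: true,
    ///     assigned_partitions: vec![], // recalculated on registration
    /// });
    /// ```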
    pub fn register_node(&self, mut node: Node) {
        // Clear any caller-supplied assignments (they will be recalculated)
        node.assigned_partitions.clear();

        self.nodes.insert(node.id, node);

        // Rebalance partitions across the updated topology
        self.rebalance_partitions();
    }

    /// Unregister a node from the cluster
    ///
    /// Triggers automatic rebalancing across the remaining nodes.
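    ///
    /// # Example
    /// A minimal sketch; the departed node's partitions are redistributed
    /// to the nodes that remain:
    /// ```ignore
    /// registry.unregister_node(1);
    /// assert!(registry.get_node(1).is_none());
    /// ```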
    pub fn unregister_node(&self, node_id: u32) {
        self.nodes.remove(&node_id);
        self.rebalance_partitions();
    }

    /// Mark a node as healthy or unhealthy
    ///
    /// Unhealthy nodes are excluded from partition assignment.
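    ///
    /// # Example
    /// A minimal failover sketch; marking a node unhealthy shifts its
    /// partitions onto the remaining healthy nodes:
    /// ```ignore
    /// registry.set_node_health(1, false);
    /// assert_ne!(registry.node_for_partition(15), Some(1));
    /// ```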
    pub fn set_node_health(&self, node_id: u32, healthy: bool) {
        if let Some(mut node) = self.nodes.get_mut(&node_id) {
            node.healthy = healthy;
        } // guard dropped here, before rebalance re-locks the map
        self.rebalance_partitions();
    }

    /// Rebalance partitions across healthy nodes
    ///
    /// Uses round-robin distribution for even load balancing.
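    ///
    /// For example, with healthy nodes `[0, 1]` and 4 partitions, the
    /// round-robin yields: partition 0 -> node 0, 1 -> node 1,
    /// 2 -> node 0, 3 -> node 1.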
    fn rebalance_partitions(&self) {
        // Clear existing assignments
        for mut entry in self.nodes.iter_mut() {
            entry.value_mut().assigned_partitions.clear();
        }

        // Get healthy nodes sorted by ID for deterministic assignment
        let mut healthy_nodes: Vec<u32> = self.nodes
            .iter()
            .filter(|entry| entry.value().healthy)
            .map(|entry| *entry.key())
            .collect();

        healthy_nodes.sort();

        if healthy_nodes.is_empty() {
            return; // No healthy nodes available
        }

        // Distribute partitions evenly using round-robin
        for partition_id in 0..self.partition_count {
            let node_idx = (partition_id as usize) % healthy_nodes.len();
            let node_id = healthy_nodes[node_idx];

            if let Some(mut node) = self.nodes.get_mut(&node_id) {
                node.assigned_partitions.push(partition_id);
            }
        }
    }

    /// Find the node responsible for a partition
    ///
    /// Returns `None` if no healthy node is assigned to the partition.
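    ///
    /// # Example
    /// A minimal sketch:
    /// ```ignore
    /// if let Some(node_id) = registry.node_for_partition(15) {
    ///     let node = registry.get_node(node_id).unwrap();
    ///     println!("partition 15 lives on {}", node.address);
    /// }
    /// ```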
    pub fn node_for_partition(&self, partition_id: u32) -> Option<u32> {
        self.nodes
            .iter()
            .find(|entry| {
                entry.value().healthy
                    && entry.value().assigned_partitions.contains(&partition_id)
            })
            .map(|entry| entry.value().id)
    }

    /// Get node by ID
    pub fn get_node(&self, node_id: u32) -> Option<Node> {
        self.nodes.get(&node_id).map(|entry| entry.value().clone())
    }

    /// Get all nodes
    pub fn all_nodes(&self) -> Vec<Node> {
        self.nodes.iter().map(|entry| entry.value().clone()).collect()
    }

    /// Get healthy nodes
    pub fn healthy_nodes(&self) -> Vec<Node> {
        self.nodes
            .iter()
            .filter(|entry| entry.value().healthy)
            .map(|entry| entry.value().clone())
            .collect()
    }

    /// Get the partition distribution: each healthy node ID mapped to its assigned partitions
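    ///
    /// # Example
    /// A minimal sketch of the returned shape, assuming two healthy nodes
    /// (ids 0 and 1) and 32 partitions:
    /// ```ignore
    /// let dist = registry.partition_distribution();
    /// // { 0: [0, 2, 4, ...], 1: [1, 3, 5, ...] }
    /// assert_eq!(dist.get(&0).unwrap().len(), 16);
    /// ```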
    pub fn partition_distribution(&self) -> HashMap<u32, Vec<u32>> {
        self.nodes
            .iter()
            .filter(|entry| entry.value().healthy)
            .map(|entry| (*entry.key(), entry.value().assigned_partitions.clone()))
            .collect()
    }

    /// Check if the cluster is healthy
    ///
    /// Returns `true` if all partitions have at least one healthy node assigned.
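    ///
    /// # Example
    /// A minimal sketch; a registry with no nodes is unhealthy because no
    /// partition has an owner:
    /// ```ignore
    /// let registry = NodeRegistry::new(32);
    /// assert!(!registry.is_cluster_healthy());
    /// ```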
    pub fn is_cluster_healthy(&self) -> bool {
        for partition_id in 0..self.partition_count {
            let has_node = self.nodes.iter().any(|entry| {
                entry.value().healthy
                    && entry.value().assigned_partitions.contains(&partition_id)
            });

            if !has_node {
                return false;
            }
        }

        true
    }

    /// Get node count
    pub fn node_count(&self) -> usize {
        self.nodes.len()
    }

    /// Get healthy node count
    pub fn healthy_node_count(&self) -> usize {
        self.nodes.iter().filter(|entry| entry.value().healthy).count()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_registry() {
        let registry = NodeRegistry::new(32);
        assert_eq!(registry.node_count(), 0);
        assert_eq!(registry.healthy_node_count(), 0);
    }

    #[test]
    fn test_register_node() {
        let registry = NodeRegistry::new(32);

        let node = Node {
            id: 0,
            address: "node-0:8080".to_string(),
            healthy: true,
            assigned_partitions: vec![],
        };

        registry.register_node(node);

        assert_eq!(registry.node_count(), 1);
        assert_eq!(registry.healthy_node_count(), 1);

        // All partitions should be assigned to the single node
        let node = registry.get_node(0).unwrap();
        assert_eq!(node.assigned_partitions.len(), 32);
    }

    #[test]
    fn test_two_node_distribution() {
        let registry = NodeRegistry::new(32);

        registry.register_node(Node {
            id: 0,
            address: "node-0:8080".to_string(),
            healthy: true,
            assigned_partitions: vec![],
        });

        registry.register_node(Node {
            id: 1,
            address: "node-1:8080".to_string(),
            healthy: true,
            assigned_partitions: vec![],
        });

        // Each node should have exactly 16 partitions
        let node0 = registry.get_node(0).unwrap();
        let node1 = registry.get_node(1).unwrap();

        assert_eq!(node0.assigned_partitions.len(), 16);
        assert_eq!(node1.assigned_partitions.len(), 16);

        // Partitions should not overlap
        for partition_id in &node0.assigned_partitions {
            assert!(!node1.assigned_partitions.contains(partition_id));
        }
    }

    #[test]
    fn test_node_for_partition() {
        let registry = NodeRegistry::new(32);

        registry.register_node(Node {
            id: 0,
            address: "node-0:8080".to_string(),
            healthy: true,
            assigned_partitions: vec![],
        });

        registry.register_node(Node {
            id: 1,
            address: "node-1:8080".to_string(),
            healthy: true,
            assigned_partitions: vec![],
        });

        // Every partition should map to some healthy node
        for partition_id in 0..32 {
            let node_id = registry.node_for_partition(partition_id);
            assert!(node_id.is_some());
        }
    }

    #[test]
    fn test_unhealthy_node_excluded() {
        let registry = NodeRegistry::new(32);

        registry.register_node(Node {
            id: 0,
            address: "node-0:8080".to_string(),
            healthy: true,
            assigned_partitions: vec![],
        });

        registry.register_node(Node {
            id: 1,
            address: "node-1:8080".to_string(),
            healthy: false, // Unhealthy
            assigned_partitions: vec![],
        });

        // All partitions should go to node 0
        let node0 = registry.get_node(0).unwrap();
        let node1 = registry.get_node(1).unwrap();

        assert_eq!(node0.assigned_partitions.len(), 32);
        assert_eq!(node1.assigned_partitions.len(), 0);
    }

    #[test]
    fn test_rebalance_on_health_change() {
        let registry = NodeRegistry::new(32);

        registry.register_node(Node {
            id: 0,
            address: "node-0:8080".to_string(),
            healthy: true,
            assigned_partitions: vec![],
        });

        registry.register_node(Node {
            id: 1,
            address: "node-1:8080".to_string(),
            healthy: true,
            assigned_partitions: vec![],
        });

        // Initially a 16/16 split
        let node0_before = registry.get_node(0).unwrap();
        assert_eq!(node0_before.assigned_partitions.len(), 16);

        // Mark node 1 as unhealthy
        registry.set_node_health(1, false);

        // Node 0 should now have all 32 partitions
        let node0_after = registry.get_node(0).unwrap();
        assert_eq!(node0_after.assigned_partitions.len(), 32);
    }

    #[test]
    fn test_cluster_health() {
        let registry = NodeRegistry::new(32);

        // No nodes - unhealthy
        assert!(!registry.is_cluster_healthy());

        // Add one healthy node
        registry.register_node(Node {
            id: 0,
            address: "node-0:8080".to_string(),
            healthy: true,
            assigned_partitions: vec![],
        });

        assert!(registry.is_cluster_healthy());

        // Mark unhealthy
        registry.set_node_health(0, false);
        assert!(!registry.is_cluster_healthy());
    }

    #[test]
    fn test_partition_distribution() {
        let registry = NodeRegistry::new(32);

        registry.register_node(Node {
            id: 0,
            address: "node-0:8080".to_string(),
            healthy: true,
            assigned_partitions: vec![],
        });

        registry.register_node(Node {
            id: 1,
            address: "node-1:8080".to_string(),
            healthy: true,
            assigned_partitions: vec![],
        });

        let distribution = registry.partition_distribution();

        assert_eq!(distribution.len(), 2);
        assert_eq!(distribution.get(&0).unwrap().len(), 16);
        assert_eq!(distribution.get(&1).unwrap().len(), 16);
    }

    #[test]
    fn test_deterministic_assignment() {
        let registry1 = NodeRegistry::new(32);
        let registry2 = NodeRegistry::new(32);

        // Register the same nodes in the same order
        for i in 0..4 {
            let node = Node {
                id: i,
                address: format!("node-{}:8080", i),
                healthy: true,
                assigned_partitions: vec![],
            };

            registry1.register_node(node.clone());
            registry2.register_node(node);
        }

        // Partition assignments should be identical
        for partition_id in 0..32 {
            let node1 = registry1.node_for_partition(partition_id);
            let node2 = registry2.node_for_partition(partition_id);

            assert_eq!(node1, node2);
        }
    }
}
421}