// allsource_core/infrastructure/cluster/request_router.rs
1use super::node_registry::{Node, NodeRegistry};
2use crate::{
3    domain::value_objects::{EntityId, PartitionKey},
4    error::{AllSourceError, Result},
5};
6/// Request Router for Distributed Partitioning
7///
8/// Routes requests to the correct node based on partition assignment.
9/// Enables horizontal scaling with SierraDB's fixed partition architecture.
10///
11/// # Design
12/// - **Partition-aware routing**: Entity ID → Partition → Node
13/// - **Failover**: Retry on node failure
14/// - **Load balancing**: Even distribution via consistent hashing
15///
16/// # Example
17/// ```ignore
18/// let registry = Arc::new(NodeRegistry::new(32));
19/// let router = RequestRouter::new(registry.clone());
20///
21/// let entity_id = EntityId::new("user-123".to_string())?;
22/// let target_node = router.route_for_entity(&entity_id)?;
23///
24/// // Send request to target_node.address
25/// ```
26use std::sync::Arc;
27
28/// Request Router for partition-aware request routing
29pub struct RequestRouter {
30    registry: Arc<NodeRegistry>,
31}
32
33impl RequestRouter {
34    /// Create new request router
35    ///
36    /// # Arguments
37    /// - `registry`: Shared node registry for cluster topology
38    pub fn new(registry: Arc<NodeRegistry>) -> Self {
39        Self { registry }
40    }
41
42    /// Route request for an entity
43    ///
44    /// Determines the partition for the entity and finds the responsible node.
45    ///
46    /// # Returns
47    /// - `Node`: Target node to send the request to
48    ///
49    /// # Errors
50    /// - Returns error if no healthy node is available for the partition
51    pub fn route_for_entity(&self, entity_id: &EntityId) -> Result<Node> {
52        // Determine partition using consistent hashing
53        let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
54        self.route_for_partition(&partition_key)
55    }
56
57    /// Route request for a specific partition
58    ///
59    /// # Returns
60    /// - `Node`: Target node responsible for this partition
61    ///
62    /// # Errors
63    /// - Returns error if no healthy node is available
64    pub fn route_for_partition(&self, partition_key: &PartitionKey) -> Result<Node> {
65        let partition_id = partition_key.partition_id();
66
67        let node_id = self
68            .registry
69            .node_for_partition(partition_id)
70            .ok_or_else(|| {
71                AllSourceError::StorageError(format!(
72                    "No healthy node available for partition {}",
73                    partition_id
74                ))
75            })?;
76
77        self.registry.get_node(node_id).ok_or_else(|| {
78            AllSourceError::InternalError(format!("Node {node_id} not found in registry"))
79        })
80    }
81
82    /// Get all nodes for load-balanced read operations
83    ///
84    /// Returns all healthy nodes that can serve read requests.
85    /// Useful for fan-out queries across multiple nodes.
86    pub fn nodes_for_read(&self) -> Vec<Node> {
87        self.registry.healthy_nodes()
88    }
89
90    /// Check if a specific node can handle the entity
91    ///
92    /// Useful for sticky sessions or connection pooling.
93    pub fn can_node_handle_entity(&self, entity_id: &EntityId, node_id: u32) -> bool {
94        let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
95        let partition_id = partition_key.partition_id();
96
97        if let Some(assigned_node_id) = self.registry.node_for_partition(partition_id) {
98            assigned_node_id == node_id
99        } else {
100            false
101        }
102    }
103
104    /// Get partition distribution for monitoring
105    ///
106    /// Returns map of node_id -> partition_ids for observability.
107    pub fn partition_distribution(&self) -> std::collections::HashMap<u32, Vec<u32>> {
108        self.registry.partition_distribution()
109    }
110
111    /// Check if routing is available
112    ///
113    /// Returns true if cluster is healthy and can handle requests.
114    pub fn is_available(&self) -> bool {
115        self.registry.is_cluster_healthy()
116    }
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use crate::infrastructure::cluster::node_registry::Node;

    /// Build a 4-node, all-healthy cluster over 32 partitions.
    fn setup_cluster() -> (Arc<NodeRegistry>, RequestRouter) {
        let registry = Arc::new(NodeRegistry::new(32));

        for node_id in 0..4 {
            registry.register_node(Node {
                id: node_id,
                address: format!("node-{}:8080", node_id),
                healthy: true,
                assigned_partitions: vec![],
            });
        }

        let router = RequestRouter::new(Arc::clone(&registry));
        (registry, router)
    }

    #[test]
    fn test_create_router() {
        let registry = Arc::new(NodeRegistry::new(32));
        let _router = RequestRouter::new(registry);
    }

    #[test]
    fn test_route_for_entity() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let node = router.route_for_entity(&entity_id).unwrap();

        // Must land on one of the four registered, healthy nodes.
        assert!(node.healthy);
        assert!(node.id < 4);
    }

    #[test]
    fn test_consistent_routing() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();

        // Repeated lookups for the same entity must be stable.
        let routed_ids: Vec<u32> = (0..3)
            .map(|_| router.route_for_entity(&entity_id).unwrap().id)
            .collect();
        assert!(routed_ids.windows(2).all(|pair| pair[0] == pair[1]));
    }

    #[test]
    fn test_different_entities_may_route_differently() {
        let (_registry, router) = setup_cluster();

        let unique_nodes: std::collections::HashSet<u32> = ["user-1", "user-2", "user-3"]
            .iter()
            .map(|name| {
                let entity = EntityId::new(name.to_string()).unwrap();
                router.route_for_entity(&entity).unwrap().id
            })
            .collect();

        // Some spread is likely but not guaranteed; just observe it.
        println!("Unique nodes: {:?}", unique_nodes);
    }

    #[test]
    fn test_route_for_partition() {
        let (_registry, router) = setup_cluster();

        let partition_key = PartitionKey::from_partition_id(15, 32).unwrap();
        let node = router.route_for_partition(&partition_key).unwrap();

        assert!(node.healthy);
        assert!(node.id < 4);
    }

    #[test]
    fn test_can_node_handle_entity() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let target_node = router.route_for_entity(&entity_id).unwrap();

        // The router's chosen node must report that it handles the entity.
        assert!(router.can_node_handle_entity(&entity_id, target_node.id));

        // Other nodes may or may not own overlapping partitions — just
        // exercise the call for each of them.
        for other in (0..4).filter(|&id| id != target_node.id) {
            let _can_handle = router.can_node_handle_entity(&entity_id, other);
        }
    }

    #[test]
    fn test_nodes_for_read() {
        let (_registry, router) = setup_cluster();

        let readable = router.nodes_for_read();

        assert_eq!(readable.len(), 4);
        assert!(readable.iter().all(|node| node.healthy));
    }

    #[test]
    fn test_no_healthy_nodes() {
        let registry = Arc::new(NodeRegistry::new(32));

        // Only member of the cluster is unhealthy.
        registry.register_node(Node {
            id: 0,
            address: "node-0:8080".to_string(),
            healthy: false,
            assigned_partitions: vec![],
        });

        let router = RequestRouter::new(registry);

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        assert!(router.route_for_entity(&entity_id).is_err());
    }

    #[test]
    fn test_partition_distribution() {
        let (_registry, router) = setup_cluster();

        let distribution = router.partition_distribution();

        assert_eq!(distribution.len(), 4);

        // 32 partitions over 4 nodes → 8 each.
        for partitions in distribution.values() {
            assert_eq!(partitions.len(), 8);
        }
    }

    #[test]
    fn test_is_available() {
        let (registry, router) = setup_cluster();

        // Healthy cluster routes requests.
        assert!(router.is_available());

        // Take every node down.
        for node_id in 0..4 {
            registry.set_node_health(node_id, false);
        }

        // No healthy nodes → unavailable.
        assert!(!router.is_available());
    }
}
285}