allsource_core/infrastructure/cluster/request_router.rs

/// Request Router for Distributed Partitioning
///
/// Routes requests to the correct node based on partition assignment.
/// Enables horizontal scaling with SierraDB's fixed partition architecture.
///
/// # Design
/// - **Partition-aware routing**: Entity ID → Partition → Node
/// - **Failover**: Retry on node failure
/// - **Load balancing**: Even distribution via consistent hashing
///
/// # Example
/// ```ignore
/// let registry = Arc::new(NodeRegistry::new(32));
/// let router = RequestRouter::new(registry.clone());
///
/// let entity_id = EntityId::new("user-123".to_string())?;
/// let target_node = router.route_for_entity(&entity_id)?;
///
/// // Send request to target_node.address
/// ```

use std::sync::Arc;
use crate::domain::value_objects::{EntityId, PartitionKey};
use crate::error::{AllSourceError, Result};
use super::node_registry::{Node, NodeRegistry};

/// Request Router for partition-aware request routing
pub struct RequestRouter {
    registry: Arc<NodeRegistry>,
}

impl RequestRouter {
    /// Create new request router
    ///
    /// # Arguments
    /// - `registry`: Shared node registry for cluster topology
    pub fn new(registry: Arc<NodeRegistry>) -> Self {
        Self { registry }
    }

    /// Route request for an entity
    ///
    /// Determines the partition for the entity and finds the responsible node.
    ///
    /// # Returns
    /// - `Node`: Target node to send the request to
    ///
    /// # Errors
    /// - Returns error if no healthy node is available for the partition
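    ///
    /// # Example
    /// A minimal sketch, assuming the shared registry has already been populated
    /// with healthy nodes (the `user-123` id is illustrative):
    /// ```ignore
    /// let entity_id = EntityId::new("user-123".to_string())?;
    /// match router.route_for_entity(&entity_id) {
    ///     Ok(node) => println!("send request to {}", node.address),
    ///     Err(e) => eprintln!("no healthy node for this entity: {}", e),
    /// }
    /// ```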
    pub fn route_for_entity(&self, entity_id: &EntityId) -> Result<Node> {
        // Determine partition using consistent hashing
        let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
        self.route_for_partition(&partition_key)
    }

    /// Route request for a specific partition
    ///
    /// # Returns
    /// - `Node`: Target node responsible for this partition
    ///
    /// # Errors
    /// - Returns error if no healthy node is available
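    ///
    /// # Example
    /// A minimal sketch, assuming the cluster was created with 32 partitions as in
    /// the module-level example:
    /// ```ignore
    /// let partition_key = PartitionKey::from_partition_id(15, 32)?;
    /// let node = router.route_for_partition(&partition_key)?;
    /// println!("partition 15 is served by {}", node.address);
    /// ```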
    pub fn route_for_partition(&self, partition_key: &PartitionKey) -> Result<Node> {
        let partition_id = partition_key.partition_id();

        let node_id = self.registry.node_for_partition(partition_id)
            .ok_or_else(|| AllSourceError::StorageError(format!(
                "No healthy node available for partition {}",
                partition_id
            )))?;

        self.registry.get_node(node_id)
            .ok_or_else(|| AllSourceError::InternalError(format!(
                "Node {} not found in registry",
                node_id
            )))
    }

    /// Get all nodes for load-balanced read operations
    ///
    /// Returns all healthy nodes that can serve read requests.
    /// Useful for fan-out queries across multiple nodes.
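    ///
    /// # Example
    /// A fan-out sketch; how each per-node request is actually sent is left to the
    /// caller:
    /// ```ignore
    /// for node in router.nodes_for_read() {
    ///     // e.g. issue the same read against node.address and merge the results
    ///     println!("querying {}", node.address);
    /// }
    /// ```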
    pub fn nodes_for_read(&self) -> Vec<Node> {
        self.registry.healthy_nodes()
    }

    /// Check if a specific node can handle the entity
    ///
    /// Useful for sticky sessions or connection pooling.
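    ///
    /// # Example
    /// A sticky-session sketch, assuming the caller tracks the node id of an existing
    /// connection (`pinned_node_id` is illustrative):
    /// ```ignore
    /// let entity_id = EntityId::new("user-123".to_string())?;
    /// if router.can_node_handle_entity(&entity_id, pinned_node_id) {
    ///     // reuse the pinned connection
    /// } else {
    ///     let node = router.route_for_entity(&entity_id)?;
    ///     // open a new connection to node.address instead
    /// }
    /// ```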
    pub fn can_node_handle_entity(&self, entity_id: &EntityId, node_id: u32) -> bool {
        let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
        let partition_id = partition_key.partition_id();

        if let Some(assigned_node_id) = self.registry.node_for_partition(partition_id) {
            assigned_node_id == node_id
        } else {
            false
        }
    }

    /// Get partition distribution for monitoring
    ///
    /// Returns map of node_id -> partition_ids for observability.
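    ///
    /// # Example
    /// A monitoring sketch that logs how many partitions each node currently owns:
    /// ```ignore
    /// for (node_id, partitions) in router.partition_distribution() {
    ///     println!("node {} owns {} partitions", node_id, partitions.len());
    /// }
    /// ```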
    pub fn partition_distribution(&self) -> std::collections::HashMap<u32, Vec<u32>> {
        self.registry.partition_distribution()
    }

    /// Check if routing is available
    ///
    /// Returns true if cluster is healthy and can handle requests.
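    ///
    /// # Example
    /// A readiness-check sketch, e.g. for shedding requests before routing them:
    /// ```ignore
    /// if !router.is_available() {
    ///     // reject or queue the request until the cluster recovers
    /// }
    /// ```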
    pub fn is_available(&self) -> bool {
        self.registry.is_cluster_healthy()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::infrastructure::cluster::node_registry::Node;

    fn setup_cluster() -> (Arc<NodeRegistry>, RequestRouter) {
        let registry = Arc::new(NodeRegistry::new(32));

        // Register 4 nodes
        for i in 0..4 {
            registry.register_node(Node {
                id: i,
                address: format!("node-{}:8080", i),
                healthy: true,
                assigned_partitions: vec![],
            });
        }

        let router = RequestRouter::new(registry.clone());

        (registry, router)
    }

    #[test]
    fn test_create_router() {
        let registry = Arc::new(NodeRegistry::new(32));
        let _router = RequestRouter::new(registry);
    }

    #[test]
    fn test_route_for_entity() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let node = router.route_for_entity(&entity_id).unwrap();

        // Should route to one of the 4 nodes
        assert!(node.id < 4);
        assert!(node.healthy);
    }

    #[test]
    fn test_consistent_routing() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();

        // Multiple calls should route to same node
        let node1 = router.route_for_entity(&entity_id).unwrap();
        let node2 = router.route_for_entity(&entity_id).unwrap();
        let node3 = router.route_for_entity(&entity_id).unwrap();

        assert_eq!(node1.id, node2.id);
        assert_eq!(node2.id, node3.id);
    }

    #[test]
    fn test_different_entities_may_route_differently() {
        let (_registry, router) = setup_cluster();

        let entity1 = EntityId::new("user-1".to_string()).unwrap();
        let entity2 = EntityId::new("user-2".to_string()).unwrap();
        let entity3 = EntityId::new("user-3".to_string()).unwrap();

        let node1 = router.route_for_entity(&entity1).unwrap();
        let node2 = router.route_for_entity(&entity2).unwrap();
        let node3 = router.route_for_entity(&entity3).unwrap();

        // With 3 entities across 4 nodes some spread is likely but not guaranteed,
        // so report the distribution instead of asserting on it
        let unique_nodes: std::collections::HashSet<_> =
            vec![node1.id, node2.id, node3.id].into_iter().collect();

        println!("Unique nodes: {:?}", unique_nodes);
    }

    #[test]
    fn test_route_for_partition() {
        let (_registry, router) = setup_cluster();

        let partition_key = PartitionKey::from_partition_id(15, 32).unwrap();
        let node = router.route_for_partition(&partition_key).unwrap();

        assert!(node.id < 4);
        assert!(node.healthy);
    }

    #[test]
    fn test_can_node_handle_entity() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let target_node = router.route_for_entity(&entity_id).unwrap();

        // Target node should be able to handle the entity
        assert!(router.can_node_handle_entity(&entity_id, target_node.id));

        // Whether any other node is considered responsible depends on the registry's
        // partition assignment, so just exercise the check for the remaining nodes
        for i in 0..4 {
            if i != target_node.id {
                let _can_handle = router.can_node_handle_entity(&entity_id, i);
            }
        }
    }

    #[test]
    fn test_nodes_for_read() {
        let (_registry, router) = setup_cluster();

        let nodes = router.nodes_for_read();

        assert_eq!(nodes.len(), 4);
        assert!(nodes.iter().all(|n| n.healthy));
    }

    #[test]
    fn test_no_healthy_nodes() {
        let registry = Arc::new(NodeRegistry::new(32));

        // Register unhealthy node
        registry.register_node(Node {
            id: 0,
            address: "node-0:8080".to_string(),
            healthy: false,
            assigned_partitions: vec![],
        });

        let router = RequestRouter::new(registry);

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let result = router.route_for_entity(&entity_id);

        assert!(result.is_err());
    }

    #[test]
    fn test_partition_distribution() {
        let (_registry, router) = setup_cluster();

        let distribution = router.partition_distribution();

        assert_eq!(distribution.len(), 4);

        // Each node should have 8 partitions (32/4)
        for (_node_id, partitions) in distribution {
            assert_eq!(partitions.len(), 8);
        }
    }

    #[test]
    fn test_is_available() {
        let (registry, router) = setup_cluster();

        // Cluster is healthy
        assert!(router.is_available());

        // Mark all nodes unhealthy
        for i in 0..4 {
            registry.set_node_health(i, false);
        }

        // Cluster is now unavailable
        assert!(!router.is_available());
    }
}