allsource_core/infrastructure/cluster/
request_router.rs

//! Request Router for Distributed Partitioning
//!
//! Routes requests to the correct node based on partition assignment.
//! Enables horizontal scaling with SierraDB's fixed partition architecture.
//!
//! # Design
//! - **Partition-aware routing**: Entity ID → Partition → Node
//! - **Failover**: Retry on node failure
//! - **Load balancing**: Even distribution via consistent hashing
//!
//! # Example
//! ```ignore
//! let registry = Arc::new(NodeRegistry::new(32));
//! let router = RequestRouter::new(registry.clone());
//!
//! let entity_id = EntityId::new("user-123".to_string())?;
//! let target_node = router.route_for_entity(&entity_id)?;
//!
//! // Send request to target_node.address
//! ```

use super::node_registry::{Node, NodeRegistry};
use crate::domain::value_objects::{EntityId, PartitionKey};
use crate::error::{AllSourceError, Result};
use std::sync::Arc;

/// Request Router for partition-aware request routing
pub struct RequestRouter {
    registry: Arc<NodeRegistry>,
}

impl RequestRouter {
    /// Create new request router
    ///
    /// # Arguments
    /// - `registry`: Shared node registry for cluster topology
    pub fn new(registry: Arc<NodeRegistry>) -> Self {
        Self { registry }
    }

    /// Route request for an entity
    ///
    /// Determines the partition for the entity and finds the responsible node.
    ///
    /// # Returns
    /// - `Node`: Target node to send the request to
    ///
    /// # Errors
    /// - Returns error if no healthy node is available for the partition
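    ///
    /// # Example
    /// A sketch of the failover pattern from the module-level design notes, assuming the
    /// caller keeps a handle to the `NodeRegistry` and `send_request` is a hypothetical
    /// transport helper:
    /// ```ignore
    /// let node = router.route_for_entity(&entity_id)?;
    /// if send_request(&node.address, &payload).is_err() {
    ///     // Mark the failed node unhealthy, then route again; this assumes the
    ///     // registry resolves the partition to another healthy node afterwards.
    ///     registry.set_node_health(node.id, false);
    ///     let fallback = router.route_for_entity(&entity_id)?;
    ///     send_request(&fallback.address, &payload)?;
    /// }
    /// ```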
    pub fn route_for_entity(&self, entity_id: &EntityId) -> Result<Node> {
        // Determine partition using consistent hashing
        let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
        self.route_for_partition(&partition_key)
    }

    /// Route request for a specific partition
    ///
    /// # Returns
    /// - `Node`: Target node responsible for this partition
    ///
    /// # Errors
    /// - Returns error if no healthy node is available
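    ///
    /// # Example
    /// A sketch of routing by partition id directly, mirroring how the tests below build a
    /// `PartitionKey` with `from_partition_id(partition_id, partition_count)`:
    /// ```ignore
    /// // Resolve the node that owns partition 15 of 32.
    /// let partition_key = PartitionKey::from_partition_id(15, 32).unwrap();
    /// let node = router.route_for_partition(&partition_key)?;
    /// println!("partition 15 is served by {}", node.address);
    /// ```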
    pub fn route_for_partition(&self, partition_key: &PartitionKey) -> Result<Node> {
        let partition_id = partition_key.partition_id();

        let node_id = self
            .registry
            .node_for_partition(partition_id)
            .ok_or_else(|| {
                AllSourceError::StorageError(format!(
                    "No healthy node available for partition {}",
                    partition_id
                ))
            })?;

        self.registry.get_node(node_id).ok_or_else(|| {
            AllSourceError::InternalError(format!("Node {} not found in registry", node_id))
        })
    }

    /// Get all nodes for load-balanced read operations
    ///
    /// Returns all healthy nodes that can serve read requests.
    /// Useful for fan-out queries across multiple nodes.
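    ///
    /// # Example
    /// A sketch of a fan-out read, assuming `query_node` is a hypothetical caller-provided
    /// helper that sends the query to a single node's address:
    /// ```ignore
    /// let mut results = Vec::new();
    /// for node in router.nodes_for_read() {
    ///     // Collect partial results from every healthy node.
    ///     results.push(query_node(&node.address, &query)?);
    /// }
    /// ```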
    pub fn nodes_for_read(&self) -> Vec<Node> {
        self.registry.healthy_nodes()
    }

    /// Check if a specific node can handle the entity
    ///
    /// Useful for sticky sessions or connection pooling.
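    ///
    /// # Example
    /// A sketch of a sticky-session check, where `pooled` and `connect` are hypothetical
    /// connection-pooling helpers and `pooled.node_id` records which node the connection
    /// was opened against:
    /// ```ignore
    /// if router.can_node_handle_entity(&entity_id, pooled.node_id) {
    ///     // The pooled node still owns the entity's partition: reuse the connection.
    ///     pooled.send(&payload)?;
    /// } else {
    ///     // Ownership moved: re-route and connect to the newly responsible node.
    ///     let node = router.route_for_entity(&entity_id)?;
    ///     connect(&node.address)?.send(&payload)?;
    /// }
    /// ```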
    pub fn can_node_handle_entity(&self, entity_id: &EntityId, node_id: u32) -> bool {
        let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
        let partition_id = partition_key.partition_id();

        if let Some(assigned_node_id) = self.registry.node_for_partition(partition_id) {
            assigned_node_id == node_id
        } else {
            false
        }
    }

    /// Get partition distribution for monitoring
    ///
    /// Returns map of node_id -> partition_ids for observability.
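    ///
    /// # Example
    /// A sketch of exporting the distribution as log lines for monitoring:
    /// ```ignore
    /// for (node_id, partitions) in router.partition_distribution() {
    ///     // With 32 partitions over 4 nodes, each node should report 8 partitions.
    ///     println!("node {} owns {} partitions", node_id, partitions.len());
    /// }
    /// ```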
    pub fn partition_distribution(&self) -> std::collections::HashMap<u32, Vec<u32>> {
        self.registry.partition_distribution()
    }

    /// Check if routing is available
    ///
    /// Returns true if cluster is healthy and can handle requests.
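    ///
    /// # Example
    /// A sketch of a readiness check before accepting traffic:
    /// ```ignore
    /// if !router.is_available() {
    ///     // No healthy nodes: fail fast instead of erroring on every route call.
    ///     return Err(AllSourceError::StorageError("cluster unavailable".to_string()));
    /// }
    /// ```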
    pub fn is_available(&self) -> bool {
        self.registry.is_cluster_healthy()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::infrastructure::cluster::node_registry::Node;

    fn setup_cluster() -> (Arc<NodeRegistry>, RequestRouter) {
        let registry = Arc::new(NodeRegistry::new(32));

        // Register 4 nodes
        for i in 0..4 {
            registry.register_node(Node {
                id: i,
                address: format!("node-{}:8080", i),
                healthy: true,
                assigned_partitions: vec![],
            });
        }

        let router = RequestRouter::new(registry.clone());

        (registry, router)
    }

    #[test]
    fn test_create_router() {
        let registry = Arc::new(NodeRegistry::new(32));
        let _router = RequestRouter::new(registry);
    }

    #[test]
    fn test_route_for_entity() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let node = router.route_for_entity(&entity_id).unwrap();

        // Should route to one of the 4 nodes
        assert!(node.id < 4);
        assert!(node.healthy);
    }

    #[test]
    fn test_consistent_routing() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();

        // Multiple calls should route to same node
        let node1 = router.route_for_entity(&entity_id).unwrap();
        let node2 = router.route_for_entity(&entity_id).unwrap();
        let node3 = router.route_for_entity(&entity_id).unwrap();

        assert_eq!(node1.id, node2.id);
        assert_eq!(node2.id, node3.id);
    }

    #[test]
    fn test_different_entities_may_route_differently() {
        let (_registry, router) = setup_cluster();

        let entity1 = EntityId::new("user-1".to_string()).unwrap();
        let entity2 = EntityId::new("user-2".to_string()).unwrap();
        let entity3 = EntityId::new("user-3".to_string()).unwrap();

        let node1 = router.route_for_entity(&entity1).unwrap();
        let node2 = router.route_for_entity(&entity2).unwrap();
        let node3 = router.route_for_entity(&entity3).unwrap();

        // Not all should route to same node (with 4 nodes and 3 entities)
        let unique_nodes: std::collections::HashSet<_> =
            vec![node1.id, node2.id, node3.id].into_iter().collect();

        // Should have some distribution (not guaranteed, but likely)
        println!("Unique nodes: {:?}", unique_nodes);
    }

    #[test]
    fn test_route_for_partition() {
        let (_registry, router) = setup_cluster();

        let partition_key = PartitionKey::from_partition_id(15, 32).unwrap();
        let node = router.route_for_partition(&partition_key).unwrap();

        assert!(node.id < 4);
        assert!(node.healthy);
    }

    #[test]
    fn test_can_node_handle_entity() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let target_node = router.route_for_entity(&entity_id).unwrap();

        // Target node should be able to handle the entity
        assert!(router.can_node_handle_entity(&entity_id, target_node.id));

        // Each partition is assigned to exactly one node, so the check is deterministic:
        // every other node must report that it cannot handle this entity.
        for i in 0..4 {
            if i != target_node.id {
                assert!(!router.can_node_handle_entity(&entity_id, i));
            }
        }
    }

    #[test]
    fn test_nodes_for_read() {
        let (_registry, router) = setup_cluster();

        let nodes = router.nodes_for_read();

        assert_eq!(nodes.len(), 4);
        assert!(nodes.iter().all(|n| n.healthy));
    }

    #[test]
    fn test_no_healthy_nodes() {
        let registry = Arc::new(NodeRegistry::new(32));

        // Register unhealthy node
        registry.register_node(Node {
            id: 0,
            address: "node-0:8080".to_string(),
            healthy: false,
            assigned_partitions: vec![],
        });

        let router = RequestRouter::new(registry);

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let result = router.route_for_entity(&entity_id);

        assert!(result.is_err());
    }

    #[test]
    fn test_partition_distribution() {
        let (_registry, router) = setup_cluster();

        let distribution = router.partition_distribution();

        assert_eq!(distribution.len(), 4);

        // Each node should have 8 partitions (32/4)
        for (_node_id, partitions) in distribution {
            assert_eq!(partitions.len(), 8);
        }
    }

    #[test]
    fn test_is_available() {
        let (registry, router) = setup_cluster();

        // Cluster is healthy
        assert!(router.is_available());

        // Mark all nodes unhealthy
        for i in 0..4 {
            registry.set_node_health(i, false);
        }

        // Cluster is now unavailable
        assert!(!router.is_available());
    }
}