allsource_core/infrastructure/cluster/
request_router.rs1use std::sync::Arc;
23use crate::domain::value_objects::{EntityId, PartitionKey};
24use crate::error::{AllSourceError, Result};
25use super::node_registry::{Node, NodeRegistry};
26
27pub struct RequestRouter {
29 registry: Arc<NodeRegistry>,
30}
31
32impl RequestRouter {
33 pub fn new(registry: Arc<NodeRegistry>) -> Self {
38 Self { registry }
39 }
40
41 pub fn route_for_entity(&self, entity_id: &EntityId) -> Result<Node> {
51 let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
53 self.route_for_partition(&partition_key)
54 }
55
56 pub fn route_for_partition(&self, partition_key: &PartitionKey) -> Result<Node> {
64 let partition_id = partition_key.partition_id();
65
66 let node_id = self.registry.node_for_partition(partition_id)
67 .ok_or_else(|| AllSourceError::StorageError(format!(
68 "No healthy node available for partition {}",
69 partition_id
70 )))?;
71
72 self.registry.get_node(node_id)
73 .ok_or_else(|| AllSourceError::InternalError(format!(
74 "Node {} not found in registry",
75 node_id
76 )))
77 }
78
79 pub fn nodes_for_read(&self) -> Vec<Node> {
84 self.registry.healthy_nodes()
85 }
86
87 pub fn can_node_handle_entity(&self, entity_id: &EntityId, node_id: u32) -> bool {
91 let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
92 let partition_id = partition_key.partition_id();
93
94 if let Some(assigned_node_id) = self.registry.node_for_partition(partition_id) {
95 assigned_node_id == node_id
96 } else {
97 false
98 }
99 }
100
101 pub fn partition_distribution(&self) -> std::collections::HashMap<u32, Vec<u32>> {
105 self.registry.partition_distribution()
106 }
107
108 pub fn is_available(&self) -> bool {
112 self.registry.is_cluster_healthy()
113 }
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use crate::infrastructure::cluster::node_registry::Node;

    /// Builds a registry with 32 partitions and four healthy nodes (ids 0..4)
    /// and returns it together with a router that shares it.
    fn setup_cluster() -> (Arc<NodeRegistry>, RequestRouter) {
        let registry = Arc::new(NodeRegistry::new(32));

        for i in 0..4 {
            registry.register_node(Node {
                id: i,
                address: format!("node-{}:8080", i),
                healthy: true,
                assigned_partitions: vec![],
            });
        }

        let router = RequestRouter::new(Arc::clone(&registry));

        (registry, router)
    }

    /// Constructing a router over an empty registry must not panic.
    #[test]
    fn test_create_router() {
        let registry = Arc::new(NodeRegistry::new(32));
        let _router = RequestRouter::new(registry);
    }

    /// An entity routes to one of the registered, healthy nodes.
    #[test]
    fn test_route_for_entity() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let node = router.route_for_entity(&entity_id).unwrap();

        assert!(node.id < 4);
        assert!(node.healthy);
    }

    /// Routing the same entity repeatedly yields the same node.
    #[test]
    fn test_consistent_routing() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();

        let node1 = router.route_for_entity(&entity_id).unwrap();
        let node2 = router.route_for_entity(&entity_id).unwrap();
        let node3 = router.route_for_entity(&entity_id).unwrap();

        assert_eq!(node1.id, node2.id);
        assert_eq!(node2.id, node3.id);
    }

    /// Distinct entities may land on distinct nodes, but every one of them
    /// must still resolve to a registered, healthy node.
    #[test]
    fn test_different_entities_may_route_differently() {
        let (_registry, router) = setup_cluster();

        let entity1 = EntityId::new("user-1".to_string()).unwrap();
        let entity2 = EntityId::new("user-2".to_string()).unwrap();
        let entity3 = EntityId::new("user-3".to_string()).unwrap();

        let routed = [
            router.route_for_entity(&entity1).unwrap(),
            router.route_for_entity(&entity2).unwrap(),
            router.route_for_entity(&entity3).unwrap(),
        ];

        for node in routed.iter() {
            assert!(node.id < 4);
            assert!(node.healthy);
        }

        // Hash collisions are allowed, so the three entities may share nodes;
        // the number of distinct targets is anywhere from one to three.
        let unique_nodes: std::collections::HashSet<_> =
            routed.iter().map(|node| node.id).collect();
        assert!((1..=3).contains(&unique_nodes.len()));
    }

    /// A partition key built directly from a partition id routes to a
    /// registered, healthy node.
    #[test]
    fn test_route_for_partition() {
        let (_registry, router) = setup_cluster();

        let partition_key = PartitionKey::from_partition_id(15, 32).unwrap();
        let node = router.route_for_partition(&partition_key).unwrap();

        assert!(node.id < 4);
        assert!(node.healthy);
    }

    /// Only the node the entity routes to may claim it; every other node id
    /// must be rejected.
    #[test]
    fn test_can_node_handle_entity() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let target_node = router.route_for_entity(&entity_id).unwrap();

        assert!(router.can_node_handle_entity(&entity_id, target_node.id));

        // A partition has a single assigned node, so once the owner is known
        // every other id must compare unequal to it.
        for i in 0..4 {
            if i != target_node.id {
                assert!(
                    !router.can_node_handle_entity(&entity_id, i),
                    "node {} must not handle an entity owned by node {}",
                    i,
                    target_node.id
                );
            }
        }
    }

    /// All four registered nodes are healthy and therefore readable.
    #[test]
    fn test_nodes_for_read() {
        let (_registry, router) = setup_cluster();

        let nodes = router.nodes_for_read();

        assert_eq!(nodes.len(), 4);
        assert!(nodes.iter().all(|n| n.healthy));
    }

    /// Routing fails when the only registered node is unhealthy.
    #[test]
    fn test_no_healthy_nodes() {
        let registry = Arc::new(NodeRegistry::new(32));

        registry.register_node(Node {
            id: 0,
            address: "node-0:8080".to_string(),
            healthy: false,
            assigned_partitions: vec![],
        });

        let router = RequestRouter::new(registry);

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let result = router.route_for_entity(&entity_id);

        assert!(result.is_err());
    }

    /// 32 partitions over four nodes split evenly: eight per node.
    #[test]
    fn test_partition_distribution() {
        let (_registry, router) = setup_cluster();

        let distribution = router.partition_distribution();

        assert_eq!(distribution.len(), 4);

        for partitions in distribution.values() {
            assert_eq!(partitions.len(), 8);
        }
    }

    /// The router is available while nodes are healthy and unavailable once
    /// every node has been marked unhealthy.
    #[test]
    fn test_is_available() {
        let (registry, router) = setup_cluster();

        assert!(router.is_available());

        for i in 0..4 {
            registry.set_node_health(i, false);
        }

        assert!(!router.is_available());
    }
}