allsource_core/infrastructure/cluster/
request_router.rs1use super::node_registry::{Node, NodeRegistry};
2use crate::{
3 domain::value_objects::{EntityId, PartitionKey},
4 error::{AllSourceError, Result},
5};
6use std::sync::Arc;
27
28pub struct RequestRouter {
30 registry: Arc<NodeRegistry>,
31}
32
33impl RequestRouter {
34 pub fn new(registry: Arc<NodeRegistry>) -> Self {
39 Self { registry }
40 }
41
42 pub fn route_for_entity(&self, entity_id: &EntityId) -> Result<Node> {
52 let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
54 self.route_for_partition(&partition_key)
55 }
56
57 pub fn route_for_partition(&self, partition_key: &PartitionKey) -> Result<Node> {
65 let partition_id = partition_key.partition_id();
66
67 let node_id = self
68 .registry
69 .node_for_partition(partition_id)
70 .ok_or_else(|| {
71 AllSourceError::StorageError(format!(
72 "No healthy node available for partition {partition_id}"
73 ))
74 })?;
75
76 self.registry.get_node(node_id).ok_or_else(|| {
77 AllSourceError::InternalError(format!("Node {node_id} not found in registry"))
78 })
79 }
80
81 pub fn nodes_for_read(&self) -> Vec<Node> {
86 self.registry.healthy_nodes()
87 }
88
89 pub fn can_node_handle_entity(&self, entity_id: &EntityId, node_id: u32) -> bool {
93 let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
94 let partition_id = partition_key.partition_id();
95
96 if let Some(assigned_node_id) = self.registry.node_for_partition(partition_id) {
97 assigned_node_id == node_id
98 } else {
99 false
100 }
101 }
102
103 pub fn partition_distribution(&self) -> std::collections::HashMap<u32, Vec<u32>> {
107 self.registry.partition_distribution()
108 }
109
110 pub fn is_available(&self) -> bool {
114 self.registry.is_cluster_healthy()
115 }
116}
117
#[cfg(test)]
mod tests {
    use super::*;
    use crate::infrastructure::cluster::node_registry::Node;

    /// Builds a 32-partition registry with four healthy nodes (ids 0..4) and a
    /// router on top of it. The registry handle is returned as well so tests
    /// can mutate node health afterwards.
    fn setup_cluster() -> (Arc<NodeRegistry>, RequestRouter) {
        let registry = Arc::new(NodeRegistry::new(32));

        for i in 0..4 {
            registry.register_node(Node {
                id: i,
                address: format!("node-{i}:8080"),
                healthy: true,
                assigned_partitions: vec![],
            });
        }

        let router = RequestRouter::new(Arc::clone(&registry));

        (registry, router)
    }

    /// Construction alone must not panic, even with an empty registry.
    #[test]
    fn test_create_router() {
        let registry = Arc::new(NodeRegistry::new(32));
        let _router = RequestRouter::new(registry);
    }

    #[test]
    fn test_route_for_entity() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let node = router.route_for_entity(&entity_id).unwrap();

        assert!(node.id < 4);
        assert!(node.healthy);
    }

    /// Routing the same entity repeatedly must always land on the same node.
    #[test]
    fn test_consistent_routing() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();

        let node1 = router.route_for_entity(&entity_id).unwrap();
        let node2 = router.route_for_entity(&entity_id).unwrap();
        let node3 = router.route_for_entity(&entity_id).unwrap();

        assert_eq!(node1.id, node2.id);
        assert_eq!(node2.id, node3.id);
    }

    /// Distinct entities are allowed to share a node or not; what must hold
    /// is that each one is routed to a registered, healthy node.
    #[test]
    fn test_different_entities_may_route_differently() {
        let (_registry, router) = setup_cluster();

        let entity1 = EntityId::new("user-1".to_string()).unwrap();
        let entity2 = EntityId::new("user-2".to_string()).unwrap();
        let entity3 = EntityId::new("user-3".to_string()).unwrap();

        let node1 = router.route_for_entity(&entity1).unwrap();
        let node2 = router.route_for_entity(&entity2).unwrap();
        let node3 = router.route_for_entity(&entity3).unwrap();

        for node in [&node1, &node2, &node3] {
            assert!(node.id < 4);
            assert!(node.healthy);
        }

        // How many distinct nodes are hit depends on the hash, so it is only
        // reported, never asserted.
        let unique_nodes: std::collections::HashSet<_> =
            [node1.id, node2.id, node3.id].iter().copied().collect();

        println!("Unique nodes: {unique_nodes:?}");
    }

    #[test]
    fn test_route_for_partition() {
        let (_registry, router) = setup_cluster();

        let partition_key = PartitionKey::from_partition_id(15, 32).unwrap();
        let node = router.route_for_partition(&partition_key).unwrap();

        assert!(node.id < 4);
        assert!(node.healthy);
    }

    /// The routed node must accept the entity and every other node must
    /// reject it, because `can_node_handle_entity` compares against the single
    /// node id the registry returns for the entity's partition.
    #[test]
    fn test_can_node_handle_entity() {
        let (_registry, router) = setup_cluster();

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let target_node = router.route_for_entity(&entity_id).unwrap();

        assert!(router.can_node_handle_entity(&entity_id, target_node.id));

        for i in (0..4).filter(|&i| i != target_node.id) {
            assert!(
                !router.can_node_handle_entity(&entity_id, i),
                "node {i} must not own an entity routed to node {}",
                target_node.id
            );
        }
    }

    #[test]
    fn test_nodes_for_read() {
        let (_registry, router) = setup_cluster();

        let nodes = router.nodes_for_read();

        assert_eq!(nodes.len(), 4);
        assert!(nodes.iter().all(|n| n.healthy));
    }

    /// With only an unhealthy node registered, routing must fail.
    #[test]
    fn test_no_healthy_nodes() {
        let registry = Arc::new(NodeRegistry::new(32));

        registry.register_node(Node {
            id: 0,
            address: "node-0:8080".to_string(),
            healthy: false,
            assigned_partitions: vec![],
        });

        let router = RequestRouter::new(registry);

        let entity_id = EntityId::new("user-123".to_string()).unwrap();
        let result = router.route_for_entity(&entity_id);

        assert!(result.is_err());
    }

    /// 32 partitions over 4 healthy nodes must split evenly, 8 per node.
    #[test]
    fn test_partition_distribution() {
        let (_registry, router) = setup_cluster();

        let distribution = router.partition_distribution();

        assert_eq!(distribution.len(), 4);

        for partitions in distribution.values() {
            assert_eq!(partitions.len(), 8);
        }
    }

    /// Availability must flip to `false` once every node is marked unhealthy.
    #[test]
    fn test_is_available() {
        let (registry, router) = setup_cluster();

        assert!(router.is_available());

        for i in 0..4 {
            registry.set_node_health(i, false);
        }

        assert!(!router.is_available());
    }
}