allsource_core/infrastructure/cluster/
request_router.rs1use super::node_registry::{Node, NodeRegistry};
2use crate::domain::value_objects::{EntityId, PartitionKey};
3use crate::error::{AllSourceError, Result};
4use std::sync::Arc;
25
26pub struct RequestRouter {
28 registry: Arc<NodeRegistry>,
29}
30
31impl RequestRouter {
32 pub fn new(registry: Arc<NodeRegistry>) -> Self {
37 Self { registry }
38 }
39
40 pub fn route_for_entity(&self, entity_id: &EntityId) -> Result<Node> {
50 let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
52 self.route_for_partition(&partition_key)
53 }
54
55 pub fn route_for_partition(&self, partition_key: &PartitionKey) -> Result<Node> {
63 let partition_id = partition_key.partition_id();
64
65 let node_id = self
66 .registry
67 .node_for_partition(partition_id)
68 .ok_or_else(|| {
69 AllSourceError::StorageError(format!(
70 "No healthy node available for partition {}",
71 partition_id
72 ))
73 })?;
74
75 self.registry.get_node(node_id).ok_or_else(|| {
76 AllSourceError::InternalError(format!("Node {} not found in registry", node_id))
77 })
78 }
79
80 pub fn nodes_for_read(&self) -> Vec<Node> {
85 self.registry.healthy_nodes()
86 }
87
88 pub fn can_node_handle_entity(&self, entity_id: &EntityId, node_id: u32) -> bool {
92 let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
93 let partition_id = partition_key.partition_id();
94
95 if let Some(assigned_node_id) = self.registry.node_for_partition(partition_id) {
96 assigned_node_id == node_id
97 } else {
98 false
99 }
100 }
101
102 pub fn partition_distribution(&self) -> std::collections::HashMap<u32, Vec<u32>> {
106 self.registry.partition_distribution()
107 }
108
109 pub fn is_available(&self) -> bool {
113 self.registry.is_cluster_healthy()
114 }
115}
116
117#[cfg(test)]
118mod tests {
119 use super::*;
120 use crate::infrastructure::cluster::node_registry::Node;
121
122 fn setup_cluster() -> (Arc<NodeRegistry>, RequestRouter) {
123 let registry = Arc::new(NodeRegistry::new(32));
124
125 for i in 0..4 {
127 registry.register_node(Node {
128 id: i,
129 address: format!("node-{}:8080", i),
130 healthy: true,
131 assigned_partitions: vec![],
132 });
133 }
134
135 let router = RequestRouter::new(registry.clone());
136
137 (registry, router)
138 }
139
140 #[test]
141 fn test_create_router() {
142 let registry = Arc::new(NodeRegistry::new(32));
143 let _router = RequestRouter::new(registry);
144 }
145
146 #[test]
147 fn test_route_for_entity() {
148 let (_registry, router) = setup_cluster();
149
150 let entity_id = EntityId::new("user-123".to_string()).unwrap();
151 let node = router.route_for_entity(&entity_id).unwrap();
152
153 assert!(node.id < 4);
155 assert!(node.healthy);
156 }
157
158 #[test]
159 fn test_consistent_routing() {
160 let (_registry, router) = setup_cluster();
161
162 let entity_id = EntityId::new("user-123".to_string()).unwrap();
163
164 let node1 = router.route_for_entity(&entity_id).unwrap();
166 let node2 = router.route_for_entity(&entity_id).unwrap();
167 let node3 = router.route_for_entity(&entity_id).unwrap();
168
169 assert_eq!(node1.id, node2.id);
170 assert_eq!(node2.id, node3.id);
171 }
172
173 #[test]
174 fn test_different_entities_may_route_differently() {
175 let (_registry, router) = setup_cluster();
176
177 let entity1 = EntityId::new("user-1".to_string()).unwrap();
178 let entity2 = EntityId::new("user-2".to_string()).unwrap();
179 let entity3 = EntityId::new("user-3".to_string()).unwrap();
180
181 let node1 = router.route_for_entity(&entity1).unwrap();
182 let node2 = router.route_for_entity(&entity2).unwrap();
183 let node3 = router.route_for_entity(&entity3).unwrap();
184
185 let unique_nodes: std::collections::HashSet<_> =
187 vec![node1.id, node2.id, node3.id].into_iter().collect();
188
189 println!("Unique nodes: {:?}", unique_nodes);
191 }
192
193 #[test]
194 fn test_route_for_partition() {
195 let (_registry, router) = setup_cluster();
196
197 let partition_key = PartitionKey::from_partition_id(15, 32).unwrap();
198 let node = router.route_for_partition(&partition_key).unwrap();
199
200 assert!(node.id < 4);
201 assert!(node.healthy);
202 }
203
204 #[test]
205 fn test_can_node_handle_entity() {
206 let (_registry, router) = setup_cluster();
207
208 let entity_id = EntityId::new("user-123".to_string()).unwrap();
209 let target_node = router.route_for_entity(&entity_id).unwrap();
210
211 assert!(router.can_node_handle_entity(&entity_id, target_node.id));
213
214 for i in 0..4 {
217 if i != target_node.id {
218 let _can_handle = router.can_node_handle_entity(&entity_id, i);
220 }
221 }
222 }
223
224 #[test]
225 fn test_nodes_for_read() {
226 let (_registry, router) = setup_cluster();
227
228 let nodes = router.nodes_for_read();
229
230 assert_eq!(nodes.len(), 4);
231 assert!(nodes.iter().all(|n| n.healthy));
232 }
233
234 #[test]
235 fn test_no_healthy_nodes() {
236 let registry = Arc::new(NodeRegistry::new(32));
237
238 registry.register_node(Node {
240 id: 0,
241 address: "node-0:8080".to_string(),
242 healthy: false,
243 assigned_partitions: vec![],
244 });
245
246 let router = RequestRouter::new(registry);
247
248 let entity_id = EntityId::new("user-123".to_string()).unwrap();
249 let result = router.route_for_entity(&entity_id);
250
251 assert!(result.is_err());
252 }
253
254 #[test]
255 fn test_partition_distribution() {
256 let (_registry, router) = setup_cluster();
257
258 let distribution = router.partition_distribution();
259
260 assert_eq!(distribution.len(), 4);
261
262 for (_node_id, partitions) in distribution {
264 assert_eq!(partitions.len(), 8);
265 }
266 }
267
268 #[test]
269 fn test_is_available() {
270 let (registry, router) = setup_cluster();
271
272 assert!(router.is_available());
274
275 for i in 0..4 {
277 registry.set_node_health(i, false);
278 }
279
280 assert!(!router.is_available());
282 }
283}