allsource_core/infrastructure/cluster/
request_router.rs1use super::node_registry::{Node, NodeRegistry};
2use crate::{
3 domain::value_objects::{EntityId, PartitionKey},
4 error::{AllSourceError, Result},
5};
6use std::sync::Arc;
27
28pub struct RequestRouter {
30 registry: Arc<NodeRegistry>,
31}
32
33impl RequestRouter {
34 pub fn new(registry: Arc<NodeRegistry>) -> Self {
39 Self { registry }
40 }
41
42 pub fn route_for_entity(&self, entity_id: &EntityId) -> Result<Node> {
52 let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
54 self.route_for_partition(&partition_key)
55 }
56
57 pub fn route_for_partition(&self, partition_key: &PartitionKey) -> Result<Node> {
65 let partition_id = partition_key.partition_id();
66
67 let node_id = self
68 .registry
69 .node_for_partition(partition_id)
70 .ok_or_else(|| {
71 AllSourceError::StorageError(format!(
72 "No healthy node available for partition {}",
73 partition_id
74 ))
75 })?;
76
77 self.registry.get_node(node_id).ok_or_else(|| {
78 AllSourceError::InternalError(format!("Node {node_id} not found in registry"))
79 })
80 }
81
82 pub fn nodes_for_read(&self) -> Vec<Node> {
87 self.registry.healthy_nodes()
88 }
89
90 pub fn can_node_handle_entity(&self, entity_id: &EntityId, node_id: u32) -> bool {
94 let partition_key = PartitionKey::from_entity_id(entity_id.as_str());
95 let partition_id = partition_key.partition_id();
96
97 if let Some(assigned_node_id) = self.registry.node_for_partition(partition_id) {
98 assigned_node_id == node_id
99 } else {
100 false
101 }
102 }
103
104 pub fn partition_distribution(&self) -> std::collections::HashMap<u32, Vec<u32>> {
108 self.registry.partition_distribution()
109 }
110
111 pub fn is_available(&self) -> bool {
115 self.registry.is_cluster_healthy()
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use super::*;
122 use crate::infrastructure::cluster::node_registry::Node;
123
124 fn setup_cluster() -> (Arc<NodeRegistry>, RequestRouter) {
125 let registry = Arc::new(NodeRegistry::new(32));
126
127 for i in 0..4 {
129 registry.register_node(Node {
130 id: i,
131 address: format!("node-{}:8080", i),
132 healthy: true,
133 assigned_partitions: vec![],
134 });
135 }
136
137 let router = RequestRouter::new(registry.clone());
138
139 (registry, router)
140 }
141
142 #[test]
143 fn test_create_router() {
144 let registry = Arc::new(NodeRegistry::new(32));
145 let _router = RequestRouter::new(registry);
146 }
147
148 #[test]
149 fn test_route_for_entity() {
150 let (_registry, router) = setup_cluster();
151
152 let entity_id = EntityId::new("user-123".to_string()).unwrap();
153 let node = router.route_for_entity(&entity_id).unwrap();
154
155 assert!(node.id < 4);
157 assert!(node.healthy);
158 }
159
160 #[test]
161 fn test_consistent_routing() {
162 let (_registry, router) = setup_cluster();
163
164 let entity_id = EntityId::new("user-123".to_string()).unwrap();
165
166 let node1 = router.route_for_entity(&entity_id).unwrap();
168 let node2 = router.route_for_entity(&entity_id).unwrap();
169 let node3 = router.route_for_entity(&entity_id).unwrap();
170
171 assert_eq!(node1.id, node2.id);
172 assert_eq!(node2.id, node3.id);
173 }
174
175 #[test]
176 fn test_different_entities_may_route_differently() {
177 let (_registry, router) = setup_cluster();
178
179 let entity1 = EntityId::new("user-1".to_string()).unwrap();
180 let entity2 = EntityId::new("user-2".to_string()).unwrap();
181 let entity3 = EntityId::new("user-3".to_string()).unwrap();
182
183 let node1 = router.route_for_entity(&entity1).unwrap();
184 let node2 = router.route_for_entity(&entity2).unwrap();
185 let node3 = router.route_for_entity(&entity3).unwrap();
186
187 let unique_nodes: std::collections::HashSet<_> =
189 vec![node1.id, node2.id, node3.id].into_iter().collect();
190
191 println!("Unique nodes: {:?}", unique_nodes);
193 }
194
195 #[test]
196 fn test_route_for_partition() {
197 let (_registry, router) = setup_cluster();
198
199 let partition_key = PartitionKey::from_partition_id(15, 32).unwrap();
200 let node = router.route_for_partition(&partition_key).unwrap();
201
202 assert!(node.id < 4);
203 assert!(node.healthy);
204 }
205
206 #[test]
207 fn test_can_node_handle_entity() {
208 let (_registry, router) = setup_cluster();
209
210 let entity_id = EntityId::new("user-123".to_string()).unwrap();
211 let target_node = router.route_for_entity(&entity_id).unwrap();
212
213 assert!(router.can_node_handle_entity(&entity_id, target_node.id));
215
216 for i in 0..4 {
219 if i != target_node.id {
220 let _can_handle = router.can_node_handle_entity(&entity_id, i);
222 }
223 }
224 }
225
226 #[test]
227 fn test_nodes_for_read() {
228 let (_registry, router) = setup_cluster();
229
230 let nodes = router.nodes_for_read();
231
232 assert_eq!(nodes.len(), 4);
233 assert!(nodes.iter().all(|n| n.healthy));
234 }
235
236 #[test]
237 fn test_no_healthy_nodes() {
238 let registry = Arc::new(NodeRegistry::new(32));
239
240 registry.register_node(Node {
242 id: 0,
243 address: "node-0:8080".to_string(),
244 healthy: false,
245 assigned_partitions: vec![],
246 });
247
248 let router = RequestRouter::new(registry);
249
250 let entity_id = EntityId::new("user-123".to_string()).unwrap();
251 let result = router.route_for_entity(&entity_id);
252
253 assert!(result.is_err());
254 }
255
256 #[test]
257 fn test_partition_distribution() {
258 let (_registry, router) = setup_cluster();
259
260 let distribution = router.partition_distribution();
261
262 assert_eq!(distribution.len(), 4);
263
264 for (_node_id, partitions) in distribution {
266 assert_eq!(partitions.len(), 8);
267 }
268 }
269
270 #[test]
271 fn test_is_available() {
272 let (registry, router) = setup_cluster();
273
274 assert!(router.is_available());
276
277 for i in 0..4 {
279 registry.set_node_health(i, false);
280 }
281
282 assert!(!router.is_available());
284 }
285}