forge_orchestration/federation/
routing.rs1use std::collections::HashMap;
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12
13use super::GeoLocation;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17pub enum RoutingPolicy {
18 Nearest,
20 LowestLatency,
22 MostCapacity,
24 Pinned,
26 RoundRobin,
28 WeightedRandom,
30}
31
32impl Default for RoutingPolicy {
33 fn default() -> Self {
34 Self::LowestLatency
35 }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct RegionRoute {
41 pub region: String,
43 pub weight: u32,
45 pub is_fallback: bool,
47 pub priority: u32,
49}
50
51impl RegionRoute {
52 pub fn new(region: impl Into<String>) -> Self {
54 Self {
55 region: region.into(),
56 weight: 100,
57 is_fallback: false,
58 priority: 0,
59 }
60 }
61
62 pub fn as_fallback(mut self) -> Self {
64 self.is_fallback = true;
65 self.priority = 100;
66 self
67 }
68
69 pub fn with_weight(mut self, weight: u32) -> Self {
71 self.weight = weight;
72 self
73 }
74
75 pub fn with_priority(mut self, priority: u32) -> Self {
77 self.priority = priority;
78 self
79 }
80}
81
82pub struct GeoRouter {
84 locations: RwLock<HashMap<String, GeoLocation>>,
86 latencies: RwLock<HashMap<String, u32>>,
88 health: RwLock<HashMap<String, bool>>,
90 default_policy: RwLock<RoutingPolicy>,
92 rr_counter: RwLock<usize>,
94 static_routes: RwLock<HashMap<String, Vec<RegionRoute>>>,
96}
97
98impl GeoRouter {
99 pub fn new() -> Self {
101 Self {
102 locations: RwLock::new(HashMap::new()),
103 latencies: RwLock::new(HashMap::new()),
104 health: RwLock::new(HashMap::new()),
105 default_policy: RwLock::new(RoutingPolicy::LowestLatency),
106 rr_counter: RwLock::new(0),
107 static_routes: RwLock::new(HashMap::new()),
108 }
109 }
110
111 pub fn set_default_policy(&self, policy: RoutingPolicy) {
113 *self.default_policy.write() = policy;
114 }
115
116 pub fn register_location(&self, region: impl Into<String>, location: GeoLocation) {
118 self.locations.write().insert(region.into(), location);
119 }
120
121 pub fn update_latency(&self, region: impl Into<String>, latency_ms: u32) {
123 self.latencies.write().insert(region.into(), latency_ms);
124 }
125
126 pub fn update_health(&self, region: impl Into<String>, healthy: bool) {
128 self.health.write().insert(region.into(), healthy);
129 }
130
131 pub fn add_static_route(&self, service: impl Into<String>, route: RegionRoute) {
133 let mut routes = self.static_routes.write();
134 routes.entry(service.into())
135 .or_insert_with(Vec::new)
136 .push(route);
137 }
138
139 pub fn route(&self, client_location: Option<&GeoLocation>, policy: Option<RoutingPolicy>) -> Option<String> {
141 let policy = policy.unwrap_or_else(|| *self.default_policy.read());
142 let health = self.health.read();
143
144 let healthy_regions: Vec<_> = health.iter()
146 .filter(|(_, &healthy)| healthy)
147 .map(|(r, _)| r.clone())
148 .collect();
149
150 if healthy_regions.is_empty() {
151 return None;
152 }
153
154 match policy {
155 RoutingPolicy::Nearest => {
156 self.route_nearest(client_location, &healthy_regions)
157 }
158 RoutingPolicy::LowestLatency => {
159 self.route_lowest_latency(&healthy_regions)
160 }
161 RoutingPolicy::MostCapacity => {
162 healthy_regions.first().cloned()
165 }
166 RoutingPolicy::RoundRobin => {
167 self.route_round_robin(&healthy_regions)
168 }
169 RoutingPolicy::WeightedRandom => {
170 self.route_weighted_random(&healthy_regions)
171 }
172 RoutingPolicy::Pinned => {
173 healthy_regions.first().cloned()
175 }
176 }
177 }
178
179 pub fn route_service(&self, service: &str, client_location: Option<&GeoLocation>) -> Option<String> {
181 let routes = self.static_routes.read();
182
183 if let Some(service_routes) = routes.get(service) {
184 let health = self.health.read();
185
186 let mut sorted_routes = service_routes.clone();
188 sorted_routes.sort_by_key(|r| r.priority);
189
190 for route in sorted_routes {
191 if health.get(&route.region).copied().unwrap_or(false) {
192 return Some(route.region);
193 }
194 }
195 }
196
197 self.route(client_location, None)
199 }
200
201 fn route_nearest(&self, client_location: Option<&GeoLocation>, healthy_regions: &[String]) -> Option<String> {
203 let client_loc = client_location?;
204 let locations = self.locations.read();
205
206 healthy_regions.iter()
207 .filter_map(|r| {
208 locations.get(r).map(|loc| (r, client_loc.distance_to(loc)))
209 })
210 .min_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
211 .map(|(r, _)| r.clone())
212 }
213
214 fn route_lowest_latency(&self, healthy_regions: &[String]) -> Option<String> {
216 let latencies = self.latencies.read();
217
218 healthy_regions.iter()
219 .filter_map(|r| latencies.get(r).map(|&lat| (r, lat)))
220 .min_by_key(|(_, lat)| *lat)
221 .map(|(r, _)| r.clone())
222 .or_else(|| healthy_regions.first().cloned())
223 }
224
225 fn route_round_robin(&self, healthy_regions: &[String]) -> Option<String> {
227 if healthy_regions.is_empty() {
228 return None;
229 }
230
231 let mut counter = self.rr_counter.write();
232 let index = *counter % healthy_regions.len();
233 *counter = counter.wrapping_add(1);
234
235 healthy_regions.get(index).cloned()
236 }
237
238 fn route_weighted_random(&self, healthy_regions: &[String]) -> Option<String> {
240 if healthy_regions.is_empty() {
241 return None;
242 }
243
244 let latencies = self.latencies.read();
245
246 let weights: Vec<_> = healthy_regions.iter()
248 .map(|r| {
249 let latency = latencies.get(r).copied().unwrap_or(100);
250 let weight = 10000 / (latency.max(1) as u64);
251 (r, weight)
252 })
253 .collect();
254
255 let total_weight: u64 = weights.iter().map(|(_, w)| w).sum();
256
257 if total_weight == 0 {
258 return healthy_regions.first().cloned();
259 }
260
261 let rand = std::time::SystemTime::now()
263 .duration_since(std::time::UNIX_EPOCH)
264 .map(|d| d.subsec_nanos() as u64)
265 .unwrap_or(0);
266
267 let target = rand % total_weight;
268 let mut cumulative = 0u64;
269
270 for (region, weight) in weights {
271 cumulative += weight;
272 if cumulative > target {
273 return Some(region.clone());
274 }
275 }
276
277 healthy_regions.first().cloned()
278 }
279
280 pub fn get_preferred_regions(&self, client_location: Option<&GeoLocation>) -> Vec<String> {
282 let health = self.health.read();
283 let latencies = self.latencies.read();
284 let locations = self.locations.read();
285
286 let mut regions: Vec<_> = health.iter()
287 .filter(|(_, &healthy)| healthy)
288 .map(|(r, _)| {
289 let latency = latencies.get(r).copied().unwrap_or(u32::MAX);
290 let distance = client_location
291 .and_then(|cl| locations.get(r).map(|rl| cl.distance_to(rl)))
292 .unwrap_or(f64::MAX);
293 (r.clone(), latency, distance)
294 })
295 .collect();
296
297 regions.sort_by(|a, b| {
299 a.1.cmp(&b.1)
300 .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
301 });
302
303 regions.into_iter().map(|(r, _, _)| r).collect()
304 }
305}
306
307impl Default for GeoRouter {
308 fn default() -> Self {
309 Self::new()
310 }
311}
312
313#[cfg(test)]
314mod tests {
315 use super::*;
316
317 #[test]
318 fn test_routing_policy() {
319 let router = GeoRouter::new();
320
321 router.update_health("us-east-1", true);
322 router.update_health("eu-west-1", true);
323 router.update_latency("us-east-1", 10);
324 router.update_latency("eu-west-1", 100);
325
326 let result = router.route(None, Some(RoutingPolicy::LowestLatency));
328 assert_eq!(result, Some("us-east-1".to_string()));
329 }
330
331 #[test]
332 fn test_nearest_routing() {
333 let router = GeoRouter::new();
334
335 let ny = GeoLocation { latitude: 40.7128, longitude: -74.0060 };
337 let virginia = GeoLocation { latitude: 39.0, longitude: -77.0 };
339 let ireland = GeoLocation { latitude: 53.0, longitude: -8.0 };
341
342 router.register_location("us-east-1", virginia);
343 router.register_location("eu-west-1", ireland);
344 router.update_health("us-east-1", true);
345 router.update_health("eu-west-1", true);
346
347 let result = router.route(Some(&ny), Some(RoutingPolicy::Nearest));
348 assert_eq!(result, Some("us-east-1".to_string()));
349 }
350
351 #[test]
352 fn test_round_robin() {
353 let router = GeoRouter::new();
354
355 router.update_health("region-1", true);
356 router.update_health("region-2", true);
357 router.update_health("region-3", true);
358
359 let r1 = router.route(None, Some(RoutingPolicy::RoundRobin));
360 let r2 = router.route(None, Some(RoutingPolicy::RoundRobin));
361 let r3 = router.route(None, Some(RoutingPolicy::RoundRobin));
362 let r4 = router.route(None, Some(RoutingPolicy::RoundRobin));
363
364 assert_ne!(r1, r2);
366 assert_ne!(r2, r3);
367 assert_eq!(r1, r4); }
369
370 #[test]
371 fn test_static_routes() {
372 let router = GeoRouter::new();
373
374 router.update_health("us-east-1", true);
375 router.update_health("eu-west-1", true);
376
377 router.add_static_route("my-service", RegionRoute::new("eu-west-1").with_priority(0));
378 router.add_static_route("my-service", RegionRoute::new("us-east-1").as_fallback());
379
380 let result = router.route_service("my-service", None);
381 assert_eq!(result, Some("eu-west-1".to_string()));
382 }
383}