forge_orchestration/federation/
routing.rs

1//! Geo-aware routing for multi-region federation
2//!
3//! Implements intelligent routing based on:
4//! - Geographic proximity
5//! - Latency measurements
6//! - Region health
7//! - Data locality
8
9use std::collections::HashMap;
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12
13use super::GeoLocation;
14
15/// Routing policy for cross-region requests
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17pub enum RoutingPolicy {
18    /// Route to nearest healthy region
19    Nearest,
20    /// Route to lowest latency region
21    LowestLatency,
22    /// Route to region with most capacity
23    MostCapacity,
24    /// Route to specific region (with fallback)
25    Pinned,
26    /// Round-robin across healthy regions
27    RoundRobin,
28    /// Weighted random based on capacity
29    WeightedRandom,
30}
31
32impl Default for RoutingPolicy {
33    fn default() -> Self {
34        Self::LowestLatency
35    }
36}
37
38/// Route to a specific region
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct RegionRoute {
41    /// Target region
42    pub region: String,
43    /// Route weight (for weighted routing)
44    pub weight: u32,
45    /// Is this a fallback route
46    pub is_fallback: bool,
47    /// Route priority (lower = higher priority)
48    pub priority: u32,
49}
50
51impl RegionRoute {
52    /// Create new route
53    pub fn new(region: impl Into<String>) -> Self {
54        Self {
55            region: region.into(),
56            weight: 100,
57            is_fallback: false,
58            priority: 0,
59        }
60    }
61
62    /// Set as fallback
63    pub fn as_fallback(mut self) -> Self {
64        self.is_fallback = true;
65        self.priority = 100;
66        self
67    }
68
69    /// Set weight
70    pub fn with_weight(mut self, weight: u32) -> Self {
71        self.weight = weight;
72        self
73    }
74
75    /// Set priority
76    pub fn with_priority(mut self, priority: u32) -> Self {
77        self.priority = priority;
78        self
79    }
80}
81
82/// Geo-aware router for multi-region routing
83pub struct GeoRouter {
84    /// Region locations
85    locations: RwLock<HashMap<String, GeoLocation>>,
86    /// Region latencies (measured RTT in ms)
87    latencies: RwLock<HashMap<String, u32>>,
88    /// Region health
89    health: RwLock<HashMap<String, bool>>,
90    /// Default routing policy
91    default_policy: RwLock<RoutingPolicy>,
92    /// Round-robin counter
93    rr_counter: RwLock<usize>,
94    /// Static routes
95    static_routes: RwLock<HashMap<String, Vec<RegionRoute>>>,
96}
97
98impl GeoRouter {
99    /// Create new geo router
100    pub fn new() -> Self {
101        Self {
102            locations: RwLock::new(HashMap::new()),
103            latencies: RwLock::new(HashMap::new()),
104            health: RwLock::new(HashMap::new()),
105            default_policy: RwLock::new(RoutingPolicy::LowestLatency),
106            rr_counter: RwLock::new(0),
107            static_routes: RwLock::new(HashMap::new()),
108        }
109    }
110
111    /// Set default routing policy
112    pub fn set_default_policy(&self, policy: RoutingPolicy) {
113        *self.default_policy.write() = policy;
114    }
115
116    /// Register region location
117    pub fn register_location(&self, region: impl Into<String>, location: GeoLocation) {
118        self.locations.write().insert(region.into(), location);
119    }
120
121    /// Update region latency
122    pub fn update_latency(&self, region: impl Into<String>, latency_ms: u32) {
123        self.latencies.write().insert(region.into(), latency_ms);
124    }
125
126    /// Update region health
127    pub fn update_health(&self, region: impl Into<String>, healthy: bool) {
128        self.health.write().insert(region.into(), healthy);
129    }
130
131    /// Add static route for a service
132    pub fn add_static_route(&self, service: impl Into<String>, route: RegionRoute) {
133        let mut routes = self.static_routes.write();
134        routes.entry(service.into())
135            .or_insert_with(Vec::new)
136            .push(route);
137    }
138
139    /// Route request to best region
140    pub fn route(&self, client_location: Option<&GeoLocation>, policy: Option<RoutingPolicy>) -> Option<String> {
141        let policy = policy.unwrap_or_else(|| *self.default_policy.read());
142        let health = self.health.read();
143        
144        // Get healthy regions
145        let healthy_regions: Vec<_> = health.iter()
146            .filter(|(_, &healthy)| healthy)
147            .map(|(r, _)| r.clone())
148            .collect();
149
150        if healthy_regions.is_empty() {
151            return None;
152        }
153
154        match policy {
155            RoutingPolicy::Nearest => {
156                self.route_nearest(client_location, &healthy_regions)
157            }
158            RoutingPolicy::LowestLatency => {
159                self.route_lowest_latency(&healthy_regions)
160            }
161            RoutingPolicy::MostCapacity => {
162                // For now, just return first healthy region
163                // Real implementation would check capacity
164                healthy_regions.first().cloned()
165            }
166            RoutingPolicy::RoundRobin => {
167                self.route_round_robin(&healthy_regions)
168            }
169            RoutingPolicy::WeightedRandom => {
170                self.route_weighted_random(&healthy_regions)
171            }
172            RoutingPolicy::Pinned => {
173                // Return first healthy region as pinned
174                healthy_regions.first().cloned()
175            }
176        }
177    }
178
179    /// Route for a specific service
180    pub fn route_service(&self, service: &str, client_location: Option<&GeoLocation>) -> Option<String> {
181        let routes = self.static_routes.read();
182        
183        if let Some(service_routes) = routes.get(service) {
184            let health = self.health.read();
185            
186            // Sort by priority and find first healthy route
187            let mut sorted_routes = service_routes.clone();
188            sorted_routes.sort_by_key(|r| r.priority);
189
190            for route in sorted_routes {
191                if health.get(&route.region).copied().unwrap_or(false) {
192                    return Some(route.region);
193                }
194            }
195        }
196
197        // Fall back to default routing
198        self.route(client_location, None)
199    }
200
201    /// Route to nearest region by geographic distance
202    fn route_nearest(&self, client_location: Option<&GeoLocation>, healthy_regions: &[String]) -> Option<String> {
203        let client_loc = client_location?;
204        let locations = self.locations.read();
205
206        healthy_regions.iter()
207            .filter_map(|r| {
208                locations.get(r).map(|loc| (r, client_loc.distance_to(loc)))
209            })
210            .min_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
211            .map(|(r, _)| r.clone())
212    }
213
214    /// Route to lowest latency region
215    fn route_lowest_latency(&self, healthy_regions: &[String]) -> Option<String> {
216        let latencies = self.latencies.read();
217
218        healthy_regions.iter()
219            .filter_map(|r| latencies.get(r).map(|&lat| (r, lat)))
220            .min_by_key(|(_, lat)| *lat)
221            .map(|(r, _)| r.clone())
222            .or_else(|| healthy_regions.first().cloned())
223    }
224
225    /// Round-robin routing
226    fn route_round_robin(&self, healthy_regions: &[String]) -> Option<String> {
227        if healthy_regions.is_empty() {
228            return None;
229        }
230
231        let mut counter = self.rr_counter.write();
232        let index = *counter % healthy_regions.len();
233        *counter = counter.wrapping_add(1);
234
235        healthy_regions.get(index).cloned()
236    }
237
238    /// Weighted random routing based on inverse latency
239    fn route_weighted_random(&self, healthy_regions: &[String]) -> Option<String> {
240        if healthy_regions.is_empty() {
241            return None;
242        }
243
244        let latencies = self.latencies.read();
245        
246        // Calculate weights (inverse of latency)
247        let weights: Vec<_> = healthy_regions.iter()
248            .map(|r| {
249                let latency = latencies.get(r).copied().unwrap_or(100);
250                let weight = 10000 / (latency.max(1) as u64);
251                (r, weight)
252            })
253            .collect();
254
255        let total_weight: u64 = weights.iter().map(|(_, w)| w).sum();
256        
257        if total_weight == 0 {
258            return healthy_regions.first().cloned();
259        }
260
261        // Simple pseudo-random selection
262        let rand = std::time::SystemTime::now()
263            .duration_since(std::time::UNIX_EPOCH)
264            .map(|d| d.subsec_nanos() as u64)
265            .unwrap_or(0);
266        
267        let target = rand % total_weight;
268        let mut cumulative = 0u64;
269
270        for (region, weight) in weights {
271            cumulative += weight;
272            if cumulative > target {
273                return Some(region.clone());
274            }
275        }
276
277        healthy_regions.first().cloned()
278    }
279
280    /// Get all healthy regions sorted by preference
281    pub fn get_preferred_regions(&self, client_location: Option<&GeoLocation>) -> Vec<String> {
282        let health = self.health.read();
283        let latencies = self.latencies.read();
284        let locations = self.locations.read();
285
286        let mut regions: Vec<_> = health.iter()
287            .filter(|(_, &healthy)| healthy)
288            .map(|(r, _)| {
289                let latency = latencies.get(r).copied().unwrap_or(u32::MAX);
290                let distance = client_location
291                    .and_then(|cl| locations.get(r).map(|rl| cl.distance_to(rl)))
292                    .unwrap_or(f64::MAX);
293                (r.clone(), latency, distance)
294            })
295            .collect();
296
297        // Sort by latency, then distance
298        regions.sort_by(|a, b| {
299            a.1.cmp(&b.1)
300                .then_with(|| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
301        });
302
303        regions.into_iter().map(|(r, _, _)| r).collect()
304    }
305}
306
307impl Default for GeoRouter {
308    fn default() -> Self {
309        Self::new()
310    }
311}
312
313#[cfg(test)]
314mod tests {
315    use super::*;
316
317    #[test]
318    fn test_routing_policy() {
319        let router = GeoRouter::new();
320        
321        router.update_health("us-east-1", true);
322        router.update_health("eu-west-1", true);
323        router.update_latency("us-east-1", 10);
324        router.update_latency("eu-west-1", 100);
325
326        // Should route to lowest latency
327        let result = router.route(None, Some(RoutingPolicy::LowestLatency));
328        assert_eq!(result, Some("us-east-1".to_string()));
329    }
330
331    #[test]
332    fn test_nearest_routing() {
333        let router = GeoRouter::new();
334        
335        // New York
336        let ny = GeoLocation { latitude: 40.7128, longitude: -74.0060 };
337        // Virginia (us-east-1)
338        let virginia = GeoLocation { latitude: 39.0, longitude: -77.0 };
339        // Ireland (eu-west-1)
340        let ireland = GeoLocation { latitude: 53.0, longitude: -8.0 };
341
342        router.register_location("us-east-1", virginia);
343        router.register_location("eu-west-1", ireland);
344        router.update_health("us-east-1", true);
345        router.update_health("eu-west-1", true);
346
347        let result = router.route(Some(&ny), Some(RoutingPolicy::Nearest));
348        assert_eq!(result, Some("us-east-1".to_string()));
349    }
350
351    #[test]
352    fn test_round_robin() {
353        let router = GeoRouter::new();
354        
355        router.update_health("region-1", true);
356        router.update_health("region-2", true);
357        router.update_health("region-3", true);
358
359        let r1 = router.route(None, Some(RoutingPolicy::RoundRobin));
360        let r2 = router.route(None, Some(RoutingPolicy::RoundRobin));
361        let r3 = router.route(None, Some(RoutingPolicy::RoundRobin));
362        let r4 = router.route(None, Some(RoutingPolicy::RoundRobin));
363
364        // Should cycle through regions
365        assert_ne!(r1, r2);
366        assert_ne!(r2, r3);
367        assert_eq!(r1, r4); // Back to first
368    }
369
370    #[test]
371    fn test_static_routes() {
372        let router = GeoRouter::new();
373        
374        router.update_health("us-east-1", true);
375        router.update_health("eu-west-1", true);
376        
377        router.add_static_route("my-service", RegionRoute::new("eu-west-1").with_priority(0));
378        router.add_static_route("my-service", RegionRoute::new("us-east-1").as_fallback());
379
380        let result = router.route_service("my-service", None);
381        assert_eq!(result, Some("eu-west-1".to_string()));
382    }
383}