forge_orchestration/federation/
mod.rs

1//! Multi-Region Federation for Forge Orchestration
2//!
3//! Implements cross-region orchestration with:
4//! - Region discovery and health monitoring
5//! - Cross-region workload placement
6//! - Geo-aware routing and failover
7//! - Data locality optimization
8//! - Latency-based routing
9
10pub mod routing;
11pub mod replication;
12
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use tracing::{info, warn};
19
20pub use routing::{GeoRouter, RoutingPolicy, RegionRoute};
21pub use replication::{ReplicationPolicy, ReplicationController};
22
23/// Region configuration
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct RegionConfig {
26    /// Region name (e.g., "us-east-1", "eu-west-1")
27    pub name: String,
28    /// Region display name
29    pub display_name: String,
30    /// API endpoint for this region
31    pub endpoint: String,
32    /// Geographic location
33    pub location: GeoLocation,
34    /// Region capacity
35    pub capacity: RegionCapacity,
36    /// Region labels
37    pub labels: HashMap<String, String>,
38    /// Is this the local region
39    pub is_local: bool,
40}
41
42impl RegionConfig {
43    /// Create new region config
44    pub fn new(name: impl Into<String>, endpoint: impl Into<String>) -> Self {
45        Self {
46            name: name.into(),
47            display_name: String::new(),
48            endpoint: endpoint.into(),
49            location: GeoLocation::default(),
50            capacity: RegionCapacity::default(),
51            labels: HashMap::new(),
52            is_local: false,
53        }
54    }
55
56    /// Set display name
57    pub fn with_display_name(mut self, name: impl Into<String>) -> Self {
58        self.display_name = name.into();
59        self
60    }
61
62    /// Set location
63    pub fn with_location(mut self, lat: f64, lon: f64) -> Self {
64        self.location = GeoLocation { latitude: lat, longitude: lon };
65        self
66    }
67
68    /// Mark as local region
69    pub fn as_local(mut self) -> Self {
70        self.is_local = true;
71        self
72    }
73
74    /// Add label
75    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
76        self.labels.insert(key.into(), value.into());
77        self
78    }
79}
80
81/// Geographic location
82#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
83pub struct GeoLocation {
84    /// Latitude
85    pub latitude: f64,
86    /// Longitude
87    pub longitude: f64,
88}
89
90impl GeoLocation {
91    /// Calculate distance to another location in kilometers
92    pub fn distance_to(&self, other: &GeoLocation) -> f64 {
93        // Haversine formula
94        let r = 6371.0; // Earth's radius in km
95        
96        let lat1 = self.latitude.to_radians();
97        let lat2 = other.latitude.to_radians();
98        let dlat = (other.latitude - self.latitude).to_radians();
99        let dlon = (other.longitude - self.longitude).to_radians();
100
101        let a = (dlat / 2.0).sin().powi(2) 
102            + lat1.cos() * lat2.cos() * (dlon / 2.0).sin().powi(2);
103        let c = 2.0 * a.sqrt().asin();
104
105        r * c
106    }
107}
108
109/// Region capacity
110#[derive(Debug, Clone, Default, Serialize, Deserialize)]
111pub struct RegionCapacity {
112    /// Total CPU capacity (millicores)
113    pub cpu_total: u64,
114    /// Available CPU
115    pub cpu_available: u64,
116    /// Total memory (MB)
117    pub memory_total: u64,
118    /// Available memory
119    pub memory_available: u64,
120    /// Total GPU count
121    pub gpu_total: u32,
122    /// Available GPUs
123    pub gpu_available: u32,
124    /// Node count
125    pub node_count: u32,
126}
127
128/// Region health status
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct RegionHealth {
131    /// Region name
132    pub region: String,
133    /// Is region healthy
134    pub healthy: bool,
135    /// Last health check time
136    pub last_check: chrono::DateTime<chrono::Utc>,
137    /// Round-trip latency in ms
138    pub latency_ms: u32,
139    /// Error message if unhealthy
140    pub error: Option<String>,
141    /// Consecutive failures
142    pub consecutive_failures: u32,
143}
144
145impl RegionHealth {
146    /// Create healthy status
147    pub fn healthy(region: impl Into<String>, latency_ms: u32) -> Self {
148        Self {
149            region: region.into(),
150            healthy: true,
151            last_check: chrono::Utc::now(),
152            latency_ms,
153            error: None,
154            consecutive_failures: 0,
155        }
156    }
157
158    /// Create unhealthy status
159    pub fn unhealthy(region: impl Into<String>, error: impl Into<String>) -> Self {
160        Self {
161            region: region.into(),
162            healthy: false,
163            last_check: chrono::Utc::now(),
164            latency_ms: 0,
165            error: Some(error.into()),
166            consecutive_failures: 1,
167        }
168    }
169}
170
171/// Federation manager for multi-region coordination
172pub struct FederationManager {
173    /// Local region name
174    local_region: String,
175    /// Known regions
176    regions: Arc<RwLock<HashMap<String, RegionConfig>>>,
177    /// Region health status
178    health: Arc<RwLock<HashMap<String, RegionHealth>>>,
179    /// Geo router
180    router: Arc<GeoRouter>,
181    /// Health check interval
182    health_check_interval: Duration,
183}
184
185impl FederationManager {
186    /// Create new federation manager
187    pub fn new(local_region: impl Into<String>) -> Self {
188        Self {
189            local_region: local_region.into(),
190            regions: Arc::new(RwLock::new(HashMap::new())),
191            health: Arc::new(RwLock::new(HashMap::new())),
192            router: Arc::new(GeoRouter::new()),
193            health_check_interval: Duration::from_secs(30),
194        }
195    }
196
197    /// Register a region
198    pub fn register_region(&self, config: RegionConfig) {
199        info!(region = %config.name, endpoint = %config.endpoint, "Registering region");
200        
201        let name = config.name.clone();
202        self.regions.write().insert(name.clone(), config);
203        
204        // Initialize health as unknown
205        self.health.write().insert(name.clone(), RegionHealth {
206            region: name,
207            healthy: false,
208            last_check: chrono::Utc::now(),
209            latency_ms: 0,
210            error: Some("Not yet checked".to_string()),
211            consecutive_failures: 0,
212        });
213    }
214
215    /// Unregister a region
216    pub fn unregister_region(&self, name: &str) {
217        info!(region = name, "Unregistering region");
218        self.regions.write().remove(name);
219        self.health.write().remove(name);
220    }
221
222    /// Get all regions
223    pub fn regions(&self) -> Vec<RegionConfig> {
224        self.regions.read().values().cloned().collect()
225    }
226
227    /// Get healthy regions
228    pub fn healthy_regions(&self) -> Vec<RegionConfig> {
229        let health = self.health.read();
230        self.regions.read()
231            .values()
232            .filter(|r| health.get(&r.name).map(|h| h.healthy).unwrap_or(false))
233            .cloned()
234            .collect()
235    }
236
237    /// Get region by name
238    pub fn get_region(&self, name: &str) -> Option<RegionConfig> {
239        self.regions.read().get(name).cloned()
240    }
241
242    /// Get region health
243    pub fn get_health(&self, name: &str) -> Option<RegionHealth> {
244        self.health.read().get(name).cloned()
245    }
246
247    /// Update region health
248    pub fn update_health(&self, health: RegionHealth) {
249        let region = health.region.clone();
250        let was_healthy = self.health.read()
251            .get(&region)
252            .map(|h| h.healthy)
253            .unwrap_or(false);
254
255        if was_healthy && !health.healthy {
256            warn!(region = %region, error = ?health.error, "Region became unhealthy");
257        } else if !was_healthy && health.healthy {
258            info!(region = %region, latency_ms = health.latency_ms, "Region became healthy");
259        }
260
261        self.health.write().insert(region, health);
262    }
263
264    /// Find best region for a workload
265    pub fn find_best_region(&self, requirements: &PlacementRequirements) -> Option<String> {
266        let regions = self.healthy_regions();
267        
268        if regions.is_empty() {
269            return None;
270        }
271
272        // Filter by capacity
273        let candidates: Vec<_> = regions.iter()
274            .filter(|r| {
275                r.capacity.cpu_available >= requirements.cpu_millis
276                    && r.capacity.memory_available >= requirements.memory_mb
277                    && r.capacity.gpu_available >= requirements.gpu_count
278            })
279            .collect();
280
281        if candidates.is_empty() {
282            return None;
283        }
284
285        // Score candidates
286        let health = self.health.read();
287        let mut best: Option<(&RegionConfig, f64)> = None;
288
289        for region in candidates {
290            let mut score = 0.0;
291
292            // Prefer local region
293            if region.is_local {
294                score += 100.0;
295            }
296
297            // Prefer low latency
298            if let Some(h) = health.get(&region.name) {
299                score += 50.0 / (1.0 + h.latency_ms as f64 / 100.0);
300            }
301
302            // Prefer regions with more capacity
303            let cpu_ratio = region.capacity.cpu_available as f64 / region.capacity.cpu_total.max(1) as f64;
304            score += cpu_ratio * 30.0;
305
306            // Check affinity
307            if let Some(preferred) = &requirements.preferred_region {
308                if &region.name == preferred {
309                    score += 200.0;
310                }
311            }
312
313            // Check anti-affinity
314            if let Some(avoid) = &requirements.avoid_region {
315                if &region.name == avoid {
316                    score -= 500.0;
317                }
318            }
319
320            if best.is_none() || score > best.unwrap().1 {
321                best = Some((region, score));
322            }
323        }
324
325        best.map(|(r, _)| r.name.clone())
326    }
327
328    /// Get the geo router
329    pub fn router(&self) -> Arc<GeoRouter> {
330        self.router.clone()
331    }
332
333    /// Start health check loop
334    pub async fn start_health_checks(self: Arc<Self>) {
335        let manager = self.clone();
336        
337        tokio::spawn(async move {
338            loop {
339                manager.check_all_regions().await;
340                tokio::time::sleep(manager.health_check_interval).await;
341            }
342        });
343    }
344
345    /// Check health of all regions
346    async fn check_all_regions(&self) {
347        let regions: Vec<_> = self.regions.read().values().cloned().collect();
348        
349        for region in regions {
350            if region.is_local {
351                // Local region is always healthy
352                self.update_health(RegionHealth::healthy(&region.name, 0));
353                continue;
354            }
355
356            let health = self.check_region_health(&region).await;
357            self.update_health(health);
358        }
359    }
360
361    /// Check health of a single region
362    async fn check_region_health(&self, region: &RegionConfig) -> RegionHealth {
363        let start = Instant::now();
364        
365        // Try to connect to region endpoint
366        let client = reqwest::Client::builder()
367            .timeout(Duration::from_secs(5))
368            .build()
369            .unwrap();
370
371        let health_url = format!("{}/healthz", region.endpoint);
372        
373        match client.get(&health_url).send().await {
374            Ok(response) => {
375                if response.status().is_success() {
376                    RegionHealth::healthy(&region.name, start.elapsed().as_millis() as u32)
377                } else {
378                    RegionHealth::unhealthy(&region.name, format!("HTTP {}", response.status()))
379                }
380            }
381            Err(e) => {
382                RegionHealth::unhealthy(&region.name, e.to_string())
383            }
384        }
385    }
386}
387
/// What a workload needs from the region it is placed in.
///
/// The default value asks for nothing: zero resources, no preferences,
/// no labels, no data locality.
#[derive(Debug, Clone, Default)]
pub struct PlacementRequirements {
    /// CPU the workload needs, in millicores.
    pub cpu_millis: u64,
    /// Memory the workload needs, in MB.
    pub memory_mb: u64,
    /// Number of GPUs the workload needs.
    pub gpu_count: u32,
    /// Region the caller would like the workload to land in, if any.
    pub preferred_region: Option<String>,
    /// Region the caller would like the workload kept out of, if any.
    pub avoid_region: Option<String>,
    /// Labels required of the target region, as key → value pairs.
    pub required_labels: HashMap<String, String>,
    /// Region where the workload's data resides, if known.
    pub data_locality: Option<String>,
}
406
407impl PlacementRequirements {
408    /// Create new requirements
409    pub fn new() -> Self {
410        Self::default()
411    }
412
413    /// Set CPU requirement
414    pub fn cpu(mut self, millis: u64) -> Self {
415        self.cpu_millis = millis;
416        self
417    }
418
419    /// Set memory requirement
420    pub fn memory(mut self, mb: u64) -> Self {
421        self.memory_mb = mb;
422        self
423    }
424
425    /// Set GPU requirement
426    pub fn gpu(mut self, count: u32) -> Self {
427        self.gpu_count = count;
428        self
429    }
430
431    /// Set preferred region
432    pub fn prefer_region(mut self, region: impl Into<String>) -> Self {
433        self.preferred_region = Some(region.into());
434        self
435    }
436
437    /// Set region to avoid
438    pub fn avoid_region(mut self, region: impl Into<String>) -> Self {
439        self.avoid_region = Some(region.into());
440        self
441    }
442
443    /// Set data locality
444    pub fn with_data_locality(mut self, region: impl Into<String>) -> Self {
445        self.data_locality = Some(region.into());
446        self
447    }
448}
449
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_geo_distance() {
        // The great-circle distance New York -> London is roughly 5570 km.
        let new_york = GeoLocation { latitude: 40.7128, longitude: -74.0060 };
        let london = GeoLocation { latitude: 51.5074, longitude: -0.1278 };

        let km = new_york.distance_to(&london);
        assert!(
            km > 5500.0 && km < 5700.0,
            "unexpected New York -> London distance: {} km",
            km
        );
    }

    #[test]
    fn test_federation_manager() {
        let manager = FederationManager::new("us-east-1");

        let local = RegionConfig::new("us-east-1", "http://localhost:6443")
            .with_location(39.0, -77.0)
            .as_local();
        let remote = RegionConfig::new("eu-west-1", "http://eu.example.com:6443")
            .with_location(53.0, -8.0);

        for config in [local, remote] {
            manager.register_region(config);
        }

        assert_eq!(manager.regions().len(), 2);
    }

    #[test]
    fn test_find_best_region() {
        let manager = FederationManager::new("us-east-1");

        let us_east = RegionConfig {
            capacity: RegionCapacity {
                cpu_total: 10000,
                cpu_available: 5000,
                memory_total: 32000,
                memory_available: 16000,
                gpu_total: 4,
                gpu_available: 2,
                node_count: 5,
            },
            ..RegionConfig::new("us-east-1", "http://localhost:6443").as_local()
        };
        manager.register_region(us_east);

        // Registration seeds an unhealthy placeholder; report a passing probe.
        manager.update_health(RegionHealth::healthy("us-east-1", 5));

        let requirements = PlacementRequirements::new().cpu(1000).memory(2048);
        let chosen = manager.find_best_region(&requirements);
        assert_eq!(chosen.as_deref(), Some("us-east-1"));
    }
}