// forge_orchestration/federation/mod.rs
pub mod routing;
11pub mod replication;
12
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use tracing::{info, warn};
19
20pub use routing::{GeoRouter, RoutingPolicy, RegionRoute};
21pub use replication::{ReplicationPolicy, ReplicationController};
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct RegionConfig {
26 pub name: String,
28 pub display_name: String,
30 pub endpoint: String,
32 pub location: GeoLocation,
34 pub capacity: RegionCapacity,
36 pub labels: HashMap<String, String>,
38 pub is_local: bool,
40}
41
42impl RegionConfig {
43 pub fn new(name: impl Into<String>, endpoint: impl Into<String>) -> Self {
45 Self {
46 name: name.into(),
47 display_name: String::new(),
48 endpoint: endpoint.into(),
49 location: GeoLocation::default(),
50 capacity: RegionCapacity::default(),
51 labels: HashMap::new(),
52 is_local: false,
53 }
54 }
55
56 pub fn with_display_name(mut self, name: impl Into<String>) -> Self {
58 self.display_name = name.into();
59 self
60 }
61
62 pub fn with_location(mut self, lat: f64, lon: f64) -> Self {
64 self.location = GeoLocation { latitude: lat, longitude: lon };
65 self
66 }
67
68 pub fn as_local(mut self) -> Self {
70 self.is_local = true;
71 self
72 }
73
74 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
76 self.labels.insert(key.into(), value.into());
77 self
78 }
79}
80
81#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
83pub struct GeoLocation {
84 pub latitude: f64,
86 pub longitude: f64,
88}
89
90impl GeoLocation {
91 pub fn distance_to(&self, other: &GeoLocation) -> f64 {
93 let r = 6371.0; let lat1 = self.latitude.to_radians();
97 let lat2 = other.latitude.to_radians();
98 let dlat = (other.latitude - self.latitude).to_radians();
99 let dlon = (other.longitude - self.longitude).to_radians();
100
101 let a = (dlat / 2.0).sin().powi(2)
102 + lat1.cos() * lat2.cos() * (dlon / 2.0).sin().powi(2);
103 let c = 2.0 * a.sqrt().asin();
104
105 r * c
106 }
107}
108
109#[derive(Debug, Clone, Default, Serialize, Deserialize)]
111pub struct RegionCapacity {
112 pub cpu_total: u64,
114 pub cpu_available: u64,
116 pub memory_total: u64,
118 pub memory_available: u64,
120 pub gpu_total: u32,
122 pub gpu_available: u32,
124 pub node_count: u32,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct RegionHealth {
131 pub region: String,
133 pub healthy: bool,
135 pub last_check: chrono::DateTime<chrono::Utc>,
137 pub latency_ms: u32,
139 pub error: Option<String>,
141 pub consecutive_failures: u32,
143}
144
145impl RegionHealth {
146 pub fn healthy(region: impl Into<String>, latency_ms: u32) -> Self {
148 Self {
149 region: region.into(),
150 healthy: true,
151 last_check: chrono::Utc::now(),
152 latency_ms,
153 error: None,
154 consecutive_failures: 0,
155 }
156 }
157
158 pub fn unhealthy(region: impl Into<String>, error: impl Into<String>) -> Self {
160 Self {
161 region: region.into(),
162 healthy: false,
163 last_check: chrono::Utc::now(),
164 latency_ms: 0,
165 error: Some(error.into()),
166 consecutive_failures: 1,
167 }
168 }
169}
170
171pub struct FederationManager {
173 local_region: String,
175 regions: Arc<RwLock<HashMap<String, RegionConfig>>>,
177 health: Arc<RwLock<HashMap<String, RegionHealth>>>,
179 router: Arc<GeoRouter>,
181 health_check_interval: Duration,
183}
184
185impl FederationManager {
186 pub fn new(local_region: impl Into<String>) -> Self {
188 Self {
189 local_region: local_region.into(),
190 regions: Arc::new(RwLock::new(HashMap::new())),
191 health: Arc::new(RwLock::new(HashMap::new())),
192 router: Arc::new(GeoRouter::new()),
193 health_check_interval: Duration::from_secs(30),
194 }
195 }
196
197 pub fn register_region(&self, config: RegionConfig) {
199 info!(region = %config.name, endpoint = %config.endpoint, "Registering region");
200
201 let name = config.name.clone();
202 self.regions.write().insert(name.clone(), config);
203
204 self.health.write().insert(name.clone(), RegionHealth {
206 region: name,
207 healthy: false,
208 last_check: chrono::Utc::now(),
209 latency_ms: 0,
210 error: Some("Not yet checked".to_string()),
211 consecutive_failures: 0,
212 });
213 }
214
215 pub fn unregister_region(&self, name: &str) {
217 info!(region = name, "Unregistering region");
218 self.regions.write().remove(name);
219 self.health.write().remove(name);
220 }
221
222 pub fn regions(&self) -> Vec<RegionConfig> {
224 self.regions.read().values().cloned().collect()
225 }
226
227 pub fn healthy_regions(&self) -> Vec<RegionConfig> {
229 let health = self.health.read();
230 self.regions.read()
231 .values()
232 .filter(|r| health.get(&r.name).map(|h| h.healthy).unwrap_or(false))
233 .cloned()
234 .collect()
235 }
236
237 pub fn get_region(&self, name: &str) -> Option<RegionConfig> {
239 self.regions.read().get(name).cloned()
240 }
241
242 pub fn get_health(&self, name: &str) -> Option<RegionHealth> {
244 self.health.read().get(name).cloned()
245 }
246
247 pub fn update_health(&self, health: RegionHealth) {
249 let region = health.region.clone();
250 let was_healthy = self.health.read()
251 .get(®ion)
252 .map(|h| h.healthy)
253 .unwrap_or(false);
254
255 if was_healthy && !health.healthy {
256 warn!(region = %region, error = ?health.error, "Region became unhealthy");
257 } else if !was_healthy && health.healthy {
258 info!(region = %region, latency_ms = health.latency_ms, "Region became healthy");
259 }
260
261 self.health.write().insert(region, health);
262 }
263
264 pub fn find_best_region(&self, requirements: &PlacementRequirements) -> Option<String> {
266 let regions = self.healthy_regions();
267
268 if regions.is_empty() {
269 return None;
270 }
271
272 let candidates: Vec<_> = regions.iter()
274 .filter(|r| {
275 r.capacity.cpu_available >= requirements.cpu_millis
276 && r.capacity.memory_available >= requirements.memory_mb
277 && r.capacity.gpu_available >= requirements.gpu_count
278 })
279 .collect();
280
281 if candidates.is_empty() {
282 return None;
283 }
284
285 let health = self.health.read();
287 let mut best: Option<(&RegionConfig, f64)> = None;
288
289 for region in candidates {
290 let mut score = 0.0;
291
292 if region.is_local {
294 score += 100.0;
295 }
296
297 if let Some(h) = health.get(®ion.name) {
299 score += 50.0 / (1.0 + h.latency_ms as f64 / 100.0);
300 }
301
302 let cpu_ratio = region.capacity.cpu_available as f64 / region.capacity.cpu_total.max(1) as f64;
304 score += cpu_ratio * 30.0;
305
306 if let Some(preferred) = &requirements.preferred_region {
308 if ®ion.name == preferred {
309 score += 200.0;
310 }
311 }
312
313 if let Some(avoid) = &requirements.avoid_region {
315 if ®ion.name == avoid {
316 score -= 500.0;
317 }
318 }
319
320 if best.is_none() || score > best.unwrap().1 {
321 best = Some((region, score));
322 }
323 }
324
325 best.map(|(r, _)| r.name.clone())
326 }
327
328 pub fn router(&self) -> Arc<GeoRouter> {
330 self.router.clone()
331 }
332
333 pub async fn start_health_checks(self: Arc<Self>) {
335 let manager = self.clone();
336
337 tokio::spawn(async move {
338 loop {
339 manager.check_all_regions().await;
340 tokio::time::sleep(manager.health_check_interval).await;
341 }
342 });
343 }
344
345 async fn check_all_regions(&self) {
347 let regions: Vec<_> = self.regions.read().values().cloned().collect();
348
349 for region in regions {
350 if region.is_local {
351 self.update_health(RegionHealth::healthy(®ion.name, 0));
353 continue;
354 }
355
356 let health = self.check_region_health(®ion).await;
357 self.update_health(health);
358 }
359 }
360
361 async fn check_region_health(&self, region: &RegionConfig) -> RegionHealth {
363 let start = Instant::now();
364
365 let client = reqwest::Client::builder()
367 .timeout(Duration::from_secs(5))
368 .build()
369 .unwrap();
370
371 let health_url = format!("{}/healthz", region.endpoint);
372
373 match client.get(&health_url).send().await {
374 Ok(response) => {
375 if response.status().is_success() {
376 RegionHealth::healthy(®ion.name, start.elapsed().as_millis() as u32)
377 } else {
378 RegionHealth::unhealthy(®ion.name, format!("HTTP {}", response.status()))
379 }
380 }
381 Err(e) => {
382 RegionHealth::unhealthy(®ion.name, e.to_string())
383 }
384 }
385 }
386}
387
/// Resource needs and placement preferences for a workload.
#[derive(Debug, Clone, Default)]
pub struct PlacementRequirements {
    /// Requested CPU; compared against `RegionCapacity::cpu_available`.
    pub cpu_millis: u64,
    /// Requested memory; compared against `RegionCapacity::memory_available`.
    pub memory_mb: u64,
    /// Requested GPUs; compared against `RegionCapacity::gpu_available`.
    pub gpu_count: u32,
    /// Region that should be favoured, if any.
    pub preferred_region: Option<String>,
    /// Region that should be penalised, if any.
    pub avoid_region: Option<String>,
    /// Label key/value pairs required of a candidate region.
    pub required_labels: HashMap<String, String>,
    /// Region named for data locality, if any.
    pub data_locality: Option<String>,
}

impl PlacementRequirements {
    /// Creates requirements with every field at its default: zero resources,
    /// no preferences, no labels.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the requested CPU.
    pub fn cpu(self, millis: u64) -> Self {
        Self {
            cpu_millis: millis,
            ..self
        }
    }

    /// Sets the requested memory.
    pub fn memory(self, mb: u64) -> Self {
        Self {
            memory_mb: mb,
            ..self
        }
    }

    /// Sets the requested number of GPUs.
    pub fn gpu(self, count: u32) -> Self {
        Self {
            gpu_count: count,
            ..self
        }
    }

    /// Sets the preferred region.
    pub fn prefer_region(self, region: impl Into<String>) -> Self {
        Self {
            preferred_region: Some(region.into()),
            ..self
        }
    }

    /// Sets the region to avoid.
    pub fn avoid_region(self, region: impl Into<String>) -> Self {
        Self {
            avoid_region: Some(region.into()),
            ..self
        }
    }

    /// Sets the data-locality region.
    pub fn with_data_locality(self, region: impl Into<String>) -> Self {
        Self {
            data_locality: Some(region.into()),
            ..self
        }
    }
}
449
#[cfg(test)]
mod tests {
    use super::*;

    /// New York to London should come out between 5500 and 5700 km.
    #[test]
    fn test_geo_distance() {
        let new_york = GeoLocation {
            latitude: 40.7128,
            longitude: -74.0060,
        };
        let london = GeoLocation {
            latitude: 51.5074,
            longitude: -0.1278,
        };

        let km = new_york.distance_to(&london);
        assert!(km > 5500.0);
        assert!(km < 5700.0);
    }

    /// Registering two distinct regions makes both visible through `regions()`.
    #[test]
    fn test_federation_manager() {
        let manager = FederationManager::new("us-east-1");

        let local = RegionConfig::new("us-east-1", "http://localhost:6443")
            .with_location(39.0, -77.0)
            .as_local();
        let remote = RegionConfig::new("eu-west-1", "http://eu.example.com:6443")
            .with_location(53.0, -8.0);

        manager.register_region(local);
        manager.register_region(remote);

        assert_eq!(manager.regions().len(), 2);
    }

    /// A single healthy local region with enough capacity is selected.
    #[test]
    fn test_find_best_region() {
        let manager = FederationManager::new("us-east-1");

        let capacity = RegionCapacity {
            cpu_total: 10000,
            cpu_available: 5000,
            memory_total: 32000,
            memory_available: 16000,
            gpu_total: 4,
            gpu_available: 2,
            node_count: 5,
        };
        let us_config = RegionConfig {
            capacity,
            ..RegionConfig::new("us-east-1", "http://localhost:6443").as_local()
        };
        manager.register_region(us_config);

        // Registration leaves the region unhealthy until a health update arrives.
        manager.update_health(RegionHealth::healthy("us-east-1", 5));

        let requirements = PlacementRequirements::new().cpu(1000).memory(2048);

        let best = manager.find_best_region(&requirements);
        assert_eq!(best, Some("us-east-1".to_string()));
    }
}