use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
/// Configuration owned by a [`FederationCoordinator`].
///
/// `regions` has no `serde(default)` and therefore must be present in
/// deserialized input; the other two fields fall back to the values returned
/// by `default_sync_interval` and `default_nats_prefix` when omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FederationConfig {
/// Region definitions carried by this configuration.
///
/// `FederationCoordinator::new` does not register these automatically; each
/// one has to be passed to `register_region`.
pub regions: Vec<RegionConfig>,
/// Base interval in seconds used by `FederationCoordinator::check_health`:
/// a region is marked degraded after more than 3x this value without a
/// heartbeat and offline after more than 10x. Defaults to 30.
#[serde(default = "default_sync_interval")]
pub sync_interval_secs: u64,
/// Defaults to `"varpulis.federation"`. Not read by any code visible in this
/// file; presumably a NATS subject prefix -- TODO confirm against the
/// consumer of this setting.
#[serde(default = "default_nats_prefix")]
pub nats_prefix: String,
}
/// Serde fallback for `FederationConfig::sync_interval_secs`: 30 seconds.
fn default_sync_interval() -> u64 {
    const DEFAULT_SYNC_INTERVAL_SECS: u64 = 30;
    DEFAULT_SYNC_INTERVAL_SECS
}
/// Serde fallback for `FederationConfig::nats_prefix`.
fn default_nats_prefix() -> String {
    String::from("varpulis.federation")
}
impl Default for FederationConfig {
fn default() -> Self {
Self {
regions: Vec::new(),
sync_interval_secs: default_sync_interval(),
nats_prefix: default_nats_prefix(),
}
}
}
/// Static description of one region, as supplied in [`FederationConfig`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionConfig {
/// Region name; `FederationCoordinator::register_region` uses it as the key
/// of the coordinator's region map.
pub name: String,
/// Copied verbatim into the region's `RegionState` on registration.
pub coordinator_url: String,
/// Optional; not read by any code visible in this file.
/// NOTE(review): presumably the region's NATS endpoint -- confirm.
pub nats_url: Option<String>,
/// Defaults to 100 when omitted. Not read by any code visible in this file,
/// so whether a lower or higher number wins cannot be told from here.
#[serde(default = "default_priority")]
pub priority: u32,
}
/// Serde fallback for `RegionConfig::priority`.
fn default_priority() -> u32 {
    const DEFAULT_PRIORITY: u32 = 100;
    DEFAULT_PRIORITY
}
/// Health state the coordinator tracks for each region.
///
/// No serde rename attribute is applied, so the derived (de)serialization
/// uses the variant identifiers as written, unlike the `Display` impl, which
/// renders them in lowercase.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RegionStatus {
/// Initial state of a newly created `RegionState`; also set by every
/// `FederationCoordinator::heartbeat` for the region.
Online,
/// Set by `check_health` when an `Online` region's last heartbeat is older
/// than 3x the sync interval (but not older than 10x).
Degraded,
/// Set by `check_health` when the last heartbeat is older than 10x the sync
/// interval. Offline regions are left out of `get_global_catalog`.
Offline,
}
/// Renders the status as a lowercase word: `online`, `degraded` or `offline`.
impl std::fmt::Display for RegionStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            RegionStatus::Online => "online",
            RegionStatus::Degraded => "degraded",
            RegionStatus::Offline => "offline",
        };
        // `write_str` emits the text as-is, just like `write!` with a bare literal.
        f.write_str(label)
    }
}
/// One pipeline advertised in a region's catalog.
///
/// The coordinator stores and returns these entries without inspecting any of
/// their fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CatalogEntry {
/// Name of the pipeline.
pub pipeline_name: String,
/// Name of the group the pipeline is reported under.
/// NOTE(review): the exact meaning of "group" is not shown here -- confirm.
pub group_name: String,
/// Event type names associated with the pipeline.
/// NOTE(review): presumably the types it consumes or emits -- confirm.
pub event_types: Vec<String>,
}
/// Runtime state the coordinator keeps for one registered region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionState {
/// Region name, copied from the `RegionConfig` it was registered with.
pub name: String,
/// Current health state; updated by `heartbeat` and `check_health`.
pub status: RegionStatus,
/// Catalog most recently supplied through `sync_catalog`; empty until then.
pub pipeline_catalog: Vec<CatalogEntry>,
/// UTC time of creation, or of the latest `heartbeat` / `sync_catalog` call
/// for this region, whichever is most recent.
pub last_heartbeat: chrono::DateTime<chrono::Utc>,
/// Worker count reported by the latest `heartbeat`; 0 until the first one.
pub worker_count: usize,
/// Coordinator URL copied from the `RegionConfig` at registration.
pub coordinator_url: String,
}
impl RegionState {
fn new(name: &str, coordinator_url: &str) -> Self {
Self {
name: name.to_string(),
status: RegionStatus::Online,
pipeline_catalog: Vec::new(),
last_heartbeat: chrono::Utc::now(),
worker_count: 0,
coordinator_url: coordinator_url.to_string(),
}
}
}
/// In-memory registry of federated regions and their health.
///
/// All state lives in this struct; nothing in the visible code performs
/// network or disk I/O.
#[derive(Debug)]
pub struct FederationCoordinator {
// Configuration supplied at construction; `sync_interval_secs` drives the
// health thresholds in `check_health`.
config: FederationConfig,
// Registered regions keyed by region name.
regions: HashMap<String, RegionState>,
// Set by `set_local_region`; not read anywhere in the visible code.
local_region: Option<String>,
}
impl FederationCoordinator {
    /// Creates a coordinator that owns `config` and starts with no registered
    /// regions and no local region.
    ///
    /// The entries in `config.regions` are not registered automatically; pass
    /// each one to [`register_region`](Self::register_region).
    pub fn new(config: FederationConfig) -> Self {
        Self {
            config,
            regions: HashMap::new(),
            local_region: None,
        }
    }

    /// Records `name` as the local region, replacing any previous value.
    ///
    /// No check is made that a region with this name is registered.
    pub fn set_local_region(&mut self, name: &str) {
        self.local_region = Some(name.to_string());
    }

    /// Registers a region under `config.name` with a fresh [`RegionState`].
    ///
    /// Registering a name that already exists replaces its state, which resets
    /// the status to `Online` and clears its catalog and worker count.
    pub fn register_region(&mut self, config: &RegionConfig) {
        let state = RegionState::new(&config.name, &config.coordinator_url);
        info!("Federation: registered region '{}'", config.name);
        self.regions.insert(config.name.clone(), state);
    }

    /// Removes the region called `name`.
    ///
    /// Returns `true` if it was registered and has been removed, `false` (with
    /// a warning logged) if no such region exists.
    pub fn deregister_region(&mut self, name: &str) -> bool {
        if self.regions.remove(name).is_some() {
            info!("Federation: deregistered region '{}'", name);
            true
        } else {
            warn!("Federation: region '{}' not found for deregistration", name);
            false
        }
    }

    /// Records a heartbeat from `region_name`.
    ///
    /// Refreshes `last_heartbeat`, stores `worker_count`, and sets the status
    /// to `Online` regardless of its previous value. Heartbeats for unknown
    /// regions are silently ignored.
    pub fn heartbeat(&mut self, region_name: &str, worker_count: usize) {
        if let Some(state) = self.regions.get_mut(region_name) {
            state.last_heartbeat = chrono::Utc::now();
            state.worker_count = worker_count;
            state.status = RegionStatus::Online;
        }
    }

    /// Replaces the pipeline catalog of `region_name` with `catalog`.
    ///
    /// Also refreshes `last_heartbeat`, but leaves the status untouched: an
    /// `Offline` or `Degraded` region stays that way until its next
    /// [`heartbeat`](Self::heartbeat). Unknown regions are silently ignored.
    pub fn sync_catalog(&mut self, region_name: &str, catalog: Vec<CatalogEntry>) {
        if let Some(state) = self.regions.get_mut(region_name) {
            state.pipeline_catalog = catalog;
            state.last_heartbeat = chrono::Utc::now();
        }
    }

    /// Downgrades regions whose last heartbeat is too old.
    ///
    /// With `s = config.sync_interval_secs` and `elapsed` the whole seconds
    /// since the region's `last_heartbeat`:
    ///
    /// * `elapsed > 10 * s`: the region becomes `Offline`.
    /// * `elapsed > 3 * s`: an `Online` region becomes `Degraded`.
    ///
    /// This method never upgrades a status; only `heartbeat` sets `Online`.
    /// A warning is logged on each transition, not on every call.
    ///
    /// Both thresholds saturate at `i64::MAX`, so a very large interval means
    /// "never time out" instead of overflowing.
    pub fn check_health(&mut self) {
        let now = chrono::Utc::now();
        let interval_secs = self.config.sync_interval_secs;
        let timeout_secs = Self::heartbeat_threshold_secs(interval_secs, 3);
        let offline_secs = Self::heartbeat_threshold_secs(interval_secs, 10);
        for state in self.regions.values_mut() {
            let elapsed = (now - state.last_heartbeat).num_seconds();
            if elapsed > offline_secs {
                if state.status != RegionStatus::Offline {
                    warn!(
                        "Federation: region '{}' marked offline ({}s since heartbeat)",
                        state.name, elapsed
                    );
                    state.status = RegionStatus::Offline;
                }
            } else if elapsed > timeout_secs && state.status == RegionStatus::Online {
                warn!(
                    "Federation: region '{}' marked degraded ({}s since heartbeat)",
                    state.name, elapsed
                );
                state.status = RegionStatus::Degraded;
            }
        }
    }

    /// Computes `interval_secs * multiplier` as a signed number of seconds,
    /// saturating at `i64::MAX`.
    ///
    /// A plain `u64` multiply followed by `as i64` would panic on overflow in
    /// debug builds and could wrap to a negative threshold in release builds,
    /// which would mark every region offline immediately.
    fn heartbeat_threshold_secs(interval_secs: u64, multiplier: u64) -> i64 {
        let product = interval_secs.saturating_mul(multiplier);
        // Clamp first so the narrowing cast below is always lossless.
        product.min(i64::MAX as u64) as i64
    }

    /// Returns every catalog entry of every region that is not `Offline`,
    /// each paired with the name of the region that advertises it.
    ///
    /// Regions are visited in `HashMap` iteration order, so the order of
    /// regions in the result is unspecified; entries of one region keep their
    /// catalog order.
    pub fn get_global_catalog(&self) -> Vec<(String, CatalogEntry)> {
        self.regions
            .iter()
            .filter(|(_, state)| state.status != RegionStatus::Offline)
            .flat_map(|(region_name, state)| {
                state
                    .pipeline_catalog
                    .iter()
                    .map(move |entry| (region_name.clone(), entry.clone()))
            })
            .collect()
    }

    /// Borrows the full map of registered regions, keyed by region name.
    pub fn get_regions(&self) -> &HashMap<String, RegionState> {
        &self.regions
    }

    /// Looks up a single region by name; `None` if it is not registered.
    pub fn get_region(&self, name: &str) -> Option<&RegionState> {
        self.regions.get(name)
    }

    /// Borrows the configuration this coordinator was created with.
    pub fn config(&self) -> &FederationConfig {
        &self.config
    }

    /// Counts regions whose status is exactly `Online`; `Degraded` and
    /// `Offline` regions are not included.
    pub fn online_region_count(&self) -> usize {
        self.regions
            .values()
            .filter(|s| s.status == RegionStatus::Online)
            .count()
    }
}
/// Snapshot of the whole federation, as produced by
/// `FederationCoordinator::status`.
#[derive(Debug, Serialize, Deserialize)]
pub struct FederationStatus {
/// Number of registered regions (length of `regions`).
pub total_regions: usize,
/// Number of regions whose status is `Online`.
pub online_regions: usize,
/// Number of regions whose status is `Degraded`.
pub degraded_regions: usize,
/// Number of regions whose status is `Offline`.
pub offline_regions: usize,
/// Sum of `pipeline_count` over all regions, offline ones included.
pub total_pipelines: usize,
/// One summary per registered region, in no particular order.
pub regions: Vec<RegionSummary>,
}
/// Per-region part of a [`FederationStatus`] snapshot.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionSummary {
/// Region name.
pub name: String,
/// Status of the region when the snapshot was taken.
pub status: RegionStatus,
/// Worker count from the region's state.
pub worker_count: usize,
/// Number of entries in the region's pipeline catalog.
pub pipeline_count: usize,
/// The region's `last_heartbeat`, formatted as an RFC 3339 string.
pub last_heartbeat: String,
/// Coordinator URL of the region.
pub coordinator_url: String,
}
impl FederationCoordinator {
    /// Builds a [`FederationStatus`] snapshot of all registered regions.
    ///
    /// Every region contributes one [`RegionSummary`] and is tallied under its
    /// current status; `total_pipelines` adds up the catalog sizes of all
    /// regions, whatever their status. Summaries follow `HashMap` iteration
    /// order, which is unspecified.
    pub fn status(&self) -> FederationStatus {
        let mut summaries = Vec::with_capacity(self.regions.len());
        let mut online_regions = 0usize;
        let mut degraded_regions = 0usize;
        let mut offline_regions = 0usize;
        let mut total_pipelines = 0usize;

        // Single pass: tally counters while building the per-region summaries.
        for state in self.regions.values() {
            match state.status {
                RegionStatus::Online => online_regions += 1,
                RegionStatus::Degraded => degraded_regions += 1,
                RegionStatus::Offline => offline_regions += 1,
            }

            let pipeline_count = state.pipeline_catalog.len();
            total_pipelines += pipeline_count;

            summaries.push(RegionSummary {
                name: state.name.clone(),
                status: state.status,
                worker_count: state.worker_count,
                pipeline_count,
                last_heartbeat: state.last_heartbeat.to_rfc3339(),
                coordinator_url: state.coordinator_url.clone(),
            });
        }

        let total_regions = summaries.len();
        FederationStatus {
            total_regions,
            online_regions,
            degraded_regions,
            offline_regions,
            total_pipelines,
            regions: summaries,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a configuration with two regions (`us-east-1`, `eu-west-1`),
    /// a 30-second sync interval and the default NATS prefix.
    fn test_config() -> FederationConfig {
        let us_east = RegionConfig {
            name: "us-east-1".to_string(),
            coordinator_url: "http://east:9100".to_string(),
            nats_url: Some("nats://east:4222".to_string()),
            priority: 10,
        };
        let eu_west = RegionConfig {
            name: "eu-west-1".to_string(),
            coordinator_url: "http://west:9100".to_string(),
            nats_url: Some("nats://west:4222".to_string()),
            priority: 20,
        };
        FederationConfig {
            regions: vec![us_east, eu_west],
            sync_interval_secs: 30,
            nats_prefix: "varpulis.federation".to_string(),
        }
    }

    /// Builds a coordinator from `config` and registers its first `count`
    /// regions, in order.
    fn coordinator_with(config: &FederationConfig, count: usize) -> FederationCoordinator {
        let mut fed = FederationCoordinator::new(config.clone());
        for region in config.regions.iter().take(count) {
            fed.register_region(region);
        }
        fed
    }

    /// Registered regions are retrievable; deregistration reports whether the
    /// region existed.
    #[test]
    fn test_register_and_deregister() {
        let mut fed = coordinator_with(&test_config(), 2);

        assert_eq!(fed.get_regions().len(), 2);
        assert!(fed.get_region("us-east-1").is_some());
        assert!(fed.get_region("eu-west-1").is_some());

        assert!(fed.deregister_region("us-east-1"));
        assert_eq!(fed.get_regions().len(), 1);
        assert!(!fed.deregister_region("nonexistent"));
    }

    /// A heartbeat stores the worker count, and an immediate health check
    /// leaves the region online.
    #[test]
    fn test_heartbeat_and_health() {
        let config = FederationConfig {
            sync_interval_secs: 1,
            ..test_config()
        };
        let mut fed = coordinator_with(&config, 1);

        fed.heartbeat("us-east-1", 5);
        let state = fed.get_region("us-east-1").unwrap();
        assert_eq!(state.status, RegionStatus::Online);
        assert_eq!(state.worker_count, 5);

        fed.check_health();
        let status_after_check = fed.get_region("us-east-1").unwrap().status;
        assert_eq!(status_after_check, RegionStatus::Online);
    }

    /// A synced catalog shows up in the global catalog, tagged with its region.
    #[test]
    fn test_catalog_sync() {
        let mut fed = coordinator_with(&test_config(), 1);

        let entry = CatalogEntry {
            pipeline_name: "fraud-detect".to_string(),
            group_name: "fraud".to_string(),
            event_types: vec!["Transaction".to_string()],
        };
        fed.sync_catalog("us-east-1", vec![entry]);

        let catalog = fed.get_global_catalog();
        assert_eq!(catalog.len(), 1);
        let (region, pipeline) = &catalog[0];
        assert_eq!(region, "us-east-1");
        assert_eq!(pipeline.pipeline_name, "fraud-detect");
    }

    /// The status snapshot counts both freshly registered regions as online.
    #[test]
    fn test_status_summary() {
        let mut fed = coordinator_with(&test_config(), 2);
        fed.heartbeat("us-east-1", 3);

        let status = fed.status();
        assert_eq!(status.total_regions, 2);
        assert_eq!(status.online_regions, 2);
    }

    /// Newly registered regions start out online.
    #[test]
    fn test_online_region_count() {
        let fed = coordinator_with(&test_config(), 2);
        assert_eq!(fed.online_region_count(), 2);
    }
}