use crate::distributed::coordinator::{QueryPlan, QueryResult};
use crate::distributed::shard::ShardId;
use crate::{GraphError, Result};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Identifier under which a remote cluster is registered.
pub type ClusterId = String;

/// A remote cluster that can take part in federated queries.
///
/// `Debug` is implemented by hand so that `auth_token` is never written to logs.
#[derive(Clone, Serialize, Deserialize)]
pub struct RemoteCluster {
    /// Registry key of this cluster.
    pub cluster_id: ClusterId,
    /// Human-readable cluster name.
    pub name: String,
    /// Network address of the cluster (URLs such as `http://localhost:8080` in this module's tests).
    pub endpoint: String,
    /// Last recorded health status; `Unknown` until a health check runs.
    pub status: ClusterStatus,
    /// Optional credential for the cluster; redacted from `Debug` output.
    pub auth_token: Option<String>,
    /// Time of the last health check; initialised to the creation time by [`RemoteCluster::new`].
    pub last_health_check: DateTime<Utc>,
    /// Free-form key/value metadata.
    pub metadata: HashMap<String, String>,
    /// Number of shards recorded for the cluster (0 when created via `new`).
    pub shard_count: u32,
    /// Optional region label.
    pub region: Option<String>,
}

impl std::fmt::Debug for RemoteCluster {
    /// Formats every field except `auth_token`, which is shown only as present/absent so that
    /// `{:?}` logging cannot leak the credential.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RemoteCluster")
            .field("cluster_id", &self.cluster_id)
            .field("name", &self.name)
            .field("endpoint", &self.endpoint)
            .field("status", &self.status)
            .field("auth_token", &self.auth_token.as_ref().map(|_| "<redacted>"))
            .field("last_health_check", &self.last_health_check)
            .field("metadata", &self.metadata)
            .field("shard_count", &self.shard_count)
            .field("region", &self.region)
            .finish()
    }
}

impl RemoteCluster {
    /// Creates a cluster record in the [`ClusterStatus::Unknown`] state, with no auth token,
    /// region, metadata or shards, and `last_health_check` set to the current time.
    pub fn new(cluster_id: ClusterId, name: String, endpoint: String) -> Self {
        Self {
            cluster_id,
            name,
            endpoint,
            status: ClusterStatus::Unknown,
            auth_token: None,
            last_health_check: Utc::now(),
            metadata: HashMap::new(),
            shard_count: 0,
            region: None,
        }
    }

    /// Returns `true` only when the recorded status is [`ClusterStatus::Healthy`].
    pub fn is_healthy(&self) -> bool {
        matches!(self.status, ClusterStatus::Healthy)
    }
}
/// Last recorded health state of a [`RemoteCluster`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClusterStatus {
    /// Set by `ClusterRegistry::health_check`; the only status for which
    /// `RemoteCluster::is_healthy` returns `true`.
    Healthy,
    /// NOTE(review): never assigned in this module; presumably "reachable but impaired" — confirm.
    Degraded,
    /// Reported by `ClusterRegistry::health_check_all` when a cluster's health check errors.
    Unreachable,
    /// Initial state assigned by `RemoteCluster::new`, before any health check has run.
    Unknown,
}
pub struct ClusterRegistry {
clusters: Arc<DashMap<ClusterId, RemoteCluster>>,
discovery_config: DiscoveryConfig,
}
impl ClusterRegistry {
pub fn new(discovery_config: DiscoveryConfig) -> Self {
Self {
clusters: Arc::new(DashMap::new()),
discovery_config,
}
}
pub fn register_cluster(&self, cluster: RemoteCluster) -> Result<()> {
info!(
"Registering cluster: {} ({})",
cluster.name, cluster.cluster_id
);
self.clusters.insert(cluster.cluster_id.clone(), cluster);
Ok(())
}
pub fn unregister_cluster(&self, cluster_id: &ClusterId) -> Result<()> {
info!("Unregistering cluster: {}", cluster_id);
self.clusters.remove(cluster_id).ok_or_else(|| {
GraphError::FederationError(format!("Cluster not found: {}", cluster_id))
})?;
Ok(())
}
pub fn get_cluster(&self, cluster_id: &ClusterId) -> Option<RemoteCluster> {
self.clusters.get(cluster_id).map(|c| c.value().clone())
}
pub fn list_clusters(&self) -> Vec<RemoteCluster> {
self.clusters.iter().map(|e| e.value().clone()).collect()
}
pub fn healthy_clusters(&self) -> Vec<RemoteCluster> {
self.clusters
.iter()
.filter(|e| e.value().is_healthy())
.map(|e| e.value().clone())
.collect()
}
pub async fn health_check(&self, cluster_id: &ClusterId) -> Result<ClusterStatus> {
let cluster = self.get_cluster(cluster_id).ok_or_else(|| {
GraphError::FederationError(format!("Cluster not found: {}", cluster_id))
})?;
let status = ClusterStatus::Healthy;
if let Some(mut entry) = self.clusters.get_mut(cluster_id) {
entry.status = status;
entry.last_health_check = Utc::now();
}
debug!("Health check for cluster {}: {:?}", cluster_id, status);
Ok(status)
}
pub async fn health_check_all(&self) -> HashMap<ClusterId, ClusterStatus> {
let mut results = HashMap::new();
for cluster in self.list_clusters() {
match self.health_check(&cluster.cluster_id).await {
Ok(status) => {
results.insert(cluster.cluster_id, status);
}
Err(e) => {
warn!(
"Health check failed for cluster {}: {}",
cluster.cluster_id, e
);
results.insert(cluster.cluster_id, ClusterStatus::Unreachable);
}
}
}
results
}
pub async fn discover_clusters(&self) -> Result<Vec<RemoteCluster>> {
if !self.discovery_config.auto_discovery {
return Ok(Vec::new());
}
info!("Discovering clusters...");
Ok(Vec::new())
}
}
/// Settings controlling how a [`ClusterRegistry`] finds remote clusters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoveryConfig {
    /// When `false`, `ClusterRegistry::discover_clusters` returns an empty list immediately.
    pub auto_discovery: bool,
    /// Discovery mechanism. NOTE(review): not consulted by `discover_clusters` in this module.
    pub discovery_method: DiscoveryMethod,
    /// Interval between discovery runs, in seconds (per the field name; no scheduler here reads it).
    pub discovery_interval_seconds: u64,
    /// Interval between health checks, in seconds (per the field name; no scheduler here reads it).
    pub health_check_interval_seconds: u64,
}

impl Default for DiscoveryConfig {
    /// Auto-discovery off, `Static` method, 60 s discovery interval, 30 s health-check interval.
    fn default() -> Self {
        let discovery_method = DiscoveryMethod::Static;
        let (discovery_interval_seconds, health_check_interval_seconds) = (60, 30);
        DiscoveryConfig {
            health_check_interval_seconds,
            discovery_interval_seconds,
            discovery_method,
            auto_discovery: false,
        }
    }
}
/// Mechanism by which remote clusters are found.
///
/// NOTE(review): `ClusterRegistry::discover_clusters` does not branch on this value yet.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DiscoveryMethod {
    /// The default (see `DiscoveryConfig::default`); presumably clusters are registered manually.
    Static,
    /// Presumably DNS-based lookup — not implemented in this module.
    Dns,
    /// Presumably a Consul catalog — not implemented in this module.
    Consul,
    /// Presumably an etcd key space — not implemented in this module.
    Etcd,
    /// Presumably the Kubernetes API — not implemented in this module.
    Kubernetes,
}
/// Record of a federated query, inserted into `Federation`'s active-query map when execution
/// starts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FederatedQuery {
    /// Query id, generated with `Uuid::new_v4` by `Federation::execute_federated`.
    pub query_id: String,
    /// Raw query text as passed by the caller.
    pub query: String,
    /// Ids of the clusters selected for this query.
    pub target_clusters: Vec<ClusterId>,
    /// Strategy in effect, taken from `FederationConfig::default_strategy`.
    pub strategy: FederationStrategy,
    /// UTC time the record was created.
    pub created_at: DateTime<Utc>,
}
/// How `Federation::execute_federated` distributes a query over the selected clusters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum FederationStrategy {
    /// Query every selected cluster concurrently, one spawned task per cluster.
    Parallel,
    /// Query every selected cluster one after another.
    Sequential,
    /// Query a primary cluster, with the remaining clusters as fallback.
    /// NOTE(review): see `Federation::execute_federated` for the exact fallback behaviour.
    PrimaryWithFallback,
    /// Query a single cluster, presumably the closest one.
    /// NOTE(review): no proximity information is used to choose it in this module.
    Nearest,
}
/// Coordinates queries that span multiple registered clusters.
pub struct Federation {
    // Clusters queries can be routed to; handed out via `Federation::registry`.
    registry: Arc<ClusterRegistry>,
    // Execution settings, including the strategy used for every query.
    config: FederationConfig,
    // Queries registered by `execute_federated`, keyed by query id.
    active_queries: Arc<DashMap<String, FederatedQuery>>,
}
impl Federation {
pub fn new(config: FederationConfig) -> Self {
let discovery_config = DiscoveryConfig::default();
Self {
registry: Arc::new(ClusterRegistry::new(discovery_config)),
config,
active_queries: Arc::new(DashMap::new()),
}
}
pub fn registry(&self) -> Arc<ClusterRegistry> {
Arc::clone(&self.registry)
}
pub async fn execute_federated(
&self,
query: &str,
target_clusters: Option<Vec<ClusterId>>,
) -> Result<FederatedQueryResult> {
let query_id = Uuid::new_v4().to_string();
let start = std::time::Instant::now();
let clusters = if let Some(targets) = target_clusters {
targets
.into_iter()
.filter_map(|id| self.registry.get_cluster(&id))
.collect()
} else {
self.registry.healthy_clusters()
};
if clusters.is_empty() {
return Err(GraphError::FederationError(
"No healthy clusters available".to_string(),
));
}
info!(
"Executing federated query {} across {} clusters",
query_id,
clusters.len()
);
let federated_query = FederatedQuery {
query_id: query_id.clone(),
query: query.to_string(),
target_clusters: clusters.iter().map(|c| c.cluster_id.clone()).collect(),
strategy: self.config.default_strategy,
created_at: Utc::now(),
};
self.active_queries
.insert(query_id.clone(), federated_query.clone());
let mut cluster_results = HashMap::new();
match self.config.default_strategy {
FederationStrategy::Parallel => {
let mut handles = Vec::new();
for cluster in &clusters {
let cluster_id = cluster.cluster_id.clone();
let query_str = query.to_string();
let cluster_clone = cluster.clone();
let handle = tokio::spawn(async move {
Self::execute_on_cluster(&cluster_clone, &query_str).await
});
handles.push((cluster_id, handle));
}
for (cluster_id, handle) in handles {
match handle.await {
Ok(Ok(result)) => {
cluster_results.insert(cluster_id, result);
}
Ok(Err(e)) => {
warn!("Query failed on cluster {}: {}", cluster_id, e);
}
Err(e) => {
warn!("Task failed for cluster {}: {}", cluster_id, e);
}
}
}
}
FederationStrategy::Sequential => {
for cluster in &clusters {
match Self::execute_on_cluster(cluster, query).await {
Ok(result) => {
cluster_results.insert(cluster.cluster_id.clone(), result);
}
Err(e) => {
warn!("Query failed on cluster {}: {}", cluster.cluster_id, e);
}
}
}
}
FederationStrategy::Nearest | FederationStrategy::PrimaryWithFallback => {
if let Some(cluster) = clusters.first() {
match Self::execute_on_cluster(cluster, query).await {
Ok(result) => {
cluster_results.insert(cluster.cluster_id.clone(), result);
}
Err(e) => {
warn!("Query failed on cluster {}: {}", cluster.cluster_id, e);
}
}
}
}
}
let merged_result = self.merge_results(cluster_results)?;
let execution_time_ms = start.elapsed().as_millis() as u64;
self.active_queries.remove(&query_id);
Ok(FederatedQueryResult {
query_id,
merged_result,
clusters_queried: clusters.len(),
execution_time_ms,
})
}
async fn execute_on_cluster(cluster: &RemoteCluster, query: &str) -> Result<QueryResult> {
debug!("Executing query on cluster: {}", cluster.cluster_id);
Ok(QueryResult {
query_id: Uuid::new_v4().to_string(),
nodes: Vec::new(),
edges: Vec::new(),
aggregates: HashMap::new(),
stats: crate::distributed::coordinator::QueryStats {
execution_time_ms: 0,
shards_queried: 0,
nodes_scanned: 0,
edges_scanned: 0,
cached: false,
},
})
}
fn merge_results(&self, results: HashMap<ClusterId, QueryResult>) -> Result<QueryResult> {
if results.is_empty() {
return Err(GraphError::FederationError(
"No results to merge".to_string(),
));
}
let mut merged = QueryResult {
query_id: Uuid::new_v4().to_string(),
nodes: Vec::new(),
edges: Vec::new(),
aggregates: HashMap::new(),
stats: crate::distributed::coordinator::QueryStats {
execution_time_ms: 0,
shards_queried: 0,
nodes_scanned: 0,
edges_scanned: 0,
cached: false,
},
};
for (cluster_id, result) in results {
debug!("Merging results from cluster: {}", cluster_id);
for node in result.nodes {
if !merged.nodes.iter().any(|n| n.id == node.id) {
merged.nodes.push(node);
}
}
for edge in result.edges {
if !merged.edges.iter().any(|e| e.id == edge.id) {
merged.edges.push(edge);
}
}
for (key, value) in result.aggregates {
merged
.aggregates
.insert(format!("{}_{}", cluster_id, key), value);
}
merged.stats.execution_time_ms = merged
.stats
.execution_time_ms
.max(result.stats.execution_time_ms);
merged.stats.shards_queried += result.stats.shards_queried;
merged.stats.nodes_scanned += result.stats.nodes_scanned;
merged.stats.edges_scanned += result.stats.edges_scanned;
}
Ok(merged)
}
pub fn config(&self) -> &FederationConfig {
&self.config
}
}
/// Tuning knobs for [`Federation`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FederationConfig {
    /// Strategy used by `Federation::execute_federated` for every query.
    pub default_strategy: FederationStrategy,
    /// Maximum number of clusters per query (per the field name).
    /// NOTE(review): not read by `Federation` in this module — confirm where it is enforced.
    pub max_clusters: usize,
    /// Query timeout in seconds (per the field name).
    /// NOTE(review): not applied by `Federation` in this module.
    pub query_timeout_seconds: u64,
    /// Whether results may be cached. NOTE(review): not consulted in this module.
    pub enable_caching: bool,
}

impl Default for FederationConfig {
    /// `Parallel` strategy, `max_clusters` = 10, `query_timeout_seconds` = 30, caching enabled.
    fn default() -> Self {
        let default_strategy = FederationStrategy::Parallel;
        let (max_clusters, query_timeout_seconds) = (10, 30);
        FederationConfig {
            enable_caching: true,
            query_timeout_seconds,
            max_clusters,
            default_strategy,
        }
    }
}
/// Outcome returned by `Federation::execute_federated`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FederatedQueryResult {
    /// Id of the federated query (a `Uuid::new_v4` string).
    pub query_id: String,
    /// Per-cluster results combined by `Federation::merge_results`.
    pub merged_result: QueryResult,
    /// Number of clusters counted for this query by `execute_federated`.
    pub clusters_queried: usize,
    /// Elapsed time measured with `Instant` across cluster selection, execution and merging,
    /// in milliseconds.
    pub execution_time_ms: u64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an unregistered cluster in the `Unknown` state.
    fn test_cluster(id: &str) -> RemoteCluster {
        RemoteCluster::new(
            id.to_string(),
            "Test Cluster".to_string(),
            "http://localhost:8080".to_string(),
        )
    }

    #[test]
    fn test_cluster_registry() {
        let registry = ClusterRegistry::new(DiscoveryConfig::default());
        registry.register_cluster(test_cluster("cluster-1")).unwrap();
        assert_eq!(registry.list_clusters().len(), 1);
        assert!(registry.get_cluster(&"cluster-1".to_string()).is_some());
        // Freshly registered clusters have not passed a health check yet.
        assert!(registry.healthy_clusters().is_empty());

        registry
            .unregister_cluster(&"cluster-1".to_string())
            .unwrap();
        assert!(registry.list_clusters().is_empty());
        assert!(registry
            .unregister_cluster(&"cluster-1".to_string())
            .is_err());
    }

    #[tokio::test]
    async fn test_health_check() {
        let registry = ClusterRegistry::new(DiscoveryConfig::default());
        registry.register_cluster(test_cluster("cluster-1")).unwrap();

        let status = registry
            .health_check(&"cluster-1".to_string())
            .await
            .unwrap();
        assert_eq!(status, ClusterStatus::Healthy);
        assert!(registry
            .get_cluster(&"cluster-1".to_string())
            .unwrap()
            .is_healthy());
        assert!(registry
            .health_check(&"missing".to_string())
            .await
            .is_err());
    }

    #[tokio::test]
    async fn test_federation() {
        let federation = Federation::new(FederationConfig::default());
        federation
            .registry()
            .register_cluster(test_cluster("cluster-1"))
            .unwrap();

        // No cluster is healthy yet, so nothing is eligible.
        assert!(federation
            .execute_federated("MATCH (n) RETURN n", None)
            .await
            .is_err());

        let statuses = federation.registry().health_check_all().await;
        assert_eq!(statuses.get("cluster-1"), Some(&ClusterStatus::Healthy));

        let result = federation
            .execute_federated("MATCH (n) RETURN n", None)
            .await
            .unwrap();
        assert_eq!(result.clusters_queried, 1);
        assert!(result.merged_result.nodes.is_empty());
        assert!(federation.active_queries.is_empty());
    }

    #[test]
    fn test_remote_cluster() {
        let cluster = RemoteCluster::new(
            "test".to_string(),
            "Test".to_string(),
            "http://localhost".to_string(),
        );
        assert!(!cluster.is_healthy());
    }
}