use anyhow::{Context, Result};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use super::config::{FederationConfig, RemoteEndpoint};
use super::dataset_federation::{DatasetFederation, SparqlEndpoint};
use super::query_planner::QueryPlanner;
use super::schema_stitcher::SchemaStitcher;
use crate::types::Schema;
/// Orchestrates GraphQL schema stitching and SPARQL dataset federation
/// over the remote endpoints declared in a [`FederationConfig`].
pub struct FederationManager {
    /// Federation configuration, including the list of remote endpoints.
    config: FederationConfig,
    /// Merges the local schema with introspected remote schemas.
    schema_stitcher: Arc<SchemaStitcher>,
    /// Plans and executes federated GraphQL queries against the merged schema.
    query_planner: Arc<QueryPlanner>,
    /// Registry of SPARQL endpoints used for federated SPARQL queries.
    dataset_federation: Arc<RwLock<DatasetFederation>>,
    /// Merged schema produced by `initialize`; `None` until initialization.
    merged_schema: Arc<RwLock<Option<Schema>>>,
}
impl FederationManager {
pub fn new(local_schema: Arc<Schema>, config: FederationConfig) -> Self {
let schema_stitcher = Arc::new(SchemaStitcher::new(local_schema));
let query_planner = Arc::new(QueryPlanner::new(schema_stitcher.clone(), config.clone()));
let dataset_federation = Arc::new(RwLock::new(DatasetFederation::new()));
Self {
config,
schema_stitcher,
query_planner,
dataset_federation,
merged_schema: Arc::new(RwLock::new(None)),
}
}
pub async fn initialize(&self) -> Result<()> {
tracing::info!(
"Initializing federation manager with {} endpoints",
self.config.endpoints.len()
);
let merged_schema = self
.schema_stitcher
.merge_schemas(&self.config.endpoints)
.await
.context("Failed to merge remote schemas")?;
{
let mut schema_guard = self.merged_schema.write().await;
*schema_guard = Some(merged_schema);
}
{
let mut dataset_federation = self.dataset_federation.write().await;
for endpoint in &self.config.endpoints {
let sparql_endpoint = SparqlEndpoint {
id: endpoint.id.clone(),
url: endpoint.url.clone(),
auth_header: endpoint.auth_header.clone(),
timeout_secs: endpoint.timeout_secs,
max_concurrent_queries: 10, supported_features: std::collections::HashSet::new(), statistics: None,
};
dataset_federation.add_endpoint(sparql_endpoint);
}
}
for endpoint in &self.config.endpoints {
if let Err(e) = self.update_endpoint_statistics(&endpoint.id).await {
tracing::warn!(
"Failed to update statistics for endpoint {}: {}",
endpoint.id,
e
);
}
}
tracing::info!("Federation manager initialized successfully");
Ok(())
}
pub async fn get_merged_schema(&self) -> Result<Schema> {
let schema_guard = self.merged_schema.read().await;
schema_guard
.as_ref()
.cloned()
.ok_or_else(|| anyhow::anyhow!("Federation not initialized"))
}
pub async fn execute_graphql_query(
&self,
query: &crate::ast::Document,
) -> Result<serde_json::Value> {
let merged_schema = self.get_merged_schema().await?;
let query_plan = self
.query_planner
.plan_query(query, &merged_schema)
.await
.context("Failed to plan federated query")?;
tracing::debug!(
"Executing federated query with {} steps",
query_plan.steps.len()
);
self.query_planner
.execute_plan(&query_plan)
.await
.context("Failed to execute federated query plan")
}
pub async fn execute_sparql_query(&self, query: &str) -> Result<serde_json::Value> {
let dataset_federation = self.dataset_federation.read().await;
dataset_federation
.federate_sparql_query(query)
.await
.context("Failed to execute federated SPARQL query")
}
pub async fn add_endpoint(&mut self, endpoint: RemoteEndpoint) -> Result<()> {
tracing::info!("Adding new federation endpoint: {}", endpoint.id);
let remote_schema = self
.schema_stitcher
.introspect_remote(&endpoint)
.await
.context("Failed to introspect new endpoint")?;
let mut merged_schema = self.get_merged_schema().await?;
self.schema_stitcher
.merge_schema_into(&mut merged_schema, &remote_schema, &endpoint)
.context("Failed to merge new endpoint schema")?;
{
let mut schema_guard = self.merged_schema.write().await;
*schema_guard = Some(merged_schema);
}
{
let mut dataset_federation = self.dataset_federation.write().await;
let sparql_endpoint = SparqlEndpoint {
id: endpoint.id.clone(),
url: endpoint.url.clone(),
auth_header: endpoint.auth_header.clone(),
timeout_secs: endpoint.timeout_secs,
max_concurrent_queries: 10,
supported_features: std::collections::HashSet::new(),
statistics: None,
};
dataset_federation.add_endpoint(sparql_endpoint);
}
self.config.endpoints.push(endpoint);
tracing::info!("Successfully added new federation endpoint");
Ok(())
}
pub async fn remove_endpoint(&mut self, endpoint_id: &str) -> Result<()> {
tracing::info!("Removing federation endpoint: {}", endpoint_id);
self.config.endpoints.retain(|ep| ep.id != endpoint_id);
let merged_schema = self
.schema_stitcher
.merge_schemas(&self.config.endpoints)
.await
.context("Failed to rebuild merged schema")?;
{
let mut schema_guard = self.merged_schema.write().await;
*schema_guard = Some(merged_schema);
}
tracing::info!("Successfully removed federation endpoint: {}", endpoint_id);
Ok(())
}
pub async fn update_endpoint_statistics(&self, endpoint_id: &str) -> Result<()> {
let mut dataset_federation = self.dataset_federation.write().await;
dataset_federation
.update_endpoint_statistics(endpoint_id)
.await
.context("Failed to update endpoint statistics")
}
pub async fn get_health_status(&self) -> Result<FederationHealthStatus> {
let mut status = FederationHealthStatus {
total_endpoints: self.config.endpoints.len(),
healthy_endpoints: 0,
endpoint_statuses: HashMap::new(),
schema_cache_size: 0,
};
for endpoint in &self.config.endpoints {
let endpoint_healthy = self.check_endpoint_health(endpoint).await;
if endpoint_healthy {
status.healthy_endpoints += 1;
}
status.endpoint_statuses.insert(
endpoint.id.clone(),
EndpointHealthStatus {
healthy: endpoint_healthy,
last_check: chrono::Utc::now(),
response_time_ms: None, },
);
}
Ok(status)
}
async fn check_endpoint_health(&self, endpoint: &RemoteEndpoint) -> bool {
if let Some(health_url) = &endpoint.health_check_url {
let client = reqwest::Client::new();
match client
.get(health_url)
.timeout(std::time::Duration::from_secs(5))
.send()
.await
{
Ok(response) => response.status().is_success(),
Err(_) => false,
}
} else {
self.schema_stitcher
.introspect_remote(endpoint)
.await
.is_ok()
}
}
pub fn get_config(&self) -> &FederationConfig {
&self.config
}
pub async fn update_config(&mut self, new_config: FederationConfig) -> Result<()> {
tracing::info!("Updating federation configuration");
self.config = new_config;
self.initialize()
.await
.context("Failed to reinitialize with new configuration")
}
pub async fn clear_caches(&self) -> Result<()> {
tracing::info!("Clearing federation caches");
{
let mut schema_guard = self.merged_schema.write().await;
*schema_guard = None;
}
self.initialize()
.await
.context("Failed to reinitialize after clearing caches")
}
}
/// Aggregate health snapshot across all configured federation endpoints,
/// as produced by `FederationManager::get_health_status`.
#[derive(Debug, Clone)]
pub struct FederationHealthStatus {
    /// Number of endpoints in the current configuration.
    pub total_endpoints: usize,
    /// How many of those endpoints passed their most recent health check.
    pub healthy_endpoints: usize,
    /// Per-endpoint health detail, keyed by endpoint id.
    pub endpoint_statuses: HashMap<String, EndpointHealthStatus>,
    /// NOTE(review): `get_health_status` always reports 0 here — presumably
    /// a placeholder for a future schema-cache metric; confirm before use.
    pub schema_cache_size: usize,
}
/// Health-check result for a single federation endpoint.
#[derive(Debug, Clone)]
pub struct EndpointHealthStatus {
    /// Whether the most recent health check succeeded.
    pub healthy: bool,
    /// UTC timestamp of the most recent check.
    pub last_check: chrono::DateTime<chrono::Utc>,
    /// Round-trip time of the check in milliseconds; currently never
    /// populated (`get_health_status` always sets `None`).
    pub response_time_ms: Option<u64>,
}