pub mod discovery;
pub mod executor;
pub mod health;
pub mod planner;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use url::Url;
use crate::error::FusekiResult;
#[derive(Debug, Clone)]
pub struct FederationConfig {
pub enable_discovery: bool,
pub discovery_interval: Duration,
pub max_concurrent_requests: usize,
pub request_timeout: Duration,
pub enable_cost_estimation: bool,
pub circuit_breaker: CircuitBreakerConfig,
}
impl Default for FederationConfig {
fn default() -> Self {
Self {
enable_discovery: true,
discovery_interval: Duration::from_secs(300), max_concurrent_requests: 10,
request_timeout: Duration::from_secs(30),
enable_cost_estimation: true,
circuit_breaker: CircuitBreakerConfig::default(),
}
}
}
/// Thresholds and timeout for the federation circuit breaker.
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Failure-count threshold (default: 5).
    pub failure_threshold: u32,
    /// Success-count threshold (default: 3).
    pub success_threshold: u32,
    /// Breaker timeout (default: 60 s).
    pub timeout: Duration,
}

impl Default for CircuitBreakerConfig {
    /// Returns thresholds of 5 failures / 3 successes and a 60 s timeout.
    fn default() -> Self {
        const FAILURES: u32 = 5;
        const SUCCESSES: u32 = 3;
        const TIMEOUT_SECS: u64 = 60;
        CircuitBreakerConfig {
            failure_threshold: FAILURES,
            success_threshold: SUCCESSES,
            timeout: Duration::from_secs(TIMEOUT_SECS),
        }
    }
}
/// A SPARQL service endpoint registered with the federation layer.
///
/// Bundles the endpoint URL with its metadata, health state and
/// capability hints.
#[derive(Clone, Debug)]
pub struct ServiceEndpoint {
    /// URL of the endpoint.
    pub url: Url,
    /// Descriptive metadata (name, tags, contact, ...).
    pub metadata: ServiceMetadata,
    /// Health state; `FederationManager::get_healthy_endpoints` returns only
    /// endpoints whose value is `ServiceHealth::Healthy`.
    pub health: ServiceHealth,
    /// Feature and performance hints.
    pub capabilities: ServiceCapabilities,
}
/// Descriptive information attached to a service endpoint.
///
/// Derives serde `Serialize`/`Deserialize`; `Default` produces an empty
/// name, no tags and `None` for every optional field.
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
pub struct ServiceMetadata {
    /// Service name.
    pub name: String,
    /// Optional free-text description.
    pub description: Option<String>,
    /// Free-form tags.
    pub tags: Vec<String>,
    /// Optional location string.
    pub location: Option<String>,
    /// Optional version string.
    pub version: Option<String>,
    /// Optional contact string.
    pub contact: Option<String>,
}
/// Health state of a service endpoint.
///
/// `Unknown` is the `Default` variant. `PartialEq`/`Eq` are derived so callers
/// can compare states with `==` (or `assert_eq!`) instead of `matches!`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum ServiceHealth {
    /// Endpoint reported as healthy.
    Healthy,
    /// Endpoint reported as degraded.
    Degraded,
    /// Endpoint reported as unhealthy.
    Unhealthy,
    /// No health state recorded; the default.
    #[default]
    Unknown,
}
/// Capability and performance hints for a service endpoint.
///
/// `Default` yields empty lists and `None` for every optional hint.
#[derive(Clone, Debug, Default)]
pub struct ServiceCapabilities {
    /// Supported SPARQL feature names.
    pub sparql_features: Vec<String>,
    /// Optional dataset size hint.
    pub dataset_size: Option<u64>,
    /// Optional average response time hint.
    pub avg_response_time: Option<Duration>,
    /// Optional maximum result size hint.
    pub max_result_size: Option<usize>,
    /// Supported result format names.
    pub result_formats: Vec<String>,
}

/// Alternative name for [`ServiceCapabilities`].
pub type EndpointCapabilities = ServiceCapabilities;
/// Coordinates federated query handling.
///
/// Owns the endpoint registry together with the discovery, planning and
/// health-monitoring components wired up by `FederationManager::new`.
pub struct FederationManager {
    /// Configuration supplied at construction.
    config: FederationConfig,
    /// Registered endpoints keyed by caller-supplied id; the same `Arc` is
    /// handed to the health monitor.
    endpoints: Arc<RwLock<HashMap<String, ServiceEndpoint>>>,
    /// Service discovery backend.
    discovery: Arc<dyn planner::ServiceDiscovery>,
    /// Planner that turns a query and its SERVICE patterns into a plan.
    planner: Arc<planner::QueryPlanner>,
    /// Health monitor sharing the endpoint registry.
    health_monitor: Arc<health::HealthMonitor>,
}
impl FederationManager {
pub fn new(config: FederationConfig) -> Self {
let endpoints = Arc::new(RwLock::new(HashMap::new()));
let discovery = Arc::new(planner::DefaultServiceDiscovery::new());
let cost_estimator = Arc::new(planner::DefaultCostEstimator::new());
Self {
discovery: discovery.clone(),
planner: Arc::new(planner::QueryPlanner::new(
config.clone(),
discovery.clone(),
cost_estimator,
)),
health_monitor: Arc::new(health::HealthMonitor::new(
config.clone(),
endpoints.clone(),
)),
config,
endpoints,
}
}
pub async fn start(&self) -> FusekiResult<()> {
Ok(())
}
pub async fn stop(&self) -> FusekiResult<()> {
Ok(())
}
pub async fn register_endpoint(
&self,
id: String,
endpoint: ServiceEndpoint,
) -> FusekiResult<()> {
let mut endpoints = self.endpoints.write().await;
endpoints.insert(id, endpoint);
Ok(())
}
pub async fn get_healthy_endpoints(&self) -> Vec<(String, ServiceEndpoint)> {
let endpoints = self.endpoints.read().await;
endpoints
.iter()
.filter(|(_, ep)| matches!(ep.health, ServiceHealth::Healthy))
.map(|(id, ep)| (id.clone(), ep.clone()))
.collect()
}
pub async fn create_execution_plan(
&self,
query: &str,
) -> crate::error::FusekiResult<crate::federated_query_optimizer::ExecutionPlan> {
let service_patterns = self.extract_service_patterns(query)?;
self.planner
.create_execution_plan(query, &service_patterns)
.await
}
fn extract_service_patterns(
&self,
query: &str,
) -> crate::error::FusekiResult<Vec<crate::federated_query_optimizer::ServicePattern>> {
use regex::Regex;
let mut patterns = Vec::new();
let service_regex =
Regex::new(r"SERVICE\s+<([^>]+)>\s*\{").expect("regex pattern should be valid");
for captures in service_regex.captures_iter(query) {
if let Some(endpoint) = captures.get(1) {
patterns.push(crate::federated_query_optimizer::ServicePattern {
service_url: endpoint.as_str().to_string(),
pattern: query.to_string(), is_silent: false,
is_optional: false,
});
}
}
if patterns.is_empty() {
patterns.push(crate::federated_query_optimizer::ServicePattern {
service_url: "local".to_string(),
pattern: query.to_string(),
is_silent: false,
is_optional: false,
});
}
Ok(patterns)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Pins every default of `FederationConfig` and its nested
    /// `CircuitBreakerConfig`, so an accidental change to any default fails.
    #[test]
    fn test_default_config() {
        let config = FederationConfig::default();
        assert!(config.enable_discovery);
        assert_eq!(config.discovery_interval, Duration::from_secs(300));
        assert_eq!(config.max_concurrent_requests, 10);
        assert_eq!(config.request_timeout, Duration::from_secs(30));
        assert!(config.enable_cost_estimation);
        assert_eq!(config.circuit_breaker.failure_threshold, 5);
        assert_eq!(config.circuit_breaker.success_threshold, 3);
        assert_eq!(config.circuit_breaker.timeout, Duration::from_secs(60));
    }
}