use crate::{
error::{FusekiError, FusekiResult},
metrics::MetricsService,
};
use async_trait::async_trait;
use dashmap::DashMap;
use futures::{stream::FuturesUnordered, StreamExt};
use metrics::{counter, histogram};
use reqwest::{Client, ClientBuilder, StatusCode};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
sync::{RwLock, Semaphore},
time::timeout,
};
/// Function that splits a SPARQL query string into executable fragments.
type QueryDecomposeFn = Box<dyn Fn(&str) -> Vec<QueryFragment> + Send + Sync>;
/// Top-level coordinator for federated SPARQL query processing: planning,
/// cost estimation, execution against remote endpoints, and result merging.
pub struct FederatedQueryOptimizer {
    pub endpoints: Arc<RwLock<EndpointRegistry>>,
    pub planner: Arc<QueryPlanner>,
    pub cost_estimator: Arc<CostEstimator>,
    pub executor: Arc<FederatedExecutor>,
    pub merger: Arc<ResultMerger>,
    pub metrics: Arc<MetricsService>,
}
/// Known SPARQL endpoints plus cached health probes and a discovery client.
pub struct EndpointRegistry {
    // Keyed by endpoint URL.
    endpoints: HashMap<String, EndpointInfo>,
    // Latest health probe outcome per endpoint URL.
    health_cache: DashMap<String, HealthStatus>,
    discovery: Arc<EndpointDiscovery>,
}
/// Static configuration for a single remote SPARQL endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EndpointInfo {
    pub url: String,
    pub name: String,
    pub description: Option<String>,
    pub capabilities: EndpointCapabilities,
    /// Credentials, if the endpoint requires them.
    pub authentication: Option<EndpointAuth>,
    /// Per-request timeout in milliseconds.
    pub timeout_ms: u64,
    pub max_retries: u32,
    /// Relative preference when several endpoints could serve a fragment.
    pub priority: u32,
}
/// Feature set advertised (or assumed) for an endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EndpointCapabilities {
    pub sparql_version: String,
    pub supports_update: bool,
    pub supports_graph_store: bool,
    pub supports_service_description: bool,
    /// Maximum accepted query size, if the endpoint enforces one.
    pub max_query_size: Option<usize>,
    pub rate_limit: Option<RateLimit>,
    /// Free-form feature flags.
    pub features: HashSet<String>,
}
/// Token-bucket style rate limit advertised by an endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimit {
    pub requests_per_second: u32,
    pub burst_size: u32,
}
/// Supported authentication schemes for remote endpoints.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EndpointAuth {
    /// HTTP Basic authentication.
    Basic {
        username: String,
        password: String,
    },
    /// Bearer token in the Authorization header.
    Bearer {
        token: String,
    },
    /// Static API key sent in a custom header.
    ApiKey {
        key: String,
        header_name: String,
    },
    /// OAuth2 client-credentials flow.
    OAuth2 {
        client_id: String,
        client_secret: String,
        token_url: String,
    },
}
/// Outcome of the most recent health probe for an endpoint.
#[derive(Debug, Clone)]
pub struct HealthStatus {
    pub is_healthy: bool,
    pub last_check: Instant,
    pub response_time_ms: u64,
    pub error_count: u32,
    pub success_count: u32,
}
/// Discovers endpoints from remote catalog listings.
pub struct EndpointDiscovery {
    client: Client,
    /// Catalog URLs to crawl for endpoint descriptions.
    catalogs: Vec<String>,
}
/// Turns a SPARQL query into fragments and an optimized execution plan.
pub struct QueryPlanner {
    decomposition_rules: Vec<DecompositionRule>,
    join_optimizer: Arc<JoinOrderOptimizer>,
    statistics: Arc<RwLock<FederationStatistics>>,
}
/// A named rewrite rule: when `applicability_check` matches a query,
/// `decompose` may split it into additional fragments.
pub struct DecompositionRule {
    pub name: String,
    pub pattern: String,
    pub applicability_check: Box<dyn Fn(&str) -> bool + Send + Sync>,
    pub decompose: QueryDecomposeFn,
}
/// A self-contained piece of a federated query, routed to one or more
/// endpoints.
#[derive(Debug, Clone)]
pub struct QueryFragment {
    pub fragment_id: String,
    pub sparql: String,
    pub target_endpoints: Vec<String>,
    /// Fragment ids that must complete before this fragment can run.
    pub dependencies: Vec<String>,
    pub estimated_cost: f64,
    pub is_optional: bool,
}
/// Chooses a join order for fragment results, memoizing computed plans.
pub struct JoinOrderOptimizer {
    cost_model: Arc<JoinCostModel>,
    /// Cache of previously computed plans, keyed by a fragment-set hash.
    dp_cache: DashMap<String, JoinPlan>,
}
/// Network cost inputs for join planning.
pub struct JoinCostModel {
    /// Latency between endpoint pairs.
    latency_map: DashMap<(String, String), Duration>,
    /// Estimated bandwidth per endpoint.
    bandwidth_map: DashMap<String, f64>,
}
/// Ordered join steps with aggregate cost/time estimates.
#[derive(Debug, Clone)]
pub struct JoinPlan {
    pub steps: Vec<JoinStep>,
    pub estimated_cost: f64,
    pub estimated_time_ms: u64,
}
/// A single pairwise join between two intermediate result sources.
#[derive(Debug, Clone)]
pub struct JoinStep {
    pub operation: JoinOperation,
    pub left_source: String,
    pub right_source: String,
    pub output_destination: String,
}
/// Physical join algorithms the planner can choose between.
#[derive(Debug, Clone)]
pub enum JoinOperation {
    HashJoin,
    SortMergeJoin,
    NestedLoopJoin,
    BroadcastJoin,
    IndexJoin,
}
/// Estimates execution cost from history, an optional learned model, and
/// cardinality statistics.
pub struct CostEstimator {
    history: Arc<RwLock<QueryHistory>>,
    /// Optional learned cost model; unused when `None`.
    ml_model: Option<Arc<CostPredictionModel>>,
    cardinality: Arc<CardinalityEstimator>,
}
/// Record of past executions, with per-pattern aggregates.
pub struct QueryHistory {
    executions: Vec<QueryExecution>,
    patterns: HashMap<String, PatternStats>,
}
/// One completed federated query execution.
#[derive(Debug, Clone)]
pub struct QueryExecution {
    pub query_hash: String,
    pub fragments: Vec<String>,
    pub endpoints: Vec<String>,
    pub execution_time_ms: u64,
    pub result_count: usize,
    pub timestamp: Instant,
}
/// Aggregated timing/volume statistics for one query pattern.
#[derive(Debug, Clone)]
pub struct PatternStats {
    pub pattern: String,
    pub avg_execution_time: f64,
    pub avg_result_count: f64,
    pub execution_count: u32,
}
/// Placeholder for a serialized learned cost model.
pub struct CostPredictionModel {
    _model_data: Vec<u8>,
}
/// Estimates result sizes from per-endpoint statistics and histograms.
pub struct CardinalityEstimator {
    endpoint_stats: DashMap<String, EndpointStatistics>,
    histograms: DashMap<String, Histogram>,
}
/// Coarse dataset statistics for one endpoint.
#[derive(Debug, Clone)]
pub struct EndpointStatistics {
    pub triple_count: u64,
    pub distinct_subjects: u64,
    pub distinct_predicates: u64,
    pub distinct_objects: u64,
    pub last_updated: Instant,
}
impl Default for EndpointStatistics {
fn default() -> Self {
Self {
triple_count: 0,
distinct_subjects: 0,
distinct_predicates: 0,
distinct_objects: 0,
last_updated: Instant::now(),
}
}
}
/// Value-distribution histogram used for selectivity estimation.
#[derive(Debug, Clone)]
pub struct Histogram {
    pub buckets: Vec<HistogramBucket>,
    pub total_count: u64,
}
/// One histogram bucket over a value range.
#[derive(Debug, Clone)]
pub struct HistogramBucket {
    pub min_value: String,
    pub max_value: String,
    pub count: u64,
}
/// Aggregate statistics across all federated queries and endpoints.
pub struct FederationStatistics {
    query_stats: HashMap<String, QueryStats>,
    endpoint_stats: HashMap<String, EndpointPerformance>,
}
/// Per-query aggregate metrics.
#[derive(Debug, Clone)]
pub struct QueryStats {
    pub total_executions: u64,
    pub avg_execution_time: f64,
    pub success_rate: f64,
}
/// Per-endpoint aggregate performance metrics.
#[derive(Debug, Clone)]
pub struct EndpointPerformance {
    pub avg_response_time: f64,
    pub availability: f64,
    pub throughput: f64,
}
/// Runs execution plans against remote endpoints with bounded concurrency
/// and retries.
pub struct FederatedExecutor {
    client_pool: Arc<ClientPool>,
    /// Candidate strategies, probed in order for applicability.
    strategies: Vec<Arc<dyn ExecutionStrategy>>,
    /// Bounds the number of in-flight fragment requests.
    semaphore: Arc<Semaphore>,
    retry_policy: Arc<RetryPolicy>,
}
/// Lazily built, cached HTTP clients, one per endpoint URL.
pub struct ClientPool {
    clients: DashMap<String, Client>,
    max_connections_per_endpoint: usize,
}
/// A way of scheduling a plan's fragments (parallel, sequential, adaptive).
#[async_trait]
pub trait ExecutionStrategy: Send + Sync {
    /// Human-readable strategy name.
    fn name(&self) -> &str;
    /// Whether this strategy can execute the given plan.
    fn applicable(&self, plan: &ExecutionPlan) -> bool;
    /// Executes the plan, delegating fragment requests to `executor`.
    async fn execute(
        &self,
        plan: &ExecutionPlan,
        executor: &FederatedExecutor,
    ) -> FusekiResult<QueryResults>;
}
/// Fully planned federated query, ready for execution.
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    pub query_id: String,
    pub fragments: Vec<QueryFragment>,
    pub join_plan: JoinPlan,
    pub timeout_ms: u64,
    pub optimization_hints: HashMap<String, String>,
    /// Human-readable description of each planned step.
    pub execution_steps: Vec<String>,
    pub estimated_cost: f64,
    pub resource_requirements: ResourceRequirements,
}
/// Rough resource estimates for a plan.
#[derive(Debug, Clone)]
pub struct ResourceRequirements {
    pub required_endpoints: Vec<String>,
    pub estimated_memory_mb: f64,
    pub estimated_cpu_cores: f64,
}
/// SPARQL SELECT-style results: one variable -> JSON value map per row.
#[derive(Debug, Clone)]
pub struct QueryResults {
    pub bindings: Vec<HashMap<String, serde_json::Value>>,
    pub metadata: ResultMetadata,
}
/// Execution bookkeeping attached to a result set.
#[derive(Debug, Clone)]
pub struct ResultMetadata {
    pub total_execution_time_ms: u64,
    /// Per-endpoint request times in milliseconds.
    pub endpoint_times: HashMap<String, u64>,
    pub result_count: usize,
    /// Whether the result set is incomplete (currently always false in
    /// this module).
    pub partial_results: bool,
}
/// Exponential-backoff retry configuration.
pub struct RetryPolicy {
    pub max_retries: u32,
    pub initial_backoff_ms: u64,
    pub max_backoff_ms: u64,
    pub exponential_base: f64,
}
/// A parsed `SERVICE [SILENT] <url> { ... }` clause.
#[derive(Debug, Clone)]
pub struct ServicePattern {
    pub service_url: String,
    /// Raw text of the SERVICE block, including the header line.
    pub pattern: String,
    pub is_silent: bool,
    pub is_optional: bool,
}
/// A way of combining partial result sets (union, join, distinct).
#[async_trait]
pub trait MergeStrategy: Send + Sync {
    /// Human-readable strategy name.
    fn name(&self) -> &str;
    /// Combines `results` into a single result set.
    async fn merge(&self, results: Vec<QueryResults>) -> FusekiResult<QueryResults>;
}
/// Combines partial results using named merge strategies.
pub struct ResultMerger {
    pub strategies: HashMap<String, Arc<dyn MergeStrategy>>,
    // NOTE(review): not read by any merge path in this module — confirm
    // whether deduplication across merges is still planned.
    dedup_cache: Arc<RwLock<HashSet<u64>>>,
}
impl FederatedQueryOptimizer {
    /// Creates an optimizer wired with default planner, cost estimator,
    /// executor and merger.
    pub fn new(metrics: Arc<MetricsService>) -> Self {
        Self {
            endpoints: Arc::new(RwLock::new(EndpointRegistry::new())),
            planner: Arc::new(QueryPlanner::new()),
            cost_estimator: Arc::new(CostEstimator::new()),
            executor: Arc::new(FederatedExecutor::new()),
            merger: Arc::new(ResultMerger::new()),
            metrics,
        }
    }
    /// Plans and executes a federated SPARQL query end to end.
    ///
    /// Steps: extract SERVICE patterns, health-check the referenced
    /// endpoints, build an execution plan, estimate its cost, then execute
    /// under `timeout_ms`.
    ///
    /// # Errors
    /// Fails when the query contains no SERVICE patterns, an endpoint is
    /// unknown or unhealthy, planning fails, or the timeout elapses.
    pub async fn process_federated_query(
        &self,
        query: &str,
        timeout_ms: u64,
    ) -> FusekiResult<QueryResults> {
        let start = Instant::now();
        let service_patterns = self.extract_service_patterns(query)?;
        if service_patterns.is_empty() {
            return Err(FusekiError::bad_request("No SERVICE patterns found"));
        }
        self.check_endpoint_health(&service_patterns).await?;
        let plan = self
            .planner
            .create_execution_plan(query, &service_patterns)
            .await?;
        let cost_estimate = self.cost_estimator.estimate_cost(&plan).await?;
        histogram!("federated_query.estimated_cost").record(cost_estimate);
        let results = timeout(
            Duration::from_millis(timeout_ms),
            self.executor.execute_plan(&plan),
        )
        .await
        .map_err(|_| FusekiError::TimeoutWithMessage("Federated query timeout".into()))??;
        let duration = start.elapsed();
        histogram!("federated_query.execution_time").record(duration.as_millis() as f64);
        counter!("federated_query.total").increment(1);
        Ok(results)
    }
    /// Extracts `SERVICE [SILENT] <url> { ... }` blocks from a SPARQL query.
    ///
    /// Line-based scan: brace depth is tracked from the SERVICE header until
    /// the matching closing brace. The SILENT flag is read from the SERVICE
    /// header line itself, before the IRI — previously the whole accumulated
    /// block was searched, so any occurrence of "SILENT" inside the pattern
    /// body caused a false positive. Matching is case-sensitive (uppercase
    /// keywords), as before.
    pub fn extract_service_patterns(&self, query: &str) -> FusekiResult<Vec<ServicePattern>> {
        let mut patterns = Vec::new();
        let mut in_service = false;
        let mut current_service = String::new();
        let mut brace_count = 0;
        let mut service_url = String::new();
        let mut is_silent = false;
        for line in query.lines() {
            let trimmed = line.trim();
            // Only start a new block when not already inside one, so a
            // nested SERVICE line cannot overwrite the outer URL.
            if !in_service && trimmed.starts_with("SERVICE") {
                in_service = true;
                brace_count = 0;
                // SILENT must appear in the header, before the IRI.
                let header_end = trimmed.find('<').unwrap_or(trimmed.len());
                is_silent = trimmed[..header_end].contains("SILENT");
                if let Some(url_start) = trimmed.find('<') {
                    if let Some(url_end) = trimmed.find('>') {
                        service_url = trimmed[url_start + 1..url_end].to_string();
                    }
                }
            }
            if in_service {
                current_service.push_str(line);
                current_service.push('\n');
                for ch in trimmed.chars() {
                    match ch {
                        '{' => brace_count += 1,
                        // Ignore a stray '}' seen before any '{' so the
                        // counter cannot underflow on malformed input.
                        '}' if brace_count > 0 => {
                            brace_count -= 1;
                            if brace_count == 0 {
                                patterns.push(ServicePattern {
                                    service_url: service_url.clone(),
                                    pattern: current_service.clone(),
                                    is_silent,
                                    is_optional: false,
                                });
                                in_service = false;
                                current_service.clear();
                                service_url.clear();
                                is_silent = false;
                            }
                        }
                        _ => {}
                    }
                }
            }
        }
        Ok(patterns)
    }
    /// Verifies that every referenced endpoint is registered and responding.
    ///
    /// Unknown endpoints are an error unless the SERVICE clause is SILENT,
    /// in which case they are skipped. Health probes run concurrently.
    async fn check_endpoint_health(&self, patterns: &[ServicePattern]) -> FusekiResult<()> {
        let endpoints = self.endpoints.read().await;
        let mut futures = FuturesUnordered::new();
        for pattern in patterns {
            let endpoint_url = pattern.service_url.clone();
            if let Some(endpoint) = endpoints.endpoints.get(&endpoint_url) {
                futures.push(endpoints.check_endpoint_health(endpoint.clone()));
            } else if !pattern.is_silent {
                return Err(FusekiError::bad_request(format!(
                    "Unknown endpoint: {endpoint_url}"
                )));
            }
        }
        while let Some(result) = futures.next().await {
            result?;
        }
        Ok(())
    }
}
impl Default for EndpointRegistry {
    fn default() -> Self {
        Self::new()
    }
}
impl EndpointRegistry {
    /// Creates an empty registry with a fresh health cache and discovery
    /// client.
    pub fn new() -> Self {
        Self {
            endpoints: HashMap::new(),
            health_cache: DashMap::new(),
            discovery: Arc::new(EndpointDiscovery::new()),
        }
    }
    /// Registers (or replaces) an endpoint, keyed by its URL.
    pub fn register_endpoint(&mut self, endpoint: EndpointInfo) {
        self.endpoints.insert(endpoint.url.clone(), endpoint);
    }
    /// Probes the endpoint with a trivial ASK query and records the outcome.
    ///
    /// Fixes over the previous version: success/error counters accumulate
    /// across checks (they were reset to 0/1 on every probe, making trend
    /// data useless), and the probe honours the endpoint's configured
    /// `timeout_ms` instead of a hard-coded 5 s.
    ///
    /// # Errors
    /// Fails when the HTTP client cannot be built, the request errors, or
    /// the endpoint answers with a non-200 status.
    pub async fn check_endpoint_health(&self, endpoint: EndpointInfo) -> FusekiResult<()> {
        // Honour the per-endpoint timeout; fall back to 5 s when unset.
        let probe_timeout_ms = if endpoint.timeout_ms > 0 {
            endpoint.timeout_ms
        } else {
            5000
        };
        let client = ClientBuilder::new()
            .timeout(Duration::from_millis(probe_timeout_ms))
            .build()
            .map_err(|e| FusekiError::internal(format!("Client error: {e}")))?;
        let start = Instant::now();
        let response = client
            .get(&endpoint.url)
            .header("Accept", "application/sparql-results+json")
            .query(&[("query", "ASK { ?s ?p ?o } LIMIT 1")])
            .send()
            .await;
        let response_time = start.elapsed().as_millis() as u64;
        // Carry forward previous counters so health history accumulates.
        let (prev_errors, prev_successes) = self
            .health_cache
            .get(&endpoint.url)
            .map(|h| (h.error_count, h.success_count))
            .unwrap_or((0, 0));
        match response {
            Ok(resp) if resp.status() == StatusCode::OK => {
                self.health_cache.insert(
                    endpoint.url.clone(),
                    HealthStatus {
                        is_healthy: true,
                        last_check: Instant::now(),
                        response_time_ms: response_time,
                        error_count: prev_errors,
                        success_count: prev_successes + 1,
                    },
                );
                Ok(())
            }
            // Non-200 response: reported as an error; cache untouched
            // (matches previous behavior).
            Ok(resp) => Err(FusekiError::bad_request(format!(
                "Endpoint returned status: {}",
                resp.status()
            ))),
            Err(e) => {
                self.health_cache.insert(
                    endpoint.url.clone(),
                    HealthStatus {
                        is_healthy: false,
                        last_check: Instant::now(),
                        response_time_ms: response_time,
                        error_count: prev_errors + 1,
                        success_count: prev_successes,
                    },
                );
                Err(FusekiError::internal(format!("Health check failed: {e}")))
            }
        }
    }
    /// Queries configured catalogs for additional endpoints.
    pub async fn discover_endpoints(&self) -> FusekiResult<Vec<EndpointInfo>> {
        self.discovery.discover_from_catalogs().await
    }
}
impl Default for EndpointDiscovery {
    fn default() -> Self {
        Self::new()
    }
}
impl EndpointDiscovery {
    /// Creates a discovery client preconfigured with two public catalogs.
    pub fn new() -> Self {
        Self {
            client: Client::new(),
            catalogs: vec![
                "https://www.w3.org/wiki/SparqlEndpoints".to_string(),
                "https://lod-cloud.net/endpoints".to_string(),
            ],
        }
    }
    /// Fetches endpoint descriptions from the configured catalogs.
    ///
    /// Currently a stub: always returns an empty list.
    pub async fn discover_from_catalogs(&self) -> FusekiResult<Vec<EndpointInfo>> {
        Ok(vec![])
    }
}
impl Default for QueryPlanner {
    fn default() -> Self {
        Self::new()
    }
}
impl QueryPlanner {
    /// Creates a planner with the built-in decomposition rules.
    pub fn new() -> Self {
        Self {
            decomposition_rules: Self::create_decomposition_rules(),
            join_optimizer: Arc::new(JoinOrderOptimizer::new()),
            statistics: Arc::new(RwLock::new(FederationStatistics::new())),
        }
    }
    /// Built-in decomposition rules. All `decompose` bodies are currently
    /// stubs that yield no extra fragments; only SERVICE-derived fragments
    /// are produced today.
    fn create_decomposition_rules() -> Vec<DecompositionRule> {
        vec![
            DecompositionRule {
                name: "TriplePatternDecomposition".to_string(),
                pattern: "triple_pattern".to_string(),
                applicability_check: Box::new(|query| {
                    query.contains("?s") && query.contains("?p") && query.contains("?o")
                }),
                decompose: Box::new(|_query| vec![]),
            },
            DecompositionRule {
                name: "UnionDecomposition".to_string(),
                pattern: "union".to_string(),
                applicability_check: Box::new(|query| query.to_uppercase().contains("UNION")),
                decompose: Box::new(|_query| vec![]),
            },
            DecompositionRule {
                name: "OptionalDecomposition".to_string(),
                pattern: "optional".to_string(),
                applicability_check: Box::new(|query| query.to_uppercase().contains("OPTIONAL")),
                decompose: Box::new(|_query| vec![]),
            },
        ]
    }
    /// Builds an execution plan: decompose into fragments, optimize join
    /// order, and derive cost/resource estimates.
    pub async fn create_execution_plan(
        &self,
        query: &str,
        service_patterns: &[ServicePattern],
    ) -> FusekiResult<ExecutionPlan> {
        let fragments = self.decompose_query(query, service_patterns)?;
        let join_plan = self.join_optimizer.optimize_joins(&fragments).await?;
        let required_endpoints: Vec<String> = service_patterns
            .iter()
            .map(|p| p.service_url.clone())
            .collect();
        let execution_steps: Vec<String> = fragments
            .iter()
            .enumerate()
            .map(|(i, f)| {
                format!(
                    "Execute fragment {} ({}) at {:?}",
                    i, f.sparql, f.target_endpoints
                )
            })
            .collect();
        let estimated_cost = fragments.iter().map(|f| f.estimated_cost).sum::<f64>();
        // Take size-derived estimates before moving `fragments` into the
        // plan; this avoids the full Vec clone the previous version did
        // just to read the length afterwards.
        let fragment_count = fragments.len() as f64;
        Ok(ExecutionPlan {
            query_id: uuid::Uuid::new_v4().to_string(),
            fragments,
            join_plan,
            timeout_ms: 30000,
            optimization_hints: HashMap::new(),
            execution_steps,
            estimated_cost,
            resource_requirements: ResourceRequirements {
                required_endpoints,
                // Heuristic sizing: 5 MB and half a core per fragment,
                // minimum one core.
                estimated_memory_mb: fragment_count * 5.0,
                estimated_cpu_cores: (fragment_count / 2.0).max(1.0),
            },
        })
    }
    /// One fragment per SERVICE pattern, plus any rule-derived fragments.
    fn decompose_query(
        &self,
        query: &str,
        service_patterns: &[ServicePattern],
    ) -> FusekiResult<Vec<QueryFragment>> {
        let mut fragments = Vec::new();
        for (idx, pattern) in service_patterns.iter().enumerate() {
            fragments.push(QueryFragment {
                fragment_id: format!("service_{idx}"),
                sparql: pattern.pattern.clone(),
                target_endpoints: vec![pattern.service_url.clone()],
                dependencies: vec![],
                estimated_cost: 1.0,
                is_optional: pattern.is_optional,
            });
        }
        for rule in &self.decomposition_rules {
            if (rule.applicability_check)(query) {
                fragments.extend((rule.decompose)(query));
            }
        }
        Ok(fragments)
    }
}
impl Default for JoinOrderOptimizer {
    fn default() -> Self {
        Self::new()
    }
}
impl JoinOrderOptimizer {
    /// Creates an optimizer with an empty plan cache.
    pub fn new() -> Self {
        Self {
            cost_model: Arc::new(JoinCostModel::new()),
            dp_cache: DashMap::new(),
        }
    }
    /// Returns a join plan for the fragments, serving repeats from cache.
    pub async fn optimize_joins(&self, fragments: &[QueryFragment]) -> FusekiResult<JoinPlan> {
        let cache_key = self.compute_cache_key(fragments);
        if let Some(cached_plan) = self.dp_cache.get(&cache_key) {
            return Ok(cached_plan.clone());
        }
        let plan = self.compute_optimal_plan(fragments).await?;
        self.dp_cache.insert(cache_key, plan.clone());
        Ok(plan)
    }
    /// Cache key covering each fragment's id, SPARQL text, and targets.
    ///
    /// Fragment ids alone are positional ("service_0", ...) and identical
    /// across unrelated queries, so the previous id-only key made distinct
    /// queries share one cache slot.
    fn compute_cache_key(&self, fragments: &[QueryFragment]) -> String {
        use std::hash::{Hash, Hasher};
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        for fragment in fragments {
            fragment.fragment_id.hash(&mut hasher);
            fragment.sparql.hash(&mut hasher);
            fragment.target_endpoints.hash(&mut hasher);
        }
        format!("{:x}", hasher.finish())
    }
    /// Placeholder plan: a left-deep chain of hash joins over the fragments
    /// in input order, with count-based cost/time estimates.
    async fn compute_optimal_plan(&self, fragments: &[QueryFragment]) -> FusekiResult<JoinPlan> {
        let mut steps = Vec::new();
        if fragments.len() > 1 {
            for i in 0..fragments.len() - 1 {
                steps.push(JoinStep {
                    operation: JoinOperation::HashJoin,
                    left_source: fragments[i].fragment_id.clone(),
                    right_source: fragments[i + 1].fragment_id.clone(),
                    output_destination: format!("join_{i}"),
                });
            }
        }
        Ok(JoinPlan {
            steps,
            estimated_cost: fragments.len() as f64,
            estimated_time_ms: fragments.len() as u64 * 100,
        })
    }
}
impl Default for JoinCostModel {
    fn default() -> Self {
        Self::new()
    }
}
impl JoinCostModel {
    /// Creates an empty cost model; the latency/bandwidth maps start empty
    /// (nothing in this module populates them yet).
    pub fn new() -> Self {
        Self {
            latency_map: DashMap::new(),
            bandwidth_map: DashMap::new(),
        }
    }
}
impl Default for CostEstimator {
    fn default() -> Self {
        Self::new()
    }
}
impl CostEstimator {
    /// Creates an estimator with empty history and no learned model.
    pub fn new() -> Self {
        Self {
            history: Arc::new(RwLock::new(QueryHistory::new())),
            ml_model: None,
            cardinality: Arc::new(CardinalityEstimator::new()),
        }
    }
    /// Total plan cost: sum of per-fragment costs plus per-join-step costs.
    pub async fn estimate_cost(&self, plan: &ExecutionPlan) -> FusekiResult<f64> {
        let mut total_cost = 0.0;
        for fragment in &plan.fragments {
            let fragment_cost = self.estimate_fragment_cost(fragment).await?;
            total_cost += fragment_cost;
        }
        for step in &plan.join_plan.steps {
            let join_cost = self.estimate_join_cost(step).await?;
            total_cost += join_cost;
        }
        Ok(total_cost)
    }
    /// Cost of one fragment: historical average when available, otherwise
    /// a cardinality-derived estimate (rows * 0.001).
    async fn estimate_fragment_cost(&self, fragment: &QueryFragment) -> FusekiResult<f64> {
        let history = self.history.read().await;
        // NOTE(review): `PatternStats.pattern` suggests this map is keyed by
        // pattern text, but the lookup uses fragment_id — confirm which key
        // the writer side uses.
        if let Some(stats) = history.patterns.get(&fragment.fragment_id) {
            return Ok(stats.avg_execution_time);
        }
        let cardinality = self
            .cardinality
            .estimate_cardinality(&fragment.sparql)
            .await?;
        Ok(cardinality as f64 * 0.001) }
    /// Fixed per-algorithm join cost constants.
    async fn estimate_join_cost(&self, step: &JoinStep) -> FusekiResult<f64> {
        match step.operation {
            JoinOperation::HashJoin => Ok(10.0),
            JoinOperation::SortMergeJoin => Ok(20.0),
            JoinOperation::NestedLoopJoin => Ok(100.0),
            JoinOperation::BroadcastJoin => Ok(5.0),
            JoinOperation::IndexJoin => Ok(2.0),
        }
    }
}
impl Default for QueryHistory {
    fn default() -> Self {
        Self::new()
    }
}
impl QueryHistory {
    /// Creates an empty execution history.
    pub fn new() -> Self {
        Self {
            executions: Vec::new(),
            patterns: HashMap::new(),
        }
    }
}
impl Default for CardinalityEstimator {
    fn default() -> Self {
        Self::new()
    }
}
impl CardinalityEstimator {
    /// Creates an estimator with empty per-endpoint statistics.
    pub fn new() -> Self {
        Self {
            endpoint_stats: DashMap::new(),
            histograms: DashMap::new(),
        }
    }
    /// Estimates the result cardinality of a query.
    ///
    /// If the query carries a LIMIT clause (matched case-insensitively),
    /// its value is used. The previous version only matched uppercase
    /// "LIMIT" and required the number to be followed by a space, so a
    /// limit at end of line (e.g. "LIMIT 10\n}") failed to parse and fell
    /// through to the default guess of 1000.
    pub async fn estimate_cardinality(&self, query: &str) -> FusekiResult<u64> {
        // Work entirely on the uppercased copy: uppercasing only affects
        // letters, so the digit token is unchanged, and slicing `upper`
        // with an index found in `upper` is always in bounds.
        let upper = query.to_uppercase();
        if let Some(limit_pos) = upper.find("LIMIT") {
            if let Some(token) = upper[limit_pos + 5..].split_whitespace().next() {
                if let Ok(limit) = token.parse::<u64>() {
                    return Ok(limit);
                }
            }
        }
        Ok(1000)
    }
}
impl Default for FederationStatistics {
    fn default() -> Self {
        Self::new()
    }
}
impl FederationStatistics {
    /// Creates empty federation-wide statistics.
    pub fn new() -> Self {
        Self {
            query_stats: HashMap::new(),
            endpoint_stats: HashMap::new(),
        }
    }
}
impl Default for FederatedExecutor {
fn default() -> Self {
Self::new()
}
}
impl FederatedExecutor {
pub fn new() -> Self {
Self {
client_pool: Arc::new(ClientPool::new()),
strategies: Self::create_execution_strategies(),
semaphore: Arc::new(Semaphore::new(10)),
retry_policy: Arc::new(RetryPolicy::default()),
}
}
fn create_execution_strategies() -> Vec<Arc<dyn ExecutionStrategy>> {
vec![
Arc::new(ParallelExecutionStrategy),
Arc::new(SequentialExecutionStrategy),
Arc::new(AdaptiveExecutionStrategy),
]
}
pub async fn execute_plan(&self, plan: &ExecutionPlan) -> FusekiResult<QueryResults> {
let strategy = self.select_strategy(plan);
strategy.execute(plan, self).await
}
fn select_strategy(&self, plan: &ExecutionPlan) -> Arc<dyn ExecutionStrategy> {
for strategy in &self.strategies {
if strategy.applicable(plan) {
return strategy.clone();
}
}
self.strategies[1].clone()
}
pub async fn execute_fragment(
&self,
fragment: &QueryFragment,
endpoint_url: &str,
) -> FusekiResult<QueryResults> {
let _permit = self
.semaphore
.acquire()
.await
.map_err(|_| FusekiError::internal("Semaphore error"))?;
let client = self.client_pool.get_client(endpoint_url).await?;
let mut retries = 0;
loop {
match self
.send_query(&client, endpoint_url, &fragment.sparql)
.await
{
Ok(results) => return Ok(results),
Err(_e) if retries < self.retry_policy.max_retries => {
retries += 1;
let backoff = self.calculate_backoff(retries);
tokio::time::sleep(backoff).await;
}
Err(e) => return Err(e),
}
}
}
async fn send_query(
&self,
client: &Client,
endpoint_url: &str,
query: &str,
) -> FusekiResult<QueryResults> {
let response = client
.post(endpoint_url)
.header("Content-Type", "application/sparql-query")
.header("Accept", "application/sparql-results+json")
.body(query.to_string())
.send()
.await
.map_err(|e| FusekiError::internal(format!("Request failed: {e}")))?;
if !response.status().is_success() {
return Err(FusekiError::bad_request(format!(
"Endpoint returned status: {}",
response.status()
)));
}
let json: serde_json::Value = response
.json()
.await
.map_err(|e| FusekiError::internal(format!("JSON parse error: {e}")))?;
let bindings = self.parse_sparql_results(json)?;
Ok(QueryResults {
bindings: bindings.clone(),
metadata: ResultMetadata {
total_execution_time_ms: 0,
endpoint_times: HashMap::new(),
result_count: bindings.len(),
partial_results: false,
},
})
}
fn parse_sparql_results(
&self,
json: serde_json::Value,
) -> FusekiResult<Vec<HashMap<String, serde_json::Value>>> {
let results = json
.get("results")
.and_then(|r| r.get("bindings"))
.and_then(|b| b.as_array())
.ok_or_else(|| FusekiError::internal("Invalid SPARQL results format"))?;
let mut bindings = Vec::new();
for result in results {
if let Some(obj) = result.as_object() {
let mut binding = HashMap::new();
for (var, value) in obj {
binding.insert(var.clone(), value.clone());
}
bindings.push(binding);
}
}
Ok(bindings)
}
pub fn calculate_backoff(&self, attempt: u32) -> Duration {
let backoff_ms = (self.retry_policy.initial_backoff_ms as f64
* self.retry_policy.exponential_base.powi(attempt as i32))
as u64;
Duration::from_millis(backoff_ms.min(self.retry_policy.max_backoff_ms))
}
}
impl Default for ClientPool {
    fn default() -> Self {
        Self::new()
    }
}
impl ClientPool {
    /// Creates an empty pool allowing up to 10 idle connections per host.
    pub fn new() -> Self {
        Self {
            clients: DashMap::new(),
            max_connections_per_endpoint: 10,
        }
    }
    /// Returns the cached client for the endpoint, building one on first use.
    ///
    /// # Errors
    /// Fails when the HTTP client cannot be constructed.
    pub async fn get_client(&self, endpoint_url: &str) -> FusekiResult<Client> {
        if let Some(client) = self.clients.get(endpoint_url) {
            return Ok(client.clone());
        }
        let client = ClientBuilder::new()
            .pool_max_idle_per_host(self.max_connections_per_endpoint)
            .timeout(Duration::from_secs(30))
            .build()
            .map_err(|e| FusekiError::internal(format!("Client creation failed: {e}")))?;
        // entry() makes the insert atomic: if another task raced us between
        // the lookup and here, we keep its client rather than overwriting
        // it (the old check-then-insert could clobber a concurrent entry).
        Ok(self
            .clients
            .entry(endpoint_url.to_string())
            .or_insert(client)
            .clone())
    }
}
impl Default for RetryPolicy {
    /// Three retries with 100 ms initial backoff, doubling up to 5 s.
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_backoff_ms: 100,
            max_backoff_ms: 5000,
            exponential_base: 2.0,
        }
    }
}
struct ParallelExecutionStrategy;
#[async_trait]
impl ExecutionStrategy for ParallelExecutionStrategy {
fn name(&self) -> &str {
"ParallelExecution"
}
fn applicable(&self, plan: &ExecutionPlan) -> bool {
plan.fragments.len() > 1 && plan.fragments.iter().all(|f| f.dependencies.is_empty())
}
async fn execute(
&self,
plan: &ExecutionPlan,
executor: &FederatedExecutor,
) -> FusekiResult<QueryResults> {
let mut futures = FuturesUnordered::new();
for fragment in &plan.fragments {
for endpoint in &fragment.target_endpoints {
let fragment_clone = fragment.clone();
let endpoint_clone = endpoint.clone();
let executor_clone = executor;
futures.push(async move {
executor_clone
.execute_fragment(&fragment_clone, &endpoint_clone)
.await
});
}
}
let mut all_results = Vec::new();
while let Some(result) = futures.next().await {
all_results.push(result?);
}
ResultMerger::new().merge_results(all_results).await
}
}
/// Baseline strategy: executes fragment requests one after another.
struct SequentialExecutionStrategy;
#[async_trait]
impl ExecutionStrategy for SequentialExecutionStrategy {
    fn name(&self) -> &str {
        "SequentialExecution"
    }
    /// Always applicable; serves as the universal fallback.
    fn applicable(&self, _plan: &ExecutionPlan) -> bool {
        true
    }
    /// Awaits each (fragment, endpoint) request in plan order, then unions
    /// the partial results.
    async fn execute(
        &self,
        plan: &ExecutionPlan,
        executor: &FederatedExecutor,
    ) -> FusekiResult<QueryResults> {
        let mut collected = Vec::new();
        for fragment in &plan.fragments {
            for endpoint in &fragment.target_endpoints {
                collected.push(executor.execute_fragment(fragment, endpoint).await?);
            }
        }
        ResultMerger::new().merge_results(collected).await
    }
}
/// Hybrid strategy: independent fragments run concurrently, dependent
/// ones afterwards in sequence.
struct AdaptiveExecutionStrategy;
#[async_trait]
impl ExecutionStrategy for AdaptiveExecutionStrategy {
    fn name(&self) -> &str {
        "AdaptiveExecution"
    }
    /// Applicable for larger plans (more than two fragments).
    fn applicable(&self, plan: &ExecutionPlan) -> bool {
        plan.fragments.len() > 2
    }
    async fn execute(
        &self,
        plan: &ExecutionPlan,
        executor: &FederatedExecutor,
    ) -> FusekiResult<QueryResults> {
        // Split the plan by dependency: fragments without prerequisites
        // can be fanned out immediately.
        let (independent, dependent): (Vec<_>, Vec<_>) = plan
            .fragments
            .iter()
            .partition(|f| f.dependencies.is_empty());
        let mut collected = Vec::new();
        if !independent.is_empty() {
            let mut in_flight = FuturesUnordered::new();
            for fragment in independent {
                for endpoint in &fragment.target_endpoints {
                    let fragment = fragment.clone();
                    let endpoint = endpoint.clone();
                    in_flight
                        .push(async move { executor.execute_fragment(&fragment, &endpoint).await });
                }
            }
            while let Some(outcome) = in_flight.next().await {
                collected.push(outcome?);
            }
        }
        // Dependent fragments run after the independent wave, in order.
        for fragment in dependent {
            for endpoint in &fragment.target_endpoints {
                collected.push(executor.execute_fragment(fragment, endpoint).await?);
            }
        }
        ResultMerger::new().merge_results(collected).await
    }
}
impl Default for ResultMerger {
    fn default() -> Self {
        Self::new()
    }
}
impl ResultMerger {
    /// Creates a merger with the built-in union/join/distinct strategies.
    pub fn new() -> Self {
        Self {
            strategies: Self::create_merge_strategies(),
            dedup_cache: Arc::new(RwLock::new(HashSet::new())),
        }
    }
    /// Registry of named merge strategies.
    fn create_merge_strategies() -> HashMap<String, Arc<dyn MergeStrategy>> {
        let entries: [(&str, Arc<dyn MergeStrategy>); 3] = [
            ("union", Arc::new(UnionMergeStrategy)),
            ("join", Arc::new(JoinMergeStrategy)),
            ("distinct", Arc::new(DistinctMergeStrategy)),
        ];
        entries
            .into_iter()
            .map(|(name, strategy)| (name.to_string(), strategy))
            .collect()
    }
    /// Merges partial results: empty and single-result inputs
    /// short-circuit, otherwise the union strategy combines them.
    pub async fn merge_results(&self, mut results: Vec<QueryResults>) -> FusekiResult<QueryResults> {
        match results.len() {
            0 => Ok(QueryResults {
                bindings: Vec::new(),
                metadata: ResultMetadata {
                    total_execution_time_ms: 0,
                    endpoint_times: HashMap::new(),
                    result_count: 0,
                    partial_results: false,
                },
            }),
            1 => Ok(results.remove(0)),
            _ => {
                let union = self
                    .strategies
                    .get("union")
                    .expect("union strategy should be registered");
                union.merge(results).await
            }
        }
    }
}
/// Concatenates the bindings of every partial result (bag union).
struct UnionMergeStrategy;
#[async_trait]
impl MergeStrategy for UnionMergeStrategy {
    fn name(&self) -> &str {
        "UnionMerge"
    }
    /// Appends bindings in input order; execution times are summed and
    /// per-endpoint timings combined.
    async fn merge(&self, results: Vec<QueryResults>) -> FusekiResult<QueryResults> {
        let mut bindings = Vec::new();
        let mut elapsed_ms = 0;
        let mut per_endpoint = HashMap::new();
        for partial in results {
            elapsed_ms += partial.metadata.total_execution_time_ms;
            per_endpoint.extend(partial.metadata.endpoint_times);
            bindings.extend(partial.bindings);
        }
        let result_count = bindings.len();
        Ok(QueryResults {
            bindings,
            metadata: ResultMetadata {
                total_execution_time_ms: elapsed_ms,
                endpoint_times: per_endpoint,
                result_count,
                partial_results: false,
            },
        })
    }
}
/// Natural-joins exactly two result sets on their shared variables.
struct JoinMergeStrategy;
#[async_trait]
impl MergeStrategy for JoinMergeStrategy {
    fn name(&self) -> &str {
        "JoinMerge"
    }
    /// Nested-loop natural join: a pair of rows joins when every variable
    /// they share is bound to the same value.
    async fn merge(&self, results: Vec<QueryResults>) -> FusekiResult<QueryResults> {
        if results.len() != 2 {
            return Err(FusekiError::internal("Join requires exactly 2 result sets"));
        }
        let (left, right) = (&results[0], &results[1]);
        let mut joined = Vec::new();
        for lrow in &left.bindings {
            for rrow in &right.bindings {
                // Rows are compatible iff every shared variable agrees;
                // a variable absent from the right row imposes no constraint.
                let compatible = lrow
                    .iter()
                    .all(|(var, val)| rrow.get(var).map_or(true, |rv| rv == val));
                if compatible {
                    let mut merged = lrow.clone();
                    for (var, val) in rrow {
                        // Keep the left value for shared vars (equal anyway).
                        merged.entry(var.clone()).or_insert_with(|| val.clone());
                    }
                    joined.push(merged);
                }
            }
        }
        let result_count = joined.len();
        let mut endpoint_times = left.metadata.endpoint_times.clone();
        endpoint_times.extend(right.metadata.endpoint_times.clone());
        Ok(QueryResults {
            bindings: joined,
            metadata: ResultMetadata {
                total_execution_time_ms: left.metadata.total_execution_time_ms
                    + right.metadata.total_execution_time_ms,
                endpoint_times,
                result_count,
                partial_results: false,
            },
        })
    }
}
/// Union that drops duplicate rows, identified by a hash of each binding.
struct DistinctMergeStrategy;
#[async_trait]
impl MergeStrategy for DistinctMergeStrategy {
    fn name(&self) -> &str {
        "DistinctMerge"
    }
    /// Keeps the first occurrence of each distinct binding (by 64-bit hash).
    async fn merge(&self, results: Vec<QueryResults>) -> FusekiResult<QueryResults> {
        let mut seen_hashes = HashSet::new();
        let mut unique = Vec::new();
        let mut elapsed_ms = 0;
        let mut per_endpoint = HashMap::new();
        for partial in results {
            elapsed_ms += partial.metadata.total_execution_time_ms;
            per_endpoint.extend(partial.metadata.endpoint_times);
            for row in partial.bindings {
                // insert() returns false for repeats, filtering duplicates.
                if seen_hashes.insert(Self::hash_binding(&row)) {
                    unique.push(row);
                }
            }
        }
        let result_count = unique.len();
        Ok(QueryResults {
            bindings: unique,
            metadata: ResultMetadata {
                total_execution_time_ms: elapsed_ms,
                endpoint_times: per_endpoint,
                result_count,
                partial_results: false,
            },
        })
    }
}
impl DistinctMergeStrategy {
    /// Order-independent hash of a binding: entries are sorted by variable
    /// name and values hashed via their JSON text. NOTE: a 64-bit hash can
    /// collide in principle, which would drop a distinct row.
    fn hash_binding(binding: &HashMap<String, serde_json::Value>) -> u64 {
        use std::hash::{Hash, Hasher};
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        let mut entries: Vec<_> = binding.iter().collect();
        entries.sort_by_key(|(name, _)| name.as_str());
        for (name, value) in entries {
            name.hash(&mut hasher);
            value.to_string().hash(&mut hasher);
        }
        hasher.finish()
    }
}
/// Minimal stand-in for the `uuid` crate: generates unique-enough ids for
/// query tracking without an external dependency.
mod uuid {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Process-wide sequence number so ids differ even within one clock tick.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    pub struct Uuid;
    impl Uuid {
        pub fn new_v4() -> Self {
            Uuid
        }
    }
    impl std::fmt::Display for Uuid {
        /// Formats a fresh id from wall-clock nanoseconds plus an atomic
        /// counter. The previous implementation seeded an RNG with the
        /// constant 42 on every call, so every "unique" id it produced was
        /// byte-for-byte identical.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_nanos() as u64)
                .unwrap_or(0);
            let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
            write!(
                f,
                "{:08x}-{:04x}-{:04x}-{:016x}",
                (nanos >> 32) as u32,
                (nanos >> 16) as u16,
                nanos as u16,
                seq
            )
        }
    }
}
/// Minimal stand-in for the `rand` crate.
mod rand {
    /// Returns a pseudo-random value derived from the current wall-clock
    /// time.
    ///
    /// NOTE: not cryptographically secure and only as granular as the
    /// system clock; the nanosecond count is truncated to 32 bits before
    /// conversion.
    pub fn random<T>() -> T
    where
        T: From<u32>,
    {
        T::from(
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("SystemTime should be after UNIX_EPOCH")
                .as_nanos() as u32,
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an optimizer over a fully disabled monitoring config.
    /// Shared by the extraction tests, which previously duplicated this
    /// ~35-line config literal verbatim.
    fn test_optimizer() -> FederatedQueryOptimizer {
        let config = crate::config::MonitoringConfig {
            metrics: crate::config::MetricsConfig {
                enabled: false,
                endpoint: "/metrics".to_string(),
                port: Some(9000),
                namespace: "oxirs_fuseki".to_string(),
                collect_system_metrics: true,
                histogram_buckets: vec![
                    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
                ],
            },
            health_checks: crate::config::HealthCheckConfig {
                enabled: false,
                interval_secs: 30,
                timeout_secs: 5,
                checks: vec!["store".to_string(), "memory".to_string()],
            },
            tracing: crate::config::TracingConfig {
                enabled: false,
                endpoint: None,
                service_name: "oxirs-fuseki".to_string(),
                sample_rate: 0.1,
                output: crate::config::TracingOutput::Stdout,
            },
            prometheus: Some(crate::config::PrometheusConfig {
                enabled: false,
                endpoint: "/metrics".to_string(),
                port: Some(9090),
                namespace: "oxirs_fuseki".to_string(),
                job_name: "oxirs-fuseki".to_string(),
                instance: "localhost:3030".to_string(),
                scrape_interval_secs: 15,
                timeout_secs: 10,
            }),
        };
        FederatedQueryOptimizer::new(Arc::new(MetricsService::new(config).unwrap()))
    }

    #[tokio::test]
    async fn test_extract_service_patterns() {
        let optimizer = test_optimizer();
        let query = r#"
            PREFIX foaf: <http://xmlns.com/foaf/0.1/>
            SELECT ?person ?name ?friend
            WHERE {
                ?person foaf:name ?name .
                SERVICE <http://example.org/sparql> {
                    ?person foaf:knows ?friend .
                }
            }
        "#;
        let patterns = optimizer.extract_service_patterns(query).unwrap();
        assert_eq!(patterns.len(), 1);
        assert_eq!(patterns[0].service_url, "http://example.org/sparql");
        assert!(!patterns[0].is_silent);
    }

    #[tokio::test]
    async fn test_multiple_service_patterns() {
        let optimizer = test_optimizer();
        let query = r#"
            SELECT ?s ?p ?o
            WHERE {
                SERVICE <http://endpoint1.org/sparql> {
                    ?s ?p ?o
                }
                SERVICE SILENT <http://endpoint2.org/sparql> {
                    ?s ?p2 ?o2
                }
            }
        "#;
        let patterns = optimizer.extract_service_patterns(query).unwrap();
        assert_eq!(patterns.len(), 2);
        assert!(!patterns[0].is_silent);
        assert!(patterns[1].is_silent);
    }

    #[tokio::test]
    async fn test_query_planner() {
        let planner = QueryPlanner::new();
        let service_patterns = vec![ServicePattern {
            service_url: "http://test.org/sparql".to_string(),
            pattern: "?s ?p ?o".to_string(),
            is_silent: false,
            is_optional: false,
        }];
        let plan = planner
            .create_execution_plan("SELECT * WHERE { ?s ?p ?o }", &service_patterns)
            .await
            .unwrap();
        assert!(!plan.fragments.is_empty());
    }

    #[tokio::test]
    async fn test_result_merger_union() {
        let merger = ResultMerger::new();
        let results = vec![
            QueryResults {
                bindings: vec![HashMap::from([(
                    "x".to_string(),
                    serde_json::json!("value1"),
                )])],
                metadata: ResultMetadata {
                    total_execution_time_ms: 100,
                    endpoint_times: HashMap::new(),
                    result_count: 1,
                    partial_results: false,
                },
            },
            QueryResults {
                bindings: vec![HashMap::from([(
                    "x".to_string(),
                    serde_json::json!("value2"),
                )])],
                metadata: ResultMetadata {
                    total_execution_time_ms: 150,
                    endpoint_times: HashMap::new(),
                    result_count: 1,
                    partial_results: false,
                },
            },
        ];
        let merged = merger.merge_results(results).await.unwrap();
        assert_eq!(merged.bindings.len(), 2);
        assert_eq!(merged.metadata.total_execution_time_ms, 250);
    }
}