use async_trait::async_trait;
use futures;
use regex;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use tokio::sync::RwLock;
use url::Url;
use oxirs_arq::query::{Query, QueryType};
use crate::{
error::{FusekiError as Error, FusekiResult as Result},
federation::{
FederationConfig, ServiceCapabilities as EndpointCapabilities, ServiceEndpoint,
ServiceHealth, ServiceMetadata,
},
};
/// Complete plan for executing one federated query: the original query, the
/// ordered execution steps, an aggregate cost estimate, and the strategy used
/// to schedule the steps.
#[derive(Debug, Clone)]
pub struct FederatedQueryPlan {
    /// The original parsed query being federated.
    pub query: Query,
    /// Steps to execute; ordering constraints live in each step's `dependencies`.
    pub steps: Vec<ExecutionStep>,
    /// Sum of the per-step cost estimates (see `QueryPlanner::calculate_total_cost`).
    pub estimated_cost: QueryCost,
    /// How the steps should be scheduled (sequential, parallel, or adaptive).
    pub strategy: ExecutionStrategy,
}
/// One unit of work in a federated plan: a sub-query targeted at one or more
/// candidate services, plus the step ids it depends on.
#[derive(Debug, Clone)]
pub struct ExecutionStep {
    /// Unique id within the plan (e.g. "service_step_0", "merge_step").
    pub id: String,
    /// Candidate services for this step; the primary one is tried first.
    pub services: Vec<ServiceSelection>,
    /// The query fragment to run for this step.
    pub sub_query: Query,
    /// Ids of steps that must complete before this one may run.
    pub dependencies: Vec<String>,
    /// Estimated cost of executing this step.
    pub cost: QueryCost,
}
/// A chosen service for a step, with a relative preference score.
#[derive(Debug, Clone)]
pub struct ServiceSelection {
    /// Human-readable service identifier (taken from service metadata).
    pub service_id: String,
    /// Endpoint URL the step's query is sent to.
    pub service_url: Url,
    /// Preference score; higher is better (planner uses 1 / (est. exec ms + 1)).
    pub score: f64,
    /// Whether this is the preferred service (fallbacks are tried by score).
    pub is_primary: bool,
}
/// Cost estimate for a query or step. All components are optional; `None`
/// means "unknown", and consumers substitute defaults when aggregating.
#[derive(Debug, Clone, Default)]
pub struct QueryCost {
    /// Estimated number of result rows.
    pub result_size: Option<usize>,
    /// Estimated wall-clock execution time.
    pub execution_time: Option<Duration>,
    /// Estimated network overhead (unitless weight, derived from latency).
    pub network_cost: Option<f64>,
    /// Structural complexity weight of the query (see `calculate_complexity`).
    pub complexity: Option<f64>,
}
/// How a plan's steps are scheduled.
#[derive(Debug, Clone, PartialEq)]
pub enum ExecutionStrategy {
    /// Run steps one after another, in plan order.
    Sequential,
    /// Run all steps concurrently (chosen when no step has dependencies).
    Parallel,
    /// Placeholder for runtime-adaptive scheduling; currently executes in parallel.
    Adaptive,
}
/// Plans and executes federated SPARQL queries across multiple endpoints.
///
/// Cloning is cheap: all mutable state is behind `Arc`s, so clones share the
/// same endpoint cache and statistics.
#[derive(Debug, Clone)]
pub struct QueryPlanner {
    /// Federation-wide configuration.
    config: FederationConfig,
    /// Cache of known endpoints, keyed by endpoint URL string.
    endpoints: Arc<RwLock<HashMap<String, ServiceEndpoint>>>,
    /// Per-pattern and per-service execution statistics.
    statistics: Arc<RwLock<QueryStatistics>>,
    /// Pluggable endpoint discovery backend.
    discovery_service: Arc<dyn ServiceDiscovery>,
    /// Pluggable cost estimation backend.
    cost_estimator: Arc<dyn CostEstimator>,
    /// Bounded-concurrency executor with timeout and retry support.
    parallel_executor: Arc<ParallelServiceExecutor>,
}
/// Abstraction over how federated endpoints are discovered and health-checked.
#[async_trait::async_trait]
pub trait ServiceDiscovery: Send + Sync + std::fmt::Debug {
    /// Returns all currently reachable endpoints.
    async fn discover_endpoints(&self) -> Result<Vec<ServiceEndpoint>>;
    /// Returns the capabilities advertised by the endpoint at `endpoint_url`.
    async fn get_capabilities(&self, endpoint_url: &str) -> Result<EndpointCapabilities>;
    /// Probes the endpoint at `endpoint_url` and reports its health.
    async fn check_health(&self, endpoint_url: &str) -> Result<ServiceHealth>;
}
/// Abstraction over cost estimation for queries against remote endpoints.
#[async_trait::async_trait]
pub trait CostEstimator: Send + Sync + std::fmt::Debug {
    /// Estimates the cost of running `query` against `endpoint`.
    async fn estimate_cost(&self, query: &Query, endpoint: &ServiceEndpoint) -> Result<QueryCost>;
    /// Estimates the result cardinality of a graph `pattern` on `endpoint`.
    async fn estimate_result_size(
        &self,
        pattern: &str,
        endpoint: &ServiceEndpoint,
    ) -> Result<usize>;
    /// Returns accumulated performance statistics for `endpoint`.
    async fn get_performance_stats(&self, endpoint: &ServiceEndpoint) -> Result<ServiceStatistics>;
}
/// Executes batches of service requests with bounded concurrency, a per-request
/// timeout, and an exponential-backoff retry policy.
#[derive(Debug)]
pub struct ParallelServiceExecutor {
    /// Maximum number of requests in flight at once.
    max_concurrent: usize,
    /// Per-request timeout applied in `execute_parallel`.
    timeout: Duration,
    /// Backoff parameters used by `execute_with_retry`.
    retry_policy: RetryPolicy,
}
/// Exponential-backoff retry parameters.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Number of retries after the first attempt (total attempts = retries + 1).
    pub max_retries: usize,
    /// Delay before the first retry.
    pub initial_delay: Duration,
    /// Upper bound on the delay between retries.
    pub max_delay: Duration,
    /// Factor applied to the delay after each failed attempt.
    pub backoff_multiplier: f64,
}
/// Accumulated execution statistics, keyed by query pattern and by service URL.
#[derive(Debug, Default)]
struct QueryStatistics {
    /// Per-pattern stats, keyed by a pattern string (e.g. the query type's Debug form).
    pattern_stats: HashMap<String, PatternStatistics>,
    /// Per-service stats, keyed by service id / endpoint URL string.
    service_stats: HashMap<String, ServiceStatistics>,
}
/// A `SERVICE <url> { ... }` clause extracted from a query.
#[derive(Debug, Clone)]
struct ServicePattern {
    /// Target endpoint URL named in the SERVICE clause.
    service_url: String,
    /// The graph pattern inside the SERVICE braces (trimmed).
    pattern: String,
    /// Variable names (`?x` → "x") appearing in the pattern, deduplicated.
    variables: Vec<String>,
}
/// Running averages for one query pattern.
#[derive(Debug, Clone, Default)]
pub struct PatternStatistics {
    /// Total number of recorded executions (successful or not).
    pub execution_count: usize,
    /// Running mean execution time over successful executions.
    pub average_execution_time: Duration,
    /// Running mean result size over successful executions.
    pub average_result_size: usize,
    /// Fraction of executions that succeeded, in [0, 1].
    pub success_rate: f64,
}
/// Running statistics for one remote service.
#[derive(Debug, Clone, Default)]
pub struct ServiceStatistics {
    /// Total queries issued to this service.
    pub total_queries: usize,
    /// Queries that completed successfully.
    pub successful_queries: usize,
    /// Running mean response time.
    pub average_response_time: Duration,
    /// Running mean result size.
    pub average_result_size: usize,
    /// successful_queries / total_queries, in [0, 1].
    pub availability: f64,
    /// UTC timestamp of the most recent update.
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
/// Default discovery backend: probes a static set of registered endpoints
/// over HTTP.
#[derive(Debug)]
pub struct DefaultServiceDiscovery {
    /// Shared HTTP client used for all probes.
    http_client: reqwest::Client,
    /// Endpoint URLs registered via `add_known_endpoint`.
    known_endpoints: HashSet<String>,
    /// Timeout applied to each health-check request.
    discovery_timeout: Duration,
}
/// Default cost estimator: uses recorded pattern statistics when available,
/// otherwise falls back to fixed default estimates.
#[derive(Debug)]
pub struct DefaultCostEstimator {
    /// Shared statistics store consulted for estimates.
    statistics: Arc<RwLock<QueryStatistics>>,
    /// Fallback values used when no statistics exist for a pattern.
    default_estimates: DefaultEstimates,
}
/// Fallback cost values used when no statistics are available.
#[derive(Debug, Clone)]
pub struct DefaultEstimates {
    /// Assumed execution time for an unknown pattern.
    pub default_execution_time: Duration,
    /// Assumed result cardinality for an unknown pattern.
    pub default_result_size: usize,
    /// Assumed network cost weight for an unknown pattern.
    pub default_network_cost: f64,
}
impl Default for DefaultServiceDiscovery {
    /// Delegates to [`DefaultServiceDiscovery::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl DefaultServiceDiscovery {
pub fn new() -> Self {
Self {
http_client: reqwest::Client::new(),
known_endpoints: HashSet::new(),
discovery_timeout: Duration::from_secs(30),
}
}
pub fn add_known_endpoint(&mut self, endpoint: String) {
self.known_endpoints.insert(endpoint);
}
}
#[async_trait]
impl ServiceDiscovery for DefaultServiceDiscovery {
    /// Probes every registered endpoint for a service description. Endpoints
    /// that fail to respond are logged at WARN and skipped, rather than
    /// failing the whole discovery pass.
    async fn discover_endpoints(&self) -> Result<Vec<ServiceEndpoint>> {
        let mut discovered = Vec::new();
        for endpoint_url in &self.known_endpoints {
            match self.get_service_description(endpoint_url).await {
                Ok(endpoint) => discovered.push(endpoint),
                Err(e) => {
                    // Best-effort discovery: log and continue with the rest.
                    tracing::warn!("Failed to discover endpoint {}: {}", endpoint_url, e);
                }
            }
        }
        Ok(discovered)
    }
    /// Returns the endpoint's capabilities. Currently a stub: the SPARQL
    /// service-description query below is never sent, and default
    /// capabilities are returned unconditionally.
    async fn get_capabilities(&self, _endpoint_url: &str) -> Result<EndpointCapabilities> {
        // Kept for a future implementation that will actually query the
        // endpoint's service description document.
        let _query = r#"
SELECT ?feature ?function WHERE {
?service a <http://www.w3.org/ns/sparql-service-description#Service> .
OPTIONAL { ?service <http://www.w3.org/ns/sparql-service-description#feature> ?feature }
OPTIONAL { ?service <http://www.w3.org/ns/sparql-service-description#extensionFunction> ?function }
}
"#;
        Ok(EndpointCapabilities::default())
    }
    /// Health-checks an endpoint by issuing a trivial `ASK` query over GET:
    /// 2xx -> Healthy, any other HTTP status -> Degraded, transport error or
    /// timeout -> Unhealthy. Never returns `Err`.
    async fn check_health(&self, endpoint_url: &str) -> Result<ServiceHealth> {
        // Timing is not currently recorded; kept for future latency tracking.
        let _start = std::time::Instant::now();
        let health_query = "ASK { ?s ?p ?o }";
        match self
            .http_client
            .get(endpoint_url)
            .query(&[("query", health_query)])
            .timeout(self.discovery_timeout)
            .send()
            .await
        {
            Ok(response) => {
                if response.status().is_success() {
                    Ok(ServiceHealth::Healthy)
                } else {
                    Ok(ServiceHealth::Degraded)
                }
            }
            Err(_e) => Ok(ServiceHealth::Unhealthy),
        }
    }
}
impl DefaultServiceDiscovery {
async fn get_service_description(&self, endpoint_url: &str) -> Result<ServiceEndpoint> {
let url = Url::parse(endpoint_url).map_err(|e| Error::InvalidUrl(e.to_string()))?;
Ok(ServiceEndpoint {
url,
metadata: ServiceMetadata {
name: format!("Service at {endpoint_url}"),
description: None,
tags: vec![],
location: None,
version: None,
contact: None,
},
health: self.check_health(endpoint_url).await.unwrap_or_default(),
capabilities: EndpointCapabilities::default(),
})
}
}
impl Default for DefaultCostEstimator {
    /// Delegates to [`DefaultCostEstimator::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl DefaultCostEstimator {
pub fn new() -> Self {
Self {
statistics: Arc::new(RwLock::new(QueryStatistics::default())),
default_estimates: DefaultEstimates {
default_execution_time: Duration::from_millis(1000),
default_result_size: 100,
default_network_cost: 1.0,
},
}
}
}
#[async_trait]
impl CostEstimator for DefaultCostEstimator {
async fn estimate_cost(&self, query: &Query, endpoint: &ServiceEndpoint) -> Result<QueryCost> {
let stats = self.statistics.read().await;
let pattern_key = format!("{:?}", query.query_type);
let cost = if let Some(pattern_stats) = stats.pattern_stats.get(&pattern_key) {
QueryCost {
result_size: Some(pattern_stats.average_result_size),
execution_time: Some(pattern_stats.average_execution_time),
network_cost: Some(
endpoint
.capabilities
.avg_response_time
.map(|d| d.as_millis() as f64)
.unwrap_or(1.0),
),
complexity: Some(self.calculate_complexity(query)),
}
} else {
QueryCost {
result_size: Some(self.default_estimates.default_result_size),
execution_time: Some(self.default_estimates.default_execution_time),
network_cost: Some(self.default_estimates.default_network_cost),
complexity: Some(self.calculate_complexity(query)),
}
};
Ok(cost)
}
async fn estimate_result_size(
&self,
pattern: &str,
_endpoint: &ServiceEndpoint,
) -> Result<usize> {
let stats = self.statistics.read().await;
Ok(stats
.pattern_stats
.get(pattern)
.map(|s| s.average_result_size)
.unwrap_or(self.default_estimates.default_result_size))
}
async fn get_performance_stats(&self, endpoint: &ServiceEndpoint) -> Result<ServiceStatistics> {
let stats = self.statistics.read().await;
Ok(stats
.service_stats
.get(&endpoint.url.to_string())
.cloned()
.unwrap_or_default())
}
}
impl DefaultCostEstimator {
    /// Maps a query form to a relative complexity weight: ASK is cheapest,
    /// CONSTRUCT most expensive.
    fn calculate_complexity(&self, query: &Query) -> f64 {
        let weight = match query.query_type {
            QueryType::Ask => 0.5,
            QueryType::Select => 1.0,
            QueryType::Describe => 1.5,
            QueryType::Construct => 2.0,
        };
        weight
    }
}
impl ParallelServiceExecutor {
    /// Creates an executor that runs at most `max_concurrent` requests at a
    /// time, with a 30 s per-request timeout and an exponential-backoff retry
    /// policy (3 retries, 100 ms initial delay, x2 per attempt, 10 s cap).
    pub fn new(max_concurrent: usize) -> Self {
        Self {
            max_concurrent,
            timeout: Duration::from_secs(30),
            retry_policy: RetryPolicy {
                max_retries: 3,
                initial_delay: Duration::from_millis(100),
                max_delay: Duration::from_secs(10),
                backoff_multiplier: 2.0,
            },
        }
    }
    /// Drives `requests` in batches of at most `max_concurrent`, wrapping each
    /// request in the configured timeout (a timed-out request yields
    /// `Error::TimeoutWithMessage`).
    ///
    /// NOTE: within each batch, results are collected as futures complete
    /// (`FuturesUnordered` yields in completion order), so the output order is
    /// NOT guaranteed to match the order of `requests`.
    pub async fn execute_parallel<T, F>(&self, mut requests: Vec<F>) -> Vec<Result<T>>
    where
        F: std::future::Future<Output = Result<T>> + Send,
        T: Send,
    {
        use futures::stream::{FuturesUnordered, StreamExt};
        let mut results = Vec::new();
        while !requests.is_empty() {
            let batch_size = self.max_concurrent.min(requests.len());
            let batch: Vec<_> = requests.drain(..batch_size).collect();
            let mut futures = FuturesUnordered::new();
            for request in batch {
                // Futures are lazy: timeout() only arms once the future is polled.
                let timeout_future = tokio::time::timeout(self.timeout, request);
                futures.push(async move {
                    match timeout_future.await {
                        Ok(result) => result,
                        Err(_) => Err(Error::TimeoutWithMessage(
                            "Service request timed out".to_string(),
                        )),
                    }
                });
            }
            while let Some(result) = futures.next().await {
                results.push(result);
            }
        }
        results
    }
    /// Invokes `request_fn` until it succeeds or the retry budget is
    /// exhausted, sleeping between attempts with exponential backoff capped
    /// at `max_delay`. The error from the final attempt is returned.
    pub async fn execute_with_retry<T, F>(&self, mut request_fn: F) -> Result<T>
    where
        F: FnMut() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>>,
        T: Send,
    {
        let mut delay = self.retry_policy.initial_delay;
        for attempt in 0..=self.retry_policy.max_retries {
            match request_fn().await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    if attempt == self.retry_policy.max_retries {
                        return Err(e);
                    }
                    tokio::time::sleep(delay).await;
                    delay = std::cmp::min(
                        Duration::from_millis(
                            (delay.as_millis() as f64 * self.retry_policy.backoff_multiplier)
                                as u64,
                        ),
                        self.retry_policy.max_delay,
                    );
                }
            }
        }
        // Every loop iteration either returns Ok or, on the last attempt,
        // returns Err — so this point is never reached.
        unreachable!()
    }
}
impl QueryPlanner {
pub fn new(
config: FederationConfig,
discovery_service: Arc<dyn ServiceDiscovery>,
cost_estimator: Arc<dyn CostEstimator>,
) -> Self {
Self {
config,
endpoints: Arc::new(RwLock::new(HashMap::new())),
statistics: Arc::new(RwLock::new(QueryStatistics::default())),
discovery_service,
cost_estimator,
parallel_executor: Arc::new(ParallelServiceExecutor::new(4)), }
}
pub async fn plan_federated_query(&self, query: &Query) -> Result<FederatedQueryPlan> {
let endpoints = self.discovery_service.discover_endpoints().await?;
{
let mut endpoint_map = self.endpoints.write().await;
for endpoint in &endpoints {
endpoint_map.insert(endpoint.url.to_string(), endpoint.clone());
}
}
let mut endpoint_costs = Vec::new();
for endpoint in &endpoints {
let cost = self.cost_estimator.estimate_cost(query, endpoint).await?;
endpoint_costs.push((endpoint.clone(), cost));
}
endpoint_costs.sort_by(|a, b| {
let cost_a = a.1.execution_time.unwrap_or_default().as_millis() as f64
+ a.1.network_cost.unwrap_or(0.0);
let cost_b = b.1.execution_time.unwrap_or_default().as_millis() as f64
+ b.1.network_cost.unwrap_or(0.0);
cost_a
.partial_cmp(&cost_b)
.unwrap_or(std::cmp::Ordering::Equal)
});
let steps = self.create_execution_steps(query, &endpoint_costs).await?;
let total_cost = self.calculate_total_cost(&steps);
let strategy = self.determine_execution_strategy(&steps);
Ok(FederatedQueryPlan {
query: query.clone(),
steps,
estimated_cost: total_cost,
strategy,
})
}
pub async fn execute_plan(&self, plan: &FederatedQueryPlan) -> Result<Vec<QueryResult>> {
match plan.strategy {
ExecutionStrategy::Parallel => self.execute_parallel_plan(plan).await,
ExecutionStrategy::Sequential => self.execute_sequential_plan(plan).await,
ExecutionStrategy::Adaptive => self.execute_adaptive_plan(plan).await,
}
}
pub async fn refresh_endpoints(&self) -> Result<()> {
let discovered = self.discovery_service.discover_endpoints().await?;
let mut endpoint_map = self.endpoints.write().await;
let health_futures: Vec<_> = discovered
.iter()
.map(|endpoint| async {
let health = self
.discovery_service
.check_health(endpoint.url.as_str())
.await
.unwrap_or_default();
Ok((endpoint.url.to_string(), health))
})
.collect();
let health_results = self
.parallel_executor
.execute_parallel(health_futures)
.await;
for (mut endpoint, health_result) in discovered.into_iter().zip(health_results) {
if let Ok((_, health)) = health_result {
endpoint.health = health;
endpoint_map.insert(endpoint.url.to_string(), endpoint);
}
}
Ok(())
}
async fn create_execution_steps(
&self,
query: &Query,
endpoint_costs: &[(ServiceEndpoint, QueryCost)],
) -> Result<Vec<ExecutionStep>> {
if endpoint_costs.is_empty() {
return Err(Error::ServiceUnavailable {
message: "No available endpoints for query execution".to_string(),
});
}
let service_patterns = self.extract_service_patterns(query)?;
if !service_patterns.is_empty() {
self.create_service_delegation_steps(query, &service_patterns, endpoint_costs)
.await
} else {
self.create_partitioned_execution_steps(query, endpoint_costs)
.await
}
}
fn extract_service_patterns(&self, query: &Query) -> Result<Vec<ServicePattern>> {
let query_string = format!("{query:?}");
let mut patterns = Vec::new();
let service_regex =
regex::Regex::new(r"SERVICE\s*<([^>]+)>\s*\{([^}]+)\}").map_err(|e| Error::Parse {
message: format!("Regex error: {e}"),
})?;
for cap in service_regex.captures_iter(&query_string) {
if let (Some(service_url), Some(pattern)) = (cap.get(1), cap.get(2)) {
patterns.push(ServicePattern {
service_url: service_url.as_str().to_string(),
pattern: pattern.as_str().trim().to_string(),
variables: self.extract_variables_from_pattern(pattern.as_str()),
});
}
}
Ok(patterns)
}
fn extract_variables_from_pattern(&self, pattern: &str) -> Vec<String> {
let var_regex = regex::Regex::new(r"\?(\w+)").expect("regex pattern should be valid");
var_regex
.captures_iter(pattern)
.filter_map(|cap| cap.get(1).map(|m| m.as_str().to_string()))
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect()
}
async fn create_service_delegation_steps(
&self,
query: &Query,
service_patterns: &[ServicePattern],
endpoint_costs: &[(ServiceEndpoint, QueryCost)],
) -> Result<Vec<ExecutionStep>> {
let mut steps = Vec::new();
for (idx, pattern) in service_patterns.iter().enumerate() {
let matching_endpoint = endpoint_costs
.iter()
.find(|(endpoint, _)| endpoint.url.as_str() == pattern.service_url)
.or_else(|| endpoint_costs.first()) .ok_or_else(|| Error::ServiceUnavailable {
message: format!("No endpoint available for SERVICE {}", pattern.service_url),
})?;
let sub_query = self.create_sub_query(&pattern.pattern)?;
let service_selection = ServiceSelection {
service_id: matching_endpoint.0.metadata.name.clone(),
service_url: Url::parse(&pattern.service_url).map_err(|e| Error::Parse {
message: format!("Invalid service URL: {e}"),
})?,
score: 1.0
/ (matching_endpoint
.1
.execution_time
.unwrap_or_default()
.as_millis() as f64
+ 1.0),
is_primary: true,
};
steps.push(ExecutionStep {
id: format!("service_step_{idx}"),
services: vec![service_selection],
sub_query,
dependencies: if idx > 0 {
vec![format!("service_step_{}", idx - 1)]
} else {
vec![]
},
cost: matching_endpoint.1.clone(),
});
}
if steps.len() > 1 {
let (best_endpoint, cost) = endpoint_costs
.first()
.expect("collection validated to be non-empty");
let final_step = ExecutionStep {
id: "final_combination".to_string(),
services: vec![ServiceSelection {
service_id: best_endpoint.metadata.name.clone(),
service_url: best_endpoint.url.clone(),
score: 1.0,
is_primary: true,
}],
sub_query: query.clone(),
dependencies: steps.iter().map(|s| s.id.clone()).collect(),
cost: cost.clone(),
};
steps.push(final_step);
}
Ok(steps)
}
async fn create_partitioned_execution_steps(
&self,
query: &Query,
endpoint_costs: &[(ServiceEndpoint, QueryCost)],
) -> Result<Vec<ExecutionStep>> {
if self.is_complex_query(query) && endpoint_costs.len() > 1 {
self.create_parallel_decomposition_steps(query, endpoint_costs)
.await
} else {
self.create_single_step(query, endpoint_costs).await
}
}
fn is_complex_query(&self, query: &Query) -> bool {
let query_string = format!("{query:?}");
query_string.matches("WHERE").count() > 1
|| query_string.contains("UNION")
|| query_string.contains("OPTIONAL")
|| query_string.contains("GROUP BY")
|| query_string.contains("ORDER BY")
|| query_string.matches("FILTER").count() > 1
}
async fn create_parallel_decomposition_steps(
&self,
query: &Query,
endpoint_costs: &[(ServiceEndpoint, QueryCost)],
) -> Result<Vec<ExecutionStep>> {
let mut steps = Vec::new();
let query_string = format!("{query:?}");
let graph_patterns = self.identify_graph_patterns(&query_string)?;
if graph_patterns.len() > 1 {
for (idx, pattern) in graph_patterns.iter().enumerate() {
if let Some((endpoint, cost)) = endpoint_costs.get(idx % endpoint_costs.len()) {
let sub_query = self.create_sub_query(pattern)?;
let service_selection = ServiceSelection {
service_id: endpoint.metadata.name.clone(),
service_url: endpoint.url.clone(),
score: 1.0
/ (cost.execution_time.unwrap_or_default().as_millis() as f64 + 1.0),
is_primary: true,
};
steps.push(ExecutionStep {
id: format!("parallel_step_{idx}"),
services: vec![service_selection],
sub_query,
dependencies: vec![], cost: cost.clone(),
});
}
}
if steps.len() > 1 {
let (best_endpoint, cost) = endpoint_costs
.first()
.expect("collection validated to be non-empty");
steps.push(ExecutionStep {
id: "merge_step".to_string(),
services: vec![ServiceSelection {
service_id: best_endpoint.metadata.name.clone(),
service_url: best_endpoint.url.clone(),
score: 1.0,
is_primary: true,
}],
sub_query: query.clone(),
dependencies: steps.iter().map(|s| s.id.clone()).collect(),
cost: cost.clone(),
});
}
} else {
steps = self.create_single_step(query, endpoint_costs).await?;
}
Ok(steps)
}
fn identify_graph_patterns(&self, query_string: &str) -> Result<Vec<String>> {
let mut patterns = Vec::new();
let lines: Vec<&str> = query_string.lines().collect();
let mut current_pattern = String::new();
let mut in_where = false;
for line in lines {
let trimmed = line.trim();
if trimmed.contains("WHERE") {
in_where = true;
continue;
}
if in_where {
if trimmed.starts_with('}') {
if !current_pattern.trim().is_empty() {
patterns.push(current_pattern.trim().to_string());
current_pattern.clear();
}
break;
} else if trimmed.contains('?') && trimmed.ends_with('.') {
current_pattern.push_str(trimmed);
current_pattern.push('\n');
if trimmed.matches('?').count() >= 2 {
patterns.push(current_pattern.trim().to_string());
current_pattern.clear();
}
}
}
}
if !current_pattern.trim().is_empty() {
patterns.push(current_pattern.trim().to_string());
}
if patterns.is_empty() {
patterns.push("?s ?p ?o".to_string()); }
Ok(patterns)
}
async fn create_single_step(
&self,
query: &Query,
endpoint_costs: &[(ServiceEndpoint, QueryCost)],
) -> Result<Vec<ExecutionStep>> {
let (best_endpoint, cost) =
endpoint_costs
.first()
.ok_or_else(|| Error::ServiceUnavailable {
message: "No endpoints available".to_string(),
})?;
let service_selection = ServiceSelection {
service_id: best_endpoint.metadata.name.clone(),
service_url: best_endpoint.url.clone(),
score: 1.0 / (cost.execution_time.unwrap_or_default().as_millis() as f64 + 1.0),
is_primary: true,
};
Ok(vec![ExecutionStep {
id: "single_step".to_string(),
services: vec![service_selection],
sub_query: query.clone(),
dependencies: vec![],
cost: cost.clone(),
}])
}
fn create_sub_query(&self, pattern: &str) -> Result<Query> {
let _sub_query_string = format!("SELECT * WHERE {{ {pattern} }}");
use oxirs_arq::query::QueryType;
use oxirs_arq::Algebra;
Ok(Query {
query_type: QueryType::Select,
select_variables: vec![],
where_clause: Algebra::Zero,
order_by: vec![],
group_by: vec![],
having: None,
limit: None,
offset: None,
distinct: false,
reduced: false,
construct_template: vec![],
prefixes: std::collections::HashMap::new(),
base_iri: None,
dataset: oxirs_arq::query::DatasetClause::default(),
})
}
fn calculate_total_cost(&self, steps: &[ExecutionStep]) -> QueryCost {
let mut total_cost = QueryCost::default();
for step in steps {
if let Some(exec_time) = step.cost.execution_time {
let current = total_cost.execution_time.unwrap_or_default();
total_cost.execution_time = Some(current + exec_time);
}
if let Some(result_size) = step.cost.result_size {
let current = total_cost.result_size.unwrap_or(0);
total_cost.result_size = Some(current + result_size);
}
if let Some(network_cost) = step.cost.network_cost {
let current = total_cost.network_cost.unwrap_or(0.0);
total_cost.network_cost = Some(current + network_cost);
}
}
total_cost
}
fn determine_execution_strategy(&self, steps: &[ExecutionStep]) -> ExecutionStrategy {
if steps.len() > 1 && steps.iter().all(|s| s.dependencies.is_empty()) {
ExecutionStrategy::Parallel
} else {
ExecutionStrategy::Sequential
}
}
async fn execute_parallel_plan(&self, plan: &FederatedQueryPlan) -> Result<Vec<QueryResult>> {
let execution_futures: Vec<_> = plan
.steps
.iter()
.map(|step| self.execute_step(step))
.collect();
let results = self
.parallel_executor
.execute_parallel(execution_futures)
.await;
let mut query_results = Vec::new();
for result in results {
query_results.push(result?);
}
Ok(query_results)
}
async fn execute_sequential_plan(&self, plan: &FederatedQueryPlan) -> Result<Vec<QueryResult>> {
let mut results = Vec::new();
for step in &plan.steps {
let result = self.execute_step(step).await?;
results.push(result);
}
Ok(results)
}
async fn execute_adaptive_plan(&self, plan: &FederatedQueryPlan) -> Result<Vec<QueryResult>> {
self.execute_parallel_plan(plan).await
}
async fn execute_step(&self, step: &ExecutionStep) -> Result<QueryResult> {
if let Some(primary_service) = step.services.iter().find(|s| s.is_primary) {
self.execute_service_query(primary_service, step).await
} else if !step.services.is_empty() {
self.execute_with_fallback(step).await
} else {
Err(Error::ServiceUnavailable {
message: "No services available for step execution".to_string(),
})
}
}
async fn execute_service_query(
&self,
service: &ServiceSelection,
step: &ExecutionStep,
) -> Result<QueryResult> {
let start_time = std::time::Instant::now();
let http_client = reqwest::Client::new();
let query_string = format!("{:?}", step.sub_query);
let json_result = self
.parallel_executor
.execute_with_retry(|| {
let client = http_client.clone();
let url = service.service_url.clone();
let query = query_string.clone();
Box::pin(async move {
let response = client
.post(url.as_str())
.header("Accept", "application/sparql-results+json")
.header("Content-Type", "application/x-www-form-urlencoded")
.form(&[("query", query.as_str())])
.timeout(Duration::from_secs(30))
.send()
.await
.map_err(|e| Error::NetworkError {
message: format!("HTTP request failed: {e}"),
})?;
if !response.status().is_success() {
let status = response.status();
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
return Err(Error::ServiceError {
message: format!("Service {url} returned HTTP {status}: {error_text}"),
});
}
response.json().await.map_err(|e| Error::Parse {
message: format!("Failed to parse JSON response: {e}"),
})
})
})
.await?;
let result = self.parse_sparql_results(json_result)?;
let execution_time = start_time.elapsed();
self.update_service_statistics(&service.service_id, execution_time, result.bindings.len())
.await;
Ok(QueryResult {
bindings: result.bindings,
variables: result.variables,
execution_time,
})
}
async fn execute_with_fallback(&self, step: &ExecutionStep) -> Result<QueryResult> {
let mut last_error = None;
let mut sorted_services = step.services.clone();
sorted_services.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
for service in &sorted_services {
match self.execute_service_query(service, step).await {
Ok(result) => {
tracing::info!(
"Successfully executed step {} on fallback service {}",
step.id,
service.service_id
);
return Ok(result);
}
Err(e) => {
tracing::warn!(
"Service {} failed for step {}: {}",
service.service_id,
step.id,
e
);
last_error = Some(e);
}
}
}
Err(last_error.unwrap_or_else(|| Error::ServiceUnavailable {
message: format!("All fallback services failed for step {}", step.id),
}))
}
fn parse_sparql_results(&self, json: serde_json::Value) -> Result<QueryResult> {
let head = json.get("head").ok_or_else(|| Error::Parse {
message: "Missing 'head' in SPARQL results".to_string(),
})?;
let vars = head
.get("vars")
.and_then(|v| v.as_array())
.ok_or_else(|| Error::Parse {
message: "Missing 'vars' in SPARQL results".to_string(),
})?;
let variables: Vec<String> = vars
.iter()
.filter_map(|v| v.as_str())
.map(|s| s.to_string())
.collect();
let results = json.get("results").ok_or_else(|| Error::Parse {
message: "Missing 'results' in SPARQL results".to_string(),
})?;
let bindings_array = results
.get("bindings")
.and_then(|b| b.as_array())
.ok_or_else(|| Error::Parse {
message: "Missing 'bindings' in SPARQL results".to_string(),
})?;
let mut bindings = Vec::new();
for binding_obj in bindings_array {
if let Some(binding_map) = binding_obj.as_object() {
for (var, value_obj) in binding_map {
if let Some(value_map) = value_obj.as_object() {
let value = value_map
.get("value")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
bindings.push(QueryBinding {
variable: var.clone(),
value,
});
}
}
}
}
Ok(QueryResult {
bindings,
variables,
execution_time: Duration::from_millis(0), })
}
async fn update_service_statistics(
&self,
service_id: &str,
execution_time: Duration,
result_count: usize,
) {
let mut stats = self.statistics.write().await;
let service_stats = stats
.service_stats
.entry(service_id.to_string())
.or_insert_with(ServiceStatistics::default);
service_stats.total_queries += 1;
service_stats.successful_queries += 1;
let current_avg = service_stats.average_response_time.as_millis() as f64;
let new_time = execution_time.as_millis() as f64;
let total_queries = service_stats.total_queries as f64;
let new_avg = (current_avg * (total_queries - 1.0) + new_time) / total_queries;
service_stats.average_response_time = Duration::from_millis(new_avg as u64);
let current_avg_size = service_stats.average_result_size as f64;
let new_avg_size =
(current_avg_size * (total_queries - 1.0) + result_count as f64) / total_queries;
service_stats.average_result_size = new_avg_size as usize;
service_stats.availability =
service_stats.successful_queries as f64 / service_stats.total_queries as f64;
service_stats.last_updated = chrono::Utc::now();
tracing::debug!(
"Updated statistics for service {}: avg_time={}ms, avg_size={}, availability={:.2}%",
service_id,
service_stats.average_response_time.as_millis(),
service_stats.average_result_size,
service_stats.availability * 100.0
);
}
pub async fn update_statistics(
&self,
service_id: &str,
query_pattern: String,
result_count: usize,
execution_time: Duration,
success: bool,
) {
self.update_service_statistics(service_id, execution_time, result_count)
.await;
let mut stats = self.statistics.write().await;
let pattern_stats = stats
.pattern_stats
.entry(query_pattern)
.or_insert_with(PatternStatistics::default);
pattern_stats.execution_count += 1;
if success {
let count = pattern_stats.execution_count as f64;
let current_avg_time = pattern_stats.average_execution_time.as_millis() as f64;
let new_time = execution_time.as_millis() as f64;
let new_avg_time = (current_avg_time * (count - 1.0) + new_time) / count;
pattern_stats.average_execution_time = Duration::from_millis(new_avg_time as u64);
let current_avg_size = pattern_stats.average_result_size as f64;
let new_avg_size = (current_avg_size * (count - 1.0) + result_count as f64) / count;
pattern_stats.average_result_size = new_avg_size as usize;
pattern_stats.success_rate = (pattern_stats.success_rate * (count - 1.0) + 1.0) / count;
} else {
let count = pattern_stats.execution_count as f64;
pattern_stats.success_rate = (pattern_stats.success_rate * (count - 1.0)) / count;
}
}
pub async fn create_execution_plan(
&self,
query: &str,
service_patterns: &[crate::federated_query_optimizer::ServicePattern],
) -> Result<crate::federated_query_optimizer::ExecutionPlan> {
use crate::federated_query_optimizer::QueryPlanner as FederatedQueryPlanner;
let planner = FederatedQueryPlanner::new();
planner
.create_execution_plan(query, service_patterns)
.await
.map_err(|e| Error::QueryExecution {
message: format!("Failed to create execution plan: {}", e),
})
}
}
/// Result of executing one step: flattened variable bindings plus timing.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Flattened bindings (one entry per variable/value pair; row grouping
    /// from the SPARQL results document is not preserved).
    pub bindings: Vec<QueryBinding>,
    /// Variable names from the results header, in document order.
    pub variables: Vec<String>,
    /// Wall-clock time of the service call that produced this result.
    pub execution_time: Duration,
}
/// A single variable/value pair from a SPARQL JSON results binding.
#[derive(Debug, Clone)]
pub struct QueryBinding {
    /// Variable name (without the leading '?').
    pub variable: String,
    /// The binding's "value" field as a plain string (type/lang info dropped).
    pub value: String,
}