use futures::future::join_all;
use reqwest::Client;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{sync::Semaphore, time::timeout};
use oxirs_arq::query::{Query, QueryType};
use oxirs_core::query::QueryResults;
use crate::{
error::{FusekiError, FusekiResult},
federation::{
health::HealthMonitor,
planner::{
ExecutionStep, ExecutionStrategy, FederatedQueryPlan, QueryPlanner, ServiceSelection,
},
FederationConfig,
},
};
/// Outcome of a federated query: the decoded results plus execution metadata.
#[derive(Debug)]
pub struct QueryResult {
    /// The decoded result payload (solutions, boolean, or graph).
    pub results: QueryResults,
    /// Bookkeeping about how this result was produced.
    pub metadata: QueryMetadata,
}
/// Metadata attached to a [`QueryResult`].
#[derive(Debug, Clone, Default)]
pub struct QueryMetadata {
    /// Wall-clock time spent producing the result, when recorded.
    pub execution_time: Option<Duration>,
    /// Identifier of the service that produced the result, when known.
    pub service_id: Option<String>,
    /// Number of entries in `results` (rows for solutions, triples for
    /// graphs, 1 for a boolean answer).
    pub result_count: usize,
}
impl QueryResult {
pub fn new_empty() -> Self {
Self {
results: QueryResults::Boolean(false),
metadata: QueryMetadata::default(),
}
}
pub fn size_hint(&self) -> usize {
self.metadata.result_count
}
}
/// Executes federated query plans by dispatching sub-queries to remote
/// SPARQL endpoints over HTTP and collecting their results.
pub struct FederatedExecutor {
    /// Federation-wide settings (timeouts, concurrency limits, ...).
    config: FederationConfig,
    /// Shared HTTP client with pooled connections for endpoint requests.
    http_client: Client,
    /// Caps concurrent in-flight service requests at
    /// `config.max_concurrent_requests`.
    semaphore: Arc<Semaphore>,
    /// Planner; receives per-step execution statistics after each step.
    planner: Arc<QueryPlanner>,
    /// Health tracker consulted before routing a step to a service.
    health_monitor: Arc<HealthMonitor>,
}
/// Mutable state threaded through a single plan execution.
#[derive(Debug)]
struct ExecutionContext {
    /// The plan being executed.
    plan: FederatedQueryPlan,
    /// Per-step results keyed by step id.
    /// NOTE(review): never written to by the executor — only cleared in the
    /// adaptive fallback path; confirm whether this field is still needed.
    results: HashMap<String, QueryResult>,
    /// Ids of steps that finished successfully.
    completed_steps: HashSet<String>,
    /// Counters and timings accumulated during execution.
    metrics: ExecutionMetrics,
}
/// Metrics collected over one plan execution.
#[derive(Debug, Default)]
struct ExecutionMetrics {
    /// Total wall-clock time for the whole plan; set at the very end.
    total_time: Option<Duration>,
    /// Wall-clock time per step id.
    step_times: HashMap<String, Duration>,
    /// Number of HTTP service calls attempted.
    service_calls: u32,
    /// Calls that failed (transport error, timeout, or error status).
    failed_calls: u32,
    /// Sum of `Content-Length` headers of successful responses, when present.
    bytes_transferred: u64,
}
impl FederatedExecutor {
pub fn new(
config: FederationConfig,
planner: Arc<QueryPlanner>,
health_monitor: Arc<HealthMonitor>,
) -> Self {
let max_concurrent = config.max_concurrent_requests;
Self {
http_client: Client::builder()
.timeout(config.request_timeout)
.pool_max_idle_per_host(max_concurrent)
.build()
.expect("HTTP client builder should succeed"),
semaphore: Arc::new(Semaphore::new(max_concurrent)),
config,
planner,
health_monitor,
}
}
pub async fn execute(&self, plan: FederatedQueryPlan) -> FusekiResult<QueryResult> {
let start = Instant::now();
let mut context = ExecutionContext {
plan: plan.clone(),
results: HashMap::new(),
completed_steps: HashSet::new(),
metrics: ExecutionMetrics::default(),
};
let result = match plan.strategy {
ExecutionStrategy::Sequential => self.execute_sequential(&mut context).await?,
ExecutionStrategy::Parallel => self.execute_parallel(&mut context).await?,
ExecutionStrategy::Adaptive => self.execute_adaptive(&mut context).await?,
};
context.metrics.total_time = Some(start.elapsed());
self.report_metrics(&context.metrics);
Ok(result)
}
async fn execute_sequential(
&self,
context: &mut ExecutionContext,
) -> FusekiResult<QueryResult> {
let mut final_result = QueryResult::new_empty();
let steps = context.plan.steps.clone();
for step in &steps {
let result = self.execute_step(step, context).await?;
context.completed_steps.insert(step.id.clone());
final_result = result;
}
Ok(final_result)
}
async fn execute_parallel(&self, context: &mut ExecutionContext) -> FusekiResult<QueryResult> {
let step_groups = self.group_steps_by_dependencies(&context.plan.steps);
let mut final_result = QueryResult::new_empty();
for group in step_groups {
let mut group_results = Vec::new();
let futures = group.into_iter().map(|step| {
let step = step.clone();
async move {
let _start = std::time::Instant::now();
let primary_service =
step.services.iter().find(|s| s.is_primary).ok_or_else(|| {
FusekiError::Configuration {
message: "No primary service for step".to_string(),
}
})?;
self.execute_on_service_standalone(primary_service, &step.sub_query)
.await
.map(|result| (step.id.clone(), result))
}
});
let results = join_all(futures).await;
for result in results {
match result {
Ok((step_id, res)) => {
context.completed_steps.insert(step_id);
group_results.push(res);
}
Err(e) => return Err(e),
}
}
if let Some(last_result) = group_results.pop() {
final_result = last_result;
}
}
Ok(final_result)
}
async fn execute_adaptive(&self, context: &mut ExecutionContext) -> FusekiResult<QueryResult> {
match self.execute_parallel(context).await {
Ok(result) => Ok(result),
Err(_) => {
tracing::warn!("Parallel execution failed, falling back to sequential");
context.results.clear();
context.metrics = ExecutionMetrics::default();
self.execute_sequential(context).await
}
}
}
async fn execute_step(
&self,
step: &ExecutionStep,
context: &mut ExecutionContext,
) -> FusekiResult<QueryResult> {
let start = Instant::now();
let primary_service = step.services.iter().find(|s| s.is_primary).ok_or_else(|| {
FusekiError::Configuration {
message: "No primary service for step".to_string(),
}
})?;
if !self
.health_monitor
.should_use_service(&primary_service.service_id)
.await
{
for service in &step.services {
if !service.is_primary
&& self
.health_monitor
.should_use_service(&service.service_id)
.await
{
return self
.execute_on_service(service, &step.sub_query, context)
.await;
}
}
return Err(FusekiError::ServiceUnavailable {
message: format!("All services unavailable for step {}", step.id),
});
}
let result = self
.execute_on_service(primary_service, &step.sub_query, context)
.await;
context
.metrics
.step_times
.insert(step.id.clone(), start.elapsed());
if let Ok(ref res) = result {
self.planner
.update_statistics(
&primary_service.service_id,
format!("step_{}", step.id),
res.size_hint(),
start.elapsed(),
true,
)
.await;
}
result
}
    /// Sends `query` to `service` without touching any execution context.
    ///
    /// Used by the parallel execution path, where the mutable context cannot
    /// be shared between concurrent futures — consequently no metrics are
    /// recorded here.
    async fn execute_on_service_standalone(
        &self,
        service: &ServiceSelection,
        query: &Query,
    ) -> FusekiResult<QueryResult> {
        // Bound overall concurrency; the RAII permit is held until this
        // function returns, covering the whole request/parse cycle.
        let _permit = self
            .semaphore
            .acquire()
            .await
            .map_err(|_| FusekiError::QueryExecution {
                message: "Failed to acquire semaphore".to_string(),
            })?;
        let query_string = self.serialize_query(query)?;
        // NOTE(review): the client builder already sets
        // `config.request_timeout`, so this explicit timeout wrapper is a
        // second, redundant guard — confirm whether both are intended.
        let response = match timeout(
            self.config.request_timeout,
            self.http_client
                .post(service.service_url.as_str())
                .header("Content-Type", "application/sparql-query")
                .header("Accept", self.get_accept_header(&query.query_type))
                .body(query_string)
                .send(),
        )
        .await
        {
            Ok(Ok(resp)) => resp,
            Ok(Err(e)) => {
                return Err(FusekiError::QueryExecution {
                    message: format!("Service request failed: {e}"),
                });
            }
            Err(_) => {
                return Err(FusekiError::QueryExecution {
                    message: "Service request timed out".to_string(),
                });
            }
        };
        if !response.status().is_success() {
            return Err(FusekiError::QueryExecution {
                message: format!("Service returned error: {}", response.status()),
            });
        }
        self.parse_response(response, &query.query_type).await
    }
    /// Sends `query` to `service`, recording call/failure counters and
    /// transferred byte counts into `context.metrics`.
    async fn execute_on_service(
        &self,
        service: &ServiceSelection,
        query: &Query,
        context: &mut ExecutionContext,
    ) -> FusekiResult<QueryResult> {
        // Bound overall concurrency; the RAII permit is held until this
        // function returns.
        let _permit = self
            .semaphore
            .acquire()
            .await
            .map_err(|_| FusekiError::QueryExecution {
                message: "Failed to acquire semaphore".to_string(),
            })?;
        // Count the attempt before sending so failures are still reflected.
        context.metrics.service_calls += 1;
        let query_string = self.serialize_query(query)?;
        // NOTE(review): the client builder already sets
        // `config.request_timeout`, so this explicit timeout wrapper is a
        // second, redundant guard — confirm whether both are intended.
        let response = match timeout(
            self.config.request_timeout,
            self.http_client
                .post(service.service_url.as_str())
                .header("Content-Type", "application/sparql-query")
                .header("Accept", self.get_accept_header(&query.query_type))
                .body(query_string)
                .send(),
        )
        .await
        {
            Ok(Ok(resp)) => resp,
            Ok(Err(e)) => {
                context.metrics.failed_calls += 1;
                return Err(FusekiError::QueryExecution {
                    message: format!("Service request failed: {e}"),
                });
            }
            Err(_) => {
                context.metrics.failed_calls += 1;
                return Err(FusekiError::QueryExecution {
                    message: "Service request timed out".to_string(),
                });
            }
        };
        if !response.status().is_success() {
            context.metrics.failed_calls += 1;
            return Err(FusekiError::QueryExecution {
                message: format!("Service returned error: {}", response.status()),
            });
        }
        // Content-Length is optional (e.g. chunked responses omit it).
        if let Some(len) = response.content_length() {
            context.metrics.bytes_transferred += len;
        }
        self.parse_response(response, &query.query_type).await
    }
    /// Reads the HTTP response body and decodes it into `QueryResults`
    /// according to the query form: SELECT/ASK expect a SPARQL 1.1 JSON
    /// results document; CONSTRUCT/DESCRIBE expect an N-Triples body.
    async fn parse_response(
        &self,
        response: reqwest::Response,
        query_type: &QueryType,
    ) -> FusekiResult<QueryResult> {
        let response_text = response
            .text()
            .await
            .map_err(|e| FusekiError::QueryExecution {
                message: format!("Failed to read response: {e}"),
            })?;
        let results = match query_type {
            QueryType::Select => {
                let json: serde_json::Value =
                    serde_json::from_str(&response_text).map_err(|e| {
                        FusekiError::QueryExecution {
                            message: format!("Invalid JSON response: {e}"),
                        }
                    })?;
                let variable_bindings = self.parse_sparql_json_bindings(&json)?;
                // Convert each binding row into a core Solution. Variables
                // whose names fail validation are silently dropped from the
                // solution.
                let solutions: Vec<oxirs_core::query::Solution> = variable_bindings
                    .into_iter()
                    .map(|vb| {
                        let mut solution = oxirs_core::query::Solution::new();
                        for var_name in vb.variables() {
                            if let Some(term) = vb.get(var_name) {
                                if let Ok(var) = oxirs_core::model::Variable::new(var_name) {
                                    solution.bind(var, term.clone());
                                }
                            }
                        }
                        solution
                    })
                    .collect();
                QueryResults::Solutions(solutions)
            }
            QueryType::Ask => {
                let json: serde_json::Value =
                    serde_json::from_str(&response_text).map_err(|e| {
                        FusekiError::QueryExecution {
                            message: format!("Invalid JSON response: {e}"),
                        }
                    })?;
                // A missing or non-boolean "boolean" field is treated as
                // false rather than an error.
                let boolean_result = json
                    .get("boolean")
                    .and_then(|b| b.as_bool())
                    .unwrap_or(false);
                QueryResults::Boolean(boolean_result)
            }
            QueryType::Construct | QueryType::Describe => {
                // Graph responses are parsed as N-Triples; the synthetic
                // graph name attached during quad parsing is discarded here.
                let quads = self.parse_graph_response(&response_text)?;
                let triples: Vec<oxirs_core::model::Triple> = quads
                    .into_iter()
                    .map(|quad| {
                        oxirs_core::model::Triple::new(
                            quad.subject().clone(),
                            quad.predicate().clone(),
                            quad.object().clone(),
                        )
                    })
                    .collect();
                QueryResults::Graph(triples)
            }
        };
        // Result count: rows/triples, or 1 for a boolean answer.
        let result_count = match &results {
            QueryResults::Solutions(solutions) => solutions.len(),
            QueryResults::Boolean(_) => 1,
            QueryResults::Graph(graph) => graph.len(),
        };
        Ok(QueryResult {
            results,
            metadata: QueryMetadata {
                // Timing and service attribution are filled in by callers
                // when available.
                execution_time: None,
                service_id: None,
                result_count,
            },
        })
    }
fn serialize_query(&self, query: &Query) -> FusekiResult<String> {
match query.to_string() {
query_str if !query_str.is_empty() => Ok(query_str),
_ => Err(FusekiError::QueryExecution {
message: "Failed to serialize query".to_string(),
}),
}
}
fn get_accept_header(&self, query_type: &QueryType) -> &'static str {
match query_type {
QueryType::Select | QueryType::Ask => "application/sparql-results+json",
QueryType::Construct | QueryType::Describe => "application/n-triples",
}
}
fn group_steps_by_dependencies(&self, steps: &[ExecutionStep]) -> Vec<Vec<ExecutionStep>> {
let mut groups = Vec::new();
let mut remaining_steps: HashMap<String, ExecutionStep> = steps
.iter()
.map(|step| (step.id.clone(), step.clone()))
.collect();
let mut processed = std::collections::HashSet::new();
while !remaining_steps.is_empty() {
let mut current_group = Vec::new();
let ready_steps: Vec<String> = remaining_steps
.keys()
.filter(|step_id| {
remaining_steps
.get(*step_id)
.map(|step| step.dependencies.iter().all(|dep| processed.contains(dep)))
.unwrap_or(false)
})
.cloned()
.collect();
if ready_steps.is_empty() {
if let Some((first_id, _)) = remaining_steps.iter().next() {
let first_id = first_id.clone();
if let Some(step) = remaining_steps.remove(&first_id) {
current_group.push(step);
processed.insert(first_id);
}
}
} else {
for step_id in ready_steps {
if let Some(step) = remaining_steps.remove(&step_id) {
current_group.push(step);
processed.insert(step_id);
}
}
}
if !current_group.is_empty() {
groups.push(current_group);
} else {
break;
}
}
groups
}
fn parse_sparql_json_bindings(
&self,
json: &serde_json::Value,
) -> FusekiResult<Vec<oxirs_core::rdf_store::VariableBinding>> {
let bindings_array = json
.get("results")
.and_then(|r| r.get("bindings"))
.and_then(|b| b.as_array())
.ok_or_else(|| FusekiError::QueryExecution {
message: "Invalid SPARQL JSON format: missing results.bindings".to_string(),
})?;
let mut solutions = Vec::new();
for binding_obj in bindings_array {
let mut variable_binding = oxirs_core::rdf_store::VariableBinding::new();
if let Some(binding_map) = binding_obj.as_object() {
for (var_name, term_obj) in binding_map {
if let Some(term) = self.parse_sparql_json_term(term_obj)? {
variable_binding.bind(var_name.clone(), term);
}
}
}
solutions.push(variable_binding);
}
Ok(solutions)
}
fn parse_sparql_json_term(
&self,
term_obj: &serde_json::Value,
) -> FusekiResult<Option<oxirs_core::model::Term>> {
let term_type = term_obj
.get("type")
.and_then(|t| t.as_str())
.ok_or_else(|| FusekiError::QueryExecution {
message: "Invalid SPARQL JSON term: missing type".to_string(),
})?;
let value = term_obj
.get("value")
.and_then(|v| v.as_str())
.ok_or_else(|| FusekiError::QueryExecution {
message: "Invalid SPARQL JSON term: missing value".to_string(),
})?;
let term = match term_type {
"uri" => oxirs_core::model::Term::NamedNode(
oxirs_core::model::NamedNode::new(value)
.expect("IRI from remote endpoint should be valid"),
),
"bnode" => oxirs_core::model::Term::BlankNode(
oxirs_core::model::BlankNode::new(value)
.expect("blank node from remote endpoint should be valid"),
),
"literal" => {
let language = term_obj
.get("xml:lang")
.and_then(|l| l.as_str())
.map(|s| s.to_string());
let datatype = term_obj.get("datatype").and_then(|d| d.as_str()).map(|s| {
oxirs_core::model::NamedNode::new(s).expect("graph IRI should be valid")
});
oxirs_core::model::Term::Literal(if let Some(lang) = language {
oxirs_core::model::Literal::new_language_tagged_literal(value, lang)
.expect("language-tagged literal should be valid")
} else if let Some(dt) = datatype {
oxirs_core::model::Literal::new_typed_literal(value, dt)
} else {
oxirs_core::model::Literal::new_simple_literal(value)
})
}
_ => {
return Err(FusekiError::QueryExecution {
message: format!("Unknown SPARQL JSON term type: {}", term_type),
});
}
};
Ok(Some(term))
}
fn parse_graph_response(
&self,
response_text: &str,
) -> FusekiResult<Vec<oxirs_core::model::Quad>> {
let mut quads = Vec::new();
for line in response_text.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if line.ends_with('.') {
if let Some(quad) = self.parse_ntriples_line(line)? {
quads.push(quad);
}
}
}
Ok(quads)
}
    /// Parses one N-Triples statement (trailing `.` still attached) into a
    /// quad, or `None` when fewer than three terms are present.
    fn parse_ntriples_line(&self, line: &str) -> FusekiResult<Option<oxirs_core::model::Quad>> {
        // NOTE(review): `trim_end_matches` strips *all* trailing dots, not
        // just the single statement terminator; harmless for well-formed
        // N-Triples but worth confirming.
        let line = line.trim_end_matches('.');
        let parts = self.split_ntriples_terms(line)?;
        if parts.len() < 3 {
            // Not enough terms to form a triple; caller skips the line.
            return Ok(None);
        }
        let subject = self.parse_ntriples_term(&parts[0])?;
        let predicate = self.parse_ntriples_term(&parts[1])?;
        let object = self.parse_ntriples_term(&parts[2])?;
        // All parsed triples are attached to a synthetic named graph;
        // callers that want plain triples discard this graph component.
        let graph = oxirs_core::model::GraphName::NamedNode(
            oxirs_core::model::NamedNode::new("http://default-graph")
                .expect("hardcoded default graph IRI should be valid"),
        );
        // Narrow the generic terms into positional types; e.g. a literal in
        // subject position is rejected here.
        let subject_pos: oxirs_core::model::Subject =
            subject
                .try_into()
                .map_err(|_| FusekiError::QueryExecution {
                    message: "Invalid subject term".to_string(),
                })?;
        let predicate_pos: oxirs_core::model::Predicate =
            predicate
                .try_into()
                .map_err(|_| FusekiError::QueryExecution {
                    message: "Invalid predicate term".to_string(),
                })?;
        let object_pos: oxirs_core::model::Object = object.into();
        Ok(Some(oxirs_core::model::Quad::new(
            subject_pos,
            predicate_pos,
            object_pos,
            graph,
        )))
    }
fn split_ntriples_terms(&self, line: &str) -> FusekiResult<Vec<String>> {
let mut terms = Vec::new();
let mut current_term = String::new();
let mut in_quotes = false;
let chars = line.chars();
for ch in chars {
match ch {
'"' if !in_quotes => {
in_quotes = true;
current_term.push(ch);
}
'"' if in_quotes => {
in_quotes = false;
current_term.push(ch);
}
' ' | '\t' if !in_quotes => {
if !current_term.is_empty() {
terms.push(current_term.trim().to_string());
current_term.clear();
}
}
_ => {
current_term.push(ch);
}
}
}
if !current_term.is_empty() {
terms.push(current_term.trim().to_string());
}
Ok(terms)
}
fn parse_ntriples_term(&self, term_str: &str) -> FusekiResult<oxirs_core::model::Term> {
let term_str = term_str.trim();
if term_str.starts_with('<') && term_str.ends_with('>') {
let iri = &term_str[1..term_str.len() - 1];
Ok(oxirs_core::model::Term::NamedNode(
oxirs_core::model::NamedNode::new(iri).expect("IRI from N-Triples should be valid"),
))
} else if let Some(bnode) = term_str.strip_prefix("_:") {
Ok(oxirs_core::model::Term::BlankNode(
oxirs_core::model::BlankNode::new(bnode)
.expect("blank node from N-Triples should be valid"),
))
} else if term_str.starts_with('"') {
self.parse_ntriples_literal(term_str)
} else {
Err(FusekiError::QueryExecution {
message: format!("Invalid N-Triples term: {}", term_str),
})
}
}
fn parse_ntriples_literal(&self, literal_str: &str) -> FusekiResult<oxirs_core::model::Term> {
if let Some(end_quote) = literal_str[1..].find('"') {
let value = &literal_str[1..end_quote + 1];
let rest = &literal_str[end_quote + 2..];
if let Some(lang) = rest.strip_prefix('@') {
Ok(oxirs_core::model::Term::Literal(
oxirs_core::model::Literal::new_language_tagged_literal(value, lang)
.expect("language-tagged literal should be valid"),
))
} else if let Some(datatype) = rest.strip_prefix("^^") {
let datatype = if datatype.starts_with('<') && datatype.ends_with('>') {
&datatype[1..datatype.len() - 1]
} else {
datatype
};
Ok(oxirs_core::model::Term::Literal(
oxirs_core::model::Literal::new_typed_literal(
value,
oxirs_core::model::NamedNode::new(datatype)
.expect("datatype IRI from remote endpoint should be valid"),
),
))
} else {
Ok(oxirs_core::model::Term::Literal(
oxirs_core::model::Literal::new_simple_literal(value),
))
}
} else {
Err(FusekiError::QueryExecution {
message: format!("Invalid N-Triples literal: {}", literal_str),
})
}
}
    /// Emits tracing output summarizing one plan execution: total time and
    /// call counts at info level; per-step timings and transfer volume at
    /// debug level, only when non-empty/non-zero.
    fn report_metrics(&self, metrics: &ExecutionMetrics) {
        tracing::info!(
            "Federated query executed in {:?} with {} service calls ({} failed)",
            metrics.total_time.unwrap_or_default(),
            metrics.service_calls,
            metrics.failed_calls
        );
        if !metrics.step_times.is_empty() {
            tracing::debug!("Step execution times: {:?}", metrics.step_times);
        }
        if metrics.bytes_transferred > 0 {
            tracing::debug!("Bytes transferred: {}", metrics.bytes_transferred);
        }
    }
}
/// Merges partial result sets coming back from multiple federated services.
pub struct FederatedResultMerger {
    /// Strategy applied when `merge` receives more than one result.
    merge_strategy: FederatedMergeStrategy,
}
/// How multiple partial results are combined into one.
#[derive(Debug, Clone)]
pub enum FederatedMergeStrategy {
    /// Deduplicated union of all results.
    Union,
    /// Keep only entries present in every result.
    Intersection,
    /// Join solution sets on the named variables (an empty list makes
    /// every pair of solutions match, i.e. a cross join with merge).
    Join(Vec<String>),
    /// Placeholder for user-supplied logic; currently rejected by `merge`.
    Custom,
}
impl FederatedResultMerger {
pub fn new(strategy: FederatedMergeStrategy) -> Self {
Self {
merge_strategy: strategy,
}
}
pub async fn merge(&self, results: Vec<QueryResult>) -> FusekiResult<QueryResult> {
if results.is_empty() {
return Ok(QueryResult::new_empty());
}
if results.len() == 1 {
return Ok(results
.into_iter()
.next()
.expect("results should not be empty after non_empty check"));
}
match &self.merge_strategy {
FederatedMergeStrategy::Union => self.merge_union(results).await,
FederatedMergeStrategy::Intersection => self.merge_intersection(results).await,
FederatedMergeStrategy::Join(vars) => self.merge_join(results, vars).await,
FederatedMergeStrategy::Custom => Err(FusekiError::QueryExecution {
message: "Custom merge not implemented".to_string(),
}),
}
}
    /// Union-merges all results in place into the first one.
    ///
    /// Booleans are OR-ed; solution sets and graphs are concatenated with
    /// duplicates removed. Deduplication keys on the `Debug` rendering of
    /// each entry, so two entries are treated as equal iff they debug-print
    /// identically. Mixing result types is an error. Execution times are
    /// summed only when both sides recorded one.
    async fn merge_union(&self, mut results: Vec<QueryResult>) -> FusekiResult<QueryResult> {
        use oxirs_core::query::Solution;
        use std::collections::HashSet;
        if results.is_empty() {
            return Ok(QueryResult::new_empty());
        }
        if results.len() == 1 {
            return Ok(results
                .pop()
                .expect("results should not be empty after non_empty check"));
        }
        // Fold every later result into the first one.
        let mut base = results.remove(0);
        for result in results {
            match (&mut base.results, &result.results) {
                (QueryResults::Boolean(ref mut b1), QueryResults::Boolean(b2)) => {
                    *b1 = *b1 || *b2;
                }
                (QueryResults::Solutions(ref mut sols1), QueryResults::Solutions(sols2)) => {
                    // Seed the seen-set with everything already accumulated.
                    let mut seen = HashSet::new();
                    for sol in sols1.iter() {
                        seen.insert(format!("{:?}", sol));
                    }
                    for sol in sols2 {
                        let key = format!("{:?}", sol);
                        if !seen.contains(&key) {
                            sols1.push(sol.clone());
                            seen.insert(key);
                        }
                    }
                    base.metadata.result_count = sols1.len();
                }
                (QueryResults::Graph(ref mut triples1), QueryResults::Graph(triples2)) => {
                    let mut seen = HashSet::new();
                    for triple in triples1.iter() {
                        seen.insert(format!("{:?}", triple));
                    }
                    for triple in triples2 {
                        let key = format!("{:?}", triple);
                        if !seen.contains(&key) {
                            triples1.push(triple.clone());
                            seen.insert(key);
                        }
                    }
                    base.metadata.result_count = triples1.len();
                }
                _ => {
                    return Err(FusekiError::QueryExecution {
                        message: "Cannot union incompatible result types".to_string(),
                    });
                }
            }
            // Accumulate timing only when both sides have it recorded.
            if let (Some(t1), Some(t2)) =
                (base.metadata.execution_time, result.metadata.execution_time)
            {
                base.metadata.execution_time = Some(t1 + t2);
            }
        }
        Ok(base)
    }
    /// Intersection-merges all results in place into the first one.
    ///
    /// Booleans are AND-ed; solution sets and graphs retain only entries
    /// also present in each later result. As in the union case, equality is
    /// keyed on the `Debug` rendering of each entry. Mixing result types is
    /// an error. Execution times are summed only when both sides recorded
    /// one.
    async fn merge_intersection(&self, mut results: Vec<QueryResult>) -> FusekiResult<QueryResult> {
        use std::collections::HashSet;
        if results.is_empty() {
            return Ok(QueryResult::new_empty());
        }
        if results.len() == 1 {
            return Ok(results
                .pop()
                .expect("results should not be empty after non_empty check"));
        }
        // Fold every later result into the first one.
        let mut base = results.remove(0);
        for result in results {
            match (&mut base.results, &result.results) {
                (QueryResults::Boolean(ref mut b1), QueryResults::Boolean(b2)) => {
                    *b1 = *b1 && *b2;
                }
                (QueryResults::Solutions(ref mut sols1), QueryResults::Solutions(sols2)) => {
                    // Index the right-hand side once, then filter in place.
                    let set2: HashSet<String> =
                        sols2.iter().map(|sol| format!("{:?}", sol)).collect();
                    sols1.retain(|sol| {
                        let key = format!("{:?}", sol);
                        set2.contains(&key)
                    });
                    base.metadata.result_count = sols1.len();
                }
                (QueryResults::Graph(ref mut triples1), QueryResults::Graph(triples2)) => {
                    let set2: HashSet<String> = triples2
                        .iter()
                        .map(|triple| format!("{:?}", triple))
                        .collect();
                    triples1.retain(|triple| {
                        let key = format!("{:?}", triple);
                        set2.contains(&key)
                    });
                    base.metadata.result_count = triples1.len();
                }
                _ => {
                    return Err(FusekiError::QueryExecution {
                        message: "Cannot intersect incompatible result types".to_string(),
                    });
                }
            }
            // Accumulate timing only when both sides have it recorded.
            if let (Some(t1), Some(t2)) =
                (base.metadata.execution_time, result.metadata.execution_time)
            {
                base.metadata.execution_time = Some(t1 + t2);
            }
        }
        Ok(base)
    }
async fn merge_join(
&self,
mut results: Vec<QueryResult>,
join_vars: &[String],
) -> FusekiResult<QueryResult> {
use oxirs_core::model::Variable;
use std::collections::HashMap;
if results.is_empty() {
return Ok(QueryResult::new_empty());
}
if results.len() == 1 {
return Ok(results
.pop()
.expect("results should not be empty after non_empty check"));
}
let mut base = results.remove(0);
if let QueryResults::Solutions(ref mut sols1) = base.results {
for result in results {
if let QueryResults::Solutions(sols2) = result.results {
let mut joined_solutions = Vec::new();
let join_variables: Vec<Variable> = join_vars
.iter()
.map(|v| {
Variable::new(v).unwrap_or_else(|_| {
Variable::new("_").expect("underscore variable should be valid")
})
})
.collect();
let actual_join_vars = if join_variables.is_empty() {
Vec::new() } else {
join_variables
};
for sol1 in sols1.iter() {
for sol2 in &sols2 {
let mut matches = true;
if !actual_join_vars.is_empty() {
for join_var in &actual_join_vars {
let val1_opt = sol1.get(join_var);
let val2_opt = sol2.get(join_var);
match (val1_opt, val2_opt) {
(Some(v1), Some(v2)) if v1 == v2 => {
}
(None, None) => {
}
_ => {
matches = false;
break;
}
}
}
}
if matches {
if let Some(merged) = sol1.merge(sol2) {
joined_solutions.push(merged);
}
}
}
}
*sols1 = joined_solutions;
base.metadata.result_count = sols1.len();
if let (Some(t1), Some(t2)) =
(base.metadata.execution_time, result.metadata.execution_time)
{
base.metadata.execution_time = Some(t1 + t2);
}
} else {
return Err(FusekiError::QueryExecution {
message: "Can only join Solutions result types".to_string(),
});
}
}
Ok(base)
} else {
Err(FusekiError::QueryExecution {
message: "Join operation requires Solutions result type".to_string(),
})
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Smoke test: a merger can be constructed with the Union strategy.
    #[test]
    fn test_result_merger() {
        let _merger = FederatedResultMerger::new(FederatedMergeStrategy::Union);
    }
}