#[cfg(any(feature = "http", feature = "sparql", feature = "void"))]
use crate::context::ContextProvider;
#[cfg(any(feature = "http", feature = "sparql"))]
use crate::core::error::Result;
#[cfg(any(feature = "http", feature = "sparql"))]
use crate::core::query::Query;
#[cfg(any(feature = "http", feature = "sparql"))]
use crate::core::source::SourceRanking;
#[cfg(any(feature = "http", feature = "sparql", feature = "void"))]
use super::Router;
#[cfg(feature = "http")]
impl<C: ContextProvider> Router<C> {
    /// Plans `query` over the registered sources, runs every sub-plan, and
    /// merges the outcomes using the given aggregation `strategy`.
    ///
    /// Sub-plans are executed one after another, in the order the planner
    /// produced them. Whatever `Executor::execute_single` returns for a
    /// sub-plan is handed to the aggregator unchanged.
    ///
    /// With the `observability` feature enabled, the number of sub-plans is
    /// recorded in the `sources_count` field of the current tracing span, and
    /// the `oxirouter.federation.execute.errors` counter is incremented when
    /// aggregation fails.
    ///
    /// # Errors
    ///
    /// - Any error returned by the planner.
    /// - `OxiRouterError::SourceNotFound` when a sub-plan refers to a source
    ///   id that is not registered with this router. Sub-plans that precede
    ///   the offending one have already been executed at that point.
    /// - Any error returned by the aggregator.
    #[cfg_attr(
        feature = "observability",
        tracing::instrument(
            skip(self, query),
            fields(
                strategy = tracing::field::debug(strategy),
                sources_count = tracing::field::Empty
            )
        )
    )]
    pub fn federated_query(
        &self,
        query: &Query,
        strategy: crate::federation::AggregationStrategy,
    ) -> crate::core::error::Result<crate::federation::AggregatedResult> {
        use crate::core::error::OxiRouterError;
        use crate::federation::{Aggregator, Executor};

        // The planner receives the sources as one collection, so clone them
        // out of the registry first.
        let registered: Vec<crate::core::source::DataSource> =
            self.sources.values().cloned().collect();
        let plan = self.planner.plan(query, &registered)?;
        let sub_plan_count = plan.sub_plans.len();

        #[cfg(feature = "observability")]
        {
            tracing::Span::current().record("sources_count", sub_plan_count);
        }

        let executor = Executor::new();
        let mut outcomes = Vec::with_capacity(sub_plan_count);
        for sub_plan in &plan.sub_plans {
            // Look the source up first; execution only happens when it exists.
            let outcome = self
                .sources
                .get(&sub_plan.source_id)
                .map(|source| executor.execute_single(&sub_plan.sub_query, source))
                .ok_or_else(|| OxiRouterError::SourceNotFound(sub_plan.source_id.clone()))?;
            outcomes.push(outcome);
        }

        let aggregated = Aggregator::new()
            .with_strategy(strategy)
            .aggregate(&outcomes);

        #[cfg(feature = "observability")]
        {
            if aggregated.is_err() {
                metrics::counter!("oxirouter.federation.execute.errors").increment(1);
            }
        }

        let merged = aggregated?;
        Ok(merged)
    }

    /// Routes `query` and then executes it against the ranked sources.
    ///
    /// Only selections whose `source_id` is registered with this router are
    /// passed to the executor; selections pointing at unknown ids are skipped
    /// without raising an error. The full ranking is still handed to the
    /// executor alongside the resolved sources.
    ///
    /// # Errors
    ///
    /// Returns any error produced by routing or by `Executor::execute`.
    pub fn route_and_execute(
        &self,
        query: &Query,
    ) -> crate::core::error::Result<Vec<crate::federation::QueryResult>> {
        use crate::federation::Executor;

        let ranking = self.route(query)?;

        // Resolve ranked selections to registered sources, keeping ranking order.
        let mut targets: Vec<&crate::core::source::DataSource> = Vec::new();
        for selection in ranking.sources.iter() {
            if let Some(source) = self.sources.get(&selection.source_id) {
                targets.push(source);
            }
        }

        Executor::new().execute(query, &targets, &ranking)
    }
}
#[cfg(feature = "sparql")]
impl<C: ContextProvider> Router<C> {
    /// Parses `sparql` with `Query::from_sparql` and routes the resulting
    /// query through `route`.
    ///
    /// # Errors
    ///
    /// Returns the parse error when `sparql` cannot be turned into a query,
    /// or any error produced by `route`.
    pub fn route_sparql(&self, sparql: &str) -> Result<SourceRanking> {
        let parsed = Query::from_sparql(sparql)?;
        let ranking = self.route(&parsed)?;
        Ok(ranking)
    }

    /// Parses `sparql` with `Query::from_sparql` and forwards the resulting
    /// query to `route_and_log`.
    ///
    /// # Errors
    ///
    /// Returns the parse error when `sparql` cannot be turned into a query,
    /// or any error produced by `route_and_log`.
    pub fn route_sparql_and_log(&mut self, sparql: &str) -> Result<SourceRanking> {
        let parsed = Query::from_sparql(sparql)?;
        let ranking = self.route_and_log(&parsed)?;
        Ok(ranking)
    }
}
#[cfg(feature = "void")]
impl<C: ContextProvider> Router<C> {
    /// Parses `ttl` with `crate::core::void::parse_oxirouter_ttl` and
    /// registers every source it yields through `add_source`.
    ///
    /// Parsing completes before the first source is added, so a parse failure
    /// leaves the router untouched. Whatever `add_source` returns is not
    /// inspected.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `parse_oxirouter_ttl`.
    pub fn register_from_void_ttl(&mut self, ttl: &str) -> crate::core::error::Result<()> {
        let parsed_sources = crate::core::void::parse_oxirouter_ttl(ttl)?;
        parsed_sources.into_iter().for_each(|source| {
            self.add_source(source);
        });
        Ok(())
    }
}