use std::collections::HashMap;
use std::sync::Arc;
use crate::algebra::{Algebra, Term, TriplePattern};
use oxirs_core::model::NamedNode;
/// Cost estimate for evaluating triple patterns at a federated endpoint.
///
/// [`FederatedPlanner`] combines these per-pattern estimates into one cost per
/// endpoint when deciding the order of generated `SERVICE` joins.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct FederatedSelectivity {
    /// Estimated number of solutions the pattern produces.
    pub estimated_cardinality: f64,
    /// Estimated latency of the remote call, in milliseconds.
    pub estimated_latency_ms: f64,
    /// Trust in this estimate; the planner inflates the cost of low-confidence
    /// estimates.
    pub confidence: f64,
}

impl Default for FederatedSelectivity {
    /// Fallback estimate used when nothing more specific is known:
    /// 1 000 solutions, 100 ms latency, confidence 0.1.
    fn default() -> Self {
        let (cardinality, latency_ms, confidence) = (1_000.0, 100.0, 0.1);
        Self {
            confidence,
            estimated_latency_ms: latency_ms,
            estimated_cardinality: cardinality,
        }
    }
}
/// Supplies routing and cost information to [`FederatedPlanner`].
///
/// Implementations decide which remote endpoint (if any) answers patterns that
/// mention a given IRI, and how expensive a pattern is expected to be there.
pub trait SourceSelectivityProvider: Send + Sync {
    /// Returns the endpoint URL responsible for `iri`, or `None` when patterns
    /// mentioning it should stay in the local BGP.
    fn endpoint_for_iri(&self, iri: &NamedNode) -> Option<String>;

    /// Estimates the cost of evaluating `pattern` at `endpoint`.
    ///
    /// The default ignores both arguments and returns
    /// [`FederatedSelectivity::default`].
    #[allow(unused_variables)] // the default estimate does not depend on the arguments
    fn pattern_selectivity(&self, endpoint: &str, pattern: &TriplePattern) -> FederatedSelectivity {
        FederatedSelectivity::default()
    }

    /// Whether generated `SERVICE` nodes targeting `endpoint` are marked
    /// `SILENT`. The default is `false` for every endpoint.
    #[allow(unused_variables)] // the default answer does not depend on the endpoint
    fn silent_default(&self, endpoint: &str) -> bool {
        false
    }
}
/// A [`SourceSelectivityProvider`] backed by a fixed table of
/// IRI-prefix -> endpoint routes.
///
/// Routes are kept ordered by descending prefix length, so lookups resolve to
/// the most specific registered prefix.
#[derive(Clone, Debug, Default)]
pub struct StaticSourceProvider {
    /// Registered routes, longest prefix first.
    entries: Vec<StaticSourceEntry>,
}
/// One prefix -> endpoint route held by a [`StaticSourceProvider`].
#[derive(Clone, Debug)]
struct StaticSourceEntry {
    /// IRI prefix, matched with `str::starts_with`.
    prefix: String,
    /// Endpoint URL that IRIs with this prefix are routed to.
    endpoint: String,
    /// Cost estimate reported for patterns that mention this prefix.
    selectivity: FederatedSelectivity,
    /// Silent flag reported for this route's endpoint.
    silent: bool,
}
impl StaticSourceProvider {
    /// Creates a provider with no registered routes.
    pub fn new() -> Self {
        Self::default()
    }

    /// Routes IRIs starting with `prefix` to `endpoint`, reporting
    /// `selectivity` for patterns that mention the prefix. The route is
    /// registered as non-silent.
    ///
    /// Returns `self` so registrations can be chained.
    pub fn register(
        &mut self,
        prefix: impl Into<String>,
        endpoint: impl Into<String>,
        selectivity: FederatedSelectivity,
    ) -> &mut Self {
        self.insert_entry(prefix.into(), endpoint.into(), selectivity, false)
    }

    /// Like [`register`](Self::register), but marks the route as silent, so
    /// `SERVICE` nodes generated for `endpoint` may be emitted as `SILENT`.
    ///
    /// Returns `self` so registrations can be chained.
    pub fn register_silent(
        &mut self,
        prefix: impl Into<String>,
        endpoint: impl Into<String>,
        selectivity: FederatedSelectivity,
    ) -> &mut Self {
        self.insert_entry(prefix.into(), endpoint.into(), selectivity, true)
    }

    /// Number of registered routes.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no routes are registered.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Shared implementation of `register` / `register_silent`: inserts a
    /// route while keeping `entries` sorted by descending prefix length.
    ///
    /// The new entry is placed after every existing entry whose prefix is at
    /// least as long. That is exactly where a stable sort of the appended entry
    /// would put it (equal-length prefixes keep registration order), but it
    /// costs a single O(n) insert instead of re-sorting the whole table.
    fn insert_entry(
        &mut self,
        prefix: String,
        endpoint: String,
        selectivity: FederatedSelectivity,
        silent: bool,
    ) -> &mut Self {
        let len = prefix.len();
        let at = self
            .entries
            .partition_point(|entry| entry.prefix.len() >= len);
        self.entries.insert(
            at,
            StaticSourceEntry {
                prefix,
                endpoint,
                selectivity,
                silent,
            },
        );
        self
    }

    /// Returns the route with the longest prefix that `iri` starts with.
    fn lookup(&self, iri: &str) -> Option<&StaticSourceEntry> {
        self.entries.iter().find(|e| iri.starts_with(&e.prefix))
    }
}
impl SourceSelectivityProvider for StaticSourceProvider {
    /// Resolves `iri` through the longest registered prefix it starts with.
    fn endpoint_for_iri(&self, iri: &NamedNode) -> Option<String> {
        let route = self.lookup(iri.as_str())?;
        Some(route.endpoint.clone())
    }

    /// Returns the estimate of the first route, in table order, that targets
    /// `endpoint` and whose prefix starts one of `pattern`'s IRIs. Falls back to
    /// [`FederatedSelectivity::default`] when no route qualifies.
    fn pattern_selectivity(&self, endpoint: &str, pattern: &TriplePattern) -> FederatedSelectivity {
        self.entries
            .iter()
            .filter(|route| route.endpoint == endpoint)
            .find(|route| pattern_uses_prefix(pattern, &route.prefix))
            .map(|route| route.selectivity)
            .unwrap_or_default()
    }

    /// Reports the silent flag of the first route, in table order, that
    /// targets `endpoint`. Unknown endpoints are not silent.
    fn silent_default(&self, endpoint: &str) -> bool {
        let first_route = self.entries.iter().find(|route| route.endpoint == endpoint);
        matches!(first_route, Some(route) if route.silent)
    }
}
/// Returns `true` when the subject, predicate or object of `pattern` is an
/// IRI that starts with `prefix`.
fn pattern_uses_prefix(pattern: &TriplePattern, prefix: &str) -> bool {
    let positions = [&pattern.subject, &pattern.predicate, &pattern.object];
    positions
        .iter()
        .any(|term| pattern_term_uses_prefix(term, prefix))
}
/// Returns `true` if `term` is an IRI whose text starts with `prefix`.
/// Variables and all other term kinds never match.
fn pattern_term_uses_prefix(term: &Term, prefix: &str) -> bool {
    matches!(term, Term::Iri(node) if node.as_str().starts_with(prefix))
}
/// Query rewriter that moves triple patterns into `SERVICE` calls against
/// remote endpoints.
///
/// Routing decisions and cost estimates come from a
/// [`SourceSelectivityProvider`].
pub struct FederatedPlanner {
    /// Source of endpoint routing, selectivity and silent-flag information.
    provider: Arc<dyn SourceSelectivityProvider>,
    /// Multiplier on estimated latency (ms) when ranking endpoints by cost.
    latency_weight: f64,
}
impl FederatedPlanner {
    /// Floor applied to the minimum pattern confidence before it becomes a
    /// cost penalty. It caps the penalty factor at `1 / MIN_CONFIDENCE` and
    /// keeps it monotone, so a lower confidence never makes an endpoint look
    /// cheaper.
    const MIN_CONFIDENCE: f64 = 0.1;

    /// Creates a planner backed by `provider`, with a latency weight of `1.0`.
    pub fn new(provider: Arc<dyn SourceSelectivityProvider>) -> Self {
        Self {
            provider,
            latency_weight: 1.0,
        }
    }

    /// Sets the multiplier applied to estimated latency (ms) relative to
    /// estimated cardinality when ranking endpoints.
    pub fn with_latency_weight(mut self, weight: f64) -> Self {
        self.latency_weight = weight;
        self
    }

    /// Rewrites `algebra`, moving triple patterns whose IRIs the provider maps
    /// to an endpoint into generated `SERVICE` nodes.
    ///
    /// Pre-existing `SERVICE` clauses are left untouched but are still
    /// recorded in [`FederatedPlanOutcome::endpoints_used`].
    pub fn plan(&self, algebra: Algebra) -> FederatedPlanOutcome {
        let mut report = PlanReport::default();
        let rewritten = self.rewrite(algebra, &mut report);
        FederatedPlanOutcome {
            algebra: rewritten,
            endpoints_used: report.endpoints,
            patterns_federated: report.patterns_federated,
        }
    }

    /// Recursively rewrites every BGP reachable through the operators matched
    /// below. The bodies of existing `SERVICE` clauses, and any operator not
    /// matched explicitly, are returned unchanged.
    fn rewrite(&self, algebra: Algebra, report: &mut PlanReport) -> Algebra {
        match algebra {
            Algebra::Bgp(patterns) => self.rewrite_bgp(patterns, report),
            Algebra::Join { left, right } => Algebra::Join {
                left: Box::new(self.rewrite(*left, report)),
                right: Box::new(self.rewrite(*right, report)),
            },
            Algebra::LeftJoin {
                left,
                right,
                filter,
            } => Algebra::LeftJoin {
                left: Box::new(self.rewrite(*left, report)),
                right: Box::new(self.rewrite(*right, report)),
                filter,
            },
            Algebra::Union { left, right } => Algebra::Union {
                left: Box::new(self.rewrite(*left, report)),
                right: Box::new(self.rewrite(*right, report)),
            },
            Algebra::Filter { pattern, condition } => Algebra::Filter {
                pattern: Box::new(self.rewrite(*pattern, report)),
                condition,
            },
            Algebra::Extend {
                pattern,
                variable,
                expr,
            } => Algebra::Extend {
                pattern: Box::new(self.rewrite(*pattern, report)),
                variable,
                expr,
            },
            Algebra::Minus { left, right } => Algebra::Minus {
                left: Box::new(self.rewrite(*left, report)),
                right: Box::new(self.rewrite(*right, report)),
            },
            Algebra::Service {
                endpoint,
                pattern,
                silent,
            } => {
                // Accumulate rather than overwrite, so a count already recorded
                // for this endpoint by a federated BGP or another SERVICE
                // clause is not lost.
                if let Term::Iri(node) = &endpoint {
                    *report
                        .endpoints
                        .entry(node.as_str().to_string())
                        .or_insert(0) += 1;
                }
                Algebra::Service {
                    endpoint,
                    pattern,
                    silent,
                }
            }
            Algebra::Graph { graph, pattern } => Algebra::Graph {
                graph,
                pattern: Box::new(self.rewrite(*pattern, report)),
            },
            Algebra::Project { pattern, variables } => Algebra::Project {
                pattern: Box::new(self.rewrite(*pattern, report)),
                variables,
            },
            Algebra::Distinct { pattern } => Algebra::Distinct {
                pattern: Box::new(self.rewrite(*pattern, report)),
            },
            Algebra::Reduced { pattern } => Algebra::Reduced {
                pattern: Box::new(self.rewrite(*pattern, report)),
            },
            Algebra::Slice {
                pattern,
                offset,
                limit,
            } => Algebra::Slice {
                pattern: Box::new(self.rewrite(*pattern, report)),
                offset,
                limit,
            },
            Algebra::OrderBy {
                pattern,
                conditions,
            } => Algebra::OrderBy {
                pattern: Box::new(self.rewrite(*pattern, report)),
                conditions,
            },
            Algebra::Group {
                pattern,
                variables,
                aggregates,
            } => Algebra::Group {
                pattern: Box::new(self.rewrite(*pattern, report)),
                variables,
                aggregates,
            },
            Algebra::Having { pattern, condition } => Algebra::Having {
                pattern: Box::new(self.rewrite(*pattern, report)),
                condition,
            },
            other => other,
        }
    }

    /// Splits `patterns` into a local BGP plus one `SERVICE` per endpoint, and
    /// joins them left-deep. Local patterns come first (if any), followed by
    /// the services in ascending cost order.
    ///
    /// Services with equal cost keep the order in which their endpoints first
    /// appear in `patterns`, so the plan is deterministic.
    fn rewrite_bgp(&self, patterns: Vec<TriplePattern>, report: &mut PlanReport) -> Algebra {
        if patterns.is_empty() {
            return Algebra::Bgp(patterns);
        }
        let mut local: Vec<TriplePattern> = Vec::new();
        // Endpoint groups in first-appearance order. Grouping through a bare
        // HashMap would iterate in random order, reshuffling tied services
        // between runs.
        let mut groups: Vec<(String, Vec<TriplePattern>)> = Vec::new();
        let mut slot_of: HashMap<String, usize> = HashMap::new();
        for pattern in patterns {
            match self.choose_endpoint(&pattern) {
                Some(endpoint) => {
                    let slot = match slot_of.get(&endpoint) {
                        Some(&slot) => slot,
                        None => {
                            slot_of.insert(endpoint.clone(), groups.len());
                            groups.push((endpoint, Vec::new()));
                            groups.len() - 1
                        }
                    };
                    groups[slot].1.push(pattern);
                }
                None => local.push(pattern),
            }
        }
        if groups.is_empty() {
            return Algebra::Bgp(local);
        }
        let mut services: Vec<(String, Vec<TriplePattern>, f64)> = groups
            .into_iter()
            .map(|(endpoint, patterns)| {
                let cost = self.endpoint_cost(&endpoint, &patterns);
                (endpoint, patterns, cost)
            })
            .collect();
        // `sort_by` is stable, so ties keep first-appearance order. `total_cmp`
        // is a true total order, so a NaN cost from a provider cannot break the
        // sort's ordering contract.
        services.sort_by(|a, b| a.2.total_cmp(&b.2));
        report.patterns_federated += services.iter().map(|(_, p, _)| p.len()).sum::<usize>();
        for (endpoint, patterns, _) in &services {
            *report.endpoints.entry(endpoint.clone()).or_insert(0) += patterns.len();
        }
        let mut nodes = services.into_iter().map(|(endpoint, patterns, _)| {
            let endpoint_node = NamedNode::new_unchecked(&endpoint);
            let silent = self.provider.silent_default(&endpoint);
            Algebra::Service {
                endpoint: Term::Iri(endpoint_node),
                pattern: Box::new(Algebra::Bgp(patterns)),
                silent,
            }
        });
        // `groups` was non-empty, so when there are no local patterns the first
        // service becomes the root; the empty-BGP fallback is defensive only.
        let root = if local.is_empty() {
            nodes.next().unwrap_or_else(|| Algebra::Bgp(Vec::new()))
        } else {
            Algebra::Bgp(local)
        };
        nodes.fold(root, |left, right| Algebra::Join {
            left: Box::new(left),
            right: Box::new(right),
        })
    }

    /// Picks the endpoint for `pattern` by asking the provider about its IRIs
    /// in predicate, subject, object order. The first IRI the provider routes
    /// wins; `None` means the pattern stays local.
    fn choose_endpoint(&self, pattern: &TriplePattern) -> Option<String> {
        for term in [&pattern.predicate, &pattern.subject, &pattern.object] {
            if let Term::Iri(node) = term {
                if let Some(endpoint) = self.provider.endpoint_for_iri(node) {
                    return Some(endpoint);
                }
            }
        }
        None
    }

    /// Estimates the cost of shipping `patterns` to `endpoint` as
    /// `(sum of cardinalities + latency_weight * max latency)
    ///  * sqrt(1 / max(min confidence, MIN_CONFIDENCE))`.
    fn endpoint_cost(&self, endpoint: &str, patterns: &[TriplePattern]) -> f64 {
        let mut total_card = 0.0_f64;
        let mut max_latency = 0.0_f64;
        let mut min_confidence = 1.0_f64;
        for pattern in patterns {
            let sel = self.provider.pattern_selectivity(endpoint, pattern);
            total_card += sel.estimated_cardinality;
            max_latency = max_latency.max(sel.estimated_latency_ms);
            min_confidence = min_confidence.min(sel.confidence);
        }
        // Flooring (rather than special-casing <= 0) keeps the penalty monotone:
        // zero, negative and tiny-but-positive confidences all get the same
        // maximum penalty instead of tiny values being penalized hardest.
        let confidence = min_confidence.max(Self::MIN_CONFIDENCE);
        let confidence_penalty = 1.0 / confidence;
        (total_card + self.latency_weight * max_latency) * confidence_penalty.sqrt()
    }
}
/// Result of [`FederatedPlanner::plan`].
#[derive(Clone, Debug)]
pub struct FederatedPlanOutcome {
    /// The rewritten algebra tree.
    pub algebra: Algebra,
    /// Endpoint URL -> count the planner recorded for that endpoint.
    pub endpoints_used: HashMap<String, usize>,
    /// Number of triple patterns moved into generated `SERVICE` nodes.
    pub patterns_federated: usize,
}

impl FederatedPlanOutcome {
    /// Returns `true` when at least one pattern was moved into a generated
    /// `SERVICE` node. Pre-existing `SERVICE` clauses alone do not count.
    pub fn touched_federation(&self) -> bool {
        self.patterns_federated != 0
    }
}
/// Accumulator threaded through the planner's recursive rewrite and turned
/// into a [`FederatedPlanOutcome`] at the end.
#[derive(Default, Debug)]
struct PlanReport {
    /// Endpoint URL -> count recorded for that endpoint during the rewrite.
    endpoints: HashMap<String, usize>,
    /// Running total of patterns moved into generated `SERVICE` nodes.
    patterns_federated: usize,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::algebra::{Term, TriplePattern, Variable};

    /// IRI term built from a literal string (no validation).
    fn named(value: &str) -> Term {
        Term::Iri(NamedNode::new_unchecked(value))
    }

    /// Variable term; panics if `name` is not a valid variable name.
    fn variable(name: &str) -> Term {
        let parsed = Variable::new(name).expect("valid variable name");
        Term::Variable(parsed)
    }

    /// Triple pattern assembled from its three positions.
    fn triple(subject: Term, predicate: Term, object: Term) -> TriplePattern {
        TriplePattern {
            subject,
            predicate,
            object,
        }
    }

    /// Provider that routes `http://dbpedia.org/` IRIs to the DBpedia endpoint.
    fn dbpedia_provider() -> StaticSourceProvider {
        let estimate = FederatedSelectivity {
            estimated_cardinality: 100.0,
            estimated_latency_ms: 80.0,
            confidence: 0.9,
        };
        let mut provider = StaticSourceProvider::new();
        provider.register("http://dbpedia.org/", "https://dbpedia.org/sparql", estimate);
        provider
    }

    /// Planner backed by `dbpedia_provider`.
    fn dbpedia_planner() -> FederatedPlanner {
        FederatedPlanner::new(Arc::new(dbpedia_provider()))
    }

    #[test]
    fn test_pattern_with_known_predicate_emits_service() {
        let query = Algebra::Bgp(vec![triple(
            variable("s"),
            named("http://dbpedia.org/property/birthDate"),
            variable("o"),
        )]);
        let outcome = dbpedia_planner().plan(query);

        assert!(outcome.touched_federation());
        assert_eq!(outcome.patterns_federated, 1);
        assert!(outcome
            .endpoints_used
            .contains_key("https://dbpedia.org/sparql"));
        match outcome.algebra {
            Algebra::Service {
                endpoint: Term::Iri(node),
                ..
            } => assert_eq!(node.as_str(), "https://dbpedia.org/sparql"),
            Algebra::Service {
                endpoint: other, ..
            } => panic!("expected IRI endpoint, got {other:?}"),
            other => panic!("expected Service node, got {other:?}"),
        }
    }

    #[test]
    fn test_local_only_pattern_passes_through() {
        let query = Algebra::Bgp(vec![triple(
            named("http://example.org/local/alice"),
            named("http://example.org/local/knows"),
            variable("friend"),
        )]);
        let outcome = dbpedia_planner().plan(query.clone());

        assert!(!outcome.touched_federation());
        assert_eq!(outcome.algebra, query);
    }

    #[test]
    fn test_mixed_local_and_federated_emits_join() {
        let query = Algebra::Bgp(vec![
            triple(
                variable("s"),
                named("http://example.org/local/labelOf"),
                variable("label"),
            ),
            triple(
                variable("s"),
                named("http://dbpedia.org/property/birthDate"),
                variable("date"),
            ),
        ]);
        let outcome = dbpedia_planner().plan(query);

        assert!(outcome.touched_federation());
        assert_eq!(outcome.patterns_federated, 1);
        match outcome.algebra {
            Algebra::Join { left, right } => {
                assert!(matches!(*left, Algebra::Bgp(_)));
                assert!(matches!(*right, Algebra::Service { .. }));
            }
            other => panic!("expected Join, got {other:?}"),
        }
    }

    #[test]
    fn test_two_federations_join_in_cost_order() {
        let cheap = FederatedSelectivity {
            estimated_cardinality: 10.0,
            estimated_latency_ms: 20.0,
            confidence: 0.9,
        };
        let pricy = FederatedSelectivity {
            estimated_cardinality: 10_000.0,
            estimated_latency_ms: 500.0,
            confidence: 0.9,
        };
        let mut provider = StaticSourceProvider::new();
        provider
            .register("http://cheap.example/", "https://cheap.example/sparql", cheap)
            .register("http://pricy.example/", "https://pricy.example/sparql", pricy);
        let planner = FederatedPlanner::new(Arc::new(provider));

        // The pricy pattern comes first in the query; cost ordering must flip it.
        let query = Algebra::Bgp(vec![
            triple(
                variable("s"),
                named("http://pricy.example/data#predicate"),
                variable("o1"),
            ),
            triple(
                variable("s"),
                named("http://cheap.example/data#predicate"),
                variable("o2"),
            ),
        ]);
        let outcome = planner.plan(query);

        assert_eq!(outcome.patterns_federated, 2);
        let service_endpoint = |node: &Algebra| -> Option<String> {
            match node {
                Algebra::Service {
                    endpoint: Term::Iri(iri_node),
                    ..
                } => Some(iri_node.as_str().to_string()),
                _ => None,
            }
        };
        match outcome.algebra {
            Algebra::Join { left, right } => {
                assert_eq!(
                    service_endpoint(&left).as_deref(),
                    Some("https://cheap.example/sparql")
                );
                assert_eq!(
                    service_endpoint(&right).as_deref(),
                    Some("https://pricy.example/sparql")
                );
            }
            other => panic!("expected Join, got {other:?}"),
        }
    }

    #[test]
    fn test_silent_default_propagates() {
        let mut provider = StaticSourceProvider::new();
        provider.register_silent(
            "http://flaky.example/",
            "https://flaky.example/sparql",
            FederatedSelectivity::default(),
        );
        let query = Algebra::Bgp(vec![triple(
            variable("s"),
            named("http://flaky.example/data#p"),
            variable("o"),
        )]);
        let outcome = FederatedPlanner::new(Arc::new(provider)).plan(query);

        match outcome.algebra {
            Algebra::Service { silent, .. } => assert!(silent),
            other => panic!("expected Service, got {other:?}"),
        }
    }

    #[test]
    fn test_pre_existing_service_passes_through() {
        let body = Algebra::Bgp(vec![triple(
            variable("s"),
            named("http://example.org/local/p"),
            variable("o"),
        )]);
        let original = Algebra::Service {
            endpoint: named("https://other.example/sparql"),
            pattern: Box::new(body),
            silent: true,
        };
        let outcome = dbpedia_planner().plan(original.clone());

        assert_eq!(outcome.algebra, original);
        assert!(outcome
            .endpoints_used
            .contains_key("https://other.example/sparql"));
    }

    #[test]
    fn test_recursive_into_filter() {
        let inner = Algebra::Bgp(vec![triple(
            variable("s"),
            named("http://dbpedia.org/property/birthDate"),
            variable("o"),
        )]);
        let condition =
            crate::algebra::Expression::Variable(Variable::new("o").expect("valid variable"));
        let query = Algebra::Filter {
            pattern: Box::new(inner),
            condition,
        };
        let outcome = dbpedia_planner().plan(query);

        assert!(outcome.touched_federation());
        assert!(matches!(outcome.algebra, Algebra::Filter { .. }));
    }

    #[test]
    fn test_static_source_provider_longest_prefix_wins() {
        let mut provider = StaticSourceProvider::new();
        provider
            .register(
                "http://example.org/",
                "https://wide.example/sparql",
                FederatedSelectivity::default(),
            )
            .register(
                "http://example.org/specific/",
                "https://narrow.example/sparql",
                FederatedSelectivity::default(),
            );
        let cases = [
            ("http://example.org/specific/foo", "https://narrow.example/sparql"),
            ("http://example.org/foo", "https://wide.example/sparql"),
        ];
        for &(candidate, expected) in &cases {
            let node = NamedNode::new_unchecked(candidate);
            assert_eq!(provider.endpoint_for_iri(&node).as_deref(), Some(expected));
        }
    }

    #[test]
    fn test_outcome_default() {
        let outcome = FederatedPlanOutcome {
            algebra: Algebra::Bgp(Vec::new()),
            endpoints_used: HashMap::default(),
            patterns_federated: 0,
        };
        assert!(!outcome.touched_federation());
    }
}