1use crate::algebra::{Algebra, Binding, Expression, Solution, Term, Variable};
11use anyhow::{anyhow, Result};
12use dashmap::DashMap;
13use reqwest;
14use scirs2_core::metrics::{Counter, Timer};
15use scirs2_core::random::{rng, Rng}; use serde::{Deserialize, Serialize};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::sync::Semaphore;
20
21#[derive(Debug, Clone)]
23pub struct FederationConfig {
24 pub max_concurrent_requests: usize,
26 pub request_timeout: Duration,
28 pub max_retries: usize,
30 pub retry_base_delay: Duration,
32 pub retry_max_delay: Duration,
34 pub enable_caching: bool,
36 pub cache_ttl: Duration,
38 pub connection_pool_size: usize,
40 pub enable_health_monitoring: bool,
42 pub health_check_interval: Duration,
44 pub load_balancing: LoadBalancingStrategy,
46 pub enable_query_decomposition: bool,
48}
49
50impl Default for FederationConfig {
51 fn default() -> Self {
52 Self {
53 max_concurrent_requests: 32,
54 request_timeout: Duration::from_secs(60),
55 max_retries: 3,
56 retry_base_delay: Duration::from_millis(100),
57 retry_max_delay: Duration::from_secs(10),
58 enable_caching: true,
59 cache_ttl: Duration::from_secs(300),
60 connection_pool_size: 8,
61 enable_health_monitoring: true,
62 health_check_interval: Duration::from_secs(30),
63 load_balancing: LoadBalancingStrategy::LeastLoaded,
64 enable_query_decomposition: true,
65 }
66 }
67}
68
/// Strategy for distributing requests across endpoints.
///
/// The enum is a plain, fieldless tag, so it is `Copy` and `Hash` in addition
/// to being comparable; callers can pass it by value and use it as a map key.
///
/// NOTE(review): no code in this file branches on the variant, so the
/// per-variant notes below are inferred from the names — confirm against the
/// selection logic.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LoadBalancingStrategy {
    /// Presumably cycles through endpoints in order.
    RoundRobin,
    /// Presumably picks an endpoint at random.
    Random,
    /// Presumably prefers the endpoint with the fewest active requests.
    LeastLoaded,
    /// Presumably prefers the endpoint with the lowest response time.
    FastestResponse,
    /// Presumably adapts the choice to observed behaviour.
    Adaptive,
}
83
/// Executes SPARQL SERVICE patterns against remote endpoints.
///
/// All mutable state lives behind `Arc`s, so clones of an executor share the
/// same pools, cache, health records, metrics and request semaphore.
pub struct FederationExecutor {
    /// Settings this executor was created with.
    config: FederationConfig,
    /// Per-endpoint connection accounting, keyed by endpoint URL.
    connection_pools: Arc<DashMap<String, Arc<ConnectionPool>>>,
    /// Cached results, keyed by endpoint URL plus a rendering of the pattern.
    result_cache: Arc<DashMap<QueryCacheKey, CachedResult>>,
    /// Health record per endpoint URL.
    endpoint_health: Arc<DashMap<String, EndpointHealth>>,
    /// Request counters and the request-duration timer.
    metrics: Arc<FederationMetrics>,
    /// Limits concurrent request attempts; sized from
    /// `FederationConfig::max_concurrent_requests`.
    request_semaphore: Arc<Semaphore>,
}
98
99impl FederationExecutor {
100 pub fn new(config: FederationConfig) -> Self {
102 let max_concurrent = config.max_concurrent_requests;
103 Self {
104 config,
105 connection_pools: Arc::new(DashMap::new()),
106 result_cache: Arc::new(DashMap::new()),
107 endpoint_health: Arc::new(DashMap::new()),
108 metrics: Arc::new(FederationMetrics::new()),
109 request_semaphore: Arc::new(Semaphore::new(max_concurrent)),
110 }
111 }
112
113 pub async fn execute_service(
115 &self,
116 endpoint: &Term,
117 pattern: &Algebra,
118 silent: bool,
119 ) -> Result<Solution> {
120 let endpoint_url = self.extract_endpoint_url(endpoint)?;
121
122 if self.config.enable_health_monitoring {
124 let health = self.get_endpoint_health(&endpoint_url);
125 if !health.is_healthy() && !silent {
126 return Err(anyhow!(
127 "Endpoint {} is unhealthy: {}",
128 endpoint_url,
129 health.status_message
130 ));
131 }
132 }
133
134 if self.config.enable_caching {
136 let cache_key = QueryCacheKey {
137 endpoint: endpoint_url.clone(),
138 query: format!("{:?}", pattern),
139 };
140 if let Some(cached) = self.result_cache.get(&cache_key) {
141 if !cached.is_expired() {
142 self.metrics.cache_hits.inc();
143 return Ok(cached.results.clone());
144 } else {
145 self.result_cache.remove(&cache_key);
146 }
147 }
148 self.metrics.cache_misses.inc();
149 }
150
151 let start = Instant::now();
153 let result = self
154 .execute_with_retry(&endpoint_url, pattern, silent)
155 .await;
156 let elapsed = start.elapsed();
157
158 let _timer_guard = self.metrics.request_duration.start();
160 match &result {
161 Ok(results) => {
162 self.metrics.successful_requests.inc();
163 self.metrics.results_received.add(results.len() as u64);
164
165 if self.config.enable_caching {
167 let cache_key = QueryCacheKey {
168 endpoint: endpoint_url.clone(),
169 query: format!("{:?}", pattern),
170 };
171 self.result_cache.insert(
172 cache_key,
173 CachedResult {
174 results: results.clone(),
175 timestamp: Instant::now(),
176 ttl: self.config.cache_ttl,
177 },
178 );
179 }
180
181 if let Some(mut health) = self.endpoint_health.get_mut(&endpoint_url) {
183 health.record_success(elapsed);
184 }
185 }
186 Err(_) => {
187 self.metrics.failed_requests.inc();
188
189 if let Some(mut health) = self.endpoint_health.get_mut(&endpoint_url) {
191 health.record_failure();
192 }
193 }
194 }
195
196 result
197 }
198
199 async fn execute_with_retry(
201 &self,
202 endpoint: &str,
203 pattern: &Algebra,
204 silent: bool,
205 ) -> Result<Solution> {
206 let mut last_error = None;
207 let mut delay = self.config.retry_base_delay;
208
209 for attempt in 0..=self.config.max_retries {
210 let _permit = self
212 .request_semaphore
213 .acquire()
214 .await
215 .map_err(|e| anyhow!("Failed to acquire request semaphore: {}", e))?;
216
217 let pool = self.get_or_create_pool(endpoint);
219
220 match self.execute_query(endpoint, pattern, &pool).await {
221 Ok(results) => {
222 if attempt > 0 {
223 self.metrics.retried_requests.inc();
224 }
225 return Ok(results);
226 }
227 Err(e) => {
228 last_error = Some(e);
229
230 if attempt < self.config.max_retries {
231 let jitter_ms = rng().random_range(0..100);
233 let jitter = Duration::from_millis(jitter_ms);
234 tokio::time::sleep(delay + jitter).await;
235 delay = (delay * 2).min(self.config.retry_max_delay);
236 }
237 }
238 }
239 }
240
241 if silent {
242 Ok(Vec::new())
243 } else {
244 Err(last_error.unwrap_or_else(|| anyhow!("All retry attempts failed")))
245 }
246 }
247
248 fn algebra_to_sparql(&self, algebra: &Algebra) -> Result<String> {
250 let mut query = String::from("SELECT * WHERE {\n");
251 self.algebra_to_sparql_recursive(algebra, &mut query, 1)?;
252 query.push_str("}\n");
253 Ok(query)
254 }
255
256 fn algebra_to_sparql_recursive(
258 &self,
259 algebra: &Algebra,
260 query: &mut String,
261 indent: usize,
262 ) -> Result<()> {
263 let indent_str = " ".repeat(indent);
264
265 match algebra {
266 Algebra::Bgp(patterns) => {
267 for pattern in patterns {
268 query.push_str(&format!(
269 "{}{} {} {} .\n",
270 indent_str,
271 self.term_to_sparql(&pattern.subject),
272 self.term_to_sparql(&pattern.predicate),
273 self.term_to_sparql(&pattern.object)
274 ));
275 }
276 }
277 Algebra::Join { left, right } => {
278 self.algebra_to_sparql_recursive(left, query, indent)?;
279 self.algebra_to_sparql_recursive(right, query, indent)?;
280 }
281 Algebra::LeftJoin {
282 left,
283 right,
284 filter,
285 } => {
286 self.algebra_to_sparql_recursive(left, query, indent)?;
287 query.push_str(&format!("{}OPTIONAL {{\n", indent_str));
288 self.algebra_to_sparql_recursive(right, query, indent + 1)?;
289 if let Some(expr) = filter {
290 query.push_str(&format!(
291 "{} FILTER ({})\n",
292 indent_str,
293 self.expr_to_sparql(expr)?
294 ));
295 }
296 query.push_str(&format!("{}}}\n", indent_str));
297 }
298 Algebra::Union { left, right } => {
299 query.push_str(&format!("{}{{\n", indent_str));
300 self.algebra_to_sparql_recursive(left, query, indent + 1)?;
301 query.push_str(&format!("{}}} UNION {{\n", indent_str));
302 self.algebra_to_sparql_recursive(right, query, indent + 1)?;
303 query.push_str(&format!("{}}}\n", indent_str));
304 }
305 Algebra::Filter { pattern, condition } => {
306 self.algebra_to_sparql_recursive(pattern, query, indent)?;
307 query.push_str(&format!(
308 "{}FILTER ({})\n",
309 indent_str,
310 self.expr_to_sparql(condition)?
311 ));
312 }
313 _ => {
314 query.push_str(&format!("{}# Complex pattern: {:?}\n", indent_str, algebra));
316 }
317 }
318 Ok(())
319 }
320
321 fn term_to_sparql(&self, term: &Term) -> String {
323 match term {
324 Term::Variable(var) => format!("?{}", var.name()),
325 Term::Iri(iri) => format!("<{}>", iri.as_str()),
326 Term::Literal(lit) => format!("\"{}\"", lit.value),
327 Term::BlankNode(bn_id) => format!("_:{}", bn_id),
328 _ => "?var".to_string(),
329 }
330 }
331
332 fn expr_to_sparql(&self, expr: &Expression) -> Result<String> {
334 Ok(format!("{:?}", expr))
336 }
337
338 async fn execute_query(
340 &self,
341 endpoint: &str,
342 pattern: &Algebra,
343 pool: &Arc<ConnectionPool>,
344 ) -> Result<Solution> {
345 let start_time = Instant::now();
346
347 let sparql_query = self.algebra_to_sparql(pattern)?;
349
350 tracing::debug!(
351 "Executing federated query to {}: {}",
352 endpoint,
353 sparql_query
354 );
355
356 {
358 let mut active = pool.active_connections.lock();
359 if *active >= pool.size {
360 return Err(anyhow!(
361 "Connection pool exhausted for endpoint: {}",
362 endpoint
363 ));
364 }
365 *active += 1;
366 }
367
368 let result = self.execute_http_request(endpoint, &sparql_query).await;
370
371 {
373 let mut active = pool.active_connections.lock();
374 *active = active.saturating_sub(1);
375 }
376
377 let _duration = start_time.elapsed();
379 let _guard = self.metrics.request_duration.start();
380
381 match &result {
382 Ok(solutions) => {
383 self.metrics.successful_requests.inc();
384 self.metrics.results_received.add(solutions.len() as u64);
385 tracing::debug!("Received {} solutions from {}", solutions.len(), endpoint);
386 }
387 Err(e) => {
388 self.metrics.failed_requests.inc();
389 tracing::warn!("Failed to execute query on {}: {}", endpoint, e);
390 }
391 }
392
393 result
394 }
395
396 async fn execute_http_request(&self, endpoint: &str, query: &str) -> Result<Solution> {
398 let client = reqwest::Client::builder()
399 .timeout(self.config.request_timeout)
400 .build()?;
401
402 let response = client
404 .post(endpoint)
405 .header("Accept", "application/sparql-results+json")
406 .header("Content-Type", "application/sparql-query")
407 .body(query.to_string())
408 .send()
409 .await?;
410
411 if !response.status().is_success() {
412 return Err(anyhow!(
413 "SPARQL endpoint returned error: {} - {}",
414 response.status(),
415 response.text().await.unwrap_or_default()
416 ));
417 }
418
419 let json_response: serde_json::Value = response.json().await?;
421
422 self.parse_sparql_json_results(&json_response)
424 }
425
426 fn parse_sparql_json_results(&self, json: &serde_json::Value) -> Result<Solution> {
428 let bindings = json["results"]["bindings"]
429 .as_array()
430 .ok_or_else(|| anyhow!("Invalid SPARQL JSON results format"))?;
431
432 let mut solutions = Vec::new();
433
434 for binding in bindings {
435 let mut solution_binding = Binding::new();
436
437 if let Some(obj) = binding.as_object() {
438 for (var_name, value) in obj {
439 let variable = Variable::new_unchecked(var_name.clone());
441 let term = self.parse_sparql_json_term(value)?;
442 solution_binding.insert(variable, term);
443 }
444 }
445
446 solutions.push(solution_binding);
447 }
448
449 Ok(solutions)
450 }
451
452 fn parse_sparql_json_term(&self, value: &serde_json::Value) -> Result<Term> {
454 let term_type = value["type"]
455 .as_str()
456 .ok_or_else(|| anyhow!("Missing term type"))?;
457
458 let term_value = value["value"]
459 .as_str()
460 .ok_or_else(|| anyhow!("Missing term value"))?;
461
462 match term_type {
463 "uri" => {
464 use oxirs_core::model::NamedNode;
465 Ok(Term::Iri(NamedNode::new_unchecked(term_value)))
466 }
467 "literal" => {
468 use crate::algebra::Literal as AlgebraLiteral;
469 use oxirs_core::model::NamedNode;
470 let datatype = value.get("datatype").and_then(|v| v.as_str());
471 let language = value.get("xml:lang").and_then(|v| v.as_str());
472
473 if let Some(lang) = language {
474 Ok(Term::Literal(AlgebraLiteral {
475 value: term_value.to_string(),
476 language: Some(lang.to_string()),
477 datatype: None,
478 }))
479 } else if let Some(dt) = datatype {
480 Ok(Term::Literal(AlgebraLiteral {
481 value: term_value.to_string(),
482 language: None,
483 datatype: Some(NamedNode::new_unchecked(dt)),
484 }))
485 } else {
486 Ok(Term::Literal(AlgebraLiteral {
487 value: term_value.to_string(),
488 language: None,
489 datatype: None,
490 }))
491 }
492 }
493 "bnode" => {
494 Ok(Term::BlankNode(term_value.to_string()))
496 }
497 _ => Err(anyhow!("Unknown term type: {}", term_type)),
498 }
499 }
500
501 fn get_or_create_pool(&self, endpoint: &str) -> Arc<ConnectionPool> {
503 self.connection_pools
504 .entry(endpoint.to_string())
505 .or_insert_with(|| {
506 Arc::new(ConnectionPool::new(
507 endpoint.to_string(),
508 self.config.connection_pool_size,
509 ))
510 })
511 .clone()
512 }
513
514 fn get_endpoint_health(&self, endpoint: &str) -> EndpointHealth {
516 self.endpoint_health
517 .entry(endpoint.to_string())
518 .or_insert_with(EndpointHealth::new)
519 .clone()
520 }
521
522 fn extract_endpoint_url(&self, term: &Term) -> Result<String> {
524 match term {
525 Term::Iri(iri) => Ok(iri.as_str().to_string()),
526 Term::Variable(_) => Err(anyhow!("Cannot use variable as SERVICE endpoint")),
527 _ => Err(anyhow!("Invalid SERVICE endpoint: {:?}", term)),
528 }
529 }
530
531 pub fn decompose_query(&self, query: &Algebra) -> Vec<FederatedSubquery> {
533 let mut subqueries = Vec::new();
534 Self::extract_service_patterns(query, &mut subqueries);
535 subqueries
536 }
537
538 fn extract_service_patterns(algebra: &Algebra, subqueries: &mut Vec<FederatedSubquery>) {
540 match algebra {
541 Algebra::Service {
542 endpoint,
543 pattern,
544 silent,
545 } => {
546 subqueries.push(FederatedSubquery {
547 endpoint: endpoint.clone(),
548 pattern: (**pattern).clone(),
549 silent: *silent,
550 dependencies: Vec::new(),
551 });
552 }
553 Algebra::Join { left, right }
554 | Algebra::Union { left, right }
555 | Algebra::Minus { left, right } => {
556 Self::extract_service_patterns(left, subqueries);
557 Self::extract_service_patterns(right, subqueries);
558 }
559 Algebra::LeftJoin { left, right, .. } => {
560 Self::extract_service_patterns(left, subqueries);
561 Self::extract_service_patterns(right, subqueries);
562 }
563 Algebra::Filter { pattern, .. }
564 | Algebra::Extend { pattern, .. }
565 | Algebra::Graph { pattern, .. }
566 | Algebra::Project { pattern, .. }
567 | Algebra::Distinct { pattern }
568 | Algebra::Reduced { pattern }
569 | Algebra::Slice { pattern, .. }
570 | Algebra::OrderBy { pattern, .. }
571 | Algebra::Group { pattern, .. }
572 | Algebra::Having { pattern, .. } => {
573 Self::extract_service_patterns(pattern, subqueries);
574 }
575 _ => {}
576 }
577 }
578
579 pub async fn execute_federated_query(
581 &self,
582 subqueries: Vec<FederatedSubquery>,
583 ) -> Result<Solution> {
584 let mut tasks = Vec::new();
585
586 for subquery in subqueries {
587 let executor = self.clone();
588 let task = tokio::spawn(async move {
589 executor
590 .execute_service(&subquery.endpoint, &subquery.pattern, subquery.silent)
591 .await
592 });
593 tasks.push(task);
594 }
595
596 let mut all_results = Vec::new();
598 for task in tasks {
599 match task.await {
600 Ok(Ok(results)) => all_results.push(results),
601 Ok(Err(e)) => tracing::warn!("Federated subquery failed: {}", e),
602 Err(e) => tracing::error!("Task join error: {}", e),
603 }
604 }
605
606 Ok(self.merge_results(all_results))
608 }
609
610 pub fn merge_results(&self, results: Vec<Solution>) -> Solution {
612 let mut merged: Solution = Vec::new();
615
616 for result_set in results {
617 for binding in result_set {
618 if !merged.iter().any(|b| b == &binding) {
620 merged.push(binding);
621 }
622 }
623 }
624
625 merged
626 }
627
628 pub fn statistics(&self) -> FederationStats {
630 let stats = self.metrics.request_duration.get_stats();
631 FederationStats {
632 total_requests: self.metrics.successful_requests.get()
633 + self.metrics.failed_requests.get(),
634 successful_requests: self.metrics.successful_requests.get(),
635 failed_requests: self.metrics.failed_requests.get(),
636 cache_hits: self.metrics.cache_hits.get(),
637 cache_misses: self.metrics.cache_misses.get(),
638 average_request_duration: stats.mean,
639 total_results: self.metrics.results_received.get(),
640 active_endpoints: self.endpoint_health.len(),
641 healthy_endpoints: self
642 .endpoint_health
643 .iter()
644 .filter(|e| e.value().is_healthy())
645 .count(),
646 }
647 }
648}
649
650impl Clone for FederationExecutor {
651 fn clone(&self) -> Self {
652 Self {
653 config: self.config.clone(),
654 connection_pools: Arc::clone(&self.connection_pools),
655 result_cache: Arc::clone(&self.result_cache),
656 endpoint_health: Arc::clone(&self.endpoint_health),
657 metrics: Arc::clone(&self.metrics),
658 request_semaphore: Arc::clone(&self.request_semaphore),
659 }
660 }
661}
662
663pub struct ConnectionPool {
665 endpoint: String,
666 size: usize,
667 active_connections: parking_lot::Mutex<usize>,
668}
669
670impl ConnectionPool {
671 fn new(endpoint: String, size: usize) -> Self {
672 Self {
673 endpoint,
674 size,
675 active_connections: parking_lot::Mutex::new(0),
676 }
677 }
678
679 pub fn endpoint(&self) -> &str {
680 &self.endpoint
681 }
682
683 pub fn available(&self) -> usize {
684 let active = *self.active_connections.lock();
685 self.size.saturating_sub(active)
686 }
687}
688
/// Key of the result cache: one entry per endpoint and query.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct QueryCacheKey {
    /// Endpoint URL the query was sent to.
    endpoint: String,
    /// Debug rendering of the algebra pattern that was executed.
    query: String,
}
695
696struct CachedResult {
698 results: Solution,
699 timestamp: Instant,
700 ttl: Duration,
701}
702
703impl CachedResult {
704 fn is_expired(&self) -> bool {
705 self.timestamp.elapsed() > self.ttl
706 }
707}
708
/// Health record of a single endpoint.
///
/// An endpoint starts out healthy, is marked unhealthy after three
/// consecutive failures and becomes healthy again on the next success.
#[derive(Debug, Clone)]
pub struct EndpointHealth {
    /// Current health verdict.
    healthy: bool,
    /// Time of the most recent success, if any.
    last_success: Option<Instant>,
    /// Time of the most recent failure, if any.
    last_failure: Option<Instant>,
    /// Failures recorded since the last success.
    consecutive_failures: usize,
    /// Exponentially weighted moving average of successful response times.
    avg_response_time: Duration,
    /// Human-readable status: `"OK"` or the failure count once unhealthy.
    status_message: String,
}

impl EndpointHealth {
    /// Creates a record for an endpoint that is assumed healthy.
    fn new() -> Self {
        Self {
            healthy: true,
            last_success: None,
            last_failure: None,
            consecutive_failures: 0,
            avg_response_time: Duration::from_secs(0),
            status_message: "OK".to_string(),
        }
    }

    /// Whether the endpoint is currently considered healthy.
    fn is_healthy(&self) -> bool {
        self.healthy
    }

    /// Records a successful request that took `response_time`.
    ///
    /// Resets the failure streak, marks the endpoint healthy and folds
    /// `response_time` into the running average. The first success seeds the
    /// average; later ones contribute with weight 1/5 so a single outlier
    /// does not replace the whole history.
    fn record_success(&mut self, response_time: Duration) {
        // `last_success` is still the previous value here, which tells us
        // whether an average exists yet.
        self.avg_response_time = if self.last_success.is_none() {
            response_time
        } else {
            self.avg_response_time
                .saturating_mul(4)
                .saturating_add(response_time)
                / 5
        };

        self.healthy = true;
        self.last_success = Some(Instant::now());
        self.consecutive_failures = 0;
        self.status_message = "OK".to_string();
    }

    /// Records a failed request; the third consecutive failure (and every
    /// one after it) marks the endpoint unhealthy.
    fn record_failure(&mut self) {
        self.last_failure = Some(Instant::now());
        self.consecutive_failures = self.consecutive_failures.saturating_add(1);

        if self.consecutive_failures >= 3 {
            self.healthy = false;
            self.status_message = format!("{} consecutive failures", self.consecutive_failures);
        }
    }
}
761
/// One SERVICE pattern extracted from a query.
#[derive(Debug, Clone)]
pub struct FederatedSubquery {
    /// Endpoint term of the SERVICE pattern.
    pub endpoint: Term,
    /// Pattern to evaluate at the endpoint.
    pub pattern: Algebra,
    /// Whether the SERVICE pattern was marked silent.
    pub silent: bool,
    /// Variables this subquery depends on.
    /// NOTE(review): always created empty in this file — confirm who fills it.
    pub dependencies: Vec<Variable>,
}
770
771struct FederationMetrics {
773 successful_requests: Counter,
774 failed_requests: Counter,
775 retried_requests: Counter,
776 cache_hits: Counter,
777 cache_misses: Counter,
778 request_duration: Timer,
779 results_received: Counter,
780}
781
782impl FederationMetrics {
783 fn new() -> Self {
784 Self {
785 successful_requests: Counter::new("federation.successful_requests".to_string()),
786 failed_requests: Counter::new("federation.failed_requests".to_string()),
787 retried_requests: Counter::new("federation.retried_requests".to_string()),
788 cache_hits: Counter::new("federation.cache_hits".to_string()),
789 cache_misses: Counter::new("federation.cache_misses".to_string()),
790 request_duration: Timer::new("federation.request_duration".to_string()),
791 results_received: Counter::new("federation.results_received".to_string()),
792 }
793 }
794}
795
/// Snapshot of executor statistics, as returned by
/// `FederationExecutor::statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FederationStats {
    /// Sum of successful and failed requests.
    pub total_requests: u64,
    /// Number of successful requests.
    pub successful_requests: u64,
    /// Number of failed requests.
    pub failed_requests: u64,
    /// Number of result-cache hits.
    pub cache_hits: u64,
    /// Number of result-cache misses.
    pub cache_misses: u64,
    /// Mean reported by the request-duration timer.
    /// NOTE(review): the unit is defined by the timer implementation — confirm.
    pub average_request_duration: f64,
    /// Total number of bindings received.
    pub total_results: u64,
    /// Number of endpoints that have a health record.
    pub active_endpoints: usize,
    /// Number of those endpoints currently marked healthy.
    pub healthy_endpoints: usize,
}
809
/// What is known about a SPARQL endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EndpointCapabilities {
    /// Endpoint URL; also the key under which the endpoint is registered.
    pub endpoint: String,
    /// SPARQL version as a string such as `"1.1"`.
    pub sparql_version: String,
    /// Result formats (media types) the endpoint offers.
    pub result_formats: Vec<String>,
    /// Optional limit on query complexity.
    /// NOTE(review): only ever set to `None` in this file — confirm semantics.
    pub max_query_complexity: Option<usize>,
    /// Whether the endpoint supports federated queries.
    pub supports_federation: bool,
    /// Whether the endpoint supports RDF-star.
    pub supports_rdf_star: bool,
    /// Names of the named graphs the endpoint advertises.
    pub named_graphs: Vec<String>,
}
828
/// Registry of known endpoints and their capabilities.
pub struct EndpointDiscovery {
    /// Registered capabilities, keyed by endpoint URL.
    endpoints: Arc<DashMap<String, EndpointCapabilities>>,
}
834
835impl EndpointDiscovery {
836 pub fn new() -> Self {
838 Self {
839 endpoints: Arc::new(DashMap::new()),
840 }
841 }
842
843 pub fn register_endpoint(&self, capabilities: EndpointCapabilities) {
845 self.endpoints
846 .insert(capabilities.endpoint.clone(), capabilities);
847 }
848
849 pub async fn discover_endpoint(&self, endpoint: &str) -> Result<EndpointCapabilities> {
851 let service_description_query = r#"
854 PREFIX sd: <http://www.w3.org/ns/sparql-service-description#>
855 PREFIX void: <http://rdfs.org/ns/void#>
856
857 SELECT ?feature ?format ?version ?graph WHERE {
858 {
859 ?service a sd:Service .
860 OPTIONAL { ?service sd:feature ?feature }
861 OPTIONAL { ?service sd:resultFormat ?format }
862 OPTIONAL { ?service sd:languageExtension ?version }
863 OPTIONAL { ?service sd:defaultDataset/sd:namedGraph/sd:name ?graph }
864 }
865 }
866 "#;
867
868 let client = reqwest::Client::builder()
869 .timeout(Duration::from_secs(30))
870 .build()?;
871
872 let response_result = client
874 .post(endpoint)
875 .header("Accept", "application/sparql-results+json")
876 .header("Content-Type", "application/sparql-query")
877 .body(service_description_query)
878 .send()
879 .await;
880
881 let mut capabilities = EndpointCapabilities {
882 endpoint: endpoint.to_string(),
883 sparql_version: "1.1".to_string(),
884 result_formats: vec!["application/sparql-results+json".to_string()],
885 max_query_complexity: None,
886 supports_federation: false,
887 supports_rdf_star: false,
888 named_graphs: Vec::new(),
889 };
890
891 match response_result {
892 Ok(response) if response.status().is_success() => {
893 if let Ok(json) = response.json::<serde_json::Value>().await {
895 if let Some(bindings) = json["results"]["bindings"].as_array() {
896 let mut formats = Vec::new();
897 let mut graphs = Vec::new();
898
899 for binding in bindings {
900 if let Some(format) = binding["format"]["value"].as_str() {
902 formats.push(format.to_string());
903 }
904
905 if let Some(feature) = binding["feature"]["value"].as_str() {
907 if feature.contains("UnionDefaultGraph") {
908 capabilities.supports_federation = true;
909 }
910 if feature.contains("SPARQL-star") || feature.contains("RDFstar") {
911 capabilities.supports_rdf_star = true;
912 }
913 if feature.contains("1.2") {
914 capabilities.sparql_version = "1.2".to_string();
915 }
916 }
917
918 if let Some(graph) = binding["graph"]["value"].as_str() {
920 graphs.push(graph.to_string());
921 }
922 }
923
924 if !formats.is_empty() {
925 capabilities.result_formats = formats;
926 }
927 capabilities.named_graphs = graphs;
928 }
929 }
930
931 tracing::info!(
932 "Discovered endpoint capabilities for {}: version={}, formats={:?}, federation={}, rdf-star={}",
933 endpoint,
934 capabilities.sparql_version,
935 capabilities.result_formats,
936 capabilities.supports_federation,
937 capabilities.supports_rdf_star
938 );
939 }
940 Ok(response) => {
941 tracing::warn!(
942 "Service description query failed with status {}: {}",
943 response.status(),
944 response.text().await.unwrap_or_default()
945 );
946 }
947 Err(e) => {
948 tracing::warn!(
949 "Failed to query service description from {}: {}. Using default capabilities.",
950 endpoint,
951 e
952 );
953 }
954 }
955
956 let test_query = "ASK { ?s ?p ?o }";
958 if let Ok(response) = client
959 .post(endpoint)
960 .header("Accept", "application/sparql-results+json")
961 .header("Content-Type", "application/sparql-query")
962 .body(test_query)
963 .send()
964 .await
965 {
966 if response.status().is_success() {
967 tracing::debug!("Endpoint {} is responsive", endpoint);
968 }
969 }
970
971 Ok(capabilities)
972 }
973
974 pub fn find_endpoints(&self, criteria: EndpointCriteria) -> Vec<EndpointCapabilities> {
976 self.endpoints
977 .iter()
978 .filter(|entry| {
979 let caps = entry.value();
980 (criteria.supports_federation.is_none()
981 || criteria.supports_federation == Some(caps.supports_federation))
982 && (criteria.supports_rdf_star.is_none()
983 || criteria.supports_rdf_star == Some(caps.supports_rdf_star))
984 })
985 .map(|entry| entry.value().clone())
986 .collect()
987 }
988}
989
990impl Default for EndpointDiscovery {
991 fn default() -> Self {
992 Self::new()
993 }
994}
995
/// Filter used when searching the endpoint registry; `None` fields impose no
/// restriction.
#[derive(Debug, Clone, Default)]
pub struct EndpointCriteria {
    /// Required value of `EndpointCapabilities::supports_federation`.
    pub supports_federation: Option<bool>,
    /// Required value of `EndpointCapabilities::supports_rdf_star`.
    pub supports_rdf_star: Option<bool>,
    /// Minimum SPARQL version the endpoint should offer.
    /// NOTE(review): presumably a dotted version string like `"1.1"`, matching
    /// `EndpointCapabilities::sparql_version` — confirm.
    pub min_sparql_version: Option<String>,
}
1003
#[cfg(test)]
mod tests {
    use super::*;
    use crate::algebra::TriplePattern;

    /// The default configuration exposes the documented limits.
    #[test]
    fn test_federation_config_default() {
        let config = FederationConfig::default();
        assert_eq!(config.max_concurrent_requests, 32);
        assert_eq!(config.max_retries, 3);
        assert!(config.enable_caching);
    }

    /// Three consecutive failures mark an endpoint unhealthy; one success
    /// restores it.
    #[test]
    fn test_endpoint_health() {
        let mut health = EndpointHealth::new();
        assert!(health.is_healthy());

        health.record_failure();
        assert!(health.is_healthy());

        health.record_failure();
        health.record_failure();
        assert!(!health.is_healthy());

        health.record_success(Duration::from_millis(100));
        assert!(health.is_healthy());
    }

    /// A registered endpoint is found by a matching criterion.
    #[test]
    fn test_endpoint_discovery() {
        let discovery = EndpointDiscovery::new();

        let caps = EndpointCapabilities {
            endpoint: "http://example.org/sparql".to_string(),
            sparql_version: "1.1".to_string(),
            result_formats: vec!["application/sparql-results+json".to_string()],
            max_query_complexity: None,
            supports_federation: true,
            supports_rdf_star: false,
            named_graphs: Vec::new(),
        };

        discovery.register_endpoint(caps.clone());

        let found = discovery.find_endpoints(EndpointCriteria {
            supports_federation: Some(true),
            ..Default::default()
        });

        assert_eq!(found.len(), 1);
        assert_eq!(found[0].endpoint, caps.endpoint);
    }

    /// A fresh executor reports empty statistics.
    #[tokio::test]
    async fn test_federation_executor() {
        let config = FederationConfig::default();
        let executor = FederationExecutor::new(config);

        let stats = executor.statistics();
        assert_eq!(stats.total_requests, 0);
        assert_eq!(stats.active_endpoints, 0);
    }

    /// A basic graph pattern is rendered inside a SELECT query.
    #[test]
    fn test_algebra_to_sparql_bgp() {
        let executor = FederationExecutor::new(FederationConfig::default());

        let patterns = vec![TriplePattern {
            subject: Term::Variable(Variable::new_unchecked("s")),
            predicate: Term::Iri(crate::algebra::Iri::new_unchecked(
                "http://example.org/predicate",
            )),
            object: Term::Variable(Variable::new_unchecked("o")),
        }];

        let algebra = Algebra::Bgp(patterns);
        let sparql = executor.algebra_to_sparql(&algebra).unwrap();

        assert!(sparql.contains("SELECT * WHERE"));
        assert!(sparql.contains("?s"));
        assert!(sparql.contains("<http://example.org/predicate>"));
        assert!(sparql.contains("?o"));
    }

    /// One JSON binding with two variables yields one solution row with two
    /// entries.
    #[test]
    fn test_parse_sparql_json_results() {
        let executor = FederationExecutor::new(FederationConfig::default());

        let json_str = r#"{
            "results": {
                "bindings": [
                    {
                        "s": {
                            "type": "uri",
                            "value": "http://example.org/subject1"
                        },
                        "o": {
                            "type": "literal",
                            "value": "object1"
                        }
                    }
                ]
            }
        }"#;

        let json: serde_json::Value = serde_json::from_str(json_str).unwrap();
        let results = executor.parse_sparql_json_results(&json).unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].len(), 2);
    }

    /// Each JSON term type maps to the matching `Term` variant.
    #[test]
    fn test_parse_sparql_json_term_types() {
        let executor = FederationExecutor::new(FederationConfig::default());

        let uri_json = serde_json::json!({
            "type": "uri",
            "value": "http://example.org/resource"
        });
        let term = executor.parse_sparql_json_term(&uri_json).unwrap();
        // `matches!` only yields a bool; it has to be asserted to test anything.
        assert!(matches!(term, Term::Iri(_)));

        let literal_json = serde_json::json!({
            "type": "literal",
            "value": "test value"
        });
        let term = executor.parse_sparql_json_term(&literal_json).unwrap();
        match term {
            Term::Literal(lit) => assert_eq!(lit.value, "test value"),
            _ => panic!("Expected Literal term"),
        }

        let bnode_json = serde_json::json!({
            "type": "bnode",
            "value": "b0"
        });
        let term = executor.parse_sparql_json_term(&bnode_json).unwrap();
        match term {
            Term::BlankNode(id) => assert_eq!(id, "b0"),
            _ => panic!("Expected BlankNode term"),
        }
    }

    /// Merging keeps one copy of a binding that occurs in several inputs.
    #[test]
    fn test_merge_results() {
        let executor = FederationExecutor::new(FederationConfig::default());

        let var_s = Variable::new_unchecked("s");

        let mut binding1 = Binding::new();
        binding1.insert(
            var_s.clone(),
            Term::Iri(crate::algebra::Iri::new_unchecked("http://example.org/s1")),
        );

        let mut binding2 = Binding::new();
        binding2.insert(
            var_s.clone(),
            Term::Iri(crate::algebra::Iri::new_unchecked("http://example.org/s2")),
        );

        let solution1 = vec![binding1.clone()];
        // `binding1` appears in both inputs and must be kept only once.
        let solution2 = vec![binding2, binding1.clone()];
        let merged = executor.merge_results(vec![solution1, solution2]);

        assert_eq!(merged.len(), 2);
    }
}