Skip to main content

oxirs_arq/
federation.rs

1//! Federated SPARQL Query Execution
2//!
3//! Provides advanced federation support for SPARQL queries including:
4//! - SERVICE keyword execution with connection pooling
5//! - Query decomposition and distribution
6//! - Result merging from multiple endpoints
7//! - Endpoint discovery and capability detection
8//! - Load balancing and failover
9
10use crate::algebra::{Algebra, Binding, Expression, Solution, Term, Variable};
11use anyhow::{anyhow, Result};
12use dashmap::DashMap;
13use reqwest;
14use scirs2_core::metrics::{Counter, Timer};
15use scirs2_core::random::{rng, Rng}; // Use scirs2-core for random number generation
16use serde::{Deserialize, Serialize};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::sync::Semaphore;
20
/// Configuration for federated query execution.
///
/// All knobs have sensible defaults (see the `Default` impl); override
/// individual fields before constructing a `FederationExecutor`.
#[derive(Debug, Clone)]
pub struct FederationConfig {
    /// Maximum number of concurrent SERVICE requests (semaphore permits)
    pub max_concurrent_requests: usize,
    /// Per-request HTTP timeout
    pub request_timeout: Duration,
    /// Maximum number of retries for failed requests (attempts = retries + 1)
    pub max_retries: usize,
    /// Base delay for exponential backoff
    pub retry_base_delay: Duration,
    /// Maximum retry delay (backoff cap)
    pub retry_max_delay: Duration,
    /// Enable result caching
    pub enable_caching: bool,
    /// Cache TTL for successful results
    pub cache_ttl: Duration,
    /// Connection pool size per endpoint
    pub connection_pool_size: usize,
    /// Enable endpoint health monitoring
    pub enable_health_monitoring: bool,
    /// Health check interval
    pub health_check_interval: Duration,
    /// Load balancing strategy
    pub load_balancing: LoadBalancingStrategy,
    /// Enable query decomposition
    pub enable_query_decomposition: bool,
}
49
50impl Default for FederationConfig {
51    fn default() -> Self {
52        Self {
53            max_concurrent_requests: 32,
54            request_timeout: Duration::from_secs(60),
55            max_retries: 3,
56            retry_base_delay: Duration::from_millis(100),
57            retry_max_delay: Duration::from_secs(10),
58            enable_caching: true,
59            cache_ttl: Duration::from_secs(300),
60            connection_pool_size: 8,
61            enable_health_monitoring: true,
62            health_check_interval: Duration::from_secs(30),
63            load_balancing: LoadBalancingStrategy::LeastLoaded,
64            enable_query_decomposition: true,
65        }
66    }
67}
68
/// Load balancing strategies for distributing requests across endpoints.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LoadBalancingStrategy {
    /// Round-robin distribution
    RoundRobin,
    /// Random selection
    Random,
    /// Select least loaded endpoint
    LeastLoaded,
    /// Select endpoint with best response time
    FastestResponse,
    /// Adaptive strategy based on historical performance
    Adaptive,
}
83
/// Federation executor: runs SPARQL `SERVICE` sub-queries against remote
/// endpoints with result caching, retries, connection accounting, and
/// endpoint health tracking.
///
/// Cheap to clone: all shared state is behind `Arc` (see the manual
/// `Clone` impl), so clones observe the same caches, pools, and metrics.
pub struct FederationExecutor {
    config: FederationConfig,
    /// Connection pools, keyed by endpoint URL
    connection_pools: Arc<DashMap<String, Arc<ConnectionPool>>>,
    /// Cached query results, keyed by (endpoint, pattern) pair
    result_cache: Arc<DashMap<QueryCacheKey, CachedResult>>,
    /// Endpoint health status, keyed by endpoint URL
    endpoint_health: Arc<DashMap<String, EndpointHealth>>,
    /// Request/cache counters and the request-duration timer
    metrics: Arc<FederationMetrics>,
    /// Semaphore bounding concurrent outbound requests
    request_semaphore: Arc<Semaphore>,
}
98
99impl FederationExecutor {
100    /// Create a new federation executor
101    pub fn new(config: FederationConfig) -> Self {
102        let max_concurrent = config.max_concurrent_requests;
103        Self {
104            config,
105            connection_pools: Arc::new(DashMap::new()),
106            result_cache: Arc::new(DashMap::new()),
107            endpoint_health: Arc::new(DashMap::new()),
108            metrics: Arc::new(FederationMetrics::new()),
109            request_semaphore: Arc::new(Semaphore::new(max_concurrent)),
110        }
111    }
112
113    /// Execute a SERVICE query
114    pub async fn execute_service(
115        &self,
116        endpoint: &Term,
117        pattern: &Algebra,
118        silent: bool,
119    ) -> Result<Solution> {
120        let endpoint_url = self.extract_endpoint_url(endpoint)?;
121
122        // Check endpoint health
123        if self.config.enable_health_monitoring {
124            let health = self.get_endpoint_health(&endpoint_url);
125            if !health.is_healthy() && !silent {
126                return Err(anyhow!(
127                    "Endpoint {} is unhealthy: {}",
128                    endpoint_url,
129                    health.status_message
130                ));
131            }
132        }
133
134        // Check cache
135        if self.config.enable_caching {
136            let cache_key = QueryCacheKey {
137                endpoint: endpoint_url.clone(),
138                query: format!("{:?}", pattern),
139            };
140            if let Some(cached) = self.result_cache.get(&cache_key) {
141                if !cached.is_expired() {
142                    self.metrics.cache_hits.inc();
143                    return Ok(cached.results.clone());
144                } else {
145                    self.result_cache.remove(&cache_key);
146                }
147            }
148            self.metrics.cache_misses.inc();
149        }
150
151        // Execute with retry logic
152        let start = Instant::now();
153        let result = self
154            .execute_with_retry(&endpoint_url, pattern, silent)
155            .await;
156        let elapsed = start.elapsed();
157
158        // Update metrics
159        let _timer_guard = self.metrics.request_duration.start();
160        match &result {
161            Ok(results) => {
162                self.metrics.successful_requests.inc();
163                self.metrics.results_received.add(results.len() as u64);
164
165                // Cache result
166                if self.config.enable_caching {
167                    let cache_key = QueryCacheKey {
168                        endpoint: endpoint_url.clone(),
169                        query: format!("{:?}", pattern),
170                    };
171                    self.result_cache.insert(
172                        cache_key,
173                        CachedResult {
174                            results: results.clone(),
175                            timestamp: Instant::now(),
176                            ttl: self.config.cache_ttl,
177                        },
178                    );
179                }
180
181                // Update endpoint health
182                if let Some(mut health) = self.endpoint_health.get_mut(&endpoint_url) {
183                    health.record_success(elapsed);
184                }
185            }
186            Err(_) => {
187                self.metrics.failed_requests.inc();
188
189                // Update endpoint health
190                if let Some(mut health) = self.endpoint_health.get_mut(&endpoint_url) {
191                    health.record_failure();
192                }
193            }
194        }
195
196        result
197    }
198
199    /// Execute with retry logic and exponential backoff
200    async fn execute_with_retry(
201        &self,
202        endpoint: &str,
203        pattern: &Algebra,
204        silent: bool,
205    ) -> Result<Solution> {
206        let mut last_error = None;
207        let mut delay = self.config.retry_base_delay;
208
209        for attempt in 0..=self.config.max_retries {
210            // Acquire semaphore permit
211            let _permit = self
212                .request_semaphore
213                .acquire()
214                .await
215                .map_err(|e| anyhow!("Failed to acquire request semaphore: {}", e))?;
216
217            // Get connection from pool
218            let pool = self.get_or_create_pool(endpoint);
219
220            match self.execute_query(endpoint, pattern, &pool).await {
221                Ok(results) => {
222                    if attempt > 0 {
223                        self.metrics.retried_requests.inc();
224                    }
225                    return Ok(results);
226                }
227                Err(e) => {
228                    last_error = Some(e);
229
230                    if attempt < self.config.max_retries {
231                        // Exponential backoff with jitter (using scirs2-core random for thread safety)
232                        let jitter_ms = rng().random_range(0..100);
233                        let jitter = Duration::from_millis(jitter_ms);
234                        tokio::time::sleep(delay + jitter).await;
235                        delay = (delay * 2).min(self.config.retry_max_delay);
236                    }
237                }
238            }
239        }
240
241        if silent {
242            Ok(Vec::new())
243        } else {
244            Err(last_error.unwrap_or_else(|| anyhow!("All retry attempts failed")))
245        }
246    }
247
248    /// Convert Algebra to SPARQL query string
249    fn algebra_to_sparql(&self, algebra: &Algebra) -> Result<String> {
250        let mut query = String::from("SELECT * WHERE {\n");
251        self.algebra_to_sparql_recursive(algebra, &mut query, 1)?;
252        query.push_str("}\n");
253        Ok(query)
254    }
255
    /// Recursively convert Algebra to SPARQL, appending to `query`.
    ///
    /// `indent` is the nesting depth; each level adds two spaces. Handles
    /// BGPs, joins, OPTIONAL (left join), UNION, and FILTER; any other
    /// node type is emitted as a SPARQL comment placeholder rather than
    /// failing, so unsupported patterns degrade visibly in the query text.
    fn algebra_to_sparql_recursive(
        &self,
        algebra: &Algebra,
        query: &mut String,
        indent: usize,
    ) -> Result<()> {
        let indent_str = "  ".repeat(indent);

        match algebra {
            Algebra::Bgp(patterns) => {
                // One triple pattern per line, terminated with " ."
                for pattern in patterns {
                    query.push_str(&format!(
                        "{}{}  {}  {} .\n",
                        indent_str,
                        self.term_to_sparql(&pattern.subject),
                        self.term_to_sparql(&pattern.predicate),
                        self.term_to_sparql(&pattern.object)
                    ));
                }
            }
            Algebra::Join { left, right } => {
                // SPARQL joins are implicit: juxtaposed group graph patterns.
                self.algebra_to_sparql_recursive(left, query, indent)?;
                self.algebra_to_sparql_recursive(right, query, indent)?;
            }
            Algebra::LeftJoin {
                left,
                right,
                filter,
            } => {
                // Left join maps to OPTIONAL { ... } with an optional FILTER.
                self.algebra_to_sparql_recursive(left, query, indent)?;
                query.push_str(&format!("{}OPTIONAL {{\n", indent_str));
                self.algebra_to_sparql_recursive(right, query, indent + 1)?;
                if let Some(expr) = filter {
                    query.push_str(&format!(
                        "{}  FILTER ({})\n",
                        indent_str,
                        self.expr_to_sparql(expr)?
                    ));
                }
                query.push_str(&format!("{}}}\n", indent_str));
            }
            Algebra::Union { left, right } => {
                query.push_str(&format!("{}{{\n", indent_str));
                self.algebra_to_sparql_recursive(left, query, indent + 1)?;
                query.push_str(&format!("{}}} UNION {{\n", indent_str));
                self.algebra_to_sparql_recursive(right, query, indent + 1)?;
                query.push_str(&format!("{}}}\n", indent_str));
            }
            Algebra::Filter { pattern, condition } => {
                self.algebra_to_sparql_recursive(pattern, query, indent)?;
                query.push_str(&format!(
                    "{}FILTER ({})\n",
                    indent_str,
                    self.expr_to_sparql(condition)?
                ));
            }
            _ => {
                // For other algebra types, use a simplified representation
                // (a comment line), keeping the generated query parseable.
                query.push_str(&format!("{}# Complex pattern: {:?}\n", indent_str, algebra));
            }
        }
        Ok(())
    }
320
321    /// Convert Term to SPARQL representation
322    fn term_to_sparql(&self, term: &Term) -> String {
323        match term {
324            Term::Variable(var) => format!("?{}", var.name()),
325            Term::Iri(iri) => format!("<{}>", iri.as_str()),
326            Term::Literal(lit) => format!("\"{}\"", lit.value),
327            Term::BlankNode(bn_id) => format!("_:{}", bn_id),
328            _ => "?var".to_string(),
329        }
330    }
331
    /// Convert Expression to SPARQL representation.
    ///
    /// NOTE(review): this currently emits the Rust `Debug` rendering of
    /// the expression, which is not valid SPARQL syntax for non-trivial
    /// expressions — remote endpoints may reject such FILTER clauses. A
    /// proper serializer is needed for full fidelity.
    fn expr_to_sparql(&self, expr: &Expression) -> Result<String> {
        // Simplified expression conversion
        Ok(format!("{:?}", expr))
    }
337
338    /// Execute query against endpoint
339    async fn execute_query(
340        &self,
341        endpoint: &str,
342        pattern: &Algebra,
343        pool: &Arc<ConnectionPool>,
344    ) -> Result<Solution> {
345        let start_time = Instant::now();
346
347        // Convert algebra to SPARQL query
348        let sparql_query = self.algebra_to_sparql(pattern)?;
349
350        tracing::debug!(
351            "Executing federated query to {}: {}",
352            endpoint,
353            sparql_query
354        );
355
356        // Acquire connection from pool
357        {
358            let mut active = pool.active_connections.lock();
359            if *active >= pool.size {
360                return Err(anyhow!(
361                    "Connection pool exhausted for endpoint: {}",
362                    endpoint
363                ));
364            }
365            *active += 1;
366        }
367
368        // Make HTTP request
369        let result = self.execute_http_request(endpoint, &sparql_query).await;
370
371        // Release connection
372        {
373            let mut active = pool.active_connections.lock();
374            *active = active.saturating_sub(1);
375        }
376
377        // Update metrics
378        let _duration = start_time.elapsed();
379        let _guard = self.metrics.request_duration.start();
380
381        match &result {
382            Ok(solutions) => {
383                self.metrics.successful_requests.inc();
384                self.metrics.results_received.add(solutions.len() as u64);
385                tracing::debug!("Received {} solutions from {}", solutions.len(), endpoint);
386            }
387            Err(e) => {
388                self.metrics.failed_requests.inc();
389                tracing::warn!("Failed to execute query on {}: {}", endpoint, e);
390            }
391        }
392
393        result
394    }
395
    /// Execute HTTP request to SPARQL endpoint.
    ///
    /// POSTs the query body with `Content-Type: application/sparql-query`
    /// and asks for `application/sparql-results+json`, then decodes the
    /// JSON body into a `Solution`.
    ///
    /// NOTE(review): a new `reqwest::Client` is built per call, which
    /// defeats reqwest's internal connection reuse — consider one
    /// long-lived client per executor (the local `ConnectionPool` only
    /// counts in-flight requests; it holds no sockets).
    ///
    /// # Errors
    /// Fails on client build, network errors, non-2xx responses, or an
    /// unparseable results document.
    async fn execute_http_request(&self, endpoint: &str, query: &str) -> Result<Solution> {
        let client = reqwest::Client::builder()
            .timeout(self.config.request_timeout)
            .build()?;

        // Make POST request with SPARQL query
        let response = client
            .post(endpoint)
            .header("Accept", "application/sparql-results+json")
            .header("Content-Type", "application/sparql-query")
            .body(query.to_string())
            .send()
            .await?;

        if !response.status().is_success() {
            // Include the response body (when readable) for diagnostics.
            return Err(anyhow!(
                "SPARQL endpoint returned error: {} - {}",
                response.status(),
                response.text().await.unwrap_or_default()
            ));
        }

        // Parse JSON response
        let json_response: serde_json::Value = response.json().await?;

        // Convert JSON results to Solution format
        self.parse_sparql_json_results(&json_response)
    }
425
426    /// Parse SPARQL JSON results into Solution format
427    fn parse_sparql_json_results(&self, json: &serde_json::Value) -> Result<Solution> {
428        let bindings = json["results"]["bindings"]
429            .as_array()
430            .ok_or_else(|| anyhow!("Invalid SPARQL JSON results format"))?;
431
432        let mut solutions = Vec::new();
433
434        for binding in bindings {
435            let mut solution_binding = Binding::new();
436
437            if let Some(obj) = binding.as_object() {
438                for (var_name, value) in obj {
439                    // Use new_unchecked for performance since variable names from SPARQL endpoints are trusted
440                    let variable = Variable::new_unchecked(var_name.clone());
441                    let term = self.parse_sparql_json_term(value)?;
442                    solution_binding.insert(variable, term);
443                }
444            }
445
446            solutions.push(solution_binding);
447        }
448
449        Ok(solutions)
450    }
451
452    /// Parse SPARQL JSON term
453    fn parse_sparql_json_term(&self, value: &serde_json::Value) -> Result<Term> {
454        let term_type = value["type"]
455            .as_str()
456            .ok_or_else(|| anyhow!("Missing term type"))?;
457
458        let term_value = value["value"]
459            .as_str()
460            .ok_or_else(|| anyhow!("Missing term value"))?;
461
462        match term_type {
463            "uri" => {
464                use oxirs_core::model::NamedNode;
465                Ok(Term::Iri(NamedNode::new_unchecked(term_value)))
466            }
467            "literal" => {
468                use crate::algebra::Literal as AlgebraLiteral;
469                use oxirs_core::model::NamedNode;
470                let datatype = value.get("datatype").and_then(|v| v.as_str());
471                let language = value.get("xml:lang").and_then(|v| v.as_str());
472
473                if let Some(lang) = language {
474                    Ok(Term::Literal(AlgebraLiteral {
475                        value: term_value.to_string(),
476                        language: Some(lang.to_string()),
477                        datatype: None,
478                    }))
479                } else if let Some(dt) = datatype {
480                    Ok(Term::Literal(AlgebraLiteral {
481                        value: term_value.to_string(),
482                        language: None,
483                        datatype: Some(NamedNode::new_unchecked(dt)),
484                    }))
485                } else {
486                    Ok(Term::Literal(AlgebraLiteral {
487                        value: term_value.to_string(),
488                        language: None,
489                        datatype: None,
490                    }))
491                }
492            }
493            "bnode" => {
494                // BlankNode in algebra is just a String
495                Ok(Term::BlankNode(term_value.to_string()))
496            }
497            _ => Err(anyhow!("Unknown term type: {}", term_type)),
498        }
499    }
500
501    /// Get or create connection pool for endpoint
502    fn get_or_create_pool(&self, endpoint: &str) -> Arc<ConnectionPool> {
503        self.connection_pools
504            .entry(endpoint.to_string())
505            .or_insert_with(|| {
506                Arc::new(ConnectionPool::new(
507                    endpoint.to_string(),
508                    self.config.connection_pool_size,
509                ))
510            })
511            .clone()
512    }
513
    /// Get endpoint health status.
    ///
    /// Inserts an optimistic "healthy" record on first sight of an
    /// endpoint. Returns a clone, i.e. a point-in-time snapshot: mutating
    /// the returned value does not affect the shared map (updates go
    /// through `endpoint_health.get_mut` at call sites).
    fn get_endpoint_health(&self, endpoint: &str) -> EndpointHealth {
        self.endpoint_health
            .entry(endpoint.to_string())
            .or_insert_with(EndpointHealth::new)
            .clone()
    }
521
522    /// Extract endpoint URL from Term
523    fn extract_endpoint_url(&self, term: &Term) -> Result<String> {
524        match term {
525            Term::Iri(iri) => Ok(iri.as_str().to_string()),
526            Term::Variable(_) => Err(anyhow!("Cannot use variable as SERVICE endpoint")),
527            _ => Err(anyhow!("Invalid SERVICE endpoint: {:?}", term)),
528        }
529    }
530
531    /// Decompose query for federated execution
532    pub fn decompose_query(&self, query: &Algebra) -> Vec<FederatedSubquery> {
533        let mut subqueries = Vec::new();
534        Self::extract_service_patterns(query, &mut subqueries);
535        subqueries
536    }
537
    /// Walk the algebra tree collecting every SERVICE node into `subqueries`.
    ///
    /// Binary operators recurse into both children; unary wrappers recurse
    /// into their inner pattern. NOTE(review): the `_ => {}` catch-all
    /// means SERVICE nodes nested inside any `Algebra` variant not listed
    /// here are silently missed — verify the match stays in sync with the
    /// enum when variants are added.
    fn extract_service_patterns(algebra: &Algebra, subqueries: &mut Vec<FederatedSubquery>) {
        match algebra {
            Algebra::Service {
                endpoint,
                pattern,
                silent,
            } => {
                subqueries.push(FederatedSubquery {
                    endpoint: endpoint.clone(),
                    pattern: (**pattern).clone(),
                    silent: *silent,
                    // Dependency tracking is not populated yet.
                    dependencies: Vec::new(),
                });
            }
            Algebra::Join { left, right }
            | Algebra::Union { left, right }
            | Algebra::Minus { left, right } => {
                Self::extract_service_patterns(left, subqueries);
                Self::extract_service_patterns(right, subqueries);
            }
            Algebra::LeftJoin { left, right, .. } => {
                Self::extract_service_patterns(left, subqueries);
                Self::extract_service_patterns(right, subqueries);
            }
            Algebra::Filter { pattern, .. }
            | Algebra::Extend { pattern, .. }
            | Algebra::Graph { pattern, .. }
            | Algebra::Project { pattern, .. }
            | Algebra::Distinct { pattern }
            | Algebra::Reduced { pattern }
            | Algebra::Slice { pattern, .. }
            | Algebra::OrderBy { pattern, .. }
            | Algebra::Group { pattern, .. }
            | Algebra::Having { pattern, .. } => {
                Self::extract_service_patterns(pattern, subqueries);
            }
            _ => {}
        }
    }
578
    /// Execute federated sub-queries concurrently and merge their results.
    ///
    /// Spawns one Tokio task per sub-query; each task holds a cheap clone
    /// of this executor (all clones share caches/metrics via `Arc`).
    /// Results are collected in sub-query order.
    ///
    /// NOTE(review): failed sub-queries are logged and skipped even when
    /// they were not declared SILENT, so errors degrade to missing
    /// results — confirm this best-effort behavior is intended.
    pub async fn execute_federated_query(
        &self,
        subqueries: Vec<FederatedSubquery>,
    ) -> Result<Solution> {
        let mut tasks = Vec::new();

        for subquery in subqueries {
            let executor = self.clone();
            let task = tokio::spawn(async move {
                executor
                    .execute_service(&subquery.endpoint, &subquery.pattern, subquery.silent)
                    .await
            });
            tasks.push(task);
        }

        // Collect results, awaiting tasks in spawn order.
        let mut all_results = Vec::new();
        for task in tasks {
            match task.await {
                Ok(Ok(results)) => all_results.push(results),
                Ok(Err(e)) => tracing::warn!("Federated subquery failed: {}", e),
                Err(e) => tracing::error!("Task join error: {}", e),
            }
        }

        // Merge all results
        Ok(self.merge_results(all_results))
    }
609
610    /// Merge results from multiple endpoints
611    pub fn merge_results(&self, results: Vec<Solution>) -> Solution {
612        // Simple concatenation with duplicate elimination
613        // Since Binding (HashMap<Variable, Term>) implements PartialEq, we can eliminate duplicates
614        let mut merged: Solution = Vec::new();
615
616        for result_set in results {
617            for binding in result_set {
618                // Check if binding already exists
619                if !merged.iter().any(|b| b == &binding) {
620                    merged.push(binding);
621                }
622            }
623        }
624
625        merged
626    }
627
628    /// Get federation statistics
629    pub fn statistics(&self) -> FederationStats {
630        let stats = self.metrics.request_duration.get_stats();
631        FederationStats {
632            total_requests: self.metrics.successful_requests.get()
633                + self.metrics.failed_requests.get(),
634            successful_requests: self.metrics.successful_requests.get(),
635            failed_requests: self.metrics.failed_requests.get(),
636            cache_hits: self.metrics.cache_hits.get(),
637            cache_misses: self.metrics.cache_misses.get(),
638            average_request_duration: stats.mean,
639            total_results: self.metrics.results_received.get(),
640            active_endpoints: self.endpoint_health.len(),
641            healthy_endpoints: self
642                .endpoint_health
643                .iter()
644                .filter(|e| e.value().is_healthy())
645                .count(),
646        }
647    }
648}
649
/// Manual `Clone`: clones share all runtime state (pools, cache, health,
/// metrics, semaphore) via `Arc`, so a cloned executor cooperates with the
/// original rather than starting fresh. Only the configuration is copied.
impl Clone for FederationExecutor {
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            connection_pools: Arc::clone(&self.connection_pools),
            result_cache: Arc::clone(&self.result_cache),
            endpoint_health: Arc::clone(&self.endpoint_health),
            metrics: Arc::clone(&self.metrics),
            request_semaphore: Arc::clone(&self.request_semaphore),
        }
    }
}
662
/// Bookkeeping for outbound requests to one SPARQL endpoint.
///
/// This is a counting pool: it tracks how many requests are in flight
/// against a fixed capacity. It does not hold sockets — actual HTTP
/// connections are managed by `reqwest`.
pub struct ConnectionPool {
    endpoint: String,
    /// Maximum simultaneous requests allowed against this endpoint
    size: usize,
    /// Number of requests currently in flight
    active_connections: parking_lot::Mutex<usize>,
}
669
670impl ConnectionPool {
671    fn new(endpoint: String, size: usize) -> Self {
672        Self {
673            endpoint,
674            size,
675            active_connections: parking_lot::Mutex::new(0),
676        }
677    }
678
679    pub fn endpoint(&self) -> &str {
680        &self.endpoint
681    }
682
683    pub fn available(&self) -> usize {
684        let active = *self.active_connections.lock();
685        self.size.saturating_sub(active)
686    }
687}
688
/// Cache key for query results: the endpoint URL paired with the `Debug`
/// rendering of the algebra pattern (see `execute_service`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct QueryCacheKey {
    endpoint: String,
    query: String,
}
695
/// Cached query result together with its insertion time and time-to-live.
struct CachedResult {
    results: Solution,
    // When the entry was inserted
    timestamp: Instant,
    // How long the entry remains valid after `timestamp`
    ttl: Duration,
}

impl CachedResult {
    /// True once the entry has outlived its TTL and should be evicted.
    fn is_expired(&self) -> bool {
        self.timestamp.elapsed() > self.ttl
    }
}
708
/// Health status tracked per endpoint.
///
/// An endpoint starts healthy, is marked unhealthy after three
/// consecutive failures, and recovers on the next success.
#[derive(Debug, Clone)]
pub struct EndpointHealth {
    /// Is endpoint healthy
    healthy: bool,
    /// Last successful request time
    last_success: Option<Instant>,
    /// Last failure time
    last_failure: Option<Instant>,
    /// Consecutive failures
    consecutive_failures: usize,
    /// Smoothed (exponentially weighted) average response time
    avg_response_time: Duration,
    /// Status message
    status_message: String,
}

impl EndpointHealth {
    /// Create a fresh, optimistically healthy record.
    fn new() -> Self {
        Self {
            healthy: true,
            last_success: None,
            last_failure: None,
            consecutive_failures: 0,
            avg_response_time: Duration::from_secs(0),
            status_message: "OK".to_string(),
        }
    }

    /// Whether the endpoint is currently considered usable.
    fn is_healthy(&self) -> bool {
        self.healthy
    }

    /// Record a successful request: restores health, resets the failure
    /// streak, and folds `response_time` into a smoothed average.
    fn record_success(&mut self, response_time: Duration) {
        self.healthy = true;
        self.last_success = Some(Instant::now());
        self.consecutive_failures = 0;
        // Maintain an exponentially weighted moving average (alpha = 1/8)
        // instead of overwriting with the latest sample, so the field
        // actually tracks an average as its name implies and a single
        // outlier does not dominate it.
        self.avg_response_time = if self.avg_response_time.is_zero() {
            response_time
        } else {
            (self.avg_response_time * 7 + response_time) / 8
        };
        self.status_message = "OK".to_string();
    }

    /// Record a failed request; three consecutive failures mark the
    /// endpoint unhealthy until the next success.
    fn record_failure(&mut self) {
        self.last_failure = Some(Instant::now());
        self.consecutive_failures += 1;

        // Mark unhealthy after 3 consecutive failures
        if self.consecutive_failures >= 3 {
            self.healthy = false;
            self.status_message = format!("{} consecutive failures", self.consecutive_failures);
        }
    }
}
761
/// A single SERVICE sub-query extracted from a federated query.
#[derive(Debug, Clone)]
pub struct FederatedSubquery {
    /// Endpoint term (an IRI) the sub-query targets
    pub endpoint: Term,
    /// Algebra pattern to evaluate at the endpoint
    pub pattern: Algebra,
    /// SPARQL SILENT flag: failures become empty results
    pub silent: bool,
    /// Variables this sub-query depends on; currently always left empty
    /// by `extract_service_patterns` (reserved for dependency ordering)
    pub dependencies: Vec<Variable>,
}
770
/// Counters and timers describing federation activity.
///
/// `successful_requests` / `failed_requests` count individual request
/// attempts; `retried_requests` counts queries that needed at least one
/// retry before succeeding.
struct FederationMetrics {
    successful_requests: Counter,
    failed_requests: Counter,
    retried_requests: Counter,
    cache_hits: Counter,
    cache_misses: Counter,
    request_duration: Timer,
    results_received: Counter,
}
781
782impl FederationMetrics {
783    fn new() -> Self {
784        Self {
785            successful_requests: Counter::new("federation.successful_requests".to_string()),
786            failed_requests: Counter::new("federation.failed_requests".to_string()),
787            retried_requests: Counter::new("federation.retried_requests".to_string()),
788            cache_hits: Counter::new("federation.cache_hits".to_string()),
789            cache_misses: Counter::new("federation.cache_misses".to_string()),
790            request_duration: Timer::new("federation.request_duration".to_string()),
791            results_received: Counter::new("federation.results_received".to_string()),
792        }
793    }
794}
795
/// Point-in-time snapshot of federation metrics
/// (produced by `FederationExecutor::statistics`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FederationStats {
    /// Successful plus failed request attempts
    pub total_requests: u64,
    /// Request attempts that completed successfully
    pub successful_requests: u64,
    /// Request attempts that failed
    pub failed_requests: u64,
    /// Cache lookups served from the result cache
    pub cache_hits: u64,
    /// Cache lookups that missed or were expired
    pub cache_misses: u64,
    /// Mean request duration as reported by the metrics timer
    pub average_request_duration: f64,
    /// Total number of result bindings received from endpoints
    pub total_results: u64,
    /// Number of endpoints with recorded health state
    pub active_endpoints: usize,
    /// Subset of active endpoints currently considered healthy
    pub healthy_endpoints: usize,
}
809
/// Endpoint capability information, as advertised via SPARQL Service
/// Description or registered manually.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EndpointCapabilities {
    /// Endpoint URL
    pub endpoint: String,
    /// Supported SPARQL version
    pub sparql_version: String,
    /// Supported result formats (MIME types)
    pub result_formats: Vec<String>,
    /// Maximum query complexity, if the endpoint advertises a limit
    pub max_query_complexity: Option<usize>,
    /// Supports federation
    pub supports_federation: bool,
    /// Supports RDF-star
    pub supports_rdf_star: bool,
    /// Available named graphs
    pub named_graphs: Vec<String>,
}
828
/// Endpoint discovery service: a registry of known SPARQL endpoints and
/// their capabilities.
pub struct EndpointDiscovery {
    /// Known endpoints, keyed by endpoint URL
    endpoints: Arc<DashMap<String, EndpointCapabilities>>,
}
834
835impl EndpointDiscovery {
836    /// Create a new endpoint discovery service
837    pub fn new() -> Self {
838        Self {
839            endpoints: Arc::new(DashMap::new()),
840        }
841    }
842
843    /// Register an endpoint
844    pub fn register_endpoint(&self, capabilities: EndpointCapabilities) {
845        self.endpoints
846            .insert(capabilities.endpoint.clone(), capabilities);
847    }
848
849    /// Discover endpoint capabilities using SPARQL Service Description
850    pub async fn discover_endpoint(&self, endpoint: &str) -> Result<EndpointCapabilities> {
851        // Query the endpoint for service description
852        // Using SPARQL Service Description vocabulary from W3C
853        let service_description_query = r#"
854            PREFIX sd: <http://www.w3.org/ns/sparql-service-description#>
855            PREFIX void: <http://rdfs.org/ns/void#>
856
857            SELECT ?feature ?format ?version ?graph WHERE {
858                {
859                    ?service a sd:Service .
860                    OPTIONAL { ?service sd:feature ?feature }
861                    OPTIONAL { ?service sd:resultFormat ?format }
862                    OPTIONAL { ?service sd:languageExtension ?version }
863                    OPTIONAL { ?service sd:defaultDataset/sd:namedGraph/sd:name ?graph }
864                }
865            }
866        "#;
867
868        let client = reqwest::Client::builder()
869            .timeout(Duration::from_secs(30))
870            .build()?;
871
872        // Try to query for service description
873        let response_result = client
874            .post(endpoint)
875            .header("Accept", "application/sparql-results+json")
876            .header("Content-Type", "application/sparql-query")
877            .body(service_description_query)
878            .send()
879            .await;
880
881        let mut capabilities = EndpointCapabilities {
882            endpoint: endpoint.to_string(),
883            sparql_version: "1.1".to_string(),
884            result_formats: vec!["application/sparql-results+json".to_string()],
885            max_query_complexity: None,
886            supports_federation: false,
887            supports_rdf_star: false,
888            named_graphs: Vec::new(),
889        };
890
891        match response_result {
892            Ok(response) if response.status().is_success() => {
893                // Parse service description response
894                if let Ok(json) = response.json::<serde_json::Value>().await {
895                    if let Some(bindings) = json["results"]["bindings"].as_array() {
896                        let mut formats = Vec::new();
897                        let mut graphs = Vec::new();
898
899                        for binding in bindings {
900                            // Extract result formats
901                            if let Some(format) = binding["format"]["value"].as_str() {
902                                formats.push(format.to_string());
903                            }
904
905                            // Extract SPARQL version/features
906                            if let Some(feature) = binding["feature"]["value"].as_str() {
907                                if feature.contains("UnionDefaultGraph") {
908                                    capabilities.supports_federation = true;
909                                }
910                                if feature.contains("SPARQL-star") || feature.contains("RDFstar") {
911                                    capabilities.supports_rdf_star = true;
912                                }
913                                if feature.contains("1.2") {
914                                    capabilities.sparql_version = "1.2".to_string();
915                                }
916                            }
917
918                            // Extract named graphs
919                            if let Some(graph) = binding["graph"]["value"].as_str() {
920                                graphs.push(graph.to_string());
921                            }
922                        }
923
924                        if !formats.is_empty() {
925                            capabilities.result_formats = formats;
926                        }
927                        capabilities.named_graphs = graphs;
928                    }
929                }
930
931                tracing::info!(
932                    "Discovered endpoint capabilities for {}: version={}, formats={:?}, federation={}, rdf-star={}",
933                    endpoint,
934                    capabilities.sparql_version,
935                    capabilities.result_formats,
936                    capabilities.supports_federation,
937                    capabilities.supports_rdf_star
938                );
939            }
940            Ok(response) => {
941                tracing::warn!(
942                    "Service description query failed with status {}: {}",
943                    response.status(),
944                    response.text().await.unwrap_or_default()
945                );
946            }
947            Err(e) => {
948                tracing::warn!(
949                    "Failed to query service description from {}: {}. Using default capabilities.",
950                    endpoint,
951                    e
952                );
953            }
954        }
955
956        // Try a simple ASK query to verify endpoint is responsive
957        let test_query = "ASK { ?s ?p ?o }";
958        if let Ok(response) = client
959            .post(endpoint)
960            .header("Accept", "application/sparql-results+json")
961            .header("Content-Type", "application/sparql-query")
962            .body(test_query)
963            .send()
964            .await
965        {
966            if response.status().is_success() {
967                tracing::debug!("Endpoint {} is responsive", endpoint);
968            }
969        }
970
971        Ok(capabilities)
972    }
973
974    /// Find endpoints matching criteria
975    pub fn find_endpoints(&self, criteria: EndpointCriteria) -> Vec<EndpointCapabilities> {
976        self.endpoints
977            .iter()
978            .filter(|entry| {
979                let caps = entry.value();
980                (criteria.supports_federation.is_none()
981                    || criteria.supports_federation == Some(caps.supports_federation))
982                    && (criteria.supports_rdf_star.is_none()
983                        || criteria.supports_rdf_star == Some(caps.supports_rdf_star))
984            })
985            .map(|entry| entry.value().clone())
986            .collect()
987    }
988}
989
990impl Default for EndpointDiscovery {
991    fn default() -> Self {
992        Self::new()
993    }
994}
995
/// Endpoint search criteria
///
/// Each field is an optional filter: `None` means "don't care" and matches
/// every endpoint. Passed to `EndpointDiscovery::find_endpoints`.
#[derive(Debug, Clone, Default)]
pub struct EndpointCriteria {
    /// When `Some`, only endpoints whose federation support equals this value match.
    pub supports_federation: Option<bool>,
    /// When `Some`, only endpoints whose RDF-star support equals this value match.
    pub supports_rdf_star: Option<bool>,
    /// Minimum required SPARQL version string (e.g. "1.1").
    pub min_sparql_version: Option<String>,
}
1003
#[cfg(test)]
mod tests {
    use super::*;
    use crate::algebra::TriplePattern;

    /// Default config values must match the documented defaults.
    #[test]
    fn test_federation_config_default() {
        let config = FederationConfig::default();
        assert_eq!(config.max_concurrent_requests, 32);
        assert_eq!(config.max_retries, 3);
        assert!(config.enable_caching);
    }

    #[test]
    fn test_endpoint_health() {
        let mut health = EndpointHealth::new();
        assert!(health.is_healthy());

        // A single failure must not mark the endpoint unhealthy...
        health.record_failure();
        assert!(health.is_healthy());

        // ...but three consecutive failures cross the threshold.
        health.record_failure();
        health.record_failure();
        assert!(!health.is_healthy());

        // One success restores the healthy state.
        health.record_success(Duration::from_millis(100));
        assert!(health.is_healthy());
    }

    /// Registered endpoints should be retrievable by capability filter.
    #[test]
    fn test_endpoint_discovery() {
        let discovery = EndpointDiscovery::new();

        let caps = EndpointCapabilities {
            endpoint: "http://example.org/sparql".to_string(),
            sparql_version: "1.1".to_string(),
            result_formats: vec!["application/sparql-results+json".to_string()],
            max_query_complexity: None,
            supports_federation: true,
            supports_rdf_star: false,
            named_graphs: Vec::new(),
        };

        discovery.register_endpoint(caps.clone());

        let found = discovery.find_endpoints(EndpointCriteria {
            supports_federation: Some(true),
            ..Default::default()
        });

        assert_eq!(found.len(), 1);
        assert_eq!(found[0].endpoint, caps.endpoint);
    }

    /// A fresh executor starts with zeroed statistics.
    #[tokio::test]
    async fn test_federation_executor() {
        let config = FederationConfig::default();
        let executor = FederationExecutor::new(config);

        let stats = executor.statistics();
        assert_eq!(stats.total_requests, 0);
        assert_eq!(stats.active_endpoints, 0);
    }

    #[test]
    fn test_algebra_to_sparql_bgp() {
        let executor = FederationExecutor::new(FederationConfig::default());

        // Create a simple BGP: ?s <http://example.org/predicate> ?o
        let patterns = vec![TriplePattern {
            subject: Term::Variable(Variable::new_unchecked("s")),
            predicate: Term::Iri(crate::algebra::Iri::new_unchecked(
                "http://example.org/predicate",
            )),
            object: Term::Variable(Variable::new_unchecked("o")),
        }];

        let algebra = Algebra::Bgp(patterns);
        let sparql = executor.algebra_to_sparql(&algebra).unwrap();

        assert!(sparql.contains("SELECT * WHERE"));
        assert!(sparql.contains("?s"));
        assert!(sparql.contains("<http://example.org/predicate>"));
        assert!(sparql.contains("?o"));
    }

    #[test]
    fn test_parse_sparql_json_results() {
        let executor = FederationExecutor::new(FederationConfig::default());

        let json_str = r#"{
            "results": {
                "bindings": [
                    {
                        "s": {
                            "type": "uri",
                            "value": "http://example.org/subject1"
                        },
                        "o": {
                            "type": "literal",
                            "value": "object1"
                        }
                    }
                ]
            }
        }"#;

        let json: serde_json::Value = serde_json::from_str(json_str).unwrap();
        let results = executor.parse_sparql_json_results(&json).unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].len(), 2); // s, o
    }

    #[test]
    fn test_parse_sparql_json_term_types() {
        let executor = FederationExecutor::new(FederationConfig::default());

        // Test URI
        let uri_json = serde_json::json!({
            "type": "uri",
            "value": "http://example.org/resource"
        });
        let term = executor.parse_sparql_json_term(&uri_json).unwrap();
        // Fix: `matches!` returns a bool that was previously discarded,
        // so the URI case asserted nothing. Wrap it in `assert!`.
        assert!(matches!(term, Term::Iri(_)));

        // Test literal
        let literal_json = serde_json::json!({
            "type": "literal",
            "value": "test value"
        });
        let term = executor.parse_sparql_json_term(&literal_json).unwrap();
        match term {
            Term::Literal(lit) => assert_eq!(lit.value, "test value"),
            _ => panic!("Expected Literal term"),
        }

        // Test blank node
        let bnode_json = serde_json::json!({
            "type": "bnode",
            "value": "b0"
        });
        let term = executor.parse_sparql_json_term(&bnode_json).unwrap();
        match term {
            Term::BlankNode(id) => assert_eq!(id, "b0"),
            _ => panic!("Expected BlankNode term"),
        }
    }

    #[test]
    fn test_merge_results() {
        let executor = FederationExecutor::new(FederationConfig::default());

        let var_s = Variable::new_unchecked("s");

        let mut binding1 = Binding::new();
        binding1.insert(
            var_s.clone(),
            Term::Iri(crate::algebra::Iri::new_unchecked("http://example.org/s1")),
        );

        let mut binding2 = Binding::new();
        binding2.insert(
            var_s.clone(),
            Term::Iri(crate::algebra::Iri::new_unchecked("http://example.org/s2")),
        );

        let solution1 = vec![binding1.clone()];
        let solution2 = vec![binding2, binding1.clone()]; // Contains duplicate

        let merged = executor.merge_results(vec![solution1, solution2]);

        // Should eliminate duplicates
        assert_eq!(merged.len(), 2); // Only 2 unique bindings
    }
}