oxirs_federate/
lib.rs

1//! # OxiRS Federation - Federated Query Engine
2//!
3//! [![Version](https://img.shields.io/badge/version-0.1.0-blue)](https://github.com/cool-japan/oxirs/releases)
4//! [![docs.rs](https://docs.rs/oxirs-federate/badge.svg)](https://docs.rs/oxirs-federate)
5//!
6//! **Status**: Production Release (v0.1.0)
7//! **Stability**: Public APIs are stable. Production-ready with comprehensive testing.
8//!
9//! Federated query processing capabilities for SPARQL and GraphQL with service discovery,
10//! query decomposition, result integration, and fault tolerance.
11
12#![allow(ambiguous_glob_reexports)]
13//!
14//! # Features
15//!
16//! - SPARQL SERVICE planner and executor for federated SPARQL queries
17//! - GraphQL schema stitching and federation
18//! - Service discovery and capability detection
19//! - Query decomposition and optimization across multiple sources
20//! - Result integration with fault tolerance and partial result handling
21//! - Load balancing and performance monitoring
22//!
23//! # Architecture
24//!
25//! The federation engine consists of several key components:
26//!
27//! - `ServiceRegistry`: Manages available federated services and their capabilities
28//! - `QueryPlanner`: Decomposes queries across multiple services
29//! - `Executor`: Executes federated queries with parallel processing
30//! - `ResultIntegrator`: Combines results from multiple sources
31//! - `FaultHandler`: Manages service failures and retries
32
33use anyhow::{anyhow, Result};
34use oxirs_core::Term;
35use serde::{Deserialize, Serialize};
36use std::collections::HashMap;
37
38use crate::executor::types::{QuotedTripleValue, RdfTerm};
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tokio::sync::RwLock;
42use tracing::{info, warn};
43// PlannerConfig will be imported via planner module re-exports
44
45// Import types needed for the main struct
46use crate::{
47    auto_discovery::{AutoDiscovery, AutoDiscoveryConfig, DiscoveredEndpoint},
48    cache::{CacheConfig, CacheStats, FederationCache, QueryResultCache},
49    capability_assessment::{AssessmentResult, CapabilityAssessor},
50    discovery::ServiceDiscovery,
51    executor::{FederatedExecutor, FederatedExecutorConfig},
52    graphql::{GraphQLFederation, GraphQLFederationConfig},
53    integration::{ResultIntegrator, ResultIntegratorConfig},
54    monitoring::{FederationMonitor, FederationMonitorConfig, MonitorStats},
55    planner::{PlannerConfig, QueryPlanner},
56    service_registry::{RegistryConfig, RegistryStats, ServiceRegistry},
57    vector_similarity_federation::{
58        VectorFederationConfig, VectorFederationStats, VectorServiceMetadata,
59        VectorSimilarityFederation,
60    },
61};
62
63pub mod adaptive_load_balancer;
64pub mod advanced_anomaly_detection;
65pub mod advanced_benchmarking;
66pub mod advanced_consensus;
67pub mod advanced_enterprise_features;
68pub mod advanced_ml_optimizer;
69pub mod advanced_query_optimizer;
70pub mod advanced_security_hardening;
71pub mod advanced_semantic_features;
72pub mod advanced_visualization;
73pub mod anomaly_detection;
74pub mod auth;
75pub mod auto_discovery;
76pub mod automl_pipeline;
77pub mod cache;
78pub mod capability_assessment;
79pub mod cdc;
80pub mod cloud_cost_optimizer;
81pub mod connection_pool_manager;
82pub mod discovery;
83pub mod distributed_consensus;
84pub mod distributed_ml_trainer;
85pub mod distributed_tracing;
86pub mod distributed_transactions;
87pub mod executor;
88pub mod external_ml_integration;
89pub mod gpu_accelerated_query;
90pub mod graph_algorithms;
91pub mod graphql;
92pub mod integration;
93pub mod jit_query_compiler;
94pub mod join_optimizer;
95pub mod k8s_discovery;
96pub mod materialized_views;
97pub mod memory_efficient_datasets;
98pub mod metadata;
99pub mod ml_model_serving;
100pub mod ml_optimizer;
101pub mod monitoring;
102pub mod multi_level_federation;
103pub mod nats_federation;
104pub mod network_optimizer;
105pub mod optimization_cache;
106pub mod performance_analyzer;
107pub mod performance_benchmarks;
108pub mod planner;
109pub mod privacy;
110pub mod production_hardening;
111pub mod profiling_metrics;
112pub mod query_decomposition;
113pub mod query_plan_explainer;
114pub mod request_batcher;
115pub mod result_streaming;
116pub mod schema_alignment;
117pub mod semantic_enhancer;
118pub mod semantic_reasoner;
119pub mod service;
120pub mod service_client;
121pub mod service_executor;
122pub mod service_optimizer;
123pub mod service_registry;
124pub mod simd_optimized_joins;
125pub mod source_selection;
126pub mod streaming;
127pub mod streaming_optimizer;
128pub mod test_infrastructure;
129pub mod vector_similarity_federation;
130
131// Minimal imports to ensure compilation - only core types
132// Re-enabled specific non-duplicate exports after fixing ServiceRegistry conflicts
133// AutoDiscovery and CacheConfig already imported above, skipping duplicates
134pub use adaptive_load_balancer::AdaptiveLoadBalancer;
135pub use advanced_query_optimizer::{
136    AdvancedOptimizerConfig, AdvancedQueryOptimizer, HardwareProfile, OptimizedPlan, QueryPlan,
137    TrainingExample,
138};
139// Advanced ML Optimizer (v0.1.0 advanced features)
140pub use advanced_ml_optimizer::{
141    ActivationType, AdvancedMLConfig, AdvancedMLOptimizer, AutoML, DeepCardinalityEstimator,
142    ExplainableAI, JoinType, NeuralArchitectureSearch, OnlineLearningManager, RLJoinOptimizer,
143    TrainingEpoch, TransferLearningManager,
144};
145// Advanced Benchmarking (v0.1.0 advanced features)
146pub use advanced_benchmarking::{
147    AdvancedBenchmarkConfig, AdvancedBenchmarkSuite, BenchmarkResult, CustomBenchmarkConfig,
148    CustomBenchmarkGenerator, LUBMSuite, RegressionDetectionResult, SP2BenchSuite,
149    ScalabilityTestResult, StressTestResult, WatDivSuite, WorkloadCharacterization,
150};
151// Advanced Semantic Features (v0.1.0 advanced features)
152pub use advanced_semantic_features::{
153    AdvancedSemanticConfig, AdvancedSemanticFeatures, AutoMapping, AutoMappingGenerator,
154    ChangeType, ConceptRelationship, DeepOntologyMatcher, Entity, EntityMatch, EntityResolver,
155    MappingType, MatchType, MultiLingualConcept, MultiLingualSchemaManager, OntologyConcept,
156    OntologyMatch, RelationType, SchemaChange, SchemaEvolutionTracker, SchemaVersion,
157};
158// Advanced Anomaly Detection (v0.1.0 advanced features)
159pub use advanced_anomaly_detection::{
160    AdvancedAnomalyConfig, AdvancedAnomalyDetection, HealingAction, HealingActionType,
161    IsolationForest, IssueType, LSTMPredictor, MaintenancePriority, MaintenanceTask,
162    PredictiveMaintenanceScheduler, RootCause, RootCauseAnalyzer, SelfHealingEngine,
163    Severity as AnomalySeverity,
164};
165// Advanced Consensus (v0.1.0 advanced features)
166pub use advanced_consensus::{
167    AdvancedConsensusSystem, ByzantineFaultTolerance, DistributedLock, GCounter,
168    NetworkPartitionDetector, PNCounter, VectorClock,
169};
170// Advanced Enterprise Features (v0.1.0 advanced features)
171pub use advanced_enterprise_features::{
172    AdvancedEnterpriseFeatures, AuditLogEntry, AuditLogger, AuditResult, DataLineageTracker,
173    DataSubject, DeletionRequest, EdgeComputingManager, EdgeNode, EdgeNodeStatus,
174    GDPRComplianceManager, GeoLocation, GeographicQueryRouter, LineageNode, MultiTenancyConfig,
175    MultiTenancyManager, PrivacyPreservingFederation, QuantumResistantSecurity, ResourceQuota,
176    ResourceUsage, Tenant,
177};
178// Advanced Security Hardening (v0.2.0 Phase 3)
179pub use advanced_security_hardening::{
180    AdvancedRateLimiter, AdvancedSecurityHardening, AuditConfig, AuditEvent, AuditEventType,
181    AuthMethod, AuthSession, AuthenticationManager, ComplianceChecker, ComplianceFramework,
182    ComplianceStatus, EncryptionConfig, EncryptionKey, EncryptionManager, IdsConfig, IdsResult,
183    IntrusionDetectionSystem, MtlsCertificate, OAuth2Provider, OidcProvider, RateLimitConfig,
184    SecurityAlert, SecurityCheckResult, SecurityConfig, SecurityContext, ThreatCategory,
185    ThreatSeverity, ThreatSignature, TrustScore, UserInfo, Vulnerability, VulnerabilityScanResult,
186    VulnerabilityScanner, VulnerabilitySeverity, ZeroTrustConfig, ZeroTrustController,
187};
188// Advanced Visualization & Dashboarding (v0.2.0 Phase 3)
189pub use advanced_visualization::{
190    AdvancedVisualization, AggregationType, Alert, AlertGrouping, AlertSeverity, AlertTimeline,
191    AlertVisualizer, ChartData, ChartGenerator, ChartSeries, ChartTheme, ColorScale, CustomTheme,
192    Dashboard, DashboardLayout, DataSource, EdgeType, ExportFormat, HeatmapCell, HeatmapData,
193    LayoutAlgorithm, MetricAggregation, MetricsCollector, NodeStatus, NodeType, PieChartData,
194    PieSlice, PositionedNode, TimeSeries, TopologyEdge, TopologyNode, TopologyVisualization,
195    TopologyVisualizer, VisualizationConfig, VisualizationType, Widget, WidgetConfig,
196    WidgetPosition, WidgetSize, WidgetType,
197};
198// Re-export DataPoint from advanced_visualization as VizDataPoint to avoid conflict
199pub use advanced_visualization::DataPoint as VizDataPoint;
200// v0.2.0 Performance & Scalability Features
201pub use anomaly_detection::{
202    AnomalyAlert, AnomalyDetector, AnomalyDetectorConfig, DataPoint, Severity, Trend, TrendAnalysis,
203};
204pub use auth::AuthManager;
205pub use automl_pipeline::{
206    ArchitectureCandidate, AutoMLConfig, AutoMLPipeline, HyperparameterConfig, LayerType,
207    MetaLearningTask, OptimizationStatistics, SearchSpace, TrialMetrics, TrialResult,
208};
209pub use cloud_cost_optimizer::{
210    CloudCostOptimizer, CloudInstance, CloudProvider, CostOptimizerConfig, CostRecommendation,
211    CostTracking, DeploymentConfig, InstanceStatus, OptimizationStrategy, RecommendationPriority,
212    RecommendationType, RoutingDecision, WorkloadMetrics,
213};
214pub use connection_pool_manager::ConnectionPoolManager;
215pub use distributed_ml_trainer::{
216    AggregationStrategy, DistributedMLConfig, DistributedMLTrainer, TrainingMetrics, TrainingMode,
217    WorkerInfo, WorkerMetrics, WorkerStatus,
218};
219pub use distributed_tracing::TracingConfig;
220pub use distributed_transactions::{
221    DistributedTransactionCoordinator, IsolationLevel, Operation, OperationState, OperationType,
222    Participant, ParticipantState, SagaLog, SagaStep, SagaStepState, Transaction,
223    TransactionConfig, TransactionProtocol, TransactionResult, TransactionState,
224};
225pub use executor::ExecutionStatus; // ExecutionMetrics not exported from executor
226                                   // External ML Frameworks Integration (v0.2.0 Phase 3)
227pub use external_ml_integration::{
228    ConversionResult, DataType, ExternalMLIntegration, FrameworkAdapter, FrameworkCapabilities,
229    HuggingFaceAdapter, InferenceBackend, InferenceEngine, MLFramework, MLIntegrationConfig,
230    MLModel, MLTaskType, MockModel, ModelConverter, ModelFormat, ModelMetadata, ModelRegistry,
231    ModelVersion, ONNXAdapter, OptimizationConfig, OptimizationResult, PerformanceMetrics,
232    PyTorchAdapter, RegisteredModel, TensorFlowAdapter, TensorSpec,
233};
234pub use gpu_accelerated_query::{
235    FilterCondition, FilterOperator, GpuAccelerationConfig, GpuBackendType, GpuJoinProcessor,
236    GpuProcessingResult, GpuProcessingStats, GpuQueryProcessor, QueryBatch,
237};
238pub use graph_algorithms::{
239    AStar, BellmanFord, CentralityAnalyzer, ConnectivityAnalyzer, Dijkstra, Edge, FederationGraph,
240    FloydWarshall, PrimMST, ShortestPathResult,
241};
242pub use jit_query_compiler::{
243    CompiledQuery, ExecutionMode, JitCompilationConfig, JitCompilationStats, JitQueryCompiler,
244    JitQueryOptimizer, OptimizationRule,
245};
246pub use ml_model_serving::{
247    ABTestConfig, ABTestResults, MLModelServing, ModelMetrics, ModelServingConfig, ModelStatus,
248    ModelType, QueryTransformerModel, TransformerConfig,
249};
250// Re-export ModelVersion from ml_model_serving as MLModelVersion to avoid conflict
251pub use ml_model_serving::ModelVersion as MLModelVersion;
252pub use simd_optimized_joins::{JoinAlgorithm, JoinStatistics, SimdJoinConfig, SimdJoinProcessor};
253// Import from graphql module - minimal types to avoid conflicts
254// pub use graphql::GraphQLFederation;
255
256// More specific imports to avoid conflicts - conservative approach
257// Re-enabled after fixing module exports - only exports that actually exist
258// pub use integration::ResultIntegratorConfig; // Already imported above
259pub use ml_optimizer::MLOptimizer;
260pub use multi_level_federation::{
261    FederationCapability, FederationMetrics, FederationNode, MultiLevelConfig,
262    MultiLevelFederation, TopologyOptimizationResult, TopologyStats,
263};
264pub use network_optimizer::NetworkOptimizer;
265pub use optimization_cache::OptimizationCache;
266pub use production_hardening::{
267    CircuitBreakerState, ComplexityResult, HardeningConfig, HardeningStatistics,
268    ProductionHardening, QueryRequest as HardeningQueryRequest, ValidationResult,
269};
270pub use schema_alignment::{
271    Alignment, AlignmentConfig, AlignmentResult, AlignmentType, ClassMetadata, MappingExample,
272    PropertyMetadata, SchemaAligner, VocabularyMetadata,
273};
274pub use semantic_reasoner::{
275    InconsistencyReport, InconsistencyType, ReasonerConfig, SemanticReasoner, Triple,
276};
277// Re-enabled: JoinOptimizer now properly implemented
278pub use join_optimizer::JoinOptimizer;
279// pub use k8s_discovery::KubernetesDiscovery;
280// pub use materialized_views::MaterializedView;
281// pub use metadata::MetadataConfig;
282// pub use monitoring::MonitoringService;
283// pub use nats_federation::NatsFederation;
284// pub use performance_analyzer::PerformanceAnalyzer;
285// Import from planner module excluding GraphQLFederationConfig to avoid conflict
286pub use planner::{
287    EntityResolutionPlan, EntityResolutionStep, ExecutionContext, ExecutionPlan, ExecutionStep,
288    FederatedQueryPlanner, FederatedSchema, FilterExpression, GraphQLOperationType,
289    HistoricalPerformance, ParsedQuery, QueryInfo, QueryType, ReoptimizationAnalysis, RetryConfig,
290    ServiceQuery, StepType, TriplePattern, UnifiedSchema,
291};
292pub use privacy::*;
293// Re-enabled query decomposition exports
294pub use query_decomposition::advanced_pattern_analysis::{
295    AdvancedPatternAnalyzer, ComplexityAssessment, OptimizationOpportunity, PatternAnalysisResult,
296    ServiceRecommendation,
297};
298pub use query_decomposition::types::QueryDecomposer;
299pub use query_plan_explainer::{
300    ExplainFormat, OptimizationSuggestion, PlanExplanation, QueryPlanExplainer, StepExplanation,
301    SuggestionCategory, SuggestionSeverity,
302};
303pub use request_batcher::RequestBatcher;
304pub use result_streaming::ResultStreamingManager;
305pub use semantic_enhancer::SemanticEnhancer;
306// Export main service types (from service.rs)
307pub use service::{
308    AuthCredentials, FederatedService, ServiceAuthConfig, ServiceCapability, ServicePerformance,
309    ServiceType,
310};
311// Re-enabled service implementation exports
312pub use service_client::ServiceClient;
313pub use service_executor::ServiceExecutor;
314pub use service_optimizer::ServiceOptimizer;
315// Export specific types from service_registry (non-conflicting types only)
316pub use service_registry::{
317    GraphQLService, HealthStatus as ServiceHealthStatus, ServiceCapabilities, SparqlEndpoint,
318};
319// pub use source_selection::SourceSelector;
320// pub use streaming::StreamProcessor;
321// pub use streaming_optimizer::StreamOptimizer;
322// pub use test_infrastructure::TestRunner;
323// pub use vector_similarity_federation::VectorFederation;
324
325/// Main federation engine that coordinates all federated query processing
326#[derive(Debug, Clone)]
327pub struct FederationEngine {
328    /// Registry of available services
329    service_registry: Arc<RwLock<ServiceRegistry>>,
330    /// Query planner for service selection and decomposition
331    query_planner: Arc<QueryPlanner>,
332    /// Execution engine for federated queries
333    executor: Arc<FederatedExecutor>,
334    /// Result integration engine
335    integrator: Arc<ResultIntegrator>,
336    /// GraphQL federation manager
337    graphql_federation: Arc<GraphQLFederation>,
338    /// Performance monitoring
339    monitor: Arc<FederationMonitor>,
340    /// Advanced caching system
341    cache: Arc<FederationCache>,
342    /// Automatic service discovery
343    auto_discovery: Arc<RwLock<Option<AutoDiscovery>>>,
344    /// Vector similarity federation
345    vector_federation: Arc<RwLock<Option<VectorSimilarityFederation>>>,
346}
347
348impl FederationEngine {
349    /// Create a new federation engine with default configuration
350    pub fn new() -> Self {
351        let service_registry = Arc::new(RwLock::new(ServiceRegistry::new()));
352        let query_planner = Arc::new(QueryPlanner::new());
353        let executor = Arc::new(FederatedExecutor::new());
354        let integrator = Arc::new(ResultIntegrator::new());
355        let graphql_federation = Arc::new(GraphQLFederation::new());
356        let monitor = Arc::new(FederationMonitor::new());
357        let cache = Arc::new(FederationCache::new());
358
359        Self {
360            service_registry,
361            query_planner,
362            executor,
363            integrator,
364            graphql_federation,
365            monitor,
366            cache,
367            auto_discovery: Arc::new(RwLock::new(None)),
368            vector_federation: Arc::new(RwLock::new(None)),
369        }
370    }
371
372    /// Create a new federation engine with custom configuration
373    pub fn with_config(config: FederationConfig) -> Self {
374        let service_registry = Arc::new(RwLock::new(ServiceRegistry::with_config(
375            config.registry_config,
376        )));
377        let query_planner = Arc::new(QueryPlanner::with_config(config.planner_config));
378        let executor = Arc::new(FederatedExecutor::with_config(config.executor_config));
379        let integrator = Arc::new(ResultIntegrator::with_config(config.integrator_config));
380        let graphql_federation = Arc::new(GraphQLFederation::with_config(config.graphql_config));
381        let monitor = Arc::new(FederationMonitor::with_config(config.monitor_config));
382        let cache = Arc::new(FederationCache::with_config(config.cache_config));
383
384        Self {
385            service_registry,
386            query_planner,
387            executor,
388            integrator,
389            graphql_federation,
390            monitor,
391            cache,
392            auto_discovery: Arc::new(RwLock::new(None)),
393            vector_federation: Arc::new(RwLock::new(None)),
394        }
395    }
396
397    /// Register a new federated service
398    pub async fn register_service(&self, service: FederatedService) -> Result<()> {
399        let registry = self.service_registry.write().await;
400        registry.register(service).await
401    }
402
403    /// Unregister a federated service
404    pub async fn unregister_service(&self, service_id: &str) -> Result<()> {
405        let registry = self.service_registry.write().await;
406        registry.unregister(service_id).await
407    }
408
409    /// Execute a federated SPARQL query
410    pub async fn execute_sparql(&self, query: &str) -> Result<FederatedResult> {
411        let start_time = Instant::now();
412
413        // Parse and analyze the query
414        let query_info = self.query_planner.analyze_sparql(query).await?;
415
416        // Generate cache key
417        let cache_key = self.cache.generate_query_key(&query_info);
418
419        // Check cache first
420        if let Some(cached_result) = self.cache.get_query_result(&cache_key).await {
421            let execution_time = start_time.elapsed();
422
423            // Record cache hit
424            self.monitor.record_cache_hit("query_cache", true).await;
425            self.monitor
426                .record_query_execution("sparql", execution_time, true)
427                .await;
428
429            return match cached_result {
430                QueryResultCache::Sparql(sparql_results) => {
431                    // Convert back to FederatedResult
432                    let result_bindings: Vec<HashMap<String, oxirs_core::Term>> = sparql_results
433                        .results
434                        .bindings
435                        .into_iter()
436                        .map(|binding| {
437                            // Convert SparqlBinding to HashMap<String, Term>
438                            binding
439                                .into_iter()
440                                .filter_map(|(var, sparql_value)| {
441                                    match sparql_value.value_type.as_str() {
442                                        "uri" => {
443                                            if let Ok(iri) =
444                                                oxirs_core::NamedNode::new(&sparql_value.value)
445                                            {
446                                                Some((var, oxirs_core::Term::NamedNode(iri)))
447                                            } else {
448                                                None
449                                            }
450                                        }
451                                        "literal" => {
452                                            if let Some(datatype_str) = sparql_value.datatype {
453                                                if let Ok(datatype) =
454                                                    oxirs_core::NamedNode::new(&datatype_str)
455                                                {
456                                                    Some((
457                                                        var,
458                                                        oxirs_core::Term::Literal(
459                                                            oxirs_core::Literal::new_typed(
460                                                                &sparql_value.value,
461                                                                datatype,
462                                                            ),
463                                                        ),
464                                                    ))
465                                                } else {
466                                                    Some((
467                                                        var,
468                                                        oxirs_core::Term::Literal(
469                                                            oxirs_core::Literal::new(
470                                                                &sparql_value.value,
471                                                            ),
472                                                        ),
473                                                    ))
474                                                }
475                                            } else if let Some(lang) = sparql_value.lang {
476                                                if let Ok(literal) = oxirs_core::Literal::new_lang(
477                                                    &sparql_value.value,
478                                                    &lang,
479                                                ) {
480                                                    Some((var, oxirs_core::Term::Literal(literal)))
481                                                } else {
482                                                    Some((
483                                                        var,
484                                                        oxirs_core::Term::Literal(
485                                                            oxirs_core::Literal::new(
486                                                                &sparql_value.value,
487                                                            ),
488                                                        ),
489                                                    ))
490                                                }
491                                            } else {
492                                                Some((
493                                                    var,
494                                                    oxirs_core::Term::Literal(
495                                                        oxirs_core::Literal::new(
496                                                            &sparql_value.value,
497                                                        ),
498                                                    ),
499                                                ))
500                                            }
501                                        }
502                                        "bnode" => {
503                                            if let Ok(bnode) =
504                                                oxirs_core::BlankNode::new(&sparql_value.value)
505                                            {
506                                                Some((var, oxirs_core::Term::BlankNode(bnode)))
507                                            } else {
508                                                None
509                                            }
510                                        }
511                                        _ => None,
512                                    }
513                                })
514                                .collect()
515                        })
516                        .collect();
517
518                    Ok(FederatedResult {
519                        data: QueryResult::Sparql(result_bindings),
520                        metadata: ExecutionMetadata {
521                            execution_time,
522                            services_used: 0, // From cache
523                            subqueries_executed: 0,
524                            cache_hit: true,
525                            plan_summary: "Cached result".to_string(),
526                        },
527                        errors: vec![],
528                    })
529                }
530                _ => {
531                    // Invalid cache entry type
532                    self.cache.remove(&cache_key).await;
533                    return Err(anyhow!("Invalid cached result type for SPARQL query"));
534                }
535            };
536        }
537
538        // Cache miss - execute normally
539        self.monitor.record_cache_hit("query_cache", false).await;
540
541        // Plan the federated execution
542        let registry = self.service_registry.read().await;
543        let execution_plan = self
544            .query_planner
545            .plan_sparql(&query_info, &registry)
546            .await?;
547        drop(registry);
548
549        // Execute the plan
550        let partial_results = self.executor.execute_plan(&execution_plan).await?;
551
552        // Integrate results
553        let final_result = self
554            .integrator
555            .integrate_sparql_results(partial_results)
556            .await?;
557
558        // Cache the result if successful
559        if final_result.is_success() {
560            if let QueryResult::Sparql(ref result_bindings) = final_result.data {
561                // Convert to cacheable format
562                let sparql_bindings: Vec<crate::executor::SparqlBinding> = result_bindings
563                    .iter()
564                    .map(|binding| {
565                        binding
566                            .iter()
567                            .map(|(var, term)| {
568                                let sparql_value = match term {
569                                    oxirs_core::Term::NamedNode(node) => {
570                                        crate::executor::SparqlValue {
571                                            value_type: "uri".to_string(),
572                                            value: node.to_string(),
573                                            datatype: None,
574                                            lang: None,
575                                            quoted_triple: None,
576                                        }
577                                    }
578                                    oxirs_core::Term::Literal(literal) => {
579                                        if let Some(lang) = literal.language() {
580                                            crate::executor::SparqlValue {
581                                                value_type: "literal".to_string(),
582                                                value: literal.value().to_string(),
583                                                datatype: None,
584                                                lang: Some(lang.to_string()),
585                                                quoted_triple: None,
586                                            }
587                                        } else {
588                                            crate::executor::SparqlValue {
589                                                value_type: "literal".to_string(),
590                                                value: literal.value().to_string(),
591                                                datatype: Some(literal.datatype().to_string()),
592                                                lang: None,
593                                                quoted_triple: None,
594                                            }
595                                        }
596                                    }
597                                    oxirs_core::Term::BlankNode(bnode) => {
598                                        crate::executor::SparqlValue {
599                                            value_type: "bnode".to_string(),
600                                            value: bnode.to_string(),
601                                            datatype: None,
602                                            lang: None,
603                                            quoted_triple: None,
604                                        }
605                                    }
606                                    oxirs_core::Term::Variable(var) => {
607                                        crate::executor::SparqlValue {
608                                            value_type: "variable".to_string(),
609                                            value: var.to_string(),
610                                            datatype: None,
611                                            lang: None,
612                                            quoted_triple: None,
613                                        }
614                                    }
615                                    oxirs_core::Term::QuotedTriple(triple) => {
616                                        // Convert oxirs_core terms to our RDF term representation
617                                        let subject = convert_subject_to_rdf_term(triple.subject());
618                                        let predicate =
619                                            convert_predicate_to_rdf_term(triple.predicate());
620                                        let object = convert_object_to_rdf_term(triple.object());
621
622                                        crate::executor::SparqlValue::quoted_triple(
623                                            subject, predicate, object,
624                                        )
625                                    }
626                                };
627                                (var.clone(), sparql_value)
628                            })
629                            .collect()
630                    })
631                    .collect();
632
633                let cached_result = crate::executor::SparqlResults {
634                    head: crate::executor::SparqlHead {
635                        vars: result_bindings
636                            .first()
637                            .map(|binding| binding.keys().cloned().collect())
638                            .unwrap_or_default(),
639                    },
640                    results: crate::executor::SparqlResultsData {
641                        bindings: sparql_bindings,
642                    },
643                };
644
645                // Cache with default TTL
646                self.cache
647                    .put_query_result(&cache_key, QueryResultCache::Sparql(cached_result), None)
648                    .await;
649            }
650        }
651
652        // Record metrics
653        let execution_time = start_time.elapsed();
654        self.monitor
655            .record_query_execution("sparql", execution_time, final_result.is_success())
656            .await;
657
658        Ok(final_result)
659    }
660
661    /// Execute a federated GraphQL query
662    pub async fn execute_graphql(
663        &self,
664        query: &str,
665        variables: Option<serde_json::Value>,
666    ) -> Result<FederatedResult> {
667        let start_time = Instant::now();
668
669        // Parse and analyze the GraphQL query
670        let query_info = self
671            .query_planner
672            .analyze_graphql(query, variables.as_ref())
673            .await?;
674
675        // Plan the federated execution
676        let registry = self.service_registry.read().await;
677        let execution_plan = self
678            .query_planner
679            .plan_graphql(&query_info, &registry)
680            .await?;
681        drop(registry);
682
683        // Execute the plan using GraphQL federation
684        let partial_results = self
685            .graphql_federation
686            .execute_federated(&execution_plan)
687            .await?;
688
689        // Integrate results
690        let final_result = self
691            .integrator
692            .integrate_graphql_results(partial_results)
693            .await?;
694
695        // Record metrics
696        let execution_time = start_time.elapsed();
697        self.monitor
698            .record_query_execution("graphql", execution_time, final_result.is_success())
699            .await;
700
701        Ok(final_result)
702    }
703
704    /// Get federation statistics and health information
705    pub async fn get_stats(&self) -> Result<FederationStats> {
706        let registry_stats = {
707            let registry = self.service_registry.read().await;
708            registry.get_stats().await
709        };
710
711        let monitor_stats = self.monitor.get_stats().await;
712
713        Ok(FederationStats {
714            registry: registry_stats?,
715            monitor: monitor_stats,
716            timestamp: chrono::Utc::now(),
717        })
718    }
719
720    /// Perform health check on all registered services
721    pub async fn health_check(&self) -> Result<HealthStatus> {
722        let registry = self.service_registry.read().await;
723        let health_statuses = registry.health_check().await?;
724
725        // Convert Vec<HealthStatus> to a single HealthStatus
726        let overall_status = if health_statuses
727            .iter()
728            .all(|s| matches!(s.status, crate::service_registry::HealthState::Healthy))
729        {
730            ServiceStatus::Healthy
731        } else if health_statuses
732            .iter()
733            .any(|s| matches!(s.status, crate::service_registry::HealthState::Healthy))
734        {
735            ServiceStatus::Degraded
736        } else {
737            ServiceStatus::Unavailable
738        };
739
740        let total_services = health_statuses.len();
741        let healthy_services = health_statuses
742            .iter()
743            .filter(|h| matches!(h.status, crate::service_registry::HealthState::Healthy))
744            .count();
745
746        let service_statuses = health_statuses
747            .iter()
748            .map(|h| {
749                let status = match h.status {
750                    crate::service_registry::HealthState::Healthy => ServiceStatus::Healthy,
751                    crate::service_registry::HealthState::Degraded => ServiceStatus::Degraded,
752                    crate::service_registry::HealthState::Unhealthy => ServiceStatus::Unavailable,
753                    crate::service_registry::HealthState::Unknown => ServiceStatus::Unknown,
754                };
755                (h.service_id.clone(), status)
756            })
757            .collect();
758
759        Ok(HealthStatus {
760            overall_status,
761            service_statuses,
762            total_services,
763            healthy_services,
764            timestamp: chrono::Utc::now(),
765        })
766    }
767
768    /// Update service capabilities through discovery
769    pub async fn discover_services(&self) -> Result<()> {
770        // Fixed: lib.rs now uses FederatedServiceRegistry from service.rs
771        // discovery.rs uses ServiceRegistry from service_registry.rs - different purposes
772        warn!("Service discovery temporarily disabled due to type conflicts");
773        Ok(())
774    }
775
776    /// Get cache statistics
777    pub async fn get_cache_stats(&self) -> CacheStats {
778        self.cache.get_stats().await
779    }
780
781    /// Invalidate cache for a specific service
782    pub async fn invalidate_service_cache(&self, service_id: &str) {
783        self.cache.invalidate_service(service_id).await;
784    }
785
786    /// Invalidate all query caches
787    pub async fn invalidate_query_cache(&self) {
788        self.cache.invalidate_queries().await;
789    }
790
791    /// Warm up the cache with commonly used data
792    pub async fn warmup_cache(&self) -> Result<()> {
793        self.cache.warmup().await
794    }
795
796    /// Clean up expired cache entries
797    pub async fn cleanup_cache(&self) {
798        self.cache.cleanup_expired().await;
799    }
800
801    /// Start automatic service discovery
802    pub async fn start_auto_discovery(&self, config: AutoDiscoveryConfig) -> Result<()> {
803        let mut auto_discovery_guard = self.auto_discovery.write().await;
804
805        if auto_discovery_guard.is_some() {
806            return Err(anyhow!("Auto-discovery is already running"));
807        }
808
809        let mut discovery = AutoDiscovery::new(config);
810        let mut receiver = discovery.start().await?;
811
812        let registry = self.service_registry.clone();
813        let service_discovery = ServiceDiscovery::new();
814
815        // Spawn task to handle discovered services
816        tokio::spawn(async move {
817            while let Some(discovered) = receiver.recv().await {
818                info!(
819                    "Auto-discovered service: {} via {:?}",
820                    discovered.url, discovered.discovery_method
821                );
822
823                // Register the discovered service
824                if let Ok(Some(service)) = service_discovery
825                    .discover_service_at_endpoint(&discovered.url)
826                    .await
827                {
828                    let registry_guard = registry.write().await;
829                    if let Err(e) = registry_guard.register(service).await {
830                        warn!("Failed to register auto-discovered service: {}", e);
831                    }
832                }
833            }
834        });
835
836        *auto_discovery_guard = Some(discovery);
837        info!("Auto-discovery started");
838        Ok(())
839    }
840
841    /// Stop automatic service discovery
842    pub async fn stop_auto_discovery(&self) -> Result<()> {
843        let mut auto_discovery_guard = self.auto_discovery.write().await;
844
845        match auto_discovery_guard.take() {
846            Some(mut discovery) => {
847                discovery.stop().await;
848                info!("Auto-discovery stopped");
849                Ok(())
850            }
851            _ => Err(anyhow!("Auto-discovery is not running")),
852        }
853    }
854
855    /// Get auto-discovered services
856    pub async fn get_auto_discovered_services(&self) -> Result<Vec<DiscoveredEndpoint>> {
857        let auto_discovery_guard = self.auto_discovery.read().await;
858
859        if let Some(ref discovery) = *auto_discovery_guard {
860            Ok(discovery.get_discovered_services().await)
861        } else {
862            Err(anyhow!("Auto-discovery is not running"))
863        }
864    }
865
866    /// Assess capabilities of a registered service
867    pub async fn assess_service_capabilities(&self, service_id: &str) -> Result<AssessmentResult> {
868        let registry = self.service_registry.read().await;
869        let service = registry
870            .get_service(service_id)
871            .ok_or_else(|| anyhow!("Service {} not found", service_id))?
872            .clone();
873        drop(registry);
874
875        let assessor = CapabilityAssessor::new();
876        let assessment = assessor.assess_service(&service).await?;
877
878        // Update service with detected capabilities
879        let registry = self.service_registry.read().await;
880        registry.update_service_capabilities(service_id, &assessment.detected_capabilities)?;
881        drop(registry);
882
883        info!(
884            "Capability assessment completed for service: {} - Updated with {} detected capabilities",
885            service_id,
886            assessment.detected_capabilities.len()
887        );
888        Ok(assessment)
889    }
890
891    /// Assess all registered services
892    pub async fn assess_all_services(&self) -> Result<Vec<AssessmentResult>> {
893        let service_ids: Vec<String> = {
894            let registry = self.service_registry.read().await;
895            registry
896                .get_all_services()
897                .into_iter()
898                .map(|s| s.id.clone())
899                .collect()
900        };
901
902        let mut results = Vec::new();
903        for service_id in service_ids {
904            match self.assess_service_capabilities(&service_id).await {
905                Ok(assessment) => results.push(assessment),
906                Err(e) => warn!("Failed to assess service {}: {}", service_id, e),
907            }
908        }
909
910        Ok(results)
911    }
912
913    /// Enable vector similarity federation
914    pub async fn enable_vector_federation(&self, config: VectorFederationConfig) -> Result<()> {
915        let vector_federation =
916            VectorSimilarityFederation::new(config, self.service_registry.clone()).await?;
917
918        let mut vec_fed = self.vector_federation.write().await;
919        *vec_fed = Some(vector_federation);
920
921        info!("Vector similarity federation enabled");
922        Ok(())
923    }
924
925    /// Register a vector-enabled service
926    pub async fn register_vector_service(&self, metadata: VectorServiceMetadata) -> Result<()> {
927        let vec_fed = self.vector_federation.read().await;
928        if let Some(ref federation) = *vec_fed {
929            federation.register_vector_service(metadata).await
930        } else {
931            Err(anyhow!("Vector federation is not enabled"))
932        }
933    }
934
935    /// Execute semantic query routing
936    pub async fn semantic_query_routing(&self, query: &str) -> Result<Vec<String>> {
937        let vec_fed = self.vector_federation.read().await;
938        if let Some(ref federation) = *vec_fed {
939            let query_embedding = federation.generate_query_embedding(query).await?;
940            federation
941                .semantic_query_routing(&query_embedding, query)
942                .await
943        } else {
944            Ok(Vec::new()) // Return empty if vector federation is disabled
945        }
946    }
947
948    /// Get vector federation statistics
949    pub async fn get_vector_statistics(&self) -> Result<Option<VectorFederationStats>> {
950        let vec_fed = self.vector_federation.read().await;
951        if let Some(ref federation) = *vec_fed {
952            Ok(Some(federation.get_statistics().await?))
953        } else {
954            Ok(None)
955        }
956    }
957}
958
959impl Default for FederationEngine {
960    fn default() -> Self {
961        Self::new()
962    }
963}
964
965/// Configuration for the federation engine
966#[derive(Debug, Clone, Serialize, Deserialize, Default)]
967pub struct FederationConfig {
968    pub registry_config: RegistryConfig,
969    pub planner_config: PlannerConfig,
970    pub executor_config: FederatedExecutorConfig,
971    pub integrator_config: ResultIntegratorConfig,
972    pub graphql_config: GraphQLFederationConfig,
973    pub monitor_config: FederationMonitorConfig,
974    pub cache_config: CacheConfig,
975}
976
977/// Result of a federated query execution
978#[derive(Debug, Clone)]
979pub struct FederatedResult {
980    /// The integrated query results
981    pub data: QueryResult,
982    /// Metadata about the execution
983    pub metadata: ExecutionMetadata,
984    /// Any errors or warnings from the execution
985    pub errors: Vec<FederationError>,
986}
987
988impl FederatedResult {
989    /// Check if the execution was successful (no critical errors)
990    pub fn is_success(&self) -> bool {
991        !self.errors.iter().any(|e| e.is_critical())
992    }
993
994    /// Get the number of results
995    pub fn result_count(&self) -> usize {
996        match &self.data {
997            QueryResult::Sparql(results) => results.len(),
998            QueryResult::GraphQL(result) => {
999                if result.is_object() {
1000                    1
1001                } else if result.is_array() {
1002                    result.as_array().map(|a| a.len()).unwrap_or(0)
1003                } else {
1004                    0
1005                }
1006            }
1007        }
1008    }
1009}
1010
1011/// Enumeration of different query result types
1012#[derive(Debug, Clone, Serialize, Deserialize)]
1013pub enum QueryResult {
1014    Sparql(Vec<HashMap<String, Term>>),
1015    GraphQL(serde_json::Value),
1016}
1017
1018/// Metadata about query execution
1019#[derive(Debug, Clone)]
1020pub struct ExecutionMetadata {
1021    /// Total execution time
1022    pub execution_time: Duration,
1023    /// Number of services involved
1024    pub services_used: usize,
1025    /// Number of subqueries executed
1026    pub subqueries_executed: usize,
1027    /// Whether results were cached
1028    pub cache_hit: bool,
1029    /// Execution plan used
1030    pub plan_summary: String,
1031}
1032
1033/// Federation-specific error types
1034#[derive(Debug, Clone, thiserror::Error)]
1035pub enum FederationError {
1036    #[error("Service unavailable: {service_id}")]
1037    ServiceUnavailable { service_id: String },
1038
1039    #[error("Query planning failed: {reason}")]
1040    PlanningFailed { reason: String },
1041
1042    #[error("Execution timeout after {timeout:?}")]
1043    ExecutionTimeout { timeout: Duration },
1044
1045    #[error("Result integration failed: {reason}")]
1046    IntegrationFailed { reason: String },
1047
1048    #[error("Partial results: {successful_services}/{total_services} services responded")]
1049    PartialResults {
1050        successful_services: usize,
1051        total_services: usize,
1052    },
1053
1054    #[error("Schema conflict: {conflict}")]
1055    SchemaConflict { conflict: String },
1056
1057    #[error("Authentication failed for service: {service_id}")]
1058    AuthenticationFailed { service_id: String },
1059
1060    #[error("Rate limit exceeded for service: {service_id}")]
1061    RateLimitExceeded { service_id: String },
1062}
1063
1064impl FederationError {
1065    /// Check if this error should cause the entire query to fail
1066    pub fn is_critical(&self) -> bool {
1067        match self {
1068            FederationError::ServiceUnavailable { .. } => false, // Can continue with other services
1069            FederationError::PlanningFailed { .. } => true,
1070            FederationError::ExecutionTimeout { .. } => true,
1071            FederationError::IntegrationFailed { .. } => true,
1072            FederationError::PartialResults { .. } => false, // Warning, not critical
1073            FederationError::SchemaConflict { .. } => true,
1074            FederationError::AuthenticationFailed { .. } => false,
1075            FederationError::RateLimitExceeded { .. } => false,
1076        }
1077    }
1078}
1079
1080/// Statistics about federation performance
1081#[derive(Debug, Clone, Serialize)]
1082pub struct FederationStats {
1083    pub registry: RegistryStats,
1084    pub monitor: MonitorStats,
1085    pub timestamp: chrono::DateTime<chrono::Utc>,
1086}
1087
1088/// Health status of the federation system
1089#[derive(Debug, Clone, Serialize)]
1090pub struct HealthStatus {
1091    pub overall_status: ServiceStatus,
1092    pub service_statuses: HashMap<String, ServiceStatus>,
1093    pub total_services: usize,
1094    pub healthy_services: usize,
1095    pub timestamp: chrono::DateTime<chrono::Utc>,
1096}
1097
1098/// Service status enumeration
1099#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1100pub enum ServiceStatus {
1101    Healthy,
1102    Degraded,
1103    Unavailable,
1104    Unknown,
1105}
1106
1107/// Spatial coverage for geospatial services
1108#[derive(Debug, Clone)]
1109pub struct SpatialCoverage {
1110    pub coverage_type: SpatialCoverageType,
1111    pub min_lat: f64,
1112    pub max_lat: f64,
1113    pub min_lon: f64,
1114    pub max_lon: f64,
1115}
1116
1117/// Types of spatial coverage
1118#[derive(Debug, Clone)]
1119pub enum SpatialCoverageType {
1120    BoundingBox,
1121    Circle,
1122    Polygon,
1123}
1124
1125/// Numeric range for numeric services
1126#[derive(Debug, Clone)]
1127pub struct NumericRange {
1128    pub min: f64,
1129    pub max: f64,
1130    pub data_type: String,
1131}
1132
1133/// Extended service metadata for enhanced optimization
1134#[derive(Debug, Clone, Default)]
1135pub struct ExtendedServiceMetadata {
1136    pub estimated_triple_count: Option<u64>,
1137    pub domain_specializations: Option<Vec<String>>,
1138    pub known_vocabularies: Option<Vec<String>>,
1139    pub schema_mappings: Option<HashMap<String, String>>,
1140    pub performance_history: Option<HashMap<String, PerformanceRecord>>,
1141    pub successful_query_patterns: Option<Vec<PatternFeatures>>,
1142    pub temporal_coverage: Option<TemporalRange>,
1143    pub spatial_coverage: Option<SpatialCoverage>,
1144    pub numeric_ranges: Option<Vec<NumericRange>>,
1145}
1146
1147/// Performance record for service history
1148#[derive(Debug, Clone)]
1149pub struct PerformanceRecord {
1150    pub avg_response_time_score: f64,
1151    pub success_rate: f64,
1152    pub last_updated: chrono::DateTime<chrono::Utc>,
1153}
1154
1155/// Pattern features for ML analysis (reexport from materialized_views)
1156pub use materialized_views::PatternFeatures;
1157pub use materialized_views::TemporalRange;
1158
1159/// Helper function to convert oxirs_core::Term to our RdfTerm representation for RDF-star support
1160#[allow(dead_code)]
1161fn convert_core_term_to_rdf_term(term: &oxirs_core::Term) -> RdfTerm {
1162    match term {
1163        oxirs_core::Term::NamedNode(node) => RdfTerm::IRI(node.as_str().to_string()),
1164        oxirs_core::Term::BlankNode(node) => RdfTerm::BlankNode(node.as_str().to_string()),
1165        oxirs_core::Term::Literal(literal) => {
1166            let value = literal.value().to_string();
1167            let datatype = if literal.datatype() != *oxirs_core::vocab::xsd::STRING {
1168                Some(literal.datatype().as_str().to_string())
1169            } else {
1170                None
1171            };
1172            let lang = literal.language().map(|l| l.to_string());
1173
1174            RdfTerm::Literal {
1175                value,
1176                datatype,
1177                lang,
1178            }
1179        }
1180        oxirs_core::Term::QuotedTriple(triple) => {
1181            let subject = Box::new(convert_subject_to_rdf_term(triple.subject()));
1182            let predicate = Box::new(convert_predicate_to_rdf_term(triple.predicate()));
1183            let object = Box::new(convert_object_to_rdf_term(triple.object()));
1184
1185            RdfTerm::QuotedTriple(QuotedTripleValue {
1186                subject,
1187                predicate,
1188                object,
1189            })
1190        }
1191        oxirs_core::Term::Variable(var) => {
1192            // Variables are represented as literals in RDF terms
1193            RdfTerm::Literal {
1194                value: format!("?{}", var.as_str()),
1195                datatype: None,
1196                lang: None,
1197            }
1198        }
1199    }
1200}
1201
1202/// Helper function to convert oxirs_core::Subject to our RdfTerm representation
1203fn convert_subject_to_rdf_term(subject: &oxirs_core::Subject) -> RdfTerm {
1204    match subject {
1205        oxirs_core::Subject::NamedNode(node) => RdfTerm::IRI(node.as_str().to_string()),
1206        oxirs_core::Subject::BlankNode(node) => RdfTerm::BlankNode(node.as_str().to_string()),
1207        oxirs_core::Subject::QuotedTriple(triple) => {
1208            let subject = Box::new(convert_subject_to_rdf_term(triple.subject()));
1209            let predicate = Box::new(convert_predicate_to_rdf_term(triple.predicate()));
1210            let object = Box::new(convert_object_to_rdf_term(triple.object()));
1211            RdfTerm::QuotedTriple(QuotedTripleValue {
1212                subject,
1213                predicate,
1214                object,
1215            })
1216        }
1217        oxirs_core::Subject::Variable(var) => RdfTerm::Literal {
1218            value: format!("?{}", var.as_str()),
1219            datatype: None,
1220            lang: None,
1221        },
1222    }
1223}
1224
1225/// Helper function to convert oxirs_core::Predicate to our RdfTerm representation
1226fn convert_predicate_to_rdf_term(predicate: &oxirs_core::Predicate) -> RdfTerm {
1227    match predicate {
1228        oxirs_core::Predicate::NamedNode(node) => RdfTerm::IRI(node.as_str().to_string()),
1229        oxirs_core::Predicate::Variable(var) => RdfTerm::Literal {
1230            value: format!("?{}", var.as_str()),
1231            datatype: None,
1232            lang: None,
1233        },
1234    }
1235}
1236
1237/// Helper function to convert oxirs_core::Object to our RdfTerm representation
1238fn convert_object_to_rdf_term(object: &oxirs_core::Object) -> RdfTerm {
1239    match object {
1240        oxirs_core::Object::NamedNode(node) => RdfTerm::IRI(node.as_str().to_string()),
1241        oxirs_core::Object::BlankNode(node) => RdfTerm::BlankNode(node.as_str().to_string()),
1242        oxirs_core::Object::Literal(literal) => {
1243            let value = literal.value().to_string();
1244            let datatype = if literal.datatype() != *oxirs_core::vocab::xsd::STRING {
1245                Some(literal.datatype().as_str().to_string())
1246            } else {
1247                None
1248            };
1249            let lang = literal.language().map(|l| l.to_string());
1250            RdfTerm::Literal {
1251                value,
1252                datatype,
1253                lang,
1254            }
1255        }
1256        oxirs_core::Object::QuotedTriple(triple) => {
1257            let subject = Box::new(convert_subject_to_rdf_term(triple.subject()));
1258            let predicate = Box::new(convert_predicate_to_rdf_term(triple.predicate()));
1259            let object = Box::new(convert_object_to_rdf_term(triple.object()));
1260            RdfTerm::QuotedTriple(QuotedTripleValue {
1261                subject,
1262                predicate,
1263                object,
1264            })
1265        }
1266        oxirs_core::Object::Variable(var) => RdfTerm::Literal {
1267            value: format!("?{}", var.as_str()),
1268            datatype: None,
1269            lang: None,
1270        },
1271    }
1272}
1273
1274#[cfg(test)]
1275mod tests {
1276    use super::*;
1277
1278    #[tokio::test]
1279    async fn test_federation_engine_creation() {
1280        let engine = FederationEngine::new();
1281        let stats = engine.get_stats().await.unwrap();
1282
1283        assert_eq!(stats.registry.total_sparql_endpoints, 0);
1284    }
1285
1286    #[tokio::test]
1287    async fn test_federation_engine_with_config() {
1288        let config = FederationConfig::default();
1289        let engine = FederationEngine::with_config(config);
1290        let health = engine.health_check().await.unwrap();
1291
1292        assert_eq!(health.overall_status, ServiceStatus::Healthy);
1293        assert_eq!(health.total_services, 0);
1294    }
1295
1296    #[tokio::test]
1297    async fn test_federation_error_criticality() {
1298        let critical_error = FederationError::PlanningFailed {
1299            reason: "Test error".to_string(),
1300        };
1301        assert!(critical_error.is_critical());
1302
1303        let non_critical_error = FederationError::ServiceUnavailable {
1304            service_id: "test-service".to_string(),
1305        };
1306        assert!(!non_critical_error.is_critical());
1307    }
1308
1309    #[tokio::test]
1310    async fn test_federated_result_success() {
1311        let result = FederatedResult {
1312            data: QueryResult::Sparql(vec![]),
1313            metadata: ExecutionMetadata {
1314                execution_time: Duration::from_millis(100),
1315                services_used: 1,
1316                subqueries_executed: 1,
1317                cache_hit: false,
1318                plan_summary: "Test plan".to_string(),
1319            },
1320            errors: vec![],
1321        };
1322
1323        assert!(result.is_success());
1324        assert_eq!(result.result_count(), 0);
1325    }
1326}