1#![allow(ambiguous_glob_reexports)]
13use anyhow::{anyhow, Result};
34use oxirs_core::Term;
35use serde::{Deserialize, Serialize};
36use std::collections::HashMap;
37
38use crate::executor::types::{QuotedTripleValue, RdfTerm};
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tokio::sync::RwLock;
42use tracing::{info, warn};
43use crate::{
47 auto_discovery::{AutoDiscovery, AutoDiscoveryConfig, DiscoveredEndpoint},
48 cache::{CacheConfig, CacheStats, FederationCache, QueryResultCache},
49 capability_assessment::{AssessmentResult, CapabilityAssessor},
50 discovery::ServiceDiscovery,
51 executor::{FederatedExecutor, FederatedExecutorConfig},
52 graphql::{GraphQLFederation, GraphQLFederationConfig},
53 integration::{ResultIntegrator, ResultIntegratorConfig},
54 monitoring::{FederationMonitor, FederationMonitorConfig, MonitorStats},
55 planner::{PlannerConfig, QueryPlanner},
56 service_registry::{RegistryConfig, RegistryStats, ServiceRegistry},
57 vector_similarity_federation::{
58 VectorFederationConfig, VectorFederationStats, VectorServiceMetadata,
59 VectorSimilarityFederation,
60 },
61};
62
63pub mod adaptive_load_balancer;
64pub mod advanced_anomaly_detection;
65pub mod advanced_benchmarking;
66pub mod advanced_consensus;
67pub mod advanced_enterprise_features;
68pub mod advanced_ml_optimizer;
69pub mod advanced_query_optimizer;
70pub mod advanced_security_hardening;
71pub mod advanced_semantic_features;
72pub mod advanced_visualization;
73pub mod anomaly_detection;
74pub mod auth;
75pub mod auto_discovery;
76pub mod automl_pipeline;
77pub mod cache;
78pub mod capability_assessment;
79pub mod cdc;
80pub mod cloud_cost_optimizer;
81pub mod connection_pool_manager;
82pub mod discovery;
83pub mod distributed_consensus;
84pub mod distributed_ml_trainer;
85pub mod distributed_tracing;
86pub mod distributed_transactions;
87pub mod executor;
88pub mod external_ml_integration;
89pub mod gpu_accelerated_query;
90pub mod graph_algorithms;
91pub mod graphql;
92pub mod integration;
93pub mod jit_query_compiler;
94pub mod join_optimizer;
95pub mod k8s_discovery;
96pub mod materialized_views;
97pub mod memory_efficient_datasets;
98pub mod metadata;
99pub mod ml_model_serving;
100pub mod ml_optimizer;
101pub mod monitoring;
102pub mod multi_level_federation;
103pub mod nats_federation;
104pub mod network_optimizer;
105pub mod optimization_cache;
106pub mod performance_analyzer;
107pub mod performance_benchmarks;
108pub mod planner;
109pub mod privacy;
110pub mod production_hardening;
111pub mod profiling_metrics;
112pub mod query_decomposition;
113pub mod query_plan_explainer;
114pub mod request_batcher;
115pub mod result_streaming;
116pub mod schema_alignment;
117pub mod semantic_enhancer;
118pub mod semantic_reasoner;
119pub mod service;
120pub mod service_client;
121pub mod service_executor;
122pub mod service_optimizer;
123pub mod service_registry;
124pub mod simd_optimized_joins;
125pub mod source_selection;
126pub mod streaming;
127pub mod streaming_optimizer;
128pub mod test_infrastructure;
129pub mod vector_similarity_federation;
130
131pub use adaptive_load_balancer::AdaptiveLoadBalancer;
135pub use advanced_query_optimizer::{
136 AdvancedOptimizerConfig, AdvancedQueryOptimizer, HardwareProfile, OptimizedPlan, QueryPlan,
137 TrainingExample,
138};
139pub use advanced_ml_optimizer::{
141 ActivationType, AdvancedMLConfig, AdvancedMLOptimizer, AutoML, DeepCardinalityEstimator,
142 ExplainableAI, JoinType, NeuralArchitectureSearch, OnlineLearningManager, RLJoinOptimizer,
143 TrainingEpoch, TransferLearningManager,
144};
145pub use advanced_benchmarking::{
147 AdvancedBenchmarkConfig, AdvancedBenchmarkSuite, BenchmarkResult, CustomBenchmarkConfig,
148 CustomBenchmarkGenerator, LUBMSuite, RegressionDetectionResult, SP2BenchSuite,
149 ScalabilityTestResult, StressTestResult, WatDivSuite, WorkloadCharacterization,
150};
151pub use advanced_semantic_features::{
153 AdvancedSemanticConfig, AdvancedSemanticFeatures, AutoMapping, AutoMappingGenerator,
154 ChangeType, ConceptRelationship, DeepOntologyMatcher, Entity, EntityMatch, EntityResolver,
155 MappingType, MatchType, MultiLingualConcept, MultiLingualSchemaManager, OntologyConcept,
156 OntologyMatch, RelationType, SchemaChange, SchemaEvolutionTracker, SchemaVersion,
157};
158pub use advanced_anomaly_detection::{
160 AdvancedAnomalyConfig, AdvancedAnomalyDetection, HealingAction, HealingActionType,
161 IsolationForest, IssueType, LSTMPredictor, MaintenancePriority, MaintenanceTask,
162 PredictiveMaintenanceScheduler, RootCause, RootCauseAnalyzer, SelfHealingEngine,
163 Severity as AnomalySeverity,
164};
165pub use advanced_consensus::{
167 AdvancedConsensusSystem, ByzantineFaultTolerance, DistributedLock, GCounter,
168 NetworkPartitionDetector, PNCounter, VectorClock,
169};
170pub use advanced_enterprise_features::{
172 AdvancedEnterpriseFeatures, AuditLogEntry, AuditLogger, AuditResult, DataLineageTracker,
173 DataSubject, DeletionRequest, EdgeComputingManager, EdgeNode, EdgeNodeStatus,
174 GDPRComplianceManager, GeoLocation, GeographicQueryRouter, LineageNode, MultiTenancyConfig,
175 MultiTenancyManager, PrivacyPreservingFederation, QuantumResistantSecurity, ResourceQuota,
176 ResourceUsage, Tenant,
177};
178pub use advanced_security_hardening::{
180 AdvancedRateLimiter, AdvancedSecurityHardening, AuditConfig, AuditEvent, AuditEventType,
181 AuthMethod, AuthSession, AuthenticationManager, ComplianceChecker, ComplianceFramework,
182 ComplianceStatus, EncryptionConfig, EncryptionKey, EncryptionManager, IdsConfig, IdsResult,
183 IntrusionDetectionSystem, MtlsCertificate, OAuth2Provider, OidcProvider, RateLimitConfig,
184 SecurityAlert, SecurityCheckResult, SecurityConfig, SecurityContext, ThreatCategory,
185 ThreatSeverity, ThreatSignature, TrustScore, UserInfo, Vulnerability, VulnerabilityScanResult,
186 VulnerabilityScanner, VulnerabilitySeverity, ZeroTrustConfig, ZeroTrustController,
187};
188pub use advanced_visualization::{
190 AdvancedVisualization, AggregationType, Alert, AlertGrouping, AlertSeverity, AlertTimeline,
191 AlertVisualizer, ChartData, ChartGenerator, ChartSeries, ChartTheme, ColorScale, CustomTheme,
192 Dashboard, DashboardLayout, DataSource, EdgeType, ExportFormat, HeatmapCell, HeatmapData,
193 LayoutAlgorithm, MetricAggregation, MetricsCollector, NodeStatus, NodeType, PieChartData,
194 PieSlice, PositionedNode, TimeSeries, TopologyEdge, TopologyNode, TopologyVisualization,
195 TopologyVisualizer, VisualizationConfig, VisualizationType, Widget, WidgetConfig,
196 WidgetPosition, WidgetSize, WidgetType,
197};
198pub use advanced_visualization::DataPoint as VizDataPoint;
200pub use anomaly_detection::{
202 AnomalyAlert, AnomalyDetector, AnomalyDetectorConfig, DataPoint, Severity, Trend, TrendAnalysis,
203};
204pub use auth::AuthManager;
205pub use automl_pipeline::{
206 ArchitectureCandidate, AutoMLConfig, AutoMLPipeline, HyperparameterConfig, LayerType,
207 MetaLearningTask, OptimizationStatistics, SearchSpace, TrialMetrics, TrialResult,
208};
209pub use cloud_cost_optimizer::{
210 CloudCostOptimizer, CloudInstance, CloudProvider, CostOptimizerConfig, CostRecommendation,
211 CostTracking, DeploymentConfig, InstanceStatus, OptimizationStrategy, RecommendationPriority,
212 RecommendationType, RoutingDecision, WorkloadMetrics,
213};
214pub use connection_pool_manager::ConnectionPoolManager;
215pub use distributed_ml_trainer::{
216 AggregationStrategy, DistributedMLConfig, DistributedMLTrainer, TrainingMetrics, TrainingMode,
217 WorkerInfo, WorkerMetrics, WorkerStatus,
218};
219pub use distributed_tracing::TracingConfig;
220pub use distributed_transactions::{
221 DistributedTransactionCoordinator, IsolationLevel, Operation, OperationState, OperationType,
222 Participant, ParticipantState, SagaLog, SagaStep, SagaStepState, Transaction,
223 TransactionConfig, TransactionProtocol, TransactionResult, TransactionState,
224};
225pub use executor::ExecutionStatus; pub use external_ml_integration::{
228 ConversionResult, DataType, ExternalMLIntegration, FrameworkAdapter, FrameworkCapabilities,
229 HuggingFaceAdapter, InferenceBackend, InferenceEngine, MLFramework, MLIntegrationConfig,
230 MLModel, MLTaskType, MockModel, ModelConverter, ModelFormat, ModelMetadata, ModelRegistry,
231 ModelVersion, ONNXAdapter, OptimizationConfig, OptimizationResult, PerformanceMetrics,
232 PyTorchAdapter, RegisteredModel, TensorFlowAdapter, TensorSpec,
233};
234pub use gpu_accelerated_query::{
235 FilterCondition, FilterOperator, GpuAccelerationConfig, GpuBackendType, GpuJoinProcessor,
236 GpuProcessingResult, GpuProcessingStats, GpuQueryProcessor, QueryBatch,
237};
238pub use graph_algorithms::{
239 AStar, BellmanFord, CentralityAnalyzer, ConnectivityAnalyzer, Dijkstra, Edge, FederationGraph,
240 FloydWarshall, PrimMST, ShortestPathResult,
241};
242pub use jit_query_compiler::{
243 CompiledQuery, ExecutionMode, JitCompilationConfig, JitCompilationStats, JitQueryCompiler,
244 JitQueryOptimizer, OptimizationRule,
245};
246pub use ml_model_serving::{
247 ABTestConfig, ABTestResults, MLModelServing, ModelMetrics, ModelServingConfig, ModelStatus,
248 ModelType, QueryTransformerModel, TransformerConfig,
249};
250pub use ml_model_serving::ModelVersion as MLModelVersion;
252pub use simd_optimized_joins::{JoinAlgorithm, JoinStatistics, SimdJoinConfig, SimdJoinProcessor};
253pub use ml_optimizer::MLOptimizer;
260pub use multi_level_federation::{
261 FederationCapability, FederationMetrics, FederationNode, MultiLevelConfig,
262 MultiLevelFederation, TopologyOptimizationResult, TopologyStats,
263};
264pub use network_optimizer::NetworkOptimizer;
265pub use optimization_cache::OptimizationCache;
266pub use production_hardening::{
267 CircuitBreakerState, ComplexityResult, HardeningConfig, HardeningStatistics,
268 ProductionHardening, QueryRequest as HardeningQueryRequest, ValidationResult,
269};
270pub use schema_alignment::{
271 Alignment, AlignmentConfig, AlignmentResult, AlignmentType, ClassMetadata, MappingExample,
272 PropertyMetadata, SchemaAligner, VocabularyMetadata,
273};
274pub use semantic_reasoner::{
275 InconsistencyReport, InconsistencyType, ReasonerConfig, SemanticReasoner, Triple,
276};
277pub use join_optimizer::JoinOptimizer;
279pub use planner::{
287 EntityResolutionPlan, EntityResolutionStep, ExecutionContext, ExecutionPlan, ExecutionStep,
288 FederatedQueryPlanner, FederatedSchema, FilterExpression, GraphQLOperationType,
289 HistoricalPerformance, ParsedQuery, QueryInfo, QueryType, ReoptimizationAnalysis, RetryConfig,
290 ServiceQuery, StepType, TriplePattern, UnifiedSchema,
291};
292pub use privacy::*;
293pub use query_decomposition::advanced_pattern_analysis::{
295 AdvancedPatternAnalyzer, ComplexityAssessment, OptimizationOpportunity, PatternAnalysisResult,
296 ServiceRecommendation,
297};
298pub use query_decomposition::types::QueryDecomposer;
299pub use query_plan_explainer::{
300 ExplainFormat, OptimizationSuggestion, PlanExplanation, QueryPlanExplainer, StepExplanation,
301 SuggestionCategory, SuggestionSeverity,
302};
303pub use request_batcher::RequestBatcher;
304pub use result_streaming::ResultStreamingManager;
305pub use semantic_enhancer::SemanticEnhancer;
306pub use service::{
308 AuthCredentials, FederatedService, ServiceAuthConfig, ServiceCapability, ServicePerformance,
309 ServiceType,
310};
311pub use service_client::ServiceClient;
313pub use service_executor::ServiceExecutor;
314pub use service_optimizer::ServiceOptimizer;
315pub use service_registry::{
317 GraphQLService, HealthStatus as ServiceHealthStatus, ServiceCapabilities, SparqlEndpoint,
318};
319#[derive(Debug, Clone)]
327pub struct FederationEngine {
328 service_registry: Arc<RwLock<ServiceRegistry>>,
330 query_planner: Arc<QueryPlanner>,
332 executor: Arc<FederatedExecutor>,
334 integrator: Arc<ResultIntegrator>,
336 graphql_federation: Arc<GraphQLFederation>,
338 monitor: Arc<FederationMonitor>,
340 cache: Arc<FederationCache>,
342 auto_discovery: Arc<RwLock<Option<AutoDiscovery>>>,
344 vector_federation: Arc<RwLock<Option<VectorSimilarityFederation>>>,
346}
347
348impl FederationEngine {
349 pub fn new() -> Self {
351 let service_registry = Arc::new(RwLock::new(ServiceRegistry::new()));
352 let query_planner = Arc::new(QueryPlanner::new());
353 let executor = Arc::new(FederatedExecutor::new());
354 let integrator = Arc::new(ResultIntegrator::new());
355 let graphql_federation = Arc::new(GraphQLFederation::new());
356 let monitor = Arc::new(FederationMonitor::new());
357 let cache = Arc::new(FederationCache::new());
358
359 Self {
360 service_registry,
361 query_planner,
362 executor,
363 integrator,
364 graphql_federation,
365 monitor,
366 cache,
367 auto_discovery: Arc::new(RwLock::new(None)),
368 vector_federation: Arc::new(RwLock::new(None)),
369 }
370 }
371
372 pub fn with_config(config: FederationConfig) -> Self {
374 let service_registry = Arc::new(RwLock::new(ServiceRegistry::with_config(
375 config.registry_config,
376 )));
377 let query_planner = Arc::new(QueryPlanner::with_config(config.planner_config));
378 let executor = Arc::new(FederatedExecutor::with_config(config.executor_config));
379 let integrator = Arc::new(ResultIntegrator::with_config(config.integrator_config));
380 let graphql_federation = Arc::new(GraphQLFederation::with_config(config.graphql_config));
381 let monitor = Arc::new(FederationMonitor::with_config(config.monitor_config));
382 let cache = Arc::new(FederationCache::with_config(config.cache_config));
383
384 Self {
385 service_registry,
386 query_planner,
387 executor,
388 integrator,
389 graphql_federation,
390 monitor,
391 cache,
392 auto_discovery: Arc::new(RwLock::new(None)),
393 vector_federation: Arc::new(RwLock::new(None)),
394 }
395 }
396
397 pub async fn register_service(&self, service: FederatedService) -> Result<()> {
399 let registry = self.service_registry.write().await;
400 registry.register(service).await
401 }
402
403 pub async fn unregister_service(&self, service_id: &str) -> Result<()> {
405 let registry = self.service_registry.write().await;
406 registry.unregister(service_id).await
407 }
408
409 pub async fn execute_sparql(&self, query: &str) -> Result<FederatedResult> {
411 let start_time = Instant::now();
412
413 let query_info = self.query_planner.analyze_sparql(query).await?;
415
416 let cache_key = self.cache.generate_query_key(&query_info);
418
419 if let Some(cached_result) = self.cache.get_query_result(&cache_key).await {
421 let execution_time = start_time.elapsed();
422
423 self.monitor.record_cache_hit("query_cache", true).await;
425 self.monitor
426 .record_query_execution("sparql", execution_time, true)
427 .await;
428
429 return match cached_result {
430 QueryResultCache::Sparql(sparql_results) => {
431 let result_bindings: Vec<HashMap<String, oxirs_core::Term>> = sparql_results
433 .results
434 .bindings
435 .into_iter()
436 .map(|binding| {
437 binding
439 .into_iter()
440 .filter_map(|(var, sparql_value)| {
441 match sparql_value.value_type.as_str() {
442 "uri" => {
443 if let Ok(iri) =
444 oxirs_core::NamedNode::new(&sparql_value.value)
445 {
446 Some((var, oxirs_core::Term::NamedNode(iri)))
447 } else {
448 None
449 }
450 }
451 "literal" => {
452 if let Some(datatype_str) = sparql_value.datatype {
453 if let Ok(datatype) =
454 oxirs_core::NamedNode::new(&datatype_str)
455 {
456 Some((
457 var,
458 oxirs_core::Term::Literal(
459 oxirs_core::Literal::new_typed(
460 &sparql_value.value,
461 datatype,
462 ),
463 ),
464 ))
465 } else {
466 Some((
467 var,
468 oxirs_core::Term::Literal(
469 oxirs_core::Literal::new(
470 &sparql_value.value,
471 ),
472 ),
473 ))
474 }
475 } else if let Some(lang) = sparql_value.lang {
476 if let Ok(literal) = oxirs_core::Literal::new_lang(
477 &sparql_value.value,
478 &lang,
479 ) {
480 Some((var, oxirs_core::Term::Literal(literal)))
481 } else {
482 Some((
483 var,
484 oxirs_core::Term::Literal(
485 oxirs_core::Literal::new(
486 &sparql_value.value,
487 ),
488 ),
489 ))
490 }
491 } else {
492 Some((
493 var,
494 oxirs_core::Term::Literal(
495 oxirs_core::Literal::new(
496 &sparql_value.value,
497 ),
498 ),
499 ))
500 }
501 }
502 "bnode" => {
503 if let Ok(bnode) =
504 oxirs_core::BlankNode::new(&sparql_value.value)
505 {
506 Some((var, oxirs_core::Term::BlankNode(bnode)))
507 } else {
508 None
509 }
510 }
511 _ => None,
512 }
513 })
514 .collect()
515 })
516 .collect();
517
518 Ok(FederatedResult {
519 data: QueryResult::Sparql(result_bindings),
520 metadata: ExecutionMetadata {
521 execution_time,
522 services_used: 0, subqueries_executed: 0,
524 cache_hit: true,
525 plan_summary: "Cached result".to_string(),
526 },
527 errors: vec![],
528 })
529 }
530 _ => {
531 self.cache.remove(&cache_key).await;
533 return Err(anyhow!("Invalid cached result type for SPARQL query"));
534 }
535 };
536 }
537
538 self.monitor.record_cache_hit("query_cache", false).await;
540
541 let registry = self.service_registry.read().await;
543 let execution_plan = self
544 .query_planner
545 .plan_sparql(&query_info, ®istry)
546 .await?;
547 drop(registry);
548
549 let partial_results = self.executor.execute_plan(&execution_plan).await?;
551
552 let final_result = self
554 .integrator
555 .integrate_sparql_results(partial_results)
556 .await?;
557
558 if final_result.is_success() {
560 if let QueryResult::Sparql(ref result_bindings) = final_result.data {
561 let sparql_bindings: Vec<crate::executor::SparqlBinding> = result_bindings
563 .iter()
564 .map(|binding| {
565 binding
566 .iter()
567 .map(|(var, term)| {
568 let sparql_value = match term {
569 oxirs_core::Term::NamedNode(node) => {
570 crate::executor::SparqlValue {
571 value_type: "uri".to_string(),
572 value: node.to_string(),
573 datatype: None,
574 lang: None,
575 quoted_triple: None,
576 }
577 }
578 oxirs_core::Term::Literal(literal) => {
579 if let Some(lang) = literal.language() {
580 crate::executor::SparqlValue {
581 value_type: "literal".to_string(),
582 value: literal.value().to_string(),
583 datatype: None,
584 lang: Some(lang.to_string()),
585 quoted_triple: None,
586 }
587 } else {
588 crate::executor::SparqlValue {
589 value_type: "literal".to_string(),
590 value: literal.value().to_string(),
591 datatype: Some(literal.datatype().to_string()),
592 lang: None,
593 quoted_triple: None,
594 }
595 }
596 }
597 oxirs_core::Term::BlankNode(bnode) => {
598 crate::executor::SparqlValue {
599 value_type: "bnode".to_string(),
600 value: bnode.to_string(),
601 datatype: None,
602 lang: None,
603 quoted_triple: None,
604 }
605 }
606 oxirs_core::Term::Variable(var) => {
607 crate::executor::SparqlValue {
608 value_type: "variable".to_string(),
609 value: var.to_string(),
610 datatype: None,
611 lang: None,
612 quoted_triple: None,
613 }
614 }
615 oxirs_core::Term::QuotedTriple(triple) => {
616 let subject = convert_subject_to_rdf_term(triple.subject());
618 let predicate =
619 convert_predicate_to_rdf_term(triple.predicate());
620 let object = convert_object_to_rdf_term(triple.object());
621
622 crate::executor::SparqlValue::quoted_triple(
623 subject, predicate, object,
624 )
625 }
626 };
627 (var.clone(), sparql_value)
628 })
629 .collect()
630 })
631 .collect();
632
633 let cached_result = crate::executor::SparqlResults {
634 head: crate::executor::SparqlHead {
635 vars: result_bindings
636 .first()
637 .map(|binding| binding.keys().cloned().collect())
638 .unwrap_or_default(),
639 },
640 results: crate::executor::SparqlResultsData {
641 bindings: sparql_bindings,
642 },
643 };
644
645 self.cache
647 .put_query_result(&cache_key, QueryResultCache::Sparql(cached_result), None)
648 .await;
649 }
650 }
651
652 let execution_time = start_time.elapsed();
654 self.monitor
655 .record_query_execution("sparql", execution_time, final_result.is_success())
656 .await;
657
658 Ok(final_result)
659 }
660
661 pub async fn execute_graphql(
663 &self,
664 query: &str,
665 variables: Option<serde_json::Value>,
666 ) -> Result<FederatedResult> {
667 let start_time = Instant::now();
668
669 let query_info = self
671 .query_planner
672 .analyze_graphql(query, variables.as_ref())
673 .await?;
674
675 let registry = self.service_registry.read().await;
677 let execution_plan = self
678 .query_planner
679 .plan_graphql(&query_info, ®istry)
680 .await?;
681 drop(registry);
682
683 let partial_results = self
685 .graphql_federation
686 .execute_federated(&execution_plan)
687 .await?;
688
689 let final_result = self
691 .integrator
692 .integrate_graphql_results(partial_results)
693 .await?;
694
695 let execution_time = start_time.elapsed();
697 self.monitor
698 .record_query_execution("graphql", execution_time, final_result.is_success())
699 .await;
700
701 Ok(final_result)
702 }
703
704 pub async fn get_stats(&self) -> Result<FederationStats> {
706 let registry_stats = {
707 let registry = self.service_registry.read().await;
708 registry.get_stats().await
709 };
710
711 let monitor_stats = self.monitor.get_stats().await;
712
713 Ok(FederationStats {
714 registry: registry_stats?,
715 monitor: monitor_stats,
716 timestamp: chrono::Utc::now(),
717 })
718 }
719
720 pub async fn health_check(&self) -> Result<HealthStatus> {
722 let registry = self.service_registry.read().await;
723 let health_statuses = registry.health_check().await?;
724
725 let overall_status = if health_statuses
727 .iter()
728 .all(|s| matches!(s.status, crate::service_registry::HealthState::Healthy))
729 {
730 ServiceStatus::Healthy
731 } else if health_statuses
732 .iter()
733 .any(|s| matches!(s.status, crate::service_registry::HealthState::Healthy))
734 {
735 ServiceStatus::Degraded
736 } else {
737 ServiceStatus::Unavailable
738 };
739
740 let total_services = health_statuses.len();
741 let healthy_services = health_statuses
742 .iter()
743 .filter(|h| matches!(h.status, crate::service_registry::HealthState::Healthy))
744 .count();
745
746 let service_statuses = health_statuses
747 .iter()
748 .map(|h| {
749 let status = match h.status {
750 crate::service_registry::HealthState::Healthy => ServiceStatus::Healthy,
751 crate::service_registry::HealthState::Degraded => ServiceStatus::Degraded,
752 crate::service_registry::HealthState::Unhealthy => ServiceStatus::Unavailable,
753 crate::service_registry::HealthState::Unknown => ServiceStatus::Unknown,
754 };
755 (h.service_id.clone(), status)
756 })
757 .collect();
758
759 Ok(HealthStatus {
760 overall_status,
761 service_statuses,
762 total_services,
763 healthy_services,
764 timestamp: chrono::Utc::now(),
765 })
766 }
767
768 pub async fn discover_services(&self) -> Result<()> {
770 warn!("Service discovery temporarily disabled due to type conflicts");
773 Ok(())
774 }
775
776 pub async fn get_cache_stats(&self) -> CacheStats {
778 self.cache.get_stats().await
779 }
780
781 pub async fn invalidate_service_cache(&self, service_id: &str) {
783 self.cache.invalidate_service(service_id).await;
784 }
785
786 pub async fn invalidate_query_cache(&self) {
788 self.cache.invalidate_queries().await;
789 }
790
791 pub async fn warmup_cache(&self) -> Result<()> {
793 self.cache.warmup().await
794 }
795
796 pub async fn cleanup_cache(&self) {
798 self.cache.cleanup_expired().await;
799 }
800
801 pub async fn start_auto_discovery(&self, config: AutoDiscoveryConfig) -> Result<()> {
803 let mut auto_discovery_guard = self.auto_discovery.write().await;
804
805 if auto_discovery_guard.is_some() {
806 return Err(anyhow!("Auto-discovery is already running"));
807 }
808
809 let mut discovery = AutoDiscovery::new(config);
810 let mut receiver = discovery.start().await?;
811
812 let registry = self.service_registry.clone();
813 let service_discovery = ServiceDiscovery::new();
814
815 tokio::spawn(async move {
817 while let Some(discovered) = receiver.recv().await {
818 info!(
819 "Auto-discovered service: {} via {:?}",
820 discovered.url, discovered.discovery_method
821 );
822
823 if let Ok(Some(service)) = service_discovery
825 .discover_service_at_endpoint(&discovered.url)
826 .await
827 {
828 let registry_guard = registry.write().await;
829 if let Err(e) = registry_guard.register(service).await {
830 warn!("Failed to register auto-discovered service: {}", e);
831 }
832 }
833 }
834 });
835
836 *auto_discovery_guard = Some(discovery);
837 info!("Auto-discovery started");
838 Ok(())
839 }
840
841 pub async fn stop_auto_discovery(&self) -> Result<()> {
843 let mut auto_discovery_guard = self.auto_discovery.write().await;
844
845 match auto_discovery_guard.take() {
846 Some(mut discovery) => {
847 discovery.stop().await;
848 info!("Auto-discovery stopped");
849 Ok(())
850 }
851 _ => Err(anyhow!("Auto-discovery is not running")),
852 }
853 }
854
855 pub async fn get_auto_discovered_services(&self) -> Result<Vec<DiscoveredEndpoint>> {
857 let auto_discovery_guard = self.auto_discovery.read().await;
858
859 if let Some(ref discovery) = *auto_discovery_guard {
860 Ok(discovery.get_discovered_services().await)
861 } else {
862 Err(anyhow!("Auto-discovery is not running"))
863 }
864 }
865
866 pub async fn assess_service_capabilities(&self, service_id: &str) -> Result<AssessmentResult> {
868 let registry = self.service_registry.read().await;
869 let service = registry
870 .get_service(service_id)
871 .ok_or_else(|| anyhow!("Service {} not found", service_id))?
872 .clone();
873 drop(registry);
874
875 let assessor = CapabilityAssessor::new();
876 let assessment = assessor.assess_service(&service).await?;
877
878 let registry = self.service_registry.read().await;
880 registry.update_service_capabilities(service_id, &assessment.detected_capabilities)?;
881 drop(registry);
882
883 info!(
884 "Capability assessment completed for service: {} - Updated with {} detected capabilities",
885 service_id,
886 assessment.detected_capabilities.len()
887 );
888 Ok(assessment)
889 }
890
891 pub async fn assess_all_services(&self) -> Result<Vec<AssessmentResult>> {
893 let service_ids: Vec<String> = {
894 let registry = self.service_registry.read().await;
895 registry
896 .get_all_services()
897 .into_iter()
898 .map(|s| s.id.clone())
899 .collect()
900 };
901
902 let mut results = Vec::new();
903 for service_id in service_ids {
904 match self.assess_service_capabilities(&service_id).await {
905 Ok(assessment) => results.push(assessment),
906 Err(e) => warn!("Failed to assess service {}: {}", service_id, e),
907 }
908 }
909
910 Ok(results)
911 }
912
913 pub async fn enable_vector_federation(&self, config: VectorFederationConfig) -> Result<()> {
915 let vector_federation =
916 VectorSimilarityFederation::new(config, self.service_registry.clone()).await?;
917
918 let mut vec_fed = self.vector_federation.write().await;
919 *vec_fed = Some(vector_federation);
920
921 info!("Vector similarity federation enabled");
922 Ok(())
923 }
924
925 pub async fn register_vector_service(&self, metadata: VectorServiceMetadata) -> Result<()> {
927 let vec_fed = self.vector_federation.read().await;
928 if let Some(ref federation) = *vec_fed {
929 federation.register_vector_service(metadata).await
930 } else {
931 Err(anyhow!("Vector federation is not enabled"))
932 }
933 }
934
935 pub async fn semantic_query_routing(&self, query: &str) -> Result<Vec<String>> {
937 let vec_fed = self.vector_federation.read().await;
938 if let Some(ref federation) = *vec_fed {
939 let query_embedding = federation.generate_query_embedding(query).await?;
940 federation
941 .semantic_query_routing(&query_embedding, query)
942 .await
943 } else {
944 Ok(Vec::new()) }
946 }
947
948 pub async fn get_vector_statistics(&self) -> Result<Option<VectorFederationStats>> {
950 let vec_fed = self.vector_federation.read().await;
951 if let Some(ref federation) = *vec_fed {
952 Ok(Some(federation.get_statistics().await?))
953 } else {
954 Ok(None)
955 }
956 }
957}
958
959impl Default for FederationEngine {
960 fn default() -> Self {
961 Self::new()
962 }
963}
964
965#[derive(Debug, Clone, Serialize, Deserialize, Default)]
967pub struct FederationConfig {
968 pub registry_config: RegistryConfig,
969 pub planner_config: PlannerConfig,
970 pub executor_config: FederatedExecutorConfig,
971 pub integrator_config: ResultIntegratorConfig,
972 pub graphql_config: GraphQLFederationConfig,
973 pub monitor_config: FederationMonitorConfig,
974 pub cache_config: CacheConfig,
975}
976
977#[derive(Debug, Clone)]
979pub struct FederatedResult {
980 pub data: QueryResult,
982 pub metadata: ExecutionMetadata,
984 pub errors: Vec<FederationError>,
986}
987
988impl FederatedResult {
989 pub fn is_success(&self) -> bool {
991 !self.errors.iter().any(|e| e.is_critical())
992 }
993
994 pub fn result_count(&self) -> usize {
996 match &self.data {
997 QueryResult::Sparql(results) => results.len(),
998 QueryResult::GraphQL(result) => {
999 if result.is_object() {
1000 1
1001 } else if result.is_array() {
1002 result.as_array().map(|a| a.len()).unwrap_or(0)
1003 } else {
1004 0
1005 }
1006 }
1007 }
1008 }
1009}
1010
1011#[derive(Debug, Clone, Serialize, Deserialize)]
1013pub enum QueryResult {
1014 Sparql(Vec<HashMap<String, Term>>),
1015 GraphQL(serde_json::Value),
1016}
1017
/// Bookkeeping information attached to a `FederatedResult`.
#[derive(Debug, Clone)]
pub struct ExecutionMetadata {
    /// Wall-clock time spent producing the result.
    pub execution_time: Duration,
    /// Number of services that took part in the execution.
    pub services_used: usize,
    /// Number of sub-queries that were executed.
    pub subqueries_executed: usize,
    /// Whether the result was served from the query cache.
    pub cache_hit: bool,
    /// Short human-readable description of the plan that was used.
    pub plan_summary: String,
}
1032
1033#[derive(Debug, Clone, thiserror::Error)]
1035pub enum FederationError {
1036 #[error("Service unavailable: {service_id}")]
1037 ServiceUnavailable { service_id: String },
1038
1039 #[error("Query planning failed: {reason}")]
1040 PlanningFailed { reason: String },
1041
1042 #[error("Execution timeout after {timeout:?}")]
1043 ExecutionTimeout { timeout: Duration },
1044
1045 #[error("Result integration failed: {reason}")]
1046 IntegrationFailed { reason: String },
1047
1048 #[error("Partial results: {successful_services}/{total_services} services responded")]
1049 PartialResults {
1050 successful_services: usize,
1051 total_services: usize,
1052 },
1053
1054 #[error("Schema conflict: {conflict}")]
1055 SchemaConflict { conflict: String },
1056
1057 #[error("Authentication failed for service: {service_id}")]
1058 AuthenticationFailed { service_id: String },
1059
1060 #[error("Rate limit exceeded for service: {service_id}")]
1061 RateLimitExceeded { service_id: String },
1062}
1063
1064impl FederationError {
1065 pub fn is_critical(&self) -> bool {
1067 match self {
1068 FederationError::ServiceUnavailable { .. } => false, FederationError::PlanningFailed { .. } => true,
1070 FederationError::ExecutionTimeout { .. } => true,
1071 FederationError::IntegrationFailed { .. } => true,
1072 FederationError::PartialResults { .. } => false, FederationError::SchemaConflict { .. } => true,
1074 FederationError::AuthenticationFailed { .. } => false,
1075 FederationError::RateLimitExceeded { .. } => false,
1076 }
1077 }
1078}
1079
1080#[derive(Debug, Clone, Serialize)]
1082pub struct FederationStats {
1083 pub registry: RegistryStats,
1084 pub monitor: MonitorStats,
1085 pub timestamp: chrono::DateTime<chrono::Utc>,
1086}
1087
1088#[derive(Debug, Clone, Serialize)]
1090pub struct HealthStatus {
1091 pub overall_status: ServiceStatus,
1092 pub service_statuses: HashMap<String, ServiceStatus>,
1093 pub total_services: usize,
1094 pub healthy_services: usize,
1095 pub timestamp: chrono::DateTime<chrono::Utc>,
1096}
1097
1098#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1100pub enum ServiceStatus {
1101 Healthy,
1102 Degraded,
1103 Unavailable,
1104 Unknown,
1105}
1106
1107#[derive(Debug, Clone)]
1109pub struct SpatialCoverage {
1110 pub coverage_type: SpatialCoverageType,
1111 pub min_lat: f64,
1112 pub max_lat: f64,
1113 pub min_lon: f64,
1114 pub max_lon: f64,
1115}
1116
/// Shape kinds a `SpatialCoverage` can describe.
#[derive(Debug, Clone)]
pub enum SpatialCoverageType {
    /// Rectangular bounding box.
    BoundingBox,
    /// Circular area.
    Circle,
    /// Arbitrary polygon.
    Polygon,
}
1124
/// Numeric value range together with the name of its data type.
#[derive(Debug, Clone)]
pub struct NumericRange {
    /// Lower bound of the range.
    pub min: f64,
    /// Upper bound of the range.
    pub max: f64,
    /// Name of the data type the range applies to.
    pub data_type: String,
}
1132
1133#[derive(Debug, Clone, Default)]
1135pub struct ExtendedServiceMetadata {
1136 pub estimated_triple_count: Option<u64>,
1137 pub domain_specializations: Option<Vec<String>>,
1138 pub known_vocabularies: Option<Vec<String>>,
1139 pub schema_mappings: Option<HashMap<String, String>>,
1140 pub performance_history: Option<HashMap<String, PerformanceRecord>>,
1141 pub successful_query_patterns: Option<Vec<PatternFeatures>>,
1142 pub temporal_coverage: Option<TemporalRange>,
1143 pub spatial_coverage: Option<SpatialCoverage>,
1144 pub numeric_ranges: Option<Vec<NumericRange>>,
1145}
1146
1147#[derive(Debug, Clone)]
1149pub struct PerformanceRecord {
1150 pub avg_response_time_score: f64,
1151 pub success_rate: f64,
1152 pub last_updated: chrono::DateTime<chrono::Utc>,
1153}
1154
1155pub use materialized_views::PatternFeatures;
1157pub use materialized_views::TemporalRange;
1158
1159#[allow(dead_code)]
1161fn convert_core_term_to_rdf_term(term: &oxirs_core::Term) -> RdfTerm {
1162 match term {
1163 oxirs_core::Term::NamedNode(node) => RdfTerm::IRI(node.as_str().to_string()),
1164 oxirs_core::Term::BlankNode(node) => RdfTerm::BlankNode(node.as_str().to_string()),
1165 oxirs_core::Term::Literal(literal) => {
1166 let value = literal.value().to_string();
1167 let datatype = if literal.datatype() != *oxirs_core::vocab::xsd::STRING {
1168 Some(literal.datatype().as_str().to_string())
1169 } else {
1170 None
1171 };
1172 let lang = literal.language().map(|l| l.to_string());
1173
1174 RdfTerm::Literal {
1175 value,
1176 datatype,
1177 lang,
1178 }
1179 }
1180 oxirs_core::Term::QuotedTriple(triple) => {
1181 let subject = Box::new(convert_subject_to_rdf_term(triple.subject()));
1182 let predicate = Box::new(convert_predicate_to_rdf_term(triple.predicate()));
1183 let object = Box::new(convert_object_to_rdf_term(triple.object()));
1184
1185 RdfTerm::QuotedTriple(QuotedTripleValue {
1186 subject,
1187 predicate,
1188 object,
1189 })
1190 }
1191 oxirs_core::Term::Variable(var) => {
1192 RdfTerm::Literal {
1194 value: format!("?{}", var.as_str()),
1195 datatype: None,
1196 lang: None,
1197 }
1198 }
1199 }
1200}
1201
1202fn convert_subject_to_rdf_term(subject: &oxirs_core::Subject) -> RdfTerm {
1204 match subject {
1205 oxirs_core::Subject::NamedNode(node) => RdfTerm::IRI(node.as_str().to_string()),
1206 oxirs_core::Subject::BlankNode(node) => RdfTerm::BlankNode(node.as_str().to_string()),
1207 oxirs_core::Subject::QuotedTriple(triple) => {
1208 let subject = Box::new(convert_subject_to_rdf_term(triple.subject()));
1209 let predicate = Box::new(convert_predicate_to_rdf_term(triple.predicate()));
1210 let object = Box::new(convert_object_to_rdf_term(triple.object()));
1211 RdfTerm::QuotedTriple(QuotedTripleValue {
1212 subject,
1213 predicate,
1214 object,
1215 })
1216 }
1217 oxirs_core::Subject::Variable(var) => RdfTerm::Literal {
1218 value: format!("?{}", var.as_str()),
1219 datatype: None,
1220 lang: None,
1221 },
1222 }
1223}
1224
1225fn convert_predicate_to_rdf_term(predicate: &oxirs_core::Predicate) -> RdfTerm {
1227 match predicate {
1228 oxirs_core::Predicate::NamedNode(node) => RdfTerm::IRI(node.as_str().to_string()),
1229 oxirs_core::Predicate::Variable(var) => RdfTerm::Literal {
1230 value: format!("?{}", var.as_str()),
1231 datatype: None,
1232 lang: None,
1233 },
1234 }
1235}
1236
1237fn convert_object_to_rdf_term(object: &oxirs_core::Object) -> RdfTerm {
1239 match object {
1240 oxirs_core::Object::NamedNode(node) => RdfTerm::IRI(node.as_str().to_string()),
1241 oxirs_core::Object::BlankNode(node) => RdfTerm::BlankNode(node.as_str().to_string()),
1242 oxirs_core::Object::Literal(literal) => {
1243 let value = literal.value().to_string();
1244 let datatype = if literal.datatype() != *oxirs_core::vocab::xsd::STRING {
1245 Some(literal.datatype().as_str().to_string())
1246 } else {
1247 None
1248 };
1249 let lang = literal.language().map(|l| l.to_string());
1250 RdfTerm::Literal {
1251 value,
1252 datatype,
1253 lang,
1254 }
1255 }
1256 oxirs_core::Object::QuotedTriple(triple) => {
1257 let subject = Box::new(convert_subject_to_rdf_term(triple.subject()));
1258 let predicate = Box::new(convert_predicate_to_rdf_term(triple.predicate()));
1259 let object = Box::new(convert_object_to_rdf_term(triple.object()));
1260 RdfTerm::QuotedTriple(QuotedTripleValue {
1261 subject,
1262 predicate,
1263 object,
1264 })
1265 }
1266 oxirs_core::Object::Variable(var) => RdfTerm::Literal {
1267 value: format!("?{}", var.as_str()),
1268 datatype: None,
1269 lang: None,
1270 },
1271 }
1272}
1273
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed engine reports no SPARQL endpoints.
    #[tokio::test]
    async fn test_federation_engine_creation() {
        let engine = FederationEngine::new();
        let stats = engine
            .get_stats()
            .await
            .expect("stats must be available on a fresh engine");

        assert_eq!(stats.registry.total_sparql_endpoints, 0);
    }

    /// An engine built from the default config is healthy and has no services.
    #[tokio::test]
    async fn test_federation_engine_with_config() {
        let config = FederationConfig::default();
        let engine = FederationEngine::with_config(config);
        let health = engine
            .health_check()
            .await
            .expect("health check must succeed on a fresh engine");

        assert_eq!(health.overall_status, ServiceStatus::Healthy);
        assert_eq!(health.total_services, 0);
    }

    /// `is_critical` classifies variants as expected.
    ///
    /// Synchronous on purpose: nothing here awaits, so no runtime is needed.
    #[test]
    fn test_federation_error_criticality() {
        let critical_error = FederationError::PlanningFailed {
            reason: "Test error".to_string(),
        };
        assert!(critical_error.is_critical());

        let schema_conflict = FederationError::SchemaConflict {
            conflict: "Test conflict".to_string(),
        };
        assert!(schema_conflict.is_critical());

        let non_critical_error = FederationError::ServiceUnavailable {
            service_id: "test-service".to_string(),
        };
        assert!(!non_critical_error.is_critical());

        let auth_failed = FederationError::AuthenticationFailed {
            service_id: "test-service".to_string(),
        };
        assert!(!auth_failed.is_critical());

        let rate_limited = FederationError::RateLimitExceeded {
            service_id: "test-service".to_string(),
        };
        assert!(!rate_limited.is_critical());
    }

    /// A result with no errors counts as a success, and an empty SPARQL
    /// result set has a result count of zero.
    ///
    /// Synchronous on purpose: nothing here awaits, so no runtime is needed.
    #[test]
    fn test_federated_result_success() {
        let result = FederatedResult {
            data: QueryResult::Sparql(vec![]),
            metadata: ExecutionMetadata {
                execution_time: Duration::from_millis(100),
                services_used: 1,
                subqueries_executed: 1,
                cache_hit: false,
                plan_summary: "Test plan".to_string(),
            },
            errors: vec![],
        };

        assert!(result.is_success());
        assert_eq!(result.result_count(), 0);
    }
}