1#![allow(dead_code)]
7
8use crate::model::*;
9use crate::query::algebra::{self, *};
10use crate::OxirsError;
12use async_trait::async_trait;
13use std::collections::{HashMap, HashSet};
14use std::sync::{Arc, RwLock};
15use std::time::{Duration, Instant};
16#[cfg(feature = "async")]
17#[allow(unused_imports)] use tokio::sync::mpsc;
19
20pub struct DistributedQueryEngine {
22 endpoints: Arc<RwLock<HashMap<String, FederatedEndpoint>>>,
24 router: Arc<QueryRouter>,
26 network_stats: Arc<RwLock<NetworkStatistics>>,
28 edge_nodes: Arc<RwLock<Vec<EdgeNode>>>,
30 config: DistributedConfig,
32}
33
34#[derive(Debug, Clone)]
36pub struct FederatedEndpoint {
37 pub url: String,
39 pub features: EndpointFeatures,
41 pub latency_ms: f64,
43 pub throughput: f64,
45 pub datasets: Vec<String>,
47 pub last_health_check: Instant,
49 pub status: EndpointStatus,
51}
52
/// Capability flags recorded for a federated endpoint.
#[derive(Debug, Clone)]
pub struct EndpointFeatures {
    /// SPARQL version string (for example `"1.1"`).
    pub sparql_version: String,
    /// Whether update operations are flagged as supported.
    pub update_support: bool,
    /// Whether federation is flagged as supported.
    pub federation_support: bool,
    /// Whether text search is flagged as supported.
    pub text_search: bool,
    /// Whether geospatial functionality is flagged as supported.
    pub geospatial: bool,
    /// Names of any additional extensions.
    pub extensions: HashSet<String>,
}
69
/// Health state of a federated endpoint.
///
/// The routers in this file only select endpoints whose status is `Healthy`.
#[derive(Debug, Clone, PartialEq)]
pub enum EndpointStatus {
    /// Eligible for routing.
    Healthy,
    /// Not `Healthy`; skipped by the built-in routers.
    Degraded,
    /// Not `Healthy`; skipped by the built-in routers.
    Unreachable,
    /// Not `Healthy`; skipped by the built-in routers.
    Overloaded,
}
82
83pub struct QueryRouter {
85 policy: RoutingPolicy,
87 data_locality: Arc<RwLock<DataLocalityMap>>,
89 pattern_cache: Arc<RwLock<PatternCache>>,
91}
92
93pub type RoutingFunction =
95 Arc<dyn Fn(&Query, &[FederatedEndpoint]) -> Vec<QueryRoute> + Send + Sync>;
96
97#[derive(Clone)]
99pub enum RoutingPolicy {
100 NearestEndpoint,
102 LoadBalanced,
104 DataLocality,
106 MinimizeTransfers,
108 Custom(RoutingFunction),
110}
111
112impl std::fmt::Debug for RoutingPolicy {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 match self {
115 Self::NearestEndpoint => write!(f, "NearestEndpoint"),
116 Self::LoadBalanced => write!(f, "LoadBalanced"),
117 Self::DataLocality => write!(f, "DataLocality"),
118 Self::MinimizeTransfers => write!(f, "MinimizeTransfers"),
119 Self::Custom(_) => write!(f, "Custom(<function>)"),
120 }
121 }
122}
123
124pub struct DataLocalityMap {
126 dataset_locations: HashMap<String, Vec<String>>,
128 predicate_distribution: HashMap<NamedNode, Vec<String>>,
130 affinity_scores: HashMap<(String, String), f64>,
132}
133
134pub struct PatternCache {
136 plans: HashMap<QueryHash, CachedPlan>,
138 stats: HashMap<QueryPattern, PatternStats>,
140 max_size: usize,
142}
143
/// 64-bit hash identifying a query pattern; used as the key for cached plans
/// and active queries.
type QueryHash = u64;
146
147pub struct CachedPlan {
149 plan: DistributedPlan,
151 created: Instant,
153 hits: usize,
155 avg_exec_time: Duration,
157}
158
159#[derive(Debug, Clone, Hash, PartialEq, Eq)]
161struct QueryPattern {
162 patterns: Vec<algebra::TriplePattern>,
164 joins: Vec<JoinType>,
166 filters: Vec<FilterType>,
168}
169
/// Statistics kept per query pattern. Nothing in this file writes or reads
/// these values yet.
struct PatternStats {
    /// Number of recorded occurrences.
    count: usize,
    /// Success rate. NOTE(review): presumably a fraction in 0..=1 — confirm.
    success_rate: f64,
    /// Average result size.
    avg_result_size: usize,
    /// Endpoints preferred for this pattern.
    preferred_endpoints: Vec<String>,
}
181
/// Kind of join recorded in a `QueryPattern`.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum JoinType {
    /// Inner join.
    InnerJoin,
    /// Left join.
    LeftJoin,
    /// Union of two branches.
    Union,
    /// Optional part.
    Optional,
}
190
/// Kind of filter recorded in a `QueryPattern`.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum FilterType {
    /// Comparison filter.
    Comparison,
    /// Regular-expression filter.
    Regex,
    /// Existence filter.
    Exists,
    /// Function-call filter carrying a string (presumably the function name).
    Function(String),
}
199
/// Per-endpoint network measurements.
pub struct NetworkStatistics {
    /// Latency samples per endpoint (appended by `update_latency`).
    latencies: HashMap<String, Vec<Duration>>,
    /// Transfer-rate samples per endpoint; never populated in this file.
    transfer_rates: HashMap<String, Vec<f64>>,
    /// Error rate per endpoint; never populated in this file.
    error_rates: HashMap<String, f64>,
    /// When the statistics were created or last had a latency recorded.
    last_update: Instant,
}
211
212#[derive(Debug, Clone)]
214pub struct EdgeNode {
215 pub id: String,
217 pub location: GeoLocation,
219 pub capacity: ComputeCapacity,
221 pub cached_data: HashSet<String>,
223 pub load: f64,
225}
226
/// Geographic position of a node.
#[derive(Debug, Clone)]
pub struct GeoLocation {
    /// Latitude. NOTE(review): presumably decimal degrees — confirm.
    pub latitude: f64,
    /// Longitude. NOTE(review): presumably decimal degrees — confirm.
    pub longitude: f64,
    /// Region label.
    pub region: String,
}
237
/// Resources available on a node; units follow the field-name suffixes.
#[derive(Debug, Clone)]
pub struct ComputeCapacity {
    /// Number of CPU cores.
    pub cpu_cores: u32,
    /// Memory, in gigabytes per the field name.
    pub memory_gb: u32,
    /// Storage, in gigabytes per the field name.
    pub storage_gb: u32,
    /// Bandwidth, in gigabits per second per the field name.
    pub bandwidth_gbps: f64,
}
250
251#[derive(Debug, Clone)]
253pub struct DistributedConfig {
254 pub query_timeout: Duration,
256 pub max_parallel_queries: usize,
258 pub edge_computing_enabled: bool,
260 pub cache_results: bool,
262 pub cache_ttl: Duration,
264 pub network_timeout: Duration,
266 pub retry_policy: RetryPolicy,
268}
269
/// Retry parameters. NOTE(review): the field names suggest exponential
/// backoff, but no retry loop in this file consumes them.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Delay used as the starting point.
    pub base_delay: Duration,
    /// Factor applied to the delay between attempts.
    pub backoff_factor: f64,
    /// Upper bound for the delay.
    pub max_delay: Duration,
}
282
283pub struct QueryRoute {
285 pub endpoint: String,
287 pub fragment: QueryFragment,
289 pub estimated_cost: f64,
291 pub priority: u32,
293}
294
295pub struct QueryFragment {
297 pub query: Query,
299 pub patterns: Vec<algebra::TriplePattern>,
301 pub required_vars: HashSet<Variable>,
303 pub output_vars: HashSet<Variable>,
305}
306
307pub struct DistributedPlan {
309 pub routes: Vec<QueryRoute>,
311 pub join_order: Vec<JoinOperation>,
313 pub aggregation: AggregationStrategy,
315 pub total_cost: f64,
317}
318
319pub struct JoinOperation {
321 pub left: usize,
323 pub right: usize,
325 pub join_vars: Vec<Variable>,
327 pub algorithm: JoinAlgorithm,
329}
330
/// Join algorithm selectable for a `JoinOperation`.
#[derive(Debug, Clone)]
pub enum JoinAlgorithm {
    /// Hash join.
    HashJoin,
    /// Sort-merge join.
    SortMergeJoin,
    /// Nested-loop join.
    NestedLoop,
    /// Broadcast join.
    BroadcastJoin,
    /// Adaptive choice of algorithm.
    Adaptive,
}
345
346#[derive(Clone)]
348pub enum AggregationStrategy {
349 Union,
351 MergeDistinct,
353 Streaming,
355 Custom(Arc<dyn Fn(Vec<QueryResult>) -> QueryResult + Send + Sync>),
357}
358
359impl std::fmt::Debug for AggregationStrategy {
360 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361 match self {
362 Self::Union => write!(f, "Union"),
363 Self::MergeDistinct => write!(f, "MergeDistinct"),
364 Self::Streaming => write!(f, "Streaming"),
365 Self::Custom(_) => write!(f, "Custom(<function>)"),
366 }
367 }
368}
369
370pub struct QueryResult {
372 pub bindings: Vec<HashMap<Variable, Term>>,
374 pub metadata: ExecutionMetadata,
376 pub source: String,
378}
379
/// Bookkeeping attached to a `QueryResult`.
#[derive(Debug, Clone)]
pub struct ExecutionMetadata {
    /// Time spent executing.
    pub execution_time: Duration,
    /// Number of result rows.
    pub result_count: usize,
    /// Number of bytes transferred.
    pub bytes_transferred: usize,
    /// Whether the result was served from a cache.
    pub cache_hit: bool,
    /// Warnings collected during execution.
    pub warnings: Vec<String>,
}
394
395impl DistributedQueryEngine {
396 pub fn new(config: DistributedConfig) -> Self {
398 Self {
399 endpoints: Arc::new(RwLock::new(HashMap::new())),
400 router: Arc::new(QueryRouter::new(RoutingPolicy::DataLocality)),
401 network_stats: Arc::new(RwLock::new(NetworkStatistics::new())),
402 edge_nodes: Arc::new(RwLock::new(Vec::new())),
403 config,
404 }
405 }
406
407 pub fn register_endpoint(&self, endpoint: FederatedEndpoint) -> Result<(), OxirsError> {
409 let mut endpoints = self
410 .endpoints
411 .write()
412 .map_err(|_| OxirsError::Query("Failed to acquire endpoints lock".to_string()))?;
413
414 endpoints.insert(endpoint.url.clone(), endpoint);
415 Ok(())
416 }
417
418 pub async fn execute(&self, query: Query) -> Result<QueryResult, OxirsError> {
420 let plan = self.plan_query(&query)?;
422
423 let fragment_results = self.execute_fragments(&plan).await?;
425
426 let joined = self.join_results(fragment_results, &plan)?;
428
429 let aggregated = self.aggregate_results(joined, &plan)?;
431
432 Ok(aggregated)
433 }
434
435 fn plan_query(&self, query: &Query) -> Result<DistributedPlan, OxirsError> {
437 let endpoints = self
438 .endpoints
439 .read()
440 .map_err(|_| OxirsError::Query("Failed to read endpoints".to_string()))?;
441
442 let routes = self.router.route_query(query, &endpoints)?;
444
445 let join_order = self.optimize_join_order(&routes)?;
447
448 let aggregation = self.select_aggregation_strategy(query)?;
450
451 let total_cost = routes.iter().map(|r| r.estimated_cost).sum();
453
454 Ok(DistributedPlan {
455 routes,
456 join_order,
457 aggregation,
458 total_cost,
459 })
460 }
461
462 async fn execute_fragments(
464 &self,
465 plan: &DistributedPlan,
466 ) -> Result<Vec<QueryResult>, OxirsError> {
467 use futures::future::join_all;
468
469 let mut futures = Vec::new();
470
471 for route in &plan.routes {
472 let future = self.execute_fragment(route);
473 futures.push(future);
474 }
475
476 let results = join_all(futures).await;
477
478 let mut fragment_results = Vec::new();
480 for result in results {
481 fragment_results.push(result?);
482 }
483
484 Ok(fragment_results)
485 }
486
487 async fn execute_fragment(&self, route: &QueryRoute) -> Result<QueryResult, OxirsError> {
489 Ok(QueryResult {
492 bindings: Vec::new(),
493 metadata: ExecutionMetadata {
494 execution_time: Duration::from_millis(100),
495 result_count: 0,
496 bytes_transferred: 0,
497 cache_hit: false,
498 warnings: Vec::new(),
499 },
500 source: route.endpoint.clone(),
501 })
502 }
503
504 fn join_results(
506 &self,
507 results: Vec<QueryResult>,
508 plan: &DistributedPlan,
509 ) -> Result<Vec<QueryResult>, OxirsError> {
510 let mut joined = results;
512
513 for join_op in &plan.join_order {
514 joined = self.apply_join(joined, join_op)?;
515 }
516
517 Ok(joined)
518 }
519
520 fn apply_join(
522 &self,
523 results: Vec<QueryResult>,
524 _join_op: &JoinOperation,
525 ) -> Result<Vec<QueryResult>, OxirsError> {
526 Ok(results)
528 }
529
530 fn aggregate_results(
532 &self,
533 results: Vec<QueryResult>,
534 plan: &DistributedPlan,
535 ) -> Result<QueryResult, OxirsError> {
536 match &plan.aggregation {
537 AggregationStrategy::Union => self.union_results(results),
538 AggregationStrategy::MergeDistinct => self.merge_distinct(results),
539 AggregationStrategy::Streaming => self.streaming_aggregate(results),
540 AggregationStrategy::Custom(f) => Ok(f(results)),
541 }
542 }
543
544 fn union_results(&self, results: Vec<QueryResult>) -> Result<QueryResult, OxirsError> {
546 let mut all_bindings = Vec::new();
547 let mut total_time = Duration::ZERO;
548 let mut total_bytes = 0;
549
550 for result in results {
551 all_bindings.extend(result.bindings);
552 total_time += result.metadata.execution_time;
553 total_bytes += result.metadata.bytes_transferred;
554 }
555
556 let result_count = all_bindings.len();
557 Ok(QueryResult {
558 bindings: all_bindings,
559 metadata: ExecutionMetadata {
560 execution_time: total_time,
561 result_count,
562 bytes_transferred: total_bytes,
563 cache_hit: false,
564 warnings: Vec::new(),
565 },
566 source: "distributed".to_string(),
567 })
568 }
569
570 fn merge_distinct(&self, results: Vec<QueryResult>) -> Result<QueryResult, OxirsError> {
572 use std::collections::HashSet;
573
574 let mut seen = HashSet::new();
575 let mut unique_bindings = Vec::new();
576
577 for result in results {
578 for binding in result.bindings {
579 let key = self.binding_key(&binding);
580 if seen.insert(key) {
581 unique_bindings.push(binding);
582 }
583 }
584 }
585
586 let result_count = unique_bindings.len();
587 Ok(QueryResult {
588 bindings: unique_bindings,
589 metadata: ExecutionMetadata {
590 execution_time: Duration::from_millis(100),
591 result_count,
592 bytes_transferred: 0,
593 cache_hit: false,
594 warnings: Vec::new(),
595 },
596 source: "distributed".to_string(),
597 })
598 }
599
600 fn binding_key(&self, binding: &HashMap<Variable, Term>) -> String {
602 let mut key = String::new();
603 let mut vars: Vec<_> = binding.keys().collect();
604 vars.sort();
605
606 for var in vars {
607 key.push_str(&format!("{}={},", var, binding[var]));
608 }
609
610 key
611 }
612
613 fn streaming_aggregate(&self, results: Vec<QueryResult>) -> Result<QueryResult, OxirsError> {
615 self.union_results(results)
617 }
618
619 fn optimize_join_order(
621 &self,
622 _routes: &[QueryRoute],
623 ) -> Result<Vec<JoinOperation>, OxirsError> {
624 Ok(Vec::new())
626 }
627
628 fn select_aggregation_strategy(
630 &self,
631 query: &Query,
632 ) -> Result<AggregationStrategy, OxirsError> {
633 if let QueryForm::Select { distinct, .. } = &query.form {
635 if *distinct {
636 return Ok(AggregationStrategy::MergeDistinct);
637 }
638 }
639
640 Ok(AggregationStrategy::Union)
641 }
642}
643
644impl QueryRouter {
645 pub fn new(policy: RoutingPolicy) -> Self {
647 Self {
648 policy,
649 data_locality: Arc::new(RwLock::new(DataLocalityMap::new())),
650 pattern_cache: Arc::new(RwLock::new(PatternCache::new())),
651 }
652 }
653
654 pub fn route_query(
656 &self,
657 query: &Query,
658 endpoints: &HashMap<String, FederatedEndpoint>,
659 ) -> Result<Vec<QueryRoute>, OxirsError> {
660 match &self.policy {
661 RoutingPolicy::NearestEndpoint => self.route_nearest(query, endpoints),
662 RoutingPolicy::LoadBalanced => self.route_load_balanced(query, endpoints),
663 RoutingPolicy::DataLocality => self.route_data_locality(query, endpoints),
664 RoutingPolicy::MinimizeTransfers => self.route_minimize_transfers(query, endpoints),
665 RoutingPolicy::Custom(f) => {
666 let endpoint_vec: Vec<_> = endpoints.values().cloned().collect();
667 Ok(f(query, &endpoint_vec))
668 }
669 }
670 }
671
672 fn route_nearest(
674 &self,
675 query: &Query,
676 endpoints: &HashMap<String, FederatedEndpoint>,
677 ) -> Result<Vec<QueryRoute>, OxirsError> {
678 let best_endpoint = endpoints
680 .values()
681 .filter(|e| e.status == EndpointStatus::Healthy)
682 .min_by(|a, b| {
683 a.latency_ms
684 .partial_cmp(&b.latency_ms)
685 .unwrap_or(std::cmp::Ordering::Equal)
686 })
687 .ok_or_else(|| OxirsError::Query("No healthy endpoints available".to_string()))?;
688
689 Ok(vec![QueryRoute {
690 endpoint: best_endpoint.url.clone(),
691 fragment: QueryFragment {
692 query: query.clone(),
693 patterns: self.extract_patterns(query)?,
694 required_vars: self.extract_variables(query)?,
695 output_vars: self.extract_output_vars(query)?,
696 },
697 estimated_cost: 1.0,
698 priority: 1,
699 }])
700 }
701
702 fn route_load_balanced(
704 &self,
705 query: &Query,
706 endpoints: &HashMap<String, FederatedEndpoint>,
707 ) -> Result<Vec<QueryRoute>, OxirsError> {
708 let healthy_endpoints: Vec<_> = endpoints
710 .values()
711 .filter(|e| e.status == EndpointStatus::Healthy)
712 .collect();
713
714 if healthy_endpoints.is_empty() {
715 return Err(OxirsError::Query(
716 "No healthy endpoints available".to_string(),
717 ));
718 }
719
720 let patterns = self.extract_patterns(query)?;
721 let mut routes = Vec::new();
722
723 for (i, pattern) in patterns.into_iter().enumerate() {
725 let endpoint = &healthy_endpoints[i % healthy_endpoints.len()];
726
727 routes.push(QueryRoute {
728 endpoint: endpoint.url.clone(),
729 fragment: QueryFragment {
730 query: query.clone(),
731 patterns: vec![pattern],
732 required_vars: HashSet::new(),
733 output_vars: HashSet::new(),
734 },
735 estimated_cost: 1.0 / healthy_endpoints.len() as f64,
736 priority: 1,
737 });
738 }
739
740 Ok(routes)
741 }
742
743 fn route_data_locality(
745 &self,
746 query: &Query,
747 endpoints: &HashMap<String, FederatedEndpoint>,
748 ) -> Result<Vec<QueryRoute>, OxirsError> {
749 self.route_load_balanced(query, endpoints)
751 }
752
753 fn route_minimize_transfers(
755 &self,
756 query: &Query,
757 endpoints: &HashMap<String, FederatedEndpoint>,
758 ) -> Result<Vec<QueryRoute>, OxirsError> {
759 self.route_load_balanced(query, endpoints)
761 }
762
763 fn extract_patterns(&self, query: &Query) -> Result<Vec<algebra::TriplePattern>, OxirsError> {
765 match &query.form {
766 QueryForm::Select { where_clause, .. } => {
767 self.extract_patterns_from_graph_pattern(where_clause)
768 }
769 _ => Ok(Vec::new()),
770 }
771 }
772
773 fn extract_patterns_from_graph_pattern(
775 &self,
776 pattern: &GraphPattern,
777 ) -> Result<Vec<algebra::TriplePattern>, OxirsError> {
778 match pattern {
779 GraphPattern::Bgp(patterns) => {
780 let model_patterns: Vec<algebra::TriplePattern> = patterns
782 .iter()
783 .filter_map(|p| self.convert_algebra_to_model_pattern(p))
784 .collect();
785 Ok(model_patterns)
786 }
787 GraphPattern::Join(left, right) => {
788 let mut left_patterns = self.extract_patterns_from_graph_pattern(left)?;
789 let mut right_patterns = self.extract_patterns_from_graph_pattern(right)?;
790 left_patterns.append(&mut right_patterns);
791 Ok(left_patterns)
792 }
793 GraphPattern::Filter { inner, .. } => self.extract_patterns_from_graph_pattern(inner),
794 GraphPattern::Union(left, right) => {
795 let mut left_patterns = self.extract_patterns_from_graph_pattern(left)?;
796 let mut right_patterns = self.extract_patterns_from_graph_pattern(right)?;
797 left_patterns.append(&mut right_patterns);
798 Ok(left_patterns)
799 }
800 _ => Ok(Vec::new()),
801 }
802 }
803
804 fn convert_to_algebra_pattern(
806 &self,
807 pattern: &crate::model::pattern::TriplePattern,
808 ) -> Result<algebra::TriplePattern, OxirsError> {
809 let subject = match &pattern.subject {
810 Some(crate::model::pattern::SubjectPattern::NamedNode(n)) => {
811 algebra::TermPattern::NamedNode(n.clone())
812 }
813 Some(crate::model::pattern::SubjectPattern::BlankNode(b)) => {
814 algebra::TermPattern::BlankNode(b.clone())
815 }
816 Some(crate::model::pattern::SubjectPattern::Variable(v)) => {
817 algebra::TermPattern::Variable(v.clone())
818 }
819 None => {
820 return Err(OxirsError::Query(
821 "Subject pattern cannot be None in basic graph pattern".to_string(),
822 ))
823 }
824 };
825
826 let predicate = match &pattern.predicate {
827 Some(crate::model::pattern::PredicatePattern::NamedNode(n)) => {
828 algebra::TermPattern::NamedNode(n.clone())
829 }
830 Some(crate::model::pattern::PredicatePattern::Variable(v)) => {
831 algebra::TermPattern::Variable(v.clone())
832 }
833 None => {
834 return Err(OxirsError::Query(
835 "Predicate pattern cannot be None in basic graph pattern".to_string(),
836 ))
837 }
838 };
839
840 let object = match &pattern.object {
841 Some(crate::model::pattern::ObjectPattern::NamedNode(n)) => {
842 algebra::TermPattern::NamedNode(n.clone())
843 }
844 Some(crate::model::pattern::ObjectPattern::BlankNode(b)) => {
845 algebra::TermPattern::BlankNode(b.clone())
846 }
847 Some(crate::model::pattern::ObjectPattern::Literal(l)) => {
848 algebra::TermPattern::Literal(l.clone())
849 }
850 Some(crate::model::pattern::ObjectPattern::Variable(v)) => {
851 algebra::TermPattern::Variable(v.clone())
852 }
853 None => {
854 return Err(OxirsError::Query(
855 "Object pattern cannot be None in basic graph pattern".to_string(),
856 ))
857 }
858 };
859
860 Ok(algebra::TriplePattern::new(
861 Some(subject.into()),
862 Some(predicate.into()),
863 Some(object.into()),
864 ))
865 }
866
867 fn convert_algebra_to_model_pattern(
869 &self,
870 algebra_pattern: &AlgebraTriplePattern,
871 ) -> Option<algebra::TriplePattern> {
872 use crate::model::pattern::{ObjectPattern, PredicatePattern, SubjectPattern};
873
874 let subject = match &algebra_pattern.subject {
875 algebra::TermPattern::NamedNode(n) => Some(SubjectPattern::NamedNode(n.clone())),
876 algebra::TermPattern::BlankNode(b) => Some(SubjectPattern::BlankNode(b.clone())),
877 algebra::TermPattern::Variable(v) => Some(SubjectPattern::Variable(v.clone())),
878 _ => None,
879 };
880
881 let predicate = match &algebra_pattern.predicate {
882 algebra::TermPattern::NamedNode(n) => Some(PredicatePattern::NamedNode(n.clone())),
883 algebra::TermPattern::Variable(v) => Some(PredicatePattern::Variable(v.clone())),
884 _ => None,
885 };
886
887 let object = match &algebra_pattern.object {
888 algebra::TermPattern::NamedNode(n) => Some(ObjectPattern::NamedNode(n.clone())),
889 algebra::TermPattern::BlankNode(b) => Some(ObjectPattern::BlankNode(b.clone())),
890 algebra::TermPattern::Literal(l) => Some(ObjectPattern::Literal(l.clone())),
891 algebra::TermPattern::Variable(v) => Some(ObjectPattern::Variable(v.clone())),
892 algebra::TermPattern::QuotedTriple(_) => {
893 panic!("RDF-star quoted triples not yet supported in distributed queries")
894 }
895 };
896
897 Some(algebra::TriplePattern::new(subject, predicate, object))
898 }
899
900 fn extract_variables(&self, query: &Query) -> Result<HashSet<Variable>, OxirsError> {
902 let mut vars = HashSet::new();
903
904 if let QueryForm::Select { where_clause, .. } = &query.form {
905 self.collect_variables_from_pattern(where_clause, &mut vars)?;
906 }
907
908 Ok(vars)
909 }
910
911 fn collect_variables_from_pattern(
913 &self,
914 pattern: &GraphPattern,
915 vars: &mut HashSet<Variable>,
916 ) -> Result<(), OxirsError> {
917 if let GraphPattern::Bgp(patterns) = pattern {
918 for tp in patterns {
919 if let TermPattern::Variable(v) = &tp.subject {
920 vars.insert(v.clone());
921 }
922 if let TermPattern::Variable(v) = &tp.predicate {
923 vars.insert(v.clone());
924 }
925 if let TermPattern::Variable(v) = &tp.object {
926 vars.insert(v.clone());
927 }
928 }
929 }
930
931 Ok(())
932 }
933
934 fn extract_output_vars(&self, query: &Query) -> Result<HashSet<Variable>, OxirsError> {
936 match &query.form {
937 QueryForm::Select { variables, .. } => match variables {
938 SelectVariables::All => self.extract_variables(query),
939 SelectVariables::Specific(vars) => Ok(vars.iter().cloned().collect()),
940 },
941 _ => Ok(HashSet::new()),
942 }
943 }
944}
945
946impl Default for NetworkStatistics {
947 fn default() -> Self {
948 Self::new()
949 }
950}
951
952impl NetworkStatistics {
953 pub fn new() -> Self {
955 Self {
956 latencies: HashMap::new(),
957 transfer_rates: HashMap::new(),
958 error_rates: HashMap::new(),
959 last_update: Instant::now(),
960 }
961 }
962
963 pub fn update_latency(&mut self, endpoint: String, latency: Duration) {
965 self.latencies.entry(endpoint).or_default().push(latency);
966 self.last_update = Instant::now();
967 }
968
969 pub fn avg_latency(&self, endpoint: &str) -> Option<Duration> {
971 self.latencies.get(endpoint).map(|samples| {
972 let sum: Duration = samples.iter().sum();
973 sum / samples.len() as u32
974 })
975 }
976}
977
978impl Default for DataLocalityMap {
979 fn default() -> Self {
980 Self::new()
981 }
982}
983
984impl DataLocalityMap {
985 pub fn new() -> Self {
987 Self {
988 dataset_locations: HashMap::new(),
989 predicate_distribution: HashMap::new(),
990 affinity_scores: HashMap::new(),
991 }
992 }
993
994 pub fn update_dataset_location(&mut self, dataset: String, endpoints: Vec<String>) {
996 self.dataset_locations.insert(dataset, endpoints);
997 }
998
999 pub fn get_dataset_endpoints(&self, dataset: &str) -> Option<&Vec<String>> {
1001 self.dataset_locations.get(dataset)
1002 }
1003}
1004
1005impl Default for PatternCache {
1006 fn default() -> Self {
1007 Self::new()
1008 }
1009}
1010
1011impl PatternCache {
1012 pub fn new() -> Self {
1014 Self {
1015 plans: HashMap::new(),
1016 stats: HashMap::new(),
1017 max_size: 1000,
1018 }
1019 }
1020
1021 pub fn get_plan(&mut self, hash: QueryHash) -> Option<&mut CachedPlan> {
1023 self.plans.get_mut(&hash).map(|plan| {
1024 plan.hits += 1;
1025 plan
1026 })
1027 }
1028
1029 pub fn cache_plan(&mut self, hash: QueryHash, plan: DistributedPlan) {
1031 if self.plans.len() >= self.max_size {
1033 if let Some(&oldest) = self.plans.keys().next() {
1035 self.plans.remove(&oldest);
1036 }
1037 }
1038
1039 self.plans.insert(
1040 hash,
1041 CachedPlan {
1042 plan,
1043 created: Instant::now(),
1044 hits: 0,
1045 avg_exec_time: Duration::ZERO,
1046 },
1047 );
1048 }
1049}
1050
1051impl Default for DistributedConfig {
1052 fn default() -> Self {
1053 Self {
1054 query_timeout: Duration::from_secs(30),
1055 max_parallel_queries: 100,
1056 edge_computing_enabled: true,
1057 cache_results: true,
1058 cache_ttl: Duration::from_secs(300),
1059 network_timeout: Duration::from_secs(10),
1060 retry_policy: RetryPolicy::default(),
1061 }
1062 }
1063}
1064
1065impl Default for RetryPolicy {
1066 fn default() -> Self {
1067 Self {
1068 max_attempts: 3,
1069 base_delay: Duration::from_millis(100),
1070 backoff_factor: 2.0,
1071 max_delay: Duration::from_secs(10),
1072 }
1073 }
1074}
1075
1076#[async_trait]
1078pub trait FederatedQueryExecutor: Send + Sync {
1079 async fn execute_query(
1081 &self,
1082 endpoint: &FederatedEndpoint,
1083 query: &Query,
1084 ) -> Result<QueryResult, OxirsError>;
1085
1086 async fn check_health(&self, endpoint: &FederatedEndpoint) -> EndpointStatus;
1088
1089 async fn get_capabilities(
1091 &self,
1092 endpoint: &FederatedEndpoint,
1093 ) -> Result<EndpointFeatures, OxirsError>;
1094}
1095
1096pub struct CollaborativeFilter {
1098 active_queries: Arc<RwLock<HashMap<QueryHash, ActiveQuery>>>,
1100 similarity_threshold: f64,
1102 #[cfg(feature = "async")]
1104 result_channel: tokio::sync::mpsc::Sender<SharedResult>,
1105 #[cfg(not(feature = "async"))]
1106 result_channel: std::sync::mpsc::Sender<SharedResult>,
1107}
1108
1109struct ActiveQuery {
1111 pattern: QueryPattern,
1113 clients: HashSet<String>,
1115 partial_results: Vec<QueryResult>,
1117 start_time: Instant,
1119}
1120
1121pub struct SharedResult {
1123 query_hash: QueryHash,
1125 result: QueryResult,
1127 client_id: String,
1129}
1130
1131impl CollaborativeFilter {
1132 #[cfg(feature = "async")]
1134 pub fn new(similarity_threshold: f64) -> (Self, tokio::sync::mpsc::Receiver<SharedResult>) {
1135 let (tx, rx) = tokio::sync::mpsc::channel(1000);
1136
1137 (
1138 Self {
1139 active_queries: Arc::new(RwLock::new(HashMap::new())),
1140 similarity_threshold,
1141 result_channel: tx,
1142 },
1143 rx,
1144 )
1145 }
1146
1147 #[cfg(not(feature = "async"))]
1148 pub fn new(similarity_threshold: f64) -> (Self, std::sync::mpsc::Receiver<SharedResult>) {
1149 let (tx, rx) = std::sync::mpsc::channel();
1150
1151 (
1152 Self {
1153 active_queries: Arc::new(RwLock::new(HashMap::new())),
1154 similarity_threshold,
1155 result_channel: tx,
1156 },
1157 rx,
1158 )
1159 }
1160
1161 pub async fn register_query(
1163 &self,
1164 query: &Query,
1165 client_id: String,
1166 ) -> Result<QueryHash, OxirsError> {
1167 let pattern = self.extract_query_pattern(query)?;
1168 let hash = self.hash_pattern(&pattern);
1169
1170 let mut active = self
1171 .active_queries
1172 .write()
1173 .map_err(|_| OxirsError::Query("Failed to acquire lock".to_string()))?;
1174
1175 active
1176 .entry(hash)
1177 .or_insert_with(|| ActiveQuery {
1178 pattern: pattern.clone(),
1179 clients: HashSet::new(),
1180 partial_results: Vec::new(),
1181 start_time: Instant::now(),
1182 })
1183 .clients
1184 .insert(client_id);
1185
1186 Ok(hash)
1187 }
1188
1189 #[cfg(feature = "async")]
1191 pub async fn share_results(
1192 &self,
1193 hash: QueryHash,
1194 result: QueryResult,
1195 client_id: String,
1196 ) -> Result<(), OxirsError> {
1197 self.result_channel
1198 .send(SharedResult {
1199 query_hash: hash,
1200 result,
1201 client_id,
1202 })
1203 .await
1204 .map_err(|_| OxirsError::Query("Failed to share results".to_string()))
1205 }
1206
1207 #[cfg(not(feature = "async"))]
1208 pub fn share_results(
1209 &self,
1210 hash: QueryHash,
1211 result: QueryResult,
1212 client_id: String,
1213 ) -> Result<(), OxirsError> {
1214 self.result_channel
1215 .send(SharedResult {
1216 query_hash: hash,
1217 result,
1218 client_id,
1219 })
1220 .map_err(|_| OxirsError::Query("Failed to share results".to_string()))
1221 }
1222
1223 fn extract_query_pattern(&self, _query: &Query) -> Result<QueryPattern, OxirsError> {
1225 Ok(QueryPattern {
1227 patterns: Vec::new(),
1228 joins: Vec::new(),
1229 filters: Vec::new(),
1230 })
1231 }
1232
1233 fn hash_pattern(&self, pattern: &QueryPattern) -> QueryHash {
1235 use std::collections::hash_map::DefaultHasher;
1236 use std::hash::{Hash, Hasher};
1237
1238 let mut hasher = DefaultHasher::new();
1239 pattern.hash(&mut hasher);
1240 hasher.finish()
1241 }
1242}
1243
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a healthy endpoint fixture. Federation support is on, text
    /// search and geospatial are off, and there are no extensions; the
    /// remaining values are supplied by each test.
    fn healthy_endpoint(
        url: &str,
        update_support: bool,
        latency_ms: f64,
        throughput: f64,
        datasets: Vec<String>,
    ) -> FederatedEndpoint {
        FederatedEndpoint {
            url: url.to_string(),
            features: EndpointFeatures {
                sparql_version: "1.1".to_string(),
                update_support,
                federation_support: true,
                text_search: false,
                geospatial: false,
                extensions: HashSet::new(),
            },
            latency_ms,
            throughput,
            datasets,
            last_health_check: Instant::now(),
            status: EndpointStatus::Healthy,
        }
    }

    /// A freshly created engine has no endpoints registered.
    #[test]
    fn test_distributed_engine_creation() {
        let engine = DistributedQueryEngine::new(DistributedConfig::default());

        let registered = engine
            .endpoints
            .read()
            .expect("lock should not be poisoned");
        assert!(registered.is_empty());
    }

    /// Registering an endpoint stores it under its URL.
    #[test]
    fn test_endpoint_registration() {
        let engine = DistributedQueryEngine::new(DistributedConfig::default());
        let endpoint = healthy_endpoint(
            "http://example.org/sparql",
            true,
            50.0,
            10000.0,
            vec!["dataset1".to_string()],
        );

        engine
            .register_endpoint(endpoint)
            .expect("operation should succeed");

        let registered = engine
            .endpoints
            .read()
            .expect("lock should not be poisoned");
        assert_eq!(registered.len(), 1);
        assert!(registered.contains_key("http://example.org/sparql"));
    }

    /// Nearest-endpoint routing sends the query to the only healthy endpoint.
    #[test]
    fn test_query_router() {
        let router = QueryRouter::new(RoutingPolicy::NearestEndpoint);

        let mut endpoints = HashMap::new();
        endpoints.insert(
            "endpoint1".to_string(),
            healthy_endpoint("http://endpoint1.org/sparql", false, 20.0, 5000.0, vec![]),
        );

        let form = QueryForm::Select {
            variables: SelectVariables::All,
            where_clause: GraphPattern::Bgp(vec![]),
            distinct: false,
            reduced: false,
            order_by: vec![],
            offset: 0,
            limit: None,
        };
        let query = Query {
            base: None,
            prefixes: HashMap::new(),
            form,
            dataset: crate::query::algebra::Dataset::default(),
        };

        let routes = router
            .route_query(&query, &endpoints)
            .expect("operation should succeed");
        assert_eq!(routes.len(), 1);
        assert_eq!(routes[0].endpoint, "http://endpoint1.org/sparql");
    }
}