1#![allow(dead_code)]
7
8use crate::model::*;
9use crate::query::algebra::{self, *};
10use crate::OxirsError;
12use async_trait::async_trait;
13use std::collections::{HashMap, HashSet};
14use std::sync::{Arc, RwLock};
15use std::time::{Duration, Instant};
16#[cfg(feature = "async")]
17use tokio::sync::mpsc;
18
/// Plans and executes queries across a set of registered federated endpoints.
///
/// `execute` routes a query into per-endpoint fragments, runs all fragments
/// concurrently, applies the planned joins and finally aggregates the results.
pub struct DistributedQueryEngine {
    /// Registered endpoints keyed by their URL (see `register_endpoint`).
    endpoints: Arc<RwLock<HashMap<String, FederatedEndpoint>>>,
    /// Turns a query into routes; `new` creates it with `RoutingPolicy::DataLocality`.
    router: Arc<QueryRouter>,
    /// Network measurements. NOTE(review): not read by the planning/execution code shown here.
    network_stats: Arc<RwLock<NetworkStatistics>>,
    /// Known edge nodes. NOTE(review): not consulted by the planning/execution code shown here.
    edge_nodes: Arc<RwLock<Vec<EdgeNode>>>,
    /// Configuration supplied to `new`.
    config: DistributedConfig,
}
32
/// A remote query endpoint known to the engine.
#[derive(Debug, Clone)]
pub struct FederatedEndpoint {
    /// Endpoint URL; used as the registration key and as the route target.
    pub url: String,
    /// Capabilities advertised by the endpoint.
    pub features: EndpointFeatures,
    /// Latency estimate in milliseconds; the nearest-endpoint router picks the minimum.
    pub latency_ms: f64,
    /// Throughput estimate (units not established here — TODO confirm).
    pub throughput: f64,
    /// Names of datasets hosted by this endpoint.
    pub datasets: Vec<String>,
    /// When the endpoint's health was last checked.
    pub last_health_check: Instant,
    /// Current health; only `Healthy` endpoints are selected by the built-in routers.
    pub status: EndpointStatus,
}
51
/// Feature set advertised by a federated endpoint.
#[derive(Debug, Clone)]
pub struct EndpointFeatures {
    /// Supported SPARQL version string (e.g. "1.1").
    pub sparql_version: String,
    /// Whether SPARQL Update is supported.
    pub update_support: bool,
    /// Whether the endpoint itself supports federated (SERVICE) queries — presumably; verify.
    pub federation_support: bool,
    /// Whether full-text search is available.
    pub text_search: bool,
    /// Whether geospatial queries are available.
    pub geospatial: bool,
    /// Names of additional, endpoint-specific extensions.
    pub extensions: HashSet<String>,
}
68
/// Health state of an endpoint.
///
/// The built-in routers only send work to `Healthy` endpoints; the other
/// variants are not distinguished by the routing code in this file.
#[derive(Debug, Clone, PartialEq)]
pub enum EndpointStatus {
    /// Fully available.
    Healthy,
    /// Reachable but impaired.
    Degraded,
    /// Cannot be contacted.
    Unreachable,
    /// Reachable but under too much load.
    Overloaded,
}
81
/// Decides which endpoint(s) should receive which part of a query.
pub struct QueryRouter {
    /// Strategy used by `route_query`.
    policy: RoutingPolicy,
    /// Dataset/predicate placement information. NOTE(review): not read by the routers shown here.
    data_locality: Arc<RwLock<DataLocalityMap>>,
    /// Cache of previously computed plans. NOTE(review): not read by the routers shown here.
    pattern_cache: Arc<RwLock<PatternCache>>,
}
91
/// User-supplied routing strategy: given a query and a snapshot of all
/// registered endpoints (healthy or not), return the routes to execute.
pub type RoutingFunction =
    Arc<dyn Fn(&Query, &[FederatedEndpoint]) -> Vec<QueryRoute> + Send + Sync>;
95
/// Strategy a [`QueryRouter`] uses to split a query into routes.
#[derive(Clone)]
pub enum RoutingPolicy {
    /// Send the whole query to the healthy endpoint with the lowest `latency_ms`.
    NearestEndpoint,
    /// Distribute extracted triple patterns round-robin over healthy endpoints.
    LoadBalanced,
    /// Locality-aware routing; currently delegates to `LoadBalanced`.
    DataLocality,
    /// Transfer-minimising routing; currently delegates to `LoadBalanced`.
    MinimizeTransfers,
    /// Delegate routing entirely to a user function.
    Custom(RoutingFunction),
}
110
111impl std::fmt::Debug for RoutingPolicy {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 match self {
114 Self::NearestEndpoint => write!(f, "NearestEndpoint"),
115 Self::LoadBalanced => write!(f, "LoadBalanced"),
116 Self::DataLocality => write!(f, "DataLocality"),
117 Self::MinimizeTransfers => write!(f, "MinimizeTransfers"),
118 Self::Custom(_) => write!(f, "Custom(<function>)"),
119 }
120 }
121}
122
/// Where data lives across the federation.
pub struct DataLocalityMap {
    /// Dataset name -> URLs of endpoints hosting it (presumably URLs; verify against callers).
    dataset_locations: HashMap<String, Vec<String>>,
    /// Predicate -> endpoints holding triples with that predicate (hedged: not populated here).
    predicate_distribution: HashMap<NamedNode, Vec<String>>,
    /// Pairwise affinity score keyed by an (endpoint, endpoint) pair — TODO confirm key meaning.
    affinity_scores: HashMap<(String, String), f64>,
}
132
/// Bounded cache of distributed plans plus per-pattern statistics.
pub struct PatternCache {
    /// Cached plans keyed by query hash.
    plans: HashMap<QueryHash, CachedPlan>,
    /// Execution statistics per query shape. NOTE(review): not updated by the code shown here.
    stats: HashMap<QueryPattern, PatternStats>,
    /// Maximum number of cached plans (`new` sets 1000).
    max_size: usize,
}
142
/// 64-bit hash identifying a query shape; used as a cache and active-query key.
type QueryHash = u64;
145
/// A plan stored in the [`PatternCache`] together with usage bookkeeping.
pub struct CachedPlan {
    /// The cached plan.
    plan: DistributedPlan,
    /// When the plan was inserted into the cache.
    created: Instant,
    /// Number of lookups that returned this plan.
    hits: usize,
    /// Average execution time; initialised to zero and not updated by the code shown here.
    avg_exec_time: Duration,
}
157
/// Structural fingerprint of a query, hashable so equal shapes share a key.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct QueryPattern {
    /// Triple patterns appearing in the query.
    patterns: Vec<algebra::TriplePattern>,
    /// Kinds of joins between the patterns.
    joins: Vec<JoinType>,
    /// Kinds of filters applied.
    filters: Vec<FilterType>,
}
168
/// Aggregate execution statistics for one [`QueryPattern`].
struct PatternStats {
    /// How many times the pattern has been seen.
    count: usize,
    /// Fraction of successful executions (presumably in 0.0..=1.0; verify).
    success_rate: f64,
    /// Average number of result rows.
    avg_result_size: usize,
    /// Endpoints that have served this pattern well — hedged, not populated here.
    preferred_endpoints: Vec<String>,
}
180
/// Kind of combination between sub-patterns, as recorded in a [`QueryPattern`].
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum JoinType {
    InnerJoin,
    LeftJoin,
    Union,
    Optional,
}
189
/// Kind of filter expression, as recorded in a [`QueryPattern`].
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum FilterType {
    Comparison,
    Regex,
    Exists,
    /// A function call filter, identified by the function's name.
    Function(String),
}
198
/// Per-endpoint network measurements.
pub struct NetworkStatistics {
    /// Every latency sample recorded per endpoint (grows without bound).
    latencies: HashMap<String, Vec<Duration>>,
    /// Transfer-rate samples per endpoint (units not established here — TODO confirm).
    transfer_rates: HashMap<String, Vec<f64>>,
    /// Error rate per endpoint.
    error_rates: HashMap<String, f64>,
    /// Time of the most recent update.
    last_update: Instant,
}
210
/// An edge compute node that may cache data close to clients.
#[derive(Debug, Clone)]
pub struct EdgeNode {
    /// Node identifier.
    pub id: String,
    /// Physical location.
    pub location: GeoLocation,
    /// Available compute resources.
    pub capacity: ComputeCapacity,
    /// Identifiers of data cached on the node (presumably dataset names; verify).
    pub cached_data: HashSet<String>,
    /// Current load (scale not established here — TODO confirm, e.g. 0.0..=1.0).
    pub load: f64,
}
225
/// Geographic position of a node.
#[derive(Debug, Clone)]
pub struct GeoLocation {
    /// Latitude (presumably decimal degrees; verify).
    pub latitude: f64,
    /// Longitude (presumably decimal degrees; verify).
    pub longitude: f64,
    /// Region label.
    pub region: String,
}
236
/// Compute resources of an edge node; units follow the field names.
#[derive(Debug, Clone)]
pub struct ComputeCapacity {
    pub cpu_cores: u32,
    pub memory_gb: u32,
    pub storage_gb: u32,
    pub bandwidth_gbps: f64,
}
249
/// Tunables for [`DistributedQueryEngine`].
///
/// NOTE(review): the execution path in this file stores but does not yet
/// consult these settings (e.g. no timeout or parallelism limit is applied).
#[derive(Debug, Clone)]
pub struct DistributedConfig {
    /// Overall query deadline.
    pub query_timeout: Duration,
    /// Maximum number of fragments to run concurrently.
    pub max_parallel_queries: usize,
    /// Whether edge nodes may be used.
    pub edge_computing_enabled: bool,
    /// Whether results should be cached.
    pub cache_results: bool,
    /// Lifetime of cached results.
    pub cache_ttl: Duration,
    /// Per-request network deadline.
    pub network_timeout: Duration,
    /// Retry behaviour for failed requests.
    pub retry_policy: RetryPolicy,
}
268
/// Exponential-backoff retry parameters (the retry loop itself is not in this file).
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Delay before the first retry.
    pub base_delay: Duration,
    /// Multiplier applied to the delay after each attempt.
    pub backoff_factor: f64,
    /// Upper bound on any single delay.
    pub max_delay: Duration,
}
281
/// One unit of work: a query fragment assigned to a specific endpoint.
pub struct QueryRoute {
    /// URL of the endpoint that should execute the fragment.
    pub endpoint: String,
    /// The work to send.
    pub fragment: QueryFragment,
    /// Relative cost estimate; summed into `DistributedPlan::total_cost`.
    pub estimated_cost: f64,
    /// Scheduling priority (the built-in routers always use 1).
    pub priority: u32,
}
293
/// The part of a query executed at one endpoint.
pub struct QueryFragment {
    /// Query text to run; the built-in routers pass a clone of the full query.
    pub query: Query,
    /// Triple patterns this fragment is responsible for.
    pub patterns: Vec<algebra::TriplePattern>,
    /// Variables the fragment needs bound.
    pub required_vars: HashSet<Variable>,
    /// Variables the fragment produces.
    pub output_vars: HashSet<Variable>,
}
305
/// Complete execution plan for a distributed query.
pub struct DistributedPlan {
    /// Fragments to execute, one per route.
    pub routes: Vec<QueryRoute>,
    /// Joins applied, in order, to the fragment results.
    pub join_order: Vec<JoinOperation>,
    /// How the joined results are merged into one result.
    pub aggregation: AggregationStrategy,
    /// Sum of the routes' `estimated_cost`.
    pub total_cost: f64,
}
317
/// A planned join between two intermediate results.
pub struct JoinOperation {
    /// Left input (presumably an index into the fragment results; verify).
    pub left: usize,
    /// Right input (presumably an index into the fragment results; verify).
    pub right: usize,
    /// Variables to join on.
    pub join_vars: Vec<Variable>,
    /// Join implementation to use.
    pub algorithm: JoinAlgorithm,
}
329
/// Join implementation choices. NOTE(review): joins are currently a pass-through,
/// so no variant is acted upon in this file.
#[derive(Debug, Clone)]
pub enum JoinAlgorithm {
    HashJoin,
    SortMergeJoin,
    NestedLoop,
    BroadcastJoin,
    Adaptive,
}
344
/// How per-fragment results are combined into the final result.
#[derive(Clone)]
pub enum AggregationStrategy {
    /// Concatenate all bindings.
    Union,
    /// Concatenate, dropping duplicate bindings (used for `SELECT DISTINCT`).
    MergeDistinct,
    /// Streaming merge; currently behaves like `Union`.
    Streaming,
    /// User-defined merge function.
    Custom(Arc<dyn Fn(Vec<QueryResult>) -> QueryResult + Send + Sync>),
}
357
358impl std::fmt::Debug for AggregationStrategy {
359 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360 match self {
361 Self::Union => write!(f, "Union"),
362 Self::MergeDistinct => write!(f, "MergeDistinct"),
363 Self::Streaming => write!(f, "Streaming"),
364 Self::Custom(_) => write!(f, "Custom(<function>)"),
365 }
366 }
367}
368
/// Result of executing a fragment or a whole distributed query.
pub struct QueryResult {
    /// Solution rows, each mapping variables to bound terms.
    pub bindings: Vec<HashMap<Variable, Term>>,
    /// Execution statistics.
    pub metadata: ExecutionMetadata,
    /// Endpoint URL for fragment results; `"distributed"` for aggregated results.
    pub source: String,
}
378
/// Statistics attached to a [`QueryResult`].
#[derive(Debug, Clone)]
pub struct ExecutionMetadata {
    /// Time spent executing (summed over fragments for aggregated results).
    pub execution_time: Duration,
    /// Number of bindings in the result.
    pub result_count: usize,
    /// Bytes moved over the network.
    pub bytes_transferred: usize,
    /// Whether the result came from a cache.
    pub cache_hit: bool,
    /// Non-fatal issues encountered.
    pub warnings: Vec<String>,
}
393
394impl DistributedQueryEngine {
395 pub fn new(config: DistributedConfig) -> Self {
397 Self {
398 endpoints: Arc::new(RwLock::new(HashMap::new())),
399 router: Arc::new(QueryRouter::new(RoutingPolicy::DataLocality)),
400 network_stats: Arc::new(RwLock::new(NetworkStatistics::new())),
401 edge_nodes: Arc::new(RwLock::new(Vec::new())),
402 config,
403 }
404 }
405
406 pub fn register_endpoint(&self, endpoint: FederatedEndpoint) -> Result<(), OxirsError> {
408 let mut endpoints = self
409 .endpoints
410 .write()
411 .map_err(|_| OxirsError::Query("Failed to acquire endpoints lock".to_string()))?;
412
413 endpoints.insert(endpoint.url.clone(), endpoint);
414 Ok(())
415 }
416
417 pub async fn execute(&self, query: Query) -> Result<QueryResult, OxirsError> {
419 let plan = self.plan_query(&query)?;
421
422 let fragment_results = self.execute_fragments(&plan).await?;
424
425 let joined = self.join_results(fragment_results, &plan)?;
427
428 let aggregated = self.aggregate_results(joined, &plan)?;
430
431 Ok(aggregated)
432 }
433
434 fn plan_query(&self, query: &Query) -> Result<DistributedPlan, OxirsError> {
436 let endpoints = self
437 .endpoints
438 .read()
439 .map_err(|_| OxirsError::Query("Failed to read endpoints".to_string()))?;
440
441 let routes = self.router.route_query(query, &endpoints)?;
443
444 let join_order = self.optimize_join_order(&routes)?;
446
447 let aggregation = self.select_aggregation_strategy(query)?;
449
450 let total_cost = routes.iter().map(|r| r.estimated_cost).sum();
452
453 Ok(DistributedPlan {
454 routes,
455 join_order,
456 aggregation,
457 total_cost,
458 })
459 }
460
461 async fn execute_fragments(
463 &self,
464 plan: &DistributedPlan,
465 ) -> Result<Vec<QueryResult>, OxirsError> {
466 use futures::future::join_all;
467
468 let mut futures = Vec::new();
469
470 for route in &plan.routes {
471 let future = self.execute_fragment(route);
472 futures.push(future);
473 }
474
475 let results = join_all(futures).await;
476
477 let mut fragment_results = Vec::new();
479 for result in results {
480 fragment_results.push(result?);
481 }
482
483 Ok(fragment_results)
484 }
485
486 async fn execute_fragment(&self, route: &QueryRoute) -> Result<QueryResult, OxirsError> {
488 Ok(QueryResult {
491 bindings: Vec::new(),
492 metadata: ExecutionMetadata {
493 execution_time: Duration::from_millis(100),
494 result_count: 0,
495 bytes_transferred: 0,
496 cache_hit: false,
497 warnings: Vec::new(),
498 },
499 source: route.endpoint.clone(),
500 })
501 }
502
503 fn join_results(
505 &self,
506 results: Vec<QueryResult>,
507 plan: &DistributedPlan,
508 ) -> Result<Vec<QueryResult>, OxirsError> {
509 let mut joined = results;
511
512 for join_op in &plan.join_order {
513 joined = self.apply_join(joined, join_op)?;
514 }
515
516 Ok(joined)
517 }
518
519 fn apply_join(
521 &self,
522 results: Vec<QueryResult>,
523 _join_op: &JoinOperation,
524 ) -> Result<Vec<QueryResult>, OxirsError> {
525 Ok(results)
527 }
528
529 fn aggregate_results(
531 &self,
532 results: Vec<QueryResult>,
533 plan: &DistributedPlan,
534 ) -> Result<QueryResult, OxirsError> {
535 match &plan.aggregation {
536 AggregationStrategy::Union => self.union_results(results),
537 AggregationStrategy::MergeDistinct => self.merge_distinct(results),
538 AggregationStrategy::Streaming => self.streaming_aggregate(results),
539 AggregationStrategy::Custom(f) => Ok(f(results)),
540 }
541 }
542
543 fn union_results(&self, results: Vec<QueryResult>) -> Result<QueryResult, OxirsError> {
545 let mut all_bindings = Vec::new();
546 let mut total_time = Duration::ZERO;
547 let mut total_bytes = 0;
548
549 for result in results {
550 all_bindings.extend(result.bindings);
551 total_time += result.metadata.execution_time;
552 total_bytes += result.metadata.bytes_transferred;
553 }
554
555 let result_count = all_bindings.len();
556 Ok(QueryResult {
557 bindings: all_bindings,
558 metadata: ExecutionMetadata {
559 execution_time: total_time,
560 result_count,
561 bytes_transferred: total_bytes,
562 cache_hit: false,
563 warnings: Vec::new(),
564 },
565 source: "distributed".to_string(),
566 })
567 }
568
569 fn merge_distinct(&self, results: Vec<QueryResult>) -> Result<QueryResult, OxirsError> {
571 use std::collections::HashSet;
572
573 let mut seen = HashSet::new();
574 let mut unique_bindings = Vec::new();
575
576 for result in results {
577 for binding in result.bindings {
578 let key = self.binding_key(&binding);
579 if seen.insert(key) {
580 unique_bindings.push(binding);
581 }
582 }
583 }
584
585 let result_count = unique_bindings.len();
586 Ok(QueryResult {
587 bindings: unique_bindings,
588 metadata: ExecutionMetadata {
589 execution_time: Duration::from_millis(100),
590 result_count,
591 bytes_transferred: 0,
592 cache_hit: false,
593 warnings: Vec::new(),
594 },
595 source: "distributed".to_string(),
596 })
597 }
598
599 fn binding_key(&self, binding: &HashMap<Variable, Term>) -> String {
601 let mut key = String::new();
602 let mut vars: Vec<_> = binding.keys().collect();
603 vars.sort();
604
605 for var in vars {
606 key.push_str(&format!("{}={},", var, binding[var]));
607 }
608
609 key
610 }
611
612 fn streaming_aggregate(&self, results: Vec<QueryResult>) -> Result<QueryResult, OxirsError> {
614 self.union_results(results)
616 }
617
618 fn optimize_join_order(
620 &self,
621 _routes: &[QueryRoute],
622 ) -> Result<Vec<JoinOperation>, OxirsError> {
623 Ok(Vec::new())
625 }
626
627 fn select_aggregation_strategy(
629 &self,
630 query: &Query,
631 ) -> Result<AggregationStrategy, OxirsError> {
632 if let QueryForm::Select { distinct, .. } = &query.form {
634 if *distinct {
635 return Ok(AggregationStrategy::MergeDistinct);
636 }
637 }
638
639 Ok(AggregationStrategy::Union)
640 }
641}
642
643impl QueryRouter {
644 pub fn new(policy: RoutingPolicy) -> Self {
646 Self {
647 policy,
648 data_locality: Arc::new(RwLock::new(DataLocalityMap::new())),
649 pattern_cache: Arc::new(RwLock::new(PatternCache::new())),
650 }
651 }
652
653 pub fn route_query(
655 &self,
656 query: &Query,
657 endpoints: &HashMap<String, FederatedEndpoint>,
658 ) -> Result<Vec<QueryRoute>, OxirsError> {
659 match &self.policy {
660 RoutingPolicy::NearestEndpoint => self.route_nearest(query, endpoints),
661 RoutingPolicy::LoadBalanced => self.route_load_balanced(query, endpoints),
662 RoutingPolicy::DataLocality => self.route_data_locality(query, endpoints),
663 RoutingPolicy::MinimizeTransfers => self.route_minimize_transfers(query, endpoints),
664 RoutingPolicy::Custom(f) => {
665 let endpoint_vec: Vec<_> = endpoints.values().cloned().collect();
666 Ok(f(query, &endpoint_vec))
667 }
668 }
669 }
670
671 fn route_nearest(
673 &self,
674 query: &Query,
675 endpoints: &HashMap<String, FederatedEndpoint>,
676 ) -> Result<Vec<QueryRoute>, OxirsError> {
677 let best_endpoint = endpoints
679 .values()
680 .filter(|e| e.status == EndpointStatus::Healthy)
681 .min_by(|a, b| a.latency_ms.partial_cmp(&b.latency_ms).unwrap())
682 .ok_or_else(|| OxirsError::Query("No healthy endpoints available".to_string()))?;
683
684 Ok(vec![QueryRoute {
685 endpoint: best_endpoint.url.clone(),
686 fragment: QueryFragment {
687 query: query.clone(),
688 patterns: self.extract_patterns(query)?,
689 required_vars: self.extract_variables(query)?,
690 output_vars: self.extract_output_vars(query)?,
691 },
692 estimated_cost: 1.0,
693 priority: 1,
694 }])
695 }
696
697 fn route_load_balanced(
699 &self,
700 query: &Query,
701 endpoints: &HashMap<String, FederatedEndpoint>,
702 ) -> Result<Vec<QueryRoute>, OxirsError> {
703 let healthy_endpoints: Vec<_> = endpoints
705 .values()
706 .filter(|e| e.status == EndpointStatus::Healthy)
707 .collect();
708
709 if healthy_endpoints.is_empty() {
710 return Err(OxirsError::Query(
711 "No healthy endpoints available".to_string(),
712 ));
713 }
714
715 let patterns = self.extract_patterns(query)?;
716 let mut routes = Vec::new();
717
718 for (i, pattern) in patterns.into_iter().enumerate() {
720 let endpoint = &healthy_endpoints[i % healthy_endpoints.len()];
721
722 routes.push(QueryRoute {
723 endpoint: endpoint.url.clone(),
724 fragment: QueryFragment {
725 query: query.clone(),
726 patterns: vec![pattern],
727 required_vars: HashSet::new(),
728 output_vars: HashSet::new(),
729 },
730 estimated_cost: 1.0 / healthy_endpoints.len() as f64,
731 priority: 1,
732 });
733 }
734
735 Ok(routes)
736 }
737
738 fn route_data_locality(
740 &self,
741 query: &Query,
742 endpoints: &HashMap<String, FederatedEndpoint>,
743 ) -> Result<Vec<QueryRoute>, OxirsError> {
744 self.route_load_balanced(query, endpoints)
746 }
747
748 fn route_minimize_transfers(
750 &self,
751 query: &Query,
752 endpoints: &HashMap<String, FederatedEndpoint>,
753 ) -> Result<Vec<QueryRoute>, OxirsError> {
754 self.route_load_balanced(query, endpoints)
756 }
757
758 fn extract_patterns(&self, query: &Query) -> Result<Vec<algebra::TriplePattern>, OxirsError> {
760 match &query.form {
761 QueryForm::Select { where_clause, .. } => {
762 self.extract_patterns_from_graph_pattern(where_clause)
763 }
764 _ => Ok(Vec::new()),
765 }
766 }
767
768 fn extract_patterns_from_graph_pattern(
770 &self,
771 pattern: &GraphPattern,
772 ) -> Result<Vec<algebra::TriplePattern>, OxirsError> {
773 match pattern {
774 GraphPattern::Bgp(patterns) => {
775 let model_patterns: Vec<algebra::TriplePattern> = patterns
777 .iter()
778 .filter_map(|p| self.convert_algebra_to_model_pattern(p))
779 .collect();
780 Ok(model_patterns)
781 }
782 GraphPattern::Join(left, right) => {
783 let mut left_patterns = self.extract_patterns_from_graph_pattern(left)?;
784 let mut right_patterns = self.extract_patterns_from_graph_pattern(right)?;
785 left_patterns.append(&mut right_patterns);
786 Ok(left_patterns)
787 }
788 GraphPattern::Filter { inner, .. } => self.extract_patterns_from_graph_pattern(inner),
789 GraphPattern::Union(left, right) => {
790 let mut left_patterns = self.extract_patterns_from_graph_pattern(left)?;
791 let mut right_patterns = self.extract_patterns_from_graph_pattern(right)?;
792 left_patterns.append(&mut right_patterns);
793 Ok(left_patterns)
794 }
795 _ => Ok(Vec::new()),
796 }
797 }
798
799 fn convert_to_algebra_pattern(
801 &self,
802 pattern: &crate::model::pattern::TriplePattern,
803 ) -> Result<algebra::TriplePattern, OxirsError> {
804 let subject = match &pattern.subject {
805 Some(crate::model::pattern::SubjectPattern::NamedNode(n)) => {
806 algebra::TermPattern::NamedNode(n.clone())
807 }
808 Some(crate::model::pattern::SubjectPattern::BlankNode(b)) => {
809 algebra::TermPattern::BlankNode(b.clone())
810 }
811 Some(crate::model::pattern::SubjectPattern::Variable(v)) => {
812 algebra::TermPattern::Variable(v.clone())
813 }
814 None => {
815 return Err(OxirsError::Query(
816 "Subject pattern cannot be None in basic graph pattern".to_string(),
817 ))
818 }
819 };
820
821 let predicate = match &pattern.predicate {
822 Some(crate::model::pattern::PredicatePattern::NamedNode(n)) => {
823 algebra::TermPattern::NamedNode(n.clone())
824 }
825 Some(crate::model::pattern::PredicatePattern::Variable(v)) => {
826 algebra::TermPattern::Variable(v.clone())
827 }
828 None => {
829 return Err(OxirsError::Query(
830 "Predicate pattern cannot be None in basic graph pattern".to_string(),
831 ))
832 }
833 };
834
835 let object = match &pattern.object {
836 Some(crate::model::pattern::ObjectPattern::NamedNode(n)) => {
837 algebra::TermPattern::NamedNode(n.clone())
838 }
839 Some(crate::model::pattern::ObjectPattern::BlankNode(b)) => {
840 algebra::TermPattern::BlankNode(b.clone())
841 }
842 Some(crate::model::pattern::ObjectPattern::Literal(l)) => {
843 algebra::TermPattern::Literal(l.clone())
844 }
845 Some(crate::model::pattern::ObjectPattern::Variable(v)) => {
846 algebra::TermPattern::Variable(v.clone())
847 }
848 None => {
849 return Err(OxirsError::Query(
850 "Object pattern cannot be None in basic graph pattern".to_string(),
851 ))
852 }
853 };
854
855 Ok(algebra::TriplePattern::new(
856 Some(subject.into()),
857 Some(predicate.into()),
858 Some(object.into()),
859 ))
860 }
861
862 fn convert_algebra_to_model_pattern(
864 &self,
865 algebra_pattern: &AlgebraTriplePattern,
866 ) -> Option<algebra::TriplePattern> {
867 use crate::model::pattern::{ObjectPattern, PredicatePattern, SubjectPattern};
868
869 let subject = match &algebra_pattern.subject {
870 algebra::TermPattern::NamedNode(n) => Some(SubjectPattern::NamedNode(n.clone())),
871 algebra::TermPattern::BlankNode(b) => Some(SubjectPattern::BlankNode(b.clone())),
872 algebra::TermPattern::Variable(v) => Some(SubjectPattern::Variable(v.clone())),
873 _ => None,
874 };
875
876 let predicate = match &algebra_pattern.predicate {
877 algebra::TermPattern::NamedNode(n) => Some(PredicatePattern::NamedNode(n.clone())),
878 algebra::TermPattern::Variable(v) => Some(PredicatePattern::Variable(v.clone())),
879 _ => None,
880 };
881
882 let object = match &algebra_pattern.object {
883 algebra::TermPattern::NamedNode(n) => Some(ObjectPattern::NamedNode(n.clone())),
884 algebra::TermPattern::BlankNode(b) => Some(ObjectPattern::BlankNode(b.clone())),
885 algebra::TermPattern::Literal(l) => Some(ObjectPattern::Literal(l.clone())),
886 algebra::TermPattern::Variable(v) => Some(ObjectPattern::Variable(v.clone())),
887 };
888
889 Some(algebra::TriplePattern::new(subject, predicate, object))
890 }
891
892 fn extract_variables(&self, query: &Query) -> Result<HashSet<Variable>, OxirsError> {
894 let mut vars = HashSet::new();
895
896 if let QueryForm::Select { where_clause, .. } = &query.form {
897 self.collect_variables_from_pattern(where_clause, &mut vars)?;
898 }
899
900 Ok(vars)
901 }
902
903 fn collect_variables_from_pattern(
905 &self,
906 pattern: &GraphPattern,
907 vars: &mut HashSet<Variable>,
908 ) -> Result<(), OxirsError> {
909 if let GraphPattern::Bgp(patterns) = pattern {
910 for tp in patterns {
911 if let TermPattern::Variable(v) = &tp.subject {
912 vars.insert(v.clone());
913 }
914 if let TermPattern::Variable(v) = &tp.predicate {
915 vars.insert(v.clone());
916 }
917 if let TermPattern::Variable(v) = &tp.object {
918 vars.insert(v.clone());
919 }
920 }
921 }
922
923 Ok(())
924 }
925
926 fn extract_output_vars(&self, query: &Query) -> Result<HashSet<Variable>, OxirsError> {
928 match &query.form {
929 QueryForm::Select { variables, .. } => match variables {
930 SelectVariables::All => self.extract_variables(query),
931 SelectVariables::Specific(vars) => Ok(vars.iter().cloned().collect()),
932 },
933 _ => Ok(HashSet::new()),
934 }
935 }
936}
937
938impl Default for NetworkStatistics {
939 fn default() -> Self {
940 Self::new()
941 }
942}
943
944impl NetworkStatistics {
945 pub fn new() -> Self {
947 Self {
948 latencies: HashMap::new(),
949 transfer_rates: HashMap::new(),
950 error_rates: HashMap::new(),
951 last_update: Instant::now(),
952 }
953 }
954
955 pub fn update_latency(&mut self, endpoint: String, latency: Duration) {
957 self.latencies.entry(endpoint).or_default().push(latency);
958 self.last_update = Instant::now();
959 }
960
961 pub fn avg_latency(&self, endpoint: &str) -> Option<Duration> {
963 self.latencies.get(endpoint).map(|samples| {
964 let sum: Duration = samples.iter().sum();
965 sum / samples.len() as u32
966 })
967 }
968}
969
970impl Default for DataLocalityMap {
971 fn default() -> Self {
972 Self::new()
973 }
974}
975
976impl DataLocalityMap {
977 pub fn new() -> Self {
979 Self {
980 dataset_locations: HashMap::new(),
981 predicate_distribution: HashMap::new(),
982 affinity_scores: HashMap::new(),
983 }
984 }
985
986 pub fn update_dataset_location(&mut self, dataset: String, endpoints: Vec<String>) {
988 self.dataset_locations.insert(dataset, endpoints);
989 }
990
991 pub fn get_dataset_endpoints(&self, dataset: &str) -> Option<&Vec<String>> {
993 self.dataset_locations.get(dataset)
994 }
995}
996
997impl Default for PatternCache {
998 fn default() -> Self {
999 Self::new()
1000 }
1001}
1002
1003impl PatternCache {
1004 pub fn new() -> Self {
1006 Self {
1007 plans: HashMap::new(),
1008 stats: HashMap::new(),
1009 max_size: 1000,
1010 }
1011 }
1012
1013 pub fn get_plan(&mut self, hash: QueryHash) -> Option<&mut CachedPlan> {
1015 self.plans.get_mut(&hash).map(|plan| {
1016 plan.hits += 1;
1017 plan
1018 })
1019 }
1020
1021 pub fn cache_plan(&mut self, hash: QueryHash, plan: DistributedPlan) {
1023 if self.plans.len() >= self.max_size {
1025 if let Some(&oldest) = self.plans.keys().next() {
1027 self.plans.remove(&oldest);
1028 }
1029 }
1030
1031 self.plans.insert(
1032 hash,
1033 CachedPlan {
1034 plan,
1035 created: Instant::now(),
1036 hits: 0,
1037 avg_exec_time: Duration::ZERO,
1038 },
1039 );
1040 }
1041}
1042
1043impl Default for DistributedConfig {
1044 fn default() -> Self {
1045 Self {
1046 query_timeout: Duration::from_secs(30),
1047 max_parallel_queries: 100,
1048 edge_computing_enabled: true,
1049 cache_results: true,
1050 cache_ttl: Duration::from_secs(300),
1051 network_timeout: Duration::from_secs(10),
1052 retry_policy: RetryPolicy::default(),
1053 }
1054 }
1055}
1056
1057impl Default for RetryPolicy {
1058 fn default() -> Self {
1059 Self {
1060 max_attempts: 3,
1061 base_delay: Duration::from_millis(100),
1062 backoff_factor: 2.0,
1063 max_delay: Duration::from_secs(10),
1064 }
1065 }
1066}
1067
/// Transport abstraction for talking to a federated endpoint.
///
/// No implementation appears in this file; the engine's own fragment
/// execution is currently a placeholder.
#[async_trait]
pub trait FederatedQueryExecutor: Send + Sync {
    /// Runs `query` against `endpoint` and returns its result.
    async fn execute_query(
        &self,
        endpoint: &FederatedEndpoint,
        query: &Query,
    ) -> Result<QueryResult, OxirsError>;

    /// Probes `endpoint` and reports its current health.
    async fn check_health(&self, endpoint: &FederatedEndpoint) -> EndpointStatus;

    /// Retrieves the features advertised by `endpoint`.
    async fn get_capabilities(
        &self,
        endpoint: &FederatedEndpoint,
    ) -> Result<EndpointFeatures, OxirsError>;
}
1087
/// Lets clients running the same query shape share results.
pub struct CollaborativeFilter {
    /// In-flight queries keyed by the hash of their pattern.
    active_queries: Arc<RwLock<HashMap<QueryHash, ActiveQuery>>>,
    /// Similarity cut-off. NOTE(review): not used by the code shown here.
    similarity_threshold: f64,
    /// Outgoing channel for shared results (bounded tokio channel with the `async` feature).
    #[cfg(feature = "async")]
    result_channel: tokio::sync::mpsc::Sender<SharedResult>,
    /// Outgoing channel for shared results (unbounded std channel without the `async` feature).
    #[cfg(not(feature = "async"))]
    result_channel: std::sync::mpsc::Sender<SharedResult>,
}
1100
/// Bookkeeping for one in-flight query shape.
struct ActiveQuery {
    /// The shape all registered clients share.
    pattern: QueryPattern,
    /// IDs of clients that registered this shape.
    clients: HashSet<String>,
    /// Results gathered so far (not populated by the code shown here).
    partial_results: Vec<QueryResult>,
    /// When the first client registered.
    start_time: Instant,
}
1112
/// A result published to other clients through the filter's channel.
pub struct SharedResult {
    /// Hash of the query shape the result belongs to.
    query_hash: QueryHash,
    /// The shared result.
    result: QueryResult,
    /// Client that produced the result.
    client_id: String,
}
1122
1123impl CollaborativeFilter {
1124 #[cfg(feature = "async")]
1126 pub fn new(similarity_threshold: f64) -> (Self, tokio::sync::mpsc::Receiver<SharedResult>) {
1127 let (tx, rx) = tokio::sync::mpsc::channel(1000);
1128
1129 (
1130 Self {
1131 active_queries: Arc::new(RwLock::new(HashMap::new())),
1132 similarity_threshold,
1133 result_channel: tx,
1134 },
1135 rx,
1136 )
1137 }
1138
1139 #[cfg(not(feature = "async"))]
1140 pub fn new(similarity_threshold: f64) -> (Self, std::sync::mpsc::Receiver<SharedResult>) {
1141 let (tx, rx) = std::sync::mpsc::channel();
1142
1143 (
1144 Self {
1145 active_queries: Arc::new(RwLock::new(HashMap::new())),
1146 similarity_threshold,
1147 result_channel: tx,
1148 },
1149 rx,
1150 )
1151 }
1152
1153 pub async fn register_query(
1155 &self,
1156 query: &Query,
1157 client_id: String,
1158 ) -> Result<QueryHash, OxirsError> {
1159 let pattern = self.extract_query_pattern(query)?;
1160 let hash = self.hash_pattern(&pattern);
1161
1162 let mut active = self
1163 .active_queries
1164 .write()
1165 .map_err(|_| OxirsError::Query("Failed to acquire lock".to_string()))?;
1166
1167 active
1168 .entry(hash)
1169 .or_insert_with(|| ActiveQuery {
1170 pattern: pattern.clone(),
1171 clients: HashSet::new(),
1172 partial_results: Vec::new(),
1173 start_time: Instant::now(),
1174 })
1175 .clients
1176 .insert(client_id);
1177
1178 Ok(hash)
1179 }
1180
1181 #[cfg(feature = "async")]
1183 pub async fn share_results(
1184 &self,
1185 hash: QueryHash,
1186 result: QueryResult,
1187 client_id: String,
1188 ) -> Result<(), OxirsError> {
1189 self.result_channel
1190 .send(SharedResult {
1191 query_hash: hash,
1192 result,
1193 client_id,
1194 })
1195 .await
1196 .map_err(|_| OxirsError::Query("Failed to share results".to_string()))
1197 }
1198
1199 #[cfg(not(feature = "async"))]
1200 pub fn share_results(
1201 &self,
1202 hash: QueryHash,
1203 result: QueryResult,
1204 client_id: String,
1205 ) -> Result<(), OxirsError> {
1206 self.result_channel
1207 .send(SharedResult {
1208 query_hash: hash,
1209 result,
1210 client_id,
1211 })
1212 .map_err(|_| OxirsError::Query("Failed to share results".to_string()))
1213 }
1214
1215 fn extract_query_pattern(&self, _query: &Query) -> Result<QueryPattern, OxirsError> {
1217 Ok(QueryPattern {
1219 patterns: Vec::new(),
1220 joins: Vec::new(),
1221 filters: Vec::new(),
1222 })
1223 }
1224
1225 fn hash_pattern(&self, pattern: &QueryPattern) -> QueryHash {
1227 use std::collections::hash_map::DefaultHasher;
1228 use std::hash::{Hash, Hasher};
1229
1230 let mut hasher = DefaultHasher::new();
1231 pattern.hash(&mut hasher);
1232 hasher.finish()
1233 }
1234}
1235
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a healthy SPARQL 1.1 endpoint without text search or geospatial support.
    fn sparql_endpoint(
        url: &str,
        update_support: bool,
        latency_ms: f64,
        throughput: f64,
        datasets: Vec<String>,
    ) -> FederatedEndpoint {
        FederatedEndpoint {
            url: url.to_string(),
            features: EndpointFeatures {
                sparql_version: "1.1".to_string(),
                update_support,
                federation_support: true,
                text_search: false,
                geospatial: false,
                extensions: HashSet::new(),
            },
            latency_ms,
            throughput,
            datasets,
            last_health_check: Instant::now(),
            status: EndpointStatus::Healthy,
        }
    }

    #[test]
    fn test_distributed_engine_creation() {
        let engine = DistributedQueryEngine::new(DistributedConfig::default());

        assert!(engine.endpoints.read().unwrap().is_empty());
    }

    #[test]
    fn test_endpoint_registration() {
        let engine = DistributedQueryEngine::new(DistributedConfig::default());
        let url = "http://example.org/sparql";

        let endpoint = sparql_endpoint(url, true, 50.0, 10000.0, vec!["dataset1".to_string()]);
        engine.register_endpoint(endpoint).unwrap();

        let endpoints = engine.endpoints.read().unwrap();
        assert_eq!(endpoints.len(), 1);
        assert!(endpoints.contains_key(url));
    }

    #[test]
    fn test_query_router() {
        let router = QueryRouter::new(RoutingPolicy::NearestEndpoint);

        let mut endpoints = HashMap::new();
        endpoints.insert(
            "endpoint1".to_string(),
            sparql_endpoint("http://endpoint1.org/sparql", false, 20.0, 5000.0, vec![]),
        );

        let select_all = QueryForm::Select {
            variables: SelectVariables::All,
            where_clause: GraphPattern::Bgp(vec![]),
            distinct: false,
            reduced: false,
            order_by: vec![],
            offset: 0,
            limit: None,
        };
        let query = Query {
            base: None,
            prefixes: HashMap::new(),
            form: select_all,
            dataset: crate::query::algebra::Dataset::default(),
        };

        let routes = router.route_query(&query, &endpoints).unwrap();
        assert_eq!(routes.len(), 1);
        assert_eq!(routes[0].endpoint, "http://endpoint1.org/sparql");
    }
}