use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};

use crate::raft::OxirsNodeId;

/// A plan for executing a SPARQL query as subqueries distributed across
/// cluster nodes, joined and aggregated afterwards.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedQueryPlan {
    pub query_id: String,
    pub original_sparql: String,
    pub subqueries: Vec<SubqueryPlan>,
    pub join_operations: Vec<JoinOperation>,
    pub aggregation_plan: Option<AggregationPlan>,
    pub estimated_cost: f64,
}

/// A query fragment targeted at a single cluster node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubqueryPlan {
    pub subquery_id: String,
    pub target_node: OxirsNodeId,
    pub sparql_fragment: String,
    pub variables: Vec<String>,
    pub estimated_rows: u64,
    pub estimated_latency_ms: u64,
}

/// A planned join between the results of two subqueries.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JoinOperation {
    pub left_subquery: String,
    pub right_subquery: String,
    pub join_variables: Vec<String>,
    pub join_type: JoinType,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JoinType {
    Inner,
    Left,
    Optional,
    Union,
}

/// Final-stage plan for grouping, ordering, and slicing results.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregationPlan {
    pub group_by: Vec<String>,
    pub aggregates: Vec<AggregateFunction>,
    pub having_conditions: Vec<String>,
    pub order_by: Vec<OrderByClause>,
    pub limit: Option<u64>,
    pub offset: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregateFunction {
    pub function: String,
    pub variable: String,
    pub alias: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderByClause {
    pub variable: String,
    pub ascending: bool,
}

/// Execution statistics recorded for a completed distributed query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryStats {
    pub query_id: String,
    pub execution_time_ms: u64,
    pub nodes_involved: u32,
    pub total_intermediate_results: u64,
    pub final_result_count: u64,
    pub network_transfer_bytes: u64,
    pub cache_hits: u32,
    pub cache_misses: u32,
}

/// A single solution mapping from variable names to values.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ResultBinding {
    pub variables: BTreeMap<String, String>,
}

impl Default for ResultBinding {
    fn default() -> Self {
        Self::new()
    }
}

impl ResultBinding {
    pub fn new() -> Self {
        Self {
            variables: BTreeMap::new(),
        }
    }

    pub fn add_binding(&mut self, variable: String, value: String) {
        self.variables.insert(variable, value);
    }

    pub fn get(&self, variable: &str) -> Option<&String> {
        self.variables.get(variable)
    }

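    /// Merges two bindings, returning `None` when they disagree on a shared
    /// variable. For example, `{?x: "a"}` merged with `{?y: "b"}` yields
    /// `{?x: "a", ?y: "b"}`, while `{?x: "a"}` and `{?x: "b"}` conflict.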
    pub fn merge(&self, other: &ResultBinding) -> Option<ResultBinding> {
        let mut merged = self.clone();
        for (var, val) in &other.variables {
            if let Some(existing) = merged.variables.get(var) {
                if existing != val {
                    // Conflicting value for a shared variable.
                    return None;
                }
            } else {
                merged.variables.insert(var.clone(), val.clone());
            }
        }
        Some(merged)
    }
}

/// Coordinates distributed SPARQL query execution across cluster nodes,
/// with result caching and per-query statistics.
#[derive(Debug)]
pub struct DistributedQueryExecutor {
    #[allow(dead_code)]
    node_id: OxirsNodeId,
    cluster_nodes: Arc<RwLock<HashSet<OxirsNodeId>>>,
    query_cache: Arc<RwLock<HashMap<String, Vec<ResultBinding>>>>,
    statistics: Arc<RwLock<HashMap<String, QueryStats>>>,
}

impl DistributedQueryExecutor {
    pub fn new(node_id: OxirsNodeId) -> Self {
        Self {
            node_id,
            cluster_nodes: Arc::new(RwLock::new(HashSet::new())),
            query_cache: Arc::new(RwLock::new(HashMap::new())),
            statistics: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Registers a node as available for subquery execution.
    pub async fn add_node(&self, node_id: OxirsNodeId) {
        let mut nodes = self.cluster_nodes.write().await;
        nodes.insert(node_id);
        info!("Added node {} to distributed query executor", node_id);
    }

    /// Removes a node from the set of available nodes.
    pub async fn remove_node(&self, node_id: OxirsNodeId) {
        let mut nodes = self.cluster_nodes.write().await;
        nodes.remove(&node_id);
        info!("Removed node {} from distributed query executor", node_id);
    }

    /// Executes a SPARQL query across the cluster and returns the combined
    /// result bindings.
    pub async fn execute_query(&self, sparql: &str) -> Result<Vec<ResultBinding>> {
        let query_id = uuid::Uuid::new_v4().to_string();
        let start_time = std::time::Instant::now();

        info!("Executing distributed query {}: {}", query_id, sparql);

        // Check the cache first.
        if let Some(cached_results) = self.check_cache(sparql).await {
            info!("Cache hit for query {}", query_id);
            return Ok(cached_results);
        }

        // Create a distributed execution plan.
        let plan = self.create_execution_plan(&query_id, sparql).await?;

        // Execute the subqueries on their target nodes in parallel.
        let subquery_results = self.execute_subqueries(&plan).await?;

        // Join and aggregate the partial results.
        let final_results = self.combine_results(&plan, subquery_results).await?;

        // Cache the final results for subsequent identical queries.
        self.cache_results(sparql, &final_results).await;

        // Record execution statistics.
        let execution_time = start_time.elapsed().as_millis() as u64;
        self.record_statistics(&query_id, &plan, &final_results, execution_time)
            .await;

        info!(
            "Completed distributed query {} in {}ms, {} results",
            query_id,
            execution_time,
            final_results.len()
        );

        Ok(final_results)
    }

    async fn create_execution_plan(
        &self,
        query_id: &str,
        sparql: &str,
    ) -> Result<DistributedQueryPlan> {
        // Parse the query into variables, triple patterns, and modifiers.
        let parsed = self.parse_sparql(sparql)?;

        // Determine how data is distributed across the cluster.
        let data_distribution = self.analyze_data_distribution().await?;

        // Assign each triple pattern to a target node.
        let subqueries = self.create_subqueries(&parsed, &data_distribution).await?;

        // Plan joins between subqueries that share variables.
        let join_operations = self.plan_joins(&subqueries)?;

        // Plan final ordering, limits, and aggregation.
        let aggregation_plan = self.create_aggregation_plan(&parsed)?;

        let estimated_cost = self.estimate_cost(&subqueries, &join_operations).await;

        Ok(DistributedQueryPlan {
            query_id: query_id.to_string(),
            original_sparql: sparql.to_string(),
            subqueries,
            join_operations,
            aggregation_plan,
            estimated_cost,
        })
    }

    /// A simplified SPARQL parser that extracts SELECT variables and triple
    /// patterns; it is not a full grammar implementation.
    fn parse_sparql(&self, sparql: &str) -> Result<ParsedQuery> {
        let mut variables = Vec::new();
        let mut triple_patterns = Vec::new();
        let filters = Vec::new();

        // Extract the projected variables from the SELECT clause.
        if let Some(select_part) = sparql.split("WHERE").next() {
            if select_part.contains("SELECT") {
                let vars_part = select_part.replace("SELECT", "").trim().to_string();
                if vars_part != "*" {
                    variables = vars_part
                        .split_whitespace()
                        .filter(|v| v.starts_with('?'))
                        .map(|v| v.to_string())
                        .collect();
                }
            }
        }

        // Extract triple patterns from the WHERE clause.
        if let Some(where_part) = sparql.split("WHERE").nth(1) {
            let clean_where = where_part.replace(['{', '}'], "");
            for line in clean_where.lines() {
                let line = line.trim();
                if !line.is_empty() && line.contains(' ') {
                    let parts: Vec<&str> = line.split_whitespace().collect();
                    if parts.len() >= 3 {
                        triple_patterns.push(TriplePattern {
                            subject: parts[0].to_string(),
                            predicate: parts[1].to_string(),
                            // Strip the dot that terminates a pattern.
                            object: parts[2].replace('.', ""),
                        });
                    }
                }
            }
        }

        Ok(ParsedQuery {
            variables,
            triple_patterns,
            filters,
            limit: None,
            offset: None,
            order_by: Vec::new(),
        })
    }

    async fn analyze_data_distribution(&self) -> Result<DataDistribution> {
        let nodes = self.cluster_nodes.read().await;
        let mut distribution = DataDistribution {
            node_triple_counts: HashMap::new(),
            predicate_distribution: HashMap::new(),
            subject_distribution: HashMap::new(),
        };

        // Placeholder: assume a uniform triple count per node until real
        // statistics are collected from the cluster.
        for &node_id in nodes.iter() {
            distribution.node_triple_counts.insert(node_id, 10000);
        }

        Ok(distribution)
    }

    async fn create_subqueries(
        &self,
        parsed: &ParsedQuery,
        _distribution: &DataDistribution,
    ) -> Result<Vec<SubqueryPlan>> {
        let mut subqueries = Vec::new();
        let nodes: Vec<_> = self.cluster_nodes.read().await.iter().cloned().collect();

        if nodes.is_empty() {
            return Err(anyhow::anyhow!("No nodes available for query execution"));
        }

        // Distribute triple patterns across nodes round-robin.
        for (i, triple_pattern) in parsed.triple_patterns.iter().enumerate() {
            let target_node = nodes[i % nodes.len()];

            // Fall back to a wildcard projection when no variables were
            // parsed (e.g., SELECT *), so the fragment stays valid SPARQL.
            let projection = if parsed.variables.is_empty() {
                "*".to_string()
            } else {
                parsed.variables.join(" ")
            };
            let sparql_fragment = format!(
                "SELECT {} WHERE {{ {} {} {} }}",
                projection,
                triple_pattern.subject,
                triple_pattern.predicate,
                triple_pattern.object
            );

            subqueries.push(SubqueryPlan {
                subquery_id: format!("subquery_{i}"),
                target_node,
                sparql_fragment,
                variables: parsed.variables.clone(),
                estimated_rows: 1000,     // placeholder estimate
                estimated_latency_ms: 50, // placeholder estimate
            });
        }

        Ok(subqueries)
    }

    fn plan_joins(&self, subqueries: &[SubqueryPlan]) -> Result<Vec<JoinOperation>> {
        let mut joins = Vec::new();

        // Plan an inner join between each pair of adjacent subqueries that
        // share at least one variable.
        for i in 0..subqueries.len().saturating_sub(1) {
            let left = &subqueries[i];
            let right = &subqueries[i + 1];

            let common_vars: Vec<String> = left
                .variables
                .iter()
                .filter(|v| right.variables.contains(v))
                .cloned()
                .collect();

            if !common_vars.is_empty() {
                joins.push(JoinOperation {
                    left_subquery: left.subquery_id.clone(),
                    right_subquery: right.subquery_id.clone(),
                    join_variables: common_vars,
                    join_type: JoinType::Inner,
                });
            }
        }

        Ok(joins)
    }

    fn create_aggregation_plan(&self, parsed: &ParsedQuery) -> Result<Option<AggregationPlan>> {
        // No aggregation step is needed without ORDER BY or LIMIT.
        if parsed.order_by.is_empty() && parsed.limit.is_none() {
            return Ok(None);
        }

        Ok(Some(AggregationPlan {
            group_by: Vec::new(),
            aggregates: Vec::new(),
            having_conditions: Vec::new(),
            order_by: parsed.order_by.clone(),
            limit: parsed.limit,
            offset: parsed.offset,
        }))
    }

    async fn estimate_cost(&self, subqueries: &[SubqueryPlan], joins: &[JoinOperation]) -> f64 {
        let mut total_cost = 0.0;

        // Per-subquery cost: row volume plus expected latency.
        for subquery in subqueries {
            total_cost += subquery.estimated_rows as f64 * 0.001;
            total_cost += subquery.estimated_latency_ms as f64 * 0.01;
        }

        // Flat cost per join operation.
        for _join in joins {
            total_cost += 10.0;
        }

        total_cost
    }

    async fn execute_subqueries(
        &self,
        plan: &DistributedQueryPlan,
    ) -> Result<HashMap<String, Vec<ResultBinding>>> {
        let mut results = HashMap::new();
        let mut handles = Vec::new();

        // Spawn all subqueries concurrently.
        for subquery in &plan.subqueries {
            let subquery_clone = subquery.clone();
            let handle =
                tokio::spawn(async move { Self::execute_single_subquery(subquery_clone).await });
            handles.push((subquery.subquery_id.clone(), handle));
        }

        // Collect the results, failing fast on the first error.
        for (subquery_id, handle) in handles {
            match handle.await {
                Ok(Ok(subquery_results)) => {
                    results.insert(subquery_id, subquery_results);
                }
                Ok(Err(e)) => {
                    error!("Subquery {} failed: {}", subquery_id, e);
                    return Err(e);
                }
                Err(e) => {
                    error!("Subquery {} task failed: {}", subquery_id, e);
                    return Err(anyhow::anyhow!("Task execution failed: {}", e));
                }
            }
        }

        Ok(results)
    }

    async fn execute_single_subquery(subquery: SubqueryPlan) -> Result<Vec<ResultBinding>> {
        debug!(
            "Executing subquery {} on node {}",
            subquery.subquery_id, subquery.target_node
        );

        // Allow up to three times the estimated latency before timing out.
        let client = reqwest::Client::builder()
            .timeout(tokio::time::Duration::from_millis(
                subquery.estimated_latency_ms * 3,
            ))
            .build()
            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {}", e))?;

        let endpoint_url = format!("http://node-{}/sparql", subquery.target_node);

        let response = client
            .post(&endpoint_url)
            .header("Content-Type", "application/sparql-query")
            .header("Accept", "application/sparql-results+json")
            .body(subquery.sparql_fragment.clone())
            .send()
            .await;

        match response {
            Ok(resp) if resp.status().is_success() => {
                let json: serde_json::Value = resp
                    .json()
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to parse JSON response: {}", e))?;

                Self::parse_sparql_json_results(json)
            }
            Ok(resp) => {
                warn!(
                    "Node {} returned error status {}: falling back to simulation",
                    subquery.target_node,
                    resp.status()
                );
                Self::simulate_subquery_execution(subquery).await
            }
            Err(e) => {
                warn!(
                    "Failed to reach node {}: {} - falling back to simulation",
                    subquery.target_node, e
                );
                Self::simulate_subquery_execution(subquery).await
            }
        }
    }

    fn parse_sparql_json_results(json: serde_json::Value) -> Result<Vec<ResultBinding>> {
        let bindings_array = json
            .get("results")
            .and_then(|r| r.get("bindings"))
            .and_then(|b| b.as_array())
            .ok_or_else(|| anyhow::anyhow!("Invalid SPARQL JSON results format"))?;

        let mut results = Vec::new();
        for binding_obj in bindings_array {
            if let Some(binding_map) = binding_obj.as_object() {
                let mut result_binding = ResultBinding::new();

                for (var_name, var_value) in binding_map {
                    if let Some(value_obj) = var_value.as_object() {
                        let value = value_obj
                            .get("value")
                            .and_then(|v| v.as_str())
                            .unwrap_or("")
                            .to_string();
                        result_binding.add_binding(format!("?{var_name}"), value);
                    }
                }
                results.push(result_binding);
            }
        }

        Ok(results)
    }

    /// Simulates subquery execution when the target node is unreachable,
    /// producing synthetic bindings after the estimated latency.
    async fn simulate_subquery_execution(subquery: SubqueryPlan) -> Result<Vec<ResultBinding>> {
        // Simulate network and processing latency.
        tokio::time::sleep(tokio::time::Duration::from_millis(
            subquery.estimated_latency_ms,
        ))
        .await;

        // Generate synthetic results, capped at 100 rows.
        let mut results = Vec::new();
        let result_count = std::cmp::min(subquery.estimated_rows, 100);

        for i in 0..result_count {
            let mut binding = ResultBinding::new();
            for var in &subquery.variables {
                let value = match var.as_str() {
                    "?s" | "?subject" => format!("http://example.org/resource_{i}"),
                    "?p" | "?predicate" => format!("http://example.org/property_{}", i % 10),
                    "?o" | "?object" => format!("\"Object value {i}\""),
                    "?name" => format!("\"Name {i}\""),
                    "?type" => "http://example.org/Type".to_string(),
                    _ => format!("value_{}_{}", subquery.target_node, i),
                };
                binding.add_binding(var.clone(), value);
            }
            results.push(binding);
        }

        Ok(results)
    }

    async fn combine_results(
        &self,
        plan: &DistributedQueryPlan,
        subquery_results: HashMap<String, Vec<ResultBinding>>,
    ) -> Result<Vec<ResultBinding>> {
        let mut current_results = Vec::new();

        // Start from the first subquery's results.
        if let Some(first_subquery) = plan.subqueries.first() {
            if let Some(first_results) = subquery_results.get(&first_subquery.subquery_id) {
                current_results = first_results.clone();
            }
        }

        // Apply the planned joins in order.
        for join in &plan.join_operations {
            if let Some(right_results) = subquery_results.get(&join.right_subquery) {
                current_results = self
                    .execute_join(&current_results, right_results, join)
                    .await?;
            }
        }

        // Apply ordering, offset, and limit if planned.
        if let Some(agg_plan) = &plan.aggregation_plan {
            current_results = self.apply_aggregation(current_results, agg_plan).await?;
        }

        Ok(current_results)
    }

    async fn execute_join(
        &self,
        left_results: &[ResultBinding],
        right_results: &[ResultBinding],
        join: &JoinOperation,
    ) -> Result<Vec<ResultBinding>> {
        let mut joined_results = Vec::new();

        match join.join_type {
            JoinType::Inner => {
                // Nested-loop join: keep only compatible, mergeable pairs.
                for left_binding in left_results {
                    for right_binding in right_results {
                        if self.bindings_compatible(
                            left_binding,
                            right_binding,
                            &join.join_variables,
                        ) {
                            if let Some(merged) = left_binding.merge(right_binding) {
                                joined_results.push(merged);
                            }
                        }
                    }
                }
            }
            JoinType::Left => {
                // Keep every left binding, merged where a match exists.
                for left_binding in left_results {
                    let mut found_match = false;
                    for right_binding in right_results {
                        if self.bindings_compatible(
                            left_binding,
                            right_binding,
                            &join.join_variables,
                        ) {
                            if let Some(merged) = left_binding.merge(right_binding) {
                                joined_results.push(merged);
                                found_match = true;
                            }
                        }
                    }
                    if !found_match {
                        joined_results.push(left_binding.clone());
                    }
                }
            }
            JoinType::Optional => {
                // OPTIONAL has left-join semantics; delegate accordingly.
                joined_results = Box::pin(self.execute_join(
                    left_results,
                    right_results,
                    &JoinOperation {
                        left_subquery: join.left_subquery.clone(),
                        right_subquery: join.right_subquery.clone(),
                        join_variables: join.join_variables.clone(),
                        join_type: JoinType::Left,
                    },
                ))
                .await?;
            }
            JoinType::Union => {
                // Concatenate both sides, then sort and deduplicate.
                joined_results.extend_from_slice(left_results);
                joined_results.extend_from_slice(right_results);
                joined_results.sort_by(|a, b| format!("{a:?}").cmp(&format!("{b:?}")));
                joined_results.dedup();
            }
        }

        Ok(joined_results)
    }

    /// Two bindings are compatible when every shared join variable maps to
    /// the same value in both.
    fn bindings_compatible(
        &self,
        left: &ResultBinding,
        right: &ResultBinding,
        join_variables: &[String],
    ) -> bool {
        for var in join_variables {
            if let (Some(left_val), Some(right_val)) = (left.get(var), right.get(var)) {
                if left_val != right_val {
                    return false;
                }
            }
        }
        true
    }

    async fn apply_aggregation(
        &self,
        mut results: Vec<ResultBinding>,
        agg_plan: &AggregationPlan,
    ) -> Result<Vec<ResultBinding>> {
        // Apply ORDER BY, comparing clause by clause.
        if !agg_plan.order_by.is_empty() {
            results.sort_by(|a, b| {
                for order_clause in &agg_plan.order_by {
                    let empty_string = String::new();
                    let a_val = a.get(&order_clause.variable).unwrap_or(&empty_string);
                    let b_val = b.get(&order_clause.variable).unwrap_or(&empty_string);
                    let cmp = if order_clause.ascending {
                        a_val.cmp(b_val)
                    } else {
                        b_val.cmp(a_val)
                    };
                    if cmp != std::cmp::Ordering::Equal {
                        return cmp;
                    }
                }
                std::cmp::Ordering::Equal
            });
        }

        // Apply OFFSET.
        if let Some(offset) = agg_plan.offset {
            if offset < results.len() as u64 {
                results = results.into_iter().skip(offset as usize).collect();
            } else {
                results.clear();
            }
        }

        // Apply LIMIT.
        if let Some(limit) = agg_plan.limit {
            results.truncate(limit as usize);
        }

        Ok(results)
    }

    async fn check_cache(&self, sparql: &str) -> Option<Vec<ResultBinding>> {
        let cache = self.query_cache.read().await;
        cache.get(sparql).cloned()
    }

    async fn cache_results(&self, sparql: &str, results: &[ResultBinding]) {
        let mut cache = self.query_cache.write().await;
        cache.insert(sparql.to_string(), results.to_vec());

        // Crude eviction: drop 100 arbitrary entries once the cache exceeds
        // 1000 queries.
        if cache.len() > 1000 {
            let keys_to_remove: Vec<_> = cache.keys().take(100).cloned().collect();
            for key in keys_to_remove {
                cache.remove(&key);
            }
        }
    }

    async fn record_statistics(
        &self,
        query_id: &str,
        plan: &DistributedQueryPlan,
        results: &[ResultBinding],
        execution_time_ms: u64,
    ) {
        let stats = QueryStats {
            query_id: query_id.to_string(),
            execution_time_ms,
            nodes_involved: plan.subqueries.len() as u32,
            total_intermediate_results: plan.subqueries.iter().map(|s| s.estimated_rows).sum(),
            final_result_count: results.len() as u64,
            network_transfer_bytes: 0, // not yet tracked
            cache_hits: 0,
            cache_misses: 1,
        };

        let mut statistics = self.statistics.write().await;
        statistics.insert(query_id.to_string(), stats);

        // Bound the statistics map by evicting arbitrary old entries.
        if statistics.len() > 10000 {
            let keys_to_remove: Vec<_> = statistics.keys().take(1000).cloned().collect();
            for key in keys_to_remove {
                statistics.remove(&key);
            }
        }
    }

    /// Returns a snapshot of recorded per-query statistics.
    pub async fn get_statistics(&self) -> HashMap<String, QueryStats> {
        self.statistics.read().await.clone()
    }

    /// Clears all cached query results.
    pub async fn clear_cache(&self) {
        let mut cache = self.query_cache.write().await;
        cache.clear();
        info!("Query cache cleared");
    }
}

/// Internal representation of a parsed SPARQL query.
#[derive(Debug, Clone)]
struct ParsedQuery {
    variables: Vec<String>,
    triple_patterns: Vec<TriplePattern>,
    #[allow(dead_code)]
    filters: Vec<String>,
    limit: Option<u64>,
    offset: Option<u64>,
    order_by: Vec<OrderByClause>,
}

#[derive(Debug, Clone)]
struct TriplePattern {
    subject: String,
    predicate: String,
    object: String,
}

/// Summary of how triples are distributed across cluster nodes.
#[derive(Debug, Clone)]
struct DataDistribution {
    node_triple_counts: HashMap<OxirsNodeId, u64>,
    #[allow(dead_code)]
    predicate_distribution: HashMap<String, Vec<OxirsNodeId>>,
    #[allow(dead_code)]
    subject_distribution: HashMap<String, Vec<OxirsNodeId>>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_distributed_query_executor_creation() {
        let executor = DistributedQueryExecutor::new(1);
        executor.add_node(2).await;
        executor.add_node(3).await;

        let nodes = executor.cluster_nodes.read().await;
        assert_eq!(nodes.len(), 2);
        assert!(nodes.contains(&2));
        assert!(nodes.contains(&3));
    }

    #[tokio::test]
    async fn test_result_binding() {
        let mut binding1 = ResultBinding::new();
        binding1.add_binding("?x".to_string(), "value1".to_string());

        let mut binding2 = ResultBinding::new();
        binding2.add_binding("?y".to_string(), "value2".to_string());

        let merged = binding1.merge(&binding2).unwrap();
        assert_eq!(merged.get("?x"), Some(&"value1".to_string()));
        assert_eq!(merged.get("?y"), Some(&"value2".to_string()));
    }

    #[tokio::test]
    async fn test_result_binding_conflict() {
        let mut binding1 = ResultBinding::new();
        binding1.add_binding("?x".to_string(), "value1".to_string());

        let mut binding2 = ResultBinding::new();
        binding2.add_binding("?x".to_string(), "value2".to_string());

        let merged = binding1.merge(&binding2);
        assert!(merged.is_none());
    }

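    // A small sketch exercising the inner-join path directly; these bindings
    // and variable names are illustrative test data, not from a real dataset.
    #[tokio::test]
    async fn test_inner_join_on_shared_variable() {
        let executor = DistributedQueryExecutor::new(1);

        let mut left = ResultBinding::new();
        left.add_binding("?x".to_string(), "a".to_string());
        left.add_binding("?y".to_string(), "b".to_string());

        let mut right_match = ResultBinding::new();
        right_match.add_binding("?y".to_string(), "b".to_string());
        right_match.add_binding("?z".to_string(), "c".to_string());

        let mut right_miss = ResultBinding::new();
        right_miss.add_binding("?y".to_string(), "other".to_string());

        let join = JoinOperation {
            left_subquery: "left".to_string(),
            right_subquery: "right".to_string(),
            join_variables: vec!["?y".to_string()],
            join_type: JoinType::Inner,
        };

        // Only the compatible right binding should survive the join.
        let joined = executor
            .execute_join(&[left], &[right_match, right_miss], &join)
            .await
            .unwrap();

        assert_eq!(joined.len(), 1);
        assert_eq!(joined[0].get("?z"), Some(&"c".to_string()));
    }
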
    #[test]
    fn test_sparql_parsing() {
        let executor = DistributedQueryExecutor::new(1);
        let sparql = "SELECT ?x ?y WHERE { ?x <predicate> ?y }";
        let parsed = executor.parse_sparql(sparql).unwrap();

        assert_eq!(parsed.variables, vec!["?x", "?y"]);
        assert_eq!(parsed.triple_patterns.len(), 1);
    }

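    // A sketch checking that adjacent subqueries sharing a variable get an
    // inner join planned between them; the IDs, fragments, and estimates
    // below are illustrative.
    #[test]
    fn test_plan_joins_on_shared_variables() {
        let executor = DistributedQueryExecutor::new(1);
        let subqueries = vec![
            SubqueryPlan {
                subquery_id: "subquery_0".to_string(),
                target_node: 1,
                sparql_fragment: "SELECT ?x ?y WHERE { ?x <p> ?y }".to_string(),
                variables: vec!["?x".to_string(), "?y".to_string()],
                estimated_rows: 10,
                estimated_latency_ms: 5,
            },
            SubqueryPlan {
                subquery_id: "subquery_1".to_string(),
                target_node: 2,
                sparql_fragment: "SELECT ?y ?z WHERE { ?y <q> ?z }".to_string(),
                variables: vec!["?y".to_string(), "?z".to_string()],
                estimated_rows: 10,
                estimated_latency_ms: 5,
            },
        ];

        let joins = executor.plan_joins(&subqueries).unwrap();
        assert_eq!(joins.len(), 1);
        assert_eq!(joins[0].join_variables, vec!["?y".to_string()]);
    }
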
    #[tokio::test]
    async fn test_query_execution() {
        let executor = DistributedQueryExecutor::new(1);
        executor.add_node(2).await;
        executor.add_node(3).await;

        let sparql = "SELECT ?x WHERE { ?x <type> <Person> }";
        let results = executor.execute_query(sparql).await.unwrap();

        // The simulation fallback produces results even without live nodes.
        assert!(!results.is_empty());
    }

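    // A sketch feeding a minimal SPARQL JSON results document through the
    // parser; the document below is hand-written example data.
    #[test]
    fn test_parse_sparql_json_results() {
        let json = serde_json::json!({
            "head": { "vars": ["x"] },
            "results": {
                "bindings": [
                    { "x": { "type": "uri", "value": "http://example.org/a" } }
                ]
            }
        });

        let results = DistributedQueryExecutor::parse_sparql_json_results(json).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].get("?x"),
            Some(&"http://example.org/a".to_string())
        );
    }
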
    #[tokio::test]
    async fn test_query_caching() {
        let executor = DistributedQueryExecutor::new(1);
        executor.add_node(2).await;

        let sparql = "SELECT ?x WHERE { ?x <type> <Person> }";

        // First execution populates the cache.
        let results1 = executor.execute_query(sparql).await.unwrap();

        // Second execution should be served from the cache.
        let results2 = executor.execute_query(sparql).await.unwrap();

        assert_eq!(results1.len(), results2.len());
    }
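
    // A sketch of the ORDER BY / OFFSET / LIMIT path in apply_aggregation;
    // the values and plan below are illustrative.
    #[tokio::test]
    async fn test_aggregation_order_offset_limit() {
        let executor = DistributedQueryExecutor::new(1);

        let mut results = Vec::new();
        for v in ["c", "a", "b"] {
            let mut binding = ResultBinding::new();
            binding.add_binding("?x".to_string(), v.to_string());
            results.push(binding);
        }

        let plan = AggregationPlan {
            group_by: Vec::new(),
            aggregates: Vec::new(),
            having_conditions: Vec::new(),
            order_by: vec![OrderByClause {
                variable: "?x".to_string(),
                ascending: true,
            }],
            limit: Some(2),
            offset: Some(1),
        };

        // Sorted ascending ("a", "b", "c"), then skip 1 and take 2.
        let sorted = executor.apply_aggregation(results, &plan).await.unwrap();
        assert_eq!(sorted.len(), 2);
        assert_eq!(sorted[0].get("?x"), Some(&"b".to_string()));
        assert_eq!(sorted[1].get("?x"), Some(&"c".to_string()));
    }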
}