use crate::ast::CypherQuery as CypherAST;
use crate::config::GraphConfig;
use crate::error::{GraphError, Result};
use crate::logical_plan::LogicalPlanner;
use crate::parser::parse_cypher_query;
use crate::simple_executor::{
    to_df_boolean_expr_simple, to_df_order_by_expr_simple, to_df_value_expr_simple, PathExecutor,
};
use std::collections::HashMap;

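/// Strategy for executing a [`CypherQuery`].
///
/// `DataFusion` (the default) plans the query through the full
/// semantic-analysis and logical-planning pipeline; `Simple` uses the
/// lightweight executor in `simple_executor`; `LanceNative` is reserved and
/// currently returns an `UnsupportedFeature` error.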
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExecutionStrategy {
    #[default]
    DataFusion,
    Simple,
    LanceNative,
}

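/// A parsed Cypher query, optionally bound to a [`GraphConfig`] and a set of
/// named parameters.
///
/// A minimal usage sketch (the config, datasets, and `Person` label are
/// illustrative assumptions, not fixed by this type):
///
/// ```ignore
/// let query = CypherQuery::new("MATCH (n:Person) RETURN n.name")?
///     .with_config(config)
///     .with_parameter("minAge", 30);
/// let batch = query.execute(datasets, None).await?;
/// ```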
#[derive(Debug, Clone)]
pub struct CypherQuery {
    query_text: String,
    ast: CypherAST,
    config: Option<GraphConfig>,
    parameters: HashMap<String, serde_json::Value>,
}

impl CypherQuery {
    pub fn new(query: &str) -> Result<Self> {
        let ast = parse_cypher_query(query)?;

        Ok(Self {
            query_text: query.to_string(),
            ast,
            config: None,
            parameters: HashMap::new(),
        })
    }

    pub fn with_config(mut self, config: GraphConfig) -> Self {
        self.config = Some(config);
        self
    }

    pub fn with_parameter<K, V>(mut self, key: K, value: V) -> Self
    where
        K: Into<String>,
        V: Into<serde_json::Value>,
    {
        self.parameters.insert(key.into(), value.into());
        self
    }

    pub fn with_parameters(mut self, params: HashMap<String, serde_json::Value>) -> Self {
        self.parameters.extend(params);
        self
    }

    pub fn query_text(&self) -> &str {
        &self.query_text
    }

    pub fn ast(&self) -> &CypherAST {
        &self.ast
    }

    pub fn config(&self) -> Option<&GraphConfig> {
        self.config.as_ref()
    }

    pub fn parameters(&self) -> &HashMap<String, serde_json::Value> {
        &self.parameters
    }

    fn require_config(&self) -> Result<&GraphConfig> {
        self.config.as_ref().ok_or_else(|| GraphError::ConfigError {
            message: "Graph configuration is required for query execution".to_string(),
            location: snafu::Location::new(file!(), line!(), column!()),
        })
    }

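    /// Executes the query against the provided in-memory datasets using the
    /// given [`ExecutionStrategy`], defaulting to `DataFusion` when `None`.
    ///
    /// A sketch of forcing the simple executor (dataset contents assumed):
    ///
    /// ```ignore
    /// let batch = query
    ///     .execute(datasets, Some(ExecutionStrategy::Simple))
    ///     .await?;
    /// ```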
    pub async fn execute(
        &self,
        datasets: HashMap<String, arrow::record_batch::RecordBatch>,
        strategy: Option<ExecutionStrategy>,
    ) -> Result<arrow::record_batch::RecordBatch> {
        let strategy = strategy.unwrap_or_default();
        match strategy {
            ExecutionStrategy::DataFusion => self.execute_datafusion(datasets).await,
            ExecutionStrategy::Simple => self.execute_simple(datasets).await,
            ExecutionStrategy::LanceNative => Err(GraphError::UnsupportedFeature {
                feature: "Lance native execution strategy is not yet implemented".to_string(),
                location: snafu::Location::new(file!(), line!(), column!()),
            }),
        }
    }

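    /// Renders an EXPLAIN-style table for the query containing three rows:
    /// the graph logical plan, the DataFusion logical plan, and the
    /// DataFusion physical plan.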
    pub async fn explain(
        &self,
        datasets: HashMap<String, arrow::record_batch::RecordBatch>,
    ) -> Result<String> {
        use std::sync::Arc;

        let (catalog, ctx) = self
            .build_catalog_and_context_from_datasets(datasets)
            .await?;

        self.explain_internal(Arc::new(catalog), ctx).await
    }

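    /// Translates the query into a SQL string by optimizing the DataFusion
    /// logical plan and unparsing it via `datafusion_sql`.
    ///
    /// A sketch (dataset registration assumed as in `execute`):
    ///
    /// ```ignore
    /// let sql = query.to_sql(datasets).await?;
    /// assert!(sql.contains("SELECT"));
    /// ```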
    pub async fn to_sql(
        &self,
        datasets: HashMap<String, arrow::record_batch::RecordBatch>,
    ) -> Result<String> {
        use datafusion_sql::unparser::plan_to_sql;
        use std::sync::Arc;

        let _config = self.require_config()?;

        let (catalog, ctx) = self
            .build_catalog_and_context_from_datasets(datasets)
            .await?;

        let (_, df_plan) = self.create_logical_plans(Arc::new(catalog))?;

        let optimized_plan = ctx
            .state()
            .optimize(&df_plan)
            .map_err(|e| GraphError::PlanError {
                message: format!("Failed to optimize plan: {}", e),
                location: snafu::Location::new(file!(), line!(), column!()),
            })?;

        let sql_ast = plan_to_sql(&optimized_plan).map_err(|e| GraphError::PlanError {
            message: format!("Failed to unparse plan to SQL: {}", e),
            location: snafu::Location::new(file!(), line!(), column!()),
        })?;

        Ok(sql_ast.to_string())
    }

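    /// Executes the query against tables already registered in an existing
    /// DataFusion `SessionContext`. Every node label and relationship type in
    /// the [`GraphConfig`] must be registered under its own name.
    ///
    /// A sketch (the `Person` table and its provider are assumptions):
    ///
    /// ```ignore
    /// let ctx = SessionContext::new();
    /// ctx.register_table("Person", person_table)?;
    /// let batch = query.execute_with_context(ctx).await?;
    /// ```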
    pub async fn execute_with_context(
        &self,
        ctx: datafusion::execution::context::SessionContext,
    ) -> Result<arrow::record_batch::RecordBatch> {
        use crate::source_catalog::InMemoryCatalog;
        use datafusion::datasource::DefaultTableSource;
        use std::sync::Arc;

        let config = self.require_config()?;

        let mut catalog = InMemoryCatalog::new();

        for label in config.node_mappings.keys() {
            let table_provider =
                ctx.table_provider(label)
                    .await
                    .map_err(|e| GraphError::ConfigError {
                        message: format!(
                            "Node label '{}' not found in SessionContext: {}",
                            label, e
                        ),
                        location: snafu::Location::new(file!(), line!(), column!()),
                    })?;

            let table_source = Arc::new(DefaultTableSource::new(table_provider));
            catalog = catalog.with_node_source(label, table_source);
        }

        for rel_type in config.relationship_mappings.keys() {
            let table_provider =
                ctx.table_provider(rel_type)
                    .await
                    .map_err(|e| GraphError::ConfigError {
                        message: format!(
                            "Relationship type '{}' not found in SessionContext: {}",
                            rel_type, e
                        ),
                        location: snafu::Location::new(file!(), line!(), column!()),
                    })?;

            let table_source = Arc::new(DefaultTableSource::new(table_provider));
            catalog = catalog.with_relationship_source(rel_type, table_source);
        }

        self.execute_with_catalog_and_context(Arc::new(catalog), ctx)
            .await
    }

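    /// Plans the query against the given source catalog, runs the resulting
    /// DataFusion plan on `ctx`, and concatenates the collected batches into
    /// a single `RecordBatch` (an empty batch with the plan's schema when
    /// there are no results).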
    pub async fn execute_with_catalog_and_context(
        &self,
        catalog: std::sync::Arc<dyn crate::source_catalog::GraphSourceCatalog>,
        ctx: datafusion::execution::context::SessionContext,
    ) -> Result<arrow::record_batch::RecordBatch> {
        use arrow::compute::concat_batches;

        let (_logical_plan, df_logical_plan) = self.create_logical_plans(catalog)?;

        let df = ctx
            .execute_logical_plan(df_logical_plan)
            .await
            .map_err(|e| GraphError::ExecutionError {
                message: format!("Failed to execute DataFusion plan: {}", e),
                location: snafu::Location::new(file!(), line!(), column!()),
            })?;

        let result_schema = df.schema().inner().clone();

        let batches = df.collect().await.map_err(|e| GraphError::ExecutionError {
            message: format!("Failed to collect query results: {}", e),
            location: snafu::Location::new(file!(), line!(), column!()),
        })?;

        if batches.is_empty() {
            return Ok(arrow::record_batch::RecordBatch::new_empty(result_schema));
        }

        let schema = batches[0].schema();
        concat_batches(&schema, &batches).map_err(|e| GraphError::ExecutionError {
            message: format!("Failed to concatenate result batches: {}", e),
            location: snafu::Location::new(file!(), line!(), column!()),
        })
    }

    async fn execute_datafusion(
        &self,
        datasets: HashMap<String, arrow::record_batch::RecordBatch>,
    ) -> Result<arrow::record_batch::RecordBatch> {
        use std::sync::Arc;

        let (catalog, ctx) = self
            .build_catalog_and_context_from_datasets(datasets)
            .await?;

        self.execute_with_catalog_and_context(Arc::new(catalog), ctx)
            .await
    }

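    /// Registers each dataset as an in-memory DataFusion table. Because the
    /// name-to-batch map does not say whether a table backs nodes or
    /// relationships, every source is registered under both roles in the
    /// catalog.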
    async fn build_catalog_and_context_from_datasets(
        &self,
        datasets: HashMap<String, arrow::record_batch::RecordBatch>,
    ) -> Result<(
        crate::source_catalog::InMemoryCatalog,
        datafusion::execution::context::SessionContext,
    )> {
        use crate::source_catalog::InMemoryCatalog;
        use datafusion::datasource::{DefaultTableSource, MemTable};
        use datafusion::execution::context::SessionContext;
        use std::sync::Arc;

        if datasets.is_empty() {
            return Err(GraphError::ConfigError {
                message: "No input datasets provided".to_string(),
                location: snafu::Location::new(file!(), line!(), column!()),
            });
        }

        let ctx = SessionContext::new();
        let mut catalog = InMemoryCatalog::new();

        for (name, batch) in &datasets {
            let mem_table = Arc::new(
                MemTable::try_new(batch.schema(), vec![vec![batch.clone()]]).map_err(|e| {
                    GraphError::PlanError {
                        message: format!("Failed to create MemTable for {}: {}", name, e),
                        location: snafu::Location::new(file!(), line!(), column!()),
                    }
                })?,
            );

            ctx.register_table(name, mem_table.clone())
                .map_err(|e| GraphError::PlanError {
                    message: format!("Failed to register table {}: {}", name, e),
                    location: snafu::Location::new(file!(), line!(), column!()),
                })?;

            let table_source = Arc::new(DefaultTableSource::new(mem_table));

            catalog = catalog
                .with_node_source(name, table_source.clone())
                .with_relationship_source(name, table_source);
        }

        Ok((catalog, ctx))
    }

    async fn explain_internal(
        &self,
        catalog: std::sync::Arc<dyn crate::source_catalog::GraphSourceCatalog>,
        ctx: datafusion::execution::context::SessionContext,
    ) -> Result<String> {
        let (logical_plan, df_logical_plan, physical_plan) =
            self.create_plans(catalog, &ctx).await?;

        self.format_explain_output(&logical_plan, &df_logical_plan, physical_plan.as_ref())
    }

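    /// Runs the planning pipeline: semantic analysis of the AST, translation
    /// into the crate's graph logical plan, then lowering into a DataFusion
    /// logical plan via the configured source catalog.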
    fn create_logical_plans(
        &self,
        catalog: std::sync::Arc<dyn crate::source_catalog::GraphSourceCatalog>,
    ) -> Result<(
        crate::logical_plan::LogicalOperator,
        datafusion::logical_expr::LogicalPlan,
    )> {
        use crate::datafusion_planner::{DataFusionPlanner, GraphPhysicalPlanner};
        use crate::semantic::SemanticAnalyzer;

        let config = self.require_config()?;

        let mut analyzer = SemanticAnalyzer::new(config.clone());
        analyzer.analyze(&self.ast)?;

        let mut logical_planner = LogicalPlanner::new();
        let logical_plan = logical_planner.plan(&self.ast)?;

        let df_planner = DataFusionPlanner::with_catalog(config.clone(), catalog);
        let df_logical_plan = df_planner.plan(&logical_plan)?;

        Ok((logical_plan, df_logical_plan))
    }

    async fn create_plans(
        &self,
        catalog: std::sync::Arc<dyn crate::source_catalog::GraphSourceCatalog>,
        ctx: &datafusion::execution::context::SessionContext,
    ) -> Result<(
        crate::logical_plan::LogicalOperator,
        datafusion::logical_expr::LogicalPlan,
        std::sync::Arc<dyn datafusion::physical_plan::ExecutionPlan>,
    )> {
        let (logical_plan, df_logical_plan) = self.create_logical_plans(catalog)?;

        let df = ctx
            .execute_logical_plan(df_logical_plan.clone())
            .await
            .map_err(|e| GraphError::ExecutionError {
                message: format!("Failed to execute DataFusion plan: {}", e),
                location: snafu::Location::new(file!(), line!(), column!()),
            })?;

        let physical_plan =
            df.create_physical_plan()
                .await
                .map_err(|e| GraphError::ExecutionError {
                    message: format!("Failed to create physical plan: {}", e),
                    location: snafu::Location::new(file!(), line!(), column!()),
                })?;

        Ok((logical_plan, df_logical_plan, physical_plan))
    }

    fn format_explain_output(
        &self,
        logical_plan: &crate::logical_plan::LogicalOperator,
        df_logical_plan: &datafusion::logical_expr::LogicalPlan,
        physical_plan: &dyn datafusion::physical_plan::ExecutionPlan,
    ) -> Result<String> {
        let mut output = String::new();

        output.push_str("Cypher Query:\n");
        output.push_str(&format!(" {}\n\n", self.query_text));

        let mut rows = vec![];

        let graph_plan_str = format!("{:#?}", logical_plan);
        rows.push(("graph_logical_plan", graph_plan_str));

        let df_logical_str = format!("{}", df_logical_plan.display_indent());
        rows.push(("logical_plan", df_logical_str));

        let df_physical_str = format!(
            "{}",
            datafusion::physical_plan::displayable(physical_plan).indent(true)
        );
        rows.push(("physical_plan", df_physical_str));

        let plan_type_width = rows.iter().map(|(t, _)| t.len()).max().unwrap_or(10);
        let plan_width = rows
            .iter()
            .map(|(_, p)| p.lines().map(|l| l.len()).max().unwrap_or(0))
            .max()
            .unwrap_or(50);

        let separator = format!(
            "+{}+{}+",
            "-".repeat(plan_type_width + 2),
            "-".repeat(plan_width + 2)
        );

        output.push_str(&separator);
        output.push('\n');

        output.push_str(&format!(
            "| {:<width$} | {:<plan_width$} |\n",
            "plan_type",
            "plan",
            width = plan_type_width,
            plan_width = plan_width
        ));
        output.push_str(&separator);
        output.push('\n');

        for (plan_type, plan_content) in rows {
            let lines: Vec<&str> = plan_content.lines().collect();
            if lines.is_empty() {
                output.push_str(&format!(
                    "| {:<width$} | {:<plan_width$} |\n",
                    plan_type,
                    "",
                    width = plan_type_width,
                    plan_width = plan_width
                ));
            } else {
                output.push_str(&format!(
                    "| {:<width$} | {:<plan_width$} |\n",
                    plan_type,
                    lines[0],
                    width = plan_type_width,
                    plan_width = plan_width
                ));

                for line in &lines[1..] {
                    output.push_str(&format!(
                        "| {:<width$} | {:<plan_width$} |\n",
                        "",
                        line,
                        width = plan_type_width,
                        plan_width = plan_width
                    ));
                }
            }
        }

        output.push_str(&separator);
        output.push('\n');

        Ok(output)
    }

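    /// Executes the query with the lightweight executor. Path patterns are
    /// delegated to [`PathExecutor`] where possible; otherwise a single
    /// registered dataset is scanned and WHERE, RETURN, DISTINCT, ORDER BY,
    /// and SKIP/LIMIT are applied as DataFusion `DataFrame` operations.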
    pub async fn execute_simple(
        &self,
        datasets: HashMap<String, arrow::record_batch::RecordBatch>,
    ) -> Result<arrow::record_batch::RecordBatch> {
        use arrow::compute::concat_batches;
        use datafusion::datasource::MemTable;
        use datafusion::prelude::*;
        use std::sync::Arc;

        let _config = self.require_config()?;

        if datasets.is_empty() {
            return Err(GraphError::PlanError {
                message: "No input datasets provided".to_string(),
                location: snafu::Location::new(file!(), line!(), column!()),
            });
        }

        let ctx = SessionContext::new();
        for (name, batch) in &datasets {
            let table =
                MemTable::try_new(batch.schema(), vec![vec![batch.clone()]]).map_err(|e| {
                    GraphError::PlanError {
                        message: format!("Failed to create DataFusion table: {}", e),
                        location: snafu::Location::new(file!(), line!(), column!()),
                    }
                })?;
            ctx.register_table(name, Arc::new(table))
                .map_err(|e| GraphError::PlanError {
                    message: format!("Failed to register table '{}': {}", name, e),
                    location: snafu::Location::new(file!(), line!(), column!()),
                })?;
        }

        if let Some(df) = self.try_execute_path_generic(&ctx).await? {
            let batches = df.collect().await.map_err(|e| GraphError::PlanError {
                message: format!("Failed to collect results: {}", e),
                location: snafu::Location::new(file!(), line!(), column!()),
            })?;
            if batches.is_empty() {
                let schema = datasets.values().next().unwrap().schema();
                return Ok(arrow_array::RecordBatch::new_empty(schema));
            }
            let merged = concat_batches(&batches[0].schema(), &batches).map_err(|e| {
                GraphError::PlanError {
                    message: format!("Failed to concatenate result batches: {}", e),
                    location: snafu::Location::new(file!(), line!(), column!()),
                }
            })?;
            return Ok(merged);
        }

        let (table_name, batch) = datasets.iter().next().unwrap();
        let schema = batch.schema();

        let mut df = ctx
            .table(table_name)
            .await
            .map_err(|e| GraphError::PlanError {
                message: format!("Failed to create DataFrame for '{}': {}", table_name, e),
                location: snafu::Location::new(file!(), line!(), column!()),
            })?;

        if let Some(where_clause) = &self.ast.where_clause {
            if let Some(filter_expr) = to_df_boolean_expr_simple(&where_clause.expression) {
                df = df.filter(filter_expr).map_err(|e| GraphError::PlanError {
                    message: format!("Failed to apply filter: {}", e),
                    location: snafu::Location::new(file!(), line!(), column!()),
                })?;
            }
        }

        let proj_exprs: Vec<Expr> = self
            .ast
            .return_clause
            .items
            .iter()
            .map(|item| to_df_value_expr_simple(&item.expression))
            .collect();
        if !proj_exprs.is_empty() {
            df = df.select(proj_exprs).map_err(|e| GraphError::PlanError {
                message: format!("Failed to project: {}", e),
                location: snafu::Location::new(file!(), line!(), column!()),
            })?;
        }

        if self.ast.return_clause.distinct {
            df = df.distinct().map_err(|e| GraphError::PlanError {
                message: format!("Failed to apply DISTINCT: {}", e),
                location: snafu::Location::new(file!(), line!(), column!()),
            })?;
        }

        if let Some(order_by) = &self.ast.order_by {
            let sort_expr = to_df_order_by_expr_simple(&order_by.items);
            df = df.sort(sort_expr).map_err(|e| GraphError::PlanError {
                message: format!("Failed to apply ORDER BY: {}", e),
                location: snafu::Location::new(file!(), line!(), column!()),
            })?;
        }

        if self.ast.skip.is_some() || self.ast.limit.is_some() {
            let offset = self.ast.skip.unwrap_or(0) as usize;
            let fetch = self.ast.limit.map(|l| l as usize);
            df = df.limit(offset, fetch).map_err(|e| GraphError::PlanError {
                message: format!("Failed to apply SKIP/LIMIT: {}", e),
                location: snafu::Location::new(file!(), line!(), column!()),
            })?;
        }

        let batches = df.collect().await.map_err(|e| GraphError::PlanError {
            message: format!("Failed to collect results: {}", e),
            location: snafu::Location::new(file!(), line!(), column!()),
        })?;

        if batches.is_empty() {
            return Ok(arrow_array::RecordBatch::new_empty(schema));
        }

        let merged =
            concat_batches(&batches[0].schema(), &batches).map_err(|e| GraphError::PlanError {
                message: format!("Failed to concatenate result batches: {}", e),
                location: snafu::Location::new(file!(), line!(), column!()),
            })?;
        Ok(merged)
    }

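    /// Returns the sorted, deduplicated node labels referenced by the query.
    ///
    /// ```ignore
    /// let q = CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name")?;
    /// assert_eq!(q.referenced_node_labels(), vec!["Person"]);
    /// ```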
    pub fn referenced_node_labels(&self) -> Vec<String> {
        let mut labels = Vec::new();

        for match_clause in &self.ast.match_clauses {
            for pattern in &match_clause.patterns {
                self.collect_node_labels_from_pattern(pattern, &mut labels);
            }
        }

        labels.sort();
        labels.dedup();
        labels
    }

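    /// Returns the sorted, deduplicated relationship types referenced by the
    /// query.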
    pub fn referenced_relationship_types(&self) -> Vec<String> {
        let mut types = Vec::new();

        for match_clause in &self.ast.match_clauses {
            for pattern in &match_clause.patterns {
                self.collect_relationship_types_from_pattern(pattern, &mut types);
            }
        }

        types.sort();
        types.dedup();
        types
    }

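    /// Returns the sorted, deduplicated variable names bound by the query's
    /// patterns (node and relationship variables alike).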
    pub fn variables(&self) -> Vec<String> {
        let mut variables = Vec::new();

        for match_clause in &self.ast.match_clauses {
            for pattern in &match_clause.patterns {
                self.collect_variables_from_pattern(pattern, &mut variables);
            }
        }

        variables.sort();
        variables.dedup();
        variables
    }

    fn collect_node_labels_from_pattern(
        &self,
        pattern: &crate::ast::GraphPattern,
        labels: &mut Vec<String>,
    ) {
        match pattern {
            crate::ast::GraphPattern::Node(node) => {
                labels.extend(node.labels.clone());
            }
            crate::ast::GraphPattern::Path(path) => {
                labels.extend(path.start_node.labels.clone());
                for segment in &path.segments {
                    labels.extend(segment.end_node.labels.clone());
                }
            }
        }
    }

    fn collect_relationship_types_from_pattern(
        &self,
        pattern: &crate::ast::GraphPattern,
        types: &mut Vec<String>,
    ) {
        if let crate::ast::GraphPattern::Path(path) = pattern {
            for segment in &path.segments {
                types.extend(segment.relationship.types.clone());
            }
        }
    }

    fn collect_variables_from_pattern(
        &self,
        pattern: &crate::ast::GraphPattern,
        variables: &mut Vec<String>,
    ) {
        match pattern {
            crate::ast::GraphPattern::Node(node) => {
                if let Some(var) = &node.variable {
                    variables.push(var.clone());
                }
            }
            crate::ast::GraphPattern::Path(path) => {
                if let Some(var) = &path.start_node.variable {
                    variables.push(var.clone());
                }
                for segment in &path.segments {
                    if let Some(var) = &segment.relationship.variable {
                        variables.push(var.clone());
                    }
                    if let Some(var) = &segment.end_node.variable {
                        variables.push(var.clone());
                    }
                }
            }
        }
    }
}

impl CypherQuery {
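    /// Attempts to run a single-MATCH, single-path query through
    /// [`PathExecutor`], returning `Ok(None)` when the query shape is not
    /// supported so `execute_simple` can fall back to a plain table scan.
    ///
    /// A one-segment variable-length relationship is expanded into one
    /// fixed-length chain per hop count and the per-hop results are combined
    /// with UNION, capped at `MAX_VARIABLE_LENGTH_HOPS`.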
    async fn try_execute_path_generic(
        &self,
        ctx: &datafusion::prelude::SessionContext,
    ) -> Result<Option<datafusion::dataframe::DataFrame>> {
        use crate::ast::GraphPattern;
        let [match_clause] = self.ast.match_clauses.as_slice() else {
            return Ok(None);
        };
        let path = match match_clause.patterns.as_slice() {
            [GraphPattern::Path(p)] if !p.segments.is_empty() => p,
            _ => return Ok(None),
        };
        let cfg = self.require_config()?;

        if path.segments.len() == 1 {
            if let Some(length_range) = &path.segments[0].relationship.length {
                let cap: u32 = crate::MAX_VARIABLE_LENGTH_HOPS;
                let min_len = length_range.min.unwrap_or(1).max(1);
                let max_len = length_range.max.unwrap_or(cap);

                if min_len > max_len {
                    return Err(GraphError::InvalidPattern {
                        message: format!(
                            "Invalid variable-length range: min {:?} greater than max {:?}",
                            length_range.min, length_range.max
                        ),
                        location: snafu::Location::new(file!(), line!(), column!()),
                    });
                }

                if max_len > cap {
                    return Err(GraphError::UnsupportedFeature {
                        feature: format!(
                            "Variable-length paths with length > {} are not supported (got {:?}..{:?})",
                            cap, length_range.min, length_range.max
                        ),
                        location: snafu::Location::new(file!(), line!(), column!()),
                    });
                }

                use datafusion::dataframe::DataFrame;
                let mut union_df: Option<DataFrame> = None;

                for hops in min_len..=max_len {
                    let mut synthetic = crate::ast::PathPattern {
                        start_node: path.start_node.clone(),
                        segments: Vec::with_capacity(hops as usize),
                    };

                    for i in 0..hops {
                        let mut seg = path.segments[0].clone();
                        // Intermediate bindings are internal to the expansion:
                        // only the final end node keeps its variable.
                        seg.relationship.variable = None;
                        if (i + 1) < hops {
                            seg.end_node.variable = None;
                        }
                        seg.relationship.length = None;
                        synthetic.segments.push(seg);
                    }

                    let exec = PathExecutor::new(ctx, cfg, &synthetic)?;
                    let mut df = exec.build_chain().await?;
                    df = exec.apply_where(df, &self.ast)?;
                    df = exec.apply_return(df, &self.ast)?;

                    union_df = Some(match union_df {
                        Some(acc) => acc.union(df).map_err(|e| GraphError::PlanError {
                            message: format!("Failed to UNION variable-length paths: {}", e),
                            location: snafu::Location::new(file!(), line!(), column!()),
                        })?,
                        None => df,
                    });
                }

                return Ok(union_df);
            }
        }

        let exec = PathExecutor::new(ctx, cfg, path)?;
        let df = exec.build_chain().await?;
        let df = exec.apply_where(df, &self.ast)?;
        let df = exec.apply_return(df, &self.ast)?;
        Ok(Some(df))
    }
}

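/// Programmatic alternative to parsing Cypher text.
///
/// A minimal sketch (config construction assumed):
///
/// ```ignore
/// let query = CypherQueryBuilder::new()
///     .with_config(config)
///     .match_node("n", "Person")
///     .return_property("n", "name")
///     .limit(10)
///     .build()?;
/// ```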
#[derive(Debug, Default)]
pub struct CypherQueryBuilder {
    match_clauses: Vec<crate::ast::MatchClause>,
    where_expression: Option<crate::ast::BooleanExpression>,
    return_items: Vec<crate::ast::ReturnItem>,
    order_by_items: Vec<crate::ast::OrderByItem>,
    limit: Option<u64>,
    distinct: bool,
    skip: Option<u64>,
    config: Option<GraphConfig>,
    parameters: HashMap<String, serde_json::Value>,
}

impl CypherQueryBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn match_node(mut self, variable: &str, label: &str) -> Self {
        let node = crate::ast::NodePattern {
            variable: Some(variable.to_string()),
            labels: vec![label.to_string()],
            properties: HashMap::new(),
        };

        let match_clause = crate::ast::MatchClause {
            patterns: vec![crate::ast::GraphPattern::Node(node)],
        };

        self.match_clauses.push(match_clause);
        self
    }

    pub fn with_config(mut self, config: GraphConfig) -> Self {
        self.config = Some(config);
        self
    }

    pub fn return_property(mut self, variable: &str, property: &str) -> Self {
        let prop_ref = crate::ast::PropertyRef::new(variable, property);
        let return_item = crate::ast::ReturnItem {
            expression: crate::ast::ValueExpression::Property(prop_ref),
            alias: None,
        };

        self.return_items.push(return_item);
        self
    }

    pub fn distinct(mut self, distinct: bool) -> Self {
        self.distinct = distinct;
        self
    }

    pub fn limit(mut self, limit: u64) -> Self {
        self.limit = Some(limit);
        self
    }

    pub fn skip(mut self, skip: u64) -> Self {
        self.skip = Some(skip);
        self
    }

    pub fn build(self) -> Result<CypherQuery> {
        if self.match_clauses.is_empty() {
            return Err(GraphError::PlanError {
                message: "Query must have at least one MATCH clause".to_string(),
                location: snafu::Location::new(file!(), line!(), column!()),
            });
        }

        if self.return_items.is_empty() {
            return Err(GraphError::PlanError {
                message: "Query must have at least one RETURN item".to_string(),
                location: snafu::Location::new(file!(), line!(), column!()),
            });
        }

        let ast = crate::ast::CypherQuery {
            match_clauses: self.match_clauses,
            where_clause: self
                .where_expression
                .map(|expr| crate::ast::WhereClause { expression: expr }),
            return_clause: crate::ast::ReturnClause {
                distinct: self.distinct,
                items: self.return_items,
            },
            order_by: if self.order_by_items.is_empty() {
                None
            } else {
                Some(crate::ast::OrderByClause {
                    items: self.order_by_items,
                })
            },
            limit: self.limit,
            skip: self.skip,
        };

        // The builder does not reconstruct the original query string, so use
        // a fixed placeholder for `query_text`.
        let query_text = "MATCH ... RETURN ...".to_string();
        let query = CypherQuery {
            query_text,
            ast,
            config: self.config,
            parameters: self.parameters,
        };

        Ok(query)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::GraphConfig;

    #[test]
    fn test_parse_simple_cypher_query() {
        let query = CypherQuery::new("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(query.query_text(), "MATCH (n:Person) RETURN n.name");
        assert_eq!(query.referenced_node_labels(), vec!["Person"]);
        assert_eq!(query.variables(), vec!["n"]);
    }

    #[test]
    fn test_query_with_parameters() {
        let mut params = HashMap::new();
        params.insert("minAge".to_string(), serde_json::Value::Number(30.into()));

        let query = CypherQuery::new("MATCH (n:Person) WHERE n.age > $minAge RETURN n.name")
            .unwrap()
            .with_parameters(params);

        assert!(query.parameters().contains_key("minAge"));
    }

    #[test]
    fn test_query_builder() {
        let config = GraphConfig::builder()
            .with_node_label("Person", "person_id")
            .build()
            .unwrap();

        let query = CypherQueryBuilder::new()
            .with_config(config)
            .match_node("n", "Person")
            .return_property("n", "name")
            .limit(10)
            .build()
            .unwrap();

        assert_eq!(query.referenced_node_labels(), vec!["Person"]);
        assert_eq!(query.variables(), vec!["n"]);
    }

    #[test]
    fn test_relationship_query_parsing() {
        let query =
            CypherQuery::new("MATCH (a:Person)-[r:KNOWS]->(b:Person) RETURN a.name, b.name")
                .unwrap();
        assert_eq!(query.referenced_node_labels(), vec!["Person"]);
        assert_eq!(query.referenced_relationship_types(), vec!["KNOWS"]);
        assert_eq!(query.variables(), vec!["a", "b", "r"]);
    }

    #[tokio::test]
    async fn test_execute_basic_projection_and_filter() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int64, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "David"])),
                Arc::new(Int64Array::from(vec![28, 34, 29, 42])),
            ],
        )
        .unwrap();

        let cfg = GraphConfig::builder()
            .with_node_label("Person", "id")
            .build()
            .unwrap();

        let q = CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name, p.age")
            .unwrap()
            .with_config(cfg);

        let mut data = HashMap::new();
        data.insert("people".to_string(), batch);

        let out = q.execute_simple(data).await.unwrap();
        assert_eq!(out.num_rows(), 2);
        let names = out
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let ages = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
        let result: Vec<(String, i64)> = (0..out.num_rows())
            .map(|i| (names.value(i).to_string(), ages.value(i)))
            .collect();
        assert!(result.contains(&("Bob".to_string(), 34)));
        assert!(result.contains(&("David".to_string(), 42)));
    }

    #[tokio::test]
    async fn test_execute_single_hop_path_join_projection() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;

        let person_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int64, true),
        ]));
        let people = RecordBatch::try_new(
            person_schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"])),
                Arc::new(Int64Array::from(vec![28, 34, 29])),
            ],
        )
        .unwrap();

        let rel_schema = Arc::new(Schema::new(vec![
            Field::new("src_person_id", DataType::Int64, false),
            Field::new("dst_person_id", DataType::Int64, false),
        ]));
        let knows = RecordBatch::try_new(
            rel_schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(Int64Array::from(vec![2, 3])),
            ],
        )
        .unwrap();

        let cfg = GraphConfig::builder()
            .with_node_label("Person", "id")
            .with_relationship("KNOWS", "src_person_id", "dst_person_id")
            .build()
            .unwrap();

        let q = CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN b.name")
            .unwrap()
            .with_config(cfg);

        let mut data = HashMap::new();
        data.insert("Person".to_string(), people);
        data.insert("KNOWS".to_string(), knows);

        let out = q.execute_simple(data).await.unwrap();
        let names = out
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let got: Vec<String> = (0..out.num_rows())
            .map(|i| names.value(i).to_string())
            .collect();
        assert_eq!(got.len(), 2);
        assert!(got.contains(&"Bob".to_string()));
        assert!(got.contains(&"Carol".to_string()));
    }

    #[tokio::test]
    async fn test_execute_order_by_asc() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int64, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["Bob", "Alice", "David", "Carol"])),
                Arc::new(Int64Array::from(vec![34, 28, 42, 29])),
            ],
        )
        .unwrap();

        let cfg = GraphConfig::builder()
            .with_node_label("Person", "id")
            .build()
            .unwrap();

        let q = CypherQuery::new("MATCH (p:Person) RETURN p.name, p.age ORDER BY p.age ASC")
            .unwrap()
            .with_config(cfg);

        let mut data = HashMap::new();
        data.insert("people".to_string(), batch);

        let out = q.execute_simple(data).await.unwrap();
        let ages = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
        let collected: Vec<i64> = (0..out.num_rows()).map(|i| ages.value(i)).collect();
        assert_eq!(collected, vec![28, 29, 34, 42]);
    }

    #[tokio::test]
    async fn test_execute_order_by_desc_with_skip_limit() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int64, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["Bob", "Alice", "David", "Carol"])),
                Arc::new(Int64Array::from(vec![34, 28, 42, 29])),
            ],
        )
        .unwrap();

        let cfg = GraphConfig::builder()
            .with_node_label("Person", "id")
            .build()
            .unwrap();

        let q =
            CypherQuery::new("MATCH (p:Person) RETURN p.age ORDER BY p.age DESC SKIP 1 LIMIT 2")
                .unwrap()
                .with_config(cfg);

        let mut data = HashMap::new();
        data.insert("people".to_string(), batch);

        let out = q.execute_simple(data).await.unwrap();
        assert_eq!(out.num_rows(), 2);
        let ages = out.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
        let collected: Vec<i64> = (0..out.num_rows()).map(|i| ages.value(i)).collect();
        assert_eq!(collected, vec![34, 29]);
    }

    #[tokio::test]
    async fn test_execute_skip_without_limit() {
        use arrow_array::{Int64Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![Field::new("age", DataType::Int64, true)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(Int64Array::from(vec![10, 20, 30, 40]))],
        )
        .unwrap();

        let cfg = GraphConfig::builder()
            .with_node_label("Person", "id")
            .build()
            .unwrap();

        let q = CypherQuery::new("MATCH (p:Person) RETURN p.age ORDER BY p.age ASC SKIP 2")
            .unwrap()
            .with_config(cfg);

        let mut data = HashMap::new();
        data.insert("people".to_string(), batch);

        let out = q.execute_simple(data).await.unwrap();
        assert_eq!(out.num_rows(), 2);
        let ages = out.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
        let collected: Vec<i64> = (0..out.num_rows()).map(|i| ages.value(i)).collect();
        assert_eq!(collected, vec![30, 40]);
    }

    #[tokio::test]
    async fn test_execute_datafusion_pipeline() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int64, false),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
                Arc::new(Int64Array::from(vec![25, 35, 30])),
            ],
        )
        .unwrap();

        let cfg = GraphConfig::builder()
            .with_node_label("Person", "id")
            .build()
            .unwrap();

        let query = CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name")
            .unwrap()
            .with_config(cfg);

        let mut datasets = HashMap::new();
        datasets.insert("Person".to_string(), batch);

        let result = query.execute_datafusion(datasets.clone()).await;

        match &result {
            Ok(batch) => {
                println!(
                    "DataFusion result: {} rows, {} columns",
                    batch.num_rows(),
                    batch.num_columns()
                );
                if batch.num_rows() > 0 {
                    println!("First row data: {:?}", batch.slice(0, 1));
                }
            }
            Err(e) => {
                println!("DataFusion execution failed: {:?}", e);
            }
        }

        let legacy_result = query.execute_simple(datasets).await.unwrap();
        println!(
            "Legacy result: {} rows, {} columns",
            legacy_result.num_rows(),
            legacy_result.num_columns()
        );

        let result = result.unwrap();

        assert_eq!(
            result.num_rows(),
            1,
            "Expected 1 row after filtering WHERE p.age > 30"
        );

        assert_eq!(
            result.num_columns(),
            1,
            "Expected 1 column after projection RETURN p.name"
        );

        let names = result
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(
            names.value(0),
            "Bob",
            "Expected filtered result to contain Bob"
        );
    }

    #[tokio::test]
    async fn test_execute_datafusion_simple_scan() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["Alice", "Bob"])),
            ],
        )
        .unwrap();

        let cfg = GraphConfig::builder()
            .with_node_label("Person", "id")
            .build()
            .unwrap();

        let query = CypherQuery::new("MATCH (p:Person) RETURN p.name")
            .unwrap()
            .with_config(cfg);

        let mut datasets = HashMap::new();
        datasets.insert("Person".to_string(), batch);

        let result = query.execute_datafusion(datasets).await.unwrap();

        assert_eq!(
            result.num_rows(),
            2,
            "Should return all 2 rows without filtering"
        );
        assert_eq!(result.num_columns(), 1, "Should return 1 column (name)");

        let names = result
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let name_set: std::collections::HashSet<String> = (0..result.num_rows())
            .map(|i| names.value(i).to_string())
            .collect();
        let expected: std::collections::HashSet<String> =
            ["Alice", "Bob"].iter().map(|s| s.to_string()).collect();
        assert_eq!(name_set, expected, "Should return Alice and Bob");
    }

    #[tokio::test]
    async fn test_execute_with_context_simple_scan() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use datafusion::datasource::MemTable;
        use datafusion::execution::context::SessionContext;
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"])),
                Arc::new(Int64Array::from(vec![28, 34, 29])),
            ],
        )
        .unwrap();

        let mem_table =
            Arc::new(MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap());
        let ctx = SessionContext::new();
        ctx.register_table("Person", mem_table).unwrap();

        let cfg = GraphConfig::builder()
            .with_node_label("Person", "id")
            .build()
            .unwrap();

        let query = CypherQuery::new("MATCH (p:Person) RETURN p.name")
            .unwrap()
            .with_config(cfg);

        let result = query.execute_with_context(ctx).await.unwrap();

        assert_eq!(result.num_rows(), 3);
        assert_eq!(result.num_columns(), 1);

        let names = result
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "Alice");
        assert_eq!(names.value(1), "Bob");
        assert_eq!(names.value(2), "Carol");
    }

    #[tokio::test]
    async fn test_execute_with_context_with_filter() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use datafusion::datasource::MemTable;
        use datafusion::execution::context::SessionContext;
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "David"])),
                Arc::new(Int64Array::from(vec![28, 34, 29, 42])),
            ],
        )
        .unwrap();

        let mem_table =
            Arc::new(MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap());
        let ctx = SessionContext::new();
        ctx.register_table("Person", mem_table).unwrap();

        let cfg = GraphConfig::builder()
            .with_node_label("Person", "id")
            .build()
            .unwrap();

        let query = CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name, p.age")
            .unwrap()
            .with_config(cfg);

        let result = query.execute_with_context(ctx).await.unwrap();

        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.num_columns(), 2);

        let names = result
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let ages = result
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();

        let results: Vec<(String, i64)> = (0..result.num_rows())
            .map(|i| (names.value(i).to_string(), ages.value(i)))
            .collect();

        assert!(results.contains(&("Bob".to_string(), 34)));
        assert!(results.contains(&("David".to_string(), 42)));
    }

    #[tokio::test]
    async fn test_execute_with_context_relationship_traversal() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use datafusion::datasource::MemTable;
        use datafusion::execution::context::SessionContext;
        use std::sync::Arc;

        let person_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let person_batch = RecordBatch::try_new(
            person_schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"])),
            ],
        )
        .unwrap();

        let knows_schema = Arc::new(Schema::new(vec![
            Field::new("src_id", DataType::Int64, false),
            Field::new("dst_id", DataType::Int64, false),
            Field::new("since", DataType::Int64, false),
        ]));
        let knows_batch = RecordBatch::try_new(
            knows_schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(Int64Array::from(vec![2, 3])),
                Arc::new(Int64Array::from(vec![2020, 2021])),
            ],
        )
        .unwrap();

        let person_table = Arc::new(
            MemTable::try_new(person_schema.clone(), vec![vec![person_batch.clone()]]).unwrap(),
        );
        let knows_table = Arc::new(
            MemTable::try_new(knows_schema.clone(), vec![vec![knows_batch.clone()]]).unwrap(),
        );

        let ctx = SessionContext::new();
        ctx.register_table("Person", person_table).unwrap();
        ctx.register_table("KNOWS", knows_table).unwrap();

        let cfg = GraphConfig::builder()
            .with_node_label("Person", "id")
            .with_relationship("KNOWS", "src_id", "dst_id")
            .build()
            .unwrap();

        let query = CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name, b.name")
            .unwrap()
            .with_config(cfg);

        let result = query.execute_with_context(ctx).await.unwrap();

        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.num_columns(), 2);

        let src_names = result
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let dst_names = result
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        let relationships: Vec<(String, String)> = (0..result.num_rows())
            .map(|i| {
                (
                    src_names.value(i).to_string(),
                    dst_names.value(i).to_string(),
                )
            })
            .collect();

        assert!(relationships.contains(&("Alice".to_string(), "Bob".to_string())));
        assert!(relationships.contains(&("Bob".to_string(), "Carol".to_string())));
    }

    #[tokio::test]
    async fn test_execute_with_context_order_by_limit() {
        use arrow_array::{Int64Array, RecordBatch, StringArray};
        use arrow_schema::{DataType, Field, Schema};
        use datafusion::datasource::MemTable;
        use datafusion::execution::context::SessionContext;
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("score", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "David"])),
                Arc::new(Int64Array::from(vec![85, 92, 78, 95])),
            ],
        )
        .unwrap();

        let mem_table =
            Arc::new(MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap());
        let ctx = SessionContext::new();
        ctx.register_table("Student", mem_table).unwrap();

        let cfg = GraphConfig::builder()
            .with_node_label("Student", "id")
            .build()
            .unwrap();

        let query = CypherQuery::new(
            "MATCH (s:Student) RETURN s.name, s.score ORDER BY s.score DESC LIMIT 2",
        )
        .unwrap()
        .with_config(cfg);

        let result = query.execute_with_context(ctx).await.unwrap();

        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.num_columns(), 2);

        let names = result
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let scores = result
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();

        assert_eq!(names.value(0), "David");
        assert_eq!(scores.value(0), 95);

        assert_eq!(names.value(1), "Bob");
        assert_eq!(scores.value(1), 92);
    }

    #[tokio::test]
    async fn test_to_sql() {
        use arrow_array::RecordBatch;
        use arrow_schema::{DataType, Field, Schema};
        use std::collections::HashMap;
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::new_empty(schema.clone());

        let mut datasets = HashMap::new();
        datasets.insert("Person".to_string(), batch);

        let cfg = GraphConfig::builder()
            .with_node_label("Person", "id")
            .build()
            .unwrap();

        let query = CypherQuery::new("MATCH (p:Person) RETURN p.name")
            .unwrap()
            .with_config(cfg);

        let sql = query.to_sql(datasets).await.unwrap();
        println!("Generated SQL: {}", sql);

        assert!(sql.contains("SELECT"));
        assert!(sql.to_lowercase().contains("from person"));
        assert!(sql.contains("p.name"));
    }
}