lance_graph/
query.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! High-level Cypher query interface for Lance datasets
5
6use crate::ast::CypherQuery as CypherAST;
7use crate::config::GraphConfig;
8use crate::error::{GraphError, Result};
9use crate::logical_plan::LogicalPlanner;
10use crate::parser::parse_cypher_query;
11use crate::simple_executor::{
12    to_df_boolean_expr_simple, to_df_order_by_expr_simple, to_df_value_expr_simple, PathExecutor,
13};
14use std::collections::HashMap;
15
16/// Execution strategy for Cypher queries
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
18pub enum ExecutionStrategy {
19    /// Use DataFusion query planner (default, full feature support)
20    #[default]
21    DataFusion,
22    /// Use simple single-table executor (legacy, limited features)
23    Simple,
24    /// Use Lance native executor (not yet implemented)
25    LanceNative,
26}
27
28/// A Cypher query that can be executed against Lance datasets
29#[derive(Debug, Clone)]
30pub struct CypherQuery {
31    /// The original Cypher query string
32    query_text: String,
33    /// Parsed AST representation
34    ast: CypherAST,
35    /// Graph configuration for mapping
36    config: Option<GraphConfig>,
37    /// Query parameters
38    parameters: HashMap<String, serde_json::Value>,
39}
40impl CypherQuery {
41    /// Create a new Cypher query from a query string
42    pub fn new(query: &str) -> Result<Self> {
43        let ast = parse_cypher_query(query)?;
44
45        Ok(Self {
46            query_text: query.to_string(),
47            ast,
48            config: None,
49            parameters: HashMap::new(),
50        })
51    }
52
53    /// Set the graph configuration for this query
54    pub fn with_config(mut self, config: GraphConfig) -> Self {
55        self.config = Some(config);
56        self
57    }
58
59    /// Add a parameter to the query
60    pub fn with_parameter<K, V>(mut self, key: K, value: V) -> Self
61    where
62        K: Into<String>,
63        V: Into<serde_json::Value>,
64    {
65        self.parameters.insert(key.into(), value.into());
66        self
67    }
68
69    /// Add multiple parameters to the query
70    pub fn with_parameters(mut self, params: HashMap<String, serde_json::Value>) -> Self {
71        self.parameters.extend(params);
72        self
73    }
74
75    /// Get the original query text
76    pub fn query_text(&self) -> &str {
77        &self.query_text
78    }
79
80    /// Get the parsed AST
81    pub fn ast(&self) -> &CypherAST {
82        &self.ast
83    }
84
85    /// Get the graph configuration
86    pub fn config(&self) -> Option<&GraphConfig> {
87        self.config.as_ref()
88    }
89
90    /// Get query parameters
91    pub fn parameters(&self) -> &HashMap<String, serde_json::Value> {
92        &self.parameters
93    }
94
95    /// Get the required config, returning an error if not set
96    fn require_config(&self) -> Result<&GraphConfig> {
97        self.config.as_ref().ok_or_else(|| GraphError::ConfigError {
98            message: "Graph configuration is required for query execution".to_string(),
99            location: snafu::Location::new(file!(), line!(), column!()),
100        })
101    }
102
103    /// Execute the query against provided in-memory datasets
104    ///
105    /// This method uses the DataFusion planner by default for comprehensive query support
106    /// including joins, aggregations, and complex patterns. You can optionally specify
107    /// a different execution strategy.
108    ///
109    /// # Arguments
110    /// * `datasets` - HashMap of table name to RecordBatch (nodes and relationships)
111    /// * `strategy` - Optional execution strategy (defaults to DataFusion)
112    ///
113    /// # Returns
114    /// A single RecordBatch containing the query results
115    ///
116    /// # Errors
117    /// Returns error if query parsing, planning, or execution fails
118    ///
119    /// # Example
120    /// ```ignore
121    /// use std::collections::HashMap;
122    /// use arrow::record_batch::RecordBatch;
123    /// use lance_graph::query::CypherQuery;
124    ///
125    /// // Create in-memory datasets
126    /// let mut datasets = HashMap::new();
127    /// datasets.insert("Person".to_string(), person_batch);
128    /// datasets.insert("KNOWS".to_string(), knows_batch);
129    ///
130    /// // Parse and execute query
131    /// let query = CypherQuery::parse("MATCH (p:Person)-[:KNOWS]->(f) RETURN p.name, f.name")?
132    ///     .with_config(config);
133    /// // Use the default DataFusion strategy
134    /// let result = query.execute(datasets, None).await?;
135    /// // Use the Simple strategy explicitly
136    /// let result = query.execute(datasets, Some(ExecutionStrategy::Simple)).await?;
137    /// ```
138    pub async fn execute(
139        &self,
140        datasets: HashMap<String, arrow::record_batch::RecordBatch>,
141        strategy: Option<ExecutionStrategy>,
142    ) -> Result<arrow::record_batch::RecordBatch> {
143        let strategy = strategy.unwrap_or_default();
144        match strategy {
145            ExecutionStrategy::DataFusion => self.execute_datafusion(datasets).await,
146            ExecutionStrategy::Simple => self.execute_simple(datasets).await,
147            ExecutionStrategy::LanceNative => Err(GraphError::UnsupportedFeature {
148                feature: "Lance native execution strategy is not yet implemented".to_string(),
149                location: snafu::Location::new(file!(), line!(), column!()),
150            }),
151        }
152    }
153
154    /// Explain the query execution plan using in-memory datasets
155    ///
156    /// Returns a formatted string showing the query execution plan at different stages:
157    /// - Graph Logical Plan (graph-specific operators)
158    /// - DataFusion Logical Plan (optimized relational plan)
159    /// - DataFusion Physical Plan (execution plan with optimizations)
160    ///
161    /// This is useful for understanding query performance, debugging, and optimization.
162    ///
163    /// # Arguments
164    /// * `datasets` - HashMap of table name to RecordBatch (nodes and relationships)
165    ///
166    /// # Returns
167    /// A formatted string containing the execution plan at multiple levels
168    ///
169    /// # Errors
170    /// Returns error if planning fails
171    ///
172    /// # Example
173    /// ```ignore
174    /// use std::collections::HashMap;
175    /// use arrow::record_batch::RecordBatch;
176    /// use lance_graph::query::CypherQuery;
177    ///
178    /// // Create in-memory datasets
179    /// let mut datasets = HashMap::new();
180    /// datasets.insert("Person".to_string(), person_batch);
181    /// datasets.insert("KNOWS".to_string(), knows_batch);
182    ///
183    /// let query = CypherQuery::parse("MATCH (p:Person) WHERE p.age > 30 RETURN p.name")?
184    ///     .with_config(config);
185    ///
186    /// let plan = query.explain(datasets).await?;
187    /// println!("{}", plan);
188    /// ```
189    pub async fn explain(
190        &self,
191        datasets: HashMap<String, arrow::record_batch::RecordBatch>,
192    ) -> Result<String> {
193        use std::sync::Arc;
194
195        // Build catalog and context from datasets
196        let (catalog, ctx) = self
197            .build_catalog_and_context_from_datasets(datasets)
198            .await?;
199
200        // Delegate to the internal explain method
201        self.explain_internal(Arc::new(catalog), ctx).await
202    }
203
204    /// Convert the Cypher query to a DataFusion SQL string
205    ///
206    /// This method generates a SQL string that corresponds to the DataFusion logical plan
207    /// derived from the Cypher query. It uses the `datafusion-sql` unparser.
208    ///
209    /// **WARNING**: This method is experimental and the generated SQL dialect may change.
210    ///
211    /// **Case Sensitivity Limitation**: All table names in the generated SQL are lowercased
212    /// (e.g., `Person` becomes `person`, `Company` becomes `company`), due to the internal
213    /// handling of DataFusion's SQL unparser. Note that this only affects the SQL string
214    /// representation - actual query execution with `execute()` handles case-sensitive labels
215    /// correctly.
216    ///
217    /// If you need case-sensitive table names in the SQL output, consider:
218    /// - Using lowercase labels consistently in your Cypher queries and table names
219    /// - Post-processing the SQL string to replace table names with the correct case
220    ///
221    /// # Arguments
222    /// * `datasets` - HashMap of table name to RecordBatch (nodes and relationships)
223    ///
224    /// # Returns
225    /// A SQL string representing the query
226    pub async fn to_sql(
227        &self,
228        datasets: HashMap<String, arrow::record_batch::RecordBatch>,
229    ) -> Result<String> {
230        use datafusion_sql::unparser::plan_to_sql;
231        use std::sync::Arc;
232
233        let _config = self.require_config()?;
234
235        // Build catalog and context from datasets using the helper
236        let (catalog, ctx) = self
237            .build_catalog_and_context_from_datasets(datasets)
238            .await?;
239
240        // Generate Logical Plan
241        let (_, df_plan) = self.create_logical_plans(Arc::new(catalog))?;
242
243        // Optimize the plan using DataFusion's default optimizer rules
244        // This helps simplify the plan (e.g., merging projections) to produce cleaner SQL
245        let optimized_plan = ctx
246            .state()
247            .optimize(&df_plan)
248            .map_err(|e| GraphError::PlanError {
249                message: format!("Failed to optimize plan: {}", e),
250                location: snafu::Location::new(file!(), line!(), column!()),
251            })?;
252
253        // Unparse to SQL
254        let sql_ast = plan_to_sql(&optimized_plan).map_err(|e| GraphError::PlanError {
255            message: format!("Failed to unparse plan to SQL: {}", e),
256            location: snafu::Location::new(file!(), line!(), column!()),
257        })?;
258
259        Ok(sql_ast.to_string())
260    }
261
262    /// Execute query with a DataFusion SessionContext, automatically building the catalog
263    ///
264    /// This is a convenience method that builds the graph catalog by querying the
265    /// SessionContext for table schemas. The GraphConfig determines which tables to
266    /// look up (node labels and relationship types).
267    ///
268    /// This method is ideal for integrating with DataFusion's rich data source ecosystem
269    /// (CSV, Parquet, Delta Lake, Iceberg, etc.) without manually building a catalog.
270    ///
271    /// # Arguments
272    /// * `ctx` - DataFusion SessionContext with pre-registered tables
273    ///
274    /// # Returns
275    /// Query results as an Arrow RecordBatch
276    ///
277    /// # Errors
278    /// Returns error if:
279    /// - GraphConfig is not set (use `.with_config()` first)
280    /// - Required tables are not registered in the SessionContext
281    /// - Query execution fails
282    ///
283    /// # Example
284    /// ```ignore
285    /// use datafusion::execution::context::SessionContext;
286    /// use datafusion::prelude::CsvReadOptions;
287    /// use lance_graph::{CypherQuery, GraphConfig};
288    ///
289    /// // Step 1: Create GraphConfig
290    /// let config = GraphConfig::builder()
291    ///     .with_node_label("Person", "person_id")
292    ///     .with_relationship("KNOWS", "src_id", "dst_id")
293    ///     .build()?;
294    ///
295    /// // Step 2: Register data sources in DataFusion
296    /// let ctx = SessionContext::new();
297    /// ctx.register_csv("Person", "data/persons.csv", CsvReadOptions::default()).await?;
298    /// ctx.register_parquet("KNOWS", "s3://bucket/knows.parquet", Default::default()).await?;
299    ///
300    /// // Step 3: Execute query (catalog is built automatically)
301    /// let query = CypherQuery::parse("MATCH (p:Person)-[:KNOWS]->(f) RETURN p.name")?
302    ///     .with_config(config);
303    /// let result = query.execute_with_context(ctx).await?;
304    /// ```
305    ///
306    /// # Note
307    /// The catalog is built by querying the SessionContext for schemas of tables
308    /// mentioned in the GraphConfig. Table names must match between GraphConfig
309    /// (node labels/relationship types) and SessionContext (registered table names).
310    pub async fn execute_with_context(
311        &self,
312        ctx: datafusion::execution::context::SessionContext,
313    ) -> Result<arrow::record_batch::RecordBatch> {
314        use crate::source_catalog::InMemoryCatalog;
315        use datafusion::datasource::DefaultTableSource;
316        use std::sync::Arc;
317
318        let config = self.require_config()?;
319
320        // Build catalog by querying SessionContext for table providers
321        let mut catalog = InMemoryCatalog::new();
322
323        // Register node sources
324        for label in config.node_mappings.keys() {
325            let table_provider =
326                ctx.table_provider(label)
327                    .await
328                    .map_err(|e| GraphError::ConfigError {
329                        message: format!(
330                            "Node label '{}' not found in SessionContext: {}",
331                            label, e
332                        ),
333                        location: snafu::Location::new(file!(), line!(), column!()),
334                    })?;
335
336            let table_source = Arc::new(DefaultTableSource::new(table_provider));
337            catalog = catalog.with_node_source(label, table_source);
338        }
339
340        // Register relationship sources
341        for rel_type in config.relationship_mappings.keys() {
342            let table_provider =
343                ctx.table_provider(rel_type)
344                    .await
345                    .map_err(|e| GraphError::ConfigError {
346                        message: format!(
347                            "Relationship type '{}' not found in SessionContext: {}",
348                            rel_type, e
349                        ),
350                        location: snafu::Location::new(file!(), line!(), column!()),
351                    })?;
352
353            let table_source = Arc::new(DefaultTableSource::new(table_provider));
354            catalog = catalog.with_relationship_source(rel_type, table_source);
355        }
356
357        // Execute using the built catalog
358        self.execute_with_catalog_and_context(Arc::new(catalog), ctx)
359            .await
360    }
361
362    /// Execute query with an explicit catalog and session context
363    ///
364    /// This is the most flexible API for advanced users who want to provide their own
365    /// catalog implementation or have fine-grained control over both the catalog and
366    /// session context.
367    ///
368    /// # Arguments
369    /// * `catalog` - Graph catalog containing node and relationship schemas for planning
370    /// * `ctx` - DataFusion SessionContext with registered data sources for execution
371    ///
372    /// # Returns
373    /// Query results as an Arrow RecordBatch
374    ///
375    /// # Errors
376    /// Returns error if query parsing, planning, or execution fails
377    ///
378    /// # Example
379    /// ```ignore
380    /// use std::sync::Arc;
381    /// use datafusion::execution::context::SessionContext;
382    /// use lance_graph::source_catalog::InMemoryCatalog;
383    /// use lance_graph::query::CypherQuery;
384    ///
385    /// // Create custom catalog
386    /// let catalog = InMemoryCatalog::new()
387    ///     .with_node_source("Person", custom_table_source);
388    ///
389    /// // Create SessionContext
390    /// let ctx = SessionContext::new();
391    /// ctx.register_table("Person", custom_table).unwrap();
392    ///
393    /// // Execute with explicit catalog and context
394    /// let query = CypherQuery::parse("MATCH (p:Person) RETURN p.name")?
395    ///     .with_config(config);
396    /// let result = query.execute_with_catalog_and_context(Arc::new(catalog), ctx).await?;
397    /// ```
398    pub async fn execute_with_catalog_and_context(
399        &self,
400        catalog: std::sync::Arc<dyn crate::source_catalog::GraphSourceCatalog>,
401        ctx: datafusion::execution::context::SessionContext,
402    ) -> Result<arrow::record_batch::RecordBatch> {
403        use arrow::compute::concat_batches;
404
405        // Create logical plans (phases 1-3)
406        let (_logical_plan, df_logical_plan) = self.create_logical_plans(catalog)?;
407
408        // Execute the DataFusion plan (phase 4)
409        let df = ctx
410            .execute_logical_plan(df_logical_plan)
411            .await
412            .map_err(|e| GraphError::ExecutionError {
413                message: format!("Failed to execute DataFusion plan: {}", e),
414                location: snafu::Location::new(file!(), line!(), column!()),
415            })?;
416
417        // Get schema before collecting (in case result is empty)
418        let result_schema = df.schema().inner().clone();
419
420        // Collect results
421        let batches = df.collect().await.map_err(|e| GraphError::ExecutionError {
422            message: format!("Failed to collect query results: {}", e),
423            location: snafu::Location::new(file!(), line!(), column!()),
424        })?;
425
426        if batches.is_empty() {
427            // Return empty batch with the schema from the DataFrame
428            // This preserves column structure even when there are no rows
429            return Ok(arrow::record_batch::RecordBatch::new_empty(result_schema));
430        }
431
432        // Combine all batches
433        let schema = batches[0].schema();
434        concat_batches(&schema, &batches).map_err(|e| GraphError::ExecutionError {
435            message: format!("Failed to concatenate result batches: {}", e),
436            location: snafu::Location::new(file!(), line!(), column!()),
437        })
438    }
439
440    /// Execute using the DataFusion planner with in-memory datasets
441    ///
442    /// # Overview
443    /// This convenience method creates both a catalog and session context from the provided
444    /// in-memory RecordBatches. It's ideal for testing and small datasets that fit in memory.
445    ///
446    /// For production use with external data sources (CSV, Parquet, databases), use
447    /// `execute_with_context` instead, which automatically builds the catalog
448    /// from the SessionContext.
449    ///
450    /// # Arguments
451    /// * `datasets` - HashMap of table name to RecordBatch (nodes and relationships)
452    ///
453    /// # Returns
454    /// A single RecordBatch containing the query results
455    async fn execute_datafusion(
456        &self,
457        datasets: HashMap<String, arrow::record_batch::RecordBatch>,
458    ) -> Result<arrow::record_batch::RecordBatch> {
459        use std::sync::Arc;
460
461        // Build catalog and context from datasets
462        let (catalog, ctx) = self
463            .build_catalog_and_context_from_datasets(datasets)
464            .await?;
465
466        // Delegate to common execution logic
467        self.execute_with_catalog_and_context(Arc::new(catalog), ctx)
468            .await
469    }
470
471    /// Helper to build catalog and context from in-memory datasets
472    async fn build_catalog_and_context_from_datasets(
473        &self,
474        datasets: HashMap<String, arrow::record_batch::RecordBatch>,
475    ) -> Result<(
476        crate::source_catalog::InMemoryCatalog,
477        datafusion::execution::context::SessionContext,
478    )> {
479        use crate::source_catalog::InMemoryCatalog;
480        use datafusion::datasource::{DefaultTableSource, MemTable};
481        use datafusion::execution::context::SessionContext;
482        use std::sync::Arc;
483
484        if datasets.is_empty() {
485            return Err(GraphError::ConfigError {
486                message: "No input datasets provided".to_string(),
487                location: snafu::Location::new(file!(), line!(), column!()),
488            });
489        }
490
491        // Create session context and catalog
492        let ctx = SessionContext::new();
493        let mut catalog = InMemoryCatalog::new();
494
495        // Register all datasets as tables
496        for (name, batch) in &datasets {
497            let mem_table = Arc::new(
498                MemTable::try_new(batch.schema(), vec![vec![batch.clone()]]).map_err(|e| {
499                    GraphError::PlanError {
500                        message: format!("Failed to create MemTable for {}: {}", name, e),
501                        location: snafu::Location::new(file!(), line!(), column!()),
502                    }
503                })?,
504            );
505
506            // Register in session context for execution
507            ctx.register_table(name, mem_table.clone())
508                .map_err(|e| GraphError::PlanError {
509                    message: format!("Failed to register table {}: {}", name, e),
510                    location: snafu::Location::new(file!(), line!(), column!()),
511                })?;
512
513            let table_source = Arc::new(DefaultTableSource::new(mem_table));
514
515            // Register as both node and relationship source
516            // The planner will use whichever is appropriate based on the query
517            catalog = catalog
518                .with_node_source(name, table_source.clone())
519                .with_relationship_source(name, table_source);
520        }
521
522        Ok((catalog, ctx))
523    }
524
525    /// Internal helper to explain the query execution plan with explicit catalog and session context
526    async fn explain_internal(
527        &self,
528        catalog: std::sync::Arc<dyn crate::source_catalog::GraphSourceCatalog>,
529        ctx: datafusion::execution::context::SessionContext,
530    ) -> Result<String> {
531        // Create all plans (phases 1-4)
532        let (logical_plan, df_logical_plan, physical_plan) =
533            self.create_plans(catalog, &ctx).await?;
534
535        // Format the explain output
536        self.format_explain_output(&logical_plan, &df_logical_plan, physical_plan.as_ref())
537    }
538
539    /// Helper to create logical plans (graph logical, DataFusion logical)
540    ///
541    /// This performs phases 1-3 of query execution (semantic analysis, graph logical planning,
542    /// DataFusion logical planning) without creating the physical plan.
543    fn create_logical_plans(
544        &self,
545        catalog: std::sync::Arc<dyn crate::source_catalog::GraphSourceCatalog>,
546    ) -> Result<(
547        crate::logical_plan::LogicalOperator,
548        datafusion::logical_expr::LogicalPlan,
549    )> {
550        use crate::datafusion_planner::{DataFusionPlanner, GraphPhysicalPlanner};
551        use crate::semantic::SemanticAnalyzer;
552
553        let config = self.require_config()?;
554
555        // Phase 1: Semantic Analysis
556        let mut analyzer = SemanticAnalyzer::new(config.clone());
557        analyzer.analyze(&self.ast)?;
558
559        // Phase 2: Graph Logical Plan
560        let mut logical_planner = LogicalPlanner::new();
561        let logical_plan = logical_planner.plan(&self.ast)?;
562
563        // Phase 3: DataFusion Logical Plan
564        let df_planner = DataFusionPlanner::with_catalog(config.clone(), catalog);
565        let df_logical_plan = df_planner.plan(&logical_plan)?;
566
567        Ok((logical_plan, df_logical_plan))
568    }
569
570    /// Helper to create all plans (graph logical, DataFusion logical, physical)
571    async fn create_plans(
572        &self,
573        catalog: std::sync::Arc<dyn crate::source_catalog::GraphSourceCatalog>,
574        ctx: &datafusion::execution::context::SessionContext,
575    ) -> Result<(
576        crate::logical_plan::LogicalOperator,
577        datafusion::logical_expr::LogicalPlan,
578        std::sync::Arc<dyn datafusion::physical_plan::ExecutionPlan>,
579    )> {
580        // Phases 1-3: Create logical plans
581        let (logical_plan, df_logical_plan) = self.create_logical_plans(catalog)?;
582
583        // Phase 4: DataFusion Physical Plan
584        let df = ctx
585            .execute_logical_plan(df_logical_plan.clone())
586            .await
587            .map_err(|e| GraphError::ExecutionError {
588                message: format!("Failed to execute DataFusion plan: {}", e),
589                location: snafu::Location::new(file!(), line!(), column!()),
590            })?;
591
592        let physical_plan =
593            df.create_physical_plan()
594                .await
595                .map_err(|e| GraphError::ExecutionError {
596                    message: format!("Failed to create physical plan: {}", e),
597                    location: snafu::Location::new(file!(), line!(), column!()),
598                })?;
599
600        Ok((logical_plan, df_logical_plan, physical_plan))
601    }
602
603    /// Format explain output as a table
604    fn format_explain_output(
605        &self,
606        logical_plan: &crate::logical_plan::LogicalOperator,
607        df_logical_plan: &datafusion::logical_expr::LogicalPlan,
608        physical_plan: &dyn datafusion::physical_plan::ExecutionPlan,
609    ) -> Result<String> {
610        // Format output with query first, then table
611        let mut output = String::new();
612
613        // Show Cypher query before the table
614        output.push_str("Cypher Query:\n");
615        output.push_str(&format!("  {}\n\n", self.query_text));
616
617        // Build table rows (without the query)
618        let mut rows = vec![];
619
620        // Row 1: Graph Logical Plan
621        let graph_plan_str = format!("{:#?}", logical_plan);
622        rows.push(("graph_logical_plan", graph_plan_str));
623
624        // Row 2: DataFusion Logical Plan
625        let df_logical_str = format!("{}", df_logical_plan.display_indent());
626        rows.push(("logical_plan", df_logical_str));
627
628        // Row 3: DataFusion Physical Plan
629        let df_physical_str = format!(
630            "{}",
631            datafusion::physical_plan::displayable(physical_plan).indent(true)
632        );
633        rows.push(("physical_plan", df_physical_str));
634
635        // Calculate column widths
636        let plan_type_width = rows.iter().map(|(t, _)| t.len()).max().unwrap_or(10);
637        let plan_width = rows
638            .iter()
639            .map(|(_, p)| p.lines().map(|l| l.len()).max().unwrap_or(0))
640            .max()
641            .unwrap_or(50);
642
643        // Build table
644        let separator = format!(
645            "+{}+{}+",
646            "-".repeat(plan_type_width + 2),
647            "-".repeat(plan_width + 2)
648        );
649
650        output.push_str(&separator);
651        output.push('\n');
652
653        // Header
654        output.push_str(&format!(
655            "| {:<width$} | {:<plan_width$} |\n",
656            "plan_type",
657            "plan",
658            width = plan_type_width,
659            plan_width = plan_width
660        ));
661        output.push_str(&separator);
662        output.push('\n');
663
664        // Data rows
665        for (plan_type, plan_content) in rows {
666            let lines: Vec<&str> = plan_content.lines().collect();
667            if lines.is_empty() {
668                output.push_str(&format!(
669                    "| {:<width$} | {:<plan_width$} |\n",
670                    plan_type,
671                    "",
672                    width = plan_type_width,
673                    plan_width = plan_width
674                ));
675            } else {
676                // First line with plan_type
677                output.push_str(&format!(
678                    "| {:<width$} | {:<plan_width$} |\n",
679                    plan_type,
680                    lines[0],
681                    width = plan_type_width,
682                    plan_width = plan_width
683                ));
684
685                // Remaining lines with empty plan_type
686                for line in &lines[1..] {
687                    output.push_str(&format!(
688                        "| {:<width$} | {:<plan_width$} |\n",
689                        "",
690                        line,
691                        width = plan_type_width,
692                        plan_width = plan_width
693                    ));
694                }
695            }
696        }
697
698        output.push_str(&separator);
699        output.push('\n');
700
701        Ok(output)
702    }
703
704    /// Execute simple single-table queries (legacy implementation)
705    ///
706    /// This method supports basic projection/filter/limit workflows on a single table.
707    /// For full query support including joins and complex patterns, use `execute()` instead.
708    ///
709    /// Note: This implementation is retained for backward compatibility and simple use cases.
710    pub async fn execute_simple(
711        &self,
712        datasets: HashMap<String, arrow::record_batch::RecordBatch>,
713    ) -> Result<arrow::record_batch::RecordBatch> {
714        use arrow::compute::concat_batches;
715        use datafusion::datasource::MemTable;
716        use datafusion::prelude::*;
717        use std::sync::Arc;
718
719        // Require a config for now, even if we don't fully exploit it yet
720        let _config = self.require_config()?;
721
722        if datasets.is_empty() {
723            return Err(GraphError::PlanError {
724                message: "No input datasets provided".to_string(),
725                location: snafu::Location::new(file!(), line!(), column!()),
726            });
727        }
728
729        // Create DataFusion context and register all provided tables
730        let ctx = SessionContext::new();
731        for (name, batch) in &datasets {
732            let table =
733                MemTable::try_new(batch.schema(), vec![vec![batch.clone()]]).map_err(|e| {
734                    GraphError::PlanError {
735                        message: format!("Failed to create DataFusion table: {}", e),
736                        location: snafu::Location::new(file!(), line!(), column!()),
737                    }
738                })?;
739            ctx.register_table(name, Arc::new(table))
740                .map_err(|e| GraphError::PlanError {
741                    message: format!("Failed to register table '{}': {}", name, e),
742                    location: snafu::Location::new(file!(), line!(), column!()),
743                })?;
744        }
745
746        // Try to execute a path (1+ hops) if the query is a simple pattern
747        if let Some(df) = self.try_execute_path_generic(&ctx).await? {
748            let batches = df.collect().await.map_err(|e| GraphError::PlanError {
749                message: format!("Failed to collect results: {}", e),
750                location: snafu::Location::new(file!(), line!(), column!()),
751            })?;
752            if batches.is_empty() {
753                let schema = datasets.values().next().unwrap().schema();
754                return Ok(arrow_array::RecordBatch::new_empty(schema));
755            }
756            let merged = concat_batches(&batches[0].schema(), &batches).map_err(|e| {
757                GraphError::PlanError {
758                    message: format!("Failed to concatenate result batches: {}", e),
759                    location: snafu::Location::new(file!(), line!(), column!()),
760                }
761            })?;
762            return Ok(merged);
763        }
764
765        // Fallback: single-table style query on the first provided table
766        let (table_name, batch) = datasets.iter().next().unwrap();
767        let schema = batch.schema();
768
769        // Start a DataFrame from the registered table
770        let mut df = ctx
771            .table(table_name)
772            .await
773            .map_err(|e| GraphError::PlanError {
774                message: format!("Failed to create DataFrame for '{}': {}", table_name, e),
775                location: snafu::Location::new(file!(), line!(), column!()),
776            })?;
777
778        // Apply WHERE if present (limited support: simple comparisons on a single column)
779        if let Some(where_clause) = &self.ast.where_clause {
780            if let Some(filter_expr) = to_df_boolean_expr_simple(&where_clause.expression) {
781                df = df.filter(filter_expr).map_err(|e| GraphError::PlanError {
782                    message: format!("Failed to apply filter: {}", e),
783                    location: snafu::Location::new(file!(), line!(), column!()),
784                })?;
785            }
786        }
787
788        // Build projection from RETURN clause
789        let proj_exprs: Vec<Expr> = self
790            .ast
791            .return_clause
792            .items
793            .iter()
794            .map(|item| to_df_value_expr_simple(&item.expression))
795            .collect();
796        if !proj_exprs.is_empty() {
797            df = df.select(proj_exprs).map_err(|e| GraphError::PlanError {
798                message: format!("Failed to project: {}", e),
799                location: snafu::Location::new(file!(), line!(), column!()),
800            })?;
801        }
802
803        // Apply DISTINCT
804        if self.ast.return_clause.distinct {
805            df = df.distinct().map_err(|e| GraphError::PlanError {
806                message: format!("Failed to apply DISTINCT: {}", e),
807                location: snafu::Location::new(file!(), line!(), column!()),
808            })?;
809        }
810
811        // Apply ORDER BY if present
812        if let Some(order_by) = &self.ast.order_by {
813            let sort_expr = to_df_order_by_expr_simple(&order_by.items);
814            df = df.sort(sort_expr).map_err(|e| GraphError::PlanError {
815                message: format!("Failed to apply ORDER BY: {}", e),
816                location: snafu::Location::new(file!(), line!(), column!()),
817            })?;
818        }
819
820        // Apply SKIP/OFFSET and LIMIT if present
821        if self.ast.skip.is_some() || self.ast.limit.is_some() {
822            let offset = self.ast.skip.unwrap_or(0) as usize;
823            let fetch = self.ast.limit.map(|l| l as usize);
824            df = df.limit(offset, fetch).map_err(|e| GraphError::PlanError {
825                message: format!("Failed to apply SKIP/LIMIT: {}", e),
826                location: snafu::Location::new(file!(), line!(), column!()),
827            })?;
828        }
829
830        // Collect results and concat into a single RecordBatch
831        let batches = df.collect().await.map_err(|e| GraphError::PlanError {
832            message: format!("Failed to collect results: {}", e),
833            location: snafu::Location::new(file!(), line!(), column!()),
834        })?;
835
836        if batches.is_empty() {
837            // Return an empty batch with the source schema
838            return Ok(arrow_array::RecordBatch::new_empty(schema));
839        }
840
841        let merged =
842            concat_batches(&batches[0].schema(), &batches).map_err(|e| GraphError::PlanError {
843                message: format!("Failed to concatenate result batches: {}", e),
844                location: snafu::Location::new(file!(), line!(), column!()),
845            })?;
846        Ok(merged)
847    }
848
849    /// Get all node labels referenced in this query
850    pub fn referenced_node_labels(&self) -> Vec<String> {
851        let mut labels = Vec::new();
852
853        for match_clause in &self.ast.match_clauses {
854            for pattern in &match_clause.patterns {
855                self.collect_node_labels_from_pattern(pattern, &mut labels);
856            }
857        }
858
859        labels.sort();
860        labels.dedup();
861        labels
862    }
863
864    /// Get all relationship types referenced in this query
865    pub fn referenced_relationship_types(&self) -> Vec<String> {
866        let mut types = Vec::new();
867
868        for match_clause in &self.ast.match_clauses {
869            for pattern in &match_clause.patterns {
870                self.collect_relationship_types_from_pattern(pattern, &mut types);
871            }
872        }
873
874        types.sort();
875        types.dedup();
876        types
877    }
878
879    /// Get all variables used in this query
880    pub fn variables(&self) -> Vec<String> {
881        let mut variables = Vec::new();
882
883        for match_clause in &self.ast.match_clauses {
884            for pattern in &match_clause.patterns {
885                self.collect_variables_from_pattern(pattern, &mut variables);
886            }
887        }
888
889        variables.sort();
890        variables.dedup();
891        variables
892    }
893
894    // Collection helper methods
895
896    fn collect_node_labels_from_pattern(
897        &self,
898        pattern: &crate::ast::GraphPattern,
899        labels: &mut Vec<String>,
900    ) {
901        match pattern {
902            crate::ast::GraphPattern::Node(node) => {
903                labels.extend(node.labels.clone());
904            }
905            crate::ast::GraphPattern::Path(path) => {
906                labels.extend(path.start_node.labels.clone());
907                for segment in &path.segments {
908                    labels.extend(segment.end_node.labels.clone());
909                }
910            }
911        }
912    }
913
914    fn collect_relationship_types_from_pattern(
915        &self,
916        pattern: &crate::ast::GraphPattern,
917        types: &mut Vec<String>,
918    ) {
919        if let crate::ast::GraphPattern::Path(path) = pattern {
920            for segment in &path.segments {
921                types.extend(segment.relationship.types.clone());
922            }
923        }
924    }
925
926    fn collect_variables_from_pattern(
927        &self,
928        pattern: &crate::ast::GraphPattern,
929        variables: &mut Vec<String>,
930    ) {
931        match pattern {
932            crate::ast::GraphPattern::Node(node) => {
933                if let Some(var) = &node.variable {
934                    variables.push(var.clone());
935                }
936            }
937            crate::ast::GraphPattern::Path(path) => {
938                if let Some(var) = &path.start_node.variable {
939                    variables.push(var.clone());
940                }
941                for segment in &path.segments {
942                    if let Some(var) = &segment.relationship.variable {
943                        variables.push(var.clone());
944                    }
945                    if let Some(var) = &segment.end_node.variable {
946                        variables.push(var.clone());
947                    }
948                }
949            }
950        }
951    }
952}
953
954impl CypherQuery {
955    // Generic path executor (N-hop) entrypoint.
956    async fn try_execute_path_generic(
957        &self,
958        ctx: &datafusion::prelude::SessionContext,
959    ) -> Result<Option<datafusion::dataframe::DataFrame>> {
960        use crate::ast::GraphPattern;
961        let [mc] = self.ast.match_clauses.as_slice() else {
962            return Ok(None);
963        };
964        let match_clause = mc;
965        let path = match match_clause.patterns.as_slice() {
966            [GraphPattern::Path(p)] if !p.segments.is_empty() => p,
967            _ => return Ok(None),
968        };
969        let cfg = self.require_config()?;
970
971        // Handle single-segment variable-length paths by unrolling ranges (*1..N, capped)
972        if path.segments.len() == 1 {
973            if let Some(length_range) = &path.segments[0].relationship.length {
974                let cap: u32 = crate::MAX_VARIABLE_LENGTH_HOPS;
975                let min_len = length_range.min.unwrap_or(1).max(1);
976                let max_len = length_range.max.unwrap_or(cap);
977
978                if min_len > max_len {
979                    return Err(GraphError::InvalidPattern {
980                        message: format!(
981                            "Invalid variable-length range: min {:?} greater than max {:?}",
982                            length_range.min, length_range.max
983                        ),
984                        location: snafu::Location::new(file!(), line!(), column!()),
985                    });
986                }
987
988                if max_len > cap {
989                    return Err(GraphError::UnsupportedFeature {
990                        feature: format!(
991                            "Variable-length paths with length > {} are not supported (got {:?}..{:?})",
992                            cap, length_range.min, length_range.max
993                        ),
994                        location: snafu::Location::new(file!(), line!(), column!()),
995                    });
996                }
997
998                use datafusion::dataframe::DataFrame;
999                let mut union_df: Option<DataFrame> = None;
1000
1001                for hops in min_len..=max_len {
1002                    // Build a fixed-length synthetic path by repeating the single segment
1003                    let mut synthetic = crate::ast::PathPattern {
1004                        start_node: path.start_node.clone(),
1005                        segments: Vec::with_capacity(hops as usize),
1006                    };
1007
1008                    for i in 0..hops {
1009                        let mut seg = path.segments[0].clone();
1010                        // Drop variables to avoid alias collisions on repeated hops
1011                        seg.relationship.variable = None;
1012                        if (i + 1) < hops {
1013                            seg.end_node.variable = None; // intermediate hop
1014                        }
1015                        // Clear length spec for this fixed hop
1016                        seg.relationship.length = None;
1017                        synthetic.segments.push(seg);
1018                    }
1019
1020                    let exec = PathExecutor::new(ctx, cfg, &synthetic)?;
1021                    let mut df = exec.build_chain().await?;
1022                    df = exec.apply_where(df, &self.ast)?;
1023                    df = exec.apply_return(df, &self.ast)?;
1024
1025                    union_df = Some(match union_df {
1026                        Some(acc) => acc.union(df).map_err(|e| GraphError::PlanError {
1027                            message: format!("Failed to UNION variable-length paths: {}", e),
1028                            location: snafu::Location::new(file!(), line!(), column!()),
1029                        })?,
1030                        None => df,
1031                    });
1032                }
1033
1034                return Ok(union_df);
1035            }
1036        }
1037
1038        let exec = PathExecutor::new(ctx, cfg, path)?;
1039        let df = exec.build_chain().await?;
1040        let df = exec.apply_where(df, &self.ast)?;
1041        let df = exec.apply_return(df, &self.ast)?;
1042        Ok(Some(df))
1043    }
1044}
1045
1046/// Builder for constructing Cypher queries programmatically
1047#[derive(Debug, Default)]
1048pub struct CypherQueryBuilder {
1049    match_clauses: Vec<crate::ast::MatchClause>,
1050    where_expression: Option<crate::ast::BooleanExpression>,
1051    return_items: Vec<crate::ast::ReturnItem>,
1052    order_by_items: Vec<crate::ast::OrderByItem>,
1053    limit: Option<u64>,
1054    distinct: bool,
1055    skip: Option<u64>,
1056    config: Option<GraphConfig>,
1057    parameters: HashMap<String, serde_json::Value>,
1058}
1059
1060impl CypherQueryBuilder {
1061    /// Create a new query builder
1062    pub fn new() -> Self {
1063        Self::default()
1064    }
1065
1066    /// Add a MATCH clause for a node pattern
1067    pub fn match_node(mut self, variable: &str, label: &str) -> Self {
1068        let node = crate::ast::NodePattern {
1069            variable: Some(variable.to_string()),
1070            labels: vec![label.to_string()],
1071            properties: HashMap::new(),
1072        };
1073
1074        let match_clause = crate::ast::MatchClause {
1075            patterns: vec![crate::ast::GraphPattern::Node(node)],
1076        };
1077
1078        self.match_clauses.push(match_clause);
1079        self
1080    }
1081
1082    /// Set the graph configuration
1083    pub fn with_config(mut self, config: GraphConfig) -> Self {
1084        self.config = Some(config);
1085        self
1086    }
1087
1088    /// Add a RETURN item
1089    pub fn return_property(mut self, variable: &str, property: &str) -> Self {
1090        let prop_ref = crate::ast::PropertyRef::new(variable, property);
1091        let return_item = crate::ast::ReturnItem {
1092            expression: crate::ast::ValueExpression::Property(prop_ref),
1093            alias: None,
1094        };
1095
1096        self.return_items.push(return_item);
1097        self
1098    }
1099
1100    /// Set DISTINCT flag
1101    pub fn distinct(mut self, distinct: bool) -> Self {
1102        self.distinct = distinct;
1103        self
1104    }
1105
1106    /// Add a LIMIT clause
1107    pub fn limit(mut self, limit: u64) -> Self {
1108        self.limit = Some(limit);
1109        self
1110    }
1111
1112    /// Add a SKIP clause
1113    pub fn skip(mut self, skip: u64) -> Self {
1114        self.skip = Some(skip);
1115        self
1116    }
1117
1118    /// Build the final CypherQuery
1119    pub fn build(self) -> Result<CypherQuery> {
1120        if self.match_clauses.is_empty() {
1121            return Err(GraphError::PlanError {
1122                message: "Query must have at least one MATCH clause".to_string(),
1123                location: snafu::Location::new(file!(), line!(), column!()),
1124            });
1125        }
1126
1127        if self.return_items.is_empty() {
1128            return Err(GraphError::PlanError {
1129                message: "Query must have at least one RETURN item".to_string(),
1130                location: snafu::Location::new(file!(), line!(), column!()),
1131            });
1132        }
1133
1134        let ast = crate::ast::CypherQuery {
1135            match_clauses: self.match_clauses,
1136            where_clause: self
1137                .where_expression
1138                .map(|expr| crate::ast::WhereClause { expression: expr }),
1139            return_clause: crate::ast::ReturnClause {
1140                distinct: self.distinct,
1141                items: self.return_items,
1142            },
1143            order_by: if self.order_by_items.is_empty() {
1144                None
1145            } else {
1146                Some(crate::ast::OrderByClause {
1147                    items: self.order_by_items,
1148                })
1149            },
1150            limit: self.limit,
1151            skip: self.skip,
1152        };
1153
1154        // Generate query text from AST (simplified)
1155        let query_text = "MATCH ... RETURN ...".to_string(); // TODO: Implement AST->text conversion
1156
1157        let query = CypherQuery {
1158            query_text,
1159            ast,
1160            config: self.config,
1161            parameters: self.parameters,
1162        };
1163
1164        Ok(query)
1165    }
1166}
1167
1168#[cfg(test)]
1169mod tests {
1170    use super::*;
1171    use crate::config::GraphConfig;
1172
1173    #[test]
1174    fn test_parse_simple_cypher_query() {
1175        let query = CypherQuery::new("MATCH (n:Person) RETURN n.name").unwrap();
1176        assert_eq!(query.query_text(), "MATCH (n:Person) RETURN n.name");
1177        assert_eq!(query.referenced_node_labels(), vec!["Person"]);
1178        assert_eq!(query.variables(), vec!["n"]);
1179    }
1180
1181    #[test]
1182    fn test_query_with_parameters() {
1183        let mut params = HashMap::new();
1184        params.insert("minAge".to_string(), serde_json::Value::Number(30.into()));
1185
1186        let query = CypherQuery::new("MATCH (n:Person) WHERE n.age > $minAge RETURN n.name")
1187            .unwrap()
1188            .with_parameters(params);
1189
1190        assert!(query.parameters().contains_key("minAge"));
1191    }
1192
1193    #[test]
1194    fn test_query_builder() {
1195        let config = GraphConfig::builder()
1196            .with_node_label("Person", "person_id")
1197            .build()
1198            .unwrap();
1199
1200        let query = CypherQueryBuilder::new()
1201            .with_config(config)
1202            .match_node("n", "Person")
1203            .return_property("n", "name")
1204            .limit(10)
1205            .build()
1206            .unwrap();
1207
1208        assert_eq!(query.referenced_node_labels(), vec!["Person"]);
1209        assert_eq!(query.variables(), vec!["n"]);
1210    }
1211
1212    #[test]
1213    fn test_relationship_query_parsing() {
1214        let query =
1215            CypherQuery::new("MATCH (a:Person)-[r:KNOWS]->(b:Person) RETURN a.name, b.name")
1216                .unwrap();
1217        assert_eq!(query.referenced_node_labels(), vec!["Person"]);
1218        assert_eq!(query.referenced_relationship_types(), vec!["KNOWS"]);
1219        assert_eq!(query.variables(), vec!["a", "b", "r"]);
1220    }
1221
1222    #[tokio::test]
1223    async fn test_execute_basic_projection_and_filter() {
1224        use arrow_array::{Int64Array, RecordBatch, StringArray};
1225        use arrow_schema::{DataType, Field, Schema};
1226        use std::sync::Arc;
1227
1228        // Build a simple batch: name (Utf8), age (Int64)
1229        let schema = Arc::new(Schema::new(vec![
1230            Field::new("name", DataType::Utf8, true),
1231            Field::new("age", DataType::Int64, true),
1232        ]));
1233        let batch = RecordBatch::try_new(
1234            schema,
1235            vec![
1236                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "David"])),
1237                Arc::new(Int64Array::from(vec![28, 34, 29, 42])),
1238            ],
1239        )
1240        .unwrap();
1241
1242        let cfg = GraphConfig::builder()
1243            .with_node_label("Person", "id")
1244            .build()
1245            .unwrap();
1246
1247        let q = CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name, p.age")
1248            .unwrap()
1249            .with_config(cfg);
1250
1251        let mut data = HashMap::new();
1252        data.insert("people".to_string(), batch);
1253
1254        let out = q.execute_simple(data).await.unwrap();
1255        assert_eq!(out.num_rows(), 2);
1256        let names = out
1257            .column(0)
1258            .as_any()
1259            .downcast_ref::<StringArray>()
1260            .unwrap();
1261        let ages = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
1262        // Expect Bob (34) and David (42)
1263        let result: Vec<(String, i64)> = (0..out.num_rows())
1264            .map(|i| (names.value(i).to_string(), ages.value(i)))
1265            .collect();
1266        assert!(result.contains(&("Bob".to_string(), 34)));
1267        assert!(result.contains(&("David".to_string(), 42)));
1268    }
1269
1270    #[tokio::test]
1271    async fn test_execute_single_hop_path_join_projection() {
1272        use arrow_array::{Int64Array, RecordBatch, StringArray};
1273        use arrow_schema::{DataType, Field, Schema};
1274        use std::sync::Arc;
1275
1276        // People table: id, name, age
1277        let person_schema = Arc::new(Schema::new(vec![
1278            Field::new("id", DataType::Int64, false),
1279            Field::new("name", DataType::Utf8, true),
1280            Field::new("age", DataType::Int64, true),
1281        ]));
1282        let people = RecordBatch::try_new(
1283            person_schema,
1284            vec![
1285                Arc::new(Int64Array::from(vec![1, 2, 3])),
1286                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"])),
1287                Arc::new(Int64Array::from(vec![28, 34, 29])),
1288            ],
1289        )
1290        .unwrap();
1291
1292        // KNOWS relationship: src_person_id -> dst_person_id
1293        let rel_schema = Arc::new(Schema::new(vec![
1294            Field::new("src_person_id", DataType::Int64, false),
1295            Field::new("dst_person_id", DataType::Int64, false),
1296        ]));
1297        let knows = RecordBatch::try_new(
1298            rel_schema,
1299            vec![
1300                Arc::new(Int64Array::from(vec![1, 2])), // Alice -> Bob, Bob -> Carol
1301                Arc::new(Int64Array::from(vec![2, 3])),
1302            ],
1303        )
1304        .unwrap();
1305
1306        // Config: Person(id) and KNOWS(src_person_id -> dst_person_id)
1307        let cfg = GraphConfig::builder()
1308            .with_node_label("Person", "id")
1309            .with_relationship("KNOWS", "src_person_id", "dst_person_id")
1310            .build()
1311            .unwrap();
1312
1313        // Query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN b.name
1314        let q = CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN b.name")
1315            .unwrap()
1316            .with_config(cfg);
1317
1318        let mut data = HashMap::new();
1319        // Register tables using labels / rel types as names
1320        data.insert("Person".to_string(), people);
1321        data.insert("KNOWS".to_string(), knows);
1322
1323        let out = q.execute_simple(data).await.unwrap();
1324        // Expect two rows: Bob, Carol (the targets)
1325        let names = out
1326            .column(0)
1327            .as_any()
1328            .downcast_ref::<StringArray>()
1329            .unwrap();
1330        let got: Vec<String> = (0..out.num_rows())
1331            .map(|i| names.value(i).to_string())
1332            .collect();
1333        assert_eq!(got.len(), 2);
1334        assert!(got.contains(&"Bob".to_string()));
1335        assert!(got.contains(&"Carol".to_string()));
1336    }
1337
1338    #[tokio::test]
1339    async fn test_execute_order_by_asc() {
1340        use arrow_array::{Int64Array, RecordBatch, StringArray};
1341        use arrow_schema::{DataType, Field, Schema};
1342        use std::sync::Arc;
1343
1344        // name, age (int)
1345        let schema = Arc::new(Schema::new(vec![
1346            Field::new("name", DataType::Utf8, true),
1347            Field::new("age", DataType::Int64, true),
1348        ]));
1349        let batch = RecordBatch::try_new(
1350            schema,
1351            vec![
1352                Arc::new(StringArray::from(vec!["Bob", "Alice", "David", "Carol"])),
1353                Arc::new(Int64Array::from(vec![34, 28, 42, 29])),
1354            ],
1355        )
1356        .unwrap();
1357
1358        let cfg = GraphConfig::builder()
1359            .with_node_label("Person", "id")
1360            .build()
1361            .unwrap();
1362
1363        // Order ascending by age
1364        let q = CypherQuery::new("MATCH (p:Person) RETURN p.name, p.age ORDER BY p.age ASC")
1365            .unwrap()
1366            .with_config(cfg);
1367
1368        let mut data = HashMap::new();
1369        data.insert("people".to_string(), batch);
1370
1371        let out = q.execute_simple(data).await.unwrap();
1372        let ages = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
1373        let collected: Vec<i64> = (0..out.num_rows()).map(|i| ages.value(i)).collect();
1374        assert_eq!(collected, vec![28, 29, 34, 42]);
1375    }
1376
1377    #[tokio::test]
1378    async fn test_execute_order_by_desc_with_skip_limit() {
1379        use arrow_array::{Int64Array, RecordBatch, StringArray};
1380        use arrow_schema::{DataType, Field, Schema};
1381        use std::sync::Arc;
1382
1383        let schema = Arc::new(Schema::new(vec![
1384            Field::new("name", DataType::Utf8, true),
1385            Field::new("age", DataType::Int64, true),
1386        ]));
1387        let batch = RecordBatch::try_new(
1388            schema,
1389            vec![
1390                Arc::new(StringArray::from(vec!["Bob", "Alice", "David", "Carol"])),
1391                Arc::new(Int64Array::from(vec![34, 28, 42, 29])),
1392            ],
1393        )
1394        .unwrap();
1395
1396        let cfg = GraphConfig::builder()
1397            .with_node_label("Person", "id")
1398            .build()
1399            .unwrap();
1400
1401        // Desc by age, skip 1 (drop 42), take 2 -> [34, 29]
1402        let q =
1403            CypherQuery::new("MATCH (p:Person) RETURN p.age ORDER BY p.age DESC SKIP 1 LIMIT 2")
1404                .unwrap()
1405                .with_config(cfg);
1406
1407        let mut data = HashMap::new();
1408        data.insert("people".to_string(), batch);
1409
1410        let out = q.execute_simple(data).await.unwrap();
1411        assert_eq!(out.num_rows(), 2);
1412        let ages = out.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
1413        let collected: Vec<i64> = (0..out.num_rows()).map(|i| ages.value(i)).collect();
1414        assert_eq!(collected, vec![34, 29]);
1415    }
1416
1417    #[tokio::test]
1418    async fn test_execute_skip_without_limit() {
1419        use arrow_array::{Int64Array, RecordBatch};
1420        use arrow_schema::{DataType, Field, Schema};
1421        use std::sync::Arc;
1422
1423        let schema = Arc::new(Schema::new(vec![Field::new("age", DataType::Int64, true)]));
1424        let batch = RecordBatch::try_new(
1425            schema,
1426            vec![Arc::new(Int64Array::from(vec![10, 20, 30, 40]))],
1427        )
1428        .unwrap();
1429
1430        let cfg = GraphConfig::builder()
1431            .with_node_label("Person", "id")
1432            .build()
1433            .unwrap();
1434
1435        let q = CypherQuery::new("MATCH (p:Person) RETURN p.age ORDER BY p.age ASC SKIP 2")
1436            .unwrap()
1437            .with_config(cfg);
1438
1439        let mut data = HashMap::new();
1440        data.insert("people".to_string(), batch);
1441
1442        let out = q.execute_simple(data).await.unwrap();
1443        assert_eq!(out.num_rows(), 2);
1444        let ages = out.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
1445        let collected: Vec<i64> = (0..out.num_rows()).map(|i| ages.value(i)).collect();
1446        assert_eq!(collected, vec![30, 40]);
1447    }
1448
1449    #[tokio::test]
1450    async fn test_execute_datafusion_pipeline() {
1451        use arrow_array::{Int64Array, RecordBatch, StringArray};
1452        use arrow_schema::{DataType, Field, Schema};
1453        use std::sync::Arc;
1454
1455        // Create test data
1456        let schema = Arc::new(Schema::new(vec![
1457            Field::new("id", DataType::Int64, false),
1458            Field::new("name", DataType::Utf8, false),
1459            Field::new("age", DataType::Int64, false),
1460        ]));
1461
1462        let batch = RecordBatch::try_new(
1463            schema,
1464            vec![
1465                Arc::new(Int64Array::from(vec![1, 2, 3])),
1466                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
1467                Arc::new(Int64Array::from(vec![25, 35, 30])),
1468            ],
1469        )
1470        .unwrap();
1471
1472        let cfg = GraphConfig::builder()
1473            .with_node_label("Person", "id")
1474            .build()
1475            .unwrap();
1476
1477        // Test simple node query with DataFusion pipeline
1478        let query = CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name")
1479            .unwrap()
1480            .with_config(cfg);
1481
1482        let mut datasets = HashMap::new();
1483        datasets.insert("Person".to_string(), batch);
1484
1485        // Execute using the new DataFusion pipeline
1486        let result = query.execute_datafusion(datasets.clone()).await;
1487
1488        match &result {
1489            Ok(batch) => {
1490                println!(
1491                    "DataFusion result: {} rows, {} columns",
1492                    batch.num_rows(),
1493                    batch.num_columns()
1494                );
1495                if batch.num_rows() > 0 {
1496                    println!("First row data: {:?}", batch.slice(0, 1));
1497                }
1498            }
1499            Err(e) => {
1500                println!("DataFusion execution failed: {:?}", e);
1501            }
1502        }
1503
1504        // For comparison, try legacy execution
1505        let legacy_result = query.execute_simple(datasets).await.unwrap();
1506        println!(
1507            "Legacy result: {} rows, {} columns",
1508            legacy_result.num_rows(),
1509            legacy_result.num_columns()
1510        );
1511
1512        let result = result.unwrap();
1513
1514        // Verify correct filtering: should return 1 row (Bob with age > 30)
1515        assert_eq!(
1516            result.num_rows(),
1517            1,
1518            "Expected 1 row after filtering WHERE p.age > 30"
1519        );
1520
1521        // Verify correct projection: should return 1 column (name)
1522        assert_eq!(
1523            result.num_columns(),
1524            1,
1525            "Expected 1 column after projection RETURN p.name"
1526        );
1527
1528        // Verify correct data: should contain "Bob"
1529        let names = result
1530            .column(0)
1531            .as_any()
1532            .downcast_ref::<StringArray>()
1533            .unwrap();
1534        assert_eq!(
1535            names.value(0),
1536            "Bob",
1537            "Expected filtered result to contain Bob"
1538        );
1539    }
1540
1541    #[tokio::test]
1542    async fn test_execute_datafusion_simple_scan() {
1543        use arrow_array::{Int64Array, RecordBatch, StringArray};
1544        use arrow_schema::{DataType, Field, Schema};
1545        use std::sync::Arc;
1546
1547        // Create test data
1548        let schema = Arc::new(Schema::new(vec![
1549            Field::new("id", DataType::Int64, false),
1550            Field::new("name", DataType::Utf8, false),
1551        ]));
1552
1553        let batch = RecordBatch::try_new(
1554            schema,
1555            vec![
1556                Arc::new(Int64Array::from(vec![1, 2])),
1557                Arc::new(StringArray::from(vec!["Alice", "Bob"])),
1558            ],
1559        )
1560        .unwrap();
1561
1562        let cfg = GraphConfig::builder()
1563            .with_node_label("Person", "id")
1564            .build()
1565            .unwrap();
1566
1567        // Test simple scan without filters
1568        let query = CypherQuery::new("MATCH (p:Person) RETURN p.name")
1569            .unwrap()
1570            .with_config(cfg);
1571
1572        let mut datasets = HashMap::new();
1573        datasets.insert("Person".to_string(), batch);
1574
1575        // Execute using DataFusion pipeline
1576        let result = query.execute_datafusion(datasets).await.unwrap();
1577
1578        // Should return all rows
1579        assert_eq!(
1580            result.num_rows(),
1581            2,
1582            "Should return all 2 rows without filtering"
1583        );
1584        assert_eq!(result.num_columns(), 1, "Should return 1 column (name)");
1585
1586        // Verify data
1587        let names = result
1588            .column(0)
1589            .as_any()
1590            .downcast_ref::<StringArray>()
1591            .unwrap();
1592        let name_set: std::collections::HashSet<String> = (0..result.num_rows())
1593            .map(|i| names.value(i).to_string())
1594            .collect();
1595        let expected: std::collections::HashSet<String> =
1596            ["Alice", "Bob"].iter().map(|s| s.to_string()).collect();
1597        assert_eq!(name_set, expected, "Should return Alice and Bob");
1598    }
1599
1600    #[tokio::test]
1601    async fn test_execute_with_context_simple_scan() {
1602        use arrow_array::{Int64Array, RecordBatch, StringArray};
1603        use arrow_schema::{DataType, Field, Schema};
1604        use datafusion::datasource::MemTable;
1605        use datafusion::execution::context::SessionContext;
1606        use std::sync::Arc;
1607
1608        // Create test data
1609        let schema = Arc::new(Schema::new(vec![
1610            Field::new("id", DataType::Int64, false),
1611            Field::new("name", DataType::Utf8, false),
1612            Field::new("age", DataType::Int64, false),
1613        ]));
1614        let batch = RecordBatch::try_new(
1615            schema.clone(),
1616            vec![
1617                Arc::new(Int64Array::from(vec![1, 2, 3])),
1618                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"])),
1619                Arc::new(Int64Array::from(vec![28, 34, 29])),
1620            ],
1621        )
1622        .unwrap();
1623
1624        // Create SessionContext and register data source
1625        let mem_table =
1626            Arc::new(MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap());
1627        let ctx = SessionContext::new();
1628        ctx.register_table("Person", mem_table).unwrap();
1629
1630        // Create query
1631        let cfg = GraphConfig::builder()
1632            .with_node_label("Person", "id")
1633            .build()
1634            .unwrap();
1635
1636        let query = CypherQuery::new("MATCH (p:Person) RETURN p.name")
1637            .unwrap()
1638            .with_config(cfg);
1639
1640        // Execute with context (catalog built automatically)
1641        let result = query.execute_with_context(ctx).await.unwrap();
1642
1643        // Verify results
1644        assert_eq!(result.num_rows(), 3);
1645        assert_eq!(result.num_columns(), 1);
1646
1647        let names = result
1648            .column(0)
1649            .as_any()
1650            .downcast_ref::<StringArray>()
1651            .unwrap();
1652        assert_eq!(names.value(0), "Alice");
1653        assert_eq!(names.value(1), "Bob");
1654        assert_eq!(names.value(2), "Carol");
1655    }
1656
1657    #[tokio::test]
1658    async fn test_execute_with_context_with_filter() {
1659        use arrow_array::{Int64Array, RecordBatch, StringArray};
1660        use arrow_schema::{DataType, Field, Schema};
1661        use datafusion::datasource::MemTable;
1662        use datafusion::execution::context::SessionContext;
1663        use std::sync::Arc;
1664
1665        // Create test data
1666        let schema = Arc::new(Schema::new(vec![
1667            Field::new("id", DataType::Int64, false),
1668            Field::new("name", DataType::Utf8, false),
1669            Field::new("age", DataType::Int64, false),
1670        ]));
1671        let batch = RecordBatch::try_new(
1672            schema.clone(),
1673            vec![
1674                Arc::new(Int64Array::from(vec![1, 2, 3, 4])),
1675                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "David"])),
1676                Arc::new(Int64Array::from(vec![28, 34, 29, 42])),
1677            ],
1678        )
1679        .unwrap();
1680
1681        // Create SessionContext
1682        let mem_table =
1683            Arc::new(MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap());
1684        let ctx = SessionContext::new();
1685        ctx.register_table("Person", mem_table).unwrap();
1686
1687        // Create query with filter
1688        let cfg = GraphConfig::builder()
1689            .with_node_label("Person", "id")
1690            .build()
1691            .unwrap();
1692
1693        let query = CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name, p.age")
1694            .unwrap()
1695            .with_config(cfg);
1696
1697        // Execute with context
1698        let result = query.execute_with_context(ctx).await.unwrap();
1699
1700        // Verify: should return Bob (34) and David (42)
1701        assert_eq!(result.num_rows(), 2);
1702        assert_eq!(result.num_columns(), 2);
1703
1704        let names = result
1705            .column(0)
1706            .as_any()
1707            .downcast_ref::<StringArray>()
1708            .unwrap();
1709        let ages = result
1710            .column(1)
1711            .as_any()
1712            .downcast_ref::<Int64Array>()
1713            .unwrap();
1714
1715        let results: Vec<(String, i64)> = (0..result.num_rows())
1716            .map(|i| (names.value(i).to_string(), ages.value(i)))
1717            .collect();
1718
1719        assert!(results.contains(&("Bob".to_string(), 34)));
1720        assert!(results.contains(&("David".to_string(), 42)));
1721    }
1722
1723    #[tokio::test]
1724    async fn test_execute_with_context_relationship_traversal() {
1725        use arrow_array::{Int64Array, RecordBatch, StringArray};
1726        use arrow_schema::{DataType, Field, Schema};
1727        use datafusion::datasource::MemTable;
1728        use datafusion::execution::context::SessionContext;
1729        use std::sync::Arc;
1730
1731        // Create Person nodes
1732        let person_schema = Arc::new(Schema::new(vec![
1733            Field::new("id", DataType::Int64, false),
1734            Field::new("name", DataType::Utf8, false),
1735        ]));
1736        let person_batch = RecordBatch::try_new(
1737            person_schema.clone(),
1738            vec![
1739                Arc::new(Int64Array::from(vec![1, 2, 3])),
1740                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"])),
1741            ],
1742        )
1743        .unwrap();
1744
1745        // Create KNOWS relationships
1746        let knows_schema = Arc::new(Schema::new(vec![
1747            Field::new("src_id", DataType::Int64, false),
1748            Field::new("dst_id", DataType::Int64, false),
1749            Field::new("since", DataType::Int64, false),
1750        ]));
1751        let knows_batch = RecordBatch::try_new(
1752            knows_schema.clone(),
1753            vec![
1754                Arc::new(Int64Array::from(vec![1, 2])),
1755                Arc::new(Int64Array::from(vec![2, 3])),
1756                Arc::new(Int64Array::from(vec![2020, 2021])),
1757            ],
1758        )
1759        .unwrap();
1760
1761        // Create SessionContext and register tables
1762        let person_table = Arc::new(
1763            MemTable::try_new(person_schema.clone(), vec![vec![person_batch.clone()]]).unwrap(),
1764        );
1765        let knows_table = Arc::new(
1766            MemTable::try_new(knows_schema.clone(), vec![vec![knows_batch.clone()]]).unwrap(),
1767        );
1768
1769        let ctx = SessionContext::new();
1770        ctx.register_table("Person", person_table).unwrap();
1771        ctx.register_table("KNOWS", knows_table).unwrap();
1772
1773        // Create query
1774        let cfg = GraphConfig::builder()
1775            .with_node_label("Person", "id")
1776            .with_relationship("KNOWS", "src_id", "dst_id")
1777            .build()
1778            .unwrap();
1779
1780        let query = CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name, b.name")
1781            .unwrap()
1782            .with_config(cfg);
1783
1784        // Execute with context
1785        let result = query.execute_with_context(ctx).await.unwrap();
1786
1787        // Verify: should return 2 relationships (Alice->Bob, Bob->Carol)
1788        assert_eq!(result.num_rows(), 2);
1789        assert_eq!(result.num_columns(), 2);
1790
1791        let src_names = result
1792            .column(0)
1793            .as_any()
1794            .downcast_ref::<StringArray>()
1795            .unwrap();
1796        let dst_names = result
1797            .column(1)
1798            .as_any()
1799            .downcast_ref::<StringArray>()
1800            .unwrap();
1801
1802        let relationships: Vec<(String, String)> = (0..result.num_rows())
1803            .map(|i| {
1804                (
1805                    src_names.value(i).to_string(),
1806                    dst_names.value(i).to_string(),
1807                )
1808            })
1809            .collect();
1810
1811        assert!(relationships.contains(&("Alice".to_string(), "Bob".to_string())));
1812        assert!(relationships.contains(&("Bob".to_string(), "Carol".to_string())));
1813    }
1814
1815    #[tokio::test]
1816    async fn test_execute_with_context_order_by_limit() {
1817        use arrow_array::{Int64Array, RecordBatch, StringArray};
1818        use arrow_schema::{DataType, Field, Schema};
1819        use datafusion::datasource::MemTable;
1820        use datafusion::execution::context::SessionContext;
1821        use std::sync::Arc;
1822
1823        // Create test data
1824        let schema = Arc::new(Schema::new(vec![
1825            Field::new("id", DataType::Int64, false),
1826            Field::new("name", DataType::Utf8, false),
1827            Field::new("score", DataType::Int64, false),
1828        ]));
1829        let batch = RecordBatch::try_new(
1830            schema.clone(),
1831            vec![
1832                Arc::new(Int64Array::from(vec![1, 2, 3, 4])),
1833                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "David"])),
1834                Arc::new(Int64Array::from(vec![85, 92, 78, 95])),
1835            ],
1836        )
1837        .unwrap();
1838
1839        // Create SessionContext
1840        let mem_table =
1841            Arc::new(MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap());
1842        let ctx = SessionContext::new();
1843        ctx.register_table("Student", mem_table).unwrap();
1844
1845        // Create query with ORDER BY and LIMIT
1846        let cfg = GraphConfig::builder()
1847            .with_node_label("Student", "id")
1848            .build()
1849            .unwrap();
1850
1851        let query = CypherQuery::new(
1852            "MATCH (s:Student) RETURN s.name, s.score ORDER BY s.score DESC LIMIT 2",
1853        )
1854        .unwrap()
1855        .with_config(cfg);
1856
1857        // Execute with context
1858        let result = query.execute_with_context(ctx).await.unwrap();
1859
1860        // Verify: should return top 2 scores (David: 95, Bob: 92)
1861        assert_eq!(result.num_rows(), 2);
1862        assert_eq!(result.num_columns(), 2);
1863
1864        let names = result
1865            .column(0)
1866            .as_any()
1867            .downcast_ref::<StringArray>()
1868            .unwrap();
1869        let scores = result
1870            .column(1)
1871            .as_any()
1872            .downcast_ref::<Int64Array>()
1873            .unwrap();
1874
1875        // First row should be David (95)
1876        assert_eq!(names.value(0), "David");
1877        assert_eq!(scores.value(0), 95);
1878
1879        // Second row should be Bob (92)
1880        assert_eq!(names.value(1), "Bob");
1881        assert_eq!(scores.value(1), 92);
1882    }
1883
1884    #[tokio::test]
1885    async fn test_to_sql() {
1886        use arrow_array::RecordBatch;
1887        use arrow_schema::{DataType, Field, Schema};
1888        use std::collections::HashMap;
1889        use std::sync::Arc;
1890
1891        let schema = Arc::new(Schema::new(vec![
1892            Field::new("id", DataType::Int64, false),
1893            Field::new("name", DataType::Utf8, false),
1894        ]));
1895        let batch = RecordBatch::new_empty(schema.clone());
1896
1897        let mut datasets = HashMap::new();
1898        datasets.insert("Person".to_string(), batch);
1899
1900        let cfg = GraphConfig::builder()
1901            .with_node_label("Person", "id")
1902            .build()
1903            .unwrap();
1904
1905        let query = CypherQuery::new("MATCH (p:Person) RETURN p.name")
1906            .unwrap()
1907            .with_config(cfg);
1908
1909        let sql = query.to_sql(datasets).await.unwrap();
1910        println!("Generated SQL: {}", sql);
1911
1912        assert!(sql.contains("SELECT"));
1913        assert!(sql.to_lowercase().contains("from person"));
1914        // Note: DataFusion unparser might quote identifiers or use aliases
1915        // We check for "p.name" which is the expected output alias
1916        assert!(sql.contains("p.name"));
1917    }
1918}