// Source file: uni_query/query/df_graph/expr_compiler.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
5use crate::query::df_graph::GraphExecutionContext;
6use crate::query::df_graph::common::{execute_subplan, extract_row_params};
7use crate::query::df_graph::comprehension::ListComprehensionExecExpr;
8use crate::query::df_graph::pattern_comprehension::{
9    PatternComprehensionExecExpr, analyze_pattern, build_inner_schema, collect_inner_properties,
10};
11use crate::query::df_graph::quantifier::{QuantifierExecExpr, QuantifierType};
12use crate::query::df_graph::reduce::ReduceExecExpr;
13use crate::query::planner::QueryPlanner;
14use anyhow::{Result, anyhow};
15use arrow_array::builder::BooleanBuilder;
16use arrow_schema::{DataType, Field, Schema};
17use datafusion::execution::context::SessionState;
18use datafusion::physical_expr::expressions::binary;
19use datafusion::physical_plan::PhysicalExpr;
20use datafusion::physical_planner::PhysicalPlanner;
21use datafusion::prelude::SessionContext;
22use parking_lot::RwLock;
23use std::collections::{HashMap, HashSet};
24use std::sync::Arc;
25use uni_common::Value;
26use uni_common::core::schema::Schema as UniSchema;
27use uni_cypher::ast::{
28    BinaryOp, Clause, CypherLiteral, Expr, MatchClause, Query, ReturnClause, ReturnItem, SortItem,
29    Statement, UnaryOp, UnwindClause, WithClause,
30};
31use uni_store::storage::manager::StorageManager;
32
33/// Check if a data type represents CypherValue (LargeBinary).
34fn is_cypher_value_type(dt: Option<&DataType>) -> bool {
35    dt.is_some_and(|t| *t == DataType::LargeBinary)
36}
37
38/// Resolve the element type for a list expression.
39///
40/// Extracts the element type from List/LargeList/Null/LargeBinary data types.
41/// Falls back to the provided fallback type for LargeBinary, and returns an
42/// error with the provided context for unsupported types.
43///
44/// # Errors
45///
46/// Returns an error if the data type is not a recognized list type.
47fn resolve_list_element_type(
48    list_data_type: &DataType,
49    large_binary_fallback: DataType,
50    context: &str,
51) -> Result<DataType> {
52    match list_data_type {
53        DataType::List(field) | DataType::LargeList(field) => Ok(field.data_type().clone()),
54        DataType::Null => Ok(DataType::Null),
55        DataType::LargeBinary => Ok(large_binary_fallback),
56        _ => Err(anyhow!(
57            "{} input must be a list, got {:?}",
58            context,
59            list_data_type
60        )),
61    }
62}
63
/// Physical expression wrapper that converts LargeList<T> to LargeBinary (CypherValue).
///
/// Used in CASE expressions to unify branch types when mixing typed lists
/// (e.g., from list comprehensions) with CypherValue-encoded lists.
#[derive(Debug)]
struct LargeListToCypherValueExpr {
    /// The wrapped expression whose List/LargeList output gets re-encoded.
    child: Arc<dyn PhysicalExpr>,
}
72
73impl LargeListToCypherValueExpr {
74    fn new(child: Arc<dyn PhysicalExpr>) -> Self {
75        Self { child }
76    }
77}
78
impl std::fmt::Display for LargeListToCypherValueExpr {
    // Mirrors the representation produced by `fmt_sql` below.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LargeListToCypherValue({})", self.child)
    }
}
84
impl PartialEq for LargeListToCypherValueExpr {
    // Pointer identity of the child Arc: two wrappers are equal only when
    // they share the exact same child expression instance. This is a
    // conservative (may report false for semantically equal children) but
    // sound equality.
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.child, &other.child)
    }
}
90
// Eq is sound: Arc pointer identity (used by PartialEq) is reflexive.
impl Eq for LargeListToCypherValueExpr {}
92
93impl std::hash::Hash for LargeListToCypherValueExpr {
94    fn hash<H: std::hash::Hasher>(&self, _state: &mut H) {
95        // Hash based on type since we can't hash PhysicalExpr
96        std::any::type_name::<Self>().hash(_state);
97    }
98}
99
100impl PartialEq<dyn std::any::Any> for LargeListToCypherValueExpr {
101    fn eq(&self, other: &dyn std::any::Any) -> bool {
102        other
103            .downcast_ref::<Self>()
104            .map(|x| self == x)
105            .unwrap_or(false)
106    }
107}
108
impl PhysicalExpr for LargeListToCypherValueExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    // Output type is always LargeBinary (the CypherValue encoding),
    // regardless of the child's list element type.
    fn data_type(&self, _input_schema: &Schema) -> datafusion::error::Result<DataType> {
        Ok(DataType::LargeBinary)
    }

    // Nullability is inherited from the wrapped child expression.
    fn nullable(&self, input_schema: &Schema) -> datafusion::error::Result<bool> {
        self.child.nullable(input_schema)
    }

    /// Evaluate the child, then re-encode its List/LargeList result as a
    /// CypherValue (LargeBinary) array. A child that already produces
    /// LargeBinary passes through unchanged.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::logical_expr::ColumnarValue> {
        use datafusion::arrow::compute::cast;
        use datafusion::logical_expr::ColumnarValue;

        let child_result = self.child.evaluate(batch)?;
        // Materialize scalar results into a full-length array.
        let child_array = child_result.into_array(batch.num_rows())?;

        // Normalize List → LargeList (pattern from quantifier.rs:182-189)
        let list_array = if let DataType::List(field) = child_array.data_type() {
            let target_type = DataType::LargeList(field.clone());
            cast(&child_array, &target_type).map_err(|e| {
                datafusion::error::DataFusionError::Execution(format!(
                    "List to LargeList cast failed: {e}"
                ))
            })?
        } else {
            // ArrayRef clone is an Arc refcount bump, not a data copy.
            child_array.clone()
        };

        // If already LargeBinary, pass through
        if list_array.data_type() == &DataType::LargeBinary {
            return Ok(ColumnarValue::Array(list_array));
        }

        // Convert LargeList to CypherValue
        if let Some(large_list) = list_array
            .as_any()
            .downcast_ref::<datafusion::arrow::array::LargeListArray>()
        {
            let cv_array =
                crate::query::df_graph::common::typed_large_list_to_cv_array(large_list)?;
            Ok(ColumnarValue::Array(cv_array))
        } else {
            // Anything other than List/LargeList/LargeBinary is a planner bug.
            Err(datafusion::error::DataFusionError::Execution(format!(
                "Expected List or LargeList, got {:?}",
                list_array.data_type()
            )))
        }
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.child]
    }

    // Rebuild the wrapper with a replacement child; requires exactly one.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Execution(
                "LargeListToCypherValueExpr expects exactly 1 child".to_string(),
            ));
        }
        Ok(Arc::new(LargeListToCypherValueExpr::new(
            children[0].clone(),
        )))
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LargeListToCypherValue({})", self.child)
    }
}
187
/// Compiler for converting Cypher expressions directly to DataFusion Physical Expressions.
pub struct CypherPhysicalExprCompiler<'a> {
    /// DataFusion session state used when planning standard expressions.
    state: &'a SessionState,
    /// Optional variable/parameter context from logical translation;
    /// drives variable-kind lookups and scoped compilation.
    translation_ctx: Option<&'a TranslationContext>,
    /// Graph context; required for pattern comprehension and EXISTS support.
    graph_ctx: Option<Arc<GraphExecutionContext>>,
    /// Graph schema; attached alongside `graph_ctx`.
    uni_schema: Option<Arc<UniSchema>>,
    /// Session context for EXISTS subquery execution.
    session_ctx: Option<Arc<RwLock<SessionContext>>>,
    /// Storage manager for EXISTS subquery execution.
    storage: Option<Arc<StorageManager>>,
    /// Query parameters for EXISTS subquery execution.
    params: HashMap<String, Value>,
}
201
202impl<'a> CypherPhysicalExprCompiler<'a> {
    /// Create a compiler with only a session state and an optional
    /// translation context. Graph and subquery support (pattern
    /// comprehensions, EXISTS) must be attached afterwards via
    /// `with_graph_ctx` / `with_subquery_ctx`.
    pub fn new(state: &'a SessionState, translation_ctx: Option<&'a TranslationContext>) -> Self {
        Self {
            state,
            translation_ctx,
            graph_ctx: None,
            uni_schema: None,
            session_ctx: None,
            storage: None,
            params: HashMap::new(),
        }
    }
214
215    /// Build a scoped compiler that excludes the given variables from the translation context.
216    ///
217    /// When compiling inner expressions for list comprehensions, reduce, or quantifiers,
218    /// loop variables must be removed from `variable_kinds` so that property access on
219    /// them does not incorrectly generate flat columns that don't exist in the inner schema.
220    ///
221    /// If none of the `exclude_vars` are present in the current context, the returned
222    /// compiler simply reuses `self`'s translation context unchanged.
223    ///
224    /// The caller must own `scoped_ctx_slot` and keep it alive for the returned compiler's
225    /// lifetime.
226    fn scoped_compiler<'b>(
227        &'b self,
228        exclude_vars: &[&str],
229        scoped_ctx_slot: &'b mut Option<TranslationContext>,
230    ) -> CypherPhysicalExprCompiler<'b>
231    where
232        'a: 'b,
233    {
234        let needs_scoping = self.translation_ctx.is_some_and(|ctx| {
235            exclude_vars
236                .iter()
237                .any(|v| ctx.variable_kinds.contains_key(*v))
238        });
239
240        let ctx_ref = if needs_scoping {
241            let ctx = self.translation_ctx.unwrap();
242            let mut new_kinds = ctx.variable_kinds.clone();
243            for v in exclude_vars {
244                new_kinds.remove(*v);
245            }
246            *scoped_ctx_slot = Some(TranslationContext {
247                parameters: ctx.parameters.clone(),
248                outer_values: ctx.outer_values.clone(),
249                variable_labels: ctx.variable_labels.clone(),
250                variable_kinds: new_kinds,
251                node_variable_hints: ctx.node_variable_hints.clone(),
252                mutation_edge_hints: ctx.mutation_edge_hints.clone(),
253                statement_time: ctx.statement_time,
254            });
255            scoped_ctx_slot.as_ref()
256        } else {
257            self.translation_ctx
258        };
259
260        CypherPhysicalExprCompiler {
261            state: self.state,
262            translation_ctx: ctx_ref,
263            graph_ctx: self.graph_ctx.clone(),
264            uni_schema: self.uni_schema.clone(),
265            session_ctx: self.session_ctx.clone(),
266            storage: self.storage.clone(),
267            params: self.params.clone(),
268        }
269    }
270
271    /// Attach graph context and schema for pattern comprehension support.
272    pub fn with_graph_ctx(
273        mut self,
274        graph_ctx: Arc<GraphExecutionContext>,
275        uni_schema: Arc<UniSchema>,
276    ) -> Self {
277        self.graph_ctx = Some(graph_ctx);
278        self.uni_schema = Some(uni_schema);
279        self
280    }
281
282    /// Attach full subquery context for EXISTS support.
283    pub fn with_subquery_ctx(
284        mut self,
285        graph_ctx: Arc<GraphExecutionContext>,
286        uni_schema: Arc<UniSchema>,
287        session_ctx: Arc<RwLock<SessionContext>>,
288        storage: Arc<StorageManager>,
289        params: HashMap<String, Value>,
290    ) -> Self {
291        self.graph_ctx = Some(graph_ctx);
292        self.uni_schema = Some(uni_schema);
293        self.session_ctx = Some(session_ctx);
294        self.storage = Some(storage);
295        self.params = params;
296        self
297    }
298
    /// Compile a Cypher expression into a DataFusion PhysicalExpr.
    ///
    /// Custom constructs (list/pattern comprehensions, quantifiers, reduce,
    /// EXISTS) are compiled to dedicated physical expressions; composite
    /// nodes that *contain* such constructs compile their children
    /// recursively; everything else falls through to `compile_standard`
    /// (which goes via `cypher_expr_to_df` and DataFusion's planner).
    pub fn compile(&self, expr: &Expr, input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        match expr {
            Expr::ListComprehension {
                variable,
                list,
                where_clause,
                map_expr,
            } => self.compile_list_comprehension(
                variable,
                list,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),
            Expr::Quantifier {
                quantifier,
                variable,
                list,
                predicate,
            } => self.compile_quantifier(quantifier, variable, list, predicate, input_schema),
            Expr::Reduce {
                accumulator,
                init,
                variable,
                list,
                expr: expression,
            } => self.compile_reduce(accumulator, init, variable, list, expression, input_schema),
            // For BinaryOp, check if children contain custom expressions or CypherValue types
            Expr::BinaryOp { left, op, right } => {
                self.compile_binary_op_dispatch(left, op, right, input_schema)
            }
            Expr::UnaryOp { op, expr: inner } => {
                if matches!(op, UnaryOp::Not) {
                    let mut inner_phy = self.compile(inner, input_schema)?;
                    // NOT over a CypherValue (LargeBinary) operand must first
                    // be decoded to a boolean before negation.
                    if let Ok(DataType::LargeBinary) = inner_phy.data_type(input_schema) {
                        inner_phy = self.wrap_with_cv_to_bool(inner_phy)?;
                    }
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(datafusion::physical_expr::expressions::is_null(inner_phy)
                        .map_err(|e| anyhow!("Failed to create is_null: {}", e))?)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNotNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(
                        datafusion::physical_expr::expressions::is_not_null(inner_phy)
                            .map_err(|e| anyhow!("Failed to create is_not_null: {}", e))?,
                    )
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            // In operator is Expr::In { expr, list }
            Expr::In {
                expr: left,
                list: right,
            } => {
                if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
                    let left_phy = self.compile(left, input_schema)?;
                    let right_phy = self.compile(right, input_schema)?;

                    // Unknown operand types default to LargeBinary
                    // (CypherValue), which _cypher_in can handle.
                    let left_type = left_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);
                    let right_type = right_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);

                    self.plan_binary_udf("_cypher_in", left_phy, right_phy, left_type, right_type)?
                        .ok_or_else(|| anyhow!("_cypher_in UDF not found"))
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            // Recursively check other composite types if necessary.
            Expr::List(items) if items.iter().any(Self::contains_custom_expr) => Err(anyhow!(
                "List literals containing comprehensions not yet supported in compiler"
            )),
            Expr::Map(entries) if entries.iter().any(|(_, v)| Self::contains_custom_expr(v)) => {
                Err(anyhow!(
                    "Map literals containing comprehensions not yet supported in compiler"
                ))
            }

            // Property access on a struct column — e.g. `x.a` where `x` is Struct
            Expr::Property(base, prop) => self.compile_property_access(base, prop, input_schema),

            // Bracket access on a struct column — e.g. `x['a']` where `x` is Struct
            Expr::ArrayIndex { array, index } => {
                self.compile_array_index(array, index, input_schema)
            }

            // Pattern comprehension: [(a)-[:REL]->(b) WHERE pred | expr]
            Expr::PatternComprehension {
                path_variable,
                pattern,
                where_clause,
                map_expr,
            } => self.compile_pattern_comprehension(
                path_variable,
                pattern,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),

            // EXISTS subquery: plan + execute per row, return boolean
            Expr::Exists { query, .. } => self.compile_exists(query),

            // FunctionCall wrapping a custom expression (e.g. size(comprehension))
            Expr::FunctionCall {
                name,
                args,
                distinct,
                ..
            } => {
                if args.iter().any(Self::contains_custom_expr) {
                    self.compile_function_with_custom_args(name, args, *distinct, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            // CASE expression - dispatch based on whether it contains custom expressions
            Expr::Case {
                expr: case_operand,
                when_then,
                else_expr,
            } => {
                // Check if operand or any branch contains custom expressions
                let has_custom = case_operand
                    .as_deref()
                    .is_some_and(Self::contains_custom_expr)
                    || when_then.iter().any(|(w, t)| {
                        Self::contains_custom_expr(w) || Self::contains_custom_expr(t)
                    })
                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr);

                if has_custom {
                    // Use compile_case() for CypherValue boolean conversion and
                    // LargeList/LargeBinary type unification
                    self.compile_case(case_operand, when_then, else_expr, input_schema)
                } else {
                    // Standard compilation path - goes through apply_type_coercion which handles:
                    // 1. Simple CASE → Generic CASE rewriting with cross-type equality
                    // 2. Type coercion for CASE result branches
                    // 3. Numeric widening for comparisons
                    self.compile_standard(expr, input_schema)
                }
            }

            // LabelCheck: delegate to standard compilation (uses cypher_expr_to_df)
            Expr::LabelCheck { .. } => self.compile_standard(expr, input_schema),

            // Default to standard compilation for leaf nodes or non-custom trees
            _ => self.compile_standard(expr, input_schema),
        }
    }
472
    /// Dispatch binary op compilation, checking for custom expressions and CypherValue types.
    ///
    /// Routing order (order matters):
    /// 1. `=` / `<>` on two node (or two edge) variables → rewrite to compare
    ///    the identity property (`_vid` / `_eid`).
    /// 2. XOR / Pow → always the standard path.
    /// 3. Operands containing custom expressions → compile recursively and
    ///    combine via `compile_binary_op`.
    /// 4. `Add` with a list-producing operand → standard path
    ///    (`_cypher_list_concat`).
    /// 5. Otherwise: CypherValue operands use `compile_binary_op`; plain
    ///    Arrow types use `compile_standard` for type coercion.
    fn compile_binary_op_dispatch(
        &self,
        left: &Expr,
        op: &BinaryOp,
        right: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Entity identity comparison: `a = b` where both sides are node (or
        // both edge) variables compares identity columns, not full values.
        if matches!(op, BinaryOp::Eq | BinaryOp::NotEq)
            && let (Expr::Variable(lv), Expr::Variable(rv)) = (left, right)
            && let Some(ctx) = self.translation_ctx
            && let (Some(lk), Some(rk)) = (ctx.variable_kinds.get(lv), ctx.variable_kinds.get(rv))
        {
            let identity_prop = match (lk, rk) {
                (VariableKind::Node, VariableKind::Node) => Some("_vid"),
                (VariableKind::Edge, VariableKind::Edge) => Some("_eid"),
                // Mixed node/edge pairs fall through to the generic paths.
                _ => None,
            };

            if let Some(id_prop) = identity_prop {
                return self.compile_standard(
                    &Expr::BinaryOp {
                        left: Box::new(Expr::Property(
                            Box::new(Expr::Variable(lv.clone())),
                            id_prop.to_string(),
                        )),
                        op: *op,
                        right: Box::new(Expr::Property(
                            Box::new(Expr::Variable(rv.clone())),
                            id_prop.to_string(),
                        )),
                    },
                    input_schema,
                );
            }
        }

        // XOR and Pow: always route through compile_standard.
        // compile_binary_op does not support these operators. The standard path
        // correctly maps XOR → _cypher_xor UDF and Pow → power() function.
        if matches!(op, BinaryOp::Xor | BinaryOp::Pow) {
            return self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            );
        }

        if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
            let left_phy = self.compile(left, input_schema)?;
            let right_phy = self.compile(right, input_schema)?;
            return self.compile_binary_op(op, left_phy, right_phy, input_schema);
        }

        // For Add with a list-producing operand (AST-level detection),
        // compile through the standard path which uses cypher_expr_to_df
        // to correctly route list + scalar to _cypher_list_concat.
        if *op == BinaryOp::Add && (Self::is_list_producing(left) || Self::is_list_producing(right))
        {
            return self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            );
        }

        // Compile sub-expressions to check their types. If either operand
        // produces LargeBinary (CypherValue), standard Arrow kernels will fail at
        // runtime for comparisons. Route through compile_binary_op which
        // dispatches to Cypher comparison UDFs.
        let left_phy = self.compile(left, input_schema)?;
        let right_phy = self.compile(right, input_schema)?;
        let left_dt = left_phy.data_type(input_schema).ok();
        let right_dt = right_phy.data_type(input_schema).ok();
        let has_cv =
            is_cypher_value_type(left_dt.as_ref()) || is_cypher_value_type(right_dt.as_ref());

        if has_cv {
            // CypherValue types need special handling via compile_binary_op
            self.compile_binary_op(op, left_phy, right_phy, input_schema)
        } else {
            // Standard types: use compile_standard to get proper type coercion
            // (e.g., Int64 == Float64 requires coercion to work)
            self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            )
        }
    }
572
573    /// Try to compile struct field access for a variable.
574    ///
575    /// Returns `Some(expr)` if the variable is a Struct column and can be accessed,
576    /// `None` if fallback to standard compilation is needed.
577    fn try_compile_struct_field(
578        &self,
579        var_name: &str,
580        field_name: &str,
581        input_schema: &Schema,
582    ) -> Option<Arc<dyn PhysicalExpr>> {
583        let col_idx = input_schema.index_of(var_name).ok()?;
584        let DataType::Struct(struct_fields) = input_schema.field(col_idx).data_type() else {
585            return None;
586        };
587
588        // Cypher semantics: accessing a missing key returns null
589        if let Some(field_idx) = struct_fields.iter().position(|f| f.name() == field_name) {
590            let output_type = struct_fields[field_idx].data_type().clone();
591            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(
592                datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
593            );
594            Some(Arc::new(StructFieldAccessExpr::new(
595                col_expr,
596                field_idx,
597                output_type,
598            )))
599        } else {
600            Some(Arc::new(
601                datafusion::physical_expr::expressions::Literal::new(
602                    datafusion::common::ScalarValue::Null,
603                ),
604            ))
605        }
606    }
607
608    /// Compile property access on a struct column (e.g. `x.a` where `x` is Struct).
609    fn compile_property_access(
610        &self,
611        base: &Expr,
612        prop: &str,
613        input_schema: &Schema,
614    ) -> Result<Arc<dyn PhysicalExpr>> {
615        if let Expr::Variable(var_name) = base {
616            // 1. Try struct field access (e.g. `x.a` where `x` is a Struct column)
617            if let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema) {
618                return Ok(expr);
619            }
620            // 2. Try flat column "{var}.{prop}" (for pattern comprehension inner schemas)
621            let flat_col = format!("{}.{}", var_name, prop);
622            if let Ok(col_idx) = input_schema.index_of(&flat_col) {
623                return Ok(Arc::new(
624                    datafusion::physical_expr::expressions::Column::new(&flat_col, col_idx),
625                ));
626            }
627        }
628        self.compile_standard(
629            &Expr::Property(Box::new(base.clone()), prop.to_string()),
630            input_schema,
631        )
632    }
633
634    /// Compile bracket access on a struct column (e.g. `x['a']` where `x` is Struct).
635    fn compile_array_index(
636        &self,
637        array: &Expr,
638        index: &Expr,
639        input_schema: &Schema,
640    ) -> Result<Arc<dyn PhysicalExpr>> {
641        if let Expr::Variable(var_name) = array
642            && let Expr::Literal(CypherLiteral::String(prop)) = index
643            && let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema)
644        {
645            return Ok(expr);
646        }
647        self.compile_standard(
648            &Expr::ArrayIndex {
649                array: Box::new(array.clone()),
650                index: Box::new(index.clone()),
651            },
652            input_schema,
653        )
654    }
655
656    /// Compile EXISTS subquery expression.
657    fn compile_exists(&self, query: &Query) -> Result<Arc<dyn PhysicalExpr>> {
658        // 7.1: Validate no mutation clauses in EXISTS body
659        if has_mutation_clause(query) {
660            return Err(anyhow!(
661                "SyntaxError: InvalidClauseComposition - EXISTS subquery cannot contain updating clauses"
662            ));
663        }
664
665        let err = |dep: &str| anyhow!("EXISTS requires {}", dep);
666
667        let graph_ctx = self
668            .graph_ctx
669            .clone()
670            .ok_or_else(|| err("GraphExecutionContext"))?;
671        let uni_schema = self.uni_schema.clone().ok_or_else(|| err("UniSchema"))?;
672        let session_ctx = self
673            .session_ctx
674            .clone()
675            .ok_or_else(|| err("SessionContext"))?;
676        let storage = self.storage.clone().ok_or_else(|| err("StorageManager"))?;
677
678        Ok(Arc::new(ExistsExecExpr::new(
679            query.clone(),
680            graph_ctx,
681            session_ctx,
682            storage,
683            uni_schema,
684            self.params.clone(),
685        )))
686    }
687
    /// Check if map_expr or where_clause contains a pattern comprehension that references the variable.
    ///
    /// NOTE(review): the recursive walk covers PatternComprehension,
    /// FunctionCall, BinaryOp, UnaryOp, Property, List, and ListComprehension,
    /// but not Case, Map, In, IsNull/IsNotNull, or ArrayIndex — a pattern
    /// comprehension nested under one of those would be missed. Confirm
    /// whether that is intentional.
    fn needs_vid_extraction_for_variable(
        variable: &str,
        map_expr: &Expr,
        where_clause: Option<&Expr>,
    ) -> bool {
        // Recursive helper: does `expr` contain a pattern comprehension
        // whose pattern mentions `var` as a node or relationship variable?
        fn expr_has_pattern_comp_referencing(expr: &Expr, var: &str) -> bool {
            match expr {
                Expr::PatternComprehension { pattern, .. } => {
                    // Check if pattern uses the variable
                    pattern.paths.iter().any(|path| {
                        path.elements.iter().any(|elem| match elem {
                            uni_cypher::ast::PatternElement::Node(n) => {
                                n.variable.as_deref() == Some(var)
                            }
                            uni_cypher::ast::PatternElement::Relationship(r) => {
                                r.variable.as_deref() == Some(var)
                            }
                            _ => false,
                        })
                    })
                }
                Expr::FunctionCall { args, .. } => args
                    .iter()
                    .any(|a| expr_has_pattern_comp_referencing(a, var)),
                Expr::BinaryOp { left, right, .. } => {
                    expr_has_pattern_comp_referencing(left, var)
                        || expr_has_pattern_comp_referencing(right, var)
                }
                Expr::UnaryOp { expr: e, .. } | Expr::Property(e, _) => {
                    expr_has_pattern_comp_referencing(e, var)
                }
                Expr::List(items) => items
                    .iter()
                    .any(|i| expr_has_pattern_comp_referencing(i, var)),
                // Nested list comprehension: check source list, body, and filter.
                Expr::ListComprehension {
                    list,
                    map_expr,
                    where_clause,
                    ..
                } => {
                    expr_has_pattern_comp_referencing(list, var)
                        || expr_has_pattern_comp_referencing(map_expr, var)
                        || where_clause
                            .as_ref()
                            .is_some_and(|w| expr_has_pattern_comp_referencing(w, var))
                }
                _ => false,
            }
        }

        expr_has_pattern_comp_referencing(map_expr, variable)
            || where_clause.is_some_and(|w| expr_has_pattern_comp_referencing(w, variable))
    }
742
743    /// Check if an expression tree contains nodes that require custom compilation.
744    pub fn contains_custom_expr(expr: &Expr) -> bool {
745        match expr {
746            Expr::ListComprehension { .. } => true,
747            Expr::Quantifier { .. } => true,
748            Expr::Reduce { .. } => true,
749            Expr::PatternComprehension { .. } => true,
750            Expr::BinaryOp { left, right, .. } => {
751                Self::contains_custom_expr(left) || Self::contains_custom_expr(right)
752            }
753            Expr::UnaryOp { expr, .. } => Self::contains_custom_expr(expr),
754            Expr::FunctionCall { args, .. } => args.iter().any(Self::contains_custom_expr),
755            Expr::Case {
756                when_then,
757                else_expr,
758                ..
759            } => {
760                when_then
761                    .iter()
762                    .any(|(w, t)| Self::contains_custom_expr(w) || Self::contains_custom_expr(t))
763                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr)
764            }
765            Expr::List(items) => items.iter().any(Self::contains_custom_expr),
766            Expr::Map(entries) => entries.iter().any(|(_, v)| Self::contains_custom_expr(v)),
767            Expr::IsNull(e) | Expr::IsNotNull(e) => Self::contains_custom_expr(e),
768            Expr::In { expr: l, list: r } => {
769                Self::contains_custom_expr(l) || Self::contains_custom_expr(r)
770            }
771            Expr::Exists { .. } => true,
772            Expr::LabelCheck { expr, .. } => Self::contains_custom_expr(expr),
773            _ => false,
774        }
775    }
776
777    /// Check if an expression statically produces a list value.
778    /// Used to route `Add` operations to `_cypher_list_concat` instead of arithmetic.
779    fn is_list_producing(expr: &Expr) -> bool {
780        match expr {
781            Expr::List(_) => true,
782            Expr::ListComprehension { .. } => true,
783            Expr::ArraySlice { .. } => true,
784            // Add with a list-producing child produces a list
785            Expr::BinaryOp {
786                left,
787                op: BinaryOp::Add,
788                right,
789            } => Self::is_list_producing(left) || Self::is_list_producing(right),
790            Expr::FunctionCall { name, .. } => {
791                // Functions known to return lists
792                matches!(
793                    name.as_str(),
794                    "range"
795                        | "tail"
796                        | "reverse"
797                        | "collect"
798                        | "keys"
799                        | "labels"
800                        | "nodes"
801                        | "relationships"
802                )
803            }
804            _ => false,
805        }
806    }
807
808    fn compile_standard(
809        &self,
810        expr: &Expr,
811        input_schema: &Schema,
812    ) -> Result<Arc<dyn PhysicalExpr>> {
813        let df_expr = cypher_expr_to_df(expr, self.translation_ctx)?;
814        let resolved_expr = self.resolve_udfs(df_expr)?;
815
816        let df_schema = datafusion::common::DFSchema::try_from(input_schema.clone())?;
817
818        // Apply type coercion to resolve type mismatches
819        let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;
820
821        // Re-resolve UDFs after coercion (coercion may introduce new dummy UDF calls)
822        let coerced_expr = self.resolve_udfs(coerced_expr)?;
823
824        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
825        planner
826            .create_physical_expr(&coerced_expr, &df_schema, self.state)
827            .map_err(|e| anyhow!("DataFusion planning failed: {}", e))
828    }
829
830    /// Resolve UDFs in DataFusion expression using the session state registry.
831    ///
832    /// Uses `TreeNode::transform_up` to traverse the entire expression tree,
833    /// ensuring UDFs inside Cast, Case, InList, Between, etc. are all resolved.
834    fn resolve_udfs(
835        &self,
836        expr: datafusion::logical_expr::Expr,
837    ) -> Result<datafusion::logical_expr::Expr> {
838        use datafusion::common::tree_node::{Transformed, TreeNode};
839        use datafusion::logical_expr::Expr as DfExpr;
840
841        let result = expr
842            .transform_up(|node| {
843                if let DfExpr::ScalarFunction(ref func) = node {
844                    let udf_name = func.func.name();
845                    if let Some(registered_udf) = self.state.scalar_functions().get(udf_name) {
846                        return Ok(Transformed::yes(DfExpr::ScalarFunction(
847                            datafusion::logical_expr::expr::ScalarFunction {
848                                func: registered_udf.clone(),
849                                args: func.args.clone(),
850                            },
851                        )));
852                    }
853                }
854                Ok(Transformed::no(node))
855            })
856            .map_err(|e| anyhow!("Failed to resolve UDFs: {}", e))?;
857        Ok(result.data)
858    }
859
860    fn compile_list_comprehension(
861        &self,
862        variable: &str,
863        list: &Expr,
864        where_clause: Option<&Expr>,
865        map_expr: &Expr,
866        input_schema: &Schema,
867    ) -> Result<Arc<dyn PhysicalExpr>> {
868        let input_list_phy = self.compile(list, input_schema)?;
869
870        // Resolve input list type
871        let list_data_type = input_list_phy.data_type(input_schema)?;
872        let inner_data_type = resolve_list_element_type(
873            &list_data_type,
874            DataType::LargeBinary,
875            "List comprehension",
876        )?;
877
878        // Create inner schema with loop variable (shadow outer variable if same name)
879        let mut fields = input_schema.fields().to_vec();
880        let loop_var_field = Arc::new(Field::new(variable, inner_data_type.clone(), true));
881
882        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
883            fields[pos] = loop_var_field;
884        } else {
885            fields.push(loop_var_field);
886        }
887
888        // Check if we need VID extraction for nested pattern comprehensions
889        let needs_vid_extraction =
890            Self::needs_vid_extraction_for_variable(variable, map_expr, where_clause);
891        if needs_vid_extraction && inner_data_type == DataType::LargeBinary {
892            // Add a {variable}._vid field for VID extraction
893            let vid_field = Arc::new(Field::new(
894                format!("{}._vid", variable),
895                DataType::UInt64,
896                true,
897            ));
898            fields.push(vid_field);
899        }
900
901        let inner_schema = Arc::new(Schema::new(fields));
902
903        // Compile inner expressions with scoped translation context
904        let mut scoped_ctx = None;
905        let inner_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
906
907        let predicate_phy = if let Some(pred) = where_clause {
908            Some(inner_compiler.compile(pred, &inner_schema)?)
909        } else {
910            None
911        };
912
913        let map_phy = inner_compiler.compile(map_expr, &inner_schema)?;
914        let output_item_type = map_phy.data_type(&inner_schema)?;
915
916        Ok(Arc::new(ListComprehensionExecExpr::new(
917            input_list_phy,
918            map_phy,
919            predicate_phy,
920            variable.to_string(),
921            Arc::new(input_schema.clone()),
922            output_item_type,
923            needs_vid_extraction,
924        )))
925    }
926
927    fn compile_reduce(
928        &self,
929        accumulator: &str,
930        initial: &Expr,
931        variable: &str,
932        list: &Expr,
933        reduce_expr: &Expr,
934        input_schema: &Schema,
935    ) -> Result<Arc<dyn PhysicalExpr>> {
936        let list_phy = self.compile(list, input_schema)?;
937
938        let initial_phy = self.compile(initial, input_schema)?;
939        let acc_type = initial_phy.data_type(input_schema)?;
940
941        let list_data_type = list_phy.data_type(input_schema)?;
942        // For LargeBinary (CypherValue arrays), use the accumulator type as element type so the
943        // reduce body expression compiles correctly (e.g. acc + x where both are Int64).
944        let inner_data_type =
945            resolve_list_element_type(&list_data_type, acc_type.clone(), "Reduce")?;
946
947        // Create inner schema with accumulator and loop variable (shadow outer variables if same names)
948        let mut fields = input_schema.fields().to_vec();
949
950        let acc_field = Arc::new(Field::new(accumulator, acc_type, true));
951        if let Some(pos) = fields.iter().position(|f| f.name() == accumulator) {
952            fields[pos] = acc_field;
953        } else {
954            fields.push(acc_field);
955        }
956
957        let var_field = Arc::new(Field::new(variable, inner_data_type, true));
958        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
959            fields[pos] = var_field;
960        } else {
961            fields.push(var_field);
962        }
963
964        let inner_schema = Arc::new(Schema::new(fields));
965
966        // Compile reduce expression with scoped translation context
967        let mut scoped_ctx = None;
968        let reduce_compiler = self.scoped_compiler(&[accumulator, variable], &mut scoped_ctx);
969
970        let reduce_phy = reduce_compiler.compile(reduce_expr, &inner_schema)?;
971        let output_type = reduce_phy.data_type(&inner_schema)?;
972
973        Ok(Arc::new(ReduceExecExpr::new(
974            accumulator.to_string(),
975            initial_phy,
976            variable.to_string(),
977            list_phy,
978            reduce_phy,
979            Arc::new(input_schema.clone()),
980            output_type,
981        )))
982    }
983
984    fn compile_quantifier(
985        &self,
986        quantifier: &uni_cypher::ast::Quantifier,
987        variable: &str,
988        list: &Expr,
989        predicate: &Expr,
990        input_schema: &Schema,
991    ) -> Result<Arc<dyn PhysicalExpr>> {
992        let input_list_phy = self.compile(list, input_schema)?;
993
994        // Resolve element type from list type
995        let list_data_type = input_list_phy.data_type(input_schema)?;
996        let inner_data_type =
997            resolve_list_element_type(&list_data_type, DataType::LargeBinary, "Quantifier")?;
998
999        // Create inner schema with loop variable
1000        // If a field with the same name exists in the outer schema, replace it (shadow it)
1001        // to ensure the loop variable takes precedence.
1002        let mut fields = input_schema.fields().to_vec();
1003        let loop_var_field = Arc::new(Field::new(variable, inner_data_type, true));
1004
1005        // Find and replace existing field with same name, or append if not found
1006        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1007            fields[pos] = loop_var_field;
1008        } else {
1009            fields.push(loop_var_field);
1010        }
1011
1012        let inner_schema = Arc::new(Schema::new(fields));
1013
1014        // Compile predicate with a scoped translation context that removes the loop variable
1015        // from variable_kinds, so property access on the loop variable doesn't incorrectly
1016        // generate flat columns (like "x.name") that don't exist in the inner schema.
1017        let mut scoped_ctx = None;
1018        let pred_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
1019
1020        let mut predicate_phy = pred_compiler.compile(predicate, &inner_schema)?;
1021
1022        // Wrap CypherValue predicates with _cv_to_bool for proper boolean evaluation
1023        if let Ok(DataType::LargeBinary) = predicate_phy.data_type(&inner_schema) {
1024            predicate_phy = self.wrap_with_cv_to_bool(predicate_phy)?;
1025        }
1026
1027        let qt = match quantifier {
1028            uni_cypher::ast::Quantifier::All => QuantifierType::All,
1029            uni_cypher::ast::Quantifier::Any => QuantifierType::Any,
1030            uni_cypher::ast::Quantifier::Single => QuantifierType::Single,
1031            uni_cypher::ast::Quantifier::None => QuantifierType::None,
1032        };
1033
1034        Ok(Arc::new(QuantifierExecExpr::new(
1035            input_list_phy,
1036            predicate_phy,
1037            variable.to_string(),
1038            Arc::new(input_schema.clone()),
1039            qt,
1040        )))
1041    }
1042
1043    fn compile_pattern_comprehension(
1044        &self,
1045        path_variable: &Option<String>,
1046        pattern: &uni_cypher::ast::Pattern,
1047        where_clause: Option<&Expr>,
1048        map_expr: &Expr,
1049        input_schema: &Schema,
1050    ) -> Result<Arc<dyn PhysicalExpr>> {
1051        let err = |dep: &str| anyhow!("Pattern comprehension requires {}", dep);
1052
1053        let graph_ctx = self
1054            .graph_ctx
1055            .as_ref()
1056            .ok_or_else(|| err("GraphExecutionContext"))?;
1057        let uni_schema = self.uni_schema.as_ref().ok_or_else(|| err("UniSchema"))?;
1058
1059        // 1. Analyze pattern to get anchor column and traversal steps
1060        let (anchor_col, steps) = analyze_pattern(pattern, input_schema, uni_schema)?;
1061
1062        // 2. Collect needed properties from where_clause and map_expr
1063        let (vertex_props, edge_props) = collect_inner_properties(where_clause, map_expr, &steps);
1064
1065        // 3. Build inner schema
1066        let inner_schema = build_inner_schema(
1067            input_schema,
1068            &steps,
1069            &vertex_props,
1070            &edge_props,
1071            path_variable.as_deref(),
1072        );
1073
1074        // 4. Compile predicate and map_expr against inner schema
1075        let pred_phy = where_clause
1076            .map(|p| self.compile(p, &inner_schema))
1077            .transpose()?;
1078        let map_phy = self.compile(map_expr, &inner_schema)?;
1079        let output_type = map_phy.data_type(&inner_schema)?;
1080
1081        // 5. Return expression
1082        Ok(Arc::new(PatternComprehensionExecExpr::new(
1083            graph_ctx.clone(),
1084            anchor_col,
1085            steps,
1086            path_variable.clone(),
1087            pred_phy,
1088            map_phy,
1089            Arc::new(input_schema.clone()),
1090            Arc::new(inner_schema),
1091            output_type,
1092            vertex_props,
1093            edge_props,
1094        )))
1095    }
1096
1097    /// Compile a function call whose arguments contain custom expressions.
1098    ///
1099    /// Recursively compiles each argument via `self.compile()`, then looks up
1100    /// the corresponding UDF in the session registry and builds the physical
1101    /// expression.
1102    fn compile_function_with_custom_args(
1103        &self,
1104        name: &str,
1105        args: &[Expr],
1106        _distinct: bool,
1107        input_schema: &Schema,
1108    ) -> Result<Arc<dyn PhysicalExpr>> {
1109        // 1. Recursively compile each argument
1110        let compiled_args: Vec<Arc<dyn PhysicalExpr>> = args
1111            .iter()
1112            .map(|arg| self.compile(arg, input_schema))
1113            .collect::<Result<Vec<_>>>()?;
1114
1115        // 2. Resolve UDF name and look it up in the registry
1116        let udf_name = Self::cypher_fn_to_udf(name);
1117        let udf = self
1118            .state
1119            .scalar_functions()
1120            .get(udf_name.as_str())
1121            .ok_or_else(|| {
1122                anyhow!(
1123                    "UDF '{}' not found in registry for function '{}'",
1124                    udf_name,
1125                    name
1126                )
1127            })?;
1128
1129        // 3. Build operand type list from compiled args
1130        let operand_types: Vec<(&str, DataType)> = compiled_args
1131            .iter()
1132            .enumerate()
1133            .map(|(i, arg)| {
1134                let dt = arg.data_type(input_schema).unwrap_or(DataType::LargeBinary);
1135                // Use a unique placeholder name per argument
1136                let placeholder: &str = match i {
1137                    0 => "__arg0__",
1138                    1 => "__arg1__",
1139                    2 => "__arg2__",
1140                    _ => "__argN__",
1141                };
1142                (placeholder, dt)
1143            })
1144            .collect();
1145
1146        // 4. Build dummy column references for the UDF logical expression
1147        let dummy_cols: Vec<datafusion::logical_expr::Expr> = operand_types
1148            .iter()
1149            .map(|(name, _)| {
1150                datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1151                    None::<String>,
1152                    *name,
1153                ))
1154            })
1155            .collect();
1156
1157        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1158            datafusion::logical_expr::expr::ScalarFunction {
1159                func: udf.clone(),
1160                args: dummy_cols,
1161            },
1162        );
1163
1164        // 5. Plan and rebind
1165        self.plan_udf_physical_expr(
1166            &udf_expr,
1167            &operand_types
1168                .iter()
1169                .map(|(n, dt)| (*n, dt.clone()))
1170                .collect::<Vec<_>>(),
1171            compiled_args,
1172            &format!("function {}", name),
1173        )
1174    }
1175
1176    /// Map a Cypher function name to the registered UDF name.
1177    ///
1178    /// Mirrors the mapping in `translate_function_call` from `df_expr.rs`.
1179    /// The registered UDF names are always lowercase.
1180    fn cypher_fn_to_udf(name: &str) -> String {
1181        match name.to_uppercase().as_str() {
1182            "SIZE" | "LENGTH" => "_cypher_size".to_string(),
1183            "REVERSE" => "_cypher_reverse".to_string(),
1184            "TOSTRING" => "tostring".to_string(),
1185            "TOBOOLEAN" | "TOBOOL" | "TOBOOLEANORNULL" => "toboolean".to_string(),
1186            "TOINTEGER" | "TOINT" | "TOINTEGERORNULL" => "tointeger".to_string(),
1187            "TOFLOAT" | "TOFLOATORNULL" => "tofloat".to_string(),
1188            "HEAD" => "head".to_string(),
1189            "LAST" => "last".to_string(),
1190            "TAIL" => "tail".to_string(),
1191            "KEYS" => "keys".to_string(),
1192            "TYPE" => "type".to_string(),
1193            "PROPERTIES" => "properties".to_string(),
1194            "LABELS" => "labels".to_string(),
1195            "COALESCE" => "coalesce".to_string(),
1196            "ID" => "id".to_string(),
1197            // Fallback: lowercase the name (matches dummy_udf_expr behavior)
1198            _ => name.to_lowercase(),
1199        }
1200    }
1201
    /// Compile a CASE expression with custom sub-expressions in branches.
    ///
    /// Recursively compiles the operand, each when/then pair, and the else
    /// branch, then builds a `CaseExpr` physical expression.
    ///
    /// Applies two fixes for TCK compliance:
    /// 1. Wraps CypherValue WHEN conditions with `_cv_to_bool` for proper boolean evaluation
    /// 2. Unifies branch types when mixing LargeList and LargeBinary to avoid cast errors
    fn compile_case(
        &self,
        operand: &Option<Box<Expr>>,
        when_then: &[(Expr, Expr)],
        else_expr: &Option<Box<Expr>>,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Operand is present only for the "simple" CASE form (CASE x WHEN ...);
        // for searched CASE it is None.
        let operand_phy = operand
            .as_deref()
            .map(|e| self.compile(e, input_schema))
            .transpose()?;

        let mut when_then_phy: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> = when_then
            .iter()
            .map(|(w, t)| {
                let w_phy = self.compile(w, input_schema)?;
                let t_phy = self.compile(t, input_schema)?;
                Ok((w_phy, t_phy))
            })
            .collect::<Result<Vec<_>>>()?;

        let mut else_phy = else_expr
            .as_deref()
            .map(|e| self.compile(e, input_schema))
            .transpose()?;

        // Fix B: Wrap CypherValue WHEN conditions with _cv_to_bool
        // Apply wrapping defensively - if we can't determine type or if it's LargeBinary
        for (w_phy, _) in &mut when_then_phy {
            let should_wrap = match w_phy.data_type(input_schema) {
                Ok(dt) => dt == DataType::LargeBinary,
                Err(_) => {
                    // If we can't determine the type, check if it's likely CypherValue by looking
                    // for common patterns (column references, function calls that might return CypherValue)
                    // For now, be conservative and don't wrap if we can't determine type
                    false
                }
            };
            if should_wrap {
                *w_phy = self.wrap_with_cv_to_bool(w_phy.clone())?;
            }
        }

        // Fix A: Unify branch types when mixing LargeList and LargeBinary
        // Collect all branch data types
        let mut branch_types: Vec<DataType> = Vec::new();
        for (_, t_phy) in &when_then_phy {
            if let Ok(dt) = t_phy.data_type(input_schema) {
                branch_types.push(dt);
            }
        }
        if let Some(ref e_phy) = else_phy
            && let Ok(dt) = e_phy.data_type(input_schema)
        {
            branch_types.push(dt);
        }

        // Check if we have a mix of LargeBinary and LargeList/List types
        let has_large_binary = branch_types
            .iter()
            .any(|dt| matches!(dt, DataType::LargeBinary));
        let has_list = branch_types
            .iter()
            .any(|dt| matches!(dt, DataType::List(_) | DataType::LargeList(_)));

        // If we have both, wrap List/LargeList branches with LargeListToCypherValueExpr
        if has_large_binary && has_list {
            for (_, t_phy) in &mut when_then_phy {
                if let Ok(dt) = t_phy.data_type(input_schema)
                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
                {
                    *t_phy = Arc::new(LargeListToCypherValueExpr::new(t_phy.clone()));
                }
            }
            // `take()` moves the else branch out so it can be rewrapped; it is
            // restored on both paths below, so this is an in-place rewrite.
            if let Some(e_phy) = else_phy.take() {
                if let Ok(dt) = e_phy.data_type(input_schema)
                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
                {
                    else_phy = Some(Arc::new(LargeListToCypherValueExpr::new(e_phy)));
                } else {
                    else_phy = Some(e_phy);
                }
            }
        }

        let case_expr = datafusion::physical_expr::expressions::CaseExpr::try_new(
            operand_phy,
            when_then_phy,
            else_phy,
        )
        .map_err(|e| anyhow!("Failed to create CASE expression: {}", e))?;

        Ok(Arc::new(case_expr))
    }
1304
    /// Compile a binary operation from pre-compiled operand expressions.
    ///
    /// Routing order:
    /// 1. String predicates (STARTS WITH / ENDS WITH / CONTAINS) get a custom
    ///    `CypherStringMatchExpr`.
    /// 2. List/LargeBinary operand types are unified.
    /// 3. CypherValue (LargeBinary) operands are routed through Cypher-aware
    ///    comparison, list-concat, and arithmetic UDFs, in that order.
    /// 4. Everything else falls through to DataFusion's coercing `binary` builder.
    fn compile_binary_op(
        &self,
        op: &BinaryOp,
        left: Arc<dyn PhysicalExpr>,
        right: Arc<dyn PhysicalExpr>,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        use datafusion::logical_expr::Operator;

        // String operators use custom physical expr for safe type handling
        let string_op = match op {
            BinaryOp::StartsWith => Some(StringOp::StartsWith),
            BinaryOp::EndsWith => Some(StringOp::EndsWith),
            BinaryOp::Contains => Some(StringOp::Contains),
            _ => None,
        };
        if let Some(sop) = string_op {
            return Ok(Arc::new(CypherStringMatchExpr::new(left, right, sop)));
        }

        // Map the Cypher operator to a DataFusion operator; operators with no
        // physical-compiler support are rejected here with explicit errors.
        let df_op = match op {
            BinaryOp::Add => Operator::Plus,
            BinaryOp::Sub => Operator::Minus,
            BinaryOp::Mul => Operator::Multiply,
            BinaryOp::Div => Operator::Divide,
            BinaryOp::Mod => Operator::Modulo,
            BinaryOp::Eq => Operator::Eq,
            BinaryOp::NotEq => Operator::NotEq,
            BinaryOp::Gt => Operator::Gt,
            BinaryOp::GtEq => Operator::GtEq,
            BinaryOp::Lt => Operator::Lt,
            BinaryOp::LtEq => Operator::LtEq,
            BinaryOp::And => Operator::And,
            BinaryOp::Or => Operator::Or,
            BinaryOp::Xor => {
                return Err(anyhow!(
                    "XOR not supported via binary helper, use bitwise_xor"
                ));
            }
            BinaryOp::Regex => Operator::RegexMatch,
            BinaryOp::ApproxEq => {
                return Err(anyhow!(
                    "ApproxEq (~=) not yet supported in physical compiler"
                ));
            }
            BinaryOp::Pow => return Err(anyhow!("POW not yet supported in physical compiler")),
            _ => return Err(anyhow!("Unsupported binary op in compiler: {:?}", op)),
        };

        // When either operand is LargeBinary (CypherValue), standard Arrow comparison
        // kernels can't handle the type mismatch. Route through Cypher comparison
        // UDFs which decode CypherValue to Value for comparison.
        let mut left = left;
        let mut right = right;
        let left_type = left.data_type(input_schema).ok();
        let right_type = right.data_type(input_schema).ok();

        // Type unification: if one side is LargeList and the other is LargeBinary,
        // convert LargeList to LargeBinary for consistent handling
        let left_is_list = matches!(
            left_type.as_ref(),
            Some(DataType::List(_) | DataType::LargeList(_))
        );
        let right_is_list = matches!(
            right_type.as_ref(),
            Some(DataType::List(_) | DataType::LargeList(_))
        );
        let left_is_binary = matches!(left_type.as_ref(), Some(DataType::LargeBinary));
        let right_is_binary = matches!(right_type.as_ref(), Some(DataType::LargeBinary));

        if left_is_list && right_is_binary {
            left = Arc::new(LargeListToCypherValueExpr::new(left));
        } else if right_is_list && left_is_binary {
            right = Arc::new(LargeListToCypherValueExpr::new(right));
        }

        // Recalculate types after unification
        let left_type = left.data_type(input_schema).ok();
        let right_type = right.data_type(input_schema).ok();
        let has_cv =
            is_cypher_value_type(left_type.as_ref()) || is_cypher_value_type(right_type.as_ref());

        // Each compile_cv_* helper returns Ok(None) when it does not apply,
        // letting the next strategy try; the first match wins.
        if has_cv {
            if let Some(result) = self.compile_cv_comparison(
                df_op,
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
            )? {
                return Ok(result);
            }
            if let Some(result) = self.compile_cv_list_concat(
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
                df_op,
            )? {
                return Ok(result);
            }
            if let Some(result) = self.compile_cv_arithmetic(
                df_op,
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
                input_schema,
            )? {
                return Ok(result);
            }
        }

        // Use DataFusion's binary physical expression creator which handles coercion
        binary(left, df_op, right, input_schema)
            .map_err(|e| anyhow!("Failed to create binary expression: {}", e))
    }
1422
1423    /// Compile CypherValue comparison using Cypher UDFs.
1424    fn compile_cv_comparison(
1425        &self,
1426        df_op: datafusion::logical_expr::Operator,
1427        left: Arc<dyn PhysicalExpr>,
1428        right: Arc<dyn PhysicalExpr>,
1429        left_type: &Option<DataType>,
1430        right_type: &Option<DataType>,
1431    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1432        use datafusion::logical_expr::Operator;
1433
1434        let udf_name = match df_op {
1435            Operator::Eq => "_cypher_equal",
1436            Operator::NotEq => "_cypher_not_equal",
1437            Operator::Gt => "_cypher_gt",
1438            Operator::GtEq => "_cypher_gt_eq",
1439            Operator::Lt => "_cypher_lt",
1440            Operator::LtEq => "_cypher_lt_eq",
1441            _ => return Ok(None),
1442        };
1443
1444        self.plan_binary_udf(
1445            udf_name,
1446            left,
1447            right,
1448            left_type.clone().unwrap_or(DataType::LargeBinary),
1449            right_type.clone().unwrap_or(DataType::LargeBinary),
1450        )
1451    }
1452
1453    /// Compile CypherValue list concatenation.
1454    fn compile_cv_list_concat(
1455        &self,
1456        left: Arc<dyn PhysicalExpr>,
1457        right: Arc<dyn PhysicalExpr>,
1458        left_type: &Option<DataType>,
1459        right_type: &Option<DataType>,
1460        df_op: datafusion::logical_expr::Operator,
1461    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1462        use datafusion::logical_expr::Operator;
1463
1464        if df_op != Operator::Plus {
1465            return Ok(None);
1466        }
1467
1468        // List concat when at least one side is a list (CypherValue or Arrow List)
1469        let is_list = |t: &Option<DataType>| {
1470            t.as_ref()
1471                .is_some_and(|dt| matches!(dt, DataType::LargeBinary | DataType::List(_)))
1472        };
1473
1474        if !is_list(left_type) && !is_list(right_type) {
1475            return Ok(None);
1476        }
1477
1478        self.plan_binary_udf(
1479            "_cypher_list_concat",
1480            left,
1481            right,
1482            left_type.clone().unwrap_or(DataType::LargeBinary),
1483            right_type.clone().unwrap_or(DataType::LargeBinary),
1484        )
1485    }
1486
1487    /// Compile CypherValue arithmetic.
1488    ///
1489    /// Routes arithmetic operations through CypherValue-aware UDFs when at least
1490    /// one operand is LargeBinary (CypherValue-encoded).
1491    fn compile_cv_arithmetic(
1492        &self,
1493        df_op: datafusion::logical_expr::Operator,
1494        left: Arc<dyn PhysicalExpr>,
1495        right: Arc<dyn PhysicalExpr>,
1496        left_type: &Option<DataType>,
1497        right_type: &Option<DataType>,
1498        _input_schema: &Schema,
1499    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1500        use datafusion::logical_expr::Operator;
1501
1502        let udf_name = match df_op {
1503            Operator::Plus => "_cypher_add",
1504            Operator::Minus => "_cypher_sub",
1505            Operator::Multiply => "_cypher_mul",
1506            Operator::Divide => "_cypher_div",
1507            Operator::Modulo => "_cypher_mod",
1508            _ => return Ok(None),
1509        };
1510
1511        self.plan_binary_udf(
1512            udf_name,
1513            left,
1514            right,
1515            left_type.clone().unwrap_or(DataType::LargeBinary),
1516            right_type.clone().unwrap_or(DataType::LargeBinary),
1517        )
1518    }
1519
1520    /// Plan a UDF expression with dummy schema columns, then rebind to actual physical expressions.
1521    fn plan_udf_physical_expr(
1522        &self,
1523        udf_expr: &datafusion::logical_expr::Expr,
1524        operand_types: &[(&str, DataType)],
1525        children: Vec<Arc<dyn PhysicalExpr>>,
1526        error_context: &str,
1527    ) -> Result<Arc<dyn PhysicalExpr>> {
1528        let tmp_schema = Schema::new(
1529            operand_types
1530                .iter()
1531                .map(|(name, dt)| Arc::new(Field::new(*name, dt.clone(), true)))
1532                .collect::<Vec<_>>(),
1533        );
1534        let df_schema = datafusion::common::DFSchema::try_from(tmp_schema)?;
1535        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
1536        let udf_phy = planner
1537            .create_physical_expr(udf_expr, &df_schema, self.state)
1538            .map_err(|e| anyhow!("Failed to create {} expr: {}", error_context, e))?;
1539        udf_phy
1540            .with_new_children(children)
1541            .map_err(|e| anyhow!("Failed to rebind {} children: {}", error_context, e))
1542    }
1543
1544    /// Wrap a LargeBinary (CypherValue) expression with `_cv_to_bool` conversion.
1545    ///
1546    /// Used when a CypherValue expression needs to be used as a boolean (e.g., in WHEN clauses).
1547    fn wrap_with_cv_to_bool(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
1548        let Some(udf) = self.state.scalar_functions().get("_cv_to_bool") else {
1549            return Err(anyhow!("_cv_to_bool UDF not found"));
1550        };
1551
1552        let dummy_col = datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1553            None::<String>,
1554            "__cv__",
1555        ));
1556        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1557            datafusion::logical_expr::expr::ScalarFunction {
1558                func: udf.clone(),
1559                args: vec![dummy_col],
1560            },
1561        );
1562
1563        self.plan_udf_physical_expr(
1564            &udf_expr,
1565            &[("__cv__", DataType::LargeBinary)],
1566            vec![expr],
1567            "CypherValue to bool",
1568        )
1569    }
1570
1571    /// Plan a binary UDF with the given name and operand types.
1572    fn plan_binary_udf(
1573        &self,
1574        udf_name: &str,
1575        left: Arc<dyn PhysicalExpr>,
1576        right: Arc<dyn PhysicalExpr>,
1577        left_type: DataType,
1578        right_type: DataType,
1579    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1580        let Some(udf) = self.state.scalar_functions().get(udf_name) else {
1581            return Ok(None);
1582        };
1583        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1584            datafusion::logical_expr::expr::ScalarFunction {
1585                func: udf.clone(),
1586                args: vec![
1587                    datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1588                        None::<String>,
1589                        "__left__",
1590                    )),
1591                    datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1592                        None::<String>,
1593                        "__right__",
1594                    )),
1595                ],
1596            },
1597        );
1598        let result = self.plan_udf_physical_expr(
1599            &udf_expr,
1600            &[("__left__", left_type), ("__right__", right_type)],
1601            vec![left, right],
1602            udf_name,
1603        )?;
1604        Ok(Some(result))
1605    }
1606
1607    fn compile_unary_op(
1608        &self,
1609        op: &UnaryOp,
1610        expr: Arc<dyn PhysicalExpr>,
1611        input_schema: &Schema,
1612    ) -> Result<Arc<dyn PhysicalExpr>> {
1613        match op {
1614            UnaryOp::Not => datafusion::physical_expr::expressions::not(expr),
1615            UnaryOp::Neg => datafusion::physical_expr::expressions::negative(expr, input_schema),
1616        }
1617        .map_err(|e| anyhow!("Failed to create unary expression: {}", e))
1618    }
1619}
1620
1621use datafusion::physical_plan::DisplayAs;
1622use datafusion::physical_plan::DisplayFormatType;
1623
/// The Cypher string predicates handled by `CypherStringMatchExpr`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum StringOp {
    /// `STARTS WITH` — prefix test.
    StartsWith,
    /// `ENDS WITH` — suffix test.
    EndsWith,
    /// `CONTAINS` — substring test.
    Contains,
}
1630
1631impl std::fmt::Display for StringOp {
1632    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1633        match self {
1634            StringOp::StartsWith => write!(f, "STARTS WITH"),
1635            StringOp::EndsWith => write!(f, "ENDS WITH"),
1636            StringOp::Contains => write!(f, "CONTAINS"),
1637        }
1638    }
1639}
1640
/// Physical expression implementing Cypher's string predicates
/// (`STARTS WITH` / `ENDS WITH` / `CONTAINS`) over two operand expressions.
#[derive(Debug, Eq)]
struct CypherStringMatchExpr {
    /// Left operand expression.
    left: Arc<dyn PhysicalExpr>,
    /// Right operand expression.
    right: Arc<dyn PhysicalExpr>,
    /// Which string predicate to apply.
    op: StringOp,
}
1647
impl PartialEq for CypherStringMatchExpr {
    // Structural equality: same operator and structurally-equal operands.
    fn eq(&self, other: &Self) -> bool {
        self.op == other.op && self.left.eq(&other.left) && self.right.eq(&other.right)
    }
}

impl std::hash::Hash for CypherStringMatchExpr {
    // Hashes exactly the fields compared in `eq`, keeping Hash consistent
    // with Eq (equal values produce equal hashes).
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.op.hash(state);
        self.left.hash(state);
        self.right.hash(state);
    }
}
1661
impl CypherStringMatchExpr {
    /// Create a string-match expression applying `op` to `left` and `right`.
    fn new(left: Arc<dyn PhysicalExpr>, right: Arc<dyn PhysicalExpr>, op: StringOp) -> Self {
        Self { left, right, op }
    }
}
1667
impl std::fmt::Display for CypherStringMatchExpr {
    // Renders as "<left> <OP KEYWORD> <right>" for plan display.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} {} {}", self.left, self.op, self.right)
    }
}

impl DisplayAs for CypherStringMatchExpr {
    // Same rendering regardless of the requested display format.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1679
1680impl PhysicalExpr for CypherStringMatchExpr {
1681    fn as_any(&self) -> &dyn std::any::Any {
1682        self
1683    }
1684
1685    fn data_type(
1686        &self,
1687        _input_schema: &Schema,
1688    ) -> datafusion::error::Result<arrow_schema::DataType> {
1689        Ok(arrow_schema::DataType::Boolean)
1690    }
1691
1692    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
1693        Ok(true)
1694    }
1695
1696    fn evaluate(
1697        &self,
1698        batch: &arrow_array::RecordBatch,
1699    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
1700        use crate::query::df_udfs::invoke_cypher_string_op;
1701        use arrow_schema::Field;
1702        use datafusion::config::ConfigOptions;
1703        use datafusion::logical_expr::ScalarFunctionArgs;
1704
1705        let left_val = self.left.evaluate(batch)?;
1706        let right_val = self.right.evaluate(batch)?;
1707
1708        let args = ScalarFunctionArgs {
1709            args: vec![left_val, right_val],
1710            number_rows: batch.num_rows(),
1711            return_field: Arc::new(Field::new("result", arrow_schema::DataType::Boolean, true)),
1712            config_options: Arc::new(ConfigOptions::default()),
1713            arg_fields: vec![], // Not used by invoke_cypher_string_op
1714        };
1715
1716        match self.op {
1717            StringOp::StartsWith => {
1718                invoke_cypher_string_op(&args, "starts_with", |s, p| s.starts_with(p))
1719            }
1720            StringOp::EndsWith => {
1721                invoke_cypher_string_op(&args, "ends_with", |s, p| s.ends_with(p))
1722            }
1723            StringOp::Contains => invoke_cypher_string_op(&args, "contains", |s, p| s.contains(p)),
1724        }
1725    }
1726
1727    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
1728        vec![&self.left, &self.right]
1729    }
1730
1731    fn with_new_children(
1732        self: Arc<Self>,
1733        children: Vec<Arc<dyn PhysicalExpr>>,
1734    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
1735        Ok(Arc::new(CypherStringMatchExpr::new(
1736            children[0].clone(),
1737            children[1].clone(),
1738            self.op,
1739        )))
1740    }
1741
1742    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1743        write!(f, "{}", self)
1744    }
1745}
1746
1747impl PartialEq<dyn PhysicalExpr> for CypherStringMatchExpr {
1748    fn eq(&self, other: &dyn PhysicalExpr) -> bool {
1749        if let Some(other) = other.as_any().downcast_ref::<CypherStringMatchExpr>() {
1750            self == other
1751        } else {
1752            false
1753        }
1754    }
1755}
1756
/// Physical expression for extracting a field from a struct column.
///
/// Used when list comprehension iterates over a list of structs (maps)
/// and accesses a field, e.g., `[x IN [{a: 1}] | x.a]`.
#[derive(Debug, Eq)]
struct StructFieldAccessExpr {
    /// Expression producing the struct column.
    input: Arc<dyn PhysicalExpr>,
    /// Index of the field within the struct; validity against the actual
    /// struct layout is only established at evaluate time.
    field_idx: usize,
    /// Output data type of the extracted field.
    output_type: arrow_schema::DataType,
}
1770
impl PartialEq for StructFieldAccessExpr {
    // Structural equality over all three fields, including the output type.
    fn eq(&self, other: &Self) -> bool {
        self.field_idx == other.field_idx
            && self.input.eq(&other.input)
            && self.output_type == other.output_type
    }
}

impl std::hash::Hash for StructFieldAccessExpr {
    // `output_type` is deliberately not hashed; hashing a subset of the
    // fields compared in `eq` is still consistent (equal values agree on
    // `input` and `field_idx`, so they hash identically).
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.input.hash(state);
        self.field_idx.hash(state);
    }
}
1785
impl StructFieldAccessExpr {
    /// Create a field-access expression extracting field `field_idx` (with
    /// declared type `output_type`) from the struct produced by `input`.
    fn new(
        input: Arc<dyn PhysicalExpr>,
        field_idx: usize,
        output_type: arrow_schema::DataType,
    ) -> Self {
        Self {
            input,
            field_idx,
            output_type,
        }
    }
}
1799
impl std::fmt::Display for StructFieldAccessExpr {
    // Renders as "<input>[<field index>]" for plan display.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}[{}]", self.input, self.field_idx)
    }
}

impl DisplayAs for StructFieldAccessExpr {
    // Same rendering regardless of the requested display format.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1811
1812impl PartialEq<dyn PhysicalExpr> for StructFieldAccessExpr {
1813    fn eq(&self, other: &dyn PhysicalExpr) -> bool {
1814        if let Some(other) = other.as_any().downcast_ref::<Self>() {
1815            self.field_idx == other.field_idx && self.input.eq(&other.input)
1816        } else {
1817            false
1818        }
1819    }
1820}
1821
1822impl PhysicalExpr for StructFieldAccessExpr {
1823    fn as_any(&self) -> &dyn std::any::Any {
1824        self
1825    }
1826
1827    fn data_type(
1828        &self,
1829        _input_schema: &Schema,
1830    ) -> datafusion::error::Result<arrow_schema::DataType> {
1831        Ok(self.output_type.clone())
1832    }
1833
1834    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
1835        Ok(true)
1836    }
1837
1838    fn evaluate(
1839        &self,
1840        batch: &arrow_array::RecordBatch,
1841    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
1842        use arrow_array::StructArray;
1843
1844        let input_val = self.input.evaluate(batch)?;
1845        let array = input_val.into_array(batch.num_rows())?;
1846
1847        let struct_array = array
1848            .as_any()
1849            .downcast_ref::<StructArray>()
1850            .ok_or_else(|| {
1851                datafusion::error::DataFusionError::Execution(
1852                    "StructFieldAccessExpr: input is not a StructArray".to_string(),
1853                )
1854            })?;
1855
1856        let field_col = struct_array.column(self.field_idx).clone();
1857        Ok(datafusion::physical_plan::ColumnarValue::Array(field_col))
1858    }
1859
1860    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
1861        vec![&self.input]
1862    }
1863
1864    fn with_new_children(
1865        self: Arc<Self>,
1866        children: Vec<Arc<dyn PhysicalExpr>>,
1867    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
1868        Ok(Arc::new(StructFieldAccessExpr::new(
1869            children[0].clone(),
1870            self.field_idx,
1871            self.output_type.clone(),
1872        )))
1873    }
1874
1875    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1876        write!(f, "{}", self)
1877    }
1878}
1879
1880// ---------------------------------------------------------------------------
1881// EXISTS subquery physical expression
1882// ---------------------------------------------------------------------------
1883
/// Physical expression that evaluates an EXISTS subquery per row.
///
/// For each input row, plans and executes the subquery with the row's columns
/// injected as parameters. Returns `true` if the subquery produces any rows.
///
/// NOT EXISTS is handled by the caller wrapping this in a NOT expression.
/// Nested EXISTS works because `execute_subplan` creates a full planner that
/// handles nested EXISTS recursively.
struct ExistsExecExpr {
    /// Subquery AST; correlated property accesses are rewritten to
    /// parameter references at evaluate time.
    query: Query,
    /// Graph execution context forwarded to `execute_subplan`.
    graph_ctx: Arc<GraphExecutionContext>,
    /// Session context forwarded to `execute_subplan`.
    session_ctx: Arc<RwLock<SessionContext>>,
    /// Storage manager forwarded to `execute_subplan`.
    storage: Arc<StorageManager>,
    /// Schema used both to plan and to execute the subquery.
    uni_schema: Arc<UniSchema>,
    /// Base (query-level) parameters; merged with per-row parameters
    /// extracted from the batch during evaluation.
    params: HashMap<String, Value>,
}
1900
impl std::fmt::Debug for ExistsExecExpr {
    // Fields hold execution machinery that is not usefully debug-printable,
    // so only the type name is emitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExistsExecExpr").finish_non_exhaustive()
    }
}

impl ExistsExecExpr {
    /// Bundle the subquery AST with everything needed to plan and execute
    /// it per row during `evaluate`.
    fn new(
        query: Query,
        graph_ctx: Arc<GraphExecutionContext>,
        session_ctx: Arc<RwLock<SessionContext>>,
        storage: Arc<StorageManager>,
        uni_schema: Arc<UniSchema>,
        params: HashMap<String, Value>,
    ) -> Self {
        Self {
            query,
            graph_ctx,
            session_ctx,
            storage,
            uni_schema,
            params,
        }
    }
}

impl std::fmt::Display for ExistsExecExpr {
    // Opaque rendering; the subquery itself is not displayed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "EXISTS(<subquery>)")
    }
}
1932
// Equality and hashing are intentionally degenerate: two EXISTS expressions
// are never considered equal — presumably so distinct subqueries are never
// merged or deduplicated by expression-level optimizations (TODO confirm).
// NOTE(review): `eq` always returning `false` technically violates the
// reflexivity implied by the `Eq` marker below (`a == a` is false); verify
// no consumer relies on reflexive equality for this type.
impl PartialEq<dyn PhysicalExpr> for ExistsExecExpr {
    fn eq(&self, _other: &dyn PhysicalExpr) -> bool {
        false
    }
}

impl PartialEq for ExistsExecExpr {
    fn eq(&self, _other: &Self) -> bool {
        false
    }
}

impl Eq for ExistsExecExpr {}

impl std::hash::Hash for ExistsExecExpr {
    // Constant hash — consistent with `eq`, which never reports equality.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        "ExistsExecExpr".hash(state);
    }
}

impl DisplayAs for ExistsExecExpr {
    // Same rendering regardless of the requested display format.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1958
impl PhysicalExpr for ExistsExecExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// EXISTS always evaluates to a Boolean column.
    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(DataType::Boolean)
    }

    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Evaluate the EXISTS subquery once per input row.
    ///
    /// Flow: (1) derive outer entity variable names from the batch schema,
    /// (2) rewrite correlated property accesses into parameter references,
    /// (3) plan the rewritten subquery ONCE, then (4) execute it per row on
    /// a dedicated thread with per-row parameter bindings, producing one
    /// Boolean per input row.
    #[allow(clippy::manual_try_fold)]
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        let num_rows = batch.num_rows();
        let mut builder = BooleanBuilder::with_capacity(num_rows);

        // 7.2: Extract entity variable names from batch schema.
        // Entity columns follow the pattern "varname._vid" (flattened) or are struct columns.
        // We pass ONLY entity base names (e.g., "p", "n", "m") as vars_in_scope so the
        // subquery planner treats them as bound (Imported) variables. The initial Project
        // then creates Parameter("n") AS "n" etc. — the traverse reads the VID from this
        // column via resolve_source_vid_col's bare-name fallback.
        //
        // We intentionally do NOT include raw column names (n._vid, n._labels, etc.) in
        // vars_in_scope to avoid duplicate column conflicts with traverse output columns.
        // Parameter expressions read directly from sub_params, not from plan columns.
        let schema = batch.schema();
        let mut entity_vars: HashSet<String> = HashSet::new();
        for field in schema.fields() {
            let name = field.name();
            if let Some(base) = name.strip_suffix("._vid") {
                entity_vars.insert(base.to_string());
            }
            if matches!(field.data_type(), DataType::Struct(_)) {
                entity_vars.insert(name.to_string());
            }
            // Also detect bare VID columns from parent EXISTS parameter projections.
            // In nested EXISTS, a parent level projects "n" as a bare Int64/UInt64 VID.
            // Simple identifier (no dots, no leading underscore) + integer type = VID.
            if !name.contains('.')
                && !name.starts_with('_')
                && matches!(field.data_type(), DataType::Int64 | DataType::UInt64)
            {
                entity_vars.insert(name.to_string());
            }
        }
        let vars_in_scope: Vec<String> = entity_vars.iter().cloned().collect();

        // 7.3: Rewrite correlated property accesses to parameter references.
        // e.g., `n.prop` where `n` is an outer entity → `$param("n.prop")`
        let rewritten_query = rewrite_query_correlated(&self.query, &entity_vars);

        // 7.4: Plan ONCE — the rewritten query is parameterized, same for all rows.
        let planner = QueryPlanner::new(self.uni_schema.clone());
        let logical_plan = match planner.plan_with_scope(rewritten_query, vars_in_scope) {
            Ok(plan) => plan,
            Err(e) => {
                return Err(datafusion::error::DataFusionError::Execution(format!(
                    "EXISTS subquery planning failed: {}",
                    e
                )));
            }
        };

        // Execute all rows on a dedicated thread with a single tokio runtime.
        // The runtime must be created and dropped on this thread (not in an async context).
        let graph_ctx = self.graph_ctx.clone();
        let session_ctx = self.session_ctx.clone();
        let storage = self.storage.clone();
        let uni_schema = self.uni_schema.clone();
        let base_params = self.params.clone();

        // `builder` is mutably borrowed by the scoped thread; the scope
        // guarantees the borrow ends before `builder.finish()` below.
        let result = std::thread::scope(|s| {
            s.spawn(|| {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "Failed to create runtime for EXISTS: {}",
                            e
                        ))
                    })?;

                for row_idx in 0..num_rows {
                    let row_params = extract_row_params(batch, row_idx);
                    let mut sub_params = base_params.clone();
                    sub_params.extend(row_params);

                    // Add entity variable → VID value mappings so that
                    // Parameter("n") resolves to the VID for traversal sources.
                    for var in &entity_vars {
                        let vid_key = format!("{}._vid", var);
                        if let Some(vid_val) = sub_params.get(&vid_key).cloned() {
                            sub_params.insert(var.clone(), vid_val);
                        }
                    }

                    let batches = rt.block_on(execute_subplan(
                        &logical_plan,
                        &sub_params,
                        &HashMap::new(), // No outer values for EXISTS subquery
                        &graph_ctx,
                        &session_ctx,
                        &storage,
                        &uni_schema,
                    ))?;

                    // EXISTS semantics: true iff the subquery produced any row.
                    let has_rows = batches.iter().any(|b| b.num_rows() > 0);
                    builder.append_value(has_rows);
                }

                Ok::<_, datafusion::error::DataFusionError>(())
            })
            .join()
            .unwrap_or_else(|_| {
                Err(datafusion::error::DataFusionError::Execution(
                    "EXISTS subquery thread panicked".to_string(),
                ))
            })
        });

        // Surface any per-row execution failure from the worker thread.
        if let Err(e) = result {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "EXISTS subquery execution failed: {}",
                e
            )));
        }

        Ok(datafusion::physical_plan::ColumnarValue::Array(Arc::new(
            builder.finish(),
        )))
    }

    /// The subquery is owned internally; there are no child physical exprs.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if !children.is_empty() {
            return Err(datafusion::error::DataFusionError::Plan(
                "ExistsExecExpr has no children".to_string(),
            ));
        }
        Ok(self)
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2121
2122// ---------------------------------------------------------------------------
2123// EXISTS subquery helpers
2124// ---------------------------------------------------------------------------
2125
2126/// Check if a Query contains any mutation clauses (CREATE, SET, DELETE, REMOVE, MERGE).
2127/// EXISTS subqueries must be read-only per OpenCypher spec.
2128fn has_mutation_clause(query: &Query) -> bool {
2129    match query {
2130        Query::Single(stmt) => stmt.clauses.iter().any(|c| {
2131            matches!(
2132                c,
2133                Clause::Create(_)
2134                    | Clause::Delete(_)
2135                    | Clause::Set(_)
2136                    | Clause::Remove(_)
2137                    | Clause::Merge(_)
2138            ) || has_mutation_in_clause_exprs(c)
2139        }),
2140        Query::Union { left, right, .. } => has_mutation_clause(left) || has_mutation_clause(right),
2141        _ => false,
2142    }
2143}
2144
2145/// Check if a clause contains nested EXISTS with mutations (recursive).
2146fn has_mutation_in_clause_exprs(clause: &Clause) -> bool {
2147    let check_expr = |e: &Expr| -> bool { has_mutation_in_expr(e) };
2148
2149    match clause {
2150        Clause::Match(m) => m.where_clause.as_ref().is_some_and(check_expr),
2151        Clause::With(w) => {
2152            w.where_clause.as_ref().is_some_and(check_expr)
2153                || w.items.iter().any(|item| match item {
2154                    ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2155                    ReturnItem::All => false,
2156                })
2157        }
2158        Clause::Return(r) => r.items.iter().any(|item| match item {
2159            ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2160            ReturnItem::All => false,
2161        }),
2162        _ => false,
2163    }
2164}
2165
2166/// Check if an expression tree contains an EXISTS with mutation clauses.
2167fn has_mutation_in_expr(expr: &Expr) -> bool {
2168    match expr {
2169        Expr::Exists { query, .. } => has_mutation_clause(query),
2170        _ => {
2171            let mut found = false;
2172            expr.for_each_child(&mut |child| {
2173                if has_mutation_in_expr(child) {
2174                    found = true;
2175                }
2176            });
2177            found
2178        }
2179    }
2180}
2181
2182/// Rewrite a Query AST to replace correlated property accesses with parameter references.
2183///
2184/// For each `Property(Variable(v), key)` where `v` is an outer-scope entity variable,
2185/// replaces it with `Parameter("{v}.{key}")`. This enables plan-once optimization since
2186/// the rewritten query is parameterized (same structure for every row).
2187fn rewrite_query_correlated(query: &Query, outer_vars: &HashSet<String>) -> Query {
2188    match query {
2189        Query::Single(stmt) => Query::Single(Statement {
2190            clauses: stmt
2191                .clauses
2192                .iter()
2193                .map(|c| rewrite_clause_correlated(c, outer_vars))
2194                .collect(),
2195        }),
2196        Query::Union { left, right, all } => Query::Union {
2197            left: Box::new(rewrite_query_correlated(left, outer_vars)),
2198            right: Box::new(rewrite_query_correlated(right, outer_vars)),
2199            all: *all,
2200        },
2201        other => other.clone(),
2202    }
2203}
2204
2205/// Rewrite expressions within a clause for correlated property access.
2206fn rewrite_clause_correlated(clause: &Clause, outer_vars: &HashSet<String>) -> Clause {
2207    match clause {
2208        Clause::Match(m) => Clause::Match(MatchClause {
2209            optional: m.optional,
2210            pattern: m.pattern.clone(),
2211            where_clause: m
2212                .where_clause
2213                .as_ref()
2214                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2215        }),
2216        Clause::With(w) => Clause::With(WithClause {
2217            distinct: w.distinct,
2218            items: w
2219                .items
2220                .iter()
2221                .map(|item| rewrite_return_item(item, outer_vars))
2222                .collect(),
2223            order_by: w.order_by.as_ref().map(|items| {
2224                items
2225                    .iter()
2226                    .map(|si| SortItem {
2227                        expr: rewrite_expr_correlated(&si.expr, outer_vars),
2228                        ascending: si.ascending,
2229                    })
2230                    .collect()
2231            }),
2232            skip: w
2233                .skip
2234                .as_ref()
2235                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2236            limit: w
2237                .limit
2238                .as_ref()
2239                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2240            where_clause: w
2241                .where_clause
2242                .as_ref()
2243                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2244        }),
2245        Clause::Return(r) => Clause::Return(ReturnClause {
2246            distinct: r.distinct,
2247            items: r
2248                .items
2249                .iter()
2250                .map(|item| rewrite_return_item(item, outer_vars))
2251                .collect(),
2252            order_by: r.order_by.as_ref().map(|items| {
2253                items
2254                    .iter()
2255                    .map(|si| SortItem {
2256                        expr: rewrite_expr_correlated(&si.expr, outer_vars),
2257                        ascending: si.ascending,
2258                    })
2259                    .collect()
2260            }),
2261            skip: r
2262                .skip
2263                .as_ref()
2264                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2265            limit: r
2266                .limit
2267                .as_ref()
2268                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2269        }),
2270        Clause::Unwind(u) => Clause::Unwind(UnwindClause {
2271            expr: rewrite_expr_correlated(&u.expr, outer_vars),
2272            variable: u.variable.clone(),
2273        }),
2274        other => other.clone(),
2275    }
2276}
2277
2278fn rewrite_return_item(item: &ReturnItem, outer_vars: &HashSet<String>) -> ReturnItem {
2279    match item {
2280        ReturnItem::All => ReturnItem::All,
2281        ReturnItem::Expr {
2282            expr,
2283            alias,
2284            source_text,
2285        } => ReturnItem::Expr {
2286            expr: rewrite_expr_correlated(expr, outer_vars),
2287            alias: alias.clone(),
2288            source_text: source_text.clone(),
2289        },
2290    }
2291}
2292
2293/// Rewrite a single expression: Property(Variable(v), key) → Parameter("{v}.{key}")
2294/// when v is an outer-scope entity variable. Handles nested EXISTS recursively.
2295fn rewrite_expr_correlated(expr: &Expr, outer_vars: &HashSet<String>) -> Expr {
2296    match expr {
2297        // Core rewrite: n.prop → $param("n.prop") when n is an outer entity
2298        Expr::Property(base, key) => {
2299            if let Expr::Variable(v) = base.as_ref()
2300                && outer_vars.contains(v)
2301            {
2302                return Expr::Parameter(format!("{}.{}", v, key));
2303            }
2304            Expr::Property(
2305                Box::new(rewrite_expr_correlated(base, outer_vars)),
2306                key.clone(),
2307            )
2308        }
2309        // Nested EXISTS — recurse into the subquery body
2310        Expr::Exists {
2311            query,
2312            from_pattern_predicate,
2313        } => Expr::Exists {
2314            query: Box::new(rewrite_query_correlated(query, outer_vars)),
2315            from_pattern_predicate: *from_pattern_predicate,
2316        },
2317        // CountSubquery and CollectSubquery — recurse into subquery body
2318        Expr::CountSubquery(query) => {
2319            Expr::CountSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2320        }
2321        Expr::CollectSubquery(query) => {
2322            Expr::CollectSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2323        }
2324        // All other expressions: recursively transform children
2325        other => other
2326            .clone()
2327            .map_children(&mut |child| rewrite_expr_correlated(&child, outer_vars)),
2328    }
2329}