// uni_query/query/df_graph/expr_compiler.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
5use crate::query::df_graph::GraphExecutionContext;
6use crate::query::df_graph::common::{execute_subplan, extract_row_params};
7use crate::query::df_graph::comprehension::ListComprehensionExecExpr;
8use crate::query::df_graph::pattern_comprehension::{
9    PatternComprehensionExecExpr, analyze_pattern, build_inner_schema, collect_inner_properties,
10};
11use crate::query::df_graph::quantifier::{QuantifierExecExpr, QuantifierType};
12use crate::query::df_graph::reduce::ReduceExecExpr;
13use crate::query::df_graph::similar_to_expr::SimilarToExecExpr;
14use crate::query::planner::QueryPlanner;
15use crate::query::similar_to::SimilarToError;
16use anyhow::{Result, anyhow};
17use arrow_array::builder::BooleanBuilder;
18use arrow_schema::{DataType, Field, Schema};
19use datafusion::execution::context::SessionState;
20use datafusion::physical_expr::expressions::binary;
21use datafusion::physical_plan::PhysicalExpr;
22use datafusion::physical_planner::PhysicalPlanner;
23use datafusion::prelude::SessionContext;
24use parking_lot::RwLock;
25use std::collections::{HashMap, HashSet};
26use std::sync::Arc;
27use uni_common::Value;
28use uni_common::core::schema::{DistanceMetric, IndexDefinition, Schema as UniSchema};
29use uni_cypher::ast::{
30    BinaryOp, Clause, CypherLiteral, Expr, MatchClause, Query, ReturnClause, ReturnItem, SortItem,
31    Statement, UnaryOp, UnwindClause, WithClause,
32};
33use uni_store::storage::manager::StorageManager;
34
35/// Check if a data type represents CypherValue (LargeBinary).
36fn is_cypher_value_type(dt: Option<&DataType>) -> bool {
37    dt.is_some_and(|t| *t == DataType::LargeBinary)
38}
39
40/// Resolve the element type for a list expression.
41///
42/// Extracts the element type from List/LargeList/Null/LargeBinary data types.
43/// Falls back to the provided fallback type for LargeBinary, and returns an
44/// error with the provided context for unsupported types.
45///
46/// # Errors
47///
48/// Returns an error if the data type is not a recognized list type.
49fn resolve_list_element_type(
50    list_data_type: &DataType,
51    large_binary_fallback: DataType,
52    context: &str,
53) -> Result<DataType> {
54    match list_data_type {
55        DataType::List(field) | DataType::LargeList(field) => Ok(field.data_type().clone()),
56        DataType::Null => Ok(DataType::Null),
57        DataType::LargeBinary => Ok(large_binary_fallback),
58        _ => Err(anyhow!(
59            "{} input must be a list, got {:?}",
60            context,
61            list_data_type
62        )),
63    }
64}
65
/// Physical expression wrapper that converts LargeList<T> to LargeBinary (CypherValue).
///
/// Used in CASE expressions to unify branch types when mixing typed lists
/// (e.g., from list comprehensions) with CypherValue-encoded lists.
#[derive(Debug)]
struct LargeListToCypherValueExpr {
    // The wrapped expression; expected to produce List/LargeList (or an
    // already-encoded LargeBinary, which passes through unchanged).
    child: Arc<dyn PhysicalExpr>,
}
74
75impl LargeListToCypherValueExpr {
76    fn new(child: Arc<dyn PhysicalExpr>) -> Self {
77        Self { child }
78    }
79}
80
81impl std::fmt::Display for LargeListToCypherValueExpr {
82    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
83        write!(f, "LargeListToCypherValue({})", self.child)
84    }
85}
86
impl PartialEq for LargeListToCypherValueExpr {
    // Equality is Arc pointer identity on the child: two wrappers compare
    // equal only when they share the same underlying child allocation.
    // Structurally identical but separately-built children compare unequal —
    // a deliberately conservative choice, since `dyn PhysicalExpr` offers no
    // general structural comparison here.
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.child, &other.child)
    }
}
92
93impl Eq for LargeListToCypherValueExpr {}
94
95impl std::hash::Hash for LargeListToCypherValueExpr {
96    fn hash<H: std::hash::Hasher>(&self, _state: &mut H) {
97        // Hash based on type since we can't hash PhysicalExpr
98        std::any::type_name::<Self>().hash(_state);
99    }
100}
101
102impl PartialEq<dyn std::any::Any> for LargeListToCypherValueExpr {
103    fn eq(&self, other: &dyn std::any::Any) -> bool {
104        other
105            .downcast_ref::<Self>()
106            .map(|x| self == x)
107            .unwrap_or(false)
108    }
109}
110
impl PhysicalExpr for LargeListToCypherValueExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    // Output is always LargeBinary (the CypherValue encoding), regardless of
    // the child's concrete list type.
    fn data_type(&self, _input_schema: &Schema) -> datafusion::error::Result<DataType> {
        Ok(DataType::LargeBinary)
    }

    // Nullability is inherited from the child; the conversion itself does not
    // introduce nulls.
    fn nullable(&self, input_schema: &Schema) -> datafusion::error::Result<bool> {
        self.child.nullable(input_schema)
    }

    /// Evaluate the child and re-encode its list output as CypherValue bytes.
    ///
    /// Steps: evaluate child → normalize List to LargeList via `cast` →
    /// pass LargeBinary through unchanged → convert LargeList to a CypherValue
    /// (LargeBinary) array. Any other type is an execution error.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::logical_expr::ColumnarValue> {
        use datafusion::arrow::compute::cast;
        use datafusion::logical_expr::ColumnarValue;

        let child_result = self.child.evaluate(batch)?;
        // Materialize scalars to a full array so downstream handling is uniform.
        let child_array = child_result.into_array(batch.num_rows())?;

        // Normalize List → LargeList (pattern from quantifier.rs:182-189)
        let list_array = if let DataType::List(field) = child_array.data_type() {
            let target_type = DataType::LargeList(field.clone());
            cast(&child_array, &target_type).map_err(|e| {
                datafusion::error::DataFusionError::Execution(format!(
                    "List to LargeList cast failed: {e}"
                ))
            })?
        } else {
            // ArrayRef clone is a cheap Arc refcount bump, not a data copy.
            child_array.clone()
        };

        // If already LargeBinary, pass through
        if list_array.data_type() == &DataType::LargeBinary {
            return Ok(ColumnarValue::Array(list_array));
        }

        // Convert LargeList to CypherValue
        if let Some(large_list) = list_array
            .as_any()
            .downcast_ref::<datafusion::arrow::array::LargeListArray>()
        {
            let cv_array =
                crate::query::df_graph::common::typed_large_list_to_cv_array(large_list)?;
            Ok(ColumnarValue::Array(cv_array))
        } else {
            Err(datafusion::error::DataFusionError::Execution(format!(
                "Expected List or LargeList, got {:?}",
                list_array.data_type()
            )))
        }
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.child]
    }

    // Rebuild with a replacement child; exactly one child is required.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Execution(
                "LargeListToCypherValueExpr expects exactly 1 child".to_string(),
            ));
        }
        Ok(Arc::new(LargeListToCypherValueExpr::new(
            children[0].clone(),
        )))
    }

    // SQL-style rendering; intentionally identical to the Display impl.
    fn fmt_sql(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LargeListToCypherValue({})", self.child)
    }
}
189
/// Compiler for converting Cypher expressions directly to DataFusion Physical Expressions.
pub struct CypherPhysicalExprCompiler<'a> {
    // DataFusion session state used by the standard compilation path.
    state: &'a SessionState,
    // Optional translation context (parameters, variable kinds, hints).
    translation_ctx: Option<&'a TranslationContext>,
    // Graph execution context; required by pattern comprehension, EXISTS,
    // and similar_to compilation, attached via `with_graph_ctx`/`with_subquery_ctx`.
    graph_ctx: Option<Arc<GraphExecutionContext>>,
    // Graph schema; attached together with `graph_ctx`.
    uni_schema: Option<Arc<UniSchema>>,
    /// Session context for EXISTS subquery execution.
    session_ctx: Option<Arc<RwLock<SessionContext>>>,
    /// Storage manager for EXISTS subquery execution.
    storage: Option<Arc<StorageManager>>,
    /// Query parameters for EXISTS subquery execution.
    params: HashMap<String, Value>,
}
203
204impl<'a> CypherPhysicalExprCompiler<'a> {
205    pub fn new(state: &'a SessionState, translation_ctx: Option<&'a TranslationContext>) -> Self {
206        Self {
207            state,
208            translation_ctx,
209            graph_ctx: None,
210            uni_schema: None,
211            session_ctx: None,
212            storage: None,
213            params: HashMap::new(),
214        }
215    }
216
    /// Build a scoped compiler that excludes the given variables from the translation context.
    ///
    /// When compiling inner expressions for list comprehensions, reduce, or quantifiers,
    /// loop variables must be removed from `variable_kinds` so that property access on
    /// them does not incorrectly generate flat columns that don't exist in the inner schema.
    ///
    /// If none of the `exclude_vars` are present in the current context, the returned
    /// compiler simply reuses `self`'s translation context unchanged.
    ///
    /// The caller must own `scoped_ctx_slot` and keep it alive for the returned compiler's
    /// lifetime.
    fn scoped_compiler<'b>(
        &'b self,
        exclude_vars: &[&str],
        scoped_ctx_slot: &'b mut Option<TranslationContext>,
    ) -> CypherPhysicalExprCompiler<'b>
    where
        'a: 'b,
    {
        // Only clone the context when at least one excluded variable actually
        // appears in it; otherwise reuse the existing borrow as-is.
        let needs_scoping = self.translation_ctx.is_some_and(|ctx| {
            exclude_vars
                .iter()
                .any(|v| ctx.variable_kinds.contains_key(*v))
        });

        let ctx_ref = if needs_scoping {
            // unwrap is safe: needs_scoping is only true when translation_ctx is Some.
            let ctx = self.translation_ctx.unwrap();
            let mut new_kinds = ctx.variable_kinds.clone();
            for v in exclude_vars {
                new_kinds.remove(*v);
            }
            // Store the scoped context in the caller-owned slot so the returned
            // compiler can borrow it for lifetime 'b.
            *scoped_ctx_slot = Some(TranslationContext {
                parameters: ctx.parameters.clone(),
                outer_values: ctx.outer_values.clone(),
                variable_labels: ctx.variable_labels.clone(),
                variable_kinds: new_kinds,
                node_variable_hints: ctx.node_variable_hints.clone(),
                mutation_edge_hints: ctx.mutation_edge_hints.clone(),
                statement_time: ctx.statement_time,
            });
            scoped_ctx_slot.as_ref()
        } else {
            self.translation_ctx
        };

        // Arc fields are cheap refcount clones; params is a full map clone.
        CypherPhysicalExprCompiler {
            state: self.state,
            translation_ctx: ctx_ref,
            graph_ctx: self.graph_ctx.clone(),
            uni_schema: self.uni_schema.clone(),
            session_ctx: self.session_ctx.clone(),
            storage: self.storage.clone(),
            params: self.params.clone(),
        }
    }
272
273    /// Attach graph context and schema for pattern comprehension support.
274    pub fn with_graph_ctx(
275        mut self,
276        graph_ctx: Arc<GraphExecutionContext>,
277        uni_schema: Arc<UniSchema>,
278    ) -> Self {
279        self.graph_ctx = Some(graph_ctx);
280        self.uni_schema = Some(uni_schema);
281        self
282    }
283
284    /// Attach full subquery context for EXISTS support.
285    pub fn with_subquery_ctx(
286        mut self,
287        graph_ctx: Arc<GraphExecutionContext>,
288        uni_schema: Arc<UniSchema>,
289        session_ctx: Arc<RwLock<SessionContext>>,
290        storage: Arc<StorageManager>,
291        params: HashMap<String, Value>,
292    ) -> Self {
293        self.graph_ctx = Some(graph_ctx);
294        self.uni_schema = Some(uni_schema);
295        self.session_ctx = Some(session_ctx);
296        self.storage = Some(storage);
297        self.params = params;
298        self
299    }
300
    /// Compile a Cypher expression into a DataFusion PhysicalExpr.
    ///
    /// Custom constructs (comprehensions, quantifiers, reduce, EXISTS, pattern
    /// comprehensions, `similar_to`) are dispatched to specialized exec
    /// expressions. Composite expressions whose children contain custom
    /// constructs are compiled recursively; everything else falls through to
    /// `compile_standard` (which uses `cypher_expr_to_df`).
    ///
    /// # Errors
    ///
    /// Returns an error for unsupported nestings (comprehensions inside
    /// list/map literals) or when a required UDF is missing.
    pub fn compile(&self, expr: &Expr, input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        match expr {
            Expr::ListComprehension {
                variable,
                list,
                where_clause,
                map_expr,
            } => self.compile_list_comprehension(
                variable,
                list,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),
            Expr::Quantifier {
                quantifier,
                variable,
                list,
                predicate,
            } => self.compile_quantifier(quantifier, variable, list, predicate, input_schema),
            Expr::Reduce {
                accumulator,
                init,
                variable,
                list,
                expr: expression,
            } => self.compile_reduce(accumulator, init, variable, list, expression, input_schema),
            // For BinaryOp, check if children contain custom expressions or CypherValue types
            Expr::BinaryOp { left, op, right } => {
                self.compile_binary_op_dispatch(left, op, right, input_schema)
            }
            Expr::UnaryOp { op, expr: inner } => {
                if matches!(op, UnaryOp::Not) {
                    // NOT needs a Boolean operand: a CypherValue (LargeBinary)
                    // result is first converted via the cv-to-bool wrapper.
                    let mut inner_phy = self.compile(inner, input_schema)?;
                    if let Ok(DataType::LargeBinary) = inner_phy.data_type(input_schema) {
                        inner_phy = self.wrap_with_cv_to_bool(inner_phy)?;
                    }
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(datafusion::physical_expr::expressions::is_null(inner_phy)
                        .map_err(|e| anyhow!("Failed to create is_null: {}", e))?)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNotNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(
                        datafusion::physical_expr::expressions::is_not_null(inner_phy)
                            .map_err(|e| anyhow!("Failed to create is_not_null: {}", e))?,
                    )
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            // In operator is Expr::In { expr, list }
            Expr::In {
                expr: left,
                list: right,
            } => {
                if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
                    let left_phy = self.compile(left, input_schema)?;
                    let right_phy = self.compile(right, input_schema)?;

                    // Types default to LargeBinary (CypherValue) when unresolvable,
                    // which the _cypher_in UDF handles.
                    let left_type = left_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);
                    let right_type = right_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);

                    self.plan_binary_udf("_cypher_in", left_phy, right_phy, left_type, right_type)?
                        .ok_or_else(|| anyhow!("_cypher_in UDF not found"))
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            // Recursively check other composite types if necessary.
            Expr::List(items) if items.iter().any(Self::contains_custom_expr) => Err(anyhow!(
                "List literals containing comprehensions not yet supported in compiler"
            )),
            Expr::Map(entries) if entries.iter().any(|(_, v)| Self::contains_custom_expr(v)) => {
                Err(anyhow!(
                    "Map literals containing comprehensions not yet supported in compiler"
                ))
            }

            // Property access on a struct column — e.g. `x.a` where `x` is Struct
            Expr::Property(base, prop) => self.compile_property_access(base, prop, input_schema),

            // Bracket access on a struct column — e.g. `x['a']` where `x` is Struct
            Expr::ArrayIndex { array, index } => {
                self.compile_array_index(array, index, input_schema)
            }

            // Pattern comprehension: [(a)-[:REL]->(b) WHERE pred | expr]
            Expr::PatternComprehension {
                path_variable,
                pattern,
                where_clause,
                map_expr,
            } => self.compile_pattern_comprehension(
                path_variable,
                pattern,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),

            // EXISTS subquery: plan + execute per row, return boolean
            Expr::Exists { query, .. } => self.compile_exists(query),

            // FunctionCall wrapping a custom expression (e.g. size(comprehension))
            Expr::FunctionCall {
                name,
                args,
                distinct,
                ..
            } => {
                // similar_to is always handled here, regardless of argument shape.
                if name.eq_ignore_ascii_case("similar_to") {
                    return self.compile_similar_to(args, input_schema);
                }
                if args.iter().any(Self::contains_custom_expr) {
                    self.compile_function_with_custom_args(name, args, *distinct, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            // CASE expression - dispatch based on whether it contains custom expressions
            Expr::Case {
                expr: case_operand,
                when_then,
                else_expr,
            } => {
                // Check if operand or any branch contains custom expressions
                let has_custom = case_operand
                    .as_deref()
                    .is_some_and(Self::contains_custom_expr)
                    || when_then.iter().any(|(w, t)| {
                        Self::contains_custom_expr(w) || Self::contains_custom_expr(t)
                    })
                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr);

                if has_custom {
                    // Use compile_case() for CypherValue boolean conversion and
                    // LargeList/LargeBinary type unification
                    self.compile_case(case_operand, when_then, else_expr, input_schema)
                } else {
                    // Standard compilation path - goes through apply_type_coercion which handles:
                    // 1. Simple CASE → Generic CASE rewriting with cross-type equality
                    // 2. Type coercion for CASE result branches
                    // 3. Numeric widening for comparisons
                    self.compile_standard(expr, input_schema)
                }
            }

            // LabelCheck: delegate to standard compilation (uses cypher_expr_to_df)
            Expr::LabelCheck { .. } => self.compile_standard(expr, input_schema),

            // Default to standard compilation for leaf nodes or non-custom trees
            _ => self.compile_standard(expr, input_schema),
        }
    }
477
    /// Dispatch binary op compilation, checking for custom expressions and CypherValue types.
    ///
    /// Routing order matters:
    /// 1. Node/edge identity comparisons (`a = b` on two graph variables) are
    ///    rewritten to compare `_vid`/`_eid` properties.
    /// 2. XOR/Pow always go through the standard path.
    /// 3. Operands containing custom expressions are compiled recursively and
    ///    combined via `compile_binary_op`.
    /// 4. List-producing Add goes through the standard path (list concat UDF).
    /// 5. CypherValue (LargeBinary) operands use `compile_binary_op`; plain
    ///    Arrow types use the standard path for type coercion.
    fn compile_binary_op_dispatch(
        &self,
        left: &Expr,
        op: &BinaryOp,
        right: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Identity comparison of two graph variables of the same kind:
        // compare internal ids instead of whole values.
        if matches!(op, BinaryOp::Eq | BinaryOp::NotEq)
            && let (Expr::Variable(lv), Expr::Variable(rv)) = (left, right)
            && let Some(ctx) = self.translation_ctx
            && let (Some(lk), Some(rk)) = (ctx.variable_kinds.get(lv), ctx.variable_kinds.get(rv))
        {
            let identity_prop = match (lk, rk) {
                (VariableKind::Node, VariableKind::Node) => Some("_vid"),
                (VariableKind::Edge, VariableKind::Edge) => Some("_eid"),
                _ => None,
            };

            if let Some(id_prop) = identity_prop {
                return self.compile_standard(
                    &Expr::BinaryOp {
                        left: Box::new(Expr::Property(
                            Box::new(Expr::Variable(lv.clone())),
                            id_prop.to_string(),
                        )),
                        op: *op,
                        right: Box::new(Expr::Property(
                            Box::new(Expr::Variable(rv.clone())),
                            id_prop.to_string(),
                        )),
                    },
                    input_schema,
                );
            }
        }

        // XOR and Pow: always route through compile_standard.
        // compile_binary_op does not support these operators. The standard path
        // correctly maps XOR → _cypher_xor UDF and Pow → power() function.
        if matches!(op, BinaryOp::Xor | BinaryOp::Pow) {
            return self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            );
        }

        if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
            let left_phy = self.compile(left, input_schema)?;
            let right_phy = self.compile(right, input_schema)?;
            return self.compile_binary_op(op, left_phy, right_phy, input_schema);
        }

        // For Add with a list-producing operand (AST-level detection),
        // compile through the standard path which uses cypher_expr_to_df
        // to correctly route list + scalar to _cypher_list_concat.
        if *op == BinaryOp::Add && (Self::is_list_producing(left) || Self::is_list_producing(right))
        {
            return self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            );
        }

        // Compile sub-expressions to check their types. If either operand
        // produces LargeBinary (CypherValue), standard Arrow kernels will fail at
        // runtime for comparisons. Route through compile_binary_op which
        // dispatches to Cypher comparison UDFs.
        let left_phy = self.compile(left, input_schema)?;
        let right_phy = self.compile(right, input_schema)?;
        let left_dt = left_phy.data_type(input_schema).ok();
        let right_dt = right_phy.data_type(input_schema).ok();
        let has_cv =
            is_cypher_value_type(left_dt.as_ref()) || is_cypher_value_type(right_dt.as_ref());

        if has_cv {
            // CypherValue types need special handling via compile_binary_op
            self.compile_binary_op(op, left_phy, right_phy, input_schema)
        } else {
            // Standard types: use compile_standard to get proper type coercion
            // (e.g., Int64 == Float64 requires coercion to work)
            self.compile_standard(
                &Expr::BinaryOp {
                    left: Box::new(left.clone()),
                    op: *op,
                    right: Box::new(right.clone()),
                },
                input_schema,
            )
        }
    }
577
578    /// Try to compile struct field access for a variable.
579    ///
580    /// Returns `Some(expr)` if the variable is a Struct column and can be accessed,
581    /// `None` if fallback to standard compilation is needed.
582    fn try_compile_struct_field(
583        &self,
584        var_name: &str,
585        field_name: &str,
586        input_schema: &Schema,
587    ) -> Option<Arc<dyn PhysicalExpr>> {
588        let col_idx = input_schema.index_of(var_name).ok()?;
589        let DataType::Struct(struct_fields) = input_schema.field(col_idx).data_type() else {
590            return None;
591        };
592
593        // Cypher semantics: accessing a missing key returns null
594        if let Some(field_idx) = struct_fields.iter().position(|f| f.name() == field_name) {
595            let output_type = struct_fields[field_idx].data_type().clone();
596            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(
597                datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
598            );
599            Some(Arc::new(StructFieldAccessExpr::new(
600                col_expr,
601                field_idx,
602                output_type,
603            )))
604        } else {
605            Some(Arc::new(
606                datafusion::physical_expr::expressions::Literal::new(
607                    datafusion::common::ScalarValue::Null,
608                ),
609            ))
610        }
611    }
612
613    /// Compile property access on a struct column (e.g. `x.a` where `x` is Struct).
614    fn compile_property_access(
615        &self,
616        base: &Expr,
617        prop: &str,
618        input_schema: &Schema,
619    ) -> Result<Arc<dyn PhysicalExpr>> {
620        if let Expr::Variable(var_name) = base {
621            // 1. Try struct field access (e.g. `x.a` where `x` is a Struct column)
622            if let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema) {
623                return Ok(expr);
624            }
625            // 2. Try flat column "{var}.{prop}" (for pattern comprehension inner schemas)
626            let flat_col = format!("{}.{}", var_name, prop);
627            if let Ok(col_idx) = input_schema.index_of(&flat_col) {
628                return Ok(Arc::new(
629                    datafusion::physical_expr::expressions::Column::new(&flat_col, col_idx),
630                ));
631            }
632        }
633        self.compile_standard(
634            &Expr::Property(Box::new(base.clone()), prop.to_string()),
635            input_schema,
636        )
637    }
638
639    /// Compile bracket access on a struct column (e.g. `x['a']` where `x` is Struct).
640    fn compile_array_index(
641        &self,
642        array: &Expr,
643        index: &Expr,
644        input_schema: &Schema,
645    ) -> Result<Arc<dyn PhysicalExpr>> {
646        if let Expr::Variable(var_name) = array
647            && let Expr::Literal(CypherLiteral::String(prop)) = index
648            && let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema)
649        {
650            return Ok(expr);
651        }
652        self.compile_standard(
653            &Expr::ArrayIndex {
654                array: Box::new(array.clone()),
655                index: Box::new(index.clone()),
656            },
657            input_schema,
658        )
659    }
660
661    /// Compile EXISTS subquery expression.
662    fn compile_exists(&self, query: &Query) -> Result<Arc<dyn PhysicalExpr>> {
663        // 7.1: Validate no mutation clauses in EXISTS body
664        if has_mutation_clause(query) {
665            return Err(anyhow!(
666                "SyntaxError: InvalidClauseComposition - EXISTS subquery cannot contain updating clauses"
667            ));
668        }
669
670        let err = |dep: &str| anyhow!("EXISTS requires {}", dep);
671
672        let graph_ctx = self
673            .graph_ctx
674            .clone()
675            .ok_or_else(|| err("GraphExecutionContext"))?;
676        let uni_schema = self.uni_schema.clone().ok_or_else(|| err("UniSchema"))?;
677        let session_ctx = self
678            .session_ctx
679            .clone()
680            .ok_or_else(|| err("SessionContext"))?;
681        let storage = self.storage.clone().ok_or_else(|| err("StorageManager"))?;
682
683        Ok(Arc::new(ExistsExecExpr::new(
684            query.clone(),
685            graph_ctx,
686            session_ctx,
687            storage,
688            uni_schema,
689            self.params.clone(),
690        )))
691    }
692
693    /// Compile a `similar_to(sources, queries [, options])` call into a `SimilarToExecExpr`.
694    fn compile_similar_to(
695        &self,
696        args: &[Expr],
697        input_schema: &Schema,
698    ) -> Result<Arc<dyn PhysicalExpr>> {
699        if args.len() < 2 || args.len() > 3 {
700            return Err(SimilarToError::InvalidArity { count: args.len() }.into());
701        }
702
703        let graph_ctx = self
704            .graph_ctx
705            .clone()
706            .ok_or(SimilarToError::NoGraphContext)?;
707
708        // Extract variable name and property names from the source expression.
709        let source_variable = extract_source_variable(&args[0]);
710        let source_property_names = extract_property_names(&args[0]);
711
712        // Normalize sources and queries: if args[0] is a List, split into
713        // individual source/query pairs for multi-source scoring.
714        let (source_exprs, query_exprs) = normalize_similar_to_args(&args[0], &args[1]);
715
716        // Compile each source and query expression
717        let source_children: Vec<Arc<dyn PhysicalExpr>> = source_exprs
718            .iter()
719            .map(|e| self.compile(e, input_schema))
720            .collect::<Result<Vec<_>>>()?;
721        let query_children: Vec<Arc<dyn PhysicalExpr>> = query_exprs
722            .iter()
723            .map(|e| self.compile(e, input_schema))
724            .collect::<Result<Vec<_>>>()?;
725
726        // Compile options if present
727        let options_child = if args.len() == 3 {
728            Some(self.compile(&args[2], input_schema)?)
729        } else {
730            None
731        };
732
733        // Resolve per-source distance metrics from schema at compile time.
734        let source_metrics: Vec<Option<DistanceMetric>> = source_property_names
735            .iter()
736            .map(|prop_name| {
737                prop_name.as_ref().and_then(|prop| {
738                    self.uni_schema
739                        .as_ref()
740                        .and_then(|schema| resolve_metric_for_property(schema, prop))
741                })
742            })
743            .collect();
744
745        Ok(Arc::new(SimilarToExecExpr::new(
746            source_children,
747            query_children,
748            options_child,
749            graph_ctx,
750            source_variable,
751            source_property_names,
752            source_metrics,
753        )))
754    }
755
756    /// Check if map_expr or where_clause contains a pattern comprehension that references the variable.
757    fn needs_vid_extraction_for_variable(
758        variable: &str,
759        map_expr: &Expr,
760        where_clause: Option<&Expr>,
761    ) -> bool {
762        fn expr_has_pattern_comp_referencing(expr: &Expr, var: &str) -> bool {
763            match expr {
764                Expr::PatternComprehension { pattern, .. } => {
765                    // Check if pattern uses the variable
766                    pattern.paths.iter().any(|path| {
767                        path.elements.iter().any(|elem| match elem {
768                            uni_cypher::ast::PatternElement::Node(n) => {
769                                n.variable.as_deref() == Some(var)
770                            }
771                            uni_cypher::ast::PatternElement::Relationship(r) => {
772                                r.variable.as_deref() == Some(var)
773                            }
774                            _ => false,
775                        })
776                    })
777                }
778                Expr::FunctionCall { args, .. } => args
779                    .iter()
780                    .any(|a| expr_has_pattern_comp_referencing(a, var)),
781                Expr::BinaryOp { left, right, .. } => {
782                    expr_has_pattern_comp_referencing(left, var)
783                        || expr_has_pattern_comp_referencing(right, var)
784                }
785                Expr::UnaryOp { expr: e, .. } | Expr::Property(e, _) => {
786                    expr_has_pattern_comp_referencing(e, var)
787                }
788                Expr::List(items) => items
789                    .iter()
790                    .any(|i| expr_has_pattern_comp_referencing(i, var)),
791                Expr::ListComprehension {
792                    list,
793                    map_expr,
794                    where_clause,
795                    ..
796                } => {
797                    expr_has_pattern_comp_referencing(list, var)
798                        || expr_has_pattern_comp_referencing(map_expr, var)
799                        || where_clause
800                            .as_ref()
801                            .is_some_and(|w| expr_has_pattern_comp_referencing(w, var))
802                }
803                _ => false,
804            }
805        }
806
807        expr_has_pattern_comp_referencing(map_expr, variable)
808            || where_clause.is_some_and(|w| expr_has_pattern_comp_referencing(w, variable))
809    }
810
811    /// Check if an expression tree contains nodes that require custom compilation.
812    pub fn contains_custom_expr(expr: &Expr) -> bool {
813        match expr {
814            Expr::ListComprehension { .. } => true,
815            Expr::Quantifier { .. } => true,
816            Expr::Reduce { .. } => true,
817            Expr::PatternComprehension { .. } => true,
818            Expr::BinaryOp { left, right, .. } => {
819                Self::contains_custom_expr(left) || Self::contains_custom_expr(right)
820            }
821            Expr::UnaryOp { expr, .. } => Self::contains_custom_expr(expr),
822            Expr::FunctionCall { name, args, .. } => {
823                name.eq_ignore_ascii_case("similar_to")
824                    || args.iter().any(Self::contains_custom_expr)
825            }
826            Expr::Case {
827                when_then,
828                else_expr,
829                ..
830            } => {
831                when_then
832                    .iter()
833                    .any(|(w, t)| Self::contains_custom_expr(w) || Self::contains_custom_expr(t))
834                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr)
835            }
836            Expr::List(items) => items.iter().any(Self::contains_custom_expr),
837            Expr::Map(entries) => entries.iter().any(|(_, v)| Self::contains_custom_expr(v)),
838            Expr::IsNull(e) | Expr::IsNotNull(e) => Self::contains_custom_expr(e),
839            Expr::In { expr: l, list: r } => {
840                Self::contains_custom_expr(l) || Self::contains_custom_expr(r)
841            }
842            Expr::Exists { .. } => true,
843            Expr::LabelCheck { expr, .. } => Self::contains_custom_expr(expr),
844            _ => false,
845        }
846    }
847
848    /// Check if an expression statically produces a list value.
849    /// Used to route `Add` operations to `_cypher_list_concat` instead of arithmetic.
850    fn is_list_producing(expr: &Expr) -> bool {
851        match expr {
852            Expr::List(_) => true,
853            Expr::ListComprehension { .. } => true,
854            Expr::ArraySlice { .. } => true,
855            // Add with a list-producing child produces a list
856            Expr::BinaryOp {
857                left,
858                op: BinaryOp::Add,
859                right,
860            } => Self::is_list_producing(left) || Self::is_list_producing(right),
861            Expr::FunctionCall { name, .. } => {
862                // Functions known to return lists
863                matches!(
864                    name.as_str(),
865                    "range"
866                        | "tail"
867                        | "reverse"
868                        | "collect"
869                        | "keys"
870                        | "labels"
871                        | "nodes"
872                        | "relationships"
873                )
874            }
875            _ => false,
876        }
877    }
878
879    fn compile_standard(
880        &self,
881        expr: &Expr,
882        input_schema: &Schema,
883    ) -> Result<Arc<dyn PhysicalExpr>> {
884        // Pre-resolve Property(Variable(v), p) → Variable("v.p") when the flat
885        // column "v.p" exists in the input schema.  This prevents DataFusion's
886        // standard path from compiling it as ArrayIndex(column(v), "p"), which
887        // fails when the column stores a VID integer rather than a Map/Struct.
888        let resolved = Self::resolve_flat_column_properties(expr, input_schema);
889        let df_expr = cypher_expr_to_df(&resolved, self.translation_ctx)?;
890        let resolved_expr = self.resolve_udfs(df_expr)?;
891
892        let df_schema = datafusion::common::DFSchema::try_from(input_schema.clone())?;
893
894        // Apply type coercion to resolve type mismatches
895        let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;
896
897        // Re-resolve UDFs after coercion (coercion may introduce new dummy UDF calls)
898        let coerced_expr = self.resolve_udfs(coerced_expr)?;
899
900        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
901        planner
902            .create_physical_expr(&coerced_expr, &df_schema, self.state)
903            .map_err(|e| anyhow!("DataFusion planning failed: {}", e))
904    }
905
906    /// Recursively resolve `Property(Variable(v), p)` → `Variable("v.p")` when
907    /// the flat column `"v.p"` exists in `schema`.  This ensures that property
908    /// access on graph-scan variables is compiled as a direct column reference
909    /// rather than an ArrayIndex UDF call, which would fail when the variable
910    /// column holds a VID integer instead of a Map/Node struct.
911    fn resolve_flat_column_properties(expr: &Expr, schema: &Schema) -> Expr {
912        match expr {
913            Expr::Property(base, prop) => {
914                if let Expr::Variable(var) = base.as_ref() {
915                    let flat_col = format!("{}.{}", var, prop);
916                    if schema.index_of(&flat_col).is_ok() {
917                        return Expr::Variable(flat_col);
918                    }
919                }
920                // Recurse into the base expression
921                Expr::Property(
922                    Box::new(Self::resolve_flat_column_properties(base, schema)),
923                    prop.clone(),
924                )
925            }
926            Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
927                left: Box::new(Self::resolve_flat_column_properties(left, schema)),
928                op: *op,
929                right: Box::new(Self::resolve_flat_column_properties(right, schema)),
930            },
931            Expr::FunctionCall {
932                name,
933                args,
934                distinct,
935                window_spec,
936            } => Expr::FunctionCall {
937                name: name.clone(),
938                args: args
939                    .iter()
940                    .map(|a| Self::resolve_flat_column_properties(a, schema))
941                    .collect(),
942                distinct: *distinct,
943                window_spec: window_spec.clone(),
944            },
945            Expr::UnaryOp { op, expr: inner } => Expr::UnaryOp {
946                op: *op,
947                expr: Box::new(Self::resolve_flat_column_properties(inner, schema)),
948            },
949            Expr::List(items) => Expr::List(
950                items
951                    .iter()
952                    .map(|i| Self::resolve_flat_column_properties(i, schema))
953                    .collect(),
954            ),
955            // For all other expression types, return as-is (literals, variables, etc.)
956            other => other.clone(),
957        }
958    }
959
960    /// Resolve UDFs in DataFusion expression using the session state registry.
961    ///
962    /// Uses `TreeNode::transform_up` to traverse the entire expression tree,
963    /// ensuring UDFs inside Cast, Case, InList, Between, etc. are all resolved.
964    fn resolve_udfs(
965        &self,
966        expr: datafusion::logical_expr::Expr,
967    ) -> Result<datafusion::logical_expr::Expr> {
968        use datafusion::common::tree_node::{Transformed, TreeNode};
969        use datafusion::logical_expr::Expr as DfExpr;
970
971        let result = expr
972            .transform_up(|node| {
973                if let DfExpr::ScalarFunction(ref func) = node {
974                    let udf_name = func.func.name();
975                    if let Some(registered_udf) = self.state.scalar_functions().get(udf_name) {
976                        return Ok(Transformed::yes(DfExpr::ScalarFunction(
977                            datafusion::logical_expr::expr::ScalarFunction {
978                                func: registered_udf.clone(),
979                                args: func.args.clone(),
980                            },
981                        )));
982                    }
983                }
984                Ok(Transformed::no(node))
985            })
986            .map_err(|e| anyhow!("Failed to resolve UDFs: {}", e))?;
987        Ok(result.data)
988    }
989
990    fn compile_list_comprehension(
991        &self,
992        variable: &str,
993        list: &Expr,
994        where_clause: Option<&Expr>,
995        map_expr: &Expr,
996        input_schema: &Schema,
997    ) -> Result<Arc<dyn PhysicalExpr>> {
998        let input_list_phy = self.compile(list, input_schema)?;
999
1000        // Resolve input list type
1001        let list_data_type = input_list_phy.data_type(input_schema)?;
1002        let inner_data_type = resolve_list_element_type(
1003            &list_data_type,
1004            DataType::LargeBinary,
1005            "List comprehension",
1006        )?;
1007
1008        // Create inner schema with loop variable (shadow outer variable if same name)
1009        let mut fields = input_schema.fields().to_vec();
1010        let loop_var_field = Arc::new(Field::new(variable, inner_data_type.clone(), true));
1011
1012        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1013            fields[pos] = loop_var_field;
1014        } else {
1015            fields.push(loop_var_field);
1016        }
1017
1018        // Check if we need VID extraction for nested pattern comprehensions
1019        let needs_vid_extraction =
1020            Self::needs_vid_extraction_for_variable(variable, map_expr, where_clause);
1021        if needs_vid_extraction && inner_data_type == DataType::LargeBinary {
1022            // Add a {variable}._vid field for VID extraction
1023            let vid_field = Arc::new(Field::new(
1024                format!("{}._vid", variable),
1025                DataType::UInt64,
1026                true,
1027            ));
1028            fields.push(vid_field);
1029        }
1030
1031        let inner_schema = Arc::new(Schema::new(fields));
1032
1033        // Compile inner expressions with scoped translation context
1034        let mut scoped_ctx = None;
1035        let inner_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
1036
1037        let predicate_phy = if let Some(pred) = where_clause {
1038            Some(inner_compiler.compile(pred, &inner_schema)?)
1039        } else {
1040            None
1041        };
1042
1043        let map_phy = inner_compiler.compile(map_expr, &inner_schema)?;
1044        let output_item_type = map_phy.data_type(&inner_schema)?;
1045
1046        Ok(Arc::new(ListComprehensionExecExpr::new(
1047            input_list_phy,
1048            map_phy,
1049            predicate_phy,
1050            variable.to_string(),
1051            Arc::new(input_schema.clone()),
1052            output_item_type,
1053            needs_vid_extraction,
1054        )))
1055    }
1056
1057    fn compile_reduce(
1058        &self,
1059        accumulator: &str,
1060        initial: &Expr,
1061        variable: &str,
1062        list: &Expr,
1063        reduce_expr: &Expr,
1064        input_schema: &Schema,
1065    ) -> Result<Arc<dyn PhysicalExpr>> {
1066        let list_phy = self.compile(list, input_schema)?;
1067
1068        let initial_phy = self.compile(initial, input_schema)?;
1069        let acc_type = initial_phy.data_type(input_schema)?;
1070
1071        let list_data_type = list_phy.data_type(input_schema)?;
1072        // For LargeBinary (CypherValue arrays), use the accumulator type as element type so the
1073        // reduce body expression compiles correctly (e.g. acc + x where both are Int64).
1074        let inner_data_type =
1075            resolve_list_element_type(&list_data_type, acc_type.clone(), "Reduce")?;
1076
1077        // Create inner schema with accumulator and loop variable (shadow outer variables if same names)
1078        let mut fields = input_schema.fields().to_vec();
1079
1080        let acc_field = Arc::new(Field::new(accumulator, acc_type, true));
1081        if let Some(pos) = fields.iter().position(|f| f.name() == accumulator) {
1082            fields[pos] = acc_field;
1083        } else {
1084            fields.push(acc_field);
1085        }
1086
1087        let var_field = Arc::new(Field::new(variable, inner_data_type, true));
1088        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1089            fields[pos] = var_field;
1090        } else {
1091            fields.push(var_field);
1092        }
1093
1094        let inner_schema = Arc::new(Schema::new(fields));
1095
1096        // Compile reduce expression with scoped translation context
1097        let mut scoped_ctx = None;
1098        let reduce_compiler = self.scoped_compiler(&[accumulator, variable], &mut scoped_ctx);
1099
1100        let reduce_phy = reduce_compiler.compile(reduce_expr, &inner_schema)?;
1101        let output_type = reduce_phy.data_type(&inner_schema)?;
1102
1103        Ok(Arc::new(ReduceExecExpr::new(
1104            accumulator.to_string(),
1105            initial_phy,
1106            variable.to_string(),
1107            list_phy,
1108            reduce_phy,
1109            Arc::new(input_schema.clone()),
1110            output_type,
1111        )))
1112    }
1113
1114    fn compile_quantifier(
1115        &self,
1116        quantifier: &uni_cypher::ast::Quantifier,
1117        variable: &str,
1118        list: &Expr,
1119        predicate: &Expr,
1120        input_schema: &Schema,
1121    ) -> Result<Arc<dyn PhysicalExpr>> {
1122        let input_list_phy = self.compile(list, input_schema)?;
1123
1124        // Resolve element type from list type
1125        let list_data_type = input_list_phy.data_type(input_schema)?;
1126        let inner_data_type =
1127            resolve_list_element_type(&list_data_type, DataType::LargeBinary, "Quantifier")?;
1128
1129        // Create inner schema with loop variable
1130        // If a field with the same name exists in the outer schema, replace it (shadow it)
1131        // to ensure the loop variable takes precedence.
1132        let mut fields = input_schema.fields().to_vec();
1133        let loop_var_field = Arc::new(Field::new(variable, inner_data_type, true));
1134
1135        // Find and replace existing field with same name, or append if not found
1136        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1137            fields[pos] = loop_var_field;
1138        } else {
1139            fields.push(loop_var_field);
1140        }
1141
1142        let inner_schema = Arc::new(Schema::new(fields));
1143
1144        // Compile predicate with a scoped translation context that removes the loop variable
1145        // from variable_kinds, so property access on the loop variable doesn't incorrectly
1146        // generate flat columns (like "x.name") that don't exist in the inner schema.
1147        let mut scoped_ctx = None;
1148        let pred_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
1149
1150        let mut predicate_phy = pred_compiler.compile(predicate, &inner_schema)?;
1151
1152        // Wrap CypherValue predicates with _cv_to_bool for proper boolean evaluation
1153        if let Ok(DataType::LargeBinary) = predicate_phy.data_type(&inner_schema) {
1154            predicate_phy = self.wrap_with_cv_to_bool(predicate_phy)?;
1155        }
1156
1157        let qt = match quantifier {
1158            uni_cypher::ast::Quantifier::All => QuantifierType::All,
1159            uni_cypher::ast::Quantifier::Any => QuantifierType::Any,
1160            uni_cypher::ast::Quantifier::Single => QuantifierType::Single,
1161            uni_cypher::ast::Quantifier::None => QuantifierType::None,
1162        };
1163
1164        Ok(Arc::new(QuantifierExecExpr::new(
1165            input_list_phy,
1166            predicate_phy,
1167            variable.to_string(),
1168            Arc::new(input_schema.clone()),
1169            qt,
1170        )))
1171    }
1172
1173    fn compile_pattern_comprehension(
1174        &self,
1175        path_variable: &Option<String>,
1176        pattern: &uni_cypher::ast::Pattern,
1177        where_clause: Option<&Expr>,
1178        map_expr: &Expr,
1179        input_schema: &Schema,
1180    ) -> Result<Arc<dyn PhysicalExpr>> {
1181        let err = |dep: &str| anyhow!("Pattern comprehension requires {}", dep);
1182
1183        let graph_ctx = self
1184            .graph_ctx
1185            .as_ref()
1186            .ok_or_else(|| err("GraphExecutionContext"))?;
1187        let uni_schema = self.uni_schema.as_ref().ok_or_else(|| err("UniSchema"))?;
1188
1189        // 1. Analyze pattern to get anchor column and traversal steps
1190        let (anchor_col, steps) = analyze_pattern(pattern, input_schema, uni_schema)?;
1191
1192        // 2. Collect needed properties from where_clause and map_expr
1193        let (vertex_props, edge_props) = collect_inner_properties(where_clause, map_expr, &steps);
1194
1195        // 3. Build inner schema
1196        let inner_schema = build_inner_schema(
1197            input_schema,
1198            &steps,
1199            &vertex_props,
1200            &edge_props,
1201            path_variable.as_deref(),
1202        );
1203
1204        // 4. Compile predicate and map_expr against inner schema
1205        let pred_phy = where_clause
1206            .map(|p| self.compile(p, &inner_schema))
1207            .transpose()?;
1208        let map_phy = self.compile(map_expr, &inner_schema)?;
1209        let output_type = map_phy.data_type(&inner_schema)?;
1210
1211        // 5. Return expression
1212        Ok(Arc::new(PatternComprehensionExecExpr::new(
1213            graph_ctx.clone(),
1214            anchor_col,
1215            steps,
1216            path_variable.clone(),
1217            pred_phy,
1218            map_phy,
1219            Arc::new(input_schema.clone()),
1220            Arc::new(inner_schema),
1221            output_type,
1222            vertex_props,
1223            edge_props,
1224        )))
1225    }
1226
1227    /// Compile a function call whose arguments contain custom expressions.
1228    ///
1229    /// Recursively compiles each argument via `self.compile()`, then looks up
1230    /// the corresponding UDF in the session registry and builds the physical
1231    /// expression.
1232    fn compile_function_with_custom_args(
1233        &self,
1234        name: &str,
1235        args: &[Expr],
1236        _distinct: bool,
1237        input_schema: &Schema,
1238    ) -> Result<Arc<dyn PhysicalExpr>> {
1239        // 1. Recursively compile each argument
1240        let compiled_args: Vec<Arc<dyn PhysicalExpr>> = args
1241            .iter()
1242            .map(|arg| self.compile(arg, input_schema))
1243            .collect::<Result<Vec<_>>>()?;
1244
1245        // 2. Resolve UDF name and look it up in the registry
1246        let udf_name = Self::cypher_fn_to_udf(name);
1247        let udf = self
1248            .state
1249            .scalar_functions()
1250            .get(udf_name.as_str())
1251            .ok_or_else(|| {
1252                anyhow!(
1253                    "UDF '{}' not found in registry for function '{}'",
1254                    udf_name,
1255                    name
1256                )
1257            })?;
1258
1259        // 3. Build operand type list and dummy columns from compiled args
1260        let placeholders: &[&str] = &["__arg0__", "__arg1__", "__arg2__", "__argN__"];
1261        let operand_types: Vec<(&str, DataType)> = compiled_args
1262            .iter()
1263            .enumerate()
1264            .map(|(i, arg)| {
1265                let dt = arg.data_type(input_schema).unwrap_or(DataType::LargeBinary);
1266                let placeholder = placeholders[i.min(3)];
1267                (placeholder, dt)
1268            })
1269            .collect();
1270
1271        let dummy_cols: Vec<datafusion::logical_expr::Expr> = operand_types
1272            .iter()
1273            .map(|(name, _)| {
1274                datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1275                    None::<String>,
1276                    *name,
1277                ))
1278            })
1279            .collect();
1280
1281        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1282            datafusion::logical_expr::expr::ScalarFunction {
1283                func: udf.clone(),
1284                args: dummy_cols,
1285            },
1286        );
1287
1288        // 4. Plan and rebind
1289        self.plan_udf_physical_expr(
1290            &udf_expr,
1291            &operand_types,
1292            compiled_args,
1293            &format!("function {}", name),
1294        )
1295    }
1296
1297    /// Map a Cypher function name to the registered UDF name.
1298    ///
1299    /// Mirrors the mapping in `translate_function_call` from `df_expr.rs`.
1300    /// The registered UDF names are always lowercase.
1301    fn cypher_fn_to_udf(name: &str) -> String {
1302        match name.to_uppercase().as_str() {
1303            "SIZE" | "LENGTH" => "_cypher_size".to_string(),
1304            "REVERSE" => "_cypher_reverse".to_string(),
1305            "TOSTRING" => "tostring".to_string(),
1306            "TOBOOLEAN" | "TOBOOL" | "TOBOOLEANORNULL" => "toboolean".to_string(),
1307            "TOINTEGER" | "TOINT" | "TOINTEGERORNULL" => "tointeger".to_string(),
1308            "TOFLOAT" | "TOFLOATORNULL" => "tofloat".to_string(),
1309            "HEAD" => "head".to_string(),
1310            "LAST" => "last".to_string(),
1311            "TAIL" => "tail".to_string(),
1312            "KEYS" => "keys".to_string(),
1313            "TYPE" => "type".to_string(),
1314            "PROPERTIES" => "properties".to_string(),
1315            "LABELS" => "labels".to_string(),
1316            "COALESCE" => "coalesce".to_string(),
1317            "ID" => "id".to_string(),
1318            // Fallback: lowercase the name (matches dummy_udf_expr behavior)
1319            _ => name.to_lowercase(),
1320        }
1321    }
1322
    /// Compile a CASE expression with custom sub-expressions in branches.
    ///
    /// Recursively compiles the operand, each when/then pair, and the else
    /// branch, then builds a `CaseExpr` physical expression.
    ///
    /// Applies two fixes for TCK compliance:
    /// 1. Wraps CypherValue WHEN conditions with `_cv_to_bool` for proper boolean evaluation
    /// 2. Unifies branch types when mixing LargeList and LargeBinary to avoid cast errors
    fn compile_case(
        &self,
        operand: &Option<Box<Expr>>,
        when_then: &[(Expr, Expr)],
        else_expr: &Option<Box<Expr>>,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Optional CASE operand (simple form: CASE x WHEN ... END).
        let operand_phy = operand
            .as_deref()
            .map(|e| self.compile(e, input_schema))
            .transpose()?;

        // Compile every WHEN/THEN pair; the first failure short-circuits via `?`.
        let mut when_then_phy: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> = when_then
            .iter()
            .map(|(w, t)| {
                let w_phy = self.compile(w, input_schema)?;
                let t_phy = self.compile(t, input_schema)?;
                Ok((w_phy, t_phy))
            })
            .collect::<Result<Vec<_>>>()?;

        let mut else_phy = else_expr
            .as_deref()
            .map(|e| self.compile(e, input_schema))
            .transpose()?;

        // Fix 1: WHEN conditions that evaluate to LargeBinary (CypherValue)
        // are wrapped with _cv_to_bool so CaseExpr can test them as booleans.
        for (w_phy, _) in &mut when_then_phy {
            if matches!(w_phy.data_type(input_schema), Ok(DataType::LargeBinary)) {
                *w_phy = self.wrap_with_cv_to_bool(w_phy.clone())?;
            }
        }

        // Fix 2: collect the result types of every THEN branch plus the ELSE
        // branch to detect a List/LargeList vs LargeBinary mix; types that
        // fail to resolve are simply skipped.
        let branch_types: Vec<DataType> = when_then_phy
            .iter()
            .map(|(_, t)| t.data_type(input_schema))
            .chain(else_phy.iter().map(|e| e.data_type(input_schema)))
            .filter_map(Result::ok)
            .collect();

        let has_large_binary = branch_types.contains(&DataType::LargeBinary);
        let has_list = branch_types
            .iter()
            .any(|dt| matches!(dt, DataType::List(_) | DataType::LargeList(_)));

        // If we have both, wrap List/LargeList branches with LargeListToCypherValueExpr
        // so every branch yields LargeBinary and no cast is needed.
        if has_large_binary && has_list {
            for (_, t_phy) in &mut when_then_phy {
                if let Ok(dt) = t_phy.data_type(input_schema)
                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
                {
                    *t_phy = Arc::new(LargeListToCypherValueExpr::new(t_phy.clone()));
                }
            }
            // `take()` moves the ELSE branch out so it can be rebuilt either
            // wrapped (list-typed) or unchanged (any other type).
            if let Some(e_phy) = else_phy.take() {
                if let Ok(dt) = e_phy.data_type(input_schema)
                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
                {
                    else_phy = Some(Arc::new(LargeListToCypherValueExpr::new(e_phy)));
                } else {
                    else_phy = Some(e_phy);
                }
            }
        }

        let case_expr = datafusion::physical_expr::expressions::CaseExpr::try_new(
            operand_phy,
            when_then_phy,
            else_phy,
        )
        .map_err(|e| anyhow!("Failed to create CASE expression: {}", e))?;

        Ok(Arc::new(case_expr))
    }
1406
    /// Compile a binary operator over two already-compiled operands.
    ///
    /// Routing order:
    /// 1. STARTS WITH / ENDS WITH / CONTAINS → custom `CypherStringMatchExpr`.
    /// 2. Operands involving CypherValue (LargeBinary) → Cypher comparison,
    ///    list-concat, or arithmetic UDFs (tried in that order), after
    ///    unifying List/LargeList operands into CypherValue.
    /// 3. Everything else → DataFusion's `binary` helper, which handles coercion.
    fn compile_binary_op(
        &self,
        op: &BinaryOp,
        left: Arc<dyn PhysicalExpr>,
        right: Arc<dyn PhysicalExpr>,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        use datafusion::logical_expr::Operator;

        // String operators use custom physical expr for safe type handling
        let string_op = match op {
            BinaryOp::StartsWith => Some(StringOp::StartsWith),
            BinaryOp::EndsWith => Some(StringOp::EndsWith),
            BinaryOp::Contains => Some(StringOp::Contains),
            _ => None,
        };
        if let Some(sop) = string_op {
            return Ok(Arc::new(CypherStringMatchExpr::new(left, right, sop)));
        }

        // Map the Cypher operator onto a DataFusion operator; operators with
        // no physical-compiler support bail out with an explicit error.
        let df_op = match op {
            BinaryOp::Add => Operator::Plus,
            BinaryOp::Sub => Operator::Minus,
            BinaryOp::Mul => Operator::Multiply,
            BinaryOp::Div => Operator::Divide,
            BinaryOp::Mod => Operator::Modulo,
            BinaryOp::Eq => Operator::Eq,
            BinaryOp::NotEq => Operator::NotEq,
            BinaryOp::Gt => Operator::Gt,
            BinaryOp::GtEq => Operator::GtEq,
            BinaryOp::Lt => Operator::Lt,
            BinaryOp::LtEq => Operator::LtEq,
            BinaryOp::And => Operator::And,
            BinaryOp::Or => Operator::Or,
            BinaryOp::Xor => {
                return Err(anyhow!(
                    "XOR not supported via binary helper, use bitwise_xor"
                ));
            }
            BinaryOp::Regex => Operator::RegexMatch,
            BinaryOp::ApproxEq => {
                return Err(anyhow!(
                    "ApproxEq (~=) not yet supported in physical compiler"
                ));
            }
            BinaryOp::Pow => return Err(anyhow!("POW not yet supported in physical compiler")),
            _ => return Err(anyhow!("Unsupported binary op in compiler: {:?}", op)),
        };

        // When either operand is LargeBinary (CypherValue), standard Arrow comparison
        // kernels can't handle the type mismatch. Route through Cypher comparison
        // UDFs which decode CypherValue to Value for comparison.
        let mut left = left;
        let mut right = right;
        let mut left_type = left.data_type(input_schema).ok();
        let mut right_type = right.data_type(input_schema).ok();

        // Type unification: if one side is LargeList and the other is LargeBinary,
        // convert LargeList to LargeBinary for consistent handling
        let left_is_list = matches!(
            left_type.as_ref(),
            Some(DataType::List(_) | DataType::LargeList(_))
        );
        let right_is_list = matches!(
            right_type.as_ref(),
            Some(DataType::List(_) | DataType::LargeList(_))
        );

        if left_is_list && is_cypher_value_type(right_type.as_ref()) {
            left = Arc::new(LargeListToCypherValueExpr::new(left));
            left_type = Some(DataType::LargeBinary);
        } else if right_is_list && is_cypher_value_type(left_type.as_ref()) {
            right = Arc::new(LargeListToCypherValueExpr::new(right));
            right_type = Some(DataType::LargeBinary);
        }

        let has_cv =
            is_cypher_value_type(left_type.as_ref()) || is_cypher_value_type(right_type.as_ref());

        // Try each CypherValue strategy in order: comparison, list concat,
        // arithmetic. A strategy returning Ok(None) means "not applicable",
        // so the next one (or the generic fallback) is attempted.
        if has_cv {
            if let Some(result) = self.compile_cv_comparison(
                df_op,
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
            )? {
                return Ok(result);
            }
            if let Some(result) = self.compile_cv_list_concat(
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
                df_op,
            )? {
                return Ok(result);
            }
            if let Some(result) = self.compile_cv_arithmetic(
                df_op,
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
                input_schema,
            )? {
                return Ok(result);
            }
        }

        // Use DataFusion's binary physical expression creator which handles coercion
        binary(left, df_op, right, input_schema)
            .map_err(|e| anyhow!("Failed to create binary expression: {}", e))
    }
1521
1522    /// Compile CypherValue comparison using Cypher UDFs.
1523    fn compile_cv_comparison(
1524        &self,
1525        df_op: datafusion::logical_expr::Operator,
1526        left: Arc<dyn PhysicalExpr>,
1527        right: Arc<dyn PhysicalExpr>,
1528        left_type: &Option<DataType>,
1529        right_type: &Option<DataType>,
1530    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1531        use datafusion::logical_expr::Operator;
1532
1533        let udf_name = match df_op {
1534            Operator::Eq => "_cypher_equal",
1535            Operator::NotEq => "_cypher_not_equal",
1536            Operator::Gt => "_cypher_gt",
1537            Operator::GtEq => "_cypher_gt_eq",
1538            Operator::Lt => "_cypher_lt",
1539            Operator::LtEq => "_cypher_lt_eq",
1540            _ => return Ok(None),
1541        };
1542
1543        self.plan_binary_udf(
1544            udf_name,
1545            left,
1546            right,
1547            left_type.clone().unwrap_or(DataType::LargeBinary),
1548            right_type.clone().unwrap_or(DataType::LargeBinary),
1549        )
1550    }
1551
1552    /// Compile CypherValue list concatenation.
1553    fn compile_cv_list_concat(
1554        &self,
1555        left: Arc<dyn PhysicalExpr>,
1556        right: Arc<dyn PhysicalExpr>,
1557        left_type: &Option<DataType>,
1558        right_type: &Option<DataType>,
1559        df_op: datafusion::logical_expr::Operator,
1560    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1561        use datafusion::logical_expr::Operator;
1562
1563        if df_op != Operator::Plus {
1564            return Ok(None);
1565        }
1566
1567        // List concat when at least one side is a list (CypherValue or Arrow List)
1568        let is_list = |t: &Option<DataType>| {
1569            t.as_ref()
1570                .is_some_and(|dt| matches!(dt, DataType::LargeBinary | DataType::List(_)))
1571        };
1572
1573        if !is_list(left_type) && !is_list(right_type) {
1574            return Ok(None);
1575        }
1576
1577        self.plan_binary_udf(
1578            "_cypher_list_concat",
1579            left,
1580            right,
1581            left_type.clone().unwrap_or(DataType::LargeBinary),
1582            right_type.clone().unwrap_or(DataType::LargeBinary),
1583        )
1584    }
1585
1586    /// Compile CypherValue arithmetic.
1587    ///
1588    /// Routes arithmetic operations through CypherValue-aware UDFs when at least
1589    /// one operand is LargeBinary (CypherValue-encoded).
1590    fn compile_cv_arithmetic(
1591        &self,
1592        df_op: datafusion::logical_expr::Operator,
1593        left: Arc<dyn PhysicalExpr>,
1594        right: Arc<dyn PhysicalExpr>,
1595        left_type: &Option<DataType>,
1596        right_type: &Option<DataType>,
1597        _input_schema: &Schema,
1598    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1599        use datafusion::logical_expr::Operator;
1600
1601        let udf_name = match df_op {
1602            Operator::Plus => "_cypher_add",
1603            Operator::Minus => "_cypher_sub",
1604            Operator::Multiply => "_cypher_mul",
1605            Operator::Divide => "_cypher_div",
1606            Operator::Modulo => "_cypher_mod",
1607            _ => return Ok(None),
1608        };
1609
1610        self.plan_binary_udf(
1611            udf_name,
1612            left,
1613            right,
1614            left_type.clone().unwrap_or(DataType::LargeBinary),
1615            right_type.clone().unwrap_or(DataType::LargeBinary),
1616        )
1617    }
1618
1619    /// Plan a UDF expression with dummy schema columns, then rebind to actual physical expressions.
1620    fn plan_udf_physical_expr(
1621        &self,
1622        udf_expr: &datafusion::logical_expr::Expr,
1623        operand_types: &[(&str, DataType)],
1624        children: Vec<Arc<dyn PhysicalExpr>>,
1625        error_context: &str,
1626    ) -> Result<Arc<dyn PhysicalExpr>> {
1627        let tmp_schema = Schema::new(
1628            operand_types
1629                .iter()
1630                .map(|(name, dt)| Arc::new(Field::new(*name, dt.clone(), true)))
1631                .collect::<Vec<_>>(),
1632        );
1633        let df_schema = datafusion::common::DFSchema::try_from(tmp_schema)?;
1634        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
1635        let udf_phy = planner
1636            .create_physical_expr(udf_expr, &df_schema, self.state)
1637            .map_err(|e| anyhow!("Failed to create {} expr: {}", error_context, e))?;
1638        udf_phy
1639            .with_new_children(children)
1640            .map_err(|e| anyhow!("Failed to rebind {} children: {}", error_context, e))
1641    }
1642
1643    /// Wrap a LargeBinary (CypherValue) expression with `_cv_to_bool` conversion.
1644    ///
1645    /// Used when a CypherValue expression needs to be used as a boolean (e.g., in WHEN clauses).
1646    fn wrap_with_cv_to_bool(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
1647        let Some(udf) = self.state.scalar_functions().get("_cv_to_bool") else {
1648            return Err(anyhow!("_cv_to_bool UDF not found"));
1649        };
1650
1651        let dummy_col = datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1652            None::<String>,
1653            "__cv__",
1654        ));
1655        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1656            datafusion::logical_expr::expr::ScalarFunction {
1657                func: udf.clone(),
1658                args: vec![dummy_col],
1659            },
1660        );
1661
1662        self.plan_udf_physical_expr(
1663            &udf_expr,
1664            &[("__cv__", DataType::LargeBinary)],
1665            vec![expr],
1666            "CypherValue to bool",
1667        )
1668    }
1669
1670    /// Plan a binary UDF with the given name and operand types.
1671    fn plan_binary_udf(
1672        &self,
1673        udf_name: &str,
1674        left: Arc<dyn PhysicalExpr>,
1675        right: Arc<dyn PhysicalExpr>,
1676        left_type: DataType,
1677        right_type: DataType,
1678    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1679        let Some(udf) = self.state.scalar_functions().get(udf_name) else {
1680            return Ok(None);
1681        };
1682        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1683            datafusion::logical_expr::expr::ScalarFunction {
1684                func: udf.clone(),
1685                args: vec![
1686                    datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1687                        None::<String>,
1688                        "__left__",
1689                    )),
1690                    datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1691                        None::<String>,
1692                        "__right__",
1693                    )),
1694                ],
1695            },
1696        );
1697        let result = self.plan_udf_physical_expr(
1698            &udf_expr,
1699            &[("__left__", left_type), ("__right__", right_type)],
1700            vec![left, right],
1701            udf_name,
1702        )?;
1703        Ok(Some(result))
1704    }
1705
1706    fn compile_unary_op(
1707        &self,
1708        op: &UnaryOp,
1709        expr: Arc<dyn PhysicalExpr>,
1710        input_schema: &Schema,
1711    ) -> Result<Arc<dyn PhysicalExpr>> {
1712        match op {
1713            UnaryOp::Not => datafusion::physical_expr::expressions::not(expr),
1714            UnaryOp::Neg => datafusion::physical_expr::expressions::negative(expr, input_schema),
1715        }
1716        .map_err(|e| anyhow!("Failed to create unary expression: {}", e))
1717    }
1718}
1719
1720// ---------------------------------------------------------------------------
1721// similar_to() AST helpers
1722// ---------------------------------------------------------------------------
1723
1724/// Extract the base variable name from a source expression.
1725///
1726/// For `d.embedding` → `"d"`, for `[d.embedding, d.content]` → `"d"`.
1727fn extract_source_variable(expr: &Expr) -> Option<String> {
1728    match expr {
1729        Expr::Property(inner, _) => extract_source_variable(inner),
1730        Expr::Variable(name) => Some(name.clone()),
1731        Expr::List(items) => items.first().and_then(extract_source_variable),
1732        _ => None,
1733    }
1734}
1735
1736/// Extract property names from source expressions.
1737///
1738/// For `d.embedding` → `["embedding"]`, for `[d.embedding, d.content]` → `["embedding", "content"]`.
1739fn extract_property_names(expr: &Expr) -> Vec<Option<String>> {
1740    match expr {
1741        Expr::Property(_, prop) => vec![Some(prop.clone())],
1742        Expr::List(items) => items
1743            .iter()
1744            .map(|item| {
1745                if let Expr::Property(_, prop) = item {
1746                    Some(prop.clone())
1747                } else {
1748                    None
1749                }
1750            })
1751            .collect(),
1752        _ => vec![None],
1753    }
1754}
1755
1756/// Normalize similar_to arguments for multi-source scoring.
1757///
1758/// If source arg is a `List`, returns the individual items as separate source/query pairs.
1759/// Otherwise returns the args as-is (single pair).
1760fn normalize_similar_to_args<'e>(
1761    sources: &'e Expr,
1762    queries: &'e Expr,
1763) -> (Vec<&'e Expr>, Vec<&'e Expr>) {
1764    match (sources, queries) {
1765        // Multi-source: [d.embedding, d.content] paired with [query_vec, query_text]
1766        (Expr::List(src_items), Expr::List(qry_items)) if src_items.len() == qry_items.len() => {
1767            (src_items.iter().collect(), qry_items.iter().collect())
1768        }
1769        // Multi-source with broadcast query: [d.embedding, d.content] paired with single query
1770        (Expr::List(src_items), _) if src_items.len() > 1 => {
1771            let queries_broadcast: Vec<&Expr> = vec![queries; src_items.len()];
1772            (src_items.iter().collect(), queries_broadcast)
1773        }
1774        // Single source
1775        _ => (vec![sources], vec![queries]),
1776    }
1777}
1778
1779use datafusion::physical_plan::DisplayAs;
1780use datafusion::physical_plan::DisplayFormatType;
1781
/// String predicate operators for Cypher `STARTS WITH` / `ENDS WITH` / `CONTAINS`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum StringOp {
    StartsWith,
    EndsWith,
    Contains,
}

impl std::fmt::Display for StringOp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Render the Cypher keyword form of the operator.
        let keyword = match self {
            StringOp::StartsWith => "STARTS WITH",
            StringOp::EndsWith => "ENDS WITH",
            StringOp::Contains => "CONTAINS",
        };
        f.write_str(keyword)
    }
}
1798
1799#[derive(Debug, Eq)]
1800struct CypherStringMatchExpr {
1801    left: Arc<dyn PhysicalExpr>,
1802    right: Arc<dyn PhysicalExpr>,
1803    op: StringOp,
1804}
1805
1806impl PartialEq for CypherStringMatchExpr {
1807    fn eq(&self, other: &Self) -> bool {
1808        self.op == other.op && self.left.eq(&other.left) && self.right.eq(&other.right)
1809    }
1810}
1811
1812impl std::hash::Hash for CypherStringMatchExpr {
1813    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1814        self.op.hash(state);
1815        self.left.hash(state);
1816        self.right.hash(state);
1817    }
1818}
1819
1820impl CypherStringMatchExpr {
1821    fn new(left: Arc<dyn PhysicalExpr>, right: Arc<dyn PhysicalExpr>, op: StringOp) -> Self {
1822        Self { left, right, op }
1823    }
1824}
1825
1826impl std::fmt::Display for CypherStringMatchExpr {
1827    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1828        write!(f, "{} {} {}", self.left, self.op, self.right)
1829    }
1830}
1831
1832impl DisplayAs for CypherStringMatchExpr {
1833    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1834        write!(f, "{}", self)
1835    }
1836}
1837
1838impl PhysicalExpr for CypherStringMatchExpr {
1839    fn as_any(&self) -> &dyn std::any::Any {
1840        self
1841    }
1842
1843    fn data_type(
1844        &self,
1845        _input_schema: &Schema,
1846    ) -> datafusion::error::Result<arrow_schema::DataType> {
1847        Ok(arrow_schema::DataType::Boolean)
1848    }
1849
1850    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
1851        Ok(true)
1852    }
1853
1854    fn evaluate(
1855        &self,
1856        batch: &arrow_array::RecordBatch,
1857    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
1858        use crate::query::df_udfs::invoke_cypher_string_op;
1859        use arrow_schema::Field;
1860        use datafusion::config::ConfigOptions;
1861        use datafusion::logical_expr::ScalarFunctionArgs;
1862
1863        let left_val = self.left.evaluate(batch)?;
1864        let right_val = self.right.evaluate(batch)?;
1865
1866        let args = ScalarFunctionArgs {
1867            args: vec![left_val, right_val],
1868            number_rows: batch.num_rows(),
1869            return_field: Arc::new(Field::new("result", arrow_schema::DataType::Boolean, true)),
1870            config_options: Arc::new(ConfigOptions::default()),
1871            arg_fields: vec![], // Not used by invoke_cypher_string_op
1872        };
1873
1874        match self.op {
1875            StringOp::StartsWith => {
1876                invoke_cypher_string_op(&args, "starts_with", |s, p| s.starts_with(p))
1877            }
1878            StringOp::EndsWith => {
1879                invoke_cypher_string_op(&args, "ends_with", |s, p| s.ends_with(p))
1880            }
1881            StringOp::Contains => invoke_cypher_string_op(&args, "contains", |s, p| s.contains(p)),
1882        }
1883    }
1884
1885    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
1886        vec![&self.left, &self.right]
1887    }
1888
1889    fn with_new_children(
1890        self: Arc<Self>,
1891        children: Vec<Arc<dyn PhysicalExpr>>,
1892    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
1893        Ok(Arc::new(CypherStringMatchExpr::new(
1894            children[0].clone(),
1895            children[1].clone(),
1896            self.op,
1897        )))
1898    }
1899
1900    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1901        write!(f, "{}", self)
1902    }
1903}
1904
1905impl PartialEq<dyn PhysicalExpr> for CypherStringMatchExpr {
1906    fn eq(&self, other: &dyn PhysicalExpr) -> bool {
1907        if let Some(other) = other.as_any().downcast_ref::<CypherStringMatchExpr>() {
1908            self == other
1909        } else {
1910            false
1911        }
1912    }
1913}
1914
1915/// Physical expression for extracting a field from a struct column.
1916///
1917/// Used when list comprehension iterates over a list of structs (maps)
1918/// and accesses a field, e.g., `[x IN [{a: 1}] | x.a]`.
1919#[derive(Debug, Eq)]
1920struct StructFieldAccessExpr {
1921    /// Expression producing the struct column.
1922    input: Arc<dyn PhysicalExpr>,
1923    /// Index of the field within the struct.
1924    field_idx: usize,
1925    /// Output data type of the extracted field.
1926    output_type: arrow_schema::DataType,
1927}
1928
1929impl PartialEq for StructFieldAccessExpr {
1930    fn eq(&self, other: &Self) -> bool {
1931        self.field_idx == other.field_idx
1932            && self.input.eq(&other.input)
1933            && self.output_type == other.output_type
1934    }
1935}
1936
1937impl std::hash::Hash for StructFieldAccessExpr {
1938    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1939        self.input.hash(state);
1940        self.field_idx.hash(state);
1941    }
1942}
1943
1944impl StructFieldAccessExpr {
1945    fn new(
1946        input: Arc<dyn PhysicalExpr>,
1947        field_idx: usize,
1948        output_type: arrow_schema::DataType,
1949    ) -> Self {
1950        Self {
1951            input,
1952            field_idx,
1953            output_type,
1954        }
1955    }
1956}
1957
1958impl std::fmt::Display for StructFieldAccessExpr {
1959    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1960        write!(f, "{}[{}]", self.input, self.field_idx)
1961    }
1962}
1963
1964impl DisplayAs for StructFieldAccessExpr {
1965    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1966        write!(f, "{}", self)
1967    }
1968}
1969
1970impl PartialEq<dyn PhysicalExpr> for StructFieldAccessExpr {
1971    fn eq(&self, other: &dyn PhysicalExpr) -> bool {
1972        if let Some(other) = other.as_any().downcast_ref::<Self>() {
1973            self.field_idx == other.field_idx && self.input.eq(&other.input)
1974        } else {
1975            false
1976        }
1977    }
1978}
1979
1980impl PhysicalExpr for StructFieldAccessExpr {
1981    fn as_any(&self) -> &dyn std::any::Any {
1982        self
1983    }
1984
1985    fn data_type(
1986        &self,
1987        _input_schema: &Schema,
1988    ) -> datafusion::error::Result<arrow_schema::DataType> {
1989        Ok(self.output_type.clone())
1990    }
1991
1992    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
1993        Ok(true)
1994    }
1995
1996    fn evaluate(
1997        &self,
1998        batch: &arrow_array::RecordBatch,
1999    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
2000        use arrow_array::StructArray;
2001
2002        let input_val = self.input.evaluate(batch)?;
2003        let array = input_val.into_array(batch.num_rows())?;
2004
2005        let struct_array = array
2006            .as_any()
2007            .downcast_ref::<StructArray>()
2008            .ok_or_else(|| {
2009                datafusion::error::DataFusionError::Execution(
2010                    "StructFieldAccessExpr: input is not a StructArray".to_string(),
2011                )
2012            })?;
2013
2014        let field_col = struct_array.column(self.field_idx).clone();
2015        Ok(datafusion::physical_plan::ColumnarValue::Array(field_col))
2016    }
2017
2018    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
2019        vec![&self.input]
2020    }
2021
2022    fn with_new_children(
2023        self: Arc<Self>,
2024        children: Vec<Arc<dyn PhysicalExpr>>,
2025    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
2026        Ok(Arc::new(StructFieldAccessExpr::new(
2027            children[0].clone(),
2028            self.field_idx,
2029            self.output_type.clone(),
2030        )))
2031    }
2032
2033    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2034        write!(f, "{}", self)
2035    }
2036}
2037
2038// ---------------------------------------------------------------------------
2039// EXISTS subquery physical expression
2040// ---------------------------------------------------------------------------
2041
/// Physical expression that evaluates an EXISTS subquery per row.
///
/// For each input row, plans and executes the subquery with the row's columns
/// injected as parameters. Returns `true` if the subquery produces any rows.
///
/// NOT EXISTS is handled by the caller wrapping this in a NOT expression.
/// Nested EXISTS works because `execute_subplan` creates a full planner that
/// handles nested EXISTS recursively.
struct ExistsExecExpr {
    // Subquery AST; correlated property accesses are rewritten to parameter
    // references at evaluation time.
    query: Query,
    // Graph execution context handed through to subplan execution.
    graph_ctx: Arc<GraphExecutionContext>,
    // DataFusion session the subplan executes against.
    session_ctx: Arc<RwLock<SessionContext>>,
    // Storage layer handle for subplan execution.
    storage: Arc<StorageManager>,
    // Schema used to plan the subquery (see `QueryPlanner::new` in `evaluate`).
    uni_schema: Arc<UniSchema>,
    // Outer query parameters; per-row values are merged on top of these.
    params: HashMap<String, Value>,
}

// Manual Debug that elides all fields.
impl std::fmt::Debug for ExistsExecExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExistsExecExpr").finish_non_exhaustive()
    }
}

impl ExistsExecExpr {
    /// Bundle the subquery AST with everything needed to plan and execute it
    /// per row at evaluation time.
    fn new(
        query: Query,
        graph_ctx: Arc<GraphExecutionContext>,
        session_ctx: Arc<RwLock<SessionContext>>,
        storage: Arc<StorageManager>,
        uni_schema: Arc<UniSchema>,
        params: HashMap<String, Value>,
    ) -> Self {
        Self {
            query,
            graph_ctx,
            session_ctx,
            storage,
            uni_schema,
            params,
        }
    }
}

impl std::fmt::Display for ExistsExecExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "EXISTS(<subquery>)")
    }
}

// EXISTS expressions are never considered equal — no structural comparison of
// subqueries is attempted. Presumably this keeps the optimizer from
// deduplicating distinct subqueries — TODO confirm.
impl PartialEq<dyn PhysicalExpr> for ExistsExecExpr {
    fn eq(&self, _other: &dyn PhysicalExpr) -> bool {
        false
    }
}

impl PartialEq for ExistsExecExpr {
    fn eq(&self, _other: &Self) -> bool {
        false
    }
}

// NOTE(review): with `PartialEq` always returning false, `Eq`'s reflexivity
// requirement (`a == a`) does not hold for this type; confirm no consumer
// relies on reflexive equality (e.g. hash-based expression dedup).
impl Eq for ExistsExecExpr {}

// Constant hash, consistent with the always-unequal `PartialEq` above.
impl std::hash::Hash for ExistsExecExpr {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        "ExistsExecExpr".hash(state);
    }
}

impl DisplayAs for ExistsExecExpr {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2116
impl PhysicalExpr for ExistsExecExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// EXISTS always yields a Boolean column.
    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(DataType::Boolean)
    }

    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Evaluate EXISTS for every row of `batch`.
    ///
    /// Steps: (1) infer bound entity variables from the batch schema,
    /// (2) rewrite correlated property accesses in the subquery to parameter
    /// references, (3) plan the rewritten subquery once, then (4) execute the
    /// plan per row on a dedicated thread, appending `true` for rows whose
    /// subquery returns at least one row.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        let num_rows = batch.num_rows();
        let mut builder = BooleanBuilder::with_capacity(num_rows);

        // 7.2: Extract entity variable names from batch schema.
        // Entity columns follow the pattern "varname._vid" (flattened) or are struct columns.
        // We pass ONLY entity base names (e.g., "p", "n", "m") as vars_in_scope so the
        // subquery planner treats them as bound (Imported) variables. The initial Project
        // then creates Parameter("n") AS "n" etc. — the traverse reads the VID from this
        // column via resolve_source_vid_col's bare-name fallback.
        //
        // We intentionally do NOT include raw column names (n._vid, n._labels, etc.) in
        // vars_in_scope to avoid duplicate column conflicts with traverse output columns.
        // Parameter expressions read directly from sub_params, not from plan columns.
        let schema = batch.schema();
        let mut entity_vars: HashSet<String> = HashSet::new();
        for field in schema.fields() {
            let name = field.name();
            if let Some(base) = name.strip_suffix("._vid") {
                entity_vars.insert(base.to_string());
            }
            if matches!(field.data_type(), DataType::Struct(_)) {
                entity_vars.insert(name.to_string());
            }
            // Also detect bare VID columns from parent EXISTS parameter projections.
            // In nested EXISTS, a parent level projects "n" as a bare Int64/UInt64 VID.
            // Simple identifier (no dots, no leading underscore) + integer type = VID.
            if !name.contains('.')
                && !name.starts_with('_')
                && matches!(field.data_type(), DataType::Int64 | DataType::UInt64)
            {
                entity_vars.insert(name.to_string());
            }
        }
        let vars_in_scope: Vec<String> = entity_vars.iter().cloned().collect();

        // 7.3: Rewrite correlated property accesses to parameter references.
        // e.g., `n.prop` where `n` is an outer entity → `$param("n.prop")`
        let rewritten_query = rewrite_query_correlated(&self.query, &entity_vars);

        // 7.4: Plan ONCE — the rewritten query is parameterized, same for all rows.
        let planner = QueryPlanner::new(self.uni_schema.clone());
        let logical_plan = match planner.plan_with_scope(rewritten_query, vars_in_scope) {
            Ok(plan) => plan,
            Err(e) => {
                return Err(datafusion::error::DataFusionError::Execution(format!(
                    "EXISTS subquery planning failed: {}",
                    e
                )));
            }
        };

        // Execute all rows on a dedicated thread with a single tokio runtime.
        // The runtime must be created and dropped on this thread (not in an async context).
        let graph_ctx = self.graph_ctx.clone();
        let session_ctx = self.session_ctx.clone();
        let storage = self.storage.clone();
        let uni_schema = self.uni_schema.clone();
        let base_params = self.params.clone();

        // `std::thread::scope` lets the spawned thread borrow `builder` and
        // `batch` without 'static bounds; the scope joins before returning.
        let result = std::thread::scope(|s| {
            s.spawn(|| {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "Failed to create runtime for EXISTS: {}",
                            e
                        ))
                    })?;

                for row_idx in 0..num_rows {
                    let row_params = extract_row_params(batch, row_idx);
                    let mut sub_params = base_params.clone();
                    sub_params.extend(row_params);

                    // Add entity variable → VID value mappings so that
                    // Parameter("n") resolves to the VID for traversal sources.
                    for var in &entity_vars {
                        let vid_key = format!("{}._vid", var);
                        if let Some(vid_val) = sub_params.get(&vid_key).cloned() {
                            sub_params.insert(var.clone(), vid_val);
                        }
                    }

                    let batches = rt.block_on(execute_subplan(
                        &logical_plan,
                        &sub_params,
                        &HashMap::new(), // No outer values for EXISTS subquery
                        &graph_ctx,
                        &session_ctx,
                        &storage,
                        &uni_schema,
                    ))?;

                    // EXISTS is true as soon as any batch carries a row.
                    let has_rows = batches.iter().any(|b| b.num_rows() > 0);
                    builder.append_value(has_rows);
                }

                Ok::<_, datafusion::error::DataFusionError>(())
            })
            .join()
            .unwrap_or_else(|_| {
                Err(datafusion::error::DataFusionError::Execution(
                    "EXISTS subquery thread panicked".to_string(),
                ))
            })
        });

        if let Err(e) = result {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "EXISTS subquery execution failed: {}",
                e
            )));
        }

        Ok(datafusion::physical_plan::ColumnarValue::Array(Arc::new(
            builder.finish(),
        )))
    }

    /// No child expressions: the subquery is executed out-of-band rather than
    /// compiled into this expression tree.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if !children.is_empty() {
            return Err(datafusion::error::DataFusionError::Plan(
                "ExistsExecExpr has no children".to_string(),
            ));
        }
        Ok(self)
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2278
2279// ---------------------------------------------------------------------------
2280// EXISTS subquery helpers
2281// ---------------------------------------------------------------------------
2282
2283/// Check if a Query contains any mutation clauses (CREATE, SET, DELETE, REMOVE, MERGE).
2284/// EXISTS subqueries must be read-only per OpenCypher spec.
2285fn has_mutation_clause(query: &Query) -> bool {
2286    match query {
2287        Query::Single(stmt) => stmt.clauses.iter().any(|c| {
2288            matches!(
2289                c,
2290                Clause::Create(_)
2291                    | Clause::Delete(_)
2292                    | Clause::Set(_)
2293                    | Clause::Remove(_)
2294                    | Clause::Merge(_)
2295            ) || has_mutation_in_clause_exprs(c)
2296        }),
2297        Query::Union { left, right, .. } => has_mutation_clause(left) || has_mutation_clause(right),
2298        _ => false,
2299    }
2300}
2301
2302/// Check if a clause contains nested EXISTS with mutations (recursive).
2303fn has_mutation_in_clause_exprs(clause: &Clause) -> bool {
2304    let check_expr = |e: &Expr| -> bool { has_mutation_in_expr(e) };
2305
2306    match clause {
2307        Clause::Match(m) => m.where_clause.as_ref().is_some_and(check_expr),
2308        Clause::With(w) => {
2309            w.where_clause.as_ref().is_some_and(check_expr)
2310                || w.items.iter().any(|item| match item {
2311                    ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2312                    ReturnItem::All => false,
2313                })
2314        }
2315        Clause::Return(r) => r.items.iter().any(|item| match item {
2316            ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2317            ReturnItem::All => false,
2318        }),
2319        _ => false,
2320    }
2321}
2322
2323/// Check if an expression tree contains an EXISTS with mutation clauses.
2324fn has_mutation_in_expr(expr: &Expr) -> bool {
2325    match expr {
2326        Expr::Exists { query, .. } => has_mutation_clause(query),
2327        _ => {
2328            let mut found = false;
2329            expr.for_each_child(&mut |child| {
2330                if has_mutation_in_expr(child) {
2331                    found = true;
2332                }
2333            });
2334            found
2335        }
2336    }
2337}
2338
2339/// Rewrite a Query AST to replace correlated property accesses with parameter references.
2340///
2341/// For each `Property(Variable(v), key)` where `v` is an outer-scope entity variable,
2342/// replaces it with `Parameter("{v}.{key}")`. This enables plan-once optimization since
2343/// the rewritten query is parameterized (same structure for every row).
2344fn rewrite_query_correlated(query: &Query, outer_vars: &HashSet<String>) -> Query {
2345    match query {
2346        Query::Single(stmt) => Query::Single(Statement {
2347            clauses: stmt
2348                .clauses
2349                .iter()
2350                .map(|c| rewrite_clause_correlated(c, outer_vars))
2351                .collect(),
2352        }),
2353        Query::Union { left, right, all } => Query::Union {
2354            left: Box::new(rewrite_query_correlated(left, outer_vars)),
2355            right: Box::new(rewrite_query_correlated(right, outer_vars)),
2356            all: *all,
2357        },
2358        other => other.clone(),
2359    }
2360}
2361
2362/// Rewrite expressions within a clause for correlated property access.
2363fn rewrite_clause_correlated(clause: &Clause, outer_vars: &HashSet<String>) -> Clause {
2364    match clause {
2365        Clause::Match(m) => Clause::Match(MatchClause {
2366            optional: m.optional,
2367            pattern: m.pattern.clone(),
2368            where_clause: m
2369                .where_clause
2370                .as_ref()
2371                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2372        }),
2373        Clause::With(w) => Clause::With(WithClause {
2374            distinct: w.distinct,
2375            items: w
2376                .items
2377                .iter()
2378                .map(|item| rewrite_return_item(item, outer_vars))
2379                .collect(),
2380            order_by: w.order_by.as_ref().map(|items| {
2381                items
2382                    .iter()
2383                    .map(|si| SortItem {
2384                        expr: rewrite_expr_correlated(&si.expr, outer_vars),
2385                        ascending: si.ascending,
2386                    })
2387                    .collect()
2388            }),
2389            skip: w
2390                .skip
2391                .as_ref()
2392                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2393            limit: w
2394                .limit
2395                .as_ref()
2396                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2397            where_clause: w
2398                .where_clause
2399                .as_ref()
2400                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2401        }),
2402        Clause::Return(r) => Clause::Return(ReturnClause {
2403            distinct: r.distinct,
2404            items: r
2405                .items
2406                .iter()
2407                .map(|item| rewrite_return_item(item, outer_vars))
2408                .collect(),
2409            order_by: r.order_by.as_ref().map(|items| {
2410                items
2411                    .iter()
2412                    .map(|si| SortItem {
2413                        expr: rewrite_expr_correlated(&si.expr, outer_vars),
2414                        ascending: si.ascending,
2415                    })
2416                    .collect()
2417            }),
2418            skip: r
2419                .skip
2420                .as_ref()
2421                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2422            limit: r
2423                .limit
2424                .as_ref()
2425                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2426        }),
2427        Clause::Unwind(u) => Clause::Unwind(UnwindClause {
2428            expr: rewrite_expr_correlated(&u.expr, outer_vars),
2429            variable: u.variable.clone(),
2430        }),
2431        other => other.clone(),
2432    }
2433}
2434
2435fn rewrite_return_item(item: &ReturnItem, outer_vars: &HashSet<String>) -> ReturnItem {
2436    match item {
2437        ReturnItem::All => ReturnItem::All,
2438        ReturnItem::Expr {
2439            expr,
2440            alias,
2441            source_text,
2442        } => ReturnItem::Expr {
2443            expr: rewrite_expr_correlated(expr, outer_vars),
2444            alias: alias.clone(),
2445            source_text: source_text.clone(),
2446        },
2447    }
2448}
2449
2450/// Rewrite a single expression: Property(Variable(v), key) → Parameter("{v}.{key}")
2451/// when v is an outer-scope entity variable. Handles nested EXISTS recursively.
2452fn rewrite_expr_correlated(expr: &Expr, outer_vars: &HashSet<String>) -> Expr {
2453    match expr {
2454        // Core rewrite: n.prop → $param("n.prop") when n is an outer entity
2455        Expr::Property(base, key) => {
2456            if let Expr::Variable(v) = base.as_ref()
2457                && outer_vars.contains(v)
2458            {
2459                return Expr::Parameter(format!("{}.{}", v, key));
2460            }
2461            Expr::Property(
2462                Box::new(rewrite_expr_correlated(base, outer_vars)),
2463                key.clone(),
2464            )
2465        }
2466        // Nested EXISTS — recurse into the subquery body
2467        Expr::Exists {
2468            query,
2469            from_pattern_predicate,
2470        } => Expr::Exists {
2471            query: Box::new(rewrite_query_correlated(query, outer_vars)),
2472            from_pattern_predicate: *from_pattern_predicate,
2473        },
2474        // CountSubquery and CollectSubquery — recurse into subquery body
2475        Expr::CountSubquery(query) => {
2476            Expr::CountSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2477        }
2478        Expr::CollectSubquery(query) => {
2479            Expr::CollectSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2480        }
2481        // All other expressions: recursively transform children
2482        other => other
2483            .clone()
2484            .map_children(&mut |child| rewrite_expr_correlated(&child, outer_vars)),
2485    }
2486}
2487
2488/// Look up the `DistanceMetric` for a vector-indexed property across all labels.
2489///
2490/// Returns `None` if no vector index exists for the property.
2491fn resolve_metric_for_property(schema: &UniSchema, property: &str) -> Option<DistanceMetric> {
2492    for idx in &schema.indexes {
2493        if let IndexDefinition::Vector(config) = idx
2494            && config.property == property
2495        {
2496            return Some(config.metric.clone());
2497        }
2498    }
2499    None
2500}