Skip to main content

uni_query/query/df_graph/
expr_compiler.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
5use crate::query::df_graph::GraphExecutionContext;
6use crate::query::df_graph::common::{execute_subplan, extract_row_params};
7use crate::query::df_graph::comprehension::ListComprehensionExecExpr;
8use crate::query::df_graph::pattern_comprehension::{
9    PatternComprehensionExecExpr, analyze_pattern, build_inner_schema, collect_inner_properties,
10};
11use crate::query::df_graph::quantifier::{QuantifierExecExpr, QuantifierType};
12use crate::query::df_graph::reduce::ReduceExecExpr;
13use crate::query::df_graph::similar_to_expr::SimilarToExecExpr;
14use crate::query::planner::QueryPlanner;
15use crate::query::similar_to::SimilarToError;
16use anyhow::{Result, anyhow};
17use arrow_array::builder::BooleanBuilder;
18use arrow_schema::{DataType, Field, Schema};
19use datafusion::execution::context::SessionState;
20use datafusion::physical_expr::expressions::binary;
21use datafusion::physical_plan::PhysicalExpr;
22use datafusion::physical_planner::PhysicalPlanner;
23use datafusion::prelude::SessionContext;
24use parking_lot::RwLock;
25use std::collections::{HashMap, HashSet};
26use std::sync::Arc;
27use uni_common::Value;
28use uni_common::core::schema::{DistanceMetric, IndexDefinition, Schema as UniSchema};
29use uni_cypher::ast::{
30    BinaryOp, Clause, CypherLiteral, Expr, MatchClause, Query, ReturnClause, ReturnItem, SortItem,
31    Statement, UnaryOp, UnwindClause, WithClause,
32};
33use uni_store::storage::manager::StorageManager;
34
35/// Check if a data type represents CypherValue (LargeBinary) or BTIC (FixedSizeBinary(24)).
36/// Both need to route through Cypher UDFs for comparison operators.
37fn is_cypher_value_type(dt: Option<&DataType>) -> bool {
38    dt.is_some_and(|t| matches!(t, DataType::LargeBinary | DataType::FixedSizeBinary(24)))
39}
40
41/// Resolve the element type for a list expression.
42///
43/// Extracts the element type from List/LargeList/Null/LargeBinary data types.
44/// Falls back to the provided fallback type for LargeBinary, and returns an
45/// error with the provided context for unsupported types.
46///
47/// # Errors
48///
49/// Returns an error if the data type is not a recognized list type.
50fn resolve_list_element_type(
51    list_data_type: &DataType,
52    large_binary_fallback: DataType,
53    context: &str,
54) -> Result<DataType> {
55    match list_data_type {
56        DataType::List(field) | DataType::LargeList(field) => Ok(field.data_type().clone()),
57        DataType::Null => Ok(DataType::Null),
58        DataType::LargeBinary => Ok(large_binary_fallback),
59        _ => Err(anyhow!(
60            "{} input must be a list, got {:?}",
61            context,
62            list_data_type
63        )),
64    }
65}
66
67/// Physical expression wrapper that converts LargeList<T> to LargeBinary (CypherValue).
68///
69/// Used in CASE expressions to unify branch types when mixing typed lists
70/// (e.g., from list comprehensions) with CypherValue-encoded lists.
71#[derive(Debug)]
72struct LargeListToCypherValueExpr {
73    child: Arc<dyn PhysicalExpr>,
74}
75
76impl LargeListToCypherValueExpr {
77    fn new(child: Arc<dyn PhysicalExpr>) -> Self {
78        Self { child }
79    }
80}
81
82impl std::fmt::Display for LargeListToCypherValueExpr {
83    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
84        write!(f, "LargeListToCypherValue({})", self.child)
85    }
86}
87
88impl PartialEq for LargeListToCypherValueExpr {
89    fn eq(&self, other: &Self) -> bool {
90        Arc::ptr_eq(&self.child, &other.child)
91    }
92}
93
94impl Eq for LargeListToCypherValueExpr {}
95
96impl std::hash::Hash for LargeListToCypherValueExpr {
97    fn hash<H: std::hash::Hasher>(&self, _state: &mut H) {
98        // Hash based on type since we can't hash PhysicalExpr
99        std::any::type_name::<Self>().hash(_state);
100    }
101}
102
103impl PartialEq<dyn std::any::Any> for LargeListToCypherValueExpr {
104    fn eq(&self, other: &dyn std::any::Any) -> bool {
105        other
106            .downcast_ref::<Self>()
107            .map(|x| self == x)
108            .unwrap_or(false)
109    }
110}
111
112impl PhysicalExpr for LargeListToCypherValueExpr {
113    fn as_any(&self) -> &dyn std::any::Any {
114        self
115    }
116
117    fn data_type(&self, _input_schema: &Schema) -> datafusion::error::Result<DataType> {
118        Ok(DataType::LargeBinary)
119    }
120
121    fn nullable(&self, input_schema: &Schema) -> datafusion::error::Result<bool> {
122        self.child.nullable(input_schema)
123    }
124
125    fn evaluate(
126        &self,
127        batch: &arrow_array::RecordBatch,
128    ) -> datafusion::error::Result<datafusion::logical_expr::ColumnarValue> {
129        use datafusion::arrow::compute::cast;
130        use datafusion::logical_expr::ColumnarValue;
131
132        let child_result = self.child.evaluate(batch)?;
133        let child_array = child_result.into_array(batch.num_rows())?;
134
135        // Normalize List → LargeList (pattern from quantifier.rs:182-189)
136        let list_array = if let DataType::List(field) = child_array.data_type() {
137            let target_type = DataType::LargeList(field.clone());
138            cast(&child_array, &target_type).map_err(|e| {
139                datafusion::error::DataFusionError::Execution(format!(
140                    "List to LargeList cast failed: {e}"
141                ))
142            })?
143        } else {
144            child_array.clone()
145        };
146
147        // If already LargeBinary, pass through
148        if list_array.data_type() == &DataType::LargeBinary {
149            return Ok(ColumnarValue::Array(list_array));
150        }
151
152        // Convert LargeList to CypherValue
153        if let Some(large_list) = list_array
154            .as_any()
155            .downcast_ref::<datafusion::arrow::array::LargeListArray>()
156        {
157            let cv_array =
158                crate::query::df_graph::common::typed_large_list_to_cv_array(large_list)?;
159            Ok(ColumnarValue::Array(cv_array))
160        } else {
161            Err(datafusion::error::DataFusionError::Execution(format!(
162                "Expected List or LargeList, got {:?}",
163                list_array.data_type()
164            )))
165        }
166    }
167
168    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
169        vec![&self.child]
170    }
171
172    fn with_new_children(
173        self: Arc<Self>,
174        children: Vec<Arc<dyn PhysicalExpr>>,
175    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
176        if children.len() != 1 {
177            return Err(datafusion::error::DataFusionError::Execution(
178                "LargeListToCypherValueExpr expects exactly 1 child".to_string(),
179            ));
180        }
181        Ok(Arc::new(LargeListToCypherValueExpr::new(
182            children[0].clone(),
183        )))
184    }
185
186    fn fmt_sql(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
187        write!(f, "LargeListToCypherValue({})", self.child)
188    }
189}
190
191/// Compiler for converting Cypher expressions directly to DataFusion Physical Expressions.
192pub struct CypherPhysicalExprCompiler<'a> {
193    state: &'a SessionState,
194    translation_ctx: Option<&'a TranslationContext>,
195    graph_ctx: Option<Arc<GraphExecutionContext>>,
196    uni_schema: Option<Arc<UniSchema>>,
197    /// Session context for EXISTS subquery execution.
198    session_ctx: Option<Arc<RwLock<SessionContext>>>,
199    /// Storage manager for EXISTS subquery execution.
200    storage: Option<Arc<StorageManager>>,
201    /// Query parameters for EXISTS subquery execution.
202    params: HashMap<String, Value>,
203}
204
205impl<'a> CypherPhysicalExprCompiler<'a> {
206    pub fn new(state: &'a SessionState, translation_ctx: Option<&'a TranslationContext>) -> Self {
207        Self {
208            state,
209            translation_ctx,
210            graph_ctx: None,
211            uni_schema: None,
212            session_ctx: None,
213            storage: None,
214            params: HashMap::new(),
215        }
216    }
217
218    /// Build a scoped compiler that excludes the given variables from the translation context.
219    ///
220    /// When compiling inner expressions for list comprehensions, reduce, or quantifiers,
221    /// loop variables must be removed from `variable_kinds` so that property access on
222    /// them does not incorrectly generate flat columns that don't exist in the inner schema.
223    ///
224    /// If none of the `exclude_vars` are present in the current context, the returned
225    /// compiler simply reuses `self`'s translation context unchanged.
226    ///
227    /// The caller must own `scoped_ctx_slot` and keep it alive for the returned compiler's
228    /// lifetime.
229    fn scoped_compiler<'b>(
230        &'b self,
231        exclude_vars: &[&str],
232        scoped_ctx_slot: &'b mut Option<TranslationContext>,
233    ) -> CypherPhysicalExprCompiler<'b>
234    where
235        'a: 'b,
236    {
237        let needs_scoping = self.translation_ctx.is_some_and(|ctx| {
238            exclude_vars
239                .iter()
240                .any(|v| ctx.variable_kinds.contains_key(*v))
241        });
242
243        let ctx_ref = if needs_scoping {
244            let ctx = self.translation_ctx.unwrap();
245            let mut new_kinds = ctx.variable_kinds.clone();
246            for v in exclude_vars {
247                new_kinds.remove(*v);
248            }
249            *scoped_ctx_slot = Some(TranslationContext {
250                parameters: ctx.parameters.clone(),
251                outer_values: ctx.outer_values.clone(),
252                variable_labels: ctx.variable_labels.clone(),
253                variable_kinds: new_kinds,
254                node_variable_hints: ctx.node_variable_hints.clone(),
255                mutation_edge_hints: ctx.mutation_edge_hints.clone(),
256                statement_time: ctx.statement_time,
257            });
258            scoped_ctx_slot.as_ref()
259        } else {
260            self.translation_ctx
261        };
262
263        CypherPhysicalExprCompiler {
264            state: self.state,
265            translation_ctx: ctx_ref,
266            graph_ctx: self.graph_ctx.clone(),
267            uni_schema: self.uni_schema.clone(),
268            session_ctx: self.session_ctx.clone(),
269            storage: self.storage.clone(),
270            params: self.params.clone(),
271        }
272    }
273
274    /// Attach graph context and schema for pattern comprehension support.
275    pub fn with_graph_ctx(
276        mut self,
277        graph_ctx: Arc<GraphExecutionContext>,
278        uni_schema: Arc<UniSchema>,
279    ) -> Self {
280        self.graph_ctx = Some(graph_ctx);
281        self.uni_schema = Some(uni_schema);
282        self
283    }
284
285    /// Attach full subquery context for EXISTS support.
286    pub fn with_subquery_ctx(
287        mut self,
288        graph_ctx: Arc<GraphExecutionContext>,
289        uni_schema: Arc<UniSchema>,
290        session_ctx: Arc<RwLock<SessionContext>>,
291        storage: Arc<StorageManager>,
292        params: HashMap<String, Value>,
293    ) -> Self {
294        self.graph_ctx = Some(graph_ctx);
295        self.uni_schema = Some(uni_schema);
296        self.session_ctx = Some(session_ctx);
297        self.storage = Some(storage);
298        self.params = params;
299        self
300    }
301
302    /// Compile a Cypher expression into a DataFusion PhysicalExpr.
303    pub fn compile(&self, expr: &Expr, input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
304        match expr {
305            Expr::ListComprehension {
306                variable,
307                list,
308                where_clause,
309                map_expr,
310            } => self.compile_list_comprehension(
311                variable,
312                list,
313                where_clause.as_deref(),
314                map_expr,
315                input_schema,
316            ),
317            Expr::Quantifier {
318                quantifier,
319                variable,
320                list,
321                predicate,
322            } => self.compile_quantifier(quantifier, variable, list, predicate, input_schema),
323            Expr::Reduce {
324                accumulator,
325                init,
326                variable,
327                list,
328                expr: expression,
329            } => self.compile_reduce(accumulator, init, variable, list, expression, input_schema),
330            // For BinaryOp, check if children contain custom expressions or CypherValue types
331            Expr::BinaryOp { left, op, right } => {
332                self.compile_binary_op_dispatch(left, op, right, input_schema)
333            }
334            Expr::UnaryOp { op, expr: inner } => {
335                if matches!(op, UnaryOp::Not) {
336                    let mut inner_phy = self.compile(inner, input_schema)?;
337                    if let Ok(DataType::LargeBinary) = inner_phy.data_type(input_schema) {
338                        inner_phy = self.wrap_with_cv_to_bool(inner_phy)?;
339                    }
340                    self.compile_unary_op(op, inner_phy, input_schema)
341                } else if Self::contains_custom_expr(inner) {
342                    let inner_phy = self.compile(inner, input_schema)?;
343                    self.compile_unary_op(op, inner_phy, input_schema)
344                } else {
345                    self.compile_standard(expr, input_schema)
346                }
347            }
348            Expr::IsNull(inner) => {
349                if Self::contains_custom_expr(inner) {
350                    let inner_phy = self.compile(inner, input_schema)?;
351                    Ok(datafusion::physical_expr::expressions::is_null(inner_phy)
352                        .map_err(|e| anyhow!("Failed to create is_null: {}", e))?)
353                } else {
354                    self.compile_standard(expr, input_schema)
355                }
356            }
357            Expr::IsNotNull(inner) => {
358                if Self::contains_custom_expr(inner) {
359                    let inner_phy = self.compile(inner, input_schema)?;
360                    Ok(
361                        datafusion::physical_expr::expressions::is_not_null(inner_phy)
362                            .map_err(|e| anyhow!("Failed to create is_not_null: {}", e))?,
363                    )
364                } else {
365                    self.compile_standard(expr, input_schema)
366                }
367            }
368            // In operator is Expr::In { expr, list }
369            Expr::In {
370                expr: left,
371                list: right,
372            } => {
373                if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
374                    let left_phy = self.compile(left, input_schema)?;
375                    let right_phy = self.compile(right, input_schema)?;
376
377                    let left_type = left_phy
378                        .data_type(input_schema)
379                        .unwrap_or(DataType::LargeBinary);
380                    let right_type = right_phy
381                        .data_type(input_schema)
382                        .unwrap_or(DataType::LargeBinary);
383
384                    self.plan_binary_udf("_cypher_in", left_phy, right_phy, left_type, right_type)?
385                        .ok_or_else(|| anyhow!("_cypher_in UDF not found"))
386                } else {
387                    self.compile_standard(expr, input_schema)
388                }
389            }
390
391            // Recursively check other composite types if necessary.
392            Expr::List(items) if items.iter().any(Self::contains_custom_expr) => Err(anyhow!(
393                "List literals containing comprehensions not yet supported in compiler"
394            )),
395            Expr::Map(entries) if entries.iter().any(|(_, v)| Self::contains_custom_expr(v)) => {
396                Err(anyhow!(
397                    "Map literals containing comprehensions not yet supported in compiler"
398                ))
399            }
400
401            // Property access on a struct column — e.g. `x.a` where `x` is Struct
402            Expr::Property(base, prop) => self.compile_property_access(base, prop, input_schema),
403
404            // Bracket access on a struct column — e.g. `x['a']` where `x` is Struct
405            Expr::ArrayIndex { array, index } => {
406                self.compile_array_index(array, index, input_schema)
407            }
408
409            // Pattern comprehension: [(a)-[:REL]->(b) WHERE pred | expr]
410            Expr::PatternComprehension {
411                path_variable,
412                pattern,
413                where_clause,
414                map_expr,
415            } => self.compile_pattern_comprehension(
416                path_variable,
417                pattern,
418                where_clause.as_deref(),
419                map_expr,
420                input_schema,
421            ),
422
423            // EXISTS subquery: plan + execute per row, return boolean
424            Expr::Exists { query, .. } => self.compile_exists(query),
425
426            // FunctionCall wrapping a custom expression (e.g. size(comprehension))
427            Expr::FunctionCall {
428                name,
429                args,
430                distinct,
431                ..
432            } => {
433                if name.eq_ignore_ascii_case("similar_to") {
434                    return self.compile_similar_to(args, input_schema);
435                }
436                if args.iter().any(Self::contains_custom_expr) {
437                    self.compile_function_with_custom_args(name, args, *distinct, input_schema)
438                } else {
439                    self.compile_standard(expr, input_schema)
440                }
441            }
442
443            // CASE expression - dispatch based on whether it contains custom expressions
444            Expr::Case {
445                expr: case_operand,
446                when_then,
447                else_expr,
448            } => {
449                // Check if operand or any branch contains custom expressions
450                let has_custom = case_operand
451                    .as_deref()
452                    .is_some_and(Self::contains_custom_expr)
453                    || when_then.iter().any(|(w, t)| {
454                        Self::contains_custom_expr(w) || Self::contains_custom_expr(t)
455                    })
456                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr);
457
458                if has_custom {
459                    // Use compile_case() for CypherValue boolean conversion and
460                    // LargeList/LargeBinary type unification
461                    self.compile_case(case_operand, when_then, else_expr, input_schema)
462                } else {
463                    // Standard compilation path - goes through apply_type_coercion which handles:
464                    // 1. Simple CASE → Generic CASE rewriting with cross-type equality
465                    // 2. Type coercion for CASE result branches
466                    // 3. Numeric widening for comparisons
467                    self.compile_standard(expr, input_schema)
468                }
469            }
470
471            // LabelCheck: delegate to standard compilation (uses cypher_expr_to_df)
472            Expr::LabelCheck { .. } => self.compile_standard(expr, input_schema),
473
474            // Default to standard compilation for leaf nodes or non-custom trees
475            _ => self.compile_standard(expr, input_schema),
476        }
477    }
478
479    /// Dispatch binary op compilation, checking for custom expressions and CypherValue types.
480    fn compile_binary_op_dispatch(
481        &self,
482        left: &Expr,
483        op: &BinaryOp,
484        right: &Expr,
485        input_schema: &Schema,
486    ) -> Result<Arc<dyn PhysicalExpr>> {
487        if matches!(op, BinaryOp::Eq | BinaryOp::NotEq)
488            && let (Expr::Variable(lv), Expr::Variable(rv)) = (left, right)
489            && let Some(ctx) = self.translation_ctx
490            && let (Some(lk), Some(rk)) = (ctx.variable_kinds.get(lv), ctx.variable_kinds.get(rv))
491        {
492            let identity_prop = match (lk, rk) {
493                (VariableKind::Node, VariableKind::Node) => Some("_vid"),
494                (VariableKind::Edge, VariableKind::Edge) => Some("_eid"),
495                _ => None,
496            };
497
498            if let Some(id_prop) = identity_prop {
499                return self.compile_standard(
500                    &Expr::BinaryOp {
501                        left: Box::new(Expr::Property(
502                            Box::new(Expr::Variable(lv.clone())),
503                            id_prop.to_string(),
504                        )),
505                        op: *op,
506                        right: Box::new(Expr::Property(
507                            Box::new(Expr::Variable(rv.clone())),
508                            id_prop.to_string(),
509                        )),
510                    },
511                    input_schema,
512                );
513            }
514        }
515
516        // XOR and Pow: always route through compile_standard.
517        // compile_binary_op does not support these operators. The standard path
518        // correctly maps XOR → _cypher_xor UDF and Pow → power() function.
519        if matches!(op, BinaryOp::Xor | BinaryOp::Pow) {
520            return self.compile_standard(
521                &Expr::BinaryOp {
522                    left: Box::new(left.clone()),
523                    op: *op,
524                    right: Box::new(right.clone()),
525                },
526                input_schema,
527            );
528        }
529
530        if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
531            let left_phy = self.compile(left, input_schema)?;
532            let right_phy = self.compile(right, input_schema)?;
533            return self.compile_binary_op(op, left_phy, right_phy, input_schema);
534        }
535
536        // For Add with a list-producing operand (AST-level detection),
537        // compile through the standard path which uses cypher_expr_to_df
538        // to correctly route list + scalar to _cypher_list_concat.
539        if *op == BinaryOp::Add && (Self::is_list_producing(left) || Self::is_list_producing(right))
540        {
541            return self.compile_standard(
542                &Expr::BinaryOp {
543                    left: Box::new(left.clone()),
544                    op: *op,
545                    right: Box::new(right.clone()),
546                },
547                input_schema,
548            );
549        }
550
551        // Compile sub-expressions to check their types. If either operand
552        // produces LargeBinary (CypherValue), standard Arrow kernels will fail at
553        // runtime for comparisons. Route through compile_binary_op which
554        // dispatches to Cypher comparison UDFs.
555        let left_phy = self.compile(left, input_schema)?;
556        let right_phy = self.compile(right, input_schema)?;
557        let left_dt = left_phy.data_type(input_schema).ok();
558        let right_dt = right_phy.data_type(input_schema).ok();
559        let has_cv =
560            is_cypher_value_type(left_dt.as_ref()) || is_cypher_value_type(right_dt.as_ref());
561
562        if has_cv {
563            // CypherValue types need special handling via compile_binary_op
564            self.compile_binary_op(op, left_phy, right_phy, input_schema)
565        } else {
566            // Standard types: use compile_standard to get proper type coercion
567            // (e.g., Int64 == Float64 requires coercion to work)
568            self.compile_standard(
569                &Expr::BinaryOp {
570                    left: Box::new(left.clone()),
571                    op: *op,
572                    right: Box::new(right.clone()),
573                },
574                input_schema,
575            )
576        }
577    }
578
579    /// Try to compile struct field access for a variable.
580    ///
581    /// Returns `Some(expr)` if the variable is a Struct column and can be accessed,
582    /// `None` if fallback to standard compilation is needed.
583    fn try_compile_struct_field(
584        &self,
585        var_name: &str,
586        field_name: &str,
587        input_schema: &Schema,
588    ) -> Option<Arc<dyn PhysicalExpr>> {
589        let col_idx = input_schema.index_of(var_name).ok()?;
590        let DataType::Struct(struct_fields) = input_schema.field(col_idx).data_type() else {
591            return None;
592        };
593
594        // Cypher semantics: accessing a missing key returns null
595        if let Some(field_idx) = struct_fields.iter().position(|f| f.name() == field_name) {
596            let output_type = struct_fields[field_idx].data_type().clone();
597            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(
598                datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
599            );
600            Some(Arc::new(StructFieldAccessExpr::new(
601                col_expr,
602                field_idx,
603                output_type,
604            )))
605        } else {
606            Some(Arc::new(
607                datafusion::physical_expr::expressions::Literal::new(
608                    datafusion::common::ScalarValue::Null,
609                ),
610            ))
611        }
612    }
613
614    /// Compile property access on a struct column (e.g. `x.a` where `x` is Struct).
615    fn compile_property_access(
616        &self,
617        base: &Expr,
618        prop: &str,
619        input_schema: &Schema,
620    ) -> Result<Arc<dyn PhysicalExpr>> {
621        if let Expr::Variable(var_name) = base {
622            // 1. Try struct field access (e.g. `x.a` where `x` is a Struct column)
623            if let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema) {
624                return Ok(expr);
625            }
626            // 2. Try flat column "{var}.{prop}" (for pattern comprehension inner schemas)
627            let flat_col = format!("{}.{}", var_name, prop);
628            if let Ok(col_idx) = input_schema.index_of(&flat_col) {
629                return Ok(Arc::new(
630                    datafusion::physical_expr::expressions::Column::new(&flat_col, col_idx),
631                ));
632            }
633        }
634        self.compile_standard(
635            &Expr::Property(Box::new(base.clone()), prop.to_string()),
636            input_schema,
637        )
638    }
639
640    /// Compile bracket access on a struct column (e.g. `x['a']` where `x` is Struct).
641    fn compile_array_index(
642        &self,
643        array: &Expr,
644        index: &Expr,
645        input_schema: &Schema,
646    ) -> Result<Arc<dyn PhysicalExpr>> {
647        if let Expr::Variable(var_name) = array
648            && let Expr::Literal(CypherLiteral::String(prop)) = index
649            && let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema)
650        {
651            return Ok(expr);
652        }
653        self.compile_standard(
654            &Expr::ArrayIndex {
655                array: Box::new(array.clone()),
656                index: Box::new(index.clone()),
657            },
658            input_schema,
659        )
660    }
661
662    /// Compile EXISTS subquery expression.
663    fn compile_exists(&self, query: &Query) -> Result<Arc<dyn PhysicalExpr>> {
664        // 7.1: Validate no mutation clauses in EXISTS body
665        if has_mutation_clause(query) {
666            return Err(anyhow!(
667                "SyntaxError: InvalidClauseComposition - EXISTS subquery cannot contain updating clauses"
668            ));
669        }
670
671        let err = |dep: &str| anyhow!("EXISTS requires {}", dep);
672
673        let graph_ctx = self
674            .graph_ctx
675            .clone()
676            .ok_or_else(|| err("GraphExecutionContext"))?;
677        let uni_schema = self.uni_schema.clone().ok_or_else(|| err("UniSchema"))?;
678        let session_ctx = self
679            .session_ctx
680            .clone()
681            .ok_or_else(|| err("SessionContext"))?;
682        let storage = self.storage.clone().ok_or_else(|| err("StorageManager"))?;
683
684        Ok(Arc::new(ExistsExecExpr::new(
685            query.clone(),
686            graph_ctx,
687            session_ctx,
688            storage,
689            uni_schema,
690            self.params.clone(),
691        )))
692    }
693
694    /// Compile a `similar_to(sources, queries [, options])` call into a `SimilarToExecExpr`.
695    fn compile_similar_to(
696        &self,
697        args: &[Expr],
698        input_schema: &Schema,
699    ) -> Result<Arc<dyn PhysicalExpr>> {
700        if args.len() < 2 || args.len() > 3 {
701            return Err(SimilarToError::InvalidArity { count: args.len() }.into());
702        }
703
704        let graph_ctx = self
705            .graph_ctx
706            .clone()
707            .ok_or(SimilarToError::NoGraphContext)?;
708
709        // Extract variable name and property names from the source expression.
710        let source_variable = extract_source_variable(&args[0]);
711        let source_property_names = extract_property_names(&args[0]);
712
713        // Normalize sources and queries: if args[0] is a List, split into
714        // individual source/query pairs for multi-source scoring.
715        let (source_exprs, query_exprs) = normalize_similar_to_args(&args[0], &args[1]);
716
717        // Compile each source and query expression
718        let source_children: Vec<Arc<dyn PhysicalExpr>> = source_exprs
719            .iter()
720            .map(|e| self.compile(e, input_schema))
721            .collect::<Result<Vec<_>>>()?;
722        let query_children: Vec<Arc<dyn PhysicalExpr>> = query_exprs
723            .iter()
724            .map(|e| self.compile(e, input_schema))
725            .collect::<Result<Vec<_>>>()?;
726
727        // Compile options if present
728        let options_child = if args.len() == 3 {
729            Some(self.compile(&args[2], input_schema)?)
730        } else {
731            None
732        };
733
734        // Resolve per-source distance metrics from schema at compile time.
735        let source_metrics: Vec<Option<DistanceMetric>> = source_property_names
736            .iter()
737            .map(|prop_name| {
738                prop_name.as_ref().and_then(|prop| {
739                    self.uni_schema
740                        .as_ref()
741                        .and_then(|schema| resolve_metric_for_property(schema, prop))
742                })
743            })
744            .collect();
745
746        Ok(Arc::new(SimilarToExecExpr::new(
747            source_children,
748            query_children,
749            options_child,
750            graph_ctx,
751            source_variable,
752            source_property_names,
753            source_metrics,
754        )))
755    }
756
757    /// Check if map_expr or where_clause contains a pattern comprehension that references the variable.
758    fn needs_vid_extraction_for_variable(
759        variable: &str,
760        map_expr: &Expr,
761        where_clause: Option<&Expr>,
762    ) -> bool {
763        fn expr_has_pattern_comp_referencing(expr: &Expr, var: &str) -> bool {
764            match expr {
765                Expr::PatternComprehension { pattern, .. } => {
766                    // Check if pattern uses the variable
767                    pattern.paths.iter().any(|path| {
768                        path.elements.iter().any(|elem| match elem {
769                            uni_cypher::ast::PatternElement::Node(n) => {
770                                n.variable.as_deref() == Some(var)
771                            }
772                            uni_cypher::ast::PatternElement::Relationship(r) => {
773                                r.variable.as_deref() == Some(var)
774                            }
775                            _ => false,
776                        })
777                    })
778                }
779                Expr::FunctionCall { args, .. } => args
780                    .iter()
781                    .any(|a| expr_has_pattern_comp_referencing(a, var)),
782                Expr::BinaryOp { left, right, .. } => {
783                    expr_has_pattern_comp_referencing(left, var)
784                        || expr_has_pattern_comp_referencing(right, var)
785                }
786                Expr::UnaryOp { expr: e, .. } | Expr::Property(e, _) => {
787                    expr_has_pattern_comp_referencing(e, var)
788                }
789                Expr::List(items) => items
790                    .iter()
791                    .any(|i| expr_has_pattern_comp_referencing(i, var)),
792                Expr::ListComprehension {
793                    list,
794                    map_expr,
795                    where_clause,
796                    ..
797                } => {
798                    expr_has_pattern_comp_referencing(list, var)
799                        || expr_has_pattern_comp_referencing(map_expr, var)
800                        || where_clause
801                            .as_ref()
802                            .is_some_and(|w| expr_has_pattern_comp_referencing(w, var))
803                }
804                _ => false,
805            }
806        }
807
808        expr_has_pattern_comp_referencing(map_expr, variable)
809            || where_clause.is_some_and(|w| expr_has_pattern_comp_referencing(w, variable))
810    }
811
812    /// Check if an expression tree contains nodes that require custom compilation.
813    pub fn contains_custom_expr(expr: &Expr) -> bool {
814        match expr {
815            Expr::ListComprehension { .. } => true,
816            Expr::Quantifier { .. } => true,
817            Expr::Reduce { .. } => true,
818            Expr::PatternComprehension { .. } => true,
819            Expr::BinaryOp { left, right, .. } => {
820                Self::contains_custom_expr(left) || Self::contains_custom_expr(right)
821            }
822            Expr::UnaryOp { expr, .. } => Self::contains_custom_expr(expr),
823            Expr::FunctionCall { name, args, .. } => {
824                name.eq_ignore_ascii_case("similar_to")
825                    || args.iter().any(Self::contains_custom_expr)
826            }
827            Expr::Case {
828                when_then,
829                else_expr,
830                ..
831            } => {
832                when_then
833                    .iter()
834                    .any(|(w, t)| Self::contains_custom_expr(w) || Self::contains_custom_expr(t))
835                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr)
836            }
837            Expr::List(items) => items.iter().any(Self::contains_custom_expr),
838            Expr::Map(entries) => entries.iter().any(|(_, v)| Self::contains_custom_expr(v)),
839            Expr::IsNull(e) | Expr::IsNotNull(e) => Self::contains_custom_expr(e),
840            Expr::In { expr: l, list: r } => {
841                Self::contains_custom_expr(l) || Self::contains_custom_expr(r)
842            }
843            Expr::Exists { .. } => true,
844            Expr::LabelCheck { expr, .. } => Self::contains_custom_expr(expr),
845            _ => false,
846        }
847    }
848
849    /// Check if an expression statically produces a list value.
850    /// Used to route `Add` operations to `_cypher_list_concat` instead of arithmetic.
851    fn is_list_producing(expr: &Expr) -> bool {
852        match expr {
853            Expr::List(_) => true,
854            Expr::ListComprehension { .. } => true,
855            Expr::ArraySlice { .. } => true,
856            // Add with a list-producing child produces a list
857            Expr::BinaryOp {
858                left,
859                op: BinaryOp::Add,
860                right,
861            } => Self::is_list_producing(left) || Self::is_list_producing(right),
862            Expr::FunctionCall { name, .. } => {
863                // Functions known to return lists
864                matches!(
865                    name.as_str(),
866                    "range"
867                        | "tail"
868                        | "reverse"
869                        | "collect"
870                        | "keys"
871                        | "labels"
872                        | "nodes"
873                        | "relationships"
874                )
875            }
876            _ => false,
877        }
878    }
879
880    fn compile_standard(
881        &self,
882        expr: &Expr,
883        input_schema: &Schema,
884    ) -> Result<Arc<dyn PhysicalExpr>> {
885        // Pre-resolve Property(Variable(v), p) → Variable("v.p") when the flat
886        // column "v.p" exists in the input schema.  This prevents DataFusion's
887        // standard path from compiling it as ArrayIndex(column(v), "p"), which
888        // fails when the column stores a VID integer rather than a Map/Struct.
889        let resolved = Self::resolve_flat_column_properties(expr, input_schema);
890        let df_expr = cypher_expr_to_df(&resolved, self.translation_ctx)?;
891        let resolved_expr = self.resolve_udfs(df_expr)?;
892
893        let df_schema = datafusion::common::DFSchema::try_from(input_schema.clone())?;
894
895        // Apply type coercion to resolve type mismatches
896        let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;
897
898        // Re-resolve UDFs after coercion (coercion may introduce new dummy UDF calls)
899        let coerced_expr = self.resolve_udfs(coerced_expr)?;
900
901        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
902        planner
903            .create_physical_expr(&coerced_expr, &df_schema, self.state)
904            .map_err(|e| anyhow!("DataFusion planning failed: {}", e))
905    }
906
907    /// Recursively resolve `Property(Variable(v), p)` → `Variable("v.p")` when
908    /// the flat column `"v.p"` exists in `schema`.  This ensures that property
909    /// access on graph-scan variables is compiled as a direct column reference
910    /// rather than an ArrayIndex UDF call, which would fail when the variable
911    /// column holds a VID integer instead of a Map/Node struct.
912    fn resolve_flat_column_properties(expr: &Expr, schema: &Schema) -> Expr {
913        match expr {
914            Expr::Property(base, prop) => {
915                if let Expr::Variable(var) = base.as_ref() {
916                    let flat_col = format!("{}.{}", var, prop);
917                    if schema.index_of(&flat_col).is_ok() {
918                        return Expr::Variable(flat_col);
919                    }
920                }
921                // Recurse into the base expression
922                Expr::Property(
923                    Box::new(Self::resolve_flat_column_properties(base, schema)),
924                    prop.clone(),
925                )
926            }
927            Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
928                left: Box::new(Self::resolve_flat_column_properties(left, schema)),
929                op: *op,
930                right: Box::new(Self::resolve_flat_column_properties(right, schema)),
931            },
932            Expr::FunctionCall {
933                name,
934                args,
935                distinct,
936                window_spec,
937            } => Expr::FunctionCall {
938                name: name.clone(),
939                args: args
940                    .iter()
941                    .map(|a| Self::resolve_flat_column_properties(a, schema))
942                    .collect(),
943                distinct: *distinct,
944                window_spec: window_spec.clone(),
945            },
946            Expr::UnaryOp { op, expr: inner } => Expr::UnaryOp {
947                op: *op,
948                expr: Box::new(Self::resolve_flat_column_properties(inner, schema)),
949            },
950            Expr::List(items) => Expr::List(
951                items
952                    .iter()
953                    .map(|i| Self::resolve_flat_column_properties(i, schema))
954                    .collect(),
955            ),
956            // For all other expression types, return as-is (literals, variables, etc.)
957            other => other.clone(),
958        }
959    }
960
961    /// Resolve UDFs in DataFusion expression using the session state registry.
962    ///
963    /// Uses `TreeNode::transform_up` to traverse the entire expression tree,
964    /// ensuring UDFs inside Cast, Case, InList, Between, etc. are all resolved.
965    fn resolve_udfs(
966        &self,
967        expr: datafusion::logical_expr::Expr,
968    ) -> Result<datafusion::logical_expr::Expr> {
969        use datafusion::common::tree_node::{Transformed, TreeNode};
970        use datafusion::logical_expr::Expr as DfExpr;
971
972        let result = expr
973            .transform_up(|node| {
974                if let DfExpr::ScalarFunction(ref func) = node {
975                    let udf_name = func.func.name();
976                    if let Some(registered_udf) = self.state.scalar_functions().get(udf_name) {
977                        return Ok(Transformed::yes(DfExpr::ScalarFunction(
978                            datafusion::logical_expr::expr::ScalarFunction {
979                                func: registered_udf.clone(),
980                                args: func.args.clone(),
981                            },
982                        )));
983                    }
984                }
985                Ok(Transformed::no(node))
986            })
987            .map_err(|e| anyhow!("Failed to resolve UDFs: {}", e))?;
988        Ok(result.data)
989    }
990
    /// Compile a list comprehension `[variable IN list WHERE pred | map_expr]`
    /// into a `ListComprehensionExecExpr`.
    ///
    /// The list input is compiled against the outer schema; the predicate and
    /// map expressions are compiled against an inner schema in which
    /// `variable` is bound to the list's element type, shadowing any
    /// same-named outer column.
    fn compile_list_comprehension(
        &self,
        variable: &str,
        list: &Expr,
        where_clause: Option<&Expr>,
        map_expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let input_list_phy = self.compile(list, input_schema)?;

        // Resolve input list type
        // (CypherValue lists yield LargeBinary elements as the fallback).
        let list_data_type = input_list_phy.data_type(input_schema)?;
        let inner_data_type = resolve_list_element_type(
            &list_data_type,
            DataType::LargeBinary,
            "List comprehension",
        )?;

        // Create inner schema with loop variable (shadow outer variable if same name)
        let mut fields = input_schema.fields().to_vec();
        let loop_var_field = Arc::new(Field::new(variable, inner_data_type.clone(), true));

        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
            fields[pos] = loop_var_field;
        } else {
            fields.push(loop_var_field);
        }

        // Check if we need VID extraction for nested pattern comprehensions
        let needs_vid_extraction =
            Self::needs_vid_extraction_for_variable(variable, map_expr, where_clause);
        if needs_vid_extraction && inner_data_type == DataType::LargeBinary {
            // Add a {variable}._vid field for VID extraction
            // (only meaningful for CypherValue elements, hence the
            // LargeBinary guard above).
            let vid_field = Arc::new(Field::new(
                format!("{}._vid", variable),
                DataType::UInt64,
                true,
            ));
            fields.push(vid_field);
        }

        let inner_schema = Arc::new(Schema::new(fields));

        // Compile inner expressions with scoped translation context
        // (the loop variable is removed from variable_kinds so property
        // access on it isn't compiled as a flat column).
        let mut scoped_ctx = None;
        let inner_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);

        let predicate_phy = if let Some(pred) = where_clause {
            Some(inner_compiler.compile(pred, &inner_schema)?)
        } else {
            None
        };

        let map_phy = inner_compiler.compile(map_expr, &inner_schema)?;
        let output_item_type = map_phy.data_type(&inner_schema)?;

        Ok(Arc::new(ListComprehensionExecExpr::new(
            input_list_phy,
            map_phy,
            predicate_phy,
            variable.to_string(),
            Arc::new(input_schema.clone()),
            output_item_type,
            needs_vid_extraction,
        )))
    }
1057
    /// Compile `reduce(accumulator = initial, variable IN list | reduce_expr)`
    /// into a `ReduceExecExpr`.
    ///
    /// The accumulator type is taken from the compiled `initial` expression;
    /// the loop-variable element type is derived from the list type (falling
    /// back to the accumulator type for CypherValue lists so the reduce body
    /// type-checks, e.g. `acc + x` where both are Int64).
    fn compile_reduce(
        &self,
        accumulator: &str,
        initial: &Expr,
        variable: &str,
        list: &Expr,
        reduce_expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let list_phy = self.compile(list, input_schema)?;

        let initial_phy = self.compile(initial, input_schema)?;
        let acc_type = initial_phy.data_type(input_schema)?;

        let list_data_type = list_phy.data_type(input_schema)?;
        // For LargeBinary (CypherValue arrays), use the accumulator type as element type so the
        // reduce body expression compiles correctly (e.g. acc + x where both are Int64).
        let inner_data_type =
            resolve_list_element_type(&list_data_type, acc_type.clone(), "Reduce")?;

        // Create inner schema with accumulator and loop variable (shadow outer variables if same names)
        let mut fields = input_schema.fields().to_vec();

        // Accumulator field first, then the loop variable.
        let acc_field = Arc::new(Field::new(accumulator, acc_type, true));
        if let Some(pos) = fields.iter().position(|f| f.name() == accumulator) {
            fields[pos] = acc_field;
        } else {
            fields.push(acc_field);
        }

        let var_field = Arc::new(Field::new(variable, inner_data_type, true));
        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
            fields[pos] = var_field;
        } else {
            fields.push(var_field);
        }

        let inner_schema = Arc::new(Schema::new(fields));

        // Compile reduce expression with scoped translation context
        // (both accumulator and loop variable are removed from variable_kinds).
        let mut scoped_ctx = None;
        let reduce_compiler = self.scoped_compiler(&[accumulator, variable], &mut scoped_ctx);

        let reduce_phy = reduce_compiler.compile(reduce_expr, &inner_schema)?;
        let output_type = reduce_phy.data_type(&inner_schema)?;

        Ok(Arc::new(ReduceExecExpr::new(
            accumulator.to_string(),
            initial_phy,
            variable.to_string(),
            list_phy,
            reduce_phy,
            Arc::new(input_schema.clone()),
            output_type,
        )))
    }
1114
1115    fn compile_quantifier(
1116        &self,
1117        quantifier: &uni_cypher::ast::Quantifier,
1118        variable: &str,
1119        list: &Expr,
1120        predicate: &Expr,
1121        input_schema: &Schema,
1122    ) -> Result<Arc<dyn PhysicalExpr>> {
1123        let input_list_phy = self.compile(list, input_schema)?;
1124
1125        // Resolve element type from list type
1126        let list_data_type = input_list_phy.data_type(input_schema)?;
1127        let inner_data_type =
1128            resolve_list_element_type(&list_data_type, DataType::LargeBinary, "Quantifier")?;
1129
1130        // Create inner schema with loop variable
1131        // If a field with the same name exists in the outer schema, replace it (shadow it)
1132        // to ensure the loop variable takes precedence.
1133        let mut fields = input_schema.fields().to_vec();
1134        let loop_var_field = Arc::new(Field::new(variable, inner_data_type, true));
1135
1136        // Find and replace existing field with same name, or append if not found
1137        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1138            fields[pos] = loop_var_field;
1139        } else {
1140            fields.push(loop_var_field);
1141        }
1142
1143        let inner_schema = Arc::new(Schema::new(fields));
1144
1145        // Compile predicate with a scoped translation context that removes the loop variable
1146        // from variable_kinds, so property access on the loop variable doesn't incorrectly
1147        // generate flat columns (like "x.name") that don't exist in the inner schema.
1148        let mut scoped_ctx = None;
1149        let pred_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
1150
1151        let mut predicate_phy = pred_compiler.compile(predicate, &inner_schema)?;
1152
1153        // Wrap CypherValue predicates with _cv_to_bool for proper boolean evaluation
1154        if let Ok(DataType::LargeBinary) = predicate_phy.data_type(&inner_schema) {
1155            predicate_phy = self.wrap_with_cv_to_bool(predicate_phy)?;
1156        }
1157
1158        let qt = match quantifier {
1159            uni_cypher::ast::Quantifier::All => QuantifierType::All,
1160            uni_cypher::ast::Quantifier::Any => QuantifierType::Any,
1161            uni_cypher::ast::Quantifier::Single => QuantifierType::Single,
1162            uni_cypher::ast::Quantifier::None => QuantifierType::None,
1163        };
1164
1165        Ok(Arc::new(QuantifierExecExpr::new(
1166            input_list_phy,
1167            predicate_phy,
1168            variable.to_string(),
1169            Arc::new(input_schema.clone()),
1170            qt,
1171        )))
1172    }
1173
1174    fn compile_pattern_comprehension(
1175        &self,
1176        path_variable: &Option<String>,
1177        pattern: &uni_cypher::ast::Pattern,
1178        where_clause: Option<&Expr>,
1179        map_expr: &Expr,
1180        input_schema: &Schema,
1181    ) -> Result<Arc<dyn PhysicalExpr>> {
1182        let err = |dep: &str| anyhow!("Pattern comprehension requires {}", dep);
1183
1184        let graph_ctx = self
1185            .graph_ctx
1186            .as_ref()
1187            .ok_or_else(|| err("GraphExecutionContext"))?;
1188        let uni_schema = self.uni_schema.as_ref().ok_or_else(|| err("UniSchema"))?;
1189
1190        // 1. Analyze pattern to get anchor column and traversal steps
1191        let (anchor_col, steps) = analyze_pattern(pattern, input_schema, uni_schema)?;
1192
1193        // 2. Collect needed properties from where_clause and map_expr
1194        let (vertex_props, edge_props) = collect_inner_properties(where_clause, map_expr, &steps);
1195
1196        // 3. Build inner schema
1197        let inner_schema = build_inner_schema(
1198            input_schema,
1199            &steps,
1200            &vertex_props,
1201            &edge_props,
1202            path_variable.as_deref(),
1203        );
1204
1205        // 4. Compile predicate and map_expr against inner schema
1206        let pred_phy = where_clause
1207            .map(|p| self.compile(p, &inner_schema))
1208            .transpose()?;
1209        let map_phy = self.compile(map_expr, &inner_schema)?;
1210        let output_type = map_phy.data_type(&inner_schema)?;
1211
1212        // 5. Return expression
1213        Ok(Arc::new(PatternComprehensionExecExpr::new(
1214            graph_ctx.clone(),
1215            anchor_col,
1216            steps,
1217            path_variable.clone(),
1218            pred_phy,
1219            map_phy,
1220            Arc::new(input_schema.clone()),
1221            Arc::new(inner_schema),
1222            output_type,
1223            vertex_props,
1224            edge_props,
1225        )))
1226    }
1227
    /// Compile a function call whose arguments contain custom expressions.
    ///
    /// Recursively compiles each argument via `self.compile()`, then looks up
    /// the corresponding UDF in the session registry and builds the physical
    /// expression.
    fn compile_function_with_custom_args(
        &self,
        name: &str,
        args: &[Expr],
        _distinct: bool,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // 1. Recursively compile each argument
        let compiled_args: Vec<Arc<dyn PhysicalExpr>> = args
            .iter()
            .map(|arg| self.compile(arg, input_schema))
            .collect::<Result<Vec<_>>>()?;

        // 2. Resolve UDF name and look it up in the registry
        let udf_name = Self::cypher_fn_to_udf(name);
        let udf = self
            .state
            .scalar_functions()
            .get(udf_name.as_str())
            .ok_or_else(|| {
                anyhow!(
                    "UDF '{}' not found in registry for function '{}'",
                    udf_name,
                    name
                )
            })?;

        // 3. Build operand type list and dummy columns from compiled args
        // NOTE(review): only four distinct placeholder names exist; every
        // argument at index >= 3 maps to "__argN__" via `i.min(3)`, so a call
        // with more than four arguments would produce duplicate dummy column
        // names — presumably no UDF routed through here takes more than four
        // args; confirm before relying on this path for wider functions.
        let placeholders: &[&str] = &["__arg0__", "__arg1__", "__arg2__", "__argN__"];
        let operand_types: Vec<(&str, DataType)> = compiled_args
            .iter()
            .enumerate()
            .map(|(i, arg)| {
                // Unresolvable argument types fall back to LargeBinary
                // (CypherValue), the compiler's catch-all encoding.
                let dt = arg.data_type(input_schema).unwrap_or(DataType::LargeBinary);
                let placeholder = placeholders[i.min(3)];
                (placeholder, dt)
            })
            .collect();

        // One dummy column expression per operand; these stand in for the real
        // compiled arguments during logical planning and are rebound below.
        let dummy_cols: Vec<datafusion::logical_expr::Expr> = operand_types
            .iter()
            .map(|(name, _)| {
                datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
                    None::<String>,
                    *name,
                ))
            })
            .collect();

        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
            datafusion::logical_expr::expr::ScalarFunction {
                func: udf.clone(),
                args: dummy_cols,
            },
        );

        // 4. Plan and rebind
        // (plan_udf_physical_expr substitutes the compiled argument
        // expressions for the dummy columns by name).
        self.plan_udf_physical_expr(
            &udf_expr,
            &operand_types,
            compiled_args,
            &format!("function {}", name),
        )
    }
1297
1298    /// Map a Cypher function name to the registered UDF name.
1299    ///
1300    /// Mirrors the mapping in `translate_function_call` from `df_expr.rs`.
1301    /// The registered UDF names are always lowercase.
1302    fn cypher_fn_to_udf(name: &str) -> String {
1303        match name.to_uppercase().as_str() {
1304            "SIZE" | "LENGTH" => "_cypher_size".to_string(),
1305            "REVERSE" => "_cypher_reverse".to_string(),
1306            "TOSTRING" => "tostring".to_string(),
1307            "TOBOOLEAN" | "TOBOOL" | "TOBOOLEANORNULL" => "toboolean".to_string(),
1308            "TOINTEGER" | "TOINT" | "TOINTEGERORNULL" => "tointeger".to_string(),
1309            "TOFLOAT" | "TOFLOATORNULL" => "tofloat".to_string(),
1310            "HEAD" => "head".to_string(),
1311            "LAST" => "last".to_string(),
1312            "TAIL" => "tail".to_string(),
1313            "KEYS" => "keys".to_string(),
1314            "TYPE" => "type".to_string(),
1315            "PROPERTIES" => "properties".to_string(),
1316            "LABELS" => "labels".to_string(),
1317            "COALESCE" => "coalesce".to_string(),
1318            "ID" => "id".to_string(),
1319            // Fallback: lowercase the name (matches dummy_udf_expr behavior)
1320            _ => name.to_lowercase(),
1321        }
1322    }
1323
    /// Compile a CASE expression with custom sub-expressions in branches.
    ///
    /// Recursively compiles the operand, each when/then pair, and the else
    /// branch, then builds a `CaseExpr` physical expression.
    ///
    /// Applies two fixes for TCK compliance:
    /// 1. Wraps CypherValue WHEN conditions with `_cv_to_bool` for proper boolean evaluation
    /// 2. Unifies branch types when mixing LargeList and LargeBinary to avoid cast errors
    fn compile_case(
        &self,
        operand: &Option<Box<Expr>>,
        when_then: &[(Expr, Expr)],
        else_expr: &Option<Box<Expr>>,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Operand is only present in the "simple" CASE form (CASE x WHEN ...).
        let operand_phy = operand
            .as_deref()
            .map(|e| self.compile(e, input_schema))
            .transpose()?;

        let mut when_then_phy: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> = when_then
            .iter()
            .map(|(w, t)| {
                let w_phy = self.compile(w, input_schema)?;
                let t_phy = self.compile(t, input_schema)?;
                Ok((w_phy, t_phy))
            })
            .collect::<Result<Vec<_>>>()?;

        let mut else_phy = else_expr
            .as_deref()
            .map(|e| self.compile(e, input_schema))
            .transpose()?;

        // Wrap CypherValue WHEN conditions with _cv_to_bool for proper boolean evaluation
        for (w_phy, _) in &mut when_then_phy {
            if matches!(w_phy.data_type(input_schema), Ok(DataType::LargeBinary)) {
                *w_phy = self.wrap_with_cv_to_bool(w_phy.clone())?;
            }
        }

        // Unify branch types when mixing LargeList and LargeBinary
        // (collect the result types of every THEN branch plus the ELSE branch;
        // unresolvable types are skipped).
        let branch_types: Vec<DataType> = when_then_phy
            .iter()
            .map(|(_, t)| t.data_type(input_schema))
            .chain(else_phy.iter().map(|e| e.data_type(input_schema)))
            .filter_map(Result::ok)
            .collect();

        let has_large_binary = branch_types.contains(&DataType::LargeBinary);
        let has_list = branch_types
            .iter()
            .any(|dt| matches!(dt, DataType::List(_) | DataType::LargeList(_)));

        // If we have both, wrap List/LargeList branches with LargeListToCypherValueExpr
        if has_large_binary && has_list {
            for (_, t_phy) in &mut when_then_phy {
                if let Ok(dt) = t_phy.data_type(input_schema)
                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
                {
                    *t_phy = Arc::new(LargeListToCypherValueExpr::new(t_phy.clone()));
                }
            }
            // take() momentarily leaves else_phy as None; both branches below
            // re-install Some(...), so the ELSE branch is never dropped.
            if let Some(e_phy) = else_phy.take() {
                if let Ok(dt) = e_phy.data_type(input_schema)
                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
                {
                    else_phy = Some(Arc::new(LargeListToCypherValueExpr::new(e_phy)));
                } else {
                    else_phy = Some(e_phy);
                }
            }
        }

        let case_expr = datafusion::physical_expr::expressions::CaseExpr::try_new(
            operand_phy,
            when_then_phy,
            else_phy,
        )
        .map_err(|e| anyhow!("Failed to create CASE expression: {}", e))?;

        Ok(Arc::new(case_expr))
    }
1407
    /// Compile a binary operation over two already-compiled operands.
    ///
    /// Routing, in order: string predicates (STARTS WITH / ENDS WITH /
    /// CONTAINS) use a dedicated physical expression; CypherValue
    /// (LargeBinary / FixedSizeBinary(24)) operands go through Cypher UDFs;
    /// everything else falls back to DataFusion's standard `binary` builder,
    /// which handles coercion.
    fn compile_binary_op(
        &self,
        op: &BinaryOp,
        left: Arc<dyn PhysicalExpr>,
        right: Arc<dyn PhysicalExpr>,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        use datafusion::logical_expr::Operator;

        // String operators use custom physical expr for safe type handling
        let string_op = match op {
            BinaryOp::StartsWith => Some(StringOp::StartsWith),
            BinaryOp::EndsWith => Some(StringOp::EndsWith),
            BinaryOp::Contains => Some(StringOp::Contains),
            _ => None,
        };
        if let Some(sop) = string_op {
            return Ok(Arc::new(CypherStringMatchExpr::new(left, right, sop)));
        }

        // Map the Cypher operator to its DataFusion counterpart; operators
        // without a counterpart are rejected here.
        let df_op = match op {
            BinaryOp::Add => Operator::Plus,
            BinaryOp::Sub => Operator::Minus,
            BinaryOp::Mul => Operator::Multiply,
            BinaryOp::Div => Operator::Divide,
            BinaryOp::Mod => Operator::Modulo,
            BinaryOp::Eq => Operator::Eq,
            BinaryOp::NotEq => Operator::NotEq,
            BinaryOp::Gt => Operator::Gt,
            BinaryOp::GtEq => Operator::GtEq,
            BinaryOp::Lt => Operator::Lt,
            BinaryOp::LtEq => Operator::LtEq,
            BinaryOp::And => Operator::And,
            BinaryOp::Or => Operator::Or,
            BinaryOp::Xor => {
                return Err(anyhow!(
                    "XOR not supported via binary helper, use bitwise_xor"
                ));
            }
            BinaryOp::Regex => Operator::RegexMatch,
            BinaryOp::ApproxEq => {
                return Err(anyhow!(
                    "ApproxEq (~=) not yet supported in physical compiler"
                ));
            }
            BinaryOp::Pow => return Err(anyhow!("POW not yet supported in physical compiler")),
            _ => return Err(anyhow!("Unsupported binary op in compiler: {:?}", op)),
        };

        // When either operand is LargeBinary (CypherValue), standard Arrow comparison
        // kernels can't handle the type mismatch. Route through Cypher comparison
        // UDFs which decode CypherValue to Value for comparison.
        let mut left = left;
        let mut right = right;
        let mut left_type = left.data_type(input_schema).ok();
        let mut right_type = right.data_type(input_schema).ok();

        // Type unification: if one side is LargeList and the other is LargeBinary,
        // convert LargeList to LargeBinary for consistent handling
        let left_is_list = matches!(
            left_type.as_ref(),
            Some(DataType::List(_) | DataType::LargeList(_))
        );
        let right_is_list = matches!(
            right_type.as_ref(),
            Some(DataType::List(_) | DataType::LargeList(_))
        );

        if left_is_list && is_cypher_value_type(right_type.as_ref()) {
            left = Arc::new(LargeListToCypherValueExpr::new(left));
            left_type = Some(DataType::LargeBinary);
        } else if right_is_list && is_cypher_value_type(left_type.as_ref()) {
            right = Arc::new(LargeListToCypherValueExpr::new(right));
            right_type = Some(DataType::LargeBinary);
        }

        let has_cv =
            is_cypher_value_type(left_type.as_ref()) || is_cypher_value_type(right_type.as_ref());

        // Each compile_cv_* helper returns Ok(None) when it doesn't apply to
        // this operator/type combination; the first match wins, so the order
        // (comparison, then list concat, then arithmetic) is significant.
        if has_cv {
            if let Some(result) = self.compile_cv_comparison(
                df_op,
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
            )? {
                return Ok(result);
            }
            if let Some(result) = self.compile_cv_list_concat(
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
                df_op,
            )? {
                return Ok(result);
            }
            if let Some(result) = self.compile_cv_arithmetic(
                df_op,
                left.clone(),
                right.clone(),
                &left_type,
                &right_type,
                input_schema,
            )? {
                return Ok(result);
            }
        }

        // Use DataFusion's binary physical expression creator which handles coercion
        binary(left, df_op, right, input_schema)
            .map_err(|e| anyhow!("Failed to create binary expression: {}", e))
    }
1522
1523    /// Compile CypherValue comparison using Cypher UDFs.
1524    fn compile_cv_comparison(
1525        &self,
1526        df_op: datafusion::logical_expr::Operator,
1527        left: Arc<dyn PhysicalExpr>,
1528        right: Arc<dyn PhysicalExpr>,
1529        left_type: &Option<DataType>,
1530        right_type: &Option<DataType>,
1531    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1532        use datafusion::logical_expr::Operator;
1533
1534        let udf_name = match df_op {
1535            Operator::Eq => "_cypher_equal",
1536            Operator::NotEq => "_cypher_not_equal",
1537            Operator::Gt => "_cypher_gt",
1538            Operator::GtEq => "_cypher_gt_eq",
1539            Operator::Lt => "_cypher_lt",
1540            Operator::LtEq => "_cypher_lt_eq",
1541            _ => return Ok(None),
1542        };
1543
1544        self.plan_binary_udf(
1545            udf_name,
1546            left,
1547            right,
1548            left_type.clone().unwrap_or(DataType::LargeBinary),
1549            right_type.clone().unwrap_or(DataType::LargeBinary),
1550        )
1551    }
1552
1553    /// Compile CypherValue list concatenation.
1554    fn compile_cv_list_concat(
1555        &self,
1556        left: Arc<dyn PhysicalExpr>,
1557        right: Arc<dyn PhysicalExpr>,
1558        left_type: &Option<DataType>,
1559        right_type: &Option<DataType>,
1560        df_op: datafusion::logical_expr::Operator,
1561    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1562        use datafusion::logical_expr::Operator;
1563
1564        if df_op != Operator::Plus {
1565            return Ok(None);
1566        }
1567
1568        // List concat when at least one side is a list (CypherValue or Arrow List)
1569        let is_list = |t: &Option<DataType>| {
1570            t.as_ref()
1571                .is_some_and(|dt| matches!(dt, DataType::LargeBinary | DataType::List(_)))
1572        };
1573
1574        if !is_list(left_type) && !is_list(right_type) {
1575            return Ok(None);
1576        }
1577
1578        self.plan_binary_udf(
1579            "_cypher_list_concat",
1580            left,
1581            right,
1582            left_type.clone().unwrap_or(DataType::LargeBinary),
1583            right_type.clone().unwrap_or(DataType::LargeBinary),
1584        )
1585    }
1586
1587    /// Compile CypherValue arithmetic.
1588    ///
1589    /// Routes arithmetic operations through CypherValue-aware UDFs when at least
1590    /// one operand is LargeBinary (CypherValue-encoded).
1591    fn compile_cv_arithmetic(
1592        &self,
1593        df_op: datafusion::logical_expr::Operator,
1594        left: Arc<dyn PhysicalExpr>,
1595        right: Arc<dyn PhysicalExpr>,
1596        left_type: &Option<DataType>,
1597        right_type: &Option<DataType>,
1598        _input_schema: &Schema,
1599    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1600        use datafusion::logical_expr::Operator;
1601
1602        let udf_name = match df_op {
1603            Operator::Plus => "_cypher_add",
1604            Operator::Minus => "_cypher_sub",
1605            Operator::Multiply => "_cypher_mul",
1606            Operator::Divide => "_cypher_div",
1607            Operator::Modulo => "_cypher_mod",
1608            _ => return Ok(None),
1609        };
1610
1611        self.plan_binary_udf(
1612            udf_name,
1613            left,
1614            right,
1615            left_type.clone().unwrap_or(DataType::LargeBinary),
1616            right_type.clone().unwrap_or(DataType::LargeBinary),
1617        )
1618    }
1619
1620    /// Plan a UDF expression with dummy schema columns, then rebind to actual physical expressions.
1621    fn plan_udf_physical_expr(
1622        &self,
1623        udf_expr: &datafusion::logical_expr::Expr,
1624        operand_types: &[(&str, DataType)],
1625        children: Vec<Arc<dyn PhysicalExpr>>,
1626        error_context: &str,
1627    ) -> Result<Arc<dyn PhysicalExpr>> {
1628        let tmp_schema = Schema::new(
1629            operand_types
1630                .iter()
1631                .map(|(name, dt)| Arc::new(Field::new(*name, dt.clone(), true)))
1632                .collect::<Vec<_>>(),
1633        );
1634        let df_schema = datafusion::common::DFSchema::try_from(tmp_schema)?;
1635        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
1636        let udf_phy = planner
1637            .create_physical_expr(udf_expr, &df_schema, self.state)
1638            .map_err(|e| anyhow!("Failed to create {} expr: {}", error_context, e))?;
1639        udf_phy
1640            .with_new_children(children)
1641            .map_err(|e| anyhow!("Failed to rebind {} children: {}", error_context, e))
1642    }
1643
1644    /// Wrap a LargeBinary (CypherValue) expression with `_cv_to_bool` conversion.
1645    ///
1646    /// Used when a CypherValue expression needs to be used as a boolean (e.g., in WHEN clauses).
1647    fn wrap_with_cv_to_bool(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
1648        let Some(udf) = self.state.scalar_functions().get("_cv_to_bool") else {
1649            return Err(anyhow!("_cv_to_bool UDF not found"));
1650        };
1651
1652        let dummy_col = datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1653            None::<String>,
1654            "__cv__",
1655        ));
1656        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1657            datafusion::logical_expr::expr::ScalarFunction {
1658                func: udf.clone(),
1659                args: vec![dummy_col],
1660            },
1661        );
1662
1663        self.plan_udf_physical_expr(
1664            &udf_expr,
1665            &[("__cv__", DataType::LargeBinary)],
1666            vec![expr],
1667            "CypherValue to bool",
1668        )
1669    }
1670
1671    /// Plan a binary UDF with the given name and operand types.
1672    fn plan_binary_udf(
1673        &self,
1674        udf_name: &str,
1675        left: Arc<dyn PhysicalExpr>,
1676        right: Arc<dyn PhysicalExpr>,
1677        left_type: DataType,
1678        right_type: DataType,
1679    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1680        let Some(udf) = self.state.scalar_functions().get(udf_name) else {
1681            return Ok(None);
1682        };
1683        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1684            datafusion::logical_expr::expr::ScalarFunction {
1685                func: udf.clone(),
1686                args: vec![
1687                    datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1688                        None::<String>,
1689                        "__left__",
1690                    )),
1691                    datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1692                        None::<String>,
1693                        "__right__",
1694                    )),
1695                ],
1696            },
1697        );
1698        let result = self.plan_udf_physical_expr(
1699            &udf_expr,
1700            &[("__left__", left_type), ("__right__", right_type)],
1701            vec![left, right],
1702            udf_name,
1703        )?;
1704        Ok(Some(result))
1705    }
1706
1707    fn compile_unary_op(
1708        &self,
1709        op: &UnaryOp,
1710        expr: Arc<dyn PhysicalExpr>,
1711        input_schema: &Schema,
1712    ) -> Result<Arc<dyn PhysicalExpr>> {
1713        match op {
1714            UnaryOp::Not => datafusion::physical_expr::expressions::not(expr),
1715            UnaryOp::Neg => datafusion::physical_expr::expressions::negative(expr, input_schema),
1716        }
1717        .map_err(|e| anyhow!("Failed to create unary expression: {}", e))
1718    }
1719}
1720
1721// ---------------------------------------------------------------------------
1722// similar_to() AST helpers
1723// ---------------------------------------------------------------------------
1724
1725/// Extract the base variable name from a source expression.
1726///
1727/// For `d.embedding` → `"d"`, for `[d.embedding, d.content]` → `"d"`.
1728fn extract_source_variable(expr: &Expr) -> Option<String> {
1729    match expr {
1730        Expr::Property(inner, _) => extract_source_variable(inner),
1731        Expr::Variable(name) => Some(name.clone()),
1732        Expr::List(items) => items.first().and_then(extract_source_variable),
1733        _ => None,
1734    }
1735}
1736
1737/// Extract property names from source expressions.
1738///
1739/// For `d.embedding` → `["embedding"]`, for `[d.embedding, d.content]` → `["embedding", "content"]`.
1740fn extract_property_names(expr: &Expr) -> Vec<Option<String>> {
1741    match expr {
1742        Expr::Property(_, prop) => vec![Some(prop.clone())],
1743        Expr::List(items) => items
1744            .iter()
1745            .map(|item| {
1746                if let Expr::Property(_, prop) = item {
1747                    Some(prop.clone())
1748                } else {
1749                    None
1750                }
1751            })
1752            .collect(),
1753        _ => vec![None],
1754    }
1755}
1756
1757/// Normalize similar_to arguments for multi-source scoring.
1758///
1759/// If source arg is a `List`, returns the individual items as separate source/query pairs.
1760/// Otherwise returns the args as-is (single pair).
1761fn normalize_similar_to_args<'e>(
1762    sources: &'e Expr,
1763    queries: &'e Expr,
1764) -> (Vec<&'e Expr>, Vec<&'e Expr>) {
1765    match (sources, queries) {
1766        // Multi-source: [d.embedding, d.content] paired with [query_vec, query_text]
1767        (Expr::List(src_items), Expr::List(qry_items)) if src_items.len() == qry_items.len() => {
1768            (src_items.iter().collect(), qry_items.iter().collect())
1769        }
1770        // Multi-source with broadcast query: [d.embedding, d.content] paired with single query
1771        (Expr::List(src_items), _) if src_items.len() > 1 => {
1772            let queries_broadcast: Vec<&Expr> = vec![queries; src_items.len()];
1773            (src_items.iter().collect(), queries_broadcast)
1774        }
1775        // Single source
1776        _ => (vec![sources], vec![queries]),
1777    }
1778}
1779
1780use datafusion::physical_plan::DisplayAs;
1781use datafusion::physical_plan::DisplayFormatType;
1782
/// Cypher string predicate operators compiled to a dedicated physical expr.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum StringOp {
    /// `STARTS WITH` — prefix test.
    StartsWith,
    /// `ENDS WITH` — suffix test.
    EndsWith,
    /// `CONTAINS` — substring test.
    Contains,
}

impl std::fmt::Display for StringOp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Render using Cypher's surface syntax for each operator.
        let keyword = match self {
            StringOp::StartsWith => "STARTS WITH",
            StringOp::EndsWith => "ENDS WITH",
            StringOp::Contains => "CONTAINS",
        };
        f.write_str(keyword)
    }
}
1799
/// Physical expression implementing Cypher's string predicates
/// (STARTS WITH / ENDS WITH / CONTAINS) via the shared string-op kernel.
#[derive(Debug, Eq)]
struct CypherStringMatchExpr {
    // Expression producing the string being tested.
    left: Arc<dyn PhysicalExpr>,
    // Expression producing the pattern (prefix/suffix/substring).
    right: Arc<dyn PhysicalExpr>,
    // Which string predicate to apply.
    op: StringOp,
}

// Structural equality: same operator and equal operand expressions.
impl PartialEq for CypherStringMatchExpr {
    fn eq(&self, other: &Self) -> bool {
        self.op == other.op && self.left.eq(&other.left) && self.right.eq(&other.right)
    }
}

// Hash covers the same fields PartialEq compares, keeping the two consistent.
impl std::hash::Hash for CypherStringMatchExpr {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.op.hash(state);
        self.left.hash(state);
        self.right.hash(state);
    }
}

impl CypherStringMatchExpr {
    /// Build a string-match expression over the given operands.
    fn new(left: Arc<dyn PhysicalExpr>, right: Arc<dyn PhysicalExpr>, op: StringOp) -> Self {
        Self { left, right, op }
    }
}
1826
1827impl std::fmt::Display for CypherStringMatchExpr {
1828    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1829        write!(f, "{} {} {}", self.left, self.op, self.right)
1830    }
1831}
1832
1833impl DisplayAs for CypherStringMatchExpr {
1834    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1835        write!(f, "{}", self)
1836    }
1837}
1838
impl PhysicalExpr for CypherStringMatchExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// String predicates always produce a Boolean column.
    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(arrow_schema::DataType::Boolean)
    }

    /// Conservatively reported as nullable.
    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Evaluate both operands, then delegate to the shared Cypher string-op
    /// kernel with the predicate closure matching `self.op`.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        use crate::query::df_udfs::invoke_cypher_string_op;
        use arrow_schema::Field;
        use datafusion::config::ConfigOptions;
        use datafusion::logical_expr::ScalarFunctionArgs;

        let left_val = self.left.evaluate(batch)?;
        let right_val = self.right.evaluate(batch)?;

        // Package the operands in ScalarFunctionArgs so the UDF kernel can be
        // reused here outside the normal scalar-UDF invocation path.
        let args = ScalarFunctionArgs {
            args: vec![left_val, right_val],
            number_rows: batch.num_rows(),
            return_field: Arc::new(Field::new("result", arrow_schema::DataType::Boolean, true)),
            config_options: Arc::new(ConfigOptions::default()),
            arg_fields: vec![], // Not used by invoke_cypher_string_op
        };

        match self.op {
            StringOp::StartsWith => {
                invoke_cypher_string_op(&args, "starts_with", |s, p| s.starts_with(p))
            }
            StringOp::EndsWith => {
                invoke_cypher_string_op(&args, "ends_with", |s, p| s.ends_with(p))
            }
            StringOp::Contains => invoke_cypher_string_op(&args, "contains", |s, p| s.contains(p)),
        }
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.left, &self.right]
    }

    /// Rebuild with new operand expressions; expects exactly [left, right].
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        Ok(Arc::new(CypherStringMatchExpr::new(
            children[0].clone(),
            children[1].clone(),
            self.op,
        )))
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
1905
1906impl PartialEq<dyn PhysicalExpr> for CypherStringMatchExpr {
1907    fn eq(&self, other: &dyn PhysicalExpr) -> bool {
1908        if let Some(other) = other.as_any().downcast_ref::<CypherStringMatchExpr>() {
1909            self == other
1910        } else {
1911            false
1912        }
1913    }
1914}
1915
/// Physical expression for extracting a field from a struct column.
///
/// Used when list comprehension iterates over a list of structs (maps)
/// and accesses a field, e.g., `[x IN [{a: 1}] | x.a]`.
#[derive(Debug, Eq)]
struct StructFieldAccessExpr {
    /// Expression producing the struct column.
    input: Arc<dyn PhysicalExpr>,
    /// Index of the field within the struct.
    field_idx: usize,
    /// Output data type of the extracted field.
    output_type: arrow_schema::DataType,
}

// Structural equality over all three fields (index, input expr, output type).
impl PartialEq for StructFieldAccessExpr {
    fn eq(&self, other: &Self) -> bool {
        self.field_idx == other.field_idx
            && self.input.eq(&other.input)
            && self.output_type == other.output_type
    }
}

// Hashing only input and index stays consistent with PartialEq: equal values
// (which agree on a superset of these fields) always hash equally.
impl std::hash::Hash for StructFieldAccessExpr {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.input.hash(state);
        self.field_idx.hash(state);
    }
}

impl StructFieldAccessExpr {
    /// Build a field-access expression extracting field `field_idx`
    /// (typed `output_type`) from the struct produced by `input`.
    fn new(
        input: Arc<dyn PhysicalExpr>,
        field_idx: usize,
        output_type: arrow_schema::DataType,
    ) -> Self {
        Self {
            input,
            field_idx,
            output_type,
        }
    }
}
1958
1959impl std::fmt::Display for StructFieldAccessExpr {
1960    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1961        write!(f, "{}[{}]", self.input, self.field_idx)
1962    }
1963}
1964
1965impl DisplayAs for StructFieldAccessExpr {
1966    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1967        write!(f, "{}", self)
1968    }
1969}
1970
1971impl PartialEq<dyn PhysicalExpr> for StructFieldAccessExpr {
1972    fn eq(&self, other: &dyn PhysicalExpr) -> bool {
1973        if let Some(other) = other.as_any().downcast_ref::<Self>() {
1974            self.field_idx == other.field_idx && self.input.eq(&other.input)
1975        } else {
1976            false
1977        }
1978    }
1979}
1980
impl PhysicalExpr for StructFieldAccessExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Output type was captured at construction time from the struct field.
    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(self.output_type.clone())
    }

    /// Conservatively reported as nullable.
    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Evaluate the input, materialize it as an array, downcast to
    /// StructArray, and return the child column at `field_idx`.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        use arrow_array::StructArray;

        let input_val = self.input.evaluate(batch)?;
        // into_array expands a scalar input to batch length.
        let array = input_val.into_array(batch.num_rows())?;

        // Execution error (not a panic) if the input is not actually a struct.
        let struct_array = array
            .as_any()
            .downcast_ref::<StructArray>()
            .ok_or_else(|| {
                datafusion::error::DataFusionError::Execution(
                    "StructFieldAccessExpr: input is not a StructArray".to_string(),
                )
            })?;

        // Arrow arrays are cheaply cloneable (Arc-backed buffers).
        let field_col = struct_array.column(self.field_idx).clone();
        Ok(datafusion::physical_plan::ColumnarValue::Array(field_col))
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.input]
    }

    /// Rebuild with a new input expression; expects exactly one child.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        Ok(Arc::new(StructFieldAccessExpr::new(
            children[0].clone(),
            self.field_idx,
            self.output_type.clone(),
        )))
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2038
2039// ---------------------------------------------------------------------------
2040// EXISTS subquery physical expression
2041// ---------------------------------------------------------------------------
2042
/// Physical expression that evaluates an EXISTS subquery per row.
///
/// For each input row, plans and executes the subquery with the row's columns
/// injected as parameters. Returns `true` if the subquery produces any rows.
///
/// NOT EXISTS is handled by the caller wrapping this in a NOT expression.
/// Nested EXISTS works because `execute_subplan` creates a full planner that
/// handles nested EXISTS recursively.
struct ExistsExecExpr {
    // The EXISTS subquery AST, planned (after correlation rewriting) at
    // evaluation time.
    query: Query,
    // Graph execution context forwarded to the subquery executor.
    graph_ctx: Arc<GraphExecutionContext>,
    // Shared DataFusion session context for subquery execution.
    session_ctx: Arc<RwLock<SessionContext>>,
    // Storage backend the subquery reads from.
    storage: Arc<StorageManager>,
    // Logical (Uni) schema used to plan the subquery.
    uni_schema: Arc<UniSchema>,
    // Query-level parameters; extended with per-row values during evaluate().
    params: HashMap<String, Value>,
}

// Terse Debug: the AST and execution contexts are omitted.
impl std::fmt::Debug for ExistsExecExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExistsExecExpr").finish_non_exhaustive()
    }
}

impl ExistsExecExpr {
    /// Bundle the subquery AST with everything needed to execute it per row.
    fn new(
        query: Query,
        graph_ctx: Arc<GraphExecutionContext>,
        session_ctx: Arc<RwLock<SessionContext>>,
        storage: Arc<StorageManager>,
        uni_schema: Arc<UniSchema>,
        params: HashMap<String, Value>,
    ) -> Self {
        Self {
            query,
            graph_ctx,
            session_ctx,
            storage,
            uni_schema,
            params,
        }
    }
}
2085
impl std::fmt::Display for ExistsExecExpr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "EXISTS(<subquery>)")
    }
}

// Equality between subquery expressions is not decided here; every EXISTS is
// conservatively treated as distinct from any other physical expression.
impl PartialEq<dyn PhysicalExpr> for ExistsExecExpr {
    fn eq(&self, _other: &dyn PhysicalExpr) -> bool {
        false
    }
}

// NOTE(review): returning false even for identical instances technically
// violates `Eq`'s reflexivity; presumably intentional to prevent expression
// deduplication/sharing — confirm.
impl PartialEq for ExistsExecExpr {
    fn eq(&self, _other: &Self) -> bool {
        false
    }
}

impl Eq for ExistsExecExpr {}

// Constant hash; trivially consistent with the always-false equality above.
impl std::hash::Hash for ExistsExecExpr {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        "ExistsExecExpr".hash(state);
    }
}

impl DisplayAs for ExistsExecExpr {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2117
impl PhysicalExpr for ExistsExecExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// EXISTS always yields a Boolean column.
    fn data_type(
        &self,
        _input_schema: &Schema,
    ) -> datafusion::error::Result<arrow_schema::DataType> {
        Ok(DataType::Boolean)
    }

    /// Conservatively reported as nullable.
    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
        Ok(true)
    }

    /// Per-row EXISTS evaluation: rewrite correlated references to parameters,
    /// plan the subquery once, then execute it for each input row on a
    /// dedicated thread with its own tokio runtime.
    fn evaluate(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
        let num_rows = batch.num_rows();
        let mut builder = BooleanBuilder::with_capacity(num_rows);

        // 7.2: Extract entity variable names from batch schema.
        // Entity columns follow the pattern "varname._vid" (flattened) or are struct columns.
        // We pass ONLY entity base names (e.g., "p", "n", "m") as vars_in_scope so the
        // subquery planner treats them as bound (Imported) variables. The initial Project
        // then creates Parameter("n") AS "n" etc. — the traverse reads the VID from this
        // column via resolve_source_vid_col's bare-name fallback.
        //
        // We intentionally do NOT include raw column names (n._vid, n._labels, etc.) in
        // vars_in_scope to avoid duplicate column conflicts with traverse output columns.
        // Parameter expressions read directly from sub_params, not from plan columns.
        let schema = batch.schema();
        let mut entity_vars: HashSet<String> = HashSet::new();
        for field in schema.fields() {
            let name = field.name();
            if let Some(base) = name.strip_suffix("._vid") {
                entity_vars.insert(base.to_string());
            }
            if matches!(field.data_type(), DataType::Struct(_)) {
                entity_vars.insert(name.to_string());
            }
            // Also detect bare VID columns from parent EXISTS parameter projections.
            // In nested EXISTS, a parent level projects "n" as a bare Int64/UInt64 VID.
            // Simple identifier (no dots, no leading underscore) + integer type = VID.
            // NOTE(review): this heuristic would also match a user-aliased plain
            // integer column with a bare name — confirm that cannot reach here.
            if !name.contains('.')
                && !name.starts_with('_')
                && matches!(field.data_type(), DataType::Int64 | DataType::UInt64)
            {
                entity_vars.insert(name.to_string());
            }
        }
        let vars_in_scope: Vec<String> = entity_vars.iter().cloned().collect();

        // 7.3: Rewrite correlated property accesses to parameter references.
        // e.g., `n.prop` where `n` is an outer entity → `$param("n.prop")`
        let rewritten_query = rewrite_query_correlated(&self.query, &entity_vars);

        // 7.4: Plan ONCE — the rewritten query is parameterized, same for all rows.
        let planner = QueryPlanner::new(self.uni_schema.clone());
        let logical_plan = match planner.plan_with_scope(rewritten_query, vars_in_scope) {
            Ok(plan) => plan,
            Err(e) => {
                return Err(datafusion::error::DataFusionError::Execution(format!(
                    "EXISTS subquery planning failed: {}",
                    e
                )));
            }
        };

        // Execute all rows on a dedicated thread with a single tokio runtime.
        // The runtime must be created and dropped on this thread (not in an async context).
        let graph_ctx = self.graph_ctx.clone();
        let session_ctx = self.session_ctx.clone();
        let storage = self.storage.clone();
        let uni_schema = self.uni_schema.clone();
        let base_params = self.params.clone();

        // std::thread::scope lets the spawned worker borrow `batch`, `builder`,
        // and `entity_vars` from this stack frame without 'static bounds.
        let result = std::thread::scope(|s| {
            s.spawn(|| {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(|e| {
                        datafusion::error::DataFusionError::Execution(format!(
                            "Failed to create runtime for EXISTS: {}",
                            e
                        ))
                    })?;

                for row_idx in 0..num_rows {
                    // Per-row parameters: query-level params overlaid with the
                    // current row's column values.
                    let row_params = extract_row_params(batch, row_idx);
                    let mut sub_params = base_params.clone();
                    sub_params.extend(row_params);

                    // Add entity variable → VID value mappings so that
                    // Parameter("n") resolves to the VID for traversal sources.
                    for var in &entity_vars {
                        let vid_key = format!("{}._vid", var);
                        if let Some(vid_val) = sub_params.get(&vid_key).cloned() {
                            sub_params.insert(var.clone(), vid_val);
                        }
                    }

                    let batches = rt.block_on(execute_subplan(
                        &logical_plan,
                        &sub_params,
                        &HashMap::new(), // No outer values for EXISTS subquery
                        &graph_ctx,
                        &session_ctx,
                        &storage,
                        &uni_schema,
                    ))?;

                    // EXISTS semantics: true iff the subquery returned any row.
                    let has_rows = batches.iter().any(|b| b.num_rows() > 0);
                    builder.append_value(has_rows);
                }

                Ok::<_, datafusion::error::DataFusionError>(())
            })
            .join()
            .unwrap_or_else(|_| {
                Err(datafusion::error::DataFusionError::Execution(
                    "EXISTS subquery thread panicked".to_string(),
                ))
            })
        });

        // Any per-row failure aborts the whole batch evaluation.
        if let Err(e) = result {
            return Err(datafusion::error::DataFusionError::Execution(format!(
                "EXISTS subquery execution failed: {}",
                e
            )));
        }

        Ok(datafusion::physical_plan::ColumnarValue::Array(Arc::new(
            builder.finish(),
        )))
    }

    /// The subquery is opaque to DataFusion: no child physical expressions.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
        if !children.is_empty() {
            return Err(datafusion::error::DataFusionError::Plan(
                "ExistsExecExpr has no children".to_string(),
            ));
        }
        Ok(self)
    }

    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}
2279
2280// ---------------------------------------------------------------------------
2281// EXISTS subquery helpers
2282// ---------------------------------------------------------------------------
2283
2284/// Check if a Query contains any mutation clauses (CREATE, SET, DELETE, REMOVE, MERGE).
2285/// EXISTS subqueries must be read-only per OpenCypher spec.
2286fn has_mutation_clause(query: &Query) -> bool {
2287    match query {
2288        Query::Single(stmt) => stmt.clauses.iter().any(|c| {
2289            matches!(
2290                c,
2291                Clause::Create(_)
2292                    | Clause::Delete(_)
2293                    | Clause::Set(_)
2294                    | Clause::Remove(_)
2295                    | Clause::Merge(_)
2296            ) || has_mutation_in_clause_exprs(c)
2297        }),
2298        Query::Union { left, right, .. } => has_mutation_clause(left) || has_mutation_clause(right),
2299        _ => false,
2300    }
2301}
2302
2303/// Check if a clause contains nested EXISTS with mutations (recursive).
2304fn has_mutation_in_clause_exprs(clause: &Clause) -> bool {
2305    let check_expr = |e: &Expr| -> bool { has_mutation_in_expr(e) };
2306
2307    match clause {
2308        Clause::Match(m) => m.where_clause.as_ref().is_some_and(check_expr),
2309        Clause::With(w) => {
2310            w.where_clause.as_ref().is_some_and(check_expr)
2311                || w.items.iter().any(|item| match item {
2312                    ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2313                    ReturnItem::All => false,
2314                })
2315        }
2316        Clause::Return(r) => r.items.iter().any(|item| match item {
2317            ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2318            ReturnItem::All => false,
2319        }),
2320        _ => false,
2321    }
2322}
2323
2324/// Check if an expression tree contains an EXISTS with mutation clauses.
2325fn has_mutation_in_expr(expr: &Expr) -> bool {
2326    match expr {
2327        Expr::Exists { query, .. } => has_mutation_clause(query),
2328        _ => {
2329            let mut found = false;
2330            expr.for_each_child(&mut |child| {
2331                if has_mutation_in_expr(child) {
2332                    found = true;
2333                }
2334            });
2335            found
2336        }
2337    }
2338}
2339
2340/// Rewrite a Query AST to replace correlated property accesses with parameter references.
2341///
2342/// For each `Property(Variable(v), key)` where `v` is an outer-scope entity variable,
2343/// replaces it with `Parameter("{v}.{key}")`. This enables plan-once optimization since
2344/// the rewritten query is parameterized (same structure for every row).
2345fn rewrite_query_correlated(query: &Query, outer_vars: &HashSet<String>) -> Query {
2346    match query {
2347        Query::Single(stmt) => Query::Single(Statement {
2348            clauses: stmt
2349                .clauses
2350                .iter()
2351                .map(|c| rewrite_clause_correlated(c, outer_vars))
2352                .collect(),
2353        }),
2354        Query::Union { left, right, all } => Query::Union {
2355            left: Box::new(rewrite_query_correlated(left, outer_vars)),
2356            right: Box::new(rewrite_query_correlated(right, outer_vars)),
2357            all: *all,
2358        },
2359        other => other.clone(),
2360    }
2361}
2362
2363/// Rewrite expressions within a clause for correlated property access.
2364fn rewrite_clause_correlated(clause: &Clause, outer_vars: &HashSet<String>) -> Clause {
2365    match clause {
2366        Clause::Match(m) => Clause::Match(MatchClause {
2367            optional: m.optional,
2368            pattern: m.pattern.clone(),
2369            where_clause: m
2370                .where_clause
2371                .as_ref()
2372                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2373        }),
2374        Clause::With(w) => Clause::With(WithClause {
2375            distinct: w.distinct,
2376            items: w
2377                .items
2378                .iter()
2379                .map(|item| rewrite_return_item(item, outer_vars))
2380                .collect(),
2381            order_by: w.order_by.as_ref().map(|items| {
2382                items
2383                    .iter()
2384                    .map(|si| SortItem {
2385                        expr: rewrite_expr_correlated(&si.expr, outer_vars),
2386                        ascending: si.ascending,
2387                    })
2388                    .collect()
2389            }),
2390            skip: w
2391                .skip
2392                .as_ref()
2393                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2394            limit: w
2395                .limit
2396                .as_ref()
2397                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2398            where_clause: w
2399                .where_clause
2400                .as_ref()
2401                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2402        }),
2403        Clause::Return(r) => Clause::Return(ReturnClause {
2404            distinct: r.distinct,
2405            items: r
2406                .items
2407                .iter()
2408                .map(|item| rewrite_return_item(item, outer_vars))
2409                .collect(),
2410            order_by: r.order_by.as_ref().map(|items| {
2411                items
2412                    .iter()
2413                    .map(|si| SortItem {
2414                        expr: rewrite_expr_correlated(&si.expr, outer_vars),
2415                        ascending: si.ascending,
2416                    })
2417                    .collect()
2418            }),
2419            skip: r
2420                .skip
2421                .as_ref()
2422                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2423            limit: r
2424                .limit
2425                .as_ref()
2426                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2427        }),
2428        Clause::Unwind(u) => Clause::Unwind(UnwindClause {
2429            expr: rewrite_expr_correlated(&u.expr, outer_vars),
2430            variable: u.variable.clone(),
2431        }),
2432        other => other.clone(),
2433    }
2434}
2435
2436fn rewrite_return_item(item: &ReturnItem, outer_vars: &HashSet<String>) -> ReturnItem {
2437    match item {
2438        ReturnItem::All => ReturnItem::All,
2439        ReturnItem::Expr {
2440            expr,
2441            alias,
2442            source_text,
2443        } => ReturnItem::Expr {
2444            expr: rewrite_expr_correlated(expr, outer_vars),
2445            alias: alias.clone(),
2446            source_text: source_text.clone(),
2447        },
2448    }
2449}
2450
2451/// Rewrite a single expression: Property(Variable(v), key) → Parameter("{v}.{key}")
2452/// when v is an outer-scope entity variable. Handles nested EXISTS recursively.
2453fn rewrite_expr_correlated(expr: &Expr, outer_vars: &HashSet<String>) -> Expr {
2454    match expr {
2455        // Core rewrite: n.prop → $param("n.prop") when n is an outer entity
2456        Expr::Property(base, key) => {
2457            if let Expr::Variable(v) = base.as_ref()
2458                && outer_vars.contains(v)
2459            {
2460                return Expr::Parameter(format!("{}.{}", v, key));
2461            }
2462            Expr::Property(
2463                Box::new(rewrite_expr_correlated(base, outer_vars)),
2464                key.clone(),
2465            )
2466        }
2467        // Nested EXISTS — recurse into the subquery body
2468        Expr::Exists {
2469            query,
2470            from_pattern_predicate,
2471        } => Expr::Exists {
2472            query: Box::new(rewrite_query_correlated(query, outer_vars)),
2473            from_pattern_predicate: *from_pattern_predicate,
2474        },
2475        // CountSubquery and CollectSubquery — recurse into subquery body
2476        Expr::CountSubquery(query) => {
2477            Expr::CountSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2478        }
2479        Expr::CollectSubquery(query) => {
2480            Expr::CollectSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2481        }
2482        // All other expressions: recursively transform children
2483        other => other
2484            .clone()
2485            .map_children(&mut |child| rewrite_expr_correlated(&child, outer_vars)),
2486    }
2487}
2488
2489/// Look up the `DistanceMetric` for a vector-indexed property across all labels.
2490///
2491/// Returns `None` if no vector index exists for the property.
2492fn resolve_metric_for_property(schema: &UniSchema, property: &str) -> Option<DistanceMetric> {
2493    for idx in &schema.indexes {
2494        if let IndexDefinition::Vector(config) = idx
2495            && config.property == property
2496        {
2497            return Some(config.metric.clone());
2498        }
2499    }
2500    None
2501}