// File: uni_query/query/df_graph/expr_compiler.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
5use crate::query::df_graph::GraphExecutionContext;
6use crate::query::df_graph::common::{execute_subplan_with_outer_vars, extract_row_params};
7use crate::query::df_graph::comprehension::ListComprehensionExecExpr;
8use crate::query::df_graph::pattern_comprehension::{
9    PatternComprehensionExecExpr, analyze_pattern, build_inner_schema, collect_inner_properties,
10};
11use crate::query::df_graph::pattern_exists::{
12    PatternExistsExecExpr, extract_pattern_from_exists_query, extract_target_property_predicates,
13};
14use crate::query::df_graph::quantifier::{QuantifierExecExpr, QuantifierType};
15use crate::query::df_graph::reduce::ReduceExecExpr;
16use crate::query::df_graph::similar_to_expr::SimilarToExecExpr;
17use crate::query::planner::QueryPlanner;
18use crate::query::similar_to::SimilarToError;
19use anyhow::{Result, anyhow};
20use arrow_array::builder::BooleanBuilder;
21use arrow_schema::{DataType, Field, Schema};
22use datafusion::execution::context::SessionState;
23use datafusion::physical_expr::expressions::binary;
24use datafusion::physical_plan::PhysicalExpr;
25use datafusion::physical_planner::PhysicalPlanner;
26use datafusion::prelude::SessionContext;
27use parking_lot::RwLock;
28use std::collections::{HashMap, HashSet};
29use std::sync::Arc;
30use uni_common::Value;
31use uni_common::core::schema::{DistanceMetric, IndexDefinition, Schema as UniSchema};
32use uni_cypher::ast::{
33    BinaryOp, Clause, CypherLiteral, Expr, MatchClause, Query, ReturnClause, ReturnItem, SortItem,
34    Statement, UnaryOp, UnwindClause, WithClause,
35};
36use uni_store::storage::manager::StorageManager;
37
38/// Check if a data type represents CypherValue (LargeBinary) or BTIC (FixedSizeBinary(24)).
39/// Both need to route through Cypher UDFs for comparison operators.
40fn is_cypher_value_type(dt: Option<&DataType>) -> bool {
41    dt.is_some_and(|t| matches!(t, DataType::LargeBinary | DataType::FixedSizeBinary(24)))
42}
43
44/// Resolve the element type for a list expression.
45///
46/// Extracts the element type from List/LargeList/Null/LargeBinary data types.
47/// Falls back to the provided fallback type for LargeBinary, and returns an
48/// error with the provided context for unsupported types.
49///
50/// # Errors
51///
52/// Returns an error if the data type is not a recognized list type.
53fn resolve_list_element_type(
54    list_data_type: &DataType,
55    large_binary_fallback: DataType,
56    context: &str,
57) -> Result<DataType> {
58    match list_data_type {
59        DataType::List(field) | DataType::LargeList(field) => Ok(field.data_type().clone()),
60        DataType::Null => Ok(DataType::Null),
61        DataType::LargeBinary => Ok(large_binary_fallback),
62        _ => Err(anyhow!(
63            "{} input must be a list, got {:?}",
64            context,
65            list_data_type
66        )),
67    }
68}
69
70/// Physical expression wrapper that converts LargeList<T> to LargeBinary (CypherValue).
71///
72/// Used in CASE expressions to unify branch types when mixing typed lists
73/// (e.g., from list comprehensions) with CypherValue-encoded lists.
74#[derive(Debug)]
75struct LargeListToCypherValueExpr {
76    child: Arc<dyn PhysicalExpr>,
77}
78
79impl LargeListToCypherValueExpr {
80    fn new(child: Arc<dyn PhysicalExpr>) -> Self {
81        Self { child }
82    }
83}
84
85impl std::fmt::Display for LargeListToCypherValueExpr {
86    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
87        write!(f, "LargeListToCypherValue({})", self.child)
88    }
89}
90
91impl PartialEq for LargeListToCypherValueExpr {
92    fn eq(&self, other: &Self) -> bool {
93        Arc::ptr_eq(&self.child, &other.child)
94    }
95}
96
97impl Eq for LargeListToCypherValueExpr {}
98
99impl std::hash::Hash for LargeListToCypherValueExpr {
100    fn hash<H: std::hash::Hasher>(&self, _state: &mut H) {
101        // Hash based on type since we can't hash PhysicalExpr
102        std::any::type_name::<Self>().hash(_state);
103    }
104}
105
106impl PartialEq<dyn std::any::Any> for LargeListToCypherValueExpr {
107    fn eq(&self, other: &dyn std::any::Any) -> bool {
108        other
109            .downcast_ref::<Self>()
110            .map(|x| self == x)
111            .unwrap_or(false)
112    }
113}
114
115impl PhysicalExpr for LargeListToCypherValueExpr {
116    fn as_any(&self) -> &dyn std::any::Any {
117        self
118    }
119
120    fn data_type(&self, _input_schema: &Schema) -> datafusion::error::Result<DataType> {
121        Ok(DataType::LargeBinary)
122    }
123
124    fn nullable(&self, input_schema: &Schema) -> datafusion::error::Result<bool> {
125        self.child.nullable(input_schema)
126    }
127
128    fn evaluate(
129        &self,
130        batch: &arrow_array::RecordBatch,
131    ) -> datafusion::error::Result<datafusion::logical_expr::ColumnarValue> {
132        use datafusion::arrow::compute::cast;
133        use datafusion::logical_expr::ColumnarValue;
134
135        let child_result = self.child.evaluate(batch)?;
136        let child_array = child_result.into_array(batch.num_rows())?;
137
138        // Normalize List → LargeList (pattern from quantifier.rs:182-189)
139        let list_array = if let DataType::List(field) = child_array.data_type() {
140            let target_type = DataType::LargeList(field.clone());
141            cast(&child_array, &target_type).map_err(|e| {
142                datafusion::error::DataFusionError::Execution(format!(
143                    "List to LargeList cast failed: {e}"
144                ))
145            })?
146        } else {
147            child_array.clone()
148        };
149
150        // If already LargeBinary, pass through
151        if list_array.data_type() == &DataType::LargeBinary {
152            return Ok(ColumnarValue::Array(list_array));
153        }
154
155        // Convert LargeList to CypherValue
156        if let Some(large_list) = list_array
157            .as_any()
158            .downcast_ref::<datafusion::arrow::array::LargeListArray>()
159        {
160            let cv_array =
161                crate::query::df_graph::common::typed_large_list_to_cv_array(large_list)?;
162            Ok(ColumnarValue::Array(cv_array))
163        } else {
164            Err(datafusion::error::DataFusionError::Execution(format!(
165                "Expected List or LargeList, got {:?}",
166                list_array.data_type()
167            )))
168        }
169    }
170
171    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
172        vec![&self.child]
173    }
174
175    fn with_new_children(
176        self: Arc<Self>,
177        children: Vec<Arc<dyn PhysicalExpr>>,
178    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
179        if children.len() != 1 {
180            return Err(datafusion::error::DataFusionError::Execution(
181                "LargeListToCypherValueExpr expects exactly 1 child".to_string(),
182            ));
183        }
184        Ok(Arc::new(LargeListToCypherValueExpr::new(
185            children[0].clone(),
186        )))
187    }
188
189    fn fmt_sql(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
190        write!(f, "LargeListToCypherValue({})", self.child)
191    }
192}
193
/// Compiler for converting Cypher expressions directly to DataFusion Physical Expressions.
///
/// A bare compiler is created with `new`; graph and subquery context are attached
/// afterwards through the `with_graph_ctx` / `with_subquery_ctx` builders.
pub struct CypherPhysicalExprCompiler<'a> {
    /// DataFusion session state, borrowed for the lifetime of the compiler.
    state: &'a SessionState,
    /// Optional translation context; its `variable_kinds` map is consulted when
    /// scoping loop variables and when comparing node/edge variables for identity.
    translation_ctx: Option<&'a TranslationContext>,
    /// Graph execution context; EXISTS, pattern-exists and `similar_to`
    /// compilation fail when it is absent.
    graph_ctx: Option<Arc<GraphExecutionContext>>,
    /// Graph schema; always attached together with `graph_ctx` by the builders.
    uni_schema: Option<Arc<UniSchema>>,
    /// Session context for EXISTS subquery execution.
    session_ctx: Option<Arc<RwLock<SessionContext>>>,
    /// Storage manager for EXISTS subquery execution.
    storage: Option<Arc<StorageManager>>,
    /// Query parameters for EXISTS subquery execution.
    params: HashMap<String, Value>,
    /// Entity variable names from outer scopes (for nested EXISTS).
    /// A target variable in a pattern predicate that appears here is a
    /// correlated reference requiring per-row evaluation, not a fresh binding.
    outer_entity_vars: HashSet<String>,
}
211
212impl<'a> CypherPhysicalExprCompiler<'a> {
213    pub fn new(state: &'a SessionState, translation_ctx: Option<&'a TranslationContext>) -> Self {
214        Self {
215            state,
216            translation_ctx,
217            graph_ctx: None,
218            uni_schema: None,
219            session_ctx: None,
220            storage: None,
221            params: HashMap::new(),
222            outer_entity_vars: HashSet::new(),
223        }
224    }
225
226    /// Build a scoped compiler that excludes the given variables from the translation context.
227    ///
228    /// When compiling inner expressions for list comprehensions, reduce, or quantifiers,
229    /// loop variables must be removed from `variable_kinds` so that property access on
230    /// them does not incorrectly generate flat columns that don't exist in the inner schema.
231    ///
232    /// If none of the `exclude_vars` are present in the current context, the returned
233    /// compiler simply reuses `self`'s translation context unchanged.
234    ///
235    /// The caller must own `scoped_ctx_slot` and keep it alive for the returned compiler's
236    /// lifetime.
237    fn scoped_compiler<'b>(
238        &'b self,
239        exclude_vars: &[&str],
240        scoped_ctx_slot: &'b mut Option<TranslationContext>,
241    ) -> CypherPhysicalExprCompiler<'b>
242    where
243        'a: 'b,
244    {
245        let needs_scoping = self.translation_ctx.is_some_and(|ctx| {
246            exclude_vars
247                .iter()
248                .any(|v| ctx.variable_kinds.contains_key(*v))
249        });
250
251        let ctx_ref = if needs_scoping {
252            let ctx = self.translation_ctx.unwrap();
253            let mut new_kinds = ctx.variable_kinds.clone();
254            for v in exclude_vars {
255                new_kinds.remove(*v);
256            }
257            *scoped_ctx_slot = Some(TranslationContext {
258                parameters: ctx.parameters.clone(),
259                outer_values: ctx.outer_values.clone(),
260                variable_labels: ctx.variable_labels.clone(),
261                variable_kinds: new_kinds,
262                node_variable_hints: ctx.node_variable_hints.clone(),
263                mutation_edge_hints: ctx.mutation_edge_hints.clone(),
264                statement_time: ctx.statement_time,
265            });
266            scoped_ctx_slot.as_ref()
267        } else {
268            self.translation_ctx
269        };
270
271        CypherPhysicalExprCompiler {
272            state: self.state,
273            translation_ctx: ctx_ref,
274            graph_ctx: self.graph_ctx.clone(),
275            uni_schema: self.uni_schema.clone(),
276            session_ctx: self.session_ctx.clone(),
277            storage: self.storage.clone(),
278            params: self.params.clone(),
279            outer_entity_vars: self.outer_entity_vars.clone(),
280        }
281    }
282
283    /// Attach graph context and schema for pattern comprehension support.
284    pub fn with_graph_ctx(
285        mut self,
286        graph_ctx: Arc<GraphExecutionContext>,
287        uni_schema: Arc<UniSchema>,
288    ) -> Self {
289        self.graph_ctx = Some(graph_ctx);
290        self.uni_schema = Some(uni_schema);
291        self
292    }
293
294    /// Attach full subquery context for EXISTS support.
295    pub fn with_subquery_ctx(
296        mut self,
297        graph_ctx: Arc<GraphExecutionContext>,
298        uni_schema: Arc<UniSchema>,
299        session_ctx: Arc<RwLock<SessionContext>>,
300        storage: Arc<StorageManager>,
301        params: HashMap<String, Value>,
302        outer_entity_vars: HashSet<String>,
303    ) -> Self {
304        self.graph_ctx = Some(graph_ctx);
305        self.uni_schema = Some(uni_schema);
306        self.session_ctx = Some(session_ctx);
307        self.storage = Some(storage);
308        self.params = params;
309        self.outer_entity_vars = outer_entity_vars;
310        self
311    }
312
313    /// Compile a Cypher expression into a DataFusion PhysicalExpr.
314    pub fn compile(&self, expr: &Expr, input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
315        match expr {
316            Expr::ListComprehension {
317                variable,
318                list,
319                where_clause,
320                map_expr,
321            } => self.compile_list_comprehension(
322                variable,
323                list,
324                where_clause.as_deref(),
325                map_expr,
326                input_schema,
327            ),
328            Expr::Quantifier {
329                quantifier,
330                variable,
331                list,
332                predicate,
333            } => self.compile_quantifier(quantifier, variable, list, predicate, input_schema),
334            Expr::Reduce {
335                accumulator,
336                init,
337                variable,
338                list,
339                expr: expression,
340            } => self.compile_reduce(accumulator, init, variable, list, expression, input_schema),
341            // For BinaryOp, check if children contain custom expressions or CypherValue types
342            Expr::BinaryOp { left, op, right } => {
343                self.compile_binary_op_dispatch(left, op, right, input_schema)
344            }
345            Expr::UnaryOp { op, expr: inner } => {
346                if matches!(op, UnaryOp::Not) {
347                    let mut inner_phy = self.compile(inner, input_schema)?;
348                    if let Ok(DataType::LargeBinary) = inner_phy.data_type(input_schema) {
349                        inner_phy = self.wrap_with_cv_to_bool(inner_phy)?;
350                    }
351                    self.compile_unary_op(op, inner_phy, input_schema)
352                } else if Self::contains_custom_expr(inner) {
353                    let inner_phy = self.compile(inner, input_schema)?;
354                    self.compile_unary_op(op, inner_phy, input_schema)
355                } else {
356                    self.compile_standard(expr, input_schema)
357                }
358            }
359            Expr::IsNull(inner) => {
360                if Self::contains_custom_expr(inner) {
361                    let inner_phy = self.compile(inner, input_schema)?;
362                    Ok(datafusion::physical_expr::expressions::is_null(inner_phy)
363                        .map_err(|e| anyhow!("Failed to create is_null: {}", e))?)
364                } else {
365                    self.compile_standard(expr, input_schema)
366                }
367            }
368            Expr::IsNotNull(inner) => {
369                if Self::contains_custom_expr(inner) {
370                    let inner_phy = self.compile(inner, input_schema)?;
371                    Ok(
372                        datafusion::physical_expr::expressions::is_not_null(inner_phy)
373                            .map_err(|e| anyhow!("Failed to create is_not_null: {}", e))?,
374                    )
375                } else {
376                    self.compile_standard(expr, input_schema)
377                }
378            }
379            // In operator is Expr::In { expr, list }
380            Expr::In {
381                expr: left,
382                list: right,
383            } => {
384                if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
385                    let left_phy = self.compile(left, input_schema)?;
386                    let right_phy = self.compile(right, input_schema)?;
387
388                    let left_type = left_phy
389                        .data_type(input_schema)
390                        .unwrap_or(DataType::LargeBinary);
391                    let right_type = right_phy
392                        .data_type(input_schema)
393                        .unwrap_or(DataType::LargeBinary);
394
395                    self.plan_binary_udf("_cypher_in", left_phy, right_phy, left_type, right_type)?
396                        .ok_or_else(|| anyhow!("_cypher_in UDF not found"))
397                } else {
398                    self.compile_standard(expr, input_schema)
399                }
400            }
401
402            // Recursively check other composite types if necessary.
403            Expr::List(items) if items.iter().any(Self::contains_custom_expr) => Err(anyhow!(
404                "List literals containing comprehensions not yet supported in compiler"
405            )),
406            Expr::Map(entries) if entries.iter().any(|(_, v)| Self::contains_custom_expr(v)) => {
407                Err(anyhow!(
408                    "Map literals containing comprehensions not yet supported in compiler"
409                ))
410            }
411
412            // Property access on a struct column — e.g. `x.a` where `x` is Struct
413            Expr::Property(base, prop) => self.compile_property_access(base, prop, input_schema),
414
415            // Bracket access on a struct column — e.g. `x['a']` where `x` is Struct
416            Expr::ArrayIndex { array, index } => {
417                self.compile_array_index(array, index, input_schema)
418            }
419
420            // Pattern comprehension: [(a)-[:REL]->(b) WHERE pred | expr]
421            Expr::PatternComprehension {
422                path_variable,
423                pattern,
424                where_clause,
425                map_expr,
426            } => self.compile_pattern_comprehension(
427                path_variable,
428                pattern,
429                where_clause.as_deref(),
430                map_expr,
431                input_schema,
432            ),
433
434            // EXISTS subquery: vectorized for pattern predicates, per-row for general EXISTS.
435            Expr::Exists {
436                query,
437                from_pattern_predicate,
438            } => {
439                if *from_pattern_predicate {
440                    match self.compile_pattern_exists(query, input_schema) {
441                        Ok(expr) => Ok(expr),
442                        Err(e) => {
443                            log::debug!(
444                                "Pattern exists vectorization failed, falling back to subquery: {e}"
445                            );
446                            self.compile_exists(query)
447                        }
448                    }
449                } else {
450                    self.compile_exists(query)
451                }
452            }
453
454            // FunctionCall wrapping a custom expression (e.g. size(comprehension))
455            Expr::FunctionCall {
456                name,
457                args,
458                distinct,
459                ..
460            } => {
461                if name.eq_ignore_ascii_case("similar_to") {
462                    return self.compile_similar_to(args, input_schema);
463                }
464                if args.iter().any(Self::contains_custom_expr) {
465                    self.compile_function_with_custom_args(name, args, *distinct, input_schema)
466                } else {
467                    self.compile_standard(expr, input_schema)
468                }
469            }
470
471            // CASE expression - dispatch based on whether it contains custom expressions
472            Expr::Case {
473                expr: case_operand,
474                when_then,
475                else_expr,
476            } => {
477                // Check if operand or any branch contains custom expressions
478                let has_custom = case_operand
479                    .as_deref()
480                    .is_some_and(Self::contains_custom_expr)
481                    || when_then.iter().any(|(w, t)| {
482                        Self::contains_custom_expr(w) || Self::contains_custom_expr(t)
483                    })
484                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr);
485
486                if has_custom {
487                    // Use compile_case() for CypherValue boolean conversion and
488                    // LargeList/LargeBinary type unification
489                    self.compile_case(case_operand, when_then, else_expr, input_schema)
490                } else {
491                    // Standard compilation path - goes through apply_type_coercion which handles:
492                    // 1. Simple CASE → Generic CASE rewriting with cross-type equality
493                    // 2. Type coercion for CASE result branches
494                    // 3. Numeric widening for comparisons
495                    self.compile_standard(expr, input_schema)
496                }
497            }
498
499            // LabelCheck: delegate to standard compilation (uses cypher_expr_to_df)
500            Expr::LabelCheck { .. } => self.compile_standard(expr, input_schema),
501
502            // Default to standard compilation for leaf nodes or non-custom trees
503            _ => self.compile_standard(expr, input_schema),
504        }
505    }
506
507    /// Dispatch binary op compilation, checking for custom expressions and CypherValue types.
508    fn compile_binary_op_dispatch(
509        &self,
510        left: &Expr,
511        op: &BinaryOp,
512        right: &Expr,
513        input_schema: &Schema,
514    ) -> Result<Arc<dyn PhysicalExpr>> {
515        if matches!(op, BinaryOp::Eq | BinaryOp::NotEq)
516            && let (Expr::Variable(lv), Expr::Variable(rv)) = (left, right)
517            && let Some(ctx) = self.translation_ctx
518            && let (Some(lk), Some(rk)) = (ctx.variable_kinds.get(lv), ctx.variable_kinds.get(rv))
519        {
520            let identity_prop = match (lk, rk) {
521                (VariableKind::Node, VariableKind::Node) => Some("_vid"),
522                (VariableKind::Edge, VariableKind::Edge) => Some("_eid"),
523                _ => None,
524            };
525
526            if let Some(id_prop) = identity_prop {
527                return self.compile_standard(
528                    &Expr::BinaryOp {
529                        left: Box::new(Expr::Property(
530                            Box::new(Expr::Variable(lv.clone())),
531                            id_prop.to_string(),
532                        )),
533                        op: *op,
534                        right: Box::new(Expr::Property(
535                            Box::new(Expr::Variable(rv.clone())),
536                            id_prop.to_string(),
537                        )),
538                    },
539                    input_schema,
540                );
541            }
542        }
543
544        // XOR and Pow: always route through compile_standard.
545        // compile_binary_op does not support these operators. The standard path
546        // correctly maps XOR → _cypher_xor UDF and Pow → power() function.
547        if matches!(op, BinaryOp::Xor | BinaryOp::Pow) {
548            return self.compile_standard(
549                &Expr::BinaryOp {
550                    left: Box::new(left.clone()),
551                    op: *op,
552                    right: Box::new(right.clone()),
553                },
554                input_schema,
555            );
556        }
557
558        if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
559            let left_phy = self.compile(left, input_schema)?;
560            let right_phy = self.compile(right, input_schema)?;
561            return self.compile_binary_op(op, left_phy, right_phy, input_schema);
562        }
563
564        // For Add with a list-producing operand (AST-level detection),
565        // compile through the standard path which uses cypher_expr_to_df
566        // to correctly route list + scalar to _cypher_list_concat.
567        if *op == BinaryOp::Add && (Self::is_list_producing(left) || Self::is_list_producing(right))
568        {
569            return self.compile_standard(
570                &Expr::BinaryOp {
571                    left: Box::new(left.clone()),
572                    op: *op,
573                    right: Box::new(right.clone()),
574                },
575                input_schema,
576            );
577        }
578
579        // Compile sub-expressions to check their types. If either operand
580        // produces LargeBinary (CypherValue), standard Arrow kernels will fail at
581        // runtime for comparisons. Route through compile_binary_op which
582        // dispatches to Cypher comparison UDFs.
583        let left_phy = self.compile(left, input_schema)?;
584        let right_phy = self.compile(right, input_schema)?;
585        let left_dt = left_phy.data_type(input_schema).ok();
586        let right_dt = right_phy.data_type(input_schema).ok();
587        let has_cv =
588            is_cypher_value_type(left_dt.as_ref()) || is_cypher_value_type(right_dt.as_ref());
589
590        if has_cv {
591            // CypherValue types need special handling via compile_binary_op
592            self.compile_binary_op(op, left_phy, right_phy, input_schema)
593        } else {
594            // Standard types: use compile_standard to get proper type coercion
595            // (e.g., Int64 == Float64 requires coercion to work)
596            self.compile_standard(
597                &Expr::BinaryOp {
598                    left: Box::new(left.clone()),
599                    op: *op,
600                    right: Box::new(right.clone()),
601                },
602                input_schema,
603            )
604        }
605    }
606
607    /// Try to compile struct field access for a variable.
608    ///
609    /// Returns `Some(expr)` if the variable is a Struct column and can be accessed,
610    /// `None` if fallback to standard compilation is needed.
611    fn try_compile_struct_field(
612        &self,
613        var_name: &str,
614        field_name: &str,
615        input_schema: &Schema,
616    ) -> Option<Arc<dyn PhysicalExpr>> {
617        let col_idx = input_schema.index_of(var_name).ok()?;
618        let DataType::Struct(struct_fields) = input_schema.field(col_idx).data_type() else {
619            return None;
620        };
621
622        // Cypher semantics: accessing a missing key returns null
623        if let Some(field_idx) = struct_fields.iter().position(|f| f.name() == field_name) {
624            let output_type = struct_fields[field_idx].data_type().clone();
625            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(
626                datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
627            );
628            Some(Arc::new(StructFieldAccessExpr::new(
629                col_expr,
630                field_idx,
631                output_type,
632            )))
633        } else {
634            Some(Arc::new(
635                datafusion::physical_expr::expressions::Literal::new(
636                    datafusion::common::ScalarValue::Null,
637                ),
638            ))
639        }
640    }
641
642    /// Compile property access on a struct column (e.g. `x.a` where `x` is Struct).
643    fn compile_property_access(
644        &self,
645        base: &Expr,
646        prop: &str,
647        input_schema: &Schema,
648    ) -> Result<Arc<dyn PhysicalExpr>> {
649        if let Expr::Variable(var_name) = base {
650            // 1. Try struct field access (e.g. `x.a` where `x` is a Struct column)
651            if let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema) {
652                return Ok(expr);
653            }
654            // 2. Try flat column "{var}.{prop}" (for pattern comprehension inner schemas)
655            let flat_col = format!("{}.{}", var_name, prop);
656            if let Ok(col_idx) = input_schema.index_of(&flat_col) {
657                return Ok(Arc::new(
658                    datafusion::physical_expr::expressions::Column::new(&flat_col, col_idx),
659                ));
660            }
661        }
662        self.compile_standard(
663            &Expr::Property(Box::new(base.clone()), prop.to_string()),
664            input_schema,
665        )
666    }
667
668    /// Compile bracket access on a struct column (e.g. `x['a']` where `x` is Struct).
669    fn compile_array_index(
670        &self,
671        array: &Expr,
672        index: &Expr,
673        input_schema: &Schema,
674    ) -> Result<Arc<dyn PhysicalExpr>> {
675        if let Expr::Variable(var_name) = array
676            && let Expr::Literal(CypherLiteral::String(prop)) = index
677            && let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema)
678        {
679            return Ok(expr);
680        }
681        self.compile_standard(
682            &Expr::ArrayIndex {
683                array: Box::new(array.clone()),
684                index: Box::new(index.clone()),
685            },
686            input_schema,
687        )
688    }
689
690    /// Compile EXISTS subquery expression.
691    fn compile_exists(&self, query: &Query) -> Result<Arc<dyn PhysicalExpr>> {
692        // 7.1: Validate no mutation clauses in EXISTS body
693        if has_mutation_clause(query) {
694            return Err(anyhow!(
695                "SyntaxError: InvalidClauseComposition - EXISTS subquery cannot contain updating clauses"
696            ));
697        }
698
699        let err = |dep: &str| anyhow!("EXISTS requires {}", dep);
700
701        let graph_ctx = self
702            .graph_ctx
703            .clone()
704            .ok_or_else(|| err("GraphExecutionContext"))?;
705        let uni_schema = self.uni_schema.clone().ok_or_else(|| err("UniSchema"))?;
706        let session_ctx = self
707            .session_ctx
708            .clone()
709            .ok_or_else(|| err("SessionContext"))?;
710        let storage = self.storage.clone().ok_or_else(|| err("StorageManager"))?;
711
712        Ok(Arc::new(ExistsExecExpr::new(
713            query.clone(),
714            graph_ctx,
715            session_ctx,
716            storage,
717            uni_schema,
718            self.params.clone(),
719            self.outer_entity_vars.clone(),
720        )))
721    }
722
723    /// Compile a pattern-predicate EXISTS into a vectorized `PatternExistsExecExpr`.
724    ///
725    /// Returns `Err` for unsupported patterns (triggers fallback to `ExistsExecExpr`).
726    fn compile_pattern_exists(
727        &self,
728        query: &Query,
729        input_schema: &Schema,
730    ) -> Result<Arc<dyn PhysicalExpr>> {
731        let pattern = extract_pattern_from_exists_query(query)?;
732
733        let graph_ctx = self
734            .graph_ctx
735            .as_ref()
736            .ok_or_else(|| anyhow!("Pattern exists requires GraphExecutionContext"))?;
737        let uni_schema = self
738            .uni_schema
739            .as_ref()
740            .ok_or_else(|| anyhow!("Pattern exists requires UniSchema"))?;
741
742        let (anchor_col, steps) = analyze_pattern(&pattern, input_schema, uni_schema)?;
743
744        // Detect bound and correlated targets.
745        // - Bound: target variable has `._vid` column in input schema → check specific VID.
746        // - Correlated: target has a named variable NOT in the schema (from an outer EXISTS
747        //   scope, passed as a parameter). We can't resolve it → bail out.
748        let mut bound_target_columns: Vec<Option<String>> = Vec::with_capacity(steps.len());
749        for step in &steps {
750            if let Some(var) = &step.target_variable {
751                let vid_col = format!("{}._vid", var);
752                if input_schema.column_with_name(&vid_col).is_some() {
753                    bound_target_columns.push(Some(vid_col));
754                } else if self.outer_entity_vars.contains(var) {
755                    // Named variable from outer scope — correlated, can't vectorize.
756                    anyhow::bail!(
757                        "Pattern target '{}' is a correlated reference from an outer scope",
758                        var
759                    );
760                } else {
761                    // Fresh binding introduced by the pattern — not correlated.
762                    bound_target_columns.push(None);
763                }
764            } else {
765                bound_target_columns.push(None);
766            }
767        }
768
769        let property_preds = extract_target_property_predicates(&pattern, &steps, uni_schema)?;
770
771        Ok(Arc::new(PatternExistsExecExpr::new(
772            graph_ctx.clone(),
773            anchor_col,
774            steps,
775            Arc::new(input_schema.clone()),
776            property_preds,
777            bound_target_columns,
778            self.params.clone(),
779        )))
780    }
781
782    /// Compile a `similar_to(sources, queries [, options])` call into a `SimilarToExecExpr`.
783    fn compile_similar_to(
784        &self,
785        args: &[Expr],
786        input_schema: &Schema,
787    ) -> Result<Arc<dyn PhysicalExpr>> {
788        if args.len() < 2 || args.len() > 3 {
789            return Err(SimilarToError::InvalidArity { count: args.len() }.into());
790        }
791
792        let graph_ctx = self
793            .graph_ctx
794            .clone()
795            .ok_or(SimilarToError::NoGraphContext)?;
796
797        // Extract variable name and property names from the source expression.
798        let source_variable = extract_source_variable(&args[0]);
799        let source_property_names = extract_property_names(&args[0]);
800
801        // Normalize sources and queries: if args[0] is a List, split into
802        // individual source/query pairs for multi-source scoring.
803        let (source_exprs, query_exprs) = normalize_similar_to_args(&args[0], &args[1]);
804
805        // Compile each source and query expression
806        let source_children: Vec<Arc<dyn PhysicalExpr>> = source_exprs
807            .iter()
808            .map(|e| self.compile(e, input_schema))
809            .collect::<Result<Vec<_>>>()?;
810        let query_children: Vec<Arc<dyn PhysicalExpr>> = query_exprs
811            .iter()
812            .map(|e| self.compile(e, input_schema))
813            .collect::<Result<Vec<_>>>()?;
814
815        // Compile options if present
816        let options_child = if args.len() == 3 {
817            Some(self.compile(&args[2], input_schema)?)
818        } else {
819            None
820        };
821
822        // Resolve per-source distance metrics from schema at compile time.
823        let source_metrics: Vec<Option<DistanceMetric>> = source_property_names
824            .iter()
825            .map(|prop_name| {
826                prop_name.as_ref().and_then(|prop| {
827                    self.uni_schema
828                        .as_ref()
829                        .and_then(|schema| resolve_metric_for_property(schema, prop))
830                })
831            })
832            .collect();
833
834        Ok(Arc::new(SimilarToExecExpr::new(
835            source_children,
836            query_children,
837            options_child,
838            graph_ctx,
839            source_variable,
840            source_property_names,
841            source_metrics,
842        )))
843    }
844
845    /// Check if map_expr or where_clause contains a pattern comprehension that references the variable.
846    fn needs_vid_extraction_for_variable(
847        variable: &str,
848        map_expr: &Expr,
849        where_clause: Option<&Expr>,
850    ) -> bool {
851        fn expr_has_pattern_comp_referencing(expr: &Expr, var: &str) -> bool {
852            match expr {
853                Expr::PatternComprehension { pattern, .. } => {
854                    // Check if pattern uses the variable
855                    pattern.paths.iter().any(|path| {
856                        path.elements.iter().any(|elem| match elem {
857                            uni_cypher::ast::PatternElement::Node(n) => {
858                                n.variable.as_deref() == Some(var)
859                            }
860                            uni_cypher::ast::PatternElement::Relationship(r) => {
861                                r.variable.as_deref() == Some(var)
862                            }
863                            _ => false,
864                        })
865                    })
866                }
867                Expr::FunctionCall { args, .. } => args
868                    .iter()
869                    .any(|a| expr_has_pattern_comp_referencing(a, var)),
870                Expr::BinaryOp { left, right, .. } => {
871                    expr_has_pattern_comp_referencing(left, var)
872                        || expr_has_pattern_comp_referencing(right, var)
873                }
874                Expr::UnaryOp { expr: e, .. } | Expr::Property(e, _) => {
875                    expr_has_pattern_comp_referencing(e, var)
876                }
877                Expr::List(items) => items
878                    .iter()
879                    .any(|i| expr_has_pattern_comp_referencing(i, var)),
880                Expr::ListComprehension {
881                    list,
882                    map_expr,
883                    where_clause,
884                    ..
885                } => {
886                    expr_has_pattern_comp_referencing(list, var)
887                        || expr_has_pattern_comp_referencing(map_expr, var)
888                        || where_clause
889                            .as_ref()
890                            .is_some_and(|w| expr_has_pattern_comp_referencing(w, var))
891                }
892                _ => false,
893            }
894        }
895
896        expr_has_pattern_comp_referencing(map_expr, variable)
897            || where_clause.is_some_and(|w| expr_has_pattern_comp_referencing(w, variable))
898    }
899
900    /// Check if an expression tree contains nodes that require custom compilation.
901    pub fn contains_custom_expr(expr: &Expr) -> bool {
902        match expr {
903            Expr::ListComprehension { .. } => true,
904            Expr::Quantifier { .. } => true,
905            Expr::Reduce { .. } => true,
906            Expr::PatternComprehension { .. } => true,
907            Expr::BinaryOp { left, right, .. } => {
908                Self::contains_custom_expr(left) || Self::contains_custom_expr(right)
909            }
910            Expr::UnaryOp { expr, .. } => Self::contains_custom_expr(expr),
911            Expr::FunctionCall { name, args, .. } => {
912                name.eq_ignore_ascii_case("similar_to")
913                    || args.iter().any(Self::contains_custom_expr)
914            }
915            Expr::Case {
916                when_then,
917                else_expr,
918                ..
919            } => {
920                when_then
921                    .iter()
922                    .any(|(w, t)| Self::contains_custom_expr(w) || Self::contains_custom_expr(t))
923                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr)
924            }
925            Expr::List(items) => items.iter().any(Self::contains_custom_expr),
926            Expr::Map(entries) => entries.iter().any(|(_, v)| Self::contains_custom_expr(v)),
927            Expr::IsNull(e) | Expr::IsNotNull(e) => Self::contains_custom_expr(e),
928            Expr::In { expr: l, list: r } => {
929                Self::contains_custom_expr(l) || Self::contains_custom_expr(r)
930            }
931            Expr::Exists { .. } => true,
932            Expr::LabelCheck { expr, .. } => Self::contains_custom_expr(expr),
933            _ => false,
934        }
935    }
936
937    /// Check if an expression statically produces a list value.
938    /// Used to route `Add` operations to `_cypher_list_concat` instead of arithmetic.
939    fn is_list_producing(expr: &Expr) -> bool {
940        match expr {
941            Expr::List(_) => true,
942            Expr::ListComprehension { .. } => true,
943            Expr::ArraySlice { .. } => true,
944            // Add with a list-producing child produces a list
945            Expr::BinaryOp {
946                left,
947                op: BinaryOp::Add,
948                right,
949            } => Self::is_list_producing(left) || Self::is_list_producing(right),
950            Expr::FunctionCall { name, .. } => {
951                // Functions known to return lists
952                matches!(
953                    name.as_str(),
954                    "range"
955                        | "tail"
956                        | "reverse"
957                        | "collect"
958                        | "keys"
959                        | "labels"
960                        | "nodes"
961                        | "relationships"
962                )
963            }
964            _ => false,
965        }
966    }
967
968    fn compile_standard(
969        &self,
970        expr: &Expr,
971        input_schema: &Schema,
972    ) -> Result<Arc<dyn PhysicalExpr>> {
973        // Pre-resolve Property(Variable(v), p) → Variable("v.p") when the flat
974        // column "v.p" exists in the input schema.  This prevents DataFusion's
975        // standard path from compiling it as ArrayIndex(column(v), "p"), which
976        // fails when the column stores a VID integer rather than a Map/Struct.
977        let resolved = Self::resolve_flat_column_properties(expr, input_schema);
978        let df_expr = cypher_expr_to_df(&resolved, self.translation_ctx)?;
979        let resolved_expr = self.resolve_udfs(df_expr)?;
980
981        let df_schema = datafusion::common::DFSchema::try_from(input_schema.clone())?;
982
983        // Apply type coercion to resolve type mismatches
984        let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;
985
986        // Re-resolve UDFs after coercion (coercion may introduce new dummy UDF calls)
987        let coerced_expr = self.resolve_udfs(coerced_expr)?;
988
989        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
990        planner
991            .create_physical_expr(&coerced_expr, &df_schema, self.state)
992            .map_err(|e| anyhow!("DataFusion planning failed: {}", e))
993    }
994
995    /// Recursively resolve `Property(Variable(v), p)` → `Variable("v.p")` when
996    /// the flat column `"v.p"` exists in `schema`.  This ensures that property
997    /// access on graph-scan variables is compiled as a direct column reference
998    /// rather than an ArrayIndex UDF call, which would fail when the variable
999    /// column holds a VID integer instead of a Map/Node struct.
1000    fn resolve_flat_column_properties(expr: &Expr, schema: &Schema) -> Expr {
1001        match expr {
1002            Expr::Property(base, prop) => {
1003                if let Expr::Variable(var) = base.as_ref() {
1004                    let flat_col = format!("{}.{}", var, prop);
1005                    if schema.index_of(&flat_col).is_ok() {
1006                        return Expr::Variable(flat_col);
1007                    }
1008                }
1009                // Recurse into the base expression
1010                Expr::Property(
1011                    Box::new(Self::resolve_flat_column_properties(base, schema)),
1012                    prop.clone(),
1013                )
1014            }
1015            Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
1016                left: Box::new(Self::resolve_flat_column_properties(left, schema)),
1017                op: *op,
1018                right: Box::new(Self::resolve_flat_column_properties(right, schema)),
1019            },
1020            Expr::FunctionCall {
1021                name,
1022                args,
1023                distinct,
1024                window_spec,
1025            } => Expr::FunctionCall {
1026                name: name.clone(),
1027                args: args
1028                    .iter()
1029                    .map(|a| Self::resolve_flat_column_properties(a, schema))
1030                    .collect(),
1031                distinct: *distinct,
1032                window_spec: window_spec.clone(),
1033            },
1034            Expr::UnaryOp { op, expr: inner } => Expr::UnaryOp {
1035                op: *op,
1036                expr: Box::new(Self::resolve_flat_column_properties(inner, schema)),
1037            },
1038            Expr::List(items) => Expr::List(
1039                items
1040                    .iter()
1041                    .map(|i| Self::resolve_flat_column_properties(i, schema))
1042                    .collect(),
1043            ),
1044            // For all other expression types, return as-is (literals, variables, etc.)
1045            other => other.clone(),
1046        }
1047    }
1048
1049    /// Resolve UDFs in DataFusion expression using the session state registry.
1050    ///
1051    /// Uses `TreeNode::transform_up` to traverse the entire expression tree,
1052    /// ensuring UDFs inside Cast, Case, InList, Between, etc. are all resolved.
1053    fn resolve_udfs(
1054        &self,
1055        expr: datafusion::logical_expr::Expr,
1056    ) -> Result<datafusion::logical_expr::Expr> {
1057        use datafusion::common::tree_node::{Transformed, TreeNode};
1058        use datafusion::logical_expr::Expr as DfExpr;
1059
1060        let result = expr
1061            .transform_up(|node| {
1062                if let DfExpr::ScalarFunction(ref func) = node {
1063                    let udf_name = func.func.name();
1064                    if let Some(registered_udf) = self.state.scalar_functions().get(udf_name) {
1065                        return Ok(Transformed::yes(DfExpr::ScalarFunction(
1066                            datafusion::logical_expr::expr::ScalarFunction {
1067                                func: registered_udf.clone(),
1068                                args: func.args.clone(),
1069                            },
1070                        )));
1071                    }
1072                }
1073                Ok(Transformed::no(node))
1074            })
1075            .map_err(|e| anyhow!("Failed to resolve UDFs: {}", e))?;
1076        Ok(result.data)
1077    }
1078
1079    fn compile_list_comprehension(
1080        &self,
1081        variable: &str,
1082        list: &Expr,
1083        where_clause: Option<&Expr>,
1084        map_expr: &Expr,
1085        input_schema: &Schema,
1086    ) -> Result<Arc<dyn PhysicalExpr>> {
1087        let input_list_phy = self.compile(list, input_schema)?;
1088
1089        // Resolve input list type
1090        let list_data_type = input_list_phy.data_type(input_schema)?;
1091        let inner_data_type = resolve_list_element_type(
1092            &list_data_type,
1093            DataType::LargeBinary,
1094            "List comprehension",
1095        )?;
1096
1097        // Create inner schema with loop variable (shadow outer variable if same name)
1098        let mut fields = input_schema.fields().to_vec();
1099        let loop_var_field = Arc::new(Field::new(variable, inner_data_type.clone(), true));
1100
1101        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1102            fields[pos] = loop_var_field;
1103        } else {
1104            fields.push(loop_var_field);
1105        }
1106
1107        // Check if we need VID extraction for nested pattern comprehensions
1108        let needs_vid_extraction =
1109            Self::needs_vid_extraction_for_variable(variable, map_expr, where_clause);
1110        if needs_vid_extraction && inner_data_type == DataType::LargeBinary {
1111            // Add a {variable}._vid field for VID extraction
1112            let vid_field = Arc::new(Field::new(
1113                format!("{}._vid", variable),
1114                DataType::UInt64,
1115                true,
1116            ));
1117            fields.push(vid_field);
1118        }
1119
1120        let inner_schema = Arc::new(Schema::new(fields));
1121
1122        // Compile inner expressions with scoped translation context
1123        let mut scoped_ctx = None;
1124        let inner_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
1125
1126        let predicate_phy = if let Some(pred) = where_clause {
1127            Some(inner_compiler.compile(pred, &inner_schema)?)
1128        } else {
1129            None
1130        };
1131
1132        let map_phy = inner_compiler.compile(map_expr, &inner_schema)?;
1133        let output_item_type = map_phy.data_type(&inner_schema)?;
1134
1135        Ok(Arc::new(ListComprehensionExecExpr::new(
1136            input_list_phy,
1137            map_phy,
1138            predicate_phy,
1139            variable.to_string(),
1140            Arc::new(input_schema.clone()),
1141            output_item_type,
1142            needs_vid_extraction,
1143        )))
1144    }
1145
1146    fn compile_reduce(
1147        &self,
1148        accumulator: &str,
1149        initial: &Expr,
1150        variable: &str,
1151        list: &Expr,
1152        reduce_expr: &Expr,
1153        input_schema: &Schema,
1154    ) -> Result<Arc<dyn PhysicalExpr>> {
1155        let list_phy = self.compile(list, input_schema)?;
1156
1157        let initial_phy = self.compile(initial, input_schema)?;
1158        let acc_type = initial_phy.data_type(input_schema)?;
1159
1160        let list_data_type = list_phy.data_type(input_schema)?;
1161        // For LargeBinary (CypherValue arrays), use the accumulator type as element type so the
1162        // reduce body expression compiles correctly (e.g. acc + x where both are Int64).
1163        let inner_data_type =
1164            resolve_list_element_type(&list_data_type, acc_type.clone(), "Reduce")?;
1165
1166        // Create inner schema with accumulator and loop variable (shadow outer variables if same names)
1167        let mut fields = input_schema.fields().to_vec();
1168
1169        let acc_field = Arc::new(Field::new(accumulator, acc_type, true));
1170        if let Some(pos) = fields.iter().position(|f| f.name() == accumulator) {
1171            fields[pos] = acc_field;
1172        } else {
1173            fields.push(acc_field);
1174        }
1175
1176        let var_field = Arc::new(Field::new(variable, inner_data_type, true));
1177        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1178            fields[pos] = var_field;
1179        } else {
1180            fields.push(var_field);
1181        }
1182
1183        let inner_schema = Arc::new(Schema::new(fields));
1184
1185        // Compile reduce expression with scoped translation context
1186        let mut scoped_ctx = None;
1187        let reduce_compiler = self.scoped_compiler(&[accumulator, variable], &mut scoped_ctx);
1188
1189        let reduce_phy = reduce_compiler.compile(reduce_expr, &inner_schema)?;
1190        let output_type = reduce_phy.data_type(&inner_schema)?;
1191
1192        Ok(Arc::new(ReduceExecExpr::new(
1193            accumulator.to_string(),
1194            initial_phy,
1195            variable.to_string(),
1196            list_phy,
1197            reduce_phy,
1198            Arc::new(input_schema.clone()),
1199            output_type,
1200        )))
1201    }
1202
1203    fn compile_quantifier(
1204        &self,
1205        quantifier: &uni_cypher::ast::Quantifier,
1206        variable: &str,
1207        list: &Expr,
1208        predicate: &Expr,
1209        input_schema: &Schema,
1210    ) -> Result<Arc<dyn PhysicalExpr>> {
1211        let input_list_phy = self.compile(list, input_schema)?;
1212
1213        // Resolve element type from list type
1214        let list_data_type = input_list_phy.data_type(input_schema)?;
1215        let inner_data_type =
1216            resolve_list_element_type(&list_data_type, DataType::LargeBinary, "Quantifier")?;
1217
1218        // Create inner schema with loop variable
1219        // If a field with the same name exists in the outer schema, replace it (shadow it)
1220        // to ensure the loop variable takes precedence.
1221        let mut fields = input_schema.fields().to_vec();
1222        let loop_var_field = Arc::new(Field::new(variable, inner_data_type, true));
1223
1224        // Find and replace existing field with same name, or append if not found
1225        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1226            fields[pos] = loop_var_field;
1227        } else {
1228            fields.push(loop_var_field);
1229        }
1230
1231        let inner_schema = Arc::new(Schema::new(fields));
1232
1233        // Compile predicate with a scoped translation context that removes the loop variable
1234        // from variable_kinds, so property access on the loop variable doesn't incorrectly
1235        // generate flat columns (like "x.name") that don't exist in the inner schema.
1236        let mut scoped_ctx = None;
1237        let pred_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
1238
1239        let mut predicate_phy = pred_compiler.compile(predicate, &inner_schema)?;
1240
1241        // Wrap CypherValue predicates with _cv_to_bool for proper boolean evaluation
1242        if let Ok(DataType::LargeBinary) = predicate_phy.data_type(&inner_schema) {
1243            predicate_phy = self.wrap_with_cv_to_bool(predicate_phy)?;
1244        }
1245
1246        let qt = match quantifier {
1247            uni_cypher::ast::Quantifier::All => QuantifierType::All,
1248            uni_cypher::ast::Quantifier::Any => QuantifierType::Any,
1249            uni_cypher::ast::Quantifier::Single => QuantifierType::Single,
1250            uni_cypher::ast::Quantifier::None => QuantifierType::None,
1251        };
1252
1253        Ok(Arc::new(QuantifierExecExpr::new(
1254            input_list_phy,
1255            predicate_phy,
1256            variable.to_string(),
1257            Arc::new(input_schema.clone()),
1258            qt,
1259        )))
1260    }
1261
1262    fn compile_pattern_comprehension(
1263        &self,
1264        path_variable: &Option<String>,
1265        pattern: &uni_cypher::ast::Pattern,
1266        where_clause: Option<&Expr>,
1267        map_expr: &Expr,
1268        input_schema: &Schema,
1269    ) -> Result<Arc<dyn PhysicalExpr>> {
1270        let err = |dep: &str| anyhow!("Pattern comprehension requires {}", dep);
1271
1272        let graph_ctx = self
1273            .graph_ctx
1274            .as_ref()
1275            .ok_or_else(|| err("GraphExecutionContext"))?;
1276        let uni_schema = self.uni_schema.as_ref().ok_or_else(|| err("UniSchema"))?;
1277
1278        // 1. Analyze pattern to get anchor column and traversal steps
1279        let (anchor_col, steps) = analyze_pattern(pattern, input_schema, uni_schema)?;
1280
1281        // 2. Collect needed properties from where_clause and map_expr
1282        let (vertex_props, edge_props) = collect_inner_properties(where_clause, map_expr, &steps);
1283
1284        // 3. Build inner schema
1285        let inner_schema = build_inner_schema(
1286            input_schema,
1287            &steps,
1288            &vertex_props,
1289            &edge_props,
1290            path_variable.as_deref(),
1291        );
1292
1293        // 4. Compile predicate and map_expr against inner schema
1294        let pred_phy = where_clause
1295            .map(|p| self.compile(p, &inner_schema))
1296            .transpose()?;
1297        let map_phy = self.compile(map_expr, &inner_schema)?;
1298        let output_type = map_phy.data_type(&inner_schema)?;
1299
1300        // 5. Return expression
1301        Ok(Arc::new(PatternComprehensionExecExpr::new(
1302            graph_ctx.clone(),
1303            anchor_col,
1304            steps,
1305            path_variable.clone(),
1306            pred_phy,
1307            map_phy,
1308            Arc::new(input_schema.clone()),
1309            Arc::new(inner_schema),
1310            output_type,
1311            vertex_props,
1312            edge_props,
1313        )))
1314    }
1315
1316    /// Compile a function call whose arguments contain custom expressions.
1317    ///
1318    /// Recursively compiles each argument via `self.compile()`, then looks up
1319    /// the corresponding UDF in the session registry and builds the physical
1320    /// expression.
1321    fn compile_function_with_custom_args(
1322        &self,
1323        name: &str,
1324        args: &[Expr],
1325        _distinct: bool,
1326        input_schema: &Schema,
1327    ) -> Result<Arc<dyn PhysicalExpr>> {
1328        // 1. Recursively compile each argument
1329        let compiled_args: Vec<Arc<dyn PhysicalExpr>> = args
1330            .iter()
1331            .map(|arg| self.compile(arg, input_schema))
1332            .collect::<Result<Vec<_>>>()?;
1333
1334        // 2. Resolve UDF name and look it up in the registry
1335        let udf_name = Self::cypher_fn_to_udf(name);
1336        let udf = self
1337            .state
1338            .scalar_functions()
1339            .get(udf_name.as_str())
1340            .ok_or_else(|| {
1341                anyhow!(
1342                    "UDF '{}' not found in registry for function '{}'",
1343                    udf_name,
1344                    name
1345                )
1346            })?;
1347
1348        // 3. Build operand type list and dummy columns from compiled args
1349        let placeholders: &[&str] = &["__arg0__", "__arg1__", "__arg2__", "__argN__"];
1350        let operand_types: Vec<(&str, DataType)> = compiled_args
1351            .iter()
1352            .enumerate()
1353            .map(|(i, arg)| {
1354                let dt = arg.data_type(input_schema).unwrap_or(DataType::LargeBinary);
1355                let placeholder = placeholders[i.min(3)];
1356                (placeholder, dt)
1357            })
1358            .collect();
1359
1360        let dummy_cols: Vec<datafusion::logical_expr::Expr> = operand_types
1361            .iter()
1362            .map(|(name, _)| {
1363                datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1364                    None::<String>,
1365                    *name,
1366                ))
1367            })
1368            .collect();
1369
1370        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1371            datafusion::logical_expr::expr::ScalarFunction {
1372                func: udf.clone(),
1373                args: dummy_cols,
1374            },
1375        );
1376
1377        // 4. Plan and rebind
1378        self.plan_udf_physical_expr(
1379            &udf_expr,
1380            &operand_types,
1381            compiled_args,
1382            &format!("function {}", name),
1383        )
1384    }
1385
1386    /// Map a Cypher function name to the registered UDF name.
1387    ///
1388    /// Mirrors the mapping in `translate_function_call` from `df_expr.rs`.
1389    /// The registered UDF names are always lowercase.
1390    fn cypher_fn_to_udf(name: &str) -> String {
1391        match name.to_uppercase().as_str() {
1392            "SIZE" | "LENGTH" => "_cypher_size".to_string(),
1393            "REVERSE" => "_cypher_reverse".to_string(),
1394            "TOSTRING" => "tostring".to_string(),
1395            "TOBOOLEAN" | "TOBOOL" | "TOBOOLEANORNULL" => "toboolean".to_string(),
1396            "TOINTEGER" | "TOINT" | "TOINTEGERORNULL" => "tointeger".to_string(),
1397            "TOFLOAT" | "TOFLOATORNULL" => "tofloat".to_string(),
1398            "HEAD" => "head".to_string(),
1399            "LAST" => "last".to_string(),
1400            "TAIL" => "tail".to_string(),
1401            "KEYS" => "keys".to_string(),
1402            "TYPE" => "type".to_string(),
1403            "PROPERTIES" => "properties".to_string(),
1404            "LABELS" => "labels".to_string(),
1405            "COALESCE" => "coalesce".to_string(),
1406            "ID" => "id".to_string(),
1407            // Fallback: lowercase the name (matches dummy_udf_expr behavior)
1408            _ => name.to_lowercase(),
1409        }
1410    }
1411
1412    /// Compile a CASE expression with custom sub-expressions in branches.
1413    ///
1414    /// Recursively compiles the operand, each when/then pair, and the else
1415    /// branch, then builds a `CaseExpr` physical expression.
1416    ///
1417    /// Applies two fixes for TCK compliance:
1418    /// 1. Wraps CypherValue WHEN conditions with `_cv_to_bool` for proper boolean evaluation
1419    /// 2. Unifies branch types when mixing LargeList and LargeBinary to avoid cast errors
1420    fn compile_case(
1421        &self,
1422        operand: &Option<Box<Expr>>,
1423        when_then: &[(Expr, Expr)],
1424        else_expr: &Option<Box<Expr>>,
1425        input_schema: &Schema,
1426    ) -> Result<Arc<dyn PhysicalExpr>> {
1427        let operand_phy = operand
1428            .as_deref()
1429            .map(|e| self.compile(e, input_schema))
1430            .transpose()?;
1431
1432        let mut when_then_phy: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> = when_then
1433            .iter()
1434            .map(|(w, t)| {
1435                let w_phy = self.compile(w, input_schema)?;
1436                let t_phy = self.compile(t, input_schema)?;
1437                Ok((w_phy, t_phy))
1438            })
1439            .collect::<Result<Vec<_>>>()?;
1440
1441        let mut else_phy = else_expr
1442            .as_deref()
1443            .map(|e| self.compile(e, input_schema))
1444            .transpose()?;
1445
1446        // Wrap CypherValue WHEN conditions with _cv_to_bool for proper boolean evaluation
1447        for (w_phy, _) in &mut when_then_phy {
1448            if matches!(w_phy.data_type(input_schema), Ok(DataType::LargeBinary)) {
1449                *w_phy = self.wrap_with_cv_to_bool(w_phy.clone())?;
1450            }
1451        }
1452
1453        // Unify branch types when mixing LargeList and LargeBinary
1454        let branch_types: Vec<DataType> = when_then_phy
1455            .iter()
1456            .map(|(_, t)| t.data_type(input_schema))
1457            .chain(else_phy.iter().map(|e| e.data_type(input_schema)))
1458            .filter_map(Result::ok)
1459            .collect();
1460
1461        let has_large_binary = branch_types.contains(&DataType::LargeBinary);
1462        let has_list = branch_types
1463            .iter()
1464            .any(|dt| matches!(dt, DataType::List(_) | DataType::LargeList(_)));
1465
1466        // If we have both, wrap List/LargeList branches with LargeListToCypherValueExpr
1467        if has_large_binary && has_list {
1468            for (_, t_phy) in &mut when_then_phy {
1469                if let Ok(dt) = t_phy.data_type(input_schema)
1470                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
1471                {
1472                    *t_phy = Arc::new(LargeListToCypherValueExpr::new(t_phy.clone()));
1473                }
1474            }
1475            if let Some(e_phy) = else_phy.take() {
1476                if let Ok(dt) = e_phy.data_type(input_schema)
1477                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
1478                {
1479                    else_phy = Some(Arc::new(LargeListToCypherValueExpr::new(e_phy)));
1480                } else {
1481                    else_phy = Some(e_phy);
1482                }
1483            }
1484        }
1485
1486        let case_expr = datafusion::physical_expr::expressions::CaseExpr::try_new(
1487            operand_phy,
1488            when_then_phy,
1489            else_phy,
1490        )
1491        .map_err(|e| anyhow!("Failed to create CASE expression: {}", e))?;
1492
1493        Ok(Arc::new(case_expr))
1494    }
1495
1496    fn compile_binary_op(
1497        &self,
1498        op: &BinaryOp,
1499        left: Arc<dyn PhysicalExpr>,
1500        right: Arc<dyn PhysicalExpr>,
1501        input_schema: &Schema,
1502    ) -> Result<Arc<dyn PhysicalExpr>> {
1503        use datafusion::logical_expr::Operator;
1504
1505        // String operators use custom physical expr for safe type handling
1506        let string_op = match op {
1507            BinaryOp::StartsWith => Some(StringOp::StartsWith),
1508            BinaryOp::EndsWith => Some(StringOp::EndsWith),
1509            BinaryOp::Contains => Some(StringOp::Contains),
1510            _ => None,
1511        };
1512        if let Some(sop) = string_op {
1513            return Ok(Arc::new(CypherStringMatchExpr::new(left, right, sop)));
1514        }
1515
1516        let df_op = match op {
1517            BinaryOp::Add => Operator::Plus,
1518            BinaryOp::Sub => Operator::Minus,
1519            BinaryOp::Mul => Operator::Multiply,
1520            BinaryOp::Div => Operator::Divide,
1521            BinaryOp::Mod => Operator::Modulo,
1522            BinaryOp::Eq => Operator::Eq,
1523            BinaryOp::NotEq => Operator::NotEq,
1524            BinaryOp::Gt => Operator::Gt,
1525            BinaryOp::GtEq => Operator::GtEq,
1526            BinaryOp::Lt => Operator::Lt,
1527            BinaryOp::LtEq => Operator::LtEq,
1528            BinaryOp::And => Operator::And,
1529            BinaryOp::Or => Operator::Or,
1530            BinaryOp::Xor => {
1531                return Err(anyhow!(
1532                    "XOR not supported via binary helper, use bitwise_xor"
1533                ));
1534            }
1535            BinaryOp::Regex => Operator::RegexMatch,
1536            BinaryOp::ApproxEq => {
1537                return Err(anyhow!(
1538                    "ApproxEq (~=) not yet supported in physical compiler"
1539                ));
1540            }
1541            BinaryOp::Pow => return Err(anyhow!("POW not yet supported in physical compiler")),
1542            _ => return Err(anyhow!("Unsupported binary op in compiler: {:?}", op)),
1543        };
1544
1545        // When either operand is LargeBinary (CypherValue), standard Arrow comparison
1546        // kernels can't handle the type mismatch. Route through Cypher comparison
1547        // UDFs which decode CypherValue to Value for comparison.
1548        let mut left = left;
1549        let mut right = right;
1550        let mut left_type = left.data_type(input_schema).ok();
1551        let mut right_type = right.data_type(input_schema).ok();
1552
1553        // Type unification: if one side is LargeList and the other is LargeBinary,
1554        // convert LargeList to LargeBinary for consistent handling
1555        let left_is_list = matches!(
1556            left_type.as_ref(),
1557            Some(DataType::List(_) | DataType::LargeList(_))
1558        );
1559        let right_is_list = matches!(
1560            right_type.as_ref(),
1561            Some(DataType::List(_) | DataType::LargeList(_))
1562        );
1563
1564        if left_is_list && is_cypher_value_type(right_type.as_ref()) {
1565            left = Arc::new(LargeListToCypherValueExpr::new(left));
1566            left_type = Some(DataType::LargeBinary);
1567        } else if right_is_list && is_cypher_value_type(left_type.as_ref()) {
1568            right = Arc::new(LargeListToCypherValueExpr::new(right));
1569            right_type = Some(DataType::LargeBinary);
1570        }
1571
1572        let has_cv =
1573            is_cypher_value_type(left_type.as_ref()) || is_cypher_value_type(right_type.as_ref());
1574
1575        if has_cv {
1576            if let Some(result) = self.compile_cv_comparison(
1577                df_op,
1578                left.clone(),
1579                right.clone(),
1580                &left_type,
1581                &right_type,
1582            )? {
1583                return Ok(result);
1584            }
1585            if let Some(result) = self.compile_cv_list_concat(
1586                left.clone(),
1587                right.clone(),
1588                &left_type,
1589                &right_type,
1590                df_op,
1591            )? {
1592                return Ok(result);
1593            }
1594            if let Some(result) = self.compile_cv_arithmetic(
1595                df_op,
1596                left.clone(),
1597                right.clone(),
1598                &left_type,
1599                &right_type,
1600                input_schema,
1601            )? {
1602                return Ok(result);
1603            }
1604        }
1605
1606        // Use DataFusion's binary physical expression creator which handles coercion
1607        binary(left, df_op, right, input_schema)
1608            .map_err(|e| anyhow!("Failed to create binary expression: {}", e))
1609    }
1610
1611    /// Compile CypherValue comparison using Cypher UDFs.
1612    fn compile_cv_comparison(
1613        &self,
1614        df_op: datafusion::logical_expr::Operator,
1615        left: Arc<dyn PhysicalExpr>,
1616        right: Arc<dyn PhysicalExpr>,
1617        left_type: &Option<DataType>,
1618        right_type: &Option<DataType>,
1619    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1620        use datafusion::logical_expr::Operator;
1621
1622        let udf_name = match df_op {
1623            Operator::Eq => "_cypher_equal",
1624            Operator::NotEq => "_cypher_not_equal",
1625            Operator::Gt => "_cypher_gt",
1626            Operator::GtEq => "_cypher_gt_eq",
1627            Operator::Lt => "_cypher_lt",
1628            Operator::LtEq => "_cypher_lt_eq",
1629            _ => return Ok(None),
1630        };
1631
1632        self.plan_binary_udf(
1633            udf_name,
1634            left,
1635            right,
1636            left_type.clone().unwrap_or(DataType::LargeBinary),
1637            right_type.clone().unwrap_or(DataType::LargeBinary),
1638        )
1639    }
1640
1641    /// Compile CypherValue list concatenation.
1642    fn compile_cv_list_concat(
1643        &self,
1644        left: Arc<dyn PhysicalExpr>,
1645        right: Arc<dyn PhysicalExpr>,
1646        left_type: &Option<DataType>,
1647        right_type: &Option<DataType>,
1648        df_op: datafusion::logical_expr::Operator,
1649    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1650        use datafusion::logical_expr::Operator;
1651
1652        if df_op != Operator::Plus {
1653            return Ok(None);
1654        }
1655
1656        // List concat when at least one side is a list (CypherValue or Arrow List)
1657        let is_list = |t: &Option<DataType>| {
1658            t.as_ref()
1659                .is_some_and(|dt| matches!(dt, DataType::LargeBinary | DataType::List(_)))
1660        };
1661
1662        if !is_list(left_type) && !is_list(right_type) {
1663            return Ok(None);
1664        }
1665
1666        self.plan_binary_udf(
1667            "_cypher_list_concat",
1668            left,
1669            right,
1670            left_type.clone().unwrap_or(DataType::LargeBinary),
1671            right_type.clone().unwrap_or(DataType::LargeBinary),
1672        )
1673    }
1674
1675    /// Compile CypherValue arithmetic.
1676    ///
1677    /// Routes arithmetic operations through CypherValue-aware UDFs when at least
1678    /// one operand is LargeBinary (CypherValue-encoded).
1679    fn compile_cv_arithmetic(
1680        &self,
1681        df_op: datafusion::logical_expr::Operator,
1682        left: Arc<dyn PhysicalExpr>,
1683        right: Arc<dyn PhysicalExpr>,
1684        left_type: &Option<DataType>,
1685        right_type: &Option<DataType>,
1686        _input_schema: &Schema,
1687    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1688        use datafusion::logical_expr::Operator;
1689
1690        let udf_name = match df_op {
1691            Operator::Plus => "_cypher_add",
1692            Operator::Minus => "_cypher_sub",
1693            Operator::Multiply => "_cypher_mul",
1694            Operator::Divide => "_cypher_div",
1695            Operator::Modulo => "_cypher_mod",
1696            _ => return Ok(None),
1697        };
1698
1699        self.plan_binary_udf(
1700            udf_name,
1701            left,
1702            right,
1703            left_type.clone().unwrap_or(DataType::LargeBinary),
1704            right_type.clone().unwrap_or(DataType::LargeBinary),
1705        )
1706    }
1707
1708    /// Plan a UDF expression with dummy schema columns, then rebind to actual physical expressions.
1709    fn plan_udf_physical_expr(
1710        &self,
1711        udf_expr: &datafusion::logical_expr::Expr,
1712        operand_types: &[(&str, DataType)],
1713        children: Vec<Arc<dyn PhysicalExpr>>,
1714        error_context: &str,
1715    ) -> Result<Arc<dyn PhysicalExpr>> {
1716        let tmp_schema = Schema::new(
1717            operand_types
1718                .iter()
1719                .map(|(name, dt)| Arc::new(Field::new(*name, dt.clone(), true)))
1720                .collect::<Vec<_>>(),
1721        );
1722        let df_schema = datafusion::common::DFSchema::try_from(tmp_schema)?;
1723        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
1724        let udf_phy = planner
1725            .create_physical_expr(udf_expr, &df_schema, self.state)
1726            .map_err(|e| anyhow!("Failed to create {} expr: {}", error_context, e))?;
1727        udf_phy
1728            .with_new_children(children)
1729            .map_err(|e| anyhow!("Failed to rebind {} children: {}", error_context, e))
1730    }
1731
1732    /// Wrap a LargeBinary (CypherValue) expression with `_cv_to_bool` conversion.
1733    ///
1734    /// Used when a CypherValue expression needs to be used as a boolean (e.g., in WHEN clauses).
1735    fn wrap_with_cv_to_bool(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
1736        let Some(udf) = self.state.scalar_functions().get("_cv_to_bool") else {
1737            return Err(anyhow!("_cv_to_bool UDF not found"));
1738        };
1739
1740        let dummy_col = datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1741            None::<String>,
1742            "__cv__",
1743        ));
1744        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1745            datafusion::logical_expr::expr::ScalarFunction {
1746                func: udf.clone(),
1747                args: vec![dummy_col],
1748            },
1749        );
1750
1751        self.plan_udf_physical_expr(
1752            &udf_expr,
1753            &[("__cv__", DataType::LargeBinary)],
1754            vec![expr],
1755            "CypherValue to bool",
1756        )
1757    }
1758
1759    /// Plan a binary UDF with the given name and operand types.
1760    fn plan_binary_udf(
1761        &self,
1762        udf_name: &str,
1763        left: Arc<dyn PhysicalExpr>,
1764        right: Arc<dyn PhysicalExpr>,
1765        left_type: DataType,
1766        right_type: DataType,
1767    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1768        let Some(udf) = self.state.scalar_functions().get(udf_name) else {
1769            return Ok(None);
1770        };
1771        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1772            datafusion::logical_expr::expr::ScalarFunction {
1773                func: udf.clone(),
1774                args: vec![
1775                    datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1776                        None::<String>,
1777                        "__left__",
1778                    )),
1779                    datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1780                        None::<String>,
1781                        "__right__",
1782                    )),
1783                ],
1784            },
1785        );
1786        let result = self.plan_udf_physical_expr(
1787            &udf_expr,
1788            &[("__left__", left_type), ("__right__", right_type)],
1789            vec![left, right],
1790            udf_name,
1791        )?;
1792        Ok(Some(result))
1793    }
1794
1795    fn compile_unary_op(
1796        &self,
1797        op: &UnaryOp,
1798        expr: Arc<dyn PhysicalExpr>,
1799        input_schema: &Schema,
1800    ) -> Result<Arc<dyn PhysicalExpr>> {
1801        match op {
1802            UnaryOp::Not => datafusion::physical_expr::expressions::not(expr),
1803            UnaryOp::Neg => datafusion::physical_expr::expressions::negative(expr, input_schema),
1804        }
1805        .map_err(|e| anyhow!("Failed to create unary expression: {}", e))
1806    }
1807}
1808
1809// ---------------------------------------------------------------------------
1810// similar_to() AST helpers
1811// ---------------------------------------------------------------------------
1812
1813/// Extract the base variable name from a source expression.
1814///
1815/// For `d.embedding` → `"d"`, for `[d.embedding, d.content]` → `"d"`.
1816fn extract_source_variable(expr: &Expr) -> Option<String> {
1817    match expr {
1818        Expr::Property(inner, _) => extract_source_variable(inner),
1819        Expr::Variable(name) => Some(name.clone()),
1820        Expr::List(items) => items.first().and_then(extract_source_variable),
1821        _ => None,
1822    }
1823}
1824
1825/// Extract property names from source expressions.
1826///
1827/// For `d.embedding` → `["embedding"]`, for `[d.embedding, d.content]` → `["embedding", "content"]`.
1828fn extract_property_names(expr: &Expr) -> Vec<Option<String>> {
1829    match expr {
1830        Expr::Property(_, prop) => vec![Some(prop.clone())],
1831        Expr::List(items) => items
1832            .iter()
1833            .map(|item| {
1834                if let Expr::Property(_, prop) = item {
1835                    Some(prop.clone())
1836                } else {
1837                    None
1838                }
1839            })
1840            .collect(),
1841        _ => vec![None],
1842    }
1843}
1844
1845/// Normalize similar_to arguments for multi-source scoring.
1846///
1847/// If source arg is a `List`, returns the individual items as separate source/query pairs.
1848/// Otherwise returns the args as-is (single pair).
1849fn normalize_similar_to_args<'e>(
1850    sources: &'e Expr,
1851    queries: &'e Expr,
1852) -> (Vec<&'e Expr>, Vec<&'e Expr>) {
1853    match (sources, queries) {
1854        // Multi-source: [d.embedding, d.content] paired with [query_vec, query_text]
1855        (Expr::List(src_items), Expr::List(qry_items)) if src_items.len() == qry_items.len() => {
1856            (src_items.iter().collect(), qry_items.iter().collect())
1857        }
1858        // Multi-source with broadcast query: [d.embedding, d.content] paired with single query
1859        (Expr::List(src_items), _) if src_items.len() > 1 => {
1860            let queries_broadcast: Vec<&Expr> = vec![queries; src_items.len()];
1861            (src_items.iter().collect(), queries_broadcast)
1862        }
1863        // Single source
1864        _ => (vec![sources], vec![queries]),
1865    }
1866}
1867
1868use datafusion::physical_plan::DisplayAs;
1869use datafusion::physical_plan::DisplayFormatType;
1870
/// The Cypher string predicates handled by `CypherStringMatchExpr`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum StringOp {
    StartsWith,
    EndsWith,
    Contains,
}

impl std::fmt::Display for StringOp {
    /// Writes the Cypher keyword spelling of the operator.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let keyword = match self {
            StringOp::StartsWith => "STARTS WITH",
            StringOp::EndsWith => "ENDS WITH",
            StringOp::Contains => "CONTAINS",
        };
        f.write_str(keyword)
    }
}
1887
1888#[derive(Debug, Eq)]
1889struct CypherStringMatchExpr {
1890    left: Arc<dyn PhysicalExpr>,
1891    right: Arc<dyn PhysicalExpr>,
1892    op: StringOp,
1893}
1894
1895impl PartialEq for CypherStringMatchExpr {
1896    fn eq(&self, other: &Self) -> bool {
1897        self.op == other.op && self.left.eq(&other.left) && self.right.eq(&other.right)
1898    }
1899}
1900
1901impl std::hash::Hash for CypherStringMatchExpr {
1902    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1903        self.op.hash(state);
1904        self.left.hash(state);
1905        self.right.hash(state);
1906    }
1907}
1908
1909impl CypherStringMatchExpr {
1910    fn new(left: Arc<dyn PhysicalExpr>, right: Arc<dyn PhysicalExpr>, op: StringOp) -> Self {
1911        Self { left, right, op }
1912    }
1913}
1914
1915impl std::fmt::Display for CypherStringMatchExpr {
1916    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1917        write!(f, "{} {} {}", self.left, self.op, self.right)
1918    }
1919}
1920
1921impl DisplayAs for CypherStringMatchExpr {
1922    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1923        write!(f, "{}", self)
1924    }
1925}
1926
1927impl PhysicalExpr for CypherStringMatchExpr {
1928    fn as_any(&self) -> &dyn std::any::Any {
1929        self
1930    }
1931
1932    fn data_type(
1933        &self,
1934        _input_schema: &Schema,
1935    ) -> datafusion::error::Result<arrow_schema::DataType> {
1936        Ok(arrow_schema::DataType::Boolean)
1937    }
1938
1939    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
1940        Ok(true)
1941    }
1942
1943    fn evaluate(
1944        &self,
1945        batch: &arrow_array::RecordBatch,
1946    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
1947        use crate::query::df_udfs::invoke_cypher_string_op;
1948        use arrow_schema::Field;
1949        use datafusion::config::ConfigOptions;
1950        use datafusion::logical_expr::ScalarFunctionArgs;
1951
1952        let left_val = self.left.evaluate(batch)?;
1953        let right_val = self.right.evaluate(batch)?;
1954
1955        let args = ScalarFunctionArgs {
1956            args: vec![left_val, right_val],
1957            number_rows: batch.num_rows(),
1958            return_field: Arc::new(Field::new("result", arrow_schema::DataType::Boolean, true)),
1959            config_options: Arc::new(ConfigOptions::default()),
1960            arg_fields: vec![], // Not used by invoke_cypher_string_op
1961        };
1962
1963        match self.op {
1964            StringOp::StartsWith => {
1965                invoke_cypher_string_op(&args, "starts_with", |s, p| s.starts_with(p))
1966            }
1967            StringOp::EndsWith => {
1968                invoke_cypher_string_op(&args, "ends_with", |s, p| s.ends_with(p))
1969            }
1970            StringOp::Contains => invoke_cypher_string_op(&args, "contains", |s, p| s.contains(p)),
1971        }
1972    }
1973
1974    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
1975        vec![&self.left, &self.right]
1976    }
1977
1978    fn with_new_children(
1979        self: Arc<Self>,
1980        children: Vec<Arc<dyn PhysicalExpr>>,
1981    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
1982        Ok(Arc::new(CypherStringMatchExpr::new(
1983            children[0].clone(),
1984            children[1].clone(),
1985            self.op,
1986        )))
1987    }
1988
1989    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1990        write!(f, "{}", self)
1991    }
1992}
1993
1994impl PartialEq<dyn PhysicalExpr> for CypherStringMatchExpr {
1995    fn eq(&self, other: &dyn PhysicalExpr) -> bool {
1996        if let Some(other) = other.as_any().downcast_ref::<CypherStringMatchExpr>() {
1997            self == other
1998        } else {
1999            false
2000        }
2001    }
2002}
2003
2004/// Physical expression for extracting a field from a struct column.
2005///
2006/// Used when list comprehension iterates over a list of structs (maps)
2007/// and accesses a field, e.g., `[x IN [{a: 1}] | x.a]`.
2008#[derive(Debug, Eq)]
2009struct StructFieldAccessExpr {
2010    /// Expression producing the struct column.
2011    input: Arc<dyn PhysicalExpr>,
2012    /// Index of the field within the struct.
2013    field_idx: usize,
2014    /// Output data type of the extracted field.
2015    output_type: arrow_schema::DataType,
2016}
2017
2018impl PartialEq for StructFieldAccessExpr {
2019    fn eq(&self, other: &Self) -> bool {
2020        self.field_idx == other.field_idx
2021            && self.input.eq(&other.input)
2022            && self.output_type == other.output_type
2023    }
2024}
2025
2026impl std::hash::Hash for StructFieldAccessExpr {
2027    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2028        self.input.hash(state);
2029        self.field_idx.hash(state);
2030    }
2031}
2032
2033impl StructFieldAccessExpr {
2034    fn new(
2035        input: Arc<dyn PhysicalExpr>,
2036        field_idx: usize,
2037        output_type: arrow_schema::DataType,
2038    ) -> Self {
2039        Self {
2040            input,
2041            field_idx,
2042            output_type,
2043        }
2044    }
2045}
2046
2047impl std::fmt::Display for StructFieldAccessExpr {
2048    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2049        write!(f, "{}[{}]", self.input, self.field_idx)
2050    }
2051}
2052
2053impl DisplayAs for StructFieldAccessExpr {
2054    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2055        write!(f, "{}", self)
2056    }
2057}
2058
2059impl PartialEq<dyn PhysicalExpr> for StructFieldAccessExpr {
2060    fn eq(&self, other: &dyn PhysicalExpr) -> bool {
2061        if let Some(other) = other.as_any().downcast_ref::<Self>() {
2062            self.field_idx == other.field_idx && self.input.eq(&other.input)
2063        } else {
2064            false
2065        }
2066    }
2067}
2068
2069impl PhysicalExpr for StructFieldAccessExpr {
2070    fn as_any(&self) -> &dyn std::any::Any {
2071        self
2072    }
2073
2074    fn data_type(
2075        &self,
2076        _input_schema: &Schema,
2077    ) -> datafusion::error::Result<arrow_schema::DataType> {
2078        Ok(self.output_type.clone())
2079    }
2080
2081    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
2082        Ok(true)
2083    }
2084
2085    fn evaluate(
2086        &self,
2087        batch: &arrow_array::RecordBatch,
2088    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
2089        use arrow_array::StructArray;
2090
2091        let input_val = self.input.evaluate(batch)?;
2092        let array = input_val.into_array(batch.num_rows())?;
2093
2094        let struct_array = array
2095            .as_any()
2096            .downcast_ref::<StructArray>()
2097            .ok_or_else(|| {
2098                datafusion::error::DataFusionError::Execution(
2099                    "StructFieldAccessExpr: input is not a StructArray".to_string(),
2100                )
2101            })?;
2102
2103        let field_col = struct_array.column(self.field_idx).clone();
2104        Ok(datafusion::physical_plan::ColumnarValue::Array(field_col))
2105    }
2106
2107    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
2108        vec![&self.input]
2109    }
2110
2111    fn with_new_children(
2112        self: Arc<Self>,
2113        children: Vec<Arc<dyn PhysicalExpr>>,
2114    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
2115        Ok(Arc::new(StructFieldAccessExpr::new(
2116            children[0].clone(),
2117            self.field_idx,
2118            self.output_type.clone(),
2119        )))
2120    }
2121
2122    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2123        write!(f, "{}", self)
2124    }
2125}
2126
2127// ---------------------------------------------------------------------------
2128// EXISTS subquery physical expression
2129// ---------------------------------------------------------------------------
2130
2131/// Physical expression that evaluates an EXISTS subquery per row.
2132///
2133/// For each input row, plans and executes the subquery with the row's columns
2134/// injected as parameters. Returns `true` if the subquery produces any rows.
2135///
2136/// NOT EXISTS is handled by the caller wrapping this in a NOT expression.
2137/// Nested EXISTS works because `execute_subplan` creates a full planner that
2138/// handles nested EXISTS recursively.
2139struct ExistsExecExpr {
2140    query: Query,
2141    graph_ctx: Arc<GraphExecutionContext>,
2142    session_ctx: Arc<RwLock<SessionContext>>,
2143    storage: Arc<StorageManager>,
2144    uni_schema: Arc<UniSchema>,
2145    params: HashMap<String, Value>,
2146    /// Entity variable names from outer scopes, threaded into nested subplans
2147    /// so the inner expression compiler can detect correlated references.
2148    outer_entity_vars: HashSet<String>,
2149}
2150
2151impl std::fmt::Debug for ExistsExecExpr {
2152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2153        f.debug_struct("ExistsExecExpr").finish_non_exhaustive()
2154    }
2155}
2156
2157impl ExistsExecExpr {
2158    fn new(
2159        query: Query,
2160        graph_ctx: Arc<GraphExecutionContext>,
2161        session_ctx: Arc<RwLock<SessionContext>>,
2162        storage: Arc<StorageManager>,
2163        uni_schema: Arc<UniSchema>,
2164        params: HashMap<String, Value>,
2165        outer_entity_vars: HashSet<String>,
2166    ) -> Self {
2167        Self {
2168            query,
2169            graph_ctx,
2170            session_ctx,
2171            storage,
2172            uni_schema,
2173            params,
2174            outer_entity_vars,
2175        }
2176    }
2177}
2178
2179impl std::fmt::Display for ExistsExecExpr {
2180    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2181        write!(f, "EXISTS(<subquery>)")
2182    }
2183}
2184
2185impl PartialEq<dyn PhysicalExpr> for ExistsExecExpr {
2186    fn eq(&self, _other: &dyn PhysicalExpr) -> bool {
2187        false
2188    }
2189}
2190
2191impl PartialEq for ExistsExecExpr {
2192    fn eq(&self, _other: &Self) -> bool {
2193        false
2194    }
2195}
2196
2197impl Eq for ExistsExecExpr {}
2198
2199impl std::hash::Hash for ExistsExecExpr {
2200    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2201        "ExistsExecExpr".hash(state);
2202    }
2203}
2204
2205impl DisplayAs for ExistsExecExpr {
2206    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2207        write!(f, "{}", self)
2208    }
2209}
2210
2211impl PhysicalExpr for ExistsExecExpr {
2212    fn as_any(&self) -> &dyn std::any::Any {
2213        self
2214    }
2215
2216    fn data_type(
2217        &self,
2218        _input_schema: &Schema,
2219    ) -> datafusion::error::Result<arrow_schema::DataType> {
2220        Ok(DataType::Boolean)
2221    }
2222
2223    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
2224        Ok(true)
2225    }
2226
2227    fn evaluate(
2228        &self,
2229        batch: &arrow_array::RecordBatch,
2230    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
2231        let num_rows = batch.num_rows();
2232        let mut builder = BooleanBuilder::with_capacity(num_rows);
2233
2234        // 7.2: Extract entity variable names from batch schema.
2235        // Entity columns follow the pattern "varname._vid" (flattened) or are struct columns.
2236        // We pass ONLY entity base names (e.g., "p", "n", "m") as vars_in_scope so the
2237        // subquery planner treats them as bound (Imported) variables. The initial Project
2238        // then creates Parameter("n") AS "n" etc. — the traverse reads the VID from this
2239        // column via resolve_source_vid_col's bare-name fallback.
2240        //
2241        // We intentionally do NOT include raw column names (n._vid, n._labels, etc.) in
2242        // vars_in_scope to avoid duplicate column conflicts with traverse output columns.
2243        // Parameter expressions read directly from sub_params, not from plan columns.
2244        let schema = batch.schema();
2245        let mut entity_vars: HashSet<String> = HashSet::new();
2246        for field in schema.fields() {
2247            let name = field.name();
2248            if let Some(base) = name.strip_suffix("._vid") {
2249                entity_vars.insert(base.to_string());
2250            }
2251            if matches!(field.data_type(), DataType::Struct(_)) {
2252                entity_vars.insert(name.to_string());
2253            }
2254            // Also detect bare VID columns from parent EXISTS parameter projections.
2255            // In nested EXISTS, a parent level projects "n" as a bare Int64/UInt64 VID.
2256            // Simple identifier (no dots, no leading underscore) + integer type = VID.
2257            if !name.contains('.')
2258                && !name.starts_with('_')
2259                && matches!(field.data_type(), DataType::Int64 | DataType::UInt64)
2260            {
2261                entity_vars.insert(name.to_string());
2262            }
2263        }
2264        let vars_in_scope: Vec<String> = entity_vars.iter().cloned().collect();
2265
2266        // 7.3: Rewrite correlated property accesses to parameter references.
2267        // e.g., `n.prop` where `n` is an outer entity → `$param("n.prop")`
2268        let rewritten_query = rewrite_query_correlated(&self.query, &entity_vars);
2269
2270        // 7.4: Plan ONCE — the rewritten query is parameterized, same for all rows.
2271        let planner = QueryPlanner::new(self.uni_schema.clone());
2272        let logical_plan = match planner.plan_with_scope(rewritten_query, vars_in_scope) {
2273            Ok(plan) => plan,
2274            Err(e) => {
2275                return Err(datafusion::error::DataFusionError::Execution(format!(
2276                    "EXISTS subquery planning failed: {}",
2277                    e
2278                )));
2279            }
2280        };
2281
2282        // Execute all rows on a dedicated thread with a single tokio runtime.
2283        // The runtime must be created and dropped on this thread (not in an async context).
2284        let graph_ctx = self.graph_ctx.clone();
2285        let session_ctx = self.session_ctx.clone();
2286        let storage = self.storage.clone();
2287        let uni_schema = self.uni_schema.clone();
2288        let base_params = self.params.clone();
2289
2290        let result = std::thread::scope(|s| {
2291            s.spawn(|| {
2292                let rt = tokio::runtime::Builder::new_current_thread()
2293                    .enable_all()
2294                    .build()
2295                    .map_err(|e| {
2296                        datafusion::error::DataFusionError::Execution(format!(
2297                            "Failed to create runtime for EXISTS: {}",
2298                            e
2299                        ))
2300                    })?;
2301
2302                // Merge stored outer entity vars with batch-extracted vars
2303                // so the inner planner's compiler can detect correlated refs.
2304                let mut combined_entity_vars = self.outer_entity_vars.clone();
2305                combined_entity_vars.extend(entity_vars.iter().cloned());
2306
2307                for row_idx in 0..num_rows {
2308                    let row_params = extract_row_params(batch, row_idx);
2309                    let mut sub_params = base_params.clone();
2310                    sub_params.extend(row_params);
2311
2312                    // Add entity variable → VID value mappings so that
2313                    // Parameter("n") resolves to the VID for traversal sources.
2314                    for var in &entity_vars {
2315                        let vid_key = format!("{}._vid", var);
2316                        if let Some(vid_val) = sub_params.get(&vid_key).cloned() {
2317                            sub_params.insert(var.clone(), vid_val);
2318                        }
2319                    }
2320
2321                    let batches = rt.block_on(execute_subplan_with_outer_vars(
2322                        &logical_plan,
2323                        &sub_params,
2324                        &HashMap::new(), // No outer values for EXISTS subquery
2325                        &graph_ctx,
2326                        &session_ctx,
2327                        &storage,
2328                        &uni_schema,
2329                        &combined_entity_vars,
2330                        None, // EXISTS compile-time rejects mutations (expr_compiler:693)
2331                    ))?;
2332
2333                    let has_rows = batches.iter().any(|b| b.num_rows() > 0);
2334                    builder.append_value(has_rows);
2335                }
2336
2337                Ok::<_, datafusion::error::DataFusionError>(())
2338            })
2339            .join()
2340            .unwrap_or_else(|_| {
2341                Err(datafusion::error::DataFusionError::Execution(
2342                    "EXISTS subquery thread panicked".to_string(),
2343                ))
2344            })
2345        });
2346
2347        if let Err(e) = result {
2348            return Err(datafusion::error::DataFusionError::Execution(format!(
2349                "EXISTS subquery execution failed: {}",
2350                e
2351            )));
2352        }
2353
2354        Ok(datafusion::physical_plan::ColumnarValue::Array(Arc::new(
2355            builder.finish(),
2356        )))
2357    }
2358
2359    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
2360        vec![]
2361    }
2362
2363    fn with_new_children(
2364        self: Arc<Self>,
2365        children: Vec<Arc<dyn PhysicalExpr>>,
2366    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
2367        if !children.is_empty() {
2368            return Err(datafusion::error::DataFusionError::Plan(
2369                "ExistsExecExpr has no children".to_string(),
2370            ));
2371        }
2372        Ok(self)
2373    }
2374
2375    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2376        write!(f, "{}", self)
2377    }
2378}
2379
2380// ---------------------------------------------------------------------------
2381// EXISTS subquery helpers
2382// ---------------------------------------------------------------------------
2383
2384/// Check if a Query contains any mutation clauses (CREATE, SET, DELETE, REMOVE, MERGE).
2385/// EXISTS subqueries must be read-only per OpenCypher spec.
2386fn has_mutation_clause(query: &Query) -> bool {
2387    match query {
2388        Query::Single(stmt) => stmt.clauses.iter().any(|c| {
2389            matches!(
2390                c,
2391                Clause::Create(_)
2392                    | Clause::Delete(_)
2393                    | Clause::Set(_)
2394                    | Clause::Remove(_)
2395                    | Clause::Merge(_)
2396            ) || has_mutation_in_clause_exprs(c)
2397        }),
2398        Query::Union { left, right, .. } => has_mutation_clause(left) || has_mutation_clause(right),
2399        _ => false,
2400    }
2401}
2402
2403/// Check if a clause contains nested EXISTS with mutations (recursive).
2404fn has_mutation_in_clause_exprs(clause: &Clause) -> bool {
2405    let check_expr = |e: &Expr| -> bool { has_mutation_in_expr(e) };
2406
2407    match clause {
2408        Clause::Match(m) => m.where_clause.as_ref().is_some_and(check_expr),
2409        Clause::With(w) => {
2410            w.where_clause.as_ref().is_some_and(check_expr)
2411                || w.items.iter().any(|item| match item {
2412                    ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2413                    ReturnItem::All => false,
2414                })
2415        }
2416        Clause::Return(r) => r.items.iter().any(|item| match item {
2417            ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2418            ReturnItem::All => false,
2419        }),
2420        _ => false,
2421    }
2422}
2423
2424/// Check if an expression tree contains an EXISTS with mutation clauses.
2425fn has_mutation_in_expr(expr: &Expr) -> bool {
2426    match expr {
2427        Expr::Exists { query, .. } => has_mutation_clause(query),
2428        _ => {
2429            let mut found = false;
2430            expr.for_each_child(&mut |child| {
2431                if has_mutation_in_expr(child) {
2432                    found = true;
2433                }
2434            });
2435            found
2436        }
2437    }
2438}
2439
2440/// Rewrite a Query AST to replace correlated property accesses with parameter references.
2441///
2442/// For each `Property(Variable(v), key)` where `v` is an outer-scope entity variable,
2443/// replaces it with `Parameter("{v}.{key}")`. This enables plan-once optimization since
2444/// the rewritten query is parameterized (same structure for every row).
2445fn rewrite_query_correlated(query: &Query, outer_vars: &HashSet<String>) -> Query {
2446    match query {
2447        Query::Single(stmt) => Query::Single(Statement {
2448            clauses: stmt
2449                .clauses
2450                .iter()
2451                .map(|c| rewrite_clause_correlated(c, outer_vars))
2452                .collect(),
2453        }),
2454        Query::Union { left, right, all } => Query::Union {
2455            left: Box::new(rewrite_query_correlated(left, outer_vars)),
2456            right: Box::new(rewrite_query_correlated(right, outer_vars)),
2457            all: *all,
2458        },
2459        other => other.clone(),
2460    }
2461}
2462
2463/// Rewrite expressions within a clause for correlated property access.
2464fn rewrite_clause_correlated(clause: &Clause, outer_vars: &HashSet<String>) -> Clause {
2465    match clause {
2466        Clause::Match(m) => Clause::Match(MatchClause {
2467            optional: m.optional,
2468            pattern: m.pattern.clone(),
2469            where_clause: m
2470                .where_clause
2471                .as_ref()
2472                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2473            for_update: m.for_update,
2474        }),
2475        Clause::With(w) => Clause::With(WithClause {
2476            distinct: w.distinct,
2477            items: w
2478                .items
2479                .iter()
2480                .map(|item| rewrite_return_item(item, outer_vars))
2481                .collect(),
2482            order_by: w.order_by.as_ref().map(|items| {
2483                items
2484                    .iter()
2485                    .map(|si| SortItem {
2486                        expr: rewrite_expr_correlated(&si.expr, outer_vars),
2487                        ascending: si.ascending,
2488                    })
2489                    .collect()
2490            }),
2491            skip: w
2492                .skip
2493                .as_ref()
2494                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2495            limit: w
2496                .limit
2497                .as_ref()
2498                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2499            where_clause: w
2500                .where_clause
2501                .as_ref()
2502                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2503        }),
2504        Clause::Return(r) => Clause::Return(ReturnClause {
2505            distinct: r.distinct,
2506            items: r
2507                .items
2508                .iter()
2509                .map(|item| rewrite_return_item(item, outer_vars))
2510                .collect(),
2511            order_by: r.order_by.as_ref().map(|items| {
2512                items
2513                    .iter()
2514                    .map(|si| SortItem {
2515                        expr: rewrite_expr_correlated(&si.expr, outer_vars),
2516                        ascending: si.ascending,
2517                    })
2518                    .collect()
2519            }),
2520            skip: r
2521                .skip
2522                .as_ref()
2523                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2524            limit: r
2525                .limit
2526                .as_ref()
2527                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2528        }),
2529        Clause::Unwind(u) => Clause::Unwind(UnwindClause {
2530            expr: rewrite_expr_correlated(&u.expr, outer_vars),
2531            variable: u.variable.clone(),
2532        }),
2533        other => other.clone(),
2534    }
2535}
2536
2537fn rewrite_return_item(item: &ReturnItem, outer_vars: &HashSet<String>) -> ReturnItem {
2538    match item {
2539        ReturnItem::All => ReturnItem::All,
2540        ReturnItem::Expr {
2541            expr,
2542            alias,
2543            source_text,
2544        } => ReturnItem::Expr {
2545            expr: rewrite_expr_correlated(expr, outer_vars),
2546            alias: alias.clone(),
2547            source_text: source_text.clone(),
2548        },
2549    }
2550}
2551
2552/// Rewrite a single expression: Property(Variable(v), key) → Parameter("{v}.{key}")
2553/// when v is an outer-scope entity variable. Handles nested EXISTS recursively.
2554fn rewrite_expr_correlated(expr: &Expr, outer_vars: &HashSet<String>) -> Expr {
2555    match expr {
2556        // Core rewrite: n.prop → $param("n.prop") when n is an outer entity
2557        Expr::Property(base, key) => {
2558            if let Expr::Variable(v) = base.as_ref()
2559                && outer_vars.contains(v)
2560            {
2561                return Expr::Parameter(format!("{}.{}", v, key));
2562            }
2563            Expr::Property(
2564                Box::new(rewrite_expr_correlated(base, outer_vars)),
2565                key.clone(),
2566            )
2567        }
2568        // Nested EXISTS — recurse into the subquery body
2569        Expr::Exists {
2570            query,
2571            from_pattern_predicate,
2572        } => Expr::Exists {
2573            query: Box::new(rewrite_query_correlated(query, outer_vars)),
2574            from_pattern_predicate: *from_pattern_predicate,
2575        },
2576        // CountSubquery and CollectSubquery — recurse into subquery body
2577        Expr::CountSubquery(query) => {
2578            Expr::CountSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2579        }
2580        Expr::CollectSubquery(query) => {
2581            Expr::CollectSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2582        }
2583        // All other expressions: recursively transform children
2584        other => other
2585            .clone()
2586            .map_children(&mut |child| rewrite_expr_correlated(&child, outer_vars)),
2587    }
2588}
2589
2590/// Look up the `DistanceMetric` for a vector-indexed property across all labels.
2591///
2592/// Returns `None` if no vector index exists for the property.
2593fn resolve_metric_for_property(schema: &UniSchema, property: &str) -> Option<DistanceMetric> {
2594    for idx in &schema.indexes {
2595        if let IndexDefinition::Vector(config) = idx
2596            && config.property == property
2597        {
2598            return Some(config.metric.clone());
2599        }
2600    }
2601    None
2602}