Skip to main content

uni_query/query/df_graph/
expr_compiler.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::query::df_expr::{TranslationContext, VariableKind, cypher_expr_to_df};
5use crate::query::df_graph::GraphExecutionContext;
6use crate::query::df_graph::common::{execute_subplan_with_outer_vars, extract_row_params};
7use crate::query::df_graph::comprehension::ListComprehensionExecExpr;
8use crate::query::df_graph::pattern_comprehension::{
9    PatternComprehensionExecExpr, analyze_pattern, build_inner_schema, collect_inner_properties,
10};
11use crate::query::df_graph::pattern_exists::{
12    PatternExistsExecExpr, extract_pattern_from_exists_query, extract_target_property_predicates,
13};
14use crate::query::df_graph::quantifier::{QuantifierExecExpr, QuantifierType};
15use crate::query::df_graph::raw_bytes_marker;
16use crate::query::df_graph::reduce::ReduceExecExpr;
17use crate::query::df_graph::similar_to_expr::SimilarToExecExpr;
18use crate::query::planner::QueryPlanner;
19use crate::query::similar_to::SimilarToError;
20use anyhow::{Result, anyhow};
21use arrow_array::builder::BooleanBuilder;
22use arrow_schema::{DataType, Field, Schema};
23use datafusion::execution::context::SessionState;
24use datafusion::physical_expr::expressions::binary;
25use datafusion::physical_plan::PhysicalExpr;
26use datafusion::physical_planner::PhysicalPlanner;
27use datafusion::prelude::SessionContext;
28use parking_lot::RwLock;
29use std::collections::{HashMap, HashSet};
30use std::sync::Arc;
31use uni_common::Value;
32use uni_common::core::schema::{DistanceMetric, IndexDefinition, Schema as UniSchema};
33use uni_cypher::ast::{
34    BinaryOp, Clause, CypherLiteral, Expr, MatchClause, Query, ReturnClause, ReturnItem, SortItem,
35    Statement, UnaryOp, UnwindClause, WithClause,
36};
37use uni_store::storage::manager::StorageManager;
38
39/// Check if a data type represents CypherValue (LargeBinary) or BTIC (FixedSizeBinary(24)).
40/// Both need to route through Cypher UDFs for comparison operators.
41fn is_cypher_value_type(dt: Option<&DataType>) -> bool {
42    dt.is_some_and(|t| matches!(t, DataType::LargeBinary | DataType::FixedSizeBinary(24)))
43}
44
45/// Resolve the element type for a list expression.
46///
47/// Extracts the element type from List/LargeList/Null/LargeBinary data types.
48/// Falls back to the provided fallback type for LargeBinary, and returns an
49/// error with the provided context for unsupported types.
50///
51/// # Errors
52///
53/// Returns an error if the data type is not a recognized list type.
54fn resolve_list_element_type(
55    list_data_type: &DataType,
56    large_binary_fallback: DataType,
57    context: &str,
58) -> Result<DataType> {
59    match list_data_type {
60        DataType::List(field) | DataType::LargeList(field) => Ok(field.data_type().clone()),
61        DataType::Null => Ok(DataType::Null),
62        DataType::LargeBinary => Ok(large_binary_fallback),
63        _ => Err(anyhow!(
64            "{} input must be a list, got {:?}",
65            context,
66            list_data_type
67        )),
68    }
69}
70
71/// Physical expression wrapper that converts LargeList<T> to LargeBinary (CypherValue).
72///
73/// Used in CASE expressions to unify branch types when mixing typed lists
74/// (e.g., from list comprehensions) with CypherValue-encoded lists.
75#[derive(Debug)]
76struct LargeListToCypherValueExpr {
77    child: Arc<dyn PhysicalExpr>,
78}
79
80impl LargeListToCypherValueExpr {
81    fn new(child: Arc<dyn PhysicalExpr>) -> Self {
82        Self { child }
83    }
84}
85
86impl std::fmt::Display for LargeListToCypherValueExpr {
87    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
88        write!(f, "LargeListToCypherValue({})", self.child)
89    }
90}
91
92impl PartialEq for LargeListToCypherValueExpr {
93    fn eq(&self, other: &Self) -> bool {
94        Arc::ptr_eq(&self.child, &other.child)
95    }
96}
97
98impl Eq for LargeListToCypherValueExpr {}
99
100impl std::hash::Hash for LargeListToCypherValueExpr {
101    fn hash<H: std::hash::Hasher>(&self, _state: &mut H) {
102        // Hash based on type since we can't hash PhysicalExpr
103        std::any::type_name::<Self>().hash(_state);
104    }
105}
106
107impl PartialEq<dyn std::any::Any> for LargeListToCypherValueExpr {
108    fn eq(&self, other: &dyn std::any::Any) -> bool {
109        other
110            .downcast_ref::<Self>()
111            .map(|x| self == x)
112            .unwrap_or(false)
113    }
114}
115
116impl PhysicalExpr for LargeListToCypherValueExpr {
117    fn as_any(&self) -> &dyn std::any::Any {
118        self
119    }
120
121    fn data_type(&self, _input_schema: &Schema) -> datafusion::error::Result<DataType> {
122        Ok(DataType::LargeBinary)
123    }
124
125    fn nullable(&self, input_schema: &Schema) -> datafusion::error::Result<bool> {
126        self.child.nullable(input_schema)
127    }
128
129    fn evaluate(
130        &self,
131        batch: &arrow_array::RecordBatch,
132    ) -> datafusion::error::Result<datafusion::logical_expr::ColumnarValue> {
133        use datafusion::arrow::compute::cast;
134        use datafusion::logical_expr::ColumnarValue;
135
136        let child_result = self.child.evaluate(batch)?;
137        let child_array = child_result.into_array(batch.num_rows())?;
138
139        // Normalize List → LargeList (pattern from quantifier.rs:182-189)
140        let list_array = if let DataType::List(field) = child_array.data_type() {
141            let target_type = DataType::LargeList(field.clone());
142            cast(&child_array, &target_type).map_err(|e| {
143                datafusion::error::DataFusionError::Execution(format!(
144                    "List to LargeList cast failed: {e}"
145                ))
146            })?
147        } else {
148            child_array.clone()
149        };
150
151        // If already LargeBinary, pass through
152        if list_array.data_type() == &DataType::LargeBinary {
153            return Ok(ColumnarValue::Array(list_array));
154        }
155
156        // Convert LargeList to CypherValue
157        if let Some(large_list) = list_array
158            .as_any()
159            .downcast_ref::<datafusion::arrow::array::LargeListArray>()
160        {
161            let cv_array =
162                crate::query::df_graph::common::typed_large_list_to_cv_array(large_list)?;
163            Ok(ColumnarValue::Array(cv_array))
164        } else {
165            Err(datafusion::error::DataFusionError::Execution(format!(
166                "Expected List or LargeList, got {:?}",
167                list_array.data_type()
168            )))
169        }
170    }
171
172    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
173        vec![&self.child]
174    }
175
176    fn with_new_children(
177        self: Arc<Self>,
178        children: Vec<Arc<dyn PhysicalExpr>>,
179    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
180        if children.len() != 1 {
181            return Err(datafusion::error::DataFusionError::Execution(
182                "LargeListToCypherValueExpr expects exactly 1 child".to_string(),
183            ));
184        }
185        Ok(Arc::new(LargeListToCypherValueExpr::new(
186            children[0].clone(),
187        )))
188    }
189
190    fn fmt_sql(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
191        write!(f, "LargeListToCypherValue({})", self.child)
192    }
193}
194
/// Compiler for converting Cypher expressions directly to DataFusion Physical Expressions.
///
/// Create one with [`CypherPhysicalExprCompiler::new`]. The extra context used by
/// pattern comprehensions and EXISTS subqueries is attached with `with_graph_ctx` /
/// `with_subquery_ctx`.
pub struct CypherPhysicalExprCompiler<'a> {
    /// DataFusion session state supplied at construction.
    state: &'a SessionState,
    /// Cypher translation context (parameters, variable labels and kinds, hints), if any.
    translation_ctx: Option<&'a TranslationContext>,
    /// Graph execution context; used for pattern comprehensions and required for EXISTS.
    graph_ctx: Option<Arc<GraphExecutionContext>>,
    /// Uni schema; used for pattern comprehensions and required for EXISTS.
    uni_schema: Option<Arc<UniSchema>>,
    /// Session context for EXISTS subquery execution.
    session_ctx: Option<Arc<RwLock<SessionContext>>>,
    /// Storage manager for EXISTS subquery execution.
    storage: Option<Arc<StorageManager>>,
    /// Query parameters for EXISTS subquery execution.
    params: HashMap<String, Value>,
    /// Entity variable names from outer scopes (for nested EXISTS).
    /// A target variable in a pattern predicate that appears here is a
    /// correlated reference requiring per-row evaluation, not a fresh binding.
    outer_entity_vars: HashSet<String>,
}
212
213impl<'a> CypherPhysicalExprCompiler<'a> {
214    pub fn new(state: &'a SessionState, translation_ctx: Option<&'a TranslationContext>) -> Self {
215        Self {
216            state,
217            translation_ctx,
218            graph_ctx: None,
219            uni_schema: None,
220            session_ctx: None,
221            storage: None,
222            params: HashMap::new(),
223            outer_entity_vars: HashSet::new(),
224        }
225    }
226
227    /// Build a scoped compiler that excludes the given variables from the translation context.
228    ///
229    /// When compiling inner expressions for list comprehensions, reduce, or quantifiers,
230    /// loop variables must be removed from `variable_kinds` so that property access on
231    /// them does not incorrectly generate flat columns that don't exist in the inner schema.
232    ///
233    /// If none of the `exclude_vars` are present in the current context, the returned
234    /// compiler simply reuses `self`'s translation context unchanged.
235    ///
236    /// The caller must own `scoped_ctx_slot` and keep it alive for the returned compiler's
237    /// lifetime.
238    fn scoped_compiler<'b>(
239        &'b self,
240        exclude_vars: &[&str],
241        scoped_ctx_slot: &'b mut Option<TranslationContext>,
242    ) -> CypherPhysicalExprCompiler<'b>
243    where
244        'a: 'b,
245    {
246        let needs_scoping = self.translation_ctx.is_some_and(|ctx| {
247            exclude_vars
248                .iter()
249                .any(|v| ctx.variable_kinds.contains_key(*v))
250        });
251
252        let ctx_ref = if needs_scoping {
253            let ctx = self.translation_ctx.unwrap();
254            let mut new_kinds = ctx.variable_kinds.clone();
255            for v in exclude_vars {
256                new_kinds.remove(*v);
257            }
258            *scoped_ctx_slot = Some(TranslationContext {
259                parameters: ctx.parameters.clone(),
260                outer_values: ctx.outer_values.clone(),
261                variable_labels: ctx.variable_labels.clone(),
262                variable_kinds: new_kinds,
263                node_variable_hints: ctx.node_variable_hints.clone(),
264                mutation_edge_hints: ctx.mutation_edge_hints.clone(),
265                statement_time: ctx.statement_time,
266            });
267            scoped_ctx_slot.as_ref()
268        } else {
269            self.translation_ctx
270        };
271
272        CypherPhysicalExprCompiler {
273            state: self.state,
274            translation_ctx: ctx_ref,
275            graph_ctx: self.graph_ctx.clone(),
276            uni_schema: self.uni_schema.clone(),
277            session_ctx: self.session_ctx.clone(),
278            storage: self.storage.clone(),
279            params: self.params.clone(),
280            outer_entity_vars: self.outer_entity_vars.clone(),
281        }
282    }
283
284    /// Attach graph context and schema for pattern comprehension support.
285    pub fn with_graph_ctx(
286        mut self,
287        graph_ctx: Arc<GraphExecutionContext>,
288        uni_schema: Arc<UniSchema>,
289    ) -> Self {
290        self.graph_ctx = Some(graph_ctx);
291        self.uni_schema = Some(uni_schema);
292        self
293    }
294
295    /// Attach full subquery context for EXISTS support.
296    pub fn with_subquery_ctx(
297        mut self,
298        graph_ctx: Arc<GraphExecutionContext>,
299        uni_schema: Arc<UniSchema>,
300        session_ctx: Arc<RwLock<SessionContext>>,
301        storage: Arc<StorageManager>,
302        params: HashMap<String, Value>,
303        outer_entity_vars: HashSet<String>,
304    ) -> Self {
305        self.graph_ctx = Some(graph_ctx);
306        self.uni_schema = Some(uni_schema);
307        self.session_ctx = Some(session_ctx);
308        self.storage = Some(storage);
309        self.params = params;
310        self.outer_entity_vars = outer_entity_vars;
311        self
312    }
313
    /// Compile a Cypher expression into a DataFusion PhysicalExpr.
    ///
    /// Constructs DataFusion cannot express natively are dispatched to dedicated
    /// `compile_*` helpers:
    /// - list comprehensions, quantifiers and `reduce`;
    /// - pattern comprehensions and EXISTS;
    /// - `similar_to`.
    ///
    /// Composite expressions (operators, IS [NOT] NULL, IN, function calls, CASE) are
    /// compiled recursively only when `contains_custom_expr` reports a custom
    /// sub-expression (or, for some arms, a raw-`Bytes` list literal is involved).
    /// Otherwise the whole expression goes through `compile_standard`.
    ///
    /// Arm order matters: several arms share a variant (`Expr::List`,
    /// `Expr::FunctionCall`) and are distinguished only by their guards.
    ///
    /// # Errors
    ///
    /// Returns an error in these cases:
    /// - a sub-expression fails to compile;
    /// - a list or map literal contains a custom expression (not yet supported);
    /// - the `_cypher_in` UDF cannot be resolved.
    pub fn compile(&self, expr: &Expr, input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        match expr {
            Expr::ListComprehension {
                variable,
                list,
                where_clause,
                map_expr,
            } => self.compile_list_comprehension(
                variable,
                list,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),
            Expr::Quantifier {
                quantifier,
                variable,
                list,
                predicate,
            } => self.compile_quantifier(quantifier, variable, list, predicate, input_schema),
            Expr::Reduce {
                accumulator,
                init,
                variable,
                list,
                expr: expression,
            } => self.compile_reduce(accumulator, init, variable, list, expression, input_schema),
            // For BinaryOp, check if children contain custom expressions or CypherValue types
            Expr::BinaryOp { left, op, right } => {
                self.compile_binary_op_dispatch(left, op, right, input_schema)
            }
            Expr::UnaryOp { op, expr: inner } => {
                if matches!(op, UnaryOp::Not) {
                    // NOT is always compiled recursively. A CypherValue (LargeBinary)
                    // operand is first converted to a boolean via wrap_with_cv_to_bool.
                    let mut inner_phy = self.compile(inner, input_schema)?;
                    if let Ok(DataType::LargeBinary) = inner_phy.data_type(input_schema) {
                        inner_phy = self.wrap_with_cv_to_bool(inner_phy)?;
                    }
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    self.compile_unary_op(op, inner_phy, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(datafusion::physical_expr::expressions::is_null(inner_phy)
                        .map_err(|e| anyhow!("Failed to create is_null: {}", e))?)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            Expr::IsNotNull(inner) => {
                if Self::contains_custom_expr(inner) {
                    let inner_phy = self.compile(inner, input_schema)?;
                    Ok(
                        datafusion::physical_expr::expressions::is_not_null(inner_phy)
                            .map_err(|e| anyhow!("Failed to create is_not_null: {}", e))?,
                    )
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }
            // In operator is Expr::In { expr, list }. With custom operands it is
            // planned via the `_cypher_in` UDF.
            Expr::In {
                expr: left,
                list: right,
            } => {
                if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
                    let left_phy = self.compile(left, input_schema)?;
                    let right_phy = self.compile(right, input_schema)?;

                    // An operand whose type cannot be determined is treated as
                    // CypherValue (LargeBinary).
                    let left_type = left_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);
                    let right_type = right_phy
                        .data_type(input_schema)
                        .unwrap_or(DataType::LargeBinary);

                    self.plan_binary_udf("_cypher_in", left_phy, right_phy, left_type, right_type)?
                        .ok_or_else(|| anyhow!("_cypher_in UDF not found"))
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            // A raw-`Bytes` list literal (e.g. `[b.data]`): compile the `make_array`
            // then mark its child field so element-extraction (`head`/`index`/…) and
            // the final read decode each element as raw `Bytes` rather than via the
            // tagged codec. CV-routed / mixed lists carry no marker (analyzer is
            // conservative), so this never disturbs pattern-comprehension/VLP.
            Expr::List(_) if raw_bytes_marker::is_markable_list(expr, input_schema) => {
                let inner = self.compile_standard(expr, input_schema)?;
                Ok(Arc::new(raw_bytes_marker::RawBytesMarkerExpr::list_child(
                    inner,
                )))
            }

            // List and map literals that contain a custom expression are not
            // supported yet and are rejected with an explicit error.
            Expr::List(items) if items.iter().any(Self::contains_custom_expr) => Err(anyhow!(
                "List literals containing comprehensions not yet supported in compiler"
            )),
            Expr::Map(entries) if entries.iter().any(|(_, v)| Self::contains_custom_expr(v)) => {
                Err(anyhow!(
                    "Map literals containing comprehensions not yet supported in compiler"
                ))
            }

            // Property access on a struct column — e.g. `x.a` where `x` is Struct
            Expr::Property(base, prop) => self.compile_property_access(base, prop, input_schema),

            // Bracket access on a struct column — e.g. `x['a']` where `x` is Struct
            Expr::ArrayIndex { array, index } => {
                self.compile_array_index(array, index, input_schema)
            }

            // Pattern comprehension: [(a)-[:REL]->(b) WHERE pred | expr]
            Expr::PatternComprehension {
                path_variable,
                pattern,
                where_clause,
                map_expr,
            } => self.compile_pattern_comprehension(
                path_variable,
                pattern,
                where_clause.as_deref(),
                map_expr,
                input_schema,
            ),

            // EXISTS subquery: vectorized for pattern predicates, per-row for general EXISTS.
            // If vectorizing a pattern predicate fails, the error is logged at debug
            // level and the per-row subquery path is used instead.
            Expr::Exists {
                query,
                from_pattern_predicate,
            } => {
                if *from_pattern_predicate {
                    match self.compile_pattern_exists(query, input_schema) {
                        Ok(expr) => Ok(expr),
                        Err(e) => {
                            log::debug!(
                                "Pattern exists vectorization failed, falling back to subquery: {e}"
                            );
                            self.compile_exists(query)
                        }
                    }
                } else {
                    self.compile_exists(query)
                }
            }

            // A coalesce that mixes a raw-`Bytes` arg with a non-raw, non-null one:
            // rewrite to an explicit CASE whose THEN/ELSE values are CypherValue-
            // encoded (while WHEN tests the original args for null), so the output
            // column is uniformly codec-decodable and raw bytes round-trip to
            // `Value::Bytes`. A uniformly raw-or-null coalesce is left alone and marked
            // at the projection (see `raw_bytes_marker::is_raw_scalar`).
            Expr::FunctionCall { name, args, .. }
                if name.eq_ignore_ascii_case("coalesce")
                    && args.len() > 1
                    && raw_bytes_marker::coalesce_needs_cv_unify(args, input_schema) =>
            {
                let cv_wrap = |a: &Expr| Expr::FunctionCall {
                    name: "_cypher_scalar_to_cv".to_string(),
                    args: vec![a.clone()],
                    distinct: false,
                    window_spec: None,
                };
                // Cannot panic: the guard above ensures `args.len() > 1`.
                let (last, init) = args.split_last().unwrap();
                let when_then = init
                    .iter()
                    .map(|a| (Expr::IsNotNull(Box::new(a.clone())), cv_wrap(a)))
                    .collect();
                let case = Expr::Case {
                    expr: None,
                    when_then,
                    else_expr: Some(Box::new(cv_wrap(last))),
                };
                self.compile_standard(&case, input_schema)
            }
            // FunctionCall wrapping a custom expression (e.g. size(comprehension)).
            // `similar_to` is always handled by its dedicated compiler.
            Expr::FunctionCall {
                name,
                args,
                distinct,
                ..
            } => {
                if name.eq_ignore_ascii_case("similar_to") {
                    return self.compile_similar_to(args, input_schema);
                }
                // Route through the recursive custom-args path when an argument is a
                // raw-`Bytes` list literal, so its `make_array` child gets marked (the
                // list-consumer UDFs `head`/`last`/`tail`/… then decode raw bytes).
                if args.iter().any(Self::contains_custom_expr)
                    || args
                        .iter()
                        .any(|a| raw_bytes_marker::is_markable_list(a, input_schema))
                {
                    self.compile_function_with_custom_args(name, args, *distinct, input_schema)
                } else {
                    self.compile_standard(expr, input_schema)
                }
            }

            // CASE expression - dispatch based on whether it contains custom expressions
            Expr::Case {
                expr: case_operand,
                when_then,
                else_expr,
            } => {
                // Check if operand or any branch contains custom expressions
                let has_custom = case_operand
                    .as_deref()
                    .is_some_and(Self::contains_custom_expr)
                    || when_then.iter().any(|(w, t)| {
                        Self::contains_custom_expr(w) || Self::contains_custom_expr(t)
                    })
                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr);

                if has_custom {
                    // Use compile_case() for CypherValue boolean conversion and
                    // LargeList/LargeBinary type unification
                    self.compile_case(case_operand, when_then, else_expr, input_schema)
                } else {
                    // Standard compilation path - goes through apply_type_coercion which handles:
                    // 1. Simple CASE → Generic CASE rewriting with cross-type equality
                    // 2. Type coercion for CASE result branches
                    // 3. Numeric widening for comparisons
                    self.compile_standard(expr, input_schema)
                }
            }

            // LabelCheck: delegate to standard compilation (uses cypher_expr_to_df)
            Expr::LabelCheck { .. } => self.compile_standard(expr, input_schema),

            // Default to standard compilation for leaf nodes or non-custom trees
            _ => self.compile_standard(expr, input_schema),
        }
    }
555
556    /// Dispatch binary op compilation, checking for custom expressions and CypherValue types.
557    fn compile_binary_op_dispatch(
558        &self,
559        left: &Expr,
560        op: &BinaryOp,
561        right: &Expr,
562        input_schema: &Schema,
563    ) -> Result<Arc<dyn PhysicalExpr>> {
564        if matches!(op, BinaryOp::Eq | BinaryOp::NotEq)
565            && let (Expr::Variable(lv), Expr::Variable(rv)) = (left, right)
566            && let Some(ctx) = self.translation_ctx
567            && let (Some(lk), Some(rk)) = (ctx.variable_kinds.get(lv), ctx.variable_kinds.get(rv))
568        {
569            let identity_prop = match (lk, rk) {
570                (VariableKind::Node, VariableKind::Node) => Some("_vid"),
571                (VariableKind::Edge, VariableKind::Edge) => Some("_eid"),
572                _ => None,
573            };
574
575            if let Some(id_prop) = identity_prop {
576                return self.compile_standard(
577                    &Expr::BinaryOp {
578                        left: Box::new(Expr::Property(
579                            Box::new(Expr::Variable(lv.clone())),
580                            id_prop.to_string(),
581                        )),
582                        op: *op,
583                        right: Box::new(Expr::Property(
584                            Box::new(Expr::Variable(rv.clone())),
585                            id_prop.to_string(),
586                        )),
587                    },
588                    input_schema,
589                );
590            }
591        }
592
593        // XOR and Pow: always route through compile_standard.
594        // compile_binary_op does not support these operators. The standard path
595        // correctly maps XOR → _cypher_xor UDF and Pow → power() function.
596        if matches!(op, BinaryOp::Xor | BinaryOp::Pow) {
597            return self.compile_standard(
598                &Expr::BinaryOp {
599                    left: Box::new(left.clone()),
600                    op: *op,
601                    right: Box::new(right.clone()),
602                },
603                input_schema,
604            );
605        }
606
607        if Self::contains_custom_expr(left) || Self::contains_custom_expr(right) {
608            let left_phy = self.compile(left, input_schema)?;
609            let right_phy = self.compile(right, input_schema)?;
610            return self.compile_binary_op(op, left_phy, right_phy, input_schema);
611        }
612
613        // For Add with a list-producing operand (AST-level detection),
614        // compile through the standard path which uses cypher_expr_to_df
615        // to correctly route list + scalar to _cypher_list_concat.
616        if *op == BinaryOp::Add && (Self::is_list_producing(left) || Self::is_list_producing(right))
617        {
618            return self.compile_standard(
619                &Expr::BinaryOp {
620                    left: Box::new(left.clone()),
621                    op: *op,
622                    right: Box::new(right.clone()),
623                },
624                input_schema,
625            );
626        }
627
628        // Compile sub-expressions to check their types. If either operand
629        // produces LargeBinary (CypherValue), standard Arrow kernels will fail at
630        // runtime for comparisons. Route through compile_binary_op which
631        // dispatches to Cypher comparison UDFs.
632        let left_phy = self.compile(left, input_schema)?;
633        let right_phy = self.compile(right, input_schema)?;
634        let left_dt = left_phy.data_type(input_schema).ok();
635        let right_dt = right_phy.data_type(input_schema).ok();
636        let has_cv =
637            is_cypher_value_type(left_dt.as_ref()) || is_cypher_value_type(right_dt.as_ref());
638
639        if has_cv {
640            // CypherValue types need special handling via compile_binary_op
641            self.compile_binary_op(op, left_phy, right_phy, input_schema)
642        } else {
643            // Standard types: use compile_standard to get proper type coercion
644            // (e.g., Int64 == Float64 requires coercion to work)
645            self.compile_standard(
646                &Expr::BinaryOp {
647                    left: Box::new(left.clone()),
648                    op: *op,
649                    right: Box::new(right.clone()),
650                },
651                input_schema,
652            )
653        }
654    }
655
656    /// Try to compile struct field access for a variable.
657    ///
658    /// Returns `Some(expr)` if the variable is a Struct column and can be accessed,
659    /// `None` if fallback to standard compilation is needed.
660    fn try_compile_struct_field(
661        &self,
662        var_name: &str,
663        field_name: &str,
664        input_schema: &Schema,
665    ) -> Option<Arc<dyn PhysicalExpr>> {
666        let col_idx = input_schema.index_of(var_name).ok()?;
667        let DataType::Struct(struct_fields) = input_schema.field(col_idx).data_type() else {
668            return None;
669        };
670
671        // Cypher semantics: accessing a missing key returns null
672        if let Some(field_idx) = struct_fields.iter().position(|f| f.name() == field_name) {
673            let output_type = struct_fields[field_idx].data_type().clone();
674            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(
675                datafusion::physical_expr::expressions::Column::new(var_name, col_idx),
676            );
677            Some(Arc::new(StructFieldAccessExpr::new(
678                col_expr,
679                field_idx,
680                output_type,
681            )))
682        } else {
683            Some(Arc::new(
684                datafusion::physical_expr::expressions::Literal::new(
685                    datafusion::common::ScalarValue::Null,
686                ),
687            ))
688        }
689    }
690
691    /// Compile property access on a struct column (e.g. `x.a` where `x` is Struct).
692    fn compile_property_access(
693        &self,
694        base: &Expr,
695        prop: &str,
696        input_schema: &Schema,
697    ) -> Result<Arc<dyn PhysicalExpr>> {
698        if let Expr::Variable(var_name) = base {
699            // 1. Try struct field access (e.g. `x.a` where `x` is a Struct column)
700            if let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema) {
701                return Ok(expr);
702            }
703            // 2. Try flat column "{var}.{prop}" (for pattern comprehension inner schemas)
704            let flat_col = format!("{}.{}", var_name, prop);
705            if let Ok(col_idx) = input_schema.index_of(&flat_col) {
706                return Ok(Arc::new(
707                    datafusion::physical_expr::expressions::Column::new(&flat_col, col_idx),
708                ));
709            }
710        }
711        self.compile_standard(
712            &Expr::Property(Box::new(base.clone()), prop.to_string()),
713            input_schema,
714        )
715    }
716
717    /// Compile bracket access on a struct column (e.g. `x['a']` where `x` is Struct).
718    fn compile_array_index(
719        &self,
720        array: &Expr,
721        index: &Expr,
722        input_schema: &Schema,
723    ) -> Result<Arc<dyn PhysicalExpr>> {
724        if let Expr::Variable(var_name) = array
725            && let Expr::Literal(CypherLiteral::String(prop)) = index
726            && let Some(expr) = self.try_compile_struct_field(var_name, prop, input_schema)
727        {
728            return Ok(expr);
729        }
730        // Indexing into a raw-`Bytes` list literal (e.g. `[b.data][0]`): route through
731        // the recursive custom-args path (UDF `index`) so the inner `make_array` child
732        // is marked and the extracted element decodes as raw `Bytes`.
733        if raw_bytes_marker::is_markable_list(array, input_schema) {
734            return self.compile_function_with_custom_args(
735                "index",
736                &[array.clone(), index.clone()],
737                false,
738                input_schema,
739            );
740        }
741        self.compile_standard(
742            &Expr::ArrayIndex {
743                array: Box::new(array.clone()),
744                index: Box::new(index.clone()),
745            },
746            input_schema,
747        )
748    }
749
750    /// Compile EXISTS subquery expression.
751    fn compile_exists(&self, query: &Query) -> Result<Arc<dyn PhysicalExpr>> {
752        // 7.1: Validate no mutation clauses in EXISTS body
753        if has_mutation_clause(query) {
754            return Err(anyhow!(
755                "SyntaxError: InvalidClauseComposition - EXISTS subquery cannot contain updating clauses"
756            ));
757        }
758
759        let err = |dep: &str| anyhow!("EXISTS requires {}", dep);
760
761        let graph_ctx = self
762            .graph_ctx
763            .clone()
764            .ok_or_else(|| err("GraphExecutionContext"))?;
765        let uni_schema = self.uni_schema.clone().ok_or_else(|| err("UniSchema"))?;
766        let session_ctx = self
767            .session_ctx
768            .clone()
769            .ok_or_else(|| err("SessionContext"))?;
770        let storage = self.storage.clone().ok_or_else(|| err("StorageManager"))?;
771
772        Ok(Arc::new(ExistsExecExpr::new(
773            query.clone(),
774            graph_ctx,
775            session_ctx,
776            storage,
777            uni_schema,
778            self.params.clone(),
779            self.outer_entity_vars.clone(),
780        )))
781    }
782
783    /// Compile a pattern-predicate EXISTS into a vectorized `PatternExistsExecExpr`.
784    ///
785    /// Returns `Err` for unsupported patterns (triggers fallback to `ExistsExecExpr`).
786    fn compile_pattern_exists(
787        &self,
788        query: &Query,
789        input_schema: &Schema,
790    ) -> Result<Arc<dyn PhysicalExpr>> {
791        let pattern = extract_pattern_from_exists_query(query)?;
792
793        let graph_ctx = self
794            .graph_ctx
795            .as_ref()
796            .ok_or_else(|| anyhow!("Pattern exists requires GraphExecutionContext"))?;
797        let uni_schema = self
798            .uni_schema
799            .as_ref()
800            .ok_or_else(|| anyhow!("Pattern exists requires UniSchema"))?;
801
802        let (anchor_col, steps) = analyze_pattern(&pattern, input_schema, uni_schema)?;
803
804        // Detect bound and correlated targets.
805        // - Bound: target variable has `._vid` column in input schema → check specific VID.
806        // - Correlated: target has a named variable NOT in the schema (from an outer EXISTS
807        //   scope, passed as a parameter). We can't resolve it → bail out.
808        let mut bound_target_columns: Vec<Option<String>> = Vec::with_capacity(steps.len());
809        for step in &steps {
810            if let Some(var) = &step.target_variable {
811                let vid_col = format!("{}._vid", var);
812                if input_schema.column_with_name(&vid_col).is_some() {
813                    bound_target_columns.push(Some(vid_col));
814                } else if self.outer_entity_vars.contains(var) {
815                    // Named variable from outer scope — correlated, can't vectorize.
816                    anyhow::bail!(
817                        "Pattern target '{}' is a correlated reference from an outer scope",
818                        var
819                    );
820                } else {
821                    // Fresh binding introduced by the pattern — not correlated.
822                    bound_target_columns.push(None);
823                }
824            } else {
825                bound_target_columns.push(None);
826            }
827        }
828
829        let property_preds = extract_target_property_predicates(&pattern, &steps, uni_schema)?;
830
831        Ok(Arc::new(PatternExistsExecExpr::new(
832            graph_ctx.clone(),
833            anchor_col,
834            steps,
835            Arc::new(input_schema.clone()),
836            property_preds,
837            bound_target_columns,
838            self.params.clone(),
839        )))
840    }
841
842    /// Compile a `similar_to(sources, queries [, options])` call into a `SimilarToExecExpr`.
843    fn compile_similar_to(
844        &self,
845        args: &[Expr],
846        input_schema: &Schema,
847    ) -> Result<Arc<dyn PhysicalExpr>> {
848        if args.len() < 2 || args.len() > 3 {
849            return Err(SimilarToError::InvalidArity { count: args.len() }.into());
850        }
851
852        let graph_ctx = self
853            .graph_ctx
854            .clone()
855            .ok_or(SimilarToError::NoGraphContext)?;
856
857        // Extract variable name and property names from the source expression.
858        let source_variable = extract_source_variable(&args[0]);
859        let source_property_names = extract_property_names(&args[0]);
860
861        // Normalize sources and queries: if args[0] is a List, split into
862        // individual source/query pairs for multi-source scoring.
863        let (source_exprs, query_exprs) = normalize_similar_to_args(&args[0], &args[1]);
864
865        // Compile each source and query expression
866        let source_children: Vec<Arc<dyn PhysicalExpr>> = source_exprs
867            .iter()
868            .map(|e| self.compile(e, input_schema))
869            .collect::<Result<Vec<_>>>()?;
870        let query_children: Vec<Arc<dyn PhysicalExpr>> = query_exprs
871            .iter()
872            .map(|e| self.compile(e, input_schema))
873            .collect::<Result<Vec<_>>>()?;
874
875        // Compile options if present
876        let options_child = if args.len() == 3 {
877            Some(self.compile(&args[2], input_schema)?)
878        } else {
879            None
880        };
881
882        // Resolve per-source distance metrics from schema at compile time.
883        let source_metrics: Vec<Option<DistanceMetric>> = source_property_names
884            .iter()
885            .map(|prop_name| {
886                prop_name.as_ref().and_then(|prop| {
887                    self.uni_schema
888                        .as_ref()
889                        .and_then(|schema| resolve_metric_for_property(schema, prop))
890                })
891            })
892            .collect();
893
894        Ok(Arc::new(SimilarToExecExpr::new(
895            source_children,
896            query_children,
897            options_child,
898            graph_ctx,
899            source_variable,
900            source_property_names,
901            source_metrics,
902        )))
903    }
904
905    /// Check if map_expr or where_clause contains a pattern comprehension that references the variable.
906    fn needs_vid_extraction_for_variable(
907        variable: &str,
908        map_expr: &Expr,
909        where_clause: Option<&Expr>,
910    ) -> bool {
911        fn expr_has_pattern_comp_referencing(expr: &Expr, var: &str) -> bool {
912            match expr {
913                Expr::PatternComprehension { pattern, .. } => {
914                    // Check if pattern uses the variable
915                    pattern.paths.iter().any(|path| {
916                        path.elements.iter().any(|elem| match elem {
917                            uni_cypher::ast::PatternElement::Node(n) => {
918                                n.variable.as_deref() == Some(var)
919                            }
920                            uni_cypher::ast::PatternElement::Relationship(r) => {
921                                r.variable.as_deref() == Some(var)
922                            }
923                            _ => false,
924                        })
925                    })
926                }
927                Expr::FunctionCall { args, .. } => args
928                    .iter()
929                    .any(|a| expr_has_pattern_comp_referencing(a, var)),
930                Expr::BinaryOp { left, right, .. } => {
931                    expr_has_pattern_comp_referencing(left, var)
932                        || expr_has_pattern_comp_referencing(right, var)
933                }
934                Expr::UnaryOp { expr: e, .. } | Expr::Property(e, _) => {
935                    expr_has_pattern_comp_referencing(e, var)
936                }
937                Expr::List(items) => items
938                    .iter()
939                    .any(|i| expr_has_pattern_comp_referencing(i, var)),
940                Expr::ListComprehension {
941                    list,
942                    map_expr,
943                    where_clause,
944                    ..
945                } => {
946                    expr_has_pattern_comp_referencing(list, var)
947                        || expr_has_pattern_comp_referencing(map_expr, var)
948                        || where_clause
949                            .as_ref()
950                            .is_some_and(|w| expr_has_pattern_comp_referencing(w, var))
951                }
952                _ => false,
953            }
954        }
955
956        expr_has_pattern_comp_referencing(map_expr, variable)
957            || where_clause.is_some_and(|w| expr_has_pattern_comp_referencing(w, variable))
958    }
959
960    /// Check if an expression tree contains nodes that require custom compilation.
961    pub fn contains_custom_expr(expr: &Expr) -> bool {
962        match expr {
963            Expr::ListComprehension { .. } => true,
964            Expr::Quantifier { .. } => true,
965            Expr::Reduce { .. } => true,
966            Expr::PatternComprehension { .. } => true,
967            Expr::BinaryOp { left, right, .. } => {
968                Self::contains_custom_expr(left) || Self::contains_custom_expr(right)
969            }
970            Expr::UnaryOp { expr, .. } => Self::contains_custom_expr(expr),
971            Expr::FunctionCall { name, args, .. } => {
972                name.eq_ignore_ascii_case("similar_to")
973                    || args.iter().any(Self::contains_custom_expr)
974            }
975            Expr::Case {
976                when_then,
977                else_expr,
978                ..
979            } => {
980                when_then
981                    .iter()
982                    .any(|(w, t)| Self::contains_custom_expr(w) || Self::contains_custom_expr(t))
983                    || else_expr.as_deref().is_some_and(Self::contains_custom_expr)
984            }
985            Expr::List(items) => items.iter().any(Self::contains_custom_expr),
986            Expr::Map(entries) => entries.iter().any(|(_, v)| Self::contains_custom_expr(v)),
987            Expr::IsNull(e) | Expr::IsNotNull(e) => Self::contains_custom_expr(e),
988            Expr::In { expr: l, list: r } => {
989                Self::contains_custom_expr(l) || Self::contains_custom_expr(r)
990            }
991            Expr::Exists { .. } => true,
992            Expr::LabelCheck { expr, .. } => Self::contains_custom_expr(expr),
993            _ => false,
994        }
995    }
996
997    /// Check if an expression statically produces a list value.
998    /// Used to route `Add` operations to `_cypher_list_concat` instead of arithmetic.
999    fn is_list_producing(expr: &Expr) -> bool {
1000        match expr {
1001            Expr::List(_) => true,
1002            Expr::ListComprehension { .. } => true,
1003            Expr::ArraySlice { .. } => true,
1004            // Add with a list-producing child produces a list
1005            Expr::BinaryOp {
1006                left,
1007                op: BinaryOp::Add,
1008                right,
1009            } => Self::is_list_producing(left) || Self::is_list_producing(right),
1010            Expr::FunctionCall { name, .. } => {
1011                // Functions known to return lists
1012                matches!(
1013                    name.as_str(),
1014                    "range"
1015                        | "tail"
1016                        | "reverse"
1017                        | "collect"
1018                        | "keys"
1019                        | "labels"
1020                        | "nodes"
1021                        | "relationships"
1022                )
1023            }
1024            _ => false,
1025        }
1026    }
1027
1028    fn compile_standard(
1029        &self,
1030        expr: &Expr,
1031        input_schema: &Schema,
1032    ) -> Result<Arc<dyn PhysicalExpr>> {
1033        // Pre-resolve Property(Variable(v), p) → Variable("v.p") when the flat
1034        // column "v.p" exists in the input schema.  This prevents DataFusion's
1035        // standard path from compiling it as ArrayIndex(column(v), "p"), which
1036        // fails when the column stores a VID integer rather than a Map/Struct.
1037        let resolved = Self::resolve_flat_column_properties(expr, input_schema);
1038        let df_expr = cypher_expr_to_df(&resolved, self.translation_ctx)?;
1039        let resolved_expr = self.resolve_udfs(df_expr)?;
1040
1041        let df_schema = datafusion::common::DFSchema::try_from(input_schema.clone())?;
1042
1043        // Apply type coercion to resolve type mismatches
1044        let coerced_expr = crate::query::df_expr::apply_type_coercion(&resolved_expr, &df_schema)?;
1045
1046        // Re-resolve UDFs after coercion (coercion may introduce new dummy UDF calls)
1047        let coerced_expr = self.resolve_udfs(coerced_expr)?;
1048
1049        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
1050        planner
1051            .create_physical_expr(&coerced_expr, &df_schema, self.state)
1052            .map_err(|e| anyhow!("DataFusion planning failed: {}", e))
1053    }
1054
1055    /// Recursively resolve `Property(Variable(v), p)` → `Variable("v.p")` when
1056    /// the flat column `"v.p"` exists in `schema`.  This ensures that property
1057    /// access on graph-scan variables is compiled as a direct column reference
1058    /// rather than an ArrayIndex UDF call, which would fail when the variable
1059    /// column holds a VID integer instead of a Map/Node struct.
1060    fn resolve_flat_column_properties(expr: &Expr, schema: &Schema) -> Expr {
1061        match expr {
1062            Expr::Property(base, prop) => {
1063                if let Expr::Variable(var) = base.as_ref() {
1064                    let flat_col = format!("{}.{}", var, prop);
1065                    if schema.index_of(&flat_col).is_ok() {
1066                        return Expr::Variable(flat_col);
1067                    }
1068                }
1069                // Recurse into the base expression
1070                Expr::Property(
1071                    Box::new(Self::resolve_flat_column_properties(base, schema)),
1072                    prop.clone(),
1073                )
1074            }
1075            Expr::BinaryOp { left, op, right } => Expr::BinaryOp {
1076                left: Box::new(Self::resolve_flat_column_properties(left, schema)),
1077                op: *op,
1078                right: Box::new(Self::resolve_flat_column_properties(right, schema)),
1079            },
1080            Expr::FunctionCall {
1081                name,
1082                args,
1083                distinct,
1084                window_spec,
1085            } => Expr::FunctionCall {
1086                name: name.clone(),
1087                args: args
1088                    .iter()
1089                    .map(|a| Self::resolve_flat_column_properties(a, schema))
1090                    .collect(),
1091                distinct: *distinct,
1092                window_spec: window_spec.clone(),
1093            },
1094            Expr::UnaryOp { op, expr: inner } => Expr::UnaryOp {
1095                op: *op,
1096                expr: Box::new(Self::resolve_flat_column_properties(inner, schema)),
1097            },
1098            Expr::List(items) => Expr::List(
1099                items
1100                    .iter()
1101                    .map(|i| Self::resolve_flat_column_properties(i, schema))
1102                    .collect(),
1103            ),
1104            // For all other expression types, return as-is (literals, variables, etc.)
1105            other => other.clone(),
1106        }
1107    }
1108
1109    /// Resolve UDFs in DataFusion expression using the session state registry.
1110    ///
1111    /// Uses `TreeNode::transform_up` to traverse the entire expression tree,
1112    /// ensuring UDFs inside Cast, Case, InList, Between, etc. are all resolved.
1113    fn resolve_udfs(
1114        &self,
1115        expr: datafusion::logical_expr::Expr,
1116    ) -> Result<datafusion::logical_expr::Expr> {
1117        use datafusion::common::tree_node::{Transformed, TreeNode};
1118        use datafusion::logical_expr::Expr as DfExpr;
1119
1120        let result = expr
1121            .transform_up(|node| {
1122                if let DfExpr::ScalarFunction(ref func) = node {
1123                    let udf_name = func.func.name();
1124                    if let Some(registered_udf) = self.state.scalar_functions().get(udf_name) {
1125                        return Ok(Transformed::yes(DfExpr::ScalarFunction(
1126                            datafusion::logical_expr::expr::ScalarFunction {
1127                                func: registered_udf.clone(),
1128                                args: func.args.clone(),
1129                            },
1130                        )));
1131                    }
1132                }
1133                Ok(Transformed::no(node))
1134            })
1135            .map_err(|e| anyhow!("Failed to resolve UDFs: {}", e))?;
1136        Ok(result.data)
1137    }
1138
1139    fn compile_list_comprehension(
1140        &self,
1141        variable: &str,
1142        list: &Expr,
1143        where_clause: Option<&Expr>,
1144        map_expr: &Expr,
1145        input_schema: &Schema,
1146    ) -> Result<Arc<dyn PhysicalExpr>> {
1147        let input_list_phy = self.compile(list, input_schema)?;
1148
1149        // Resolve input list type
1150        let list_data_type = input_list_phy.data_type(input_schema)?;
1151        let inner_data_type = resolve_list_element_type(
1152            &list_data_type,
1153            DataType::LargeBinary,
1154            "List comprehension",
1155        )?;
1156
1157        // Create inner schema with loop variable (shadow outer variable if same name)
1158        let mut fields = input_schema.fields().to_vec();
1159        let loop_var_field = Arc::new(Field::new(variable, inner_data_type.clone(), true));
1160
1161        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1162            fields[pos] = loop_var_field;
1163        } else {
1164            fields.push(loop_var_field);
1165        }
1166
1167        // Check if we need VID extraction for nested pattern comprehensions
1168        let needs_vid_extraction =
1169            Self::needs_vid_extraction_for_variable(variable, map_expr, where_clause);
1170        if needs_vid_extraction && inner_data_type == DataType::LargeBinary {
1171            // Add a {variable}._vid field for VID extraction
1172            let vid_field = Arc::new(Field::new(
1173                format!("{}._vid", variable),
1174                DataType::UInt64,
1175                true,
1176            ));
1177            fields.push(vid_field);
1178        }
1179
1180        let inner_schema = Arc::new(Schema::new(fields));
1181
1182        // Compile inner expressions with scoped translation context
1183        let mut scoped_ctx = None;
1184        let inner_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
1185
1186        let predicate_phy = if let Some(pred) = where_clause {
1187            Some(inner_compiler.compile(pred, &inner_schema)?)
1188        } else {
1189            None
1190        };
1191
1192        let map_phy = inner_compiler.compile(map_expr, &inner_schema)?;
1193        let output_item_type = map_phy.data_type(&inner_schema)?;
1194
1195        Ok(Arc::new(ListComprehensionExecExpr::new(
1196            input_list_phy,
1197            map_phy,
1198            predicate_phy,
1199            variable.to_string(),
1200            Arc::new(input_schema.clone()),
1201            output_item_type,
1202            needs_vid_extraction,
1203        )))
1204    }
1205
1206    fn compile_reduce(
1207        &self,
1208        accumulator: &str,
1209        initial: &Expr,
1210        variable: &str,
1211        list: &Expr,
1212        reduce_expr: &Expr,
1213        input_schema: &Schema,
1214    ) -> Result<Arc<dyn PhysicalExpr>> {
1215        let list_phy = self.compile(list, input_schema)?;
1216
1217        let initial_phy = self.compile(initial, input_schema)?;
1218        let acc_type = initial_phy.data_type(input_schema)?;
1219
1220        let list_data_type = list_phy.data_type(input_schema)?;
1221        // For LargeBinary (CypherValue arrays), use the accumulator type as element type so the
1222        // reduce body expression compiles correctly (e.g. acc + x where both are Int64).
1223        let inner_data_type =
1224            resolve_list_element_type(&list_data_type, acc_type.clone(), "Reduce")?;
1225
1226        // Create inner schema with accumulator and loop variable (shadow outer variables if same names)
1227        let mut fields = input_schema.fields().to_vec();
1228
1229        let acc_field = Arc::new(Field::new(accumulator, acc_type, true));
1230        if let Some(pos) = fields.iter().position(|f| f.name() == accumulator) {
1231            fields[pos] = acc_field;
1232        } else {
1233            fields.push(acc_field);
1234        }
1235
1236        let var_field = Arc::new(Field::new(variable, inner_data_type, true));
1237        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1238            fields[pos] = var_field;
1239        } else {
1240            fields.push(var_field);
1241        }
1242
1243        let inner_schema = Arc::new(Schema::new(fields));
1244
1245        // Compile reduce expression with scoped translation context
1246        let mut scoped_ctx = None;
1247        let reduce_compiler = self.scoped_compiler(&[accumulator, variable], &mut scoped_ctx);
1248
1249        let reduce_phy = reduce_compiler.compile(reduce_expr, &inner_schema)?;
1250        let output_type = reduce_phy.data_type(&inner_schema)?;
1251
1252        Ok(Arc::new(ReduceExecExpr::new(
1253            accumulator.to_string(),
1254            initial_phy,
1255            variable.to_string(),
1256            list_phy,
1257            reduce_phy,
1258            Arc::new(input_schema.clone()),
1259            output_type,
1260        )))
1261    }
1262
1263    fn compile_quantifier(
1264        &self,
1265        quantifier: &uni_cypher::ast::Quantifier,
1266        variable: &str,
1267        list: &Expr,
1268        predicate: &Expr,
1269        input_schema: &Schema,
1270    ) -> Result<Arc<dyn PhysicalExpr>> {
1271        let input_list_phy = self.compile(list, input_schema)?;
1272
1273        // Resolve element type from list type
1274        let list_data_type = input_list_phy.data_type(input_schema)?;
1275        let inner_data_type =
1276            resolve_list_element_type(&list_data_type, DataType::LargeBinary, "Quantifier")?;
1277
1278        // Create inner schema with loop variable
1279        // If a field with the same name exists in the outer schema, replace it (shadow it)
1280        // to ensure the loop variable takes precedence.
1281        let mut fields = input_schema.fields().to_vec();
1282        let loop_var_field = Arc::new(Field::new(variable, inner_data_type, true));
1283
1284        // Find and replace existing field with same name, or append if not found
1285        if let Some(pos) = fields.iter().position(|f| f.name() == variable) {
1286            fields[pos] = loop_var_field;
1287        } else {
1288            fields.push(loop_var_field);
1289        }
1290
1291        let inner_schema = Arc::new(Schema::new(fields));
1292
1293        // Compile predicate with a scoped translation context that removes the loop variable
1294        // from variable_kinds, so property access on the loop variable doesn't incorrectly
1295        // generate flat columns (like "x.name") that don't exist in the inner schema.
1296        let mut scoped_ctx = None;
1297        let pred_compiler = self.scoped_compiler(&[variable], &mut scoped_ctx);
1298
1299        let mut predicate_phy = pred_compiler.compile(predicate, &inner_schema)?;
1300
1301        // Wrap CypherValue predicates with _cv_to_bool for proper boolean evaluation
1302        if let Ok(DataType::LargeBinary) = predicate_phy.data_type(&inner_schema) {
1303            predicate_phy = self.wrap_with_cv_to_bool(predicate_phy)?;
1304        }
1305
1306        let qt = match quantifier {
1307            uni_cypher::ast::Quantifier::All => QuantifierType::All,
1308            uni_cypher::ast::Quantifier::Any => QuantifierType::Any,
1309            uni_cypher::ast::Quantifier::Single => QuantifierType::Single,
1310            uni_cypher::ast::Quantifier::None => QuantifierType::None,
1311        };
1312
1313        Ok(Arc::new(QuantifierExecExpr::new(
1314            input_list_phy,
1315            predicate_phy,
1316            variable.to_string(),
1317            Arc::new(input_schema.clone()),
1318            qt,
1319        )))
1320    }
1321
1322    fn compile_pattern_comprehension(
1323        &self,
1324        path_variable: &Option<String>,
1325        pattern: &uni_cypher::ast::Pattern,
1326        where_clause: Option<&Expr>,
1327        map_expr: &Expr,
1328        input_schema: &Schema,
1329    ) -> Result<Arc<dyn PhysicalExpr>> {
1330        let err = |dep: &str| anyhow!("Pattern comprehension requires {}", dep);
1331
1332        let graph_ctx = self
1333            .graph_ctx
1334            .as_ref()
1335            .ok_or_else(|| err("GraphExecutionContext"))?;
1336        let uni_schema = self.uni_schema.as_ref().ok_or_else(|| err("UniSchema"))?;
1337
1338        // 1. Analyze pattern to get anchor column and traversal steps
1339        let (anchor_col, steps) = analyze_pattern(pattern, input_schema, uni_schema)?;
1340
1341        // 2. Collect needed properties from where_clause and map_expr
1342        let (vertex_props, edge_props) = collect_inner_properties(where_clause, map_expr, &steps);
1343
1344        // 3. Build inner schema
1345        let inner_schema = build_inner_schema(
1346            input_schema,
1347            &steps,
1348            &vertex_props,
1349            &edge_props,
1350            path_variable.as_deref(),
1351        );
1352
1353        // 4. Compile predicate and map_expr against inner schema
1354        let pred_phy = where_clause
1355            .map(|p| self.compile(p, &inner_schema))
1356            .transpose()?;
1357        let map_phy = self.compile(map_expr, &inner_schema)?;
1358        let output_type = map_phy.data_type(&inner_schema)?;
1359
1360        // 5. Return expression
1361        Ok(Arc::new(PatternComprehensionExecExpr::new(
1362            graph_ctx.clone(),
1363            anchor_col,
1364            steps,
1365            path_variable.clone(),
1366            pred_phy,
1367            map_phy,
1368            Arc::new(input_schema.clone()),
1369            Arc::new(inner_schema),
1370            output_type,
1371            vertex_props,
1372            edge_props,
1373        )))
1374    }
1375
1376    /// Compile a function call whose arguments contain custom expressions.
1377    ///
1378    /// Recursively compiles each argument via `self.compile()`, then looks up
1379    /// the corresponding UDF in the session registry and builds the physical
1380    /// expression.
1381    fn compile_function_with_custom_args(
1382        &self,
1383        name: &str,
1384        args: &[Expr],
1385        _distinct: bool,
1386        input_schema: &Schema,
1387    ) -> Result<Arc<dyn PhysicalExpr>> {
1388        // 1. Recursively compile each argument
1389        let compiled_args: Vec<Arc<dyn PhysicalExpr>> = args
1390            .iter()
1391            .map(|arg| self.compile(arg, input_schema))
1392            .collect::<Result<Vec<_>>>()?;
1393
1394        // 2. Resolve UDF name and look it up in the registry
1395        let udf_name = Self::cypher_fn_to_udf(name);
1396        let udf = self
1397            .state
1398            .scalar_functions()
1399            .get(udf_name.as_str())
1400            .ok_or_else(|| {
1401                anyhow!(
1402                    "UDF '{}' not found in registry for function '{}'",
1403                    udf_name,
1404                    name
1405                )
1406            })?;
1407
1408        // 3. Build operand type list and dummy columns from compiled args
1409        let placeholders: &[&str] = &["__arg0__", "__arg1__", "__arg2__", "__argN__"];
1410        let operand_types: Vec<(&str, DataType)> = compiled_args
1411            .iter()
1412            .enumerate()
1413            .map(|(i, arg)| {
1414                let dt = arg.data_type(input_schema).unwrap_or(DataType::LargeBinary);
1415                let placeholder = placeholders[i.min(3)];
1416                (placeholder, dt)
1417            })
1418            .collect();
1419
1420        let dummy_cols: Vec<datafusion::logical_expr::Expr> = operand_types
1421            .iter()
1422            .map(|(name, _)| {
1423                datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1424                    None::<String>,
1425                    *name,
1426                ))
1427            })
1428            .collect();
1429
1430        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1431            datafusion::logical_expr::expr::ScalarFunction {
1432                func: udf.clone(),
1433                args: dummy_cols,
1434            },
1435        );
1436
1437        // 4. Plan and rebind
1438        self.plan_udf_physical_expr(
1439            &udf_expr,
1440            &operand_types,
1441            compiled_args,
1442            &format!("function {}", name),
1443        )
1444    }
1445
1446    /// Map a Cypher function name to the registered UDF name.
1447    ///
1448    /// Mirrors the mapping in `translate_function_call` from `df_expr.rs`.
1449    /// The registered UDF names are always lowercase.
1450    fn cypher_fn_to_udf(name: &str) -> String {
1451        match name.to_uppercase().as_str() {
1452            "SIZE" | "LENGTH" => "_cypher_size".to_string(),
1453            "REVERSE" => "_cypher_reverse".to_string(),
1454            "TOSTRING" => "tostring".to_string(),
1455            "TOBOOLEAN" | "TOBOOL" | "TOBOOLEANORNULL" => "toboolean".to_string(),
1456            "TOINTEGER" | "TOINT" | "TOINTEGERORNULL" => "tointeger".to_string(),
1457            "TOFLOAT" | "TOFLOATORNULL" => "tofloat".to_string(),
1458            "HEAD" => "head".to_string(),
1459            "LAST" => "last".to_string(),
1460            "TAIL" => "_cypher_tail".to_string(),
1461            "KEYS" => "keys".to_string(),
1462            "TYPE" => "type".to_string(),
1463            "PROPERTIES" => "properties".to_string(),
1464            "LABELS" => "labels".to_string(),
1465            "COALESCE" => "coalesce".to_string(),
1466            "ID" => "id".to_string(),
1467            // Fallback: lowercase the name (matches dummy_udf_expr behavior)
1468            _ => name.to_lowercase(),
1469        }
1470    }
1471
1472    /// Compile a CASE expression with custom sub-expressions in branches.
1473    ///
1474    /// Recursively compiles the operand, each when/then pair, and the else
1475    /// branch, then builds a `CaseExpr` physical expression.
1476    ///
1477    /// Applies two fixes for TCK compliance:
1478    /// 1. Wraps CypherValue WHEN conditions with `_cv_to_bool` for proper boolean evaluation
1479    /// 2. Unifies branch types when mixing LargeList and LargeBinary to avoid cast errors
1480    fn compile_case(
1481        &self,
1482        operand: &Option<Box<Expr>>,
1483        when_then: &[(Expr, Expr)],
1484        else_expr: &Option<Box<Expr>>,
1485        input_schema: &Schema,
1486    ) -> Result<Arc<dyn PhysicalExpr>> {
1487        let operand_phy = operand
1488            .as_deref()
1489            .map(|e| self.compile(e, input_schema))
1490            .transpose()?;
1491
1492        let mut when_then_phy: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> = when_then
1493            .iter()
1494            .map(|(w, t)| {
1495                let w_phy = self.compile(w, input_schema)?;
1496                let t_phy = self.compile(t, input_schema)?;
1497                Ok((w_phy, t_phy))
1498            })
1499            .collect::<Result<Vec<_>>>()?;
1500
1501        let mut else_phy = else_expr
1502            .as_deref()
1503            .map(|e| self.compile(e, input_schema))
1504            .transpose()?;
1505
1506        // Wrap CypherValue WHEN conditions with _cv_to_bool for proper boolean evaluation
1507        for (w_phy, _) in &mut when_then_phy {
1508            if matches!(w_phy.data_type(input_schema), Ok(DataType::LargeBinary)) {
1509                *w_phy = self.wrap_with_cv_to_bool(w_phy.clone())?;
1510            }
1511        }
1512
1513        // Unify branch types when mixing LargeList and LargeBinary
1514        let branch_types: Vec<DataType> = when_then_phy
1515            .iter()
1516            .map(|(_, t)| t.data_type(input_schema))
1517            .chain(else_phy.iter().map(|e| e.data_type(input_schema)))
1518            .filter_map(Result::ok)
1519            .collect();
1520
1521        let has_large_binary = branch_types.contains(&DataType::LargeBinary);
1522        let has_list = branch_types
1523            .iter()
1524            .any(|dt| matches!(dt, DataType::List(_) | DataType::LargeList(_)));
1525
1526        // If we have both, wrap List/LargeList branches with LargeListToCypherValueExpr
1527        if has_large_binary && has_list {
1528            for (_, t_phy) in &mut when_then_phy {
1529                if let Ok(dt) = t_phy.data_type(input_schema)
1530                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
1531                {
1532                    *t_phy = Arc::new(LargeListToCypherValueExpr::new(t_phy.clone()));
1533                }
1534            }
1535            if let Some(e_phy) = else_phy.take() {
1536                if let Ok(dt) = e_phy.data_type(input_schema)
1537                    && matches!(dt, DataType::List(_) | DataType::LargeList(_))
1538                {
1539                    else_phy = Some(Arc::new(LargeListToCypherValueExpr::new(e_phy)));
1540                } else {
1541                    else_phy = Some(e_phy);
1542                }
1543            }
1544        }
1545
1546        let case_expr = datafusion::physical_expr::expressions::CaseExpr::try_new(
1547            operand_phy,
1548            when_then_phy,
1549            else_phy,
1550        )
1551        .map_err(|e| anyhow!("Failed to create CASE expression: {}", e))?;
1552
1553        Ok(Arc::new(case_expr))
1554    }
1555
1556    fn compile_binary_op(
1557        &self,
1558        op: &BinaryOp,
1559        left: Arc<dyn PhysicalExpr>,
1560        right: Arc<dyn PhysicalExpr>,
1561        input_schema: &Schema,
1562    ) -> Result<Arc<dyn PhysicalExpr>> {
1563        use datafusion::logical_expr::Operator;
1564
1565        // String operators use custom physical expr for safe type handling
1566        let string_op = match op {
1567            BinaryOp::StartsWith => Some(StringOp::StartsWith),
1568            BinaryOp::EndsWith => Some(StringOp::EndsWith),
1569            BinaryOp::Contains => Some(StringOp::Contains),
1570            _ => None,
1571        };
1572        if let Some(sop) = string_op {
1573            return Ok(Arc::new(CypherStringMatchExpr::new(left, right, sop)));
1574        }
1575
1576        let df_op = match op {
1577            BinaryOp::Add => Operator::Plus,
1578            BinaryOp::Sub => Operator::Minus,
1579            BinaryOp::Mul => Operator::Multiply,
1580            BinaryOp::Div => Operator::Divide,
1581            BinaryOp::Mod => Operator::Modulo,
1582            BinaryOp::Eq => Operator::Eq,
1583            BinaryOp::NotEq => Operator::NotEq,
1584            BinaryOp::Gt => Operator::Gt,
1585            BinaryOp::GtEq => Operator::GtEq,
1586            BinaryOp::Lt => Operator::Lt,
1587            BinaryOp::LtEq => Operator::LtEq,
1588            BinaryOp::And => Operator::And,
1589            BinaryOp::Or => Operator::Or,
1590            BinaryOp::Xor => {
1591                return Err(anyhow!(
1592                    "XOR not supported via binary helper, use bitwise_xor"
1593                ));
1594            }
1595            BinaryOp::Regex => Operator::RegexMatch,
1596            BinaryOp::ApproxEq => {
1597                return Err(anyhow!(
1598                    "ApproxEq (~=) not yet supported in physical compiler"
1599                ));
1600            }
1601            BinaryOp::Pow => return Err(anyhow!("POW not yet supported in physical compiler")),
1602            _ => return Err(anyhow!("Unsupported binary op in compiler: {:?}", op)),
1603        };
1604
1605        // When either operand is LargeBinary (CypherValue), standard Arrow comparison
1606        // kernels can't handle the type mismatch. Route through Cypher comparison
1607        // UDFs which decode CypherValue to Value for comparison.
1608        let mut left = left;
1609        let mut right = right;
1610        let mut left_type = left.data_type(input_schema).ok();
1611        let mut right_type = right.data_type(input_schema).ok();
1612
1613        // Type unification: if one side is LargeList and the other is LargeBinary,
1614        // convert LargeList to LargeBinary for consistent handling
1615        let left_is_list = matches!(
1616            left_type.as_ref(),
1617            Some(DataType::List(_) | DataType::LargeList(_))
1618        );
1619        let right_is_list = matches!(
1620            right_type.as_ref(),
1621            Some(DataType::List(_) | DataType::LargeList(_))
1622        );
1623
1624        if left_is_list && is_cypher_value_type(right_type.as_ref()) {
1625            left = Arc::new(LargeListToCypherValueExpr::new(left));
1626            left_type = Some(DataType::LargeBinary);
1627        } else if right_is_list && is_cypher_value_type(left_type.as_ref()) {
1628            right = Arc::new(LargeListToCypherValueExpr::new(right));
1629            right_type = Some(DataType::LargeBinary);
1630        }
1631
1632        let has_cv =
1633            is_cypher_value_type(left_type.as_ref()) || is_cypher_value_type(right_type.as_ref());
1634
1635        if has_cv {
1636            if let Some(result) = self.compile_cv_comparison(
1637                df_op,
1638                left.clone(),
1639                right.clone(),
1640                &left_type,
1641                &right_type,
1642            )? {
1643                return Ok(result);
1644            }
1645            if let Some(result) = self.compile_cv_list_concat(
1646                left.clone(),
1647                right.clone(),
1648                &left_type,
1649                &right_type,
1650                df_op,
1651            )? {
1652                return Ok(result);
1653            }
1654            if let Some(result) = self.compile_cv_arithmetic(
1655                df_op,
1656                left.clone(),
1657                right.clone(),
1658                &left_type,
1659                &right_type,
1660                input_schema,
1661            )? {
1662                return Ok(result);
1663            }
1664        }
1665
1666        // Use DataFusion's binary physical expression creator which handles coercion
1667        binary(left, df_op, right, input_schema)
1668            .map_err(|e| anyhow!("Failed to create binary expression: {}", e))
1669    }
1670
1671    /// Compile CypherValue comparison using Cypher UDFs.
1672    fn compile_cv_comparison(
1673        &self,
1674        df_op: datafusion::logical_expr::Operator,
1675        left: Arc<dyn PhysicalExpr>,
1676        right: Arc<dyn PhysicalExpr>,
1677        left_type: &Option<DataType>,
1678        right_type: &Option<DataType>,
1679    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1680        use datafusion::logical_expr::Operator;
1681
1682        let udf_name = match df_op {
1683            Operator::Eq => "_cypher_equal",
1684            Operator::NotEq => "_cypher_not_equal",
1685            Operator::Gt => "_cypher_gt",
1686            Operator::GtEq => "_cypher_gt_eq",
1687            Operator::Lt => "_cypher_lt",
1688            Operator::LtEq => "_cypher_lt_eq",
1689            _ => return Ok(None),
1690        };
1691
1692        self.plan_binary_udf(
1693            udf_name,
1694            left,
1695            right,
1696            left_type.clone().unwrap_or(DataType::LargeBinary),
1697            right_type.clone().unwrap_or(DataType::LargeBinary),
1698        )
1699    }
1700
1701    /// Compile CypherValue list concatenation.
1702    fn compile_cv_list_concat(
1703        &self,
1704        left: Arc<dyn PhysicalExpr>,
1705        right: Arc<dyn PhysicalExpr>,
1706        left_type: &Option<DataType>,
1707        right_type: &Option<DataType>,
1708        df_op: datafusion::logical_expr::Operator,
1709    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1710        use datafusion::logical_expr::Operator;
1711
1712        if df_op != Operator::Plus {
1713            return Ok(None);
1714        }
1715
1716        // List concat when at least one side is a list (CypherValue or Arrow List)
1717        let is_list = |t: &Option<DataType>| {
1718            t.as_ref()
1719                .is_some_and(|dt| matches!(dt, DataType::LargeBinary | DataType::List(_)))
1720        };
1721
1722        if !is_list(left_type) && !is_list(right_type) {
1723            return Ok(None);
1724        }
1725
1726        self.plan_binary_udf(
1727            "_cypher_list_concat",
1728            left,
1729            right,
1730            left_type.clone().unwrap_or(DataType::LargeBinary),
1731            right_type.clone().unwrap_or(DataType::LargeBinary),
1732        )
1733    }
1734
1735    /// Compile CypherValue arithmetic.
1736    ///
1737    /// Routes arithmetic operations through CypherValue-aware UDFs when at least
1738    /// one operand is LargeBinary (CypherValue-encoded).
1739    fn compile_cv_arithmetic(
1740        &self,
1741        df_op: datafusion::logical_expr::Operator,
1742        left: Arc<dyn PhysicalExpr>,
1743        right: Arc<dyn PhysicalExpr>,
1744        left_type: &Option<DataType>,
1745        right_type: &Option<DataType>,
1746        _input_schema: &Schema,
1747    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1748        use datafusion::logical_expr::Operator;
1749
1750        let udf_name = match df_op {
1751            Operator::Plus => "_cypher_add",
1752            Operator::Minus => "_cypher_sub",
1753            Operator::Multiply => "_cypher_mul",
1754            Operator::Divide => "_cypher_div",
1755            Operator::Modulo => "_cypher_mod",
1756            _ => return Ok(None),
1757        };
1758
1759        self.plan_binary_udf(
1760            udf_name,
1761            left,
1762            right,
1763            left_type.clone().unwrap_or(DataType::LargeBinary),
1764            right_type.clone().unwrap_or(DataType::LargeBinary),
1765        )
1766    }
1767
1768    /// Plan a UDF expression with dummy schema columns, then rebind to actual physical expressions.
1769    fn plan_udf_physical_expr(
1770        &self,
1771        udf_expr: &datafusion::logical_expr::Expr,
1772        operand_types: &[(&str, DataType)],
1773        children: Vec<Arc<dyn PhysicalExpr>>,
1774        error_context: &str,
1775    ) -> Result<Arc<dyn PhysicalExpr>> {
1776        let tmp_schema = Schema::new(
1777            operand_types
1778                .iter()
1779                .map(|(name, dt)| Arc::new(Field::new(*name, dt.clone(), true)))
1780                .collect::<Vec<_>>(),
1781        );
1782        let df_schema = datafusion::common::DFSchema::try_from(tmp_schema)?;
1783        let planner = datafusion::physical_planner::DefaultPhysicalPlanner::default();
1784        let udf_phy = planner
1785            .create_physical_expr(udf_expr, &df_schema, self.state)
1786            .map_err(|e| anyhow!("Failed to create {} expr: {}", error_context, e))?;
1787        udf_phy
1788            .with_new_children(children)
1789            .map_err(|e| anyhow!("Failed to rebind {} children: {}", error_context, e))
1790    }
1791
1792    /// Wrap a LargeBinary (CypherValue) expression with `_cv_to_bool` conversion.
1793    ///
1794    /// Used when a CypherValue expression needs to be used as a boolean (e.g., in WHEN clauses).
1795    fn wrap_with_cv_to_bool(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
1796        let Some(udf) = self.state.scalar_functions().get("_cv_to_bool") else {
1797            return Err(anyhow!("_cv_to_bool UDF not found"));
1798        };
1799
1800        let dummy_col = datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1801            None::<String>,
1802            "__cv__",
1803        ));
1804        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1805            datafusion::logical_expr::expr::ScalarFunction {
1806                func: udf.clone(),
1807                args: vec![dummy_col],
1808            },
1809        );
1810
1811        self.plan_udf_physical_expr(
1812            &udf_expr,
1813            &[("__cv__", DataType::LargeBinary)],
1814            vec![expr],
1815            "CypherValue to bool",
1816        )
1817    }
1818
1819    /// Plan a binary UDF with the given name and operand types.
1820    fn plan_binary_udf(
1821        &self,
1822        udf_name: &str,
1823        left: Arc<dyn PhysicalExpr>,
1824        right: Arc<dyn PhysicalExpr>,
1825        left_type: DataType,
1826        right_type: DataType,
1827    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
1828        let Some(udf) = self.state.scalar_functions().get(udf_name) else {
1829            return Ok(None);
1830        };
1831        let udf_expr = datafusion::logical_expr::Expr::ScalarFunction(
1832            datafusion::logical_expr::expr::ScalarFunction {
1833                func: udf.clone(),
1834                args: vec![
1835                    datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1836                        None::<String>,
1837                        "__left__",
1838                    )),
1839                    datafusion::logical_expr::Expr::Column(datafusion::common::Column::new(
1840                        None::<String>,
1841                        "__right__",
1842                    )),
1843                ],
1844            },
1845        );
1846        let result = self.plan_udf_physical_expr(
1847            &udf_expr,
1848            &[("__left__", left_type), ("__right__", right_type)],
1849            vec![left, right],
1850            udf_name,
1851        )?;
1852        Ok(Some(result))
1853    }
1854
1855    fn compile_unary_op(
1856        &self,
1857        op: &UnaryOp,
1858        expr: Arc<dyn PhysicalExpr>,
1859        input_schema: &Schema,
1860    ) -> Result<Arc<dyn PhysicalExpr>> {
1861        match op {
1862            UnaryOp::Not => datafusion::physical_expr::expressions::not(expr),
1863            UnaryOp::Neg => datafusion::physical_expr::expressions::negative(expr, input_schema),
1864        }
1865        .map_err(|e| anyhow!("Failed to create unary expression: {}", e))
1866    }
1867}
1868
1869// ---------------------------------------------------------------------------
1870// similar_to() AST helpers
1871// ---------------------------------------------------------------------------
1872
1873/// Extract the base variable name from a source expression.
1874///
1875/// For `d.embedding` → `"d"`, for `[d.embedding, d.content]` → `"d"`.
1876fn extract_source_variable(expr: &Expr) -> Option<String> {
1877    match expr {
1878        Expr::Property(inner, _) => extract_source_variable(inner),
1879        Expr::Variable(name) => Some(name.clone()),
1880        Expr::List(items) => items.first().and_then(extract_source_variable),
1881        _ => None,
1882    }
1883}
1884
1885/// Extract property names from source expressions.
1886///
1887/// For `d.embedding` → `["embedding"]`, for `[d.embedding, d.content]` → `["embedding", "content"]`.
1888fn extract_property_names(expr: &Expr) -> Vec<Option<String>> {
1889    match expr {
1890        Expr::Property(_, prop) => vec![Some(prop.clone())],
1891        Expr::List(items) => items
1892            .iter()
1893            .map(|item| {
1894                if let Expr::Property(_, prop) = item {
1895                    Some(prop.clone())
1896                } else {
1897                    None
1898                }
1899            })
1900            .collect(),
1901        _ => vec![None],
1902    }
1903}
1904
1905/// Normalize similar_to arguments for multi-source scoring.
1906///
1907/// If source arg is a `List`, returns the individual items as separate source/query pairs.
1908/// Otherwise returns the args as-is (single pair).
1909fn normalize_similar_to_args<'e>(
1910    sources: &'e Expr,
1911    queries: &'e Expr,
1912) -> (Vec<&'e Expr>, Vec<&'e Expr>) {
1913    match (sources, queries) {
1914        // Multi-source: [d.embedding, d.content] paired with [query_vec, query_text]
1915        (Expr::List(src_items), Expr::List(qry_items)) if src_items.len() == qry_items.len() => {
1916            (src_items.iter().collect(), qry_items.iter().collect())
1917        }
1918        // Multi-source with broadcast query: [d.embedding, d.content] paired with single query
1919        (Expr::List(src_items), _) if src_items.len() > 1 => {
1920            let queries_broadcast: Vec<&Expr> = vec![queries; src_items.len()];
1921            (src_items.iter().collect(), queries_broadcast)
1922        }
1923        // Single source
1924        _ => (vec![sources], vec![queries]),
1925    }
1926}
1927
1928use datafusion::physical_plan::DisplayAs;
1929use datafusion::physical_plan::DisplayFormatType;
1930
/// The Cypher string predicate evaluated by a `CypherStringMatchExpr`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum StringOp {
    /// `lhs STARTS WITH rhs`
    StartsWith,
    /// `lhs ENDS WITH rhs`
    EndsWith,
    /// `lhs CONTAINS rhs`
    Contains,
}

impl std::fmt::Display for StringOp {
    /// Renders the Cypher keyword(s) for the predicate.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let keyword = match self {
            StringOp::StartsWith => "STARTS WITH",
            StringOp::EndsWith => "ENDS WITH",
            StringOp::Contains => "CONTAINS",
        };
        f.write_str(keyword)
    }
}
1947
1948#[derive(Debug, Eq)]
1949struct CypherStringMatchExpr {
1950    left: Arc<dyn PhysicalExpr>,
1951    right: Arc<dyn PhysicalExpr>,
1952    op: StringOp,
1953}
1954
1955impl PartialEq for CypherStringMatchExpr {
1956    fn eq(&self, other: &Self) -> bool {
1957        self.op == other.op && self.left.eq(&other.left) && self.right.eq(&other.right)
1958    }
1959}
1960
1961impl std::hash::Hash for CypherStringMatchExpr {
1962    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1963        self.op.hash(state);
1964        self.left.hash(state);
1965        self.right.hash(state);
1966    }
1967}
1968
1969impl CypherStringMatchExpr {
1970    fn new(left: Arc<dyn PhysicalExpr>, right: Arc<dyn PhysicalExpr>, op: StringOp) -> Self {
1971        Self { left, right, op }
1972    }
1973}
1974
1975impl std::fmt::Display for CypherStringMatchExpr {
1976    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1977        write!(f, "{} {} {}", self.left, self.op, self.right)
1978    }
1979}
1980
1981impl DisplayAs for CypherStringMatchExpr {
1982    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1983        write!(f, "{}", self)
1984    }
1985}
1986
1987impl PhysicalExpr for CypherStringMatchExpr {
1988    fn as_any(&self) -> &dyn std::any::Any {
1989        self
1990    }
1991
1992    fn data_type(
1993        &self,
1994        _input_schema: &Schema,
1995    ) -> datafusion::error::Result<arrow_schema::DataType> {
1996        Ok(arrow_schema::DataType::Boolean)
1997    }
1998
1999    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
2000        Ok(true)
2001    }
2002
2003    fn evaluate(
2004        &self,
2005        batch: &arrow_array::RecordBatch,
2006    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
2007        use crate::query::df_udfs::invoke_cypher_string_op;
2008        use arrow_schema::Field;
2009        use datafusion::config::ConfigOptions;
2010        use datafusion::logical_expr::ScalarFunctionArgs;
2011
2012        let left_val = self.left.evaluate(batch)?;
2013        let right_val = self.right.evaluate(batch)?;
2014
2015        let args = ScalarFunctionArgs {
2016            args: vec![left_val, right_val],
2017            number_rows: batch.num_rows(),
2018            return_field: Arc::new(Field::new("result", arrow_schema::DataType::Boolean, true)),
2019            config_options: Arc::new(ConfigOptions::default()),
2020            arg_fields: vec![], // Not used by invoke_cypher_string_op
2021        };
2022
2023        match self.op {
2024            StringOp::StartsWith => {
2025                invoke_cypher_string_op(&args, "starts_with", |s, p| s.starts_with(p))
2026            }
2027            StringOp::EndsWith => {
2028                invoke_cypher_string_op(&args, "ends_with", |s, p| s.ends_with(p))
2029            }
2030            StringOp::Contains => invoke_cypher_string_op(&args, "contains", |s, p| s.contains(p)),
2031        }
2032    }
2033
2034    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
2035        vec![&self.left, &self.right]
2036    }
2037
2038    fn with_new_children(
2039        self: Arc<Self>,
2040        children: Vec<Arc<dyn PhysicalExpr>>,
2041    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
2042        Ok(Arc::new(CypherStringMatchExpr::new(
2043            children[0].clone(),
2044            children[1].clone(),
2045            self.op,
2046        )))
2047    }
2048
2049    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2050        write!(f, "{}", self)
2051    }
2052}
2053
2054impl PartialEq<dyn PhysicalExpr> for CypherStringMatchExpr {
2055    fn eq(&self, other: &dyn PhysicalExpr) -> bool {
2056        if let Some(other) = other.as_any().downcast_ref::<CypherStringMatchExpr>() {
2057            self == other
2058        } else {
2059            false
2060        }
2061    }
2062}
2063
2064/// Physical expression for extracting a field from a struct column.
2065///
2066/// Used when list comprehension iterates over a list of structs (maps)
2067/// and accesses a field, e.g., `[x IN [{a: 1}] | x.a]`.
2068#[derive(Debug, Eq)]
2069struct StructFieldAccessExpr {
2070    /// Expression producing the struct column.
2071    input: Arc<dyn PhysicalExpr>,
2072    /// Index of the field within the struct.
2073    field_idx: usize,
2074    /// Output data type of the extracted field.
2075    output_type: arrow_schema::DataType,
2076}
2077
2078impl PartialEq for StructFieldAccessExpr {
2079    fn eq(&self, other: &Self) -> bool {
2080        self.field_idx == other.field_idx
2081            && self.input.eq(&other.input)
2082            && self.output_type == other.output_type
2083    }
2084}
2085
2086impl std::hash::Hash for StructFieldAccessExpr {
2087    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2088        self.input.hash(state);
2089        self.field_idx.hash(state);
2090    }
2091}
2092
2093impl StructFieldAccessExpr {
2094    fn new(
2095        input: Arc<dyn PhysicalExpr>,
2096        field_idx: usize,
2097        output_type: arrow_schema::DataType,
2098    ) -> Self {
2099        Self {
2100            input,
2101            field_idx,
2102            output_type,
2103        }
2104    }
2105}
2106
2107impl std::fmt::Display for StructFieldAccessExpr {
2108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2109        write!(f, "{}[{}]", self.input, self.field_idx)
2110    }
2111}
2112
2113impl DisplayAs for StructFieldAccessExpr {
2114    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2115        write!(f, "{}", self)
2116    }
2117}
2118
2119impl PartialEq<dyn PhysicalExpr> for StructFieldAccessExpr {
2120    fn eq(&self, other: &dyn PhysicalExpr) -> bool {
2121        if let Some(other) = other.as_any().downcast_ref::<Self>() {
2122            self.field_idx == other.field_idx && self.input.eq(&other.input)
2123        } else {
2124            false
2125        }
2126    }
2127}
2128
2129impl PhysicalExpr for StructFieldAccessExpr {
2130    fn as_any(&self) -> &dyn std::any::Any {
2131        self
2132    }
2133
2134    fn data_type(
2135        &self,
2136        _input_schema: &Schema,
2137    ) -> datafusion::error::Result<arrow_schema::DataType> {
2138        Ok(self.output_type.clone())
2139    }
2140
2141    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
2142        Ok(true)
2143    }
2144
2145    fn evaluate(
2146        &self,
2147        batch: &arrow_array::RecordBatch,
2148    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
2149        use arrow_array::StructArray;
2150
2151        let input_val = self.input.evaluate(batch)?;
2152        let array = input_val.into_array(batch.num_rows())?;
2153
2154        let struct_array = array
2155            .as_any()
2156            .downcast_ref::<StructArray>()
2157            .ok_or_else(|| {
2158                datafusion::error::DataFusionError::Execution(
2159                    "StructFieldAccessExpr: input is not a StructArray".to_string(),
2160                )
2161            })?;
2162
2163        let field_col = struct_array.column(self.field_idx).clone();
2164        Ok(datafusion::physical_plan::ColumnarValue::Array(field_col))
2165    }
2166
2167    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
2168        vec![&self.input]
2169    }
2170
2171    fn with_new_children(
2172        self: Arc<Self>,
2173        children: Vec<Arc<dyn PhysicalExpr>>,
2174    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
2175        Ok(Arc::new(StructFieldAccessExpr::new(
2176            children[0].clone(),
2177            self.field_idx,
2178            self.output_type.clone(),
2179        )))
2180    }
2181
2182    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2183        write!(f, "{}", self)
2184    }
2185}
2186
2187// ---------------------------------------------------------------------------
2188// EXISTS subquery physical expression
2189// ---------------------------------------------------------------------------
2190
2191/// Physical expression that evaluates an EXISTS subquery per row.
2192///
2193/// For each input row, plans and executes the subquery with the row's columns
2194/// injected as parameters. Returns `true` if the subquery produces any rows.
2195///
2196/// NOT EXISTS is handled by the caller wrapping this in a NOT expression.
2197/// Nested EXISTS works because `execute_subplan` creates a full planner that
2198/// handles nested EXISTS recursively.
2199struct ExistsExecExpr {
2200    query: Query,
2201    graph_ctx: Arc<GraphExecutionContext>,
2202    session_ctx: Arc<RwLock<SessionContext>>,
2203    storage: Arc<StorageManager>,
2204    uni_schema: Arc<UniSchema>,
2205    params: HashMap<String, Value>,
2206    /// Entity variable names from outer scopes, threaded into nested subplans
2207    /// so the inner expression compiler can detect correlated references.
2208    outer_entity_vars: HashSet<String>,
2209}
2210
2211impl std::fmt::Debug for ExistsExecExpr {
2212    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2213        f.debug_struct("ExistsExecExpr").finish_non_exhaustive()
2214    }
2215}
2216
2217impl ExistsExecExpr {
2218    fn new(
2219        query: Query,
2220        graph_ctx: Arc<GraphExecutionContext>,
2221        session_ctx: Arc<RwLock<SessionContext>>,
2222        storage: Arc<StorageManager>,
2223        uni_schema: Arc<UniSchema>,
2224        params: HashMap<String, Value>,
2225        outer_entity_vars: HashSet<String>,
2226    ) -> Self {
2227        Self {
2228            query,
2229            graph_ctx,
2230            session_ctx,
2231            storage,
2232            uni_schema,
2233            params,
2234            outer_entity_vars,
2235        }
2236    }
2237}
2238
2239impl std::fmt::Display for ExistsExecExpr {
2240    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2241        write!(f, "EXISTS(<subquery>)")
2242    }
2243}
2244
2245impl PartialEq<dyn PhysicalExpr> for ExistsExecExpr {
2246    fn eq(&self, _other: &dyn PhysicalExpr) -> bool {
2247        false
2248    }
2249}
2250
2251impl PartialEq for ExistsExecExpr {
2252    fn eq(&self, _other: &Self) -> bool {
2253        false
2254    }
2255}
2256
2257impl Eq for ExistsExecExpr {}
2258
2259impl std::hash::Hash for ExistsExecExpr {
2260    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2261        "ExistsExecExpr".hash(state);
2262    }
2263}
2264
2265impl DisplayAs for ExistsExecExpr {
2266    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2267        write!(f, "{}", self)
2268    }
2269}
2270
2271impl PhysicalExpr for ExistsExecExpr {
2272    fn as_any(&self) -> &dyn std::any::Any {
2273        self
2274    }
2275
2276    fn data_type(
2277        &self,
2278        _input_schema: &Schema,
2279    ) -> datafusion::error::Result<arrow_schema::DataType> {
2280        Ok(DataType::Boolean)
2281    }
2282
2283    fn nullable(&self, _input_schema: &Schema) -> datafusion::error::Result<bool> {
2284        Ok(true)
2285    }
2286
2287    fn evaluate(
2288        &self,
2289        batch: &arrow_array::RecordBatch,
2290    ) -> datafusion::error::Result<datafusion::physical_plan::ColumnarValue> {
2291        let num_rows = batch.num_rows();
2292        let mut builder = BooleanBuilder::with_capacity(num_rows);
2293
2294        // 7.2: Extract entity variable names from batch schema.
2295        // Entity columns follow the pattern "varname._vid" (flattened) or are struct columns.
2296        // We pass ONLY entity base names (e.g., "p", "n", "m") as vars_in_scope so the
2297        // subquery planner treats them as bound (Imported) variables. The initial Project
2298        // then creates Parameter("n") AS "n" etc. — the traverse reads the VID from this
2299        // column via resolve_source_vid_col's bare-name fallback.
2300        //
2301        // We intentionally do NOT include raw column names (n._vid, n._labels, etc.) in
2302        // vars_in_scope to avoid duplicate column conflicts with traverse output columns.
2303        // Parameter expressions read directly from sub_params, not from plan columns.
2304        let schema = batch.schema();
2305        let mut entity_vars: HashSet<String> = HashSet::new();
2306        for field in schema.fields() {
2307            let name = field.name();
2308            if let Some(base) = name.strip_suffix("._vid") {
2309                entity_vars.insert(base.to_string());
2310            }
2311            if matches!(field.data_type(), DataType::Struct(_)) {
2312                entity_vars.insert(name.to_string());
2313            }
2314            // Also detect bare VID columns from parent EXISTS parameter projections.
2315            // In nested EXISTS, a parent level projects "n" as a bare Int64/UInt64 VID.
2316            // Simple identifier (no dots, no leading underscore) + integer type = VID.
2317            if !name.contains('.')
2318                && !name.starts_with('_')
2319                && matches!(field.data_type(), DataType::Int64 | DataType::UInt64)
2320            {
2321                entity_vars.insert(name.to_string());
2322            }
2323        }
2324        let vars_in_scope: Vec<String> = entity_vars.iter().cloned().collect();
2325
2326        // 7.3: Rewrite correlated property accesses to parameter references.
2327        // e.g., `n.prop` where `n` is an outer entity → `$param("n.prop")`
2328        let rewritten_query = rewrite_query_correlated(&self.query, &entity_vars);
2329
2330        // 7.4: Plan ONCE — the rewritten query is parameterized, same for all rows.
2331        let planner = QueryPlanner::new(self.uni_schema.clone());
2332        let logical_plan = match planner.plan_with_scope(rewritten_query, vars_in_scope) {
2333            Ok(plan) => plan,
2334            Err(e) => {
2335                return Err(datafusion::error::DataFusionError::Execution(format!(
2336                    "EXISTS subquery planning failed: {}",
2337                    e
2338                )));
2339            }
2340        };
2341
2342        // Execute all rows on a dedicated thread with a single tokio runtime.
2343        // The runtime must be created and dropped on this thread (not in an async context).
2344        let graph_ctx = self.graph_ctx.clone();
2345        let session_ctx = self.session_ctx.clone();
2346        let storage = self.storage.clone();
2347        let uni_schema = self.uni_schema.clone();
2348        let base_params = self.params.clone();
2349
2350        let result = std::thread::scope(|s| {
2351            s.spawn(|| {
2352                let rt = tokio::runtime::Builder::new_current_thread()
2353                    .enable_all()
2354                    .build()
2355                    .map_err(|e| {
2356                        datafusion::error::DataFusionError::Execution(format!(
2357                            "Failed to create runtime for EXISTS: {}",
2358                            e
2359                        ))
2360                    })?;
2361
2362                // Merge stored outer entity vars with batch-extracted vars
2363                // so the inner planner's compiler can detect correlated refs.
2364                let mut combined_entity_vars = self.outer_entity_vars.clone();
2365                combined_entity_vars.extend(entity_vars.iter().cloned());
2366
2367                for row_idx in 0..num_rows {
2368                    let row_params = extract_row_params(batch, row_idx);
2369                    let mut sub_params = base_params.clone();
2370                    sub_params.extend(row_params);
2371
2372                    // Add entity variable → VID value mappings so that
2373                    // Parameter("n") resolves to the VID for traversal sources.
2374                    for var in &entity_vars {
2375                        let vid_key = format!("{}._vid", var);
2376                        if let Some(vid_val) = sub_params.get(&vid_key).cloned() {
2377                            sub_params.insert(var.clone(), vid_val);
2378                        }
2379                    }
2380
2381                    let (batches, _plan) = rt.block_on(execute_subplan_with_outer_vars(
2382                        &logical_plan,
2383                        &sub_params,
2384                        &HashMap::new(), // No outer values for EXISTS subquery
2385                        &graph_ctx,
2386                        &session_ctx,
2387                        &storage,
2388                        &uni_schema,
2389                        &combined_entity_vars,
2390                        None, // EXISTS compile-time rejects mutations (expr_compiler:693)
2391                    ))?;
2392
2393                    let has_rows = batches.iter().any(|b| b.num_rows() > 0);
2394                    builder.append_value(has_rows);
2395                }
2396
2397                Ok::<_, datafusion::error::DataFusionError>(())
2398            })
2399            .join()
2400            .unwrap_or_else(|_| {
2401                Err(datafusion::error::DataFusionError::Execution(
2402                    "EXISTS subquery thread panicked".to_string(),
2403                ))
2404            })
2405        });
2406
2407        if let Err(e) = result {
2408            return Err(datafusion::error::DataFusionError::Execution(format!(
2409                "EXISTS subquery execution failed: {}",
2410                e
2411            )));
2412        }
2413
2414        Ok(datafusion::physical_plan::ColumnarValue::Array(Arc::new(
2415            builder.finish(),
2416        )))
2417    }
2418
2419    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
2420        vec![]
2421    }
2422
2423    fn with_new_children(
2424        self: Arc<Self>,
2425        children: Vec<Arc<dyn PhysicalExpr>>,
2426    ) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
2427        if !children.is_empty() {
2428            return Err(datafusion::error::DataFusionError::Plan(
2429                "ExistsExecExpr has no children".to_string(),
2430            ));
2431        }
2432        Ok(self)
2433    }
2434
2435    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2436        write!(f, "{}", self)
2437    }
2438}
2439
2440// ---------------------------------------------------------------------------
2441// EXISTS subquery helpers
2442// ---------------------------------------------------------------------------
2443
2444/// Check if a Query contains any mutation clauses (CREATE, SET, DELETE, REMOVE, MERGE).
2445/// EXISTS subqueries must be read-only per OpenCypher spec.
2446fn has_mutation_clause(query: &Query) -> bool {
2447    match query {
2448        Query::Single(stmt) => stmt.clauses.iter().any(|c| {
2449            matches!(
2450                c,
2451                Clause::Create(_)
2452                    | Clause::Delete(_)
2453                    | Clause::Set(_)
2454                    | Clause::Remove(_)
2455                    | Clause::Merge(_)
2456            ) || has_mutation_in_clause_exprs(c)
2457        }),
2458        Query::Union { left, right, .. } => has_mutation_clause(left) || has_mutation_clause(right),
2459        _ => false,
2460    }
2461}
2462
2463/// Check if a clause contains nested EXISTS with mutations (recursive).
2464fn has_mutation_in_clause_exprs(clause: &Clause) -> bool {
2465    let check_expr = |e: &Expr| -> bool { has_mutation_in_expr(e) };
2466
2467    match clause {
2468        Clause::Match(m) => m.where_clause.as_ref().is_some_and(check_expr),
2469        Clause::With(w) => {
2470            w.where_clause.as_ref().is_some_and(check_expr)
2471                || w.items.iter().any(|item| match item {
2472                    ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2473                    ReturnItem::All => false,
2474                })
2475        }
2476        Clause::Return(r) => r.items.iter().any(|item| match item {
2477            ReturnItem::Expr { expr, .. } => has_mutation_in_expr(expr),
2478            ReturnItem::All => false,
2479        }),
2480        _ => false,
2481    }
2482}
2483
2484/// Check if an expression tree contains an EXISTS with mutation clauses.
2485fn has_mutation_in_expr(expr: &Expr) -> bool {
2486    match expr {
2487        Expr::Exists { query, .. } => has_mutation_clause(query),
2488        _ => {
2489            let mut found = false;
2490            expr.for_each_child(&mut |child| {
2491                if has_mutation_in_expr(child) {
2492                    found = true;
2493                }
2494            });
2495            found
2496        }
2497    }
2498}
2499
2500/// Rewrite a Query AST to replace correlated property accesses with parameter references.
2501///
2502/// For each `Property(Variable(v), key)` where `v` is an outer-scope entity variable,
2503/// replaces it with `Parameter("{v}.{key}")`. This enables plan-once optimization since
2504/// the rewritten query is parameterized (same structure for every row).
2505fn rewrite_query_correlated(query: &Query, outer_vars: &HashSet<String>) -> Query {
2506    match query {
2507        Query::Single(stmt) => Query::Single(Statement {
2508            clauses: stmt
2509                .clauses
2510                .iter()
2511                .map(|c| rewrite_clause_correlated(c, outer_vars))
2512                .collect(),
2513        }),
2514        Query::Union { left, right, all } => Query::Union {
2515            left: Box::new(rewrite_query_correlated(left, outer_vars)),
2516            right: Box::new(rewrite_query_correlated(right, outer_vars)),
2517            all: *all,
2518        },
2519        other => other.clone(),
2520    }
2521}
2522
2523/// Rewrite expressions within a clause for correlated property access.
2524fn rewrite_clause_correlated(clause: &Clause, outer_vars: &HashSet<String>) -> Clause {
2525    match clause {
2526        Clause::Match(m) => Clause::Match(MatchClause {
2527            optional: m.optional,
2528            pattern: m.pattern.clone(),
2529            where_clause: m
2530                .where_clause
2531                .as_ref()
2532                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2533            for_update: m.for_update,
2534        }),
2535        Clause::With(w) => Clause::With(WithClause {
2536            distinct: w.distinct,
2537            items: w
2538                .items
2539                .iter()
2540                .map(|item| rewrite_return_item(item, outer_vars))
2541                .collect(),
2542            order_by: w.order_by.as_ref().map(|items| {
2543                items
2544                    .iter()
2545                    .map(|si| SortItem {
2546                        expr: rewrite_expr_correlated(&si.expr, outer_vars),
2547                        ascending: si.ascending,
2548                    })
2549                    .collect()
2550            }),
2551            skip: w
2552                .skip
2553                .as_ref()
2554                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2555            limit: w
2556                .limit
2557                .as_ref()
2558                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2559            where_clause: w
2560                .where_clause
2561                .as_ref()
2562                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2563        }),
2564        Clause::Return(r) => Clause::Return(ReturnClause {
2565            distinct: r.distinct,
2566            items: r
2567                .items
2568                .iter()
2569                .map(|item| rewrite_return_item(item, outer_vars))
2570                .collect(),
2571            order_by: r.order_by.as_ref().map(|items| {
2572                items
2573                    .iter()
2574                    .map(|si| SortItem {
2575                        expr: rewrite_expr_correlated(&si.expr, outer_vars),
2576                        ascending: si.ascending,
2577                    })
2578                    .collect()
2579            }),
2580            skip: r
2581                .skip
2582                .as_ref()
2583                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2584            limit: r
2585                .limit
2586                .as_ref()
2587                .map(|e| rewrite_expr_correlated(e, outer_vars)),
2588        }),
2589        Clause::Unwind(u) => Clause::Unwind(UnwindClause {
2590            expr: rewrite_expr_correlated(&u.expr, outer_vars),
2591            variable: u.variable.clone(),
2592        }),
2593        other => other.clone(),
2594    }
2595}
2596
2597fn rewrite_return_item(item: &ReturnItem, outer_vars: &HashSet<String>) -> ReturnItem {
2598    match item {
2599        ReturnItem::All => ReturnItem::All,
2600        ReturnItem::Expr {
2601            expr,
2602            alias,
2603            source_text,
2604        } => ReturnItem::Expr {
2605            expr: rewrite_expr_correlated(expr, outer_vars),
2606            alias: alias.clone(),
2607            source_text: source_text.clone(),
2608        },
2609    }
2610}
2611
2612/// Rewrite a single expression: Property(Variable(v), key) → Parameter("{v}.{key}")
2613/// when v is an outer-scope entity variable. Handles nested EXISTS recursively.
2614fn rewrite_expr_correlated(expr: &Expr, outer_vars: &HashSet<String>) -> Expr {
2615    match expr {
2616        // Core rewrite: n.prop → $param("n.prop") when n is an outer entity
2617        Expr::Property(base, key) => {
2618            if let Expr::Variable(v) = base.as_ref()
2619                && outer_vars.contains(v)
2620            {
2621                return Expr::Parameter(format!("{}.{}", v, key));
2622            }
2623            Expr::Property(
2624                Box::new(rewrite_expr_correlated(base, outer_vars)),
2625                key.clone(),
2626            )
2627        }
2628        // Nested EXISTS — recurse into the subquery body
2629        Expr::Exists {
2630            query,
2631            from_pattern_predicate,
2632        } => Expr::Exists {
2633            query: Box::new(rewrite_query_correlated(query, outer_vars)),
2634            from_pattern_predicate: *from_pattern_predicate,
2635        },
2636        // CountSubquery and CollectSubquery — recurse into subquery body
2637        Expr::CountSubquery(query) => {
2638            Expr::CountSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2639        }
2640        Expr::CollectSubquery(query) => {
2641            Expr::CollectSubquery(Box::new(rewrite_query_correlated(query, outer_vars)))
2642        }
2643        // All other expressions: recursively transform children
2644        other => other
2645            .clone()
2646            .map_children(&mut |child| rewrite_expr_correlated(&child, outer_vars)),
2647    }
2648}
2649
2650/// Look up the `DistanceMetric` for a vector-indexed property across all labels.
2651///
2652/// Returns `None` if no vector index exists for the property.
2653fn resolve_metric_for_property(schema: &UniSchema, property: &str) -> Option<DistanceMetric> {
2654    for idx in &schema.indexes {
2655        if let IndexDefinition::Vector(config) = idx
2656            && config.property == property
2657        {
2658            return Some(config.metric.clone());
2659        }
2660    }
2661    None
2662}